1 use { 2 super::{ 3 define::CHUNK_SIZE, errors::PackError, ChunkBasicHeader, ChunkHeader, ChunkInfo, 4 ChunkMessageHeader, 5 }, 6 byteorder::{BigEndian, LittleEndian}, 7 bytesio::{bytes_writer::AsyncBytesWriter, bytesio::TNetIO}, 8 std::{collections::HashMap, sync::Arc}, 9 tokio::sync::Mutex, 10 }; 11 12 #[derive(Eq, PartialEq, Debug)] 13 pub enum PackResult { 14 Success, 15 NotEnoughBytes, 16 } 17 18 pub struct ChunkPacketizer { 19 csid_2_chunk_header: HashMap<u32, ChunkHeader>, 20 //https://doc.rust-lang.org/stable/rust-by-example/scope/lifetime/fn.html 21 //https://zhuanlan.zhihu.com/p/165976086 22 //chunk_info: ChunkInfo, 23 max_chunk_size: usize, 24 //bytes: Cursor<Vec<u8>>, 25 writer: AsyncBytesWriter, 26 } 27 28 impl ChunkPacketizer { 29 pub fn new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self { 30 Self { 31 csid_2_chunk_header: HashMap::new(), 32 //chunk_info: ChunkInfo::new(), 33 writer: AsyncBytesWriter::new(io), 34 max_chunk_size: CHUNK_SIZE as usize, 35 } 36 } 37 fn zip_chunk_header(&mut self, chunk_info: &mut ChunkInfo) -> Result<PackResult, PackError> { 38 chunk_info.basic_header.format = 0; 39 40 let pre_header = self 41 .csid_2_chunk_header 42 .get_mut(&chunk_info.basic_header.chunk_stream_id); 43 44 if let Some(val) = pre_header { 45 let cur_msg_header = &mut chunk_info.message_header; 46 let pre_msg_header = &val.message_header; 47 48 if cur_msg_header.msg_streamd_id == pre_msg_header.msg_streamd_id { 49 chunk_info.basic_header.format = 1; 50 cur_msg_header.timestamp -= pre_msg_header.timestamp; 51 52 if cur_msg_header.msg_type_id == pre_msg_header.msg_type_id 53 && cur_msg_header.msg_length == pre_msg_header.msg_length 54 { 55 chunk_info.basic_header.format = 2; 56 if chunk_info.message_header.timestamp == pre_msg_header.timestamp { 57 chunk_info.basic_header.format = 3; 58 } 59 } 60 } 61 } 62 Ok(PackResult::Success) 63 } 64 65 fn write_basic_header(&mut self, fmt: u8, csid: u32) -> Result<(), PackError> { 66 if csid >= 64 + 255 { 67 self.writer.write_u8(fmt << 6 | 1)?; 68 self.writer.write_u16::<BigEndian>((csid - 64) as u16)?; 69 } else if csid >= 64 { 70 self.writer.write_u8(fmt << 6)?; 71 self.writer.write_u8((csid - 64) as u8)?; 72 } else { 73 self.writer.write_u8(fmt << 6 | csid as u8)?; 74 } 75 76 Ok(()) 77 } 78 79 fn write_message_header( 80 &mut self, 81 basic_header: &ChunkBasicHeader, 82 message_header: &mut ChunkMessageHeader, 83 ) -> Result<(), PackError> { 84 let timestamp = if message_header.timestamp >= 0xFFFFFF { 85 message_header.is_extended_timestamp = true; 86 0xFFFFFF 87 } else { 88 message_header.timestamp 89 }; 90 91 match basic_header.format { 92 0 => { 93 self.writer.write_u24::<BigEndian>(timestamp)?; 94 self.writer 95 .write_u24::<BigEndian>(message_header.msg_length)?; 96 self.writer.write_u8(message_header.msg_type_id)?; 97 self.writer 98 .write_u32::<LittleEndian>(message_header.msg_streamd_id)?; 99 } 100 1 => { 101 self.writer.write_u24::<BigEndian>(timestamp)?; 102 self.writer 103 .write_u24::<BigEndian>(message_header.msg_length)?; 104 } 105 2 => { 106 self.writer.write_u24::<BigEndian>(timestamp)?; 107 } 108 _ => {} 109 } 110 111 Ok(()) 112 } 113 114 fn write_extened_timestamp(&mut self, timestamp: u32) -> Result<(), PackError> { 115 self.writer.write_u32::<BigEndian>(timestamp)?; 116 117 Ok(()) 118 } 119 120 pub async fn write_chunk(&mut self, chunk_info: &mut ChunkInfo) -> Result<(), PackError> { 121 self.zip_chunk_header(chunk_info)?; 122 123 let mut whole_payload_size = chunk_info.payload.len(); 124 125 self.write_basic_header( 126 chunk_info.basic_header.format, 127 chunk_info.basic_header.chunk_stream_id, 128 )?; 129 self.write_message_header(&chunk_info.basic_header, &mut chunk_info.message_header)?; 130 131 if chunk_info.message_header.is_extended_timestamp { 132 self.write_extened_timestamp(chunk_info.message_header.timestamp)?; 133 } 134 135 let mut cur_payload_size: usize; 136 137 while whole_payload_size > 0 { 138 cur_payload_size = if whole_payload_size > self.max_chunk_size { 139 self.max_chunk_size 140 } else { 141 whole_payload_size 142 }; 143 144 let payload_bytes = chunk_info.payload.split_to(cur_payload_size); 145 self.writer.write(&payload_bytes[0..])?; 146 147 whole_payload_size -= cur_payload_size; 148 149 if whole_payload_size > 0 { 150 self.write_basic_header(3, chunk_info.basic_header.chunk_stream_id)?; 151 if chunk_info.message_header.is_extended_timestamp { 152 self.write_extened_timestamp(chunk_info.message_header.timestamp)?; 153 } 154 } 155 } 156 self.writer.flush().await?; 157 158 Ok(()) 159 } 160 } 161