1 use { 2 super::{ 3 define::CHUNK_SIZE, errors::PackError, ChunkBasicHeader, ChunkHeader, ChunkInfo, 4 ChunkMessageHeader, ExtendTimestampType, 5 }, 6 byteorder::{BigEndian, LittleEndian}, 7 bytesio::{bytes_writer::AsyncBytesWriter, bytesio::TNetIO}, 8 std::{collections::HashMap, sync::Arc}, 9 tokio::sync::Mutex, 10 }; 11 12 #[derive(Eq, PartialEq, Debug)] 13 pub enum PackResult { 14 Success, 15 NotEnoughBytes, 16 } 17 18 pub struct ChunkPacketizer { 19 csid_2_chunk_header: HashMap<u32, ChunkHeader>, 20 //https://doc.rust-lang.org/stable/rust-by-example/scope/lifetime/fn.html 21 //https://zhuanlan.zhihu.com/p/165976086 22 //chunk_info: ChunkInfo, 23 max_chunk_size: usize, 24 //bytes: Cursor<Vec<u8>>, 25 writer: AsyncBytesWriter, 26 //save extended timestamp need to be write for chunk 27 extended_timestamp: Option<u32>, 28 } 29 30 impl ChunkPacketizer { new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self31 pub fn new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self { 32 Self { 33 csid_2_chunk_header: HashMap::new(), 34 writer: AsyncBytesWriter::new(io), 35 max_chunk_size: CHUNK_SIZE as usize, 36 extended_timestamp: None, 37 } 38 } zip_chunk_header(&mut self, chunk_info: &mut ChunkInfo) -> Result<PackResult, PackError>39 fn zip_chunk_header(&mut self, chunk_info: &mut ChunkInfo) -> Result<PackResult, PackError> { 40 chunk_info.basic_header.format = 0; 41 42 if let Some(pre_header) = self 43 .csid_2_chunk_header 44 .get_mut(&chunk_info.basic_header.chunk_stream_id) 45 { 46 let cur_msg_header = &mut chunk_info.message_header; 47 let pre_msg_header = &mut pre_header.message_header; 48 49 if cur_msg_header.timestamp < pre_msg_header.timestamp { 50 log::warn!( 51 "Chunk stream id: {}, the current timestamp:{} is smaller than pre chunk timestamp: {}", 52 chunk_info.basic_header.chunk_stream_id, 53 cur_msg_header.timestamp, 54 pre_msg_header.timestamp 55 ); 56 } else if cur_msg_header.msg_streamd_id == pre_msg_header.msg_streamd_id { 57 chunk_info.basic_header.format = 1; 58 cur_msg_header.timestamp_delta = 59 cur_msg_header.timestamp - pre_msg_header.timestamp; 60 61 if cur_msg_header.msg_type_id == pre_msg_header.msg_type_id 62 && cur_msg_header.msg_length == pre_msg_header.msg_length 63 { 64 chunk_info.basic_header.format = 2; 65 if cur_msg_header.timestamp_delta == pre_msg_header.timestamp_delta { 66 chunk_info.basic_header.format = 3; 67 } 68 } 69 } 70 } else { 71 assert_eq!(chunk_info.message_header.timestamp_delta, 0); 72 } 73 74 //update pre header 75 self.csid_2_chunk_header.insert( 76 chunk_info.basic_header.chunk_stream_id, 77 ChunkHeader { 78 basic_header: chunk_info.basic_header.clone(), 79 message_header: chunk_info.message_header.clone(), 80 }, 81 ); 82 83 Ok(PackResult::Success) 84 } 85 write_basic_header(&mut self, fmt: u8, csid: u32) -> Result<(), PackError>86 fn write_basic_header(&mut self, fmt: u8, csid: u32) -> Result<(), PackError> { 87 if csid >= 64 + 255 { 88 self.writer.write_u8(fmt << 6 | 1)?; 89 self.writer.write_u16::<BigEndian>((csid - 64) as u16)?; 90 } else if csid >= 64 { 91 self.writer.write_u8(fmt << 6)?; 92 self.writer.write_u8((csid - 64) as u8)?; 93 } else { 94 self.writer.write_u8(fmt << 6 | csid as u8)?; 95 } 96 97 Ok(()) 98 } 99 write_message_header( &mut self, basic_header: &ChunkBasicHeader, message_header: &mut ChunkMessageHeader, ) -> Result<(), PackError>100 fn write_message_header( 101 &mut self, 102 basic_header: &ChunkBasicHeader, 103 message_header: &mut ChunkMessageHeader, 104 ) -> Result<(), PackError> { 105 let message_header_timestamp: u32; 106 (self.extended_timestamp, message_header_timestamp) = match basic_header.format { 107 0 => { 108 if message_header.timestamp >= 0xFFFFFF { 109 message_header.extended_timestamp_type = ExtendTimestampType::FORMAT0; 110 (Some(message_header.timestamp), 0xFFFFFF) 111 } else { 112 (None, message_header.timestamp) 113 } 114 } 115 1 | 2 => { 116 if message_header.timestamp_delta >= 0xFFFFFF { 117 //if use the format1,2's extended timestamp, there may be a problem for 118 //av timestamp. 119 log::warn!( 120 "Now use extended timestamp for format {}, the value is: {}", 121 basic_header.format, 122 message_header.timestamp_delta 123 ); 124 message_header.extended_timestamp_type = ExtendTimestampType::FORMAT12; 125 (Some(message_header.timestamp_delta), 0xFFFFFF) 126 } else { 127 (None, message_header.timestamp_delta) 128 } 129 } 130 _ => { 131 //should not be here 132 (None, 0) 133 } 134 }; 135 136 match basic_header.format { 137 0 => { 138 self.writer 139 .write_u24::<BigEndian>(message_header_timestamp)?; 140 self.writer 141 .write_u24::<BigEndian>(message_header.msg_length)?; 142 self.writer.write_u8(message_header.msg_type_id)?; 143 self.writer 144 .write_u32::<LittleEndian>(message_header.msg_streamd_id)?; 145 } 146 1 => { 147 self.writer 148 .write_u24::<BigEndian>(message_header_timestamp)?; 149 self.writer 150 .write_u24::<BigEndian>(message_header.msg_length)?; 151 self.writer.write_u8(message_header.msg_type_id)?; 152 } 153 2 => { 154 self.writer 155 .write_u24::<BigEndian>(message_header_timestamp)?; 156 } 157 _ => {} 158 } 159 160 Ok(()) 161 } 162 write_extened_timestamp(&mut self, timestamp: u32) -> Result<(), PackError>163 fn write_extened_timestamp(&mut self, timestamp: u32) -> Result<(), PackError> { 164 self.writer.write_u32::<BigEndian>(timestamp)?; 165 166 Ok(()) 167 } 168 write_chunk(&mut self, chunk_info: &mut ChunkInfo) -> Result<(), PackError>169 pub async fn write_chunk(&mut self, chunk_info: &mut ChunkInfo) -> Result<(), PackError> { 170 self.zip_chunk_header(chunk_info)?; 171 172 log::trace!( 173 "write_chunk current timestamp: {}", 174 chunk_info.message_header.timestamp, 175 ); 176 177 let mut whole_payload_size = chunk_info.payload.len(); 178 179 self.write_basic_header( 180 chunk_info.basic_header.format, 181 chunk_info.basic_header.chunk_stream_id, 182 )?; 183 184 self.write_message_header(&chunk_info.basic_header, &mut chunk_info.message_header)?; 185 186 if let Some(extended_timestamp) = self.extended_timestamp { 187 self.write_extened_timestamp(extended_timestamp)?; 188 } 189 190 let mut cur_payload_size: usize; 191 while whole_payload_size > 0 { 192 cur_payload_size = if whole_payload_size > self.max_chunk_size { 193 self.max_chunk_size 194 } else { 195 whole_payload_size 196 }; 197 198 let payload_bytes = chunk_info.payload.split_to(cur_payload_size); 199 self.writer.write(&payload_bytes[0..])?; 200 201 whole_payload_size -= cur_payload_size; 202 203 if whole_payload_size > 0 { 204 self.write_basic_header(3, chunk_info.basic_header.chunk_stream_id)?; 205 206 if let Some(extended_timestamp) = self.extended_timestamp { 207 self.write_extened_timestamp(extended_timestamp)?; 208 } 209 } 210 } 211 self.writer.flush().await?; 212 213 Ok(()) 214 } 215 } 216