use { super::{ define::CHUNK_SIZE, errors::PackError, ChunkBasicHeader, ChunkHeader, ChunkInfo, ChunkMessageHeader, ExtendTimestampType, }, byteorder::{BigEndian, LittleEndian}, bytesio::{bytes_writer::AsyncBytesWriter, bytesio::TNetIO}, std::{collections::HashMap, sync::Arc}, tokio::sync::Mutex, }; #[derive(Eq, PartialEq, Debug)] pub enum PackResult { Success, NotEnoughBytes, } pub struct ChunkPacketizer { csid_2_chunk_header: HashMap, //https://doc.rust-lang.org/stable/rust-by-example/scope/lifetime/fn.html //https://zhuanlan.zhihu.com/p/165976086 //chunk_info: ChunkInfo, max_chunk_size: usize, //bytes: Cursor>, writer: AsyncBytesWriter, //save extended timestamp need to be write for chunk extended_timestamp: Option, } impl ChunkPacketizer { pub fn new(io: Arc>>) -> Self { Self { csid_2_chunk_header: HashMap::new(), writer: AsyncBytesWriter::new(io), max_chunk_size: CHUNK_SIZE as usize, extended_timestamp: None, } } fn zip_chunk_header(&mut self, chunk_info: &mut ChunkInfo) -> Result { chunk_info.basic_header.format = 0; if let Some(pre_header) = self .csid_2_chunk_header .get_mut(&chunk_info.basic_header.chunk_stream_id) { let cur_msg_header = &mut chunk_info.message_header; let pre_msg_header = &mut pre_header.message_header; if cur_msg_header.timestamp < pre_msg_header.timestamp { log::warn!( "Chunk stream id: {}, the current timestamp:{} is smaller than pre chunk timestamp: {}", chunk_info.basic_header.chunk_stream_id, cur_msg_header.timestamp, pre_msg_header.timestamp ); } else if cur_msg_header.msg_streamd_id == pre_msg_header.msg_streamd_id { chunk_info.basic_header.format = 1; cur_msg_header.timestamp_delta = cur_msg_header.timestamp - pre_msg_header.timestamp; if cur_msg_header.msg_type_id == pre_msg_header.msg_type_id && cur_msg_header.msg_length == pre_msg_header.msg_length { chunk_info.basic_header.format = 2; if cur_msg_header.timestamp_delta == pre_msg_header.timestamp_delta { chunk_info.basic_header.format = 3; } } } } else { assert_eq!(chunk_info.message_header.timestamp_delta, 0); } //update pre header self.csid_2_chunk_header.insert( chunk_info.basic_header.chunk_stream_id, ChunkHeader { basic_header: chunk_info.basic_header.clone(), message_header: chunk_info.message_header.clone(), }, ); Ok(PackResult::Success) } fn write_basic_header(&mut self, fmt: u8, csid: u32) -> Result<(), PackError> { if csid >= 64 + 255 { self.writer.write_u8(fmt << 6 | 1)?; self.writer.write_u16::((csid - 64) as u16)?; } else if csid >= 64 { self.writer.write_u8(fmt << 6)?; self.writer.write_u8((csid - 64) as u8)?; } else { self.writer.write_u8(fmt << 6 | csid as u8)?; } Ok(()) } fn write_message_header( &mut self, basic_header: &ChunkBasicHeader, message_header: &mut ChunkMessageHeader, ) -> Result<(), PackError> { let message_header_timestamp: u32; (self.extended_timestamp, message_header_timestamp) = match basic_header.format { 0 => { if message_header.timestamp >= 0xFFFFFF { message_header.extended_timestamp_type = ExtendTimestampType::FORMAT0; (Some(message_header.timestamp), 0xFFFFFF) } else { (None, message_header.timestamp) } } 1 | 2 => { if message_header.timestamp_delta >= 0xFFFFFF { //if use the format1,2's extended timestamp, there may be a problem for //av timestamp. log::warn!( "Now use extended timestamp for format {}, the value is: {}", basic_header.format, message_header.timestamp_delta ); message_header.extended_timestamp_type = ExtendTimestampType::FORMAT12; (Some(message_header.timestamp_delta), 0xFFFFFF) } else { (None, message_header.timestamp_delta) } } _ => { //should not be here (None, 0) } }; match basic_header.format { 0 => { self.writer .write_u24::(message_header_timestamp)?; self.writer .write_u24::(message_header.msg_length)?; self.writer.write_u8(message_header.msg_type_id)?; self.writer .write_u32::(message_header.msg_streamd_id)?; } 1 => { self.writer .write_u24::(message_header_timestamp)?; self.writer .write_u24::(message_header.msg_length)?; self.writer.write_u8(message_header.msg_type_id)?; } 2 => { self.writer .write_u24::(message_header_timestamp)?; } _ => {} } Ok(()) } fn write_extened_timestamp(&mut self, timestamp: u32) -> Result<(), PackError> { self.writer.write_u32::(timestamp)?; Ok(()) } pub async fn write_chunk(&mut self, chunk_info: &mut ChunkInfo) -> Result<(), PackError> { self.zip_chunk_header(chunk_info)?; log::trace!( "write_chunk current timestamp: {}", chunk_info.message_header.timestamp, ); let mut whole_payload_size = chunk_info.payload.len(); self.write_basic_header( chunk_info.basic_header.format, chunk_info.basic_header.chunk_stream_id, )?; self.write_message_header(&chunk_info.basic_header, &mut chunk_info.message_header)?; if let Some(extended_timestamp) = self.extended_timestamp { self.write_extened_timestamp(extended_timestamp)?; } let mut cur_payload_size: usize; while whole_payload_size > 0 { cur_payload_size = if whole_payload_size > self.max_chunk_size { self.max_chunk_size } else { whole_payload_size }; let payload_bytes = chunk_info.payload.split_to(cur_payload_size); self.writer.write(&payload_bytes[0..])?; whole_payload_size -= cur_payload_size; if whole_payload_size > 0 { self.write_basic_header(3, chunk_info.basic_header.chunk_stream_id)?; if let Some(extended_timestamp) = self.extended_timestamp { self.write_extened_timestamp(extended_timestamp)?; } } } self.writer.flush().await?; Ok(()) } }