xref: /xiu/protocol/rtmp/src/chunk/packetizer.rs (revision 69de9bbd)
1 use {
2     super::{
3         define::CHUNK_SIZE, errors::PackError, ChunkBasicHeader, ChunkHeader, ChunkInfo,
4         ChunkMessageHeader, ExtendTimestampType,
5     },
6     byteorder::{BigEndian, LittleEndian},
7     bytesio::{bytes_writer::AsyncBytesWriter, bytesio::TNetIO},
8     std::{collections::HashMap, sync::Arc},
9     tokio::sync::Mutex,
10 };
11 
/// Outcome reported by the packetizer's header-compression step.
#[derive(Debug, PartialEq, Eq)]
pub enum PackResult {
    /// The operation completed.
    Success,
    /// Not enough bytes were available to complete the operation.
    NotEnoughBytes,
}
17 
18 pub struct ChunkPacketizer {
19     csid_2_chunk_header: HashMap<u32, ChunkHeader>,
20     //https://doc.rust-lang.org/stable/rust-by-example/scope/lifetime/fn.html
21     //https://zhuanlan.zhihu.com/p/165976086
22     //chunk_info: ChunkInfo,
23     max_chunk_size: usize,
24     //bytes: Cursor<Vec<u8>>,
25     writer: AsyncBytesWriter,
26     //save extended timestamp need to be write for chunk
27     extended_timestamp: Option<u32>,
28 }
29 
30 impl ChunkPacketizer {
new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self31     pub fn new(io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>) -> Self {
32         Self {
33             csid_2_chunk_header: HashMap::new(),
34             writer: AsyncBytesWriter::new(io),
35             max_chunk_size: CHUNK_SIZE as usize,
36             extended_timestamp: None,
37         }
38     }
zip_chunk_header(&mut self, chunk_info: &mut ChunkInfo) -> Result<PackResult, PackError>39     fn zip_chunk_header(&mut self, chunk_info: &mut ChunkInfo) -> Result<PackResult, PackError> {
40         chunk_info.basic_header.format = 0;
41 
42         if let Some(pre_header) = self
43             .csid_2_chunk_header
44             .get_mut(&chunk_info.basic_header.chunk_stream_id)
45         {
46             let cur_msg_header = &mut chunk_info.message_header;
47             let pre_msg_header = &mut pre_header.message_header;
48 
49             if cur_msg_header.timestamp < pre_msg_header.timestamp {
50                 log::warn!(
51                     "Chunk stream id: {}, the current timestamp:{}  is smaller than pre chunk timestamp: {}",
52                     chunk_info.basic_header.chunk_stream_id,
53                     cur_msg_header.timestamp,
54                     pre_msg_header.timestamp
55                 );
56             } else if cur_msg_header.msg_streamd_id == pre_msg_header.msg_streamd_id {
57                 chunk_info.basic_header.format = 1;
58                 cur_msg_header.timestamp_delta =
59                     cur_msg_header.timestamp - pre_msg_header.timestamp;
60 
61                 if cur_msg_header.msg_type_id == pre_msg_header.msg_type_id
62                     && cur_msg_header.msg_length == pre_msg_header.msg_length
63                 {
64                     chunk_info.basic_header.format = 2;
65                     if cur_msg_header.timestamp_delta == pre_msg_header.timestamp_delta {
66                         chunk_info.basic_header.format = 3;
67                     }
68                 }
69             }
70         } else {
71             assert_eq!(chunk_info.message_header.timestamp_delta, 0);
72         }
73 
74         //update pre header
75         self.csid_2_chunk_header.insert(
76             chunk_info.basic_header.chunk_stream_id,
77             ChunkHeader {
78                 basic_header: chunk_info.basic_header.clone(),
79                 message_header: chunk_info.message_header.clone(),
80             },
81         );
82 
83         Ok(PackResult::Success)
84     }
85 
write_basic_header(&mut self, fmt: u8, csid: u32) -> Result<(), PackError>86     fn write_basic_header(&mut self, fmt: u8, csid: u32) -> Result<(), PackError> {
87         if csid >= 64 + 255 {
88             self.writer.write_u8(fmt << 6 | 1)?;
89             self.writer.write_u16::<BigEndian>((csid - 64) as u16)?;
90         } else if csid >= 64 {
91             self.writer.write_u8(fmt << 6)?;
92             self.writer.write_u8((csid - 64) as u8)?;
93         } else {
94             self.writer.write_u8(fmt << 6 | csid as u8)?;
95         }
96 
97         Ok(())
98     }
99 
write_message_header( &mut self, basic_header: &ChunkBasicHeader, message_header: &mut ChunkMessageHeader, ) -> Result<(), PackError>100     fn write_message_header(
101         &mut self,
102         basic_header: &ChunkBasicHeader,
103         message_header: &mut ChunkMessageHeader,
104     ) -> Result<(), PackError> {
105         let message_header_timestamp: u32;
106         (self.extended_timestamp, message_header_timestamp) = match basic_header.format {
107             0 => {
108                 if message_header.timestamp >= 0xFFFFFF {
109                     message_header.extended_timestamp_type = ExtendTimestampType::FORMAT0;
110                     (Some(message_header.timestamp), 0xFFFFFF)
111                 } else {
112                     (None, message_header.timestamp)
113                 }
114             }
115             1 | 2 => {
116                 if message_header.timestamp_delta >= 0xFFFFFF {
117                     //if use the format1,2's extended timestamp, there may be a problem for
118                     //av timestamp.
119                     log::warn!(
120                         "Now use extended timestamp for format {}, the value is: {}",
121                         basic_header.format,
122                         message_header.timestamp_delta
123                     );
124                     message_header.extended_timestamp_type = ExtendTimestampType::FORMAT12;
125                     (Some(message_header.timestamp_delta), 0xFFFFFF)
126                 } else {
127                     (None, message_header.timestamp_delta)
128                 }
129             }
130             _ => {
131                 //should not be here
132                 (None, 0)
133             }
134         };
135 
136         match basic_header.format {
137             0 => {
138                 self.writer
139                     .write_u24::<BigEndian>(message_header_timestamp)?;
140                 self.writer
141                     .write_u24::<BigEndian>(message_header.msg_length)?;
142                 self.writer.write_u8(message_header.msg_type_id)?;
143                 self.writer
144                     .write_u32::<LittleEndian>(message_header.msg_streamd_id)?;
145             }
146             1 => {
147                 self.writer
148                     .write_u24::<BigEndian>(message_header_timestamp)?;
149                 self.writer
150                     .write_u24::<BigEndian>(message_header.msg_length)?;
151                 self.writer.write_u8(message_header.msg_type_id)?;
152             }
153             2 => {
154                 self.writer
155                     .write_u24::<BigEndian>(message_header_timestamp)?;
156             }
157             _ => {}
158         }
159 
160         Ok(())
161     }
162 
write_extened_timestamp(&mut self, timestamp: u32) -> Result<(), PackError>163     fn write_extened_timestamp(&mut self, timestamp: u32) -> Result<(), PackError> {
164         self.writer.write_u32::<BigEndian>(timestamp)?;
165 
166         Ok(())
167     }
168 
write_chunk(&mut self, chunk_info: &mut ChunkInfo) -> Result<(), PackError>169     pub async fn write_chunk(&mut self, chunk_info: &mut ChunkInfo) -> Result<(), PackError> {
170         self.zip_chunk_header(chunk_info)?;
171 
172         log::trace!(
173             "write_chunk  current timestamp: {}",
174             chunk_info.message_header.timestamp,
175         );
176 
177         let mut whole_payload_size = chunk_info.payload.len();
178 
179         self.write_basic_header(
180             chunk_info.basic_header.format,
181             chunk_info.basic_header.chunk_stream_id,
182         )?;
183 
184         self.write_message_header(&chunk_info.basic_header, &mut chunk_info.message_header)?;
185 
186         if let Some(extended_timestamp) = self.extended_timestamp {
187             self.write_extened_timestamp(extended_timestamp)?;
188         }
189 
190         let mut cur_payload_size: usize;
191         while whole_payload_size > 0 {
192             cur_payload_size = if whole_payload_size > self.max_chunk_size {
193                 self.max_chunk_size
194             } else {
195                 whole_payload_size
196             };
197 
198             let payload_bytes = chunk_info.payload.split_to(cur_payload_size);
199             self.writer.write(&payload_bytes[0..])?;
200 
201             whole_payload_size -= cur_payload_size;
202 
203             if whole_payload_size > 0 {
204                 self.write_basic_header(3, chunk_info.basic_header.chunk_stream_id)?;
205 
206                 if let Some(extended_timestamp) = self.extended_timestamp {
207                     self.write_extened_timestamp(extended_timestamp)?;
208                 }
209             }
210         }
211         self.writer.flush().await?;
212 
213         Ok(())
214     }
215 }
216