xref: /xiu/protocol/rtmp/src/chunk/unpacketizer.rs (revision 74ad8a01)
1 use {
2     super::{
3         define,
4         errors::{UnpackError, UnpackErrorValue},
5         ChunkBasicHeader, ChunkInfo, ChunkMessageHeader, ExtendTimestampType,
6     },
7     crate::messages::define::msg_type_id,
8     byteorder::{BigEndian, LittleEndian},
9     bytes::{BufMut, BytesMut},
10     bytesio::bytes_reader::BytesReader,
11     chrono::prelude::*,
12     std::{cmp::min, collections::HashMap, fmt, vec::Vec},
13 };
14 
// Threshold for the "first chunk of an unseen chunk stream id is not format 0" counter:
// once `parse_error_number` exceeds this value, `read_basic_header` returns `CannotParse`.
// NOTE(review): "NUMVER" looks like a typo for "NUMBER"; left unchanged because the name
// is referenced by `read_basic_header`.
15 const PARSE_ERROR_NUMVER: usize = 5;
16 
17 #[derive(Eq, PartialEq, Debug)]
18 pub enum UnpackResult {
19     ChunkBasicHeaderResult(ChunkBasicHeader),
20     ChunkMessageHeaderResult(ChunkMessageHeader),
21     ChunkInfo(ChunkInfo),
22     Chunks(Vec<ChunkInfo>),
23     Success,
24     NotEnoughBytes,
25     Empty,
26 }
27 
/// Stage of the per-chunk parsing state machine driven by `read_chunk`.
#[derive(Debug, Clone, Copy)]
enum ChunkReadState {
    ReadBasicHeader = 1,
    ReadMessageHeader = 2,
    ReadExtendedTimestamp = 3,
    ReadMessagePayload = 4,
    Finish = 5,
}

impl fmt::Display for ChunkReadState {
    /// Writes the variant name exactly as it is spelled in the enum.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let name = match *self {
            ChunkReadState::ReadBasicHeader => "ReadBasicHeader",
            ChunkReadState::ReadMessageHeader => "ReadMessageHeader",
            ChunkReadState::ReadExtendedTimestamp => "ReadExtendedTimestamp",
            ChunkReadState::ReadMessagePayload => "ReadMessagePayload",
            ChunkReadState::Finish => "Finish",
        };
        f.write_str(name)
    }
}
58 
/// Field currently being read inside a chunk message header.
///
/// `read_message_header` keeps this between calls so that a header interrupted
/// by a read error can resume at the field where it stopped.
#[derive(Debug, Clone, Copy)]
enum MessageHeaderReadState {
    ReadTimeStamp = 1,
    ReadMsgLength = 2,
    ReadMsgTypeID = 3,
    ReadMsgStreamID = 4,
}
66 
67 pub struct ChunkUnpacketizer {
68     pub reader: BytesReader,
69 
70     //https://doc.rust-lang.org/stable/rust-by-example/scope/lifetime/fn.html
71     //https://zhuanlan.zhihu.com/p/165976086
72     //We use this member to generate a complete message:
73     // - basic_header:   the 2 fields will be updated from each chunk.
74     // - message_header: whose fields need to be updated for current chunk
75     //                   depends on the format id from basic header.
76     //                   Each field can inherit the value from the previous chunk.
77     // - payload:        If the message's payload size is longger than the max chunk size,
78     //                   the whole payload will be splitted into several chunks.
79     //
80     pub current_chunk_info: ChunkInfo,
81     chunk_message_headers: HashMap<u32, ChunkMessageHeader>,
82     chunk_read_state: ChunkReadState,
83     msg_header_read_state: MessageHeaderReadState,
84     max_chunk_size: usize,
85     chunk_index: u32,
86     pub session_type: u8,
87     parse_error_number: usize,
88 }
89 
90 impl Default for ChunkUnpacketizer {
default() -> Self91     fn default() -> Self {
92         Self::new()
93     }
94 }
95 
96 impl ChunkUnpacketizer {
new() -> Self97     pub fn new() -> Self {
98         Self {
99             reader: BytesReader::new(BytesMut::new()),
100             current_chunk_info: ChunkInfo::default(),
101             chunk_message_headers: HashMap::new(),
102             chunk_read_state: ChunkReadState::ReadBasicHeader,
103             msg_header_read_state: MessageHeaderReadState::ReadTimeStamp,
104             max_chunk_size: define::INIT_CHUNK_SIZE as usize,
105             chunk_index: 0,
106             session_type: 0,
107             parse_error_number: 0,
108         }
109     }
110 
extend_data(&mut self, data: &[u8])111     pub fn extend_data(&mut self, data: &[u8]) {
112         self.reader.extend_from_slice(data);
113 
114         log::trace!(
115             "extend_data length: {}: content:{:X?}",
116             self.reader.len(),
117             self.reader
118                 .get_remaining_bytes()
119                 .split_to(self.reader.len())
120                 .to_vec()
121         );
122     }
123 
update_max_chunk_size(&mut self, chunk_size: usize)124     pub fn update_max_chunk_size(&mut self, chunk_size: usize) {
125         log::trace!("update max chunk size: {}", chunk_size);
126         self.max_chunk_size = chunk_size;
127     }
128 
read_chunks(&mut self) -> Result<UnpackResult, UnpackError>129     pub fn read_chunks(&mut self) -> Result<UnpackResult, UnpackError> {
130         log::trace!(
131             "read chunks begin, current time: {}, and read state: {}",
132             Local::now().timestamp_nanos(),
133             self.chunk_read_state
134         );
135 
136         // log::trace!(
137         //     "read chunks, reader remaining data: {}",
138         //     self.reader.get_remaining_bytes()
139         // );
140 
141         let mut chunks: Vec<ChunkInfo> = Vec::new();
142 
143         loop {
144             match self.read_chunk() {
145                 Ok(chunk) => {
146                     match chunk {
147                         UnpackResult::ChunkInfo(chunk_info) => {
148                             let msg_type_id = chunk_info.message_header.msg_type_id;
149                             chunks.push(chunk_info);
150 
151                             //if the chunk_size is changed, then break and update chunk_size
152                             if msg_type_id == msg_type_id::SET_CHUNK_SIZE {
153                                 break;
154                             }
155                         }
156                         _ => continue,
157                     }
158                 }
159                 Err(err) => {
160                     if let UnpackErrorValue::CannotParse = err.value {
161                         return Err(err);
162                     }
163                     break;
164                 }
165             }
166         }
167 
168         log::trace!(
169             "read chunks end, current time: {}, and read state: {}",
170             Local::now().timestamp_nanos(),
171             self.chunk_read_state
172         );
173 
174         if !chunks.is_empty() {
175             Ok(UnpackResult::Chunks(chunks))
176         } else {
177             Err(UnpackError {
178                 value: UnpackErrorValue::EmptyChunks,
179             })
180         }
181     }
182 
183     /******************************************************************************
184      * 5.3.1 Chunk Format
185      * Each chunk consists of a header and data. The header itself has three parts:
186      * +--------------+----------------+--------------------+--------------+
187      * | Basic Header | Message Header | Extended Timestamp | Chunk Data |
188      * +--------------+----------------+--------------------+--------------+
189      * |<------------------- Chunk Header ----------------->|
190      ******************************************************************************/
read_chunk(&mut self) -> Result<UnpackResult, UnpackError>191     pub fn read_chunk(&mut self) -> Result<UnpackResult, UnpackError> {
192         let mut result: UnpackResult = UnpackResult::Empty;
193 
194         log::trace!(
195             "read chunk begin, current time: {}, and read state: {}, and chunk index: {}",
196             Local::now().timestamp_nanos(),
197             self.chunk_read_state,
198             self.chunk_index,
199         );
200 
201         self.chunk_index += 1;
202 
203         loop {
204             result = match self.chunk_read_state {
205                 ChunkReadState::ReadBasicHeader => self.read_basic_header()?,
206                 ChunkReadState::ReadMessageHeader => self.read_message_header()?,
207                 ChunkReadState::ReadExtendedTimestamp => self.read_extended_timestamp()?,
208                 ChunkReadState::ReadMessagePayload => self.read_message_payload()?,
209                 ChunkReadState::Finish => {
210                     self.chunk_read_state = ChunkReadState::ReadBasicHeader;
211                     break;
212                 }
213             };
214         }
215 
216         log::trace!(
217             "read chunk end, current time: {}, and read state: {}, and chunk index: {}",
218             Local::now().timestamp_nanos(),
219             self.chunk_read_state,
220             self.chunk_index,
221         );
222         Ok(result)
223 
224         // Ok(UnpackResult::Success)
225     }
226 
print_current_basic_header(&mut self)227     pub fn print_current_basic_header(&mut self) {
228         log::trace!(
229             "print_current_basic_header, csid: {},format id: {}",
230             self.current_chunk_info.basic_header.chunk_stream_id,
231             self.current_chunk_info.basic_header.format
232         );
233     }
234 
235     /******************************************************************
236      * 5.3.1.1. Chunk Basic Header
237      * The Chunk Basic Header encodes the chunk stream ID and the chunk
238      * type(represented by fmt field in the figure below). Chunk type
239      * determines the format of the encoded message header. Chunk Basic
240      * Header field may be 1, 2, or 3 bytes, depending on the chunk stream
241      * ID.
242      *
243      * The bits 0-5 (least significant) in the chunk basic header represent
244      * the chunk stream ID.
245      *
246      * Chunk stream IDs 2-63 can be encoded in the 1-byte version of this
247      * field.
248      *    0 1 2 3 4 5 6 7
249      *   +-+-+-+-+-+-+-+-+
250      *   |fmt|   cs id   |
251      *   +-+-+-+-+-+-+-+-+
252      *   Figure 6 Chunk basic header 1
253      *
254      * Chunk stream IDs 64-319 can be encoded in the 2-byte version of this
255      * field. ID is computed as (the second byte + 64).
256      *   0                   1
257      *   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
258      *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
259      *   |fmt|    0      | cs id - 64    |
260      *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
261      *   Figure 7 Chunk basic header 2
262      *
263      * Chunk stream IDs 64-65599 can be encoded in the 3-byte version of
264      * this field. ID is computed as ((the third byte)*256 + the second byte
265      * + 64).
266      *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
267      *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
268      *   |fmt|     1     |         cs id - 64            |
269      *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
270      *   Figure 8 Chunk basic header 3
271      *
272      * cs id: 6 bits
273      * fmt: 2 bits
274      * cs id - 64: 8 or 16 bits
275      *
276      * Chunk stream IDs with values 64-319 could be represented by both 2-
277      * byte version and 3-byte version of this field.
278      ***********************************************************************/
279 
read_basic_header(&mut self) -> Result<UnpackResult, UnpackError>280     pub fn read_basic_header(&mut self) -> Result<UnpackResult, UnpackError> {
281         let byte = self.reader.read_u8()?;
282 
283         let format_id = (byte >> 6) & 0b00000011;
284         let mut csid = (byte & 0b00111111) as u32;
285 
286         match csid {
287             0 => {
288                 if self.reader.is_empty() {
289                     return Ok(UnpackResult::NotEnoughBytes);
290                 }
291                 csid = 64;
292                 csid += self.reader.read_u8()? as u32;
293             }
294             1 => {
295                 if self.reader.is_empty() {
296                     return Ok(UnpackResult::NotEnoughBytes);
297                 }
298                 csid = 64;
299                 csid += self.reader.read_u8()? as u32;
300                 csid += self.reader.read_u8()? as u32 * 256;
301             }
302             _ => {}
303         }
304 
305         //todo
306         //Only when the csid is changed, we restore the chunk message header
307         //One AV message may be splitted into serval chunks, the csid
308         //will be updated when one av message's chunks are completely
309         //sent/received??
310         if csid != self.current_chunk_info.basic_header.chunk_stream_id {
311             log::trace!(
312                 "read_basic_header, chunk stream id update, new: {}, old:{}, byte: {}",
313                 csid,
314                 self.current_chunk_info.basic_header.chunk_stream_id,
315                 byte
316             );
317             //If the chunk stream id is changed, then we should
318             //restore the cached chunk message header used for
319             //getting the correct message header fields.
320             match self.chunk_message_headers.get_mut(&csid) {
321                 Some(header) => {
322                     self.current_chunk_info.message_header = header.clone();
323                     self.print_current_basic_header();
324                 }
325                 None => {
326                     //The format id of the first chunk of a new chunk stream id must be zero.
327                     //assert_eq!(format_id, 0);
328                     if format_id != 0 {
329                         log::warn!(
330                             "The chunk stream id: {}'s first chunk format is {}.",
331                             csid,
332                             format_id
333                         );
334 
335                         if self.parse_error_number > PARSE_ERROR_NUMVER {
336                             return Err(UnpackError {
337                                 value: UnpackErrorValue::CannotParse,
338                             });
339                         }
340                         self.parse_error_number += 1;
341                     } else {
342                         //reset
343                         self.parse_error_number = 0;
344                     }
345                 }
346             }
347         }
348 
349         if format_id == 0 {
350             self.current_message_header().timestamp_delta = 0;
351         }
352         // each chunk will read and update the csid and format id
353         self.current_chunk_info.basic_header.chunk_stream_id = csid;
354         self.current_chunk_info.basic_header.format = format_id;
355         self.print_current_basic_header();
356 
357         self.chunk_read_state = ChunkReadState::ReadMessageHeader;
358 
359         Ok(UnpackResult::ChunkBasicHeaderResult(ChunkBasicHeader::new(
360             format_id, csid,
361         )))
362     }
363 
current_message_header(&mut self) -> &mut ChunkMessageHeader364     fn current_message_header(&mut self) -> &mut ChunkMessageHeader {
365         &mut self.current_chunk_info.message_header
366     }
367 
print_current_message_header(&self, state: ChunkReadState)368     fn print_current_message_header(&self, state: ChunkReadState) {
369         log::trace!(
370             "print_current_basic_header state {}, timestamp:{}, timestamp delta:{}, msg length: {},msg type id: {}, msg stream id:{}",
371             state,
372             self.current_chunk_info.message_header.timestamp,
373             self.current_chunk_info.message_header.timestamp_delta,
374             self.current_chunk_info.message_header.msg_length,
375             self.current_chunk_info.message_header.msg_type_id,
376             self.current_chunk_info.message_header.msg_streamd_id
377         );
378     }
379 
read_message_header(&mut self) -> Result<UnpackResult, UnpackError>380     pub fn read_message_header(&mut self) -> Result<UnpackResult, UnpackError> {
381         log::trace!(
382             "read_message_header, data left in buffer: {}",
383             self.reader.len(),
384         );
385 
386         //Reset is_extended_timestamp for type 0 ,1 ,2 , for type 3 ,this field will
387         //inherited from the most recent chunk 0, 1, or 2.
388         //(This field is present in Type 3 chunks when the most recent Type 0,
389         //1, or 2 chunk for the same chunk stream ID indicated the presence of
390         //an extended timestamp field. 5.3.1.3)
391         if self.current_chunk_info.basic_header.format != 3 {
392             self.current_message_header().extended_timestamp_type = ExtendTimestampType::NONE;
393         }
394 
395         match self.current_chunk_info.basic_header.format {
396             /*****************************************************************/
397             /*      5.3.1.2.1. Type 0                                        */
398             /*****************************************************************
399              0                   1                   2                   3
400              0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
401             +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
402             |                timestamp(3bytes)              |message length |
403             +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
404             | message length (cont)(3bytes) |message type id| msg stream id |
405             +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
406             |       message stream id (cont) (4bytes)       |
407             +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
408             *****************************************************************/
409             0 => {
410                 loop {
411                     match self.msg_header_read_state {
412                         MessageHeaderReadState::ReadTimeStamp => {
413                             self.current_message_header().timestamp =
414                                 self.reader.read_u24::<BigEndian>()?;
415                             self.msg_header_read_state = MessageHeaderReadState::ReadMsgLength;
416                         }
417                         MessageHeaderReadState::ReadMsgLength => {
418                             self.current_message_header().msg_length =
419                                 self.reader.read_u24::<BigEndian>()?;
420 
421                             log::trace!(
422                                 "read_message_header format 0, msg_length: {}",
423                                 self.current_message_header().msg_length,
424                             );
425                             self.msg_header_read_state = MessageHeaderReadState::ReadMsgTypeID;
426                         }
427                         MessageHeaderReadState::ReadMsgTypeID => {
428                             self.current_message_header().msg_type_id = self.reader.read_u8()?;
429 
430                             log::trace!(
431                                 "read_message_header format 0, msg_type_id: {}",
432                                 self.current_message_header().msg_type_id
433                             );
434                             self.msg_header_read_state = MessageHeaderReadState::ReadMsgStreamID;
435                         }
436                         MessageHeaderReadState::ReadMsgStreamID => {
437                             self.current_message_header().msg_streamd_id =
438                                 self.reader.read_u32::<LittleEndian>()?;
439                             self.msg_header_read_state = MessageHeaderReadState::ReadTimeStamp;
440                             break;
441                         }
442                     }
443                 }
444 
445                 if self.current_message_header().timestamp >= 0xFFFFFF {
446                     self.current_message_header().extended_timestamp_type =
447                         ExtendTimestampType::FORMAT0;
448                 }
449             }
450             /*****************************************************************/
451             /*      5.3.1.2.2. Type 1                                        */
452             /*****************************************************************
453              0                   1                   2                   3
454              0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
455             +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
456             |                timestamp(3bytes)              |message length |
457             +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
458             | message length (cont)(3bytes) |message type id|
459             +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
460             *****************************************************************/
461             1 => {
462                 loop {
463                     match self.msg_header_read_state {
464                         MessageHeaderReadState::ReadTimeStamp => {
465                             self.current_message_header().timestamp_delta =
466                                 self.reader.read_u24::<BigEndian>()?;
467                             self.msg_header_read_state = MessageHeaderReadState::ReadMsgLength;
468                         }
469                         MessageHeaderReadState::ReadMsgLength => {
470                             self.current_message_header().msg_length =
471                                 self.reader.read_u24::<BigEndian>()?;
472 
473                             log::trace!(
474                                 "read_message_header format 1, msg_length: {}",
475                                 self.current_message_header().msg_length
476                             );
477                             self.msg_header_read_state = MessageHeaderReadState::ReadMsgTypeID;
478                         }
479                         MessageHeaderReadState::ReadMsgTypeID => {
480                             self.current_message_header().msg_type_id = self.reader.read_u8()?;
481 
482                             log::trace!(
483                                 "read_message_header format 1, msg_type_id: {}",
484                                 self.current_message_header().msg_type_id
485                             );
486                             self.msg_header_read_state = MessageHeaderReadState::ReadTimeStamp;
487                             break;
488                         }
489                         _ => {
490                             log::error!("error happend when read chunk message header");
491                             break;
492                         }
493                     }
494                 }
495 
496                 if self.current_message_header().timestamp_delta >= 0xFFFFFF {
497                     self.current_message_header().extended_timestamp_type =
498                         ExtendTimestampType::FORMAT12;
499                 }
500             }
501             /************************************************/
502             /*      5.3.1.2.3. Type 2                       */
503             /************************************************
504              0                   1                   2
505              0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
506             +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
507             |                timestamp(3bytes)              |
508             +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
509             ***************************************************/
510             2 => {
511                 log::trace!(
512                     "read_message_header format 2, msg_type_id: {}",
513                     self.current_message_header().msg_type_id
514                 );
515                 self.current_message_header().timestamp_delta =
516                     self.reader.read_u24::<BigEndian>()?;
517 
518                 if self.current_message_header().timestamp_delta >= 0xFFFFFF {
519                     self.current_message_header().extended_timestamp_type =
520                         ExtendTimestampType::FORMAT12;
521                 }
522             }
523 
524             _ => {}
525         }
526 
527         self.chunk_read_state = ChunkReadState::ReadExtendedTimestamp;
528         self.print_current_message_header(ChunkReadState::ReadMessageHeader);
529 
530         Ok(UnpackResult::Success)
531     }
532 
read_extended_timestamp(&mut self) -> Result<UnpackResult, UnpackError>533     pub fn read_extended_timestamp(&mut self) -> Result<UnpackResult, UnpackError> {
534         //The extended timestamp field is present in Type 3 chunks when the most recent Type 0,
535         //1, or 2 chunk for the same chunk stream ID indicated the presence of
536         //an extended timestamp field.
537         match self.current_message_header().extended_timestamp_type {
538             //the current fortmat type can be 0 or 3
539             ExtendTimestampType::FORMAT0 => {
540                 self.current_message_header().timestamp = self.reader.read_u32::<BigEndian>()?;
541             }
542             //the current fortmat type can be 1,2 or 3
543             ExtendTimestampType::FORMAT12 => {
544                 self.current_message_header().timestamp_delta =
545                     self.reader.read_u32::<BigEndian>()?;
546             }
547             ExtendTimestampType::NONE => {}
548         }
549 
550         //compute the abs timestamp
551         let cur_format_id = self.current_chunk_info.basic_header.format;
552         if cur_format_id == 1
553             || cur_format_id == 2
554             || (cur_format_id == 3 && self.current_chunk_info.payload.is_empty())
555         {
556             let timestamp = self.current_message_header().timestamp;
557             let timestamp_delta = self.current_message_header().timestamp_delta;
558 
559             let (cur_abs_timestamp, is_overflow) = timestamp.overflowing_add(timestamp_delta);
560             if is_overflow {
561                 log::warn!(
562                     "The current timestamp is overflow, current basic header: {:?}, current message header: {:?}, payload len: {}, abs timestamp: {}",
563                     self.current_chunk_info.basic_header,
564                     self.current_chunk_info.message_header,
565                     self.current_chunk_info.payload.len(),
566                     cur_abs_timestamp
567                 );
568             }
569             self.current_message_header().timestamp = cur_abs_timestamp;
570         }
571 
572         self.chunk_read_state = ChunkReadState::ReadMessagePayload;
573         self.print_current_message_header(ChunkReadState::ReadExtendedTimestamp);
574 
575         Ok(UnpackResult::Success)
576     }
577 
read_message_payload(&mut self) -> Result<UnpackResult, UnpackError>578     pub fn read_message_payload(&mut self) -> Result<UnpackResult, UnpackError> {
579         let whole_msg_length = self.current_message_header().msg_length as usize;
580         let remaining_bytes = whole_msg_length - self.current_chunk_info.payload.len();
581 
582         log::trace!(
583             "read_message_payload whole msg length: {} and remaining bytes need to be read: {}",
584             whole_msg_length,
585             remaining_bytes
586         );
587 
588         let mut need_read_length = remaining_bytes;
589         if whole_msg_length > self.max_chunk_size {
590             need_read_length = min(remaining_bytes, self.max_chunk_size);
591         }
592 
593         let remaining_mut = self.current_chunk_info.payload.remaining_mut();
594         if need_read_length > remaining_mut {
595             let additional = need_read_length - remaining_mut;
596             self.current_chunk_info.payload.reserve(additional);
597         }
598 
599         log::trace!(
600             "read_message_payload buffer len:{}, need_read_length: {}",
601             self.reader.len(),
602             need_read_length
603         );
604 
605         let payload_data = self.reader.read_bytes(need_read_length)?;
606         self.current_chunk_info
607             .payload
608             .extend_from_slice(&payload_data[..]);
609 
610         log::trace!(
611             "read_message_payload current msg payload len:{}",
612             self.current_chunk_info.payload.len()
613         );
614 
615         if self.current_chunk_info.payload.len() == whole_msg_length {
616             self.chunk_read_state = ChunkReadState::Finish;
617             //get the complete chunk and clear the current chunk payload
618             let chunk_info = self.current_chunk_info.clone();
619             self.current_chunk_info.payload.clear();
620 
621             let csid = self.current_chunk_info.basic_header.chunk_stream_id;
622             self.chunk_message_headers
623                 .insert(csid, self.current_chunk_info.message_header.clone());
624 
625             return Ok(UnpackResult::ChunkInfo(chunk_info));
626         }
627 
628         self.chunk_read_state = ChunkReadState::ReadBasicHeader;
629 
630         Ok(UnpackResult::Success)
631     }
632 }
633 
634 #[cfg(test)]
635 mod tests {
636 
637     use super::ChunkInfo;
638     use super::ChunkUnpacketizer;
639     use super::UnpackResult;
640     use bytes::BytesMut;
641 
642     #[test]
test_set_chunk_size()643     fn test_set_chunk_size() {
644         let mut unpacker = ChunkUnpacketizer::new();
645 
646         let data: [u8; 16] = [
647             //
648             2, //|format+csid|
649             00, 00, 00, //timestamp
650             00, 00, 4, //msg_length
651             1, //msg_type_id
652             00, 00, 00, 00, //msg_stream_id
653             00, 00, 10, 00, //body
654         ];
655 
656         unpacker.extend_data(&data[..]);
657 
658         let rv = unpacker.read_chunk();
659 
660         let mut body = BytesMut::new();
661         body.extend_from_slice(&[00, 00, 10, 00]);
662 
663         let expected = ChunkInfo::new(2, 0, 0, 4, 1, 0, body);
664 
665         println!("{:?}, {:?}", expected.basic_header, expected.message_header);
666 
667         assert_eq!(
668             rv.unwrap(),
669             UnpackResult::ChunkInfo(expected),
670             "not correct"
671         )
672     }
673 
674     #[test]
test_overflow_add()675     fn test_overflow_add() {
676         let aa: u32 = u32::MAX;
677         println!("{}", aa);
678 
679         let (_a, _b) = aa.overflowing_add(5);
680 
681         let b = aa.wrapping_add(5);
682 
683         println!("{}", b);
684     }
685 
686     use std::collections::VecDeque;
687 
688     #[test]
test_unpacketizer2()689     fn test_unpacketizer2() {
690         let mut queue = VecDeque::new();
691         queue.push_back(2);
692         queue.push_back(3);
693         queue.push_back(4);
694 
695         for (_idx, data) in queue.iter().enumerate() {
696             println!("{}", data);
697         }
698     }
699 
700     // #[test]
701     // fn test_window_acknowlage_size_set_peer_bandwidth() {
702     //     let mut unpacker = ChunkUnpacketizer::new();
703 
704     //     let data: [u8; 33] = [
705     //         0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
706     //         0x10, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x06, 0x00, 0x00, 0x00, 0x00,
707     //         0x00, 0x00, 0x10, 0x00, 0x02,
708     //     ];
709 
710     //     unpacker.extend_data(&data[..]);
711 
712     //     let rv = unpacker.read_chunk();
713 
714     //     let rv2 = unpacker.read_chunk();
715 
716     //     let mut body = BytesMut::new();
717     //     body.extend_from_slice(&[00, 00, 10, 00]);
718 
719     //     let expected = ChunkInfo::new(2, 0, 0, 4, 1, 0, body);
720 
721     //     assert_eq!(
722     //         rv.unwrap(),
723     //         UnpackResult::ChunkInfo(expected),
724     //         "not correct"
725     //     )
726     // }
727 
728     // #[test]
729     // fn test_on_connect() {
730     //     // 0000   03 00 00 00 00 00 b1 14 00 00 00 00 02 00 07 63  ...............c
731     //     // 0010   6f 6e 6e 65 63 74 00 3f f0 00 00 00 00 00 00 03  onnect.?........
732     //     // 0020   00 03 61 70 70 02 00 06 68 61 72 6c 61 6e 00 04  ..app...harlan..
733     //     // 0030   74 79 70 65 02 00 0a 6e 6f 6e 70 72 69 76 61 74  type...nonprivat
734     //     // 0040   65 00 08 66 6c 61 73 68 56 65 72 02 00 1f 46 4d  e..flashVer...FM
735     //     // 0050   4c 45 2f 33 2e 30 20 28 63 6f 6d 70 61 74 69 62  LE/3.0 (compatib
736     //     // 0060   6c 65 3b 20 46 4d 53 63 2f 31 2e 30 29 00 06 73  le; FMSc/1.0)..s
737     //     // 0070   77 66 55 72 6c 02 00 1c 72 74 6d 70 3a 2f 2f 6c  wfUrl...rtmp://l
738     //     // 0080   6f 63 61 6c 68 6f 73 74 3a 31 39 33 35 2f 68 61  ocalhost:1935/ha
739     //     // 0090   72 6c 61 6e 00 05 74 63 55 72 6c 02 00 1c 72 74  rlan..tcUrl...rt
740     //     // 00a0   6d 70 3a 2f 2f 6c 6f 63 61 6c 68 6f 73 74 3a 31  mp://localhost:1
741     //     // 00b0   39 33 35 2f 68 61 72 6c 61 6e 00 00 09           935/harlan...
742     //     // let data: [u8; 189] = [
743     //     //     3, //|format+csid|
744     //     //     0, 0, 0, //timestamp
745     //     //     0, 0, 177, //msg_length
746     //     //     20,  //msg_type_id 0x14
747     //     //     0, 0, 0, 0, //msg_stream_id
748     //     //     2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body
749     //     //     3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101,
750     //     //     2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115,
751     //     //     104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112,
752     //     //     97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119,
753     //     //     102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108,
754     //     //     104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99,
755     //     //     85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104,
756     //     //     111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9,
757     //     // ];
758 
759     //     let data: [u8; 189] = [
760     //         0x03,
761     //         0x00, 0x00, 0x00,
762     //         0x00, 0x00, 0xb1,
763     //         0x14,
764     //         0x00, 0x00, 0x00, 0x00,
765     //         0x02, 0x00,
766     //         0x07, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x00, 0x3f, 0xf0, 0x00, 0x00, 0x00,
767     //         0x00, 0x00, 0x00, 0x03, 0x00, 0x03, 0x61, 0x70, 0x70, 0x02, 0x00, 0x06, 0x68, 0x61,
768     //         0x72, 0x6c, 0x61, 0x6e, 0x00, 0x04, 0x74, 0x79, 0x70, 0x65, 0x02, 0x00, 0x0a, 0x6e,
769     //         0x6f, 0x6e, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x00, 0x08, 0x66, 0x6c, 0x61,
770     //         0x73, 0x68, 0x56, 0x65, 0x72, 0x02, 0x00, 0x1f, 0x46, 0x4d, 0x4c, 0x45, 0x2f, 0x33,
771     //         0x2e, 0x30, 0x20, 0x28, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x74, 0x69, 0x62, 0x6c, 0x65,
772     //         0x3b, 0x20, 0x46, 0x4d, 0x53, 0x63, 0x2f, 0x31, 0x2e, 0x30, 0x29, 0x00, 0x06, 0x73,
773     //         0x77, 0x66, 0x55, 0x72, 0x6c, 0x02, 0x00, 0x1c, 0x72, 0x74, 0x6d, 0x70, 0x3a, 0x2f,
774     //         0x2f, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x68, 0x6f, 0x73, 0x74, 0x3a, 0x31, 0x39, 0x33,
775     //         0x35, 0x2f, 0x68, 0x61, 0x72, 0x6c, 0x61, 0x6e, 0x00, 0x05, 0x74, 0x63, 0x55, 0x72,
776     //         0x6c, 0x02, 0x00, 0x1c, 0x72, 0x74, 0x6d, 0x70, 0x3a, 0x2f, 0x2f, 0x6c, 0x6f, 0x63,
777     //         0x61, 0x6c, 0x68, 0x6f, 0x73, 0x74, 0x3a, 0x31, 0x39, 0x33, 0x35, 0x2f, 0x68, 0x61,
778     //         0x72, 0x6c, 0x61, 0x6e, 0x00, 0x00, 0x09,
779     //     ];
780 
781     //     let mut unpacker = ChunkUnpacketizer::new();
782     //     unpacker.extend_data(&data[..]);
783 
784     //     let rv = unpacker.read_chunk();
785     //     match &rv {
786     //         Err(err) => {
787     //             println!("==={}===", err);
788     //         }
789     //         _ => {}
790     //     }
791 
792     //     let mut body = BytesMut::new();
793     //     body.extend_from_slice(&[
794     //         2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body
795     //         3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101,
796     //         2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115,
797     //         104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112,
798     //         97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119,
799     //         102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108,
800     //         104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99,
801     //         85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104,
802     //         111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9,
803     //     ]);
804 
805     //     let expected = ChunkInfo::new(3, 0, 0, 177, 20, 0, body);
806 
807     //     assert_eq!(
808     //         rv.unwrap(),
809     //         UnpackResult::ChunkInfo(expected),
810     //         "not correct"
811     //     )
812     // }
813 }
814