1 use { 2 super::{ 3 define, 4 errors::{UnpackError, UnpackErrorValue}, 5 ChunkBasicHeader, ChunkInfo, ChunkMessageHeader, ExtendTimestampType, 6 }, 7 crate::messages::define::msg_type_id, 8 byteorder::{BigEndian, LittleEndian}, 9 bytes::{BufMut, BytesMut}, 10 bytesio::bytes_reader::BytesReader, 11 chrono::prelude::*, 12 std::{cmp::min, collections::HashMap, fmt, vec::Vec}, 13 }; 14 15 const PARSE_ERROR_NUMVER: usize = 5; 16 17 #[derive(Eq, PartialEq, Debug)] 18 pub enum UnpackResult { 19 ChunkBasicHeaderResult(ChunkBasicHeader), 20 ChunkMessageHeaderResult(ChunkMessageHeader), 21 ChunkInfo(ChunkInfo), 22 Chunks(Vec<ChunkInfo>), 23 Success, 24 NotEnoughBytes, 25 Empty, 26 } 27 28 #[derive(Copy, Clone, Debug)] 29 enum ChunkReadState { 30 ReadBasicHeader = 1, 31 ReadMessageHeader = 2, 32 ReadExtendedTimestamp = 3, 33 ReadMessagePayload = 4, 34 Finish = 5, 35 } 36 37 impl fmt::Display for ChunkReadState { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result38 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 39 match self { 40 ChunkReadState::ReadBasicHeader => { 41 write!(f, "ReadBasicHeader",) 42 } 43 ChunkReadState::ReadMessageHeader => { 44 write!(f, "ReadMessageHeader",) 45 } 46 ChunkReadState::ReadExtendedTimestamp => { 47 write!(f, "ReadExtendedTimestamp",) 48 } 49 ChunkReadState::ReadMessagePayload => { 50 write!(f, "ReadMessagePayload",) 51 } 52 ChunkReadState::Finish => { 53 write!(f, "Finish",) 54 } 55 } 56 } 57 } 58 59 #[derive(Copy, Clone, Debug)] 60 enum MessageHeaderReadState { 61 ReadTimeStamp = 1, 62 ReadMsgLength = 2, 63 ReadMsgTypeID = 3, 64 ReadMsgStreamID = 4, 65 } 66 67 pub struct ChunkUnpacketizer { 68 pub reader: BytesReader, 69 70 //https://doc.rust-lang.org/stable/rust-by-example/scope/lifetime/fn.html 71 //https://zhuanlan.zhihu.com/p/165976086 72 //We use this member to generate a complete message: 73 // - basic_header: the 2 fields will be updated from each chunk. 74 // - message_header: whose fields need to be updated for current chunk 75 // depends on the format id from basic header. 76 // Each field can inherit the value from the previous chunk. 77 // - payload: If the message's payload size is longger than the max chunk size, 78 // the whole payload will be splitted into several chunks. 79 // 80 pub current_chunk_info: ChunkInfo, 81 chunk_message_headers: HashMap<u32, ChunkMessageHeader>, 82 chunk_read_state: ChunkReadState, 83 msg_header_read_state: MessageHeaderReadState, 84 max_chunk_size: usize, 85 chunk_index: u32, 86 pub session_type: u8, 87 parse_error_number: usize, 88 } 89 90 impl Default for ChunkUnpacketizer { default() -> Self91 fn default() -> Self { 92 Self::new() 93 } 94 } 95 96 impl ChunkUnpacketizer { new() -> Self97 pub fn new() -> Self { 98 Self { 99 reader: BytesReader::new(BytesMut::new()), 100 current_chunk_info: ChunkInfo::default(), 101 chunk_message_headers: HashMap::new(), 102 chunk_read_state: ChunkReadState::ReadBasicHeader, 103 msg_header_read_state: MessageHeaderReadState::ReadTimeStamp, 104 max_chunk_size: define::INIT_CHUNK_SIZE as usize, 105 chunk_index: 0, 106 session_type: 0, 107 parse_error_number: 0, 108 } 109 } 110 extend_data(&mut self, data: &[u8])111 pub fn extend_data(&mut self, data: &[u8]) { 112 self.reader.extend_from_slice(data); 113 114 log::trace!( 115 "extend_data length: {}: content:{:X?}", 116 self.reader.len(), 117 self.reader 118 .get_remaining_bytes() 119 .split_to(self.reader.len()) 120 .to_vec() 121 ); 122 } 123 update_max_chunk_size(&mut self, chunk_size: usize)124 pub fn update_max_chunk_size(&mut self, chunk_size: usize) { 125 log::trace!("update max chunk size: {}", chunk_size); 126 self.max_chunk_size = chunk_size; 127 } 128 read_chunks(&mut self) -> Result<UnpackResult, UnpackError>129 pub fn read_chunks(&mut self) -> Result<UnpackResult, UnpackError> { 130 log::trace!( 131 "read chunks begin, current time: {}, and read state: {}", 132 Local::now().timestamp_nanos(), 133 self.chunk_read_state 134 ); 135 136 // log::trace!( 137 // "read chunks, reader remaining data: {}", 138 // self.reader.get_remaining_bytes() 139 // ); 140 141 let mut chunks: Vec<ChunkInfo> = Vec::new(); 142 143 loop { 144 match self.read_chunk() { 145 Ok(chunk) => { 146 match chunk { 147 UnpackResult::ChunkInfo(chunk_info) => { 148 let msg_type_id = chunk_info.message_header.msg_type_id; 149 chunks.push(chunk_info); 150 151 //if the chunk_size is changed, then break and update chunk_size 152 if msg_type_id == msg_type_id::SET_CHUNK_SIZE { 153 break; 154 } 155 } 156 _ => continue, 157 } 158 } 159 Err(err) => { 160 if let UnpackErrorValue::CannotParse = err.value { 161 return Err(err); 162 } 163 break; 164 } 165 } 166 } 167 168 log::trace!( 169 "read chunks end, current time: {}, and read state: {}", 170 Local::now().timestamp_nanos(), 171 self.chunk_read_state 172 ); 173 174 if !chunks.is_empty() { 175 Ok(UnpackResult::Chunks(chunks)) 176 } else { 177 Err(UnpackError { 178 value: UnpackErrorValue::EmptyChunks, 179 }) 180 } 181 } 182 183 /****************************************************************************** 184 * 5.3.1 Chunk Format 185 * Each chunk consists of a header and data. The header itself has three parts: 186 * +--------------+----------------+--------------------+--------------+ 187 * | Basic Header | Message Header | Extended Timestamp | Chunk Data | 188 * +--------------+----------------+--------------------+--------------+ 189 * |<------------------- Chunk Header ----------------->| 190 ******************************************************************************/ read_chunk(&mut self) -> Result<UnpackResult, UnpackError>191 pub fn read_chunk(&mut self) -> Result<UnpackResult, UnpackError> { 192 let mut result: UnpackResult = UnpackResult::Empty; 193 194 log::trace!( 195 "read chunk begin, current time: {}, and read state: {}, and chunk index: {}", 196 Local::now().timestamp_nanos(), 197 self.chunk_read_state, 198 self.chunk_index, 199 ); 200 201 self.chunk_index += 1; 202 203 loop { 204 result = match self.chunk_read_state { 205 ChunkReadState::ReadBasicHeader => self.read_basic_header()?, 206 ChunkReadState::ReadMessageHeader => self.read_message_header()?, 207 ChunkReadState::ReadExtendedTimestamp => self.read_extended_timestamp()?, 208 ChunkReadState::ReadMessagePayload => self.read_message_payload()?, 209 ChunkReadState::Finish => { 210 self.chunk_read_state = ChunkReadState::ReadBasicHeader; 211 break; 212 } 213 }; 214 } 215 216 log::trace!( 217 "read chunk end, current time: {}, and read state: {}, and chunk index: {}", 218 Local::now().timestamp_nanos(), 219 self.chunk_read_state, 220 self.chunk_index, 221 ); 222 Ok(result) 223 224 // Ok(UnpackResult::Success) 225 } 226 print_current_basic_header(&mut self)227 pub fn print_current_basic_header(&mut self) { 228 log::trace!( 229 "print_current_basic_header, csid: {},format id: {}", 230 self.current_chunk_info.basic_header.chunk_stream_id, 231 self.current_chunk_info.basic_header.format 232 ); 233 } 234 235 /****************************************************************** 236 * 5.3.1.1. Chunk Basic Header 237 * The Chunk Basic Header encodes the chunk stream ID and the chunk 238 * type(represented by fmt field in the figure below). Chunk type 239 * determines the format of the encoded message header. Chunk Basic 240 * Header field may be 1, 2, or 3 bytes, depending on the chunk stream 241 * ID. 242 * 243 * The bits 0-5 (least significant) in the chunk basic header represent 244 * the chunk stream ID. 245 * 246 * Chunk stream IDs 2-63 can be encoded in the 1-byte version of this 247 * field. 248 * 0 1 2 3 4 5 6 7 249 * +-+-+-+-+-+-+-+-+ 250 * |fmt| cs id | 251 * +-+-+-+-+-+-+-+-+ 252 * Figure 6 Chunk basic header 1 253 * 254 * Chunk stream IDs 64-319 can be encoded in the 2-byte version of this 255 * field. ID is computed as (the second byte + 64). 256 * 0 1 257 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 258 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 259 * |fmt| 0 | cs id - 64 | 260 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 261 * Figure 7 Chunk basic header 2 262 * 263 * Chunk stream IDs 64-65599 can be encoded in the 3-byte version of 264 * this field. ID is computed as ((the third byte)*256 + the second byte 265 * + 64). 266 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 267 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 268 * |fmt| 1 | cs id - 64 | 269 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 270 * Figure 8 Chunk basic header 3 271 * 272 * cs id: 6 bits 273 * fmt: 2 bits 274 * cs id - 64: 8 or 16 bits 275 * 276 * Chunk stream IDs with values 64-319 could be represented by both 2- 277 * byte version and 3-byte version of this field. 278 ***********************************************************************/ 279 read_basic_header(&mut self) -> Result<UnpackResult, UnpackError>280 pub fn read_basic_header(&mut self) -> Result<UnpackResult, UnpackError> { 281 let byte = self.reader.read_u8()?; 282 283 let format_id = (byte >> 6) & 0b00000011; 284 let mut csid = (byte & 0b00111111) as u32; 285 286 match csid { 287 0 => { 288 if self.reader.is_empty() { 289 return Ok(UnpackResult::NotEnoughBytes); 290 } 291 csid = 64; 292 csid += self.reader.read_u8()? as u32; 293 } 294 1 => { 295 if self.reader.is_empty() { 296 return Ok(UnpackResult::NotEnoughBytes); 297 } 298 csid = 64; 299 csid += self.reader.read_u8()? as u32; 300 csid += self.reader.read_u8()? as u32 * 256; 301 } 302 _ => {} 303 } 304 305 //todo 306 //Only when the csid is changed, we restore the chunk message header 307 //One AV message may be splitted into serval chunks, the csid 308 //will be updated when one av message's chunks are completely 309 //sent/received?? 310 if csid != self.current_chunk_info.basic_header.chunk_stream_id { 311 log::trace!( 312 "read_basic_header, chunk stream id update, new: {}, old:{}, byte: {}", 313 csid, 314 self.current_chunk_info.basic_header.chunk_stream_id, 315 byte 316 ); 317 //If the chunk stream id is changed, then we should 318 //restore the cached chunk message header used for 319 //getting the correct message header fields. 320 match self.chunk_message_headers.get_mut(&csid) { 321 Some(header) => { 322 self.current_chunk_info.message_header = header.clone(); 323 self.print_current_basic_header(); 324 } 325 None => { 326 //The format id of the first chunk of a new chunk stream id must be zero. 327 //assert_eq!(format_id, 0); 328 if format_id != 0 { 329 log::warn!( 330 "The chunk stream id: {}'s first chunk format is {}.", 331 csid, 332 format_id 333 ); 334 335 if self.parse_error_number > PARSE_ERROR_NUMVER { 336 return Err(UnpackError { 337 value: UnpackErrorValue::CannotParse, 338 }); 339 } 340 self.parse_error_number += 1; 341 } else { 342 //reset 343 self.parse_error_number = 0; 344 } 345 } 346 } 347 } 348 349 if format_id == 0 { 350 self.current_message_header().timestamp_delta = 0; 351 } 352 // each chunk will read and update the csid and format id 353 self.current_chunk_info.basic_header.chunk_stream_id = csid; 354 self.current_chunk_info.basic_header.format = format_id; 355 self.print_current_basic_header(); 356 357 self.chunk_read_state = ChunkReadState::ReadMessageHeader; 358 359 Ok(UnpackResult::ChunkBasicHeaderResult(ChunkBasicHeader::new( 360 format_id, csid, 361 ))) 362 } 363 current_message_header(&mut self) -> &mut ChunkMessageHeader364 fn current_message_header(&mut self) -> &mut ChunkMessageHeader { 365 &mut self.current_chunk_info.message_header 366 } 367 print_current_message_header(&self, state: ChunkReadState)368 fn print_current_message_header(&self, state: ChunkReadState) { 369 log::trace!( 370 "print_current_basic_header state {}, timestamp:{}, timestamp delta:{}, msg length: {},msg type id: {}, msg stream id:{}", 371 state, 372 self.current_chunk_info.message_header.timestamp, 373 self.current_chunk_info.message_header.timestamp_delta, 374 self.current_chunk_info.message_header.msg_length, 375 self.current_chunk_info.message_header.msg_type_id, 376 self.current_chunk_info.message_header.msg_streamd_id 377 ); 378 } 379 read_message_header(&mut self) -> Result<UnpackResult, UnpackError>380 pub fn read_message_header(&mut self) -> Result<UnpackResult, UnpackError> { 381 log::trace!( 382 "read_message_header, data left in buffer: {}", 383 self.reader.len(), 384 ); 385 386 //Reset is_extended_timestamp for type 0 ,1 ,2 , for type 3 ,this field will 387 //inherited from the most recent chunk 0, 1, or 2. 388 //(This field is present in Type 3 chunks when the most recent Type 0, 389 //1, or 2 chunk for the same chunk stream ID indicated the presence of 390 //an extended timestamp field. 5.3.1.3) 391 if self.current_chunk_info.basic_header.format != 3 { 392 self.current_message_header().extended_timestamp_type = ExtendTimestampType::NONE; 393 } 394 395 match self.current_chunk_info.basic_header.format { 396 /*****************************************************************/ 397 /* 5.3.1.2.1. Type 0 */ 398 /***************************************************************** 399 0 1 2 3 400 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 401 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 402 | timestamp(3bytes) |message length | 403 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 404 | message length (cont)(3bytes) |message type id| msg stream id | 405 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 406 | message stream id (cont) (4bytes) | 407 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 408 *****************************************************************/ 409 0 => { 410 loop { 411 match self.msg_header_read_state { 412 MessageHeaderReadState::ReadTimeStamp => { 413 self.current_message_header().timestamp = 414 self.reader.read_u24::<BigEndian>()?; 415 self.msg_header_read_state = MessageHeaderReadState::ReadMsgLength; 416 } 417 MessageHeaderReadState::ReadMsgLength => { 418 self.current_message_header().msg_length = 419 self.reader.read_u24::<BigEndian>()?; 420 421 log::trace!( 422 "read_message_header format 0, msg_length: {}", 423 self.current_message_header().msg_length, 424 ); 425 self.msg_header_read_state = MessageHeaderReadState::ReadMsgTypeID; 426 } 427 MessageHeaderReadState::ReadMsgTypeID => { 428 self.current_message_header().msg_type_id = self.reader.read_u8()?; 429 430 log::trace!( 431 "read_message_header format 0, msg_type_id: {}", 432 self.current_message_header().msg_type_id 433 ); 434 self.msg_header_read_state = MessageHeaderReadState::ReadMsgStreamID; 435 } 436 MessageHeaderReadState::ReadMsgStreamID => { 437 self.current_message_header().msg_streamd_id = 438 self.reader.read_u32::<LittleEndian>()?; 439 self.msg_header_read_state = MessageHeaderReadState::ReadTimeStamp; 440 break; 441 } 442 } 443 } 444 445 if self.current_message_header().timestamp >= 0xFFFFFF { 446 self.current_message_header().extended_timestamp_type = 447 ExtendTimestampType::FORMAT0; 448 } 449 } 450 /*****************************************************************/ 451 /* 5.3.1.2.2. Type 1 */ 452 /***************************************************************** 453 0 1 2 3 454 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 455 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 456 | timestamp(3bytes) |message length | 457 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 458 | message length (cont)(3bytes) |message type id| 459 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 460 *****************************************************************/ 461 1 => { 462 loop { 463 match self.msg_header_read_state { 464 MessageHeaderReadState::ReadTimeStamp => { 465 self.current_message_header().timestamp_delta = 466 self.reader.read_u24::<BigEndian>()?; 467 self.msg_header_read_state = MessageHeaderReadState::ReadMsgLength; 468 } 469 MessageHeaderReadState::ReadMsgLength => { 470 self.current_message_header().msg_length = 471 self.reader.read_u24::<BigEndian>()?; 472 473 log::trace!( 474 "read_message_header format 1, msg_length: {}", 475 self.current_message_header().msg_length 476 ); 477 self.msg_header_read_state = MessageHeaderReadState::ReadMsgTypeID; 478 } 479 MessageHeaderReadState::ReadMsgTypeID => { 480 self.current_message_header().msg_type_id = self.reader.read_u8()?; 481 482 log::trace!( 483 "read_message_header format 1, msg_type_id: {}", 484 self.current_message_header().msg_type_id 485 ); 486 self.msg_header_read_state = MessageHeaderReadState::ReadTimeStamp; 487 break; 488 } 489 _ => { 490 log::error!("error happend when read chunk message header"); 491 break; 492 } 493 } 494 } 495 496 if self.current_message_header().timestamp_delta >= 0xFFFFFF { 497 self.current_message_header().extended_timestamp_type = 498 ExtendTimestampType::FORMAT12; 499 } 500 } 501 /************************************************/ 502 /* 5.3.1.2.3. Type 2 */ 503 /************************************************ 504 0 1 2 505 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 506 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 507 | timestamp(3bytes) | 508 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 509 ***************************************************/ 510 2 => { 511 log::trace!( 512 "read_message_header format 2, msg_type_id: {}", 513 self.current_message_header().msg_type_id 514 ); 515 self.current_message_header().timestamp_delta = 516 self.reader.read_u24::<BigEndian>()?; 517 518 if self.current_message_header().timestamp_delta >= 0xFFFFFF { 519 self.current_message_header().extended_timestamp_type = 520 ExtendTimestampType::FORMAT12; 521 } 522 } 523 524 _ => {} 525 } 526 527 self.chunk_read_state = ChunkReadState::ReadExtendedTimestamp; 528 self.print_current_message_header(ChunkReadState::ReadMessageHeader); 529 530 Ok(UnpackResult::Success) 531 } 532 read_extended_timestamp(&mut self) -> Result<UnpackResult, UnpackError>533 pub fn read_extended_timestamp(&mut self) -> Result<UnpackResult, UnpackError> { 534 //The extended timestamp field is present in Type 3 chunks when the most recent Type 0, 535 //1, or 2 chunk for the same chunk stream ID indicated the presence of 536 //an extended timestamp field. 537 match self.current_message_header().extended_timestamp_type { 538 //the current fortmat type can be 0 or 3 539 ExtendTimestampType::FORMAT0 => { 540 self.current_message_header().timestamp = self.reader.read_u32::<BigEndian>()?; 541 } 542 //the current fortmat type can be 1,2 or 3 543 ExtendTimestampType::FORMAT12 => { 544 self.current_message_header().timestamp_delta = 545 self.reader.read_u32::<BigEndian>()?; 546 } 547 ExtendTimestampType::NONE => {} 548 } 549 550 //compute the abs timestamp 551 let cur_format_id = self.current_chunk_info.basic_header.format; 552 if cur_format_id == 1 553 || cur_format_id == 2 554 || (cur_format_id == 3 && self.current_chunk_info.payload.is_empty()) 555 { 556 let timestamp = self.current_message_header().timestamp; 557 let timestamp_delta = self.current_message_header().timestamp_delta; 558 559 let (cur_abs_timestamp, is_overflow) = timestamp.overflowing_add(timestamp_delta); 560 if is_overflow { 561 log::warn!( 562 "The current timestamp is overflow, current basic header: {:?}, current message header: {:?}, payload len: {}, abs timestamp: {}", 563 self.current_chunk_info.basic_header, 564 self.current_chunk_info.message_header, 565 self.current_chunk_info.payload.len(), 566 cur_abs_timestamp 567 ); 568 } 569 self.current_message_header().timestamp = cur_abs_timestamp; 570 } 571 572 self.chunk_read_state = ChunkReadState::ReadMessagePayload; 573 self.print_current_message_header(ChunkReadState::ReadExtendedTimestamp); 574 575 Ok(UnpackResult::Success) 576 } 577 read_message_payload(&mut self) -> Result<UnpackResult, UnpackError>578 pub fn read_message_payload(&mut self) -> Result<UnpackResult, UnpackError> { 579 let whole_msg_length = self.current_message_header().msg_length as usize; 580 let remaining_bytes = whole_msg_length - self.current_chunk_info.payload.len(); 581 582 log::trace!( 583 "read_message_payload whole msg length: {} and remaining bytes need to be read: {}", 584 whole_msg_length, 585 remaining_bytes 586 ); 587 588 let mut need_read_length = remaining_bytes; 589 if whole_msg_length > self.max_chunk_size { 590 need_read_length = min(remaining_bytes, self.max_chunk_size); 591 } 592 593 let remaining_mut = self.current_chunk_info.payload.remaining_mut(); 594 if need_read_length > remaining_mut { 595 let additional = need_read_length - remaining_mut; 596 self.current_chunk_info.payload.reserve(additional); 597 } 598 599 log::trace!( 600 "read_message_payload buffer len:{}, need_read_length: {}", 601 self.reader.len(), 602 need_read_length 603 ); 604 605 let payload_data = self.reader.read_bytes(need_read_length)?; 606 self.current_chunk_info 607 .payload 608 .extend_from_slice(&payload_data[..]); 609 610 log::trace!( 611 "read_message_payload current msg payload len:{}", 612 self.current_chunk_info.payload.len() 613 ); 614 615 if self.current_chunk_info.payload.len() == whole_msg_length { 616 self.chunk_read_state = ChunkReadState::Finish; 617 //get the complete chunk and clear the current chunk payload 618 let chunk_info = self.current_chunk_info.clone(); 619 self.current_chunk_info.payload.clear(); 620 621 let csid = self.current_chunk_info.basic_header.chunk_stream_id; 622 self.chunk_message_headers 623 .insert(csid, self.current_chunk_info.message_header.clone()); 624 625 return Ok(UnpackResult::ChunkInfo(chunk_info)); 626 } 627 628 self.chunk_read_state = ChunkReadState::ReadBasicHeader; 629 630 Ok(UnpackResult::Success) 631 } 632 } 633 634 #[cfg(test)] 635 mod tests { 636 637 use super::ChunkInfo; 638 use super::ChunkUnpacketizer; 639 use super::UnpackResult; 640 use bytes::BytesMut; 641 642 #[test] test_set_chunk_size()643 fn test_set_chunk_size() { 644 let mut unpacker = ChunkUnpacketizer::new(); 645 646 let data: [u8; 16] = [ 647 // 648 2, //|format+csid| 649 00, 00, 00, //timestamp 650 00, 00, 4, //msg_length 651 1, //msg_type_id 652 00, 00, 00, 00, //msg_stream_id 653 00, 00, 10, 00, //body 654 ]; 655 656 unpacker.extend_data(&data[..]); 657 658 let rv = unpacker.read_chunk(); 659 660 let mut body = BytesMut::new(); 661 body.extend_from_slice(&[00, 00, 10, 00]); 662 663 let expected = ChunkInfo::new(2, 0, 0, 4, 1, 0, body); 664 665 println!("{:?}, {:?}", expected.basic_header, expected.message_header); 666 667 assert_eq!( 668 rv.unwrap(), 669 UnpackResult::ChunkInfo(expected), 670 "not correct" 671 ) 672 } 673 674 #[test] test_overflow_add()675 fn test_overflow_add() { 676 let aa: u32 = u32::MAX; 677 println!("{}", aa); 678 679 let (_a, _b) = aa.overflowing_add(5); 680 681 let b = aa.wrapping_add(5); 682 683 println!("{}", b); 684 } 685 686 use std::collections::VecDeque; 687 688 #[test] test_unpacketizer2()689 fn test_unpacketizer2() { 690 let mut queue = VecDeque::new(); 691 queue.push_back(2); 692 queue.push_back(3); 693 queue.push_back(4); 694 695 for (_idx, data) in queue.iter().enumerate() { 696 println!("{}", data); 697 } 698 } 699 700 // #[test] 701 // fn test_window_acknowlage_size_set_peer_bandwidth() { 702 // let mut unpacker = ChunkUnpacketizer::new(); 703 704 // let data: [u8; 33] = [ 705 // 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 706 // 0x10, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x06, 0x00, 0x00, 0x00, 0x00, 707 // 0x00, 0x00, 0x10, 0x00, 0x02, 708 // ]; 709 710 // unpacker.extend_data(&data[..]); 711 712 // let rv = unpacker.read_chunk(); 713 714 // let rv2 = unpacker.read_chunk(); 715 716 // let mut body = BytesMut::new(); 717 // body.extend_from_slice(&[00, 00, 10, 00]); 718 719 // let expected = ChunkInfo::new(2, 0, 0, 4, 1, 0, body); 720 721 // assert_eq!( 722 // rv.unwrap(), 723 // UnpackResult::ChunkInfo(expected), 724 // "not correct" 725 // ) 726 // } 727 728 // #[test] 729 // fn test_on_connect() { 730 // // 0000 03 00 00 00 00 00 b1 14 00 00 00 00 02 00 07 63 ...............c 731 // // 0010 6f 6e 6e 65 63 74 00 3f f0 00 00 00 00 00 00 03 onnect.?........ 732 // // 0020 00 03 61 70 70 02 00 06 68 61 72 6c 61 6e 00 04 ..app...harlan.. 733 // // 0030 74 79 70 65 02 00 0a 6e 6f 6e 70 72 69 76 61 74 type...nonprivat 734 // // 0040 65 00 08 66 6c 61 73 68 56 65 72 02 00 1f 46 4d e..flashVer...FM 735 // // 0050 4c 45 2f 33 2e 30 20 28 63 6f 6d 70 61 74 69 62 LE/3.0 (compatib 736 // // 0060 6c 65 3b 20 46 4d 53 63 2f 31 2e 30 29 00 06 73 le; FMSc/1.0)..s 737 // // 0070 77 66 55 72 6c 02 00 1c 72 74 6d 70 3a 2f 2f 6c wfUrl...rtmp://l 738 // // 0080 6f 63 61 6c 68 6f 73 74 3a 31 39 33 35 2f 68 61 ocalhost:1935/ha 739 // // 0090 72 6c 61 6e 00 05 74 63 55 72 6c 02 00 1c 72 74 rlan..tcUrl...rt 740 // // 00a0 6d 70 3a 2f 2f 6c 6f 63 61 6c 68 6f 73 74 3a 31 mp://localhost:1 741 // // 00b0 39 33 35 2f 68 61 72 6c 61 6e 00 00 09 935/harlan... 742 // // let data: [u8; 189] = [ 743 // // 3, //|format+csid| 744 // // 0, 0, 0, //timestamp 745 // // 0, 0, 177, //msg_length 746 // // 20, //msg_type_id 0x14 747 // // 0, 0, 0, 0, //msg_stream_id 748 // // 2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body 749 // // 3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101, 750 // // 2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115, 751 // // 104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112, 752 // // 97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119, 753 // // 102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 754 // // 104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99, 755 // // 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104, 756 // // 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9, 757 // // ]; 758 759 // let data: [u8; 189] = [ 760 // 0x03, 761 // 0x00, 0x00, 0x00, 762 // 0x00, 0x00, 0xb1, 763 // 0x14, 764 // 0x00, 0x00, 0x00, 0x00, 765 // 0x02, 0x00, 766 // 0x07, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x00, 0x3f, 0xf0, 0x00, 0x00, 0x00, 767 // 0x00, 0x00, 0x00, 0x03, 0x00, 0x03, 0x61, 0x70, 0x70, 0x02, 0x00, 0x06, 0x68, 0x61, 768 // 0x72, 0x6c, 0x61, 0x6e, 0x00, 0x04, 0x74, 0x79, 0x70, 0x65, 0x02, 0x00, 0x0a, 0x6e, 769 // 0x6f, 0x6e, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, 0x65, 0x00, 0x08, 0x66, 0x6c, 0x61, 770 // 0x73, 0x68, 0x56, 0x65, 0x72, 0x02, 0x00, 0x1f, 0x46, 0x4d, 0x4c, 0x45, 0x2f, 0x33, 771 // 0x2e, 0x30, 0x20, 0x28, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x74, 0x69, 0x62, 0x6c, 0x65, 772 // 0x3b, 0x20, 0x46, 0x4d, 0x53, 0x63, 0x2f, 0x31, 0x2e, 0x30, 0x29, 0x00, 0x06, 0x73, 773 // 0x77, 0x66, 0x55, 0x72, 0x6c, 0x02, 0x00, 0x1c, 0x72, 0x74, 0x6d, 0x70, 0x3a, 0x2f, 774 // 0x2f, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x68, 0x6f, 0x73, 0x74, 0x3a, 0x31, 0x39, 0x33, 775 // 0x35, 0x2f, 0x68, 0x61, 0x72, 0x6c, 0x61, 0x6e, 0x00, 0x05, 0x74, 0x63, 0x55, 0x72, 776 // 0x6c, 0x02, 0x00, 0x1c, 0x72, 0x74, 0x6d, 0x70, 0x3a, 0x2f, 0x2f, 0x6c, 0x6f, 0x63, 777 // 0x61, 0x6c, 0x68, 0x6f, 0x73, 0x74, 0x3a, 0x31, 0x39, 0x33, 0x35, 0x2f, 0x68, 0x61, 778 // 0x72, 0x6c, 0x61, 0x6e, 0x00, 0x00, 0x09, 779 // ]; 780 781 // let mut unpacker = ChunkUnpacketizer::new(); 782 // unpacker.extend_data(&data[..]); 783 784 // let rv = unpacker.read_chunk(); 785 // match &rv { 786 // Err(err) => { 787 // println!("==={}===", err); 788 // } 789 // _ => {} 790 // } 791 792 // let mut body = BytesMut::new(); 793 // body.extend_from_slice(&[ 794 // 2, 0, 7, 99, 111, 110, 110, 101, 99, 116, 0, 63, 240, 0, 0, 0, 0, 0, 0, //body 795 // 3, 0, 3, 97, 112, 112, 2, 0, 6, 104, 97, 114, 108, 97, 110, 0, 4, 116, 121, 112, 101, 796 // 2, 0, 10, 110, 111, 110, 112, 114, 105, 118, 97, 116, 101, 0, 8, 102, 108, 97, 115, 797 // 104, 86, 101, 114, 2, 0, 31, 70, 77, 76, 69, 47, 51, 46, 48, 32, 40, 99, 111, 109, 112, 798 // 97, 116, 105, 98, 108, 101, 59, 32, 70, 77, 83, 99, 47, 49, 46, 48, 41, 0, 6, 115, 119, 799 // 102, 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 800 // 104, 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 5, 116, 99, 801 // 85, 114, 108, 2, 0, 28, 114, 116, 109, 112, 58, 47, 47, 108, 111, 99, 97, 108, 104, 802 // 111, 115, 116, 58, 49, 57, 51, 53, 47, 104, 97, 114, 108, 97, 110, 0, 0, 9, 803 // ]); 804 805 // let expected = ChunkInfo::new(3, 0, 0, 177, 20, 0, body); 806 807 // assert_eq!( 808 // rv.unwrap(), 809 // UnpackResult::ChunkInfo(expected), 810 // "not correct" 811 // ) 812 // } 813 } 814