1 use crate::error::{Error, Result}; 2 3 use bytes::{Bytes, BytesMut}; 4 5 /////////////////////////////////////////////////////////////////// 6 //payload_queue_test 7 /////////////////////////////////////////////////////////////////// 8 use super::payload_queue::*; 9 use crate::chunk::chunk_payload_data::{ChunkPayloadData, PayloadProtocolIdentifier}; 10 use crate::chunk::chunk_selective_ack::GapAckBlock; 11 12 fn make_payload(tsn: u32, n_bytes: usize) -> ChunkPayloadData { 13 ChunkPayloadData { 14 tsn, 15 user_data: { 16 let mut b = BytesMut::new(); 17 b.resize(n_bytes, 0); 18 b.freeze() 19 }, 20 ..Default::default() 21 } 22 } 23 24 #[test] 25 fn test_payload_queue_push_no_check() -> Result<()> { 26 let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0))); 27 28 pq.push_no_check(make_payload(0, 10)); 29 assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch"); 30 assert_eq!(1, pq.len(), "item count mismatch"); 31 pq.push_no_check(make_payload(1, 11)); 32 assert_eq!(21, pq.get_num_bytes(), "total bytes mismatch"); 33 assert_eq!(2, pq.len(), "item count mismatch"); 34 pq.push_no_check(make_payload(2, 12)); 35 assert_eq!(33, pq.get_num_bytes(), "total bytes mismatch"); 36 assert_eq!(3, pq.len(), "item count mismatch"); 37 38 for i in 0..3 { 39 assert!(!pq.sorted.is_empty(), "should not be empty"); 40 let c = pq.pop(i); 41 assert!(c.is_some(), "pop should succeed"); 42 if let Some(c) = c { 43 assert_eq!(i, c.tsn, "TSN should match"); 44 } 45 } 46 47 assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch"); 48 assert_eq!(0, pq.len(), "item count mismatch"); 49 50 assert!(pq.sorted.is_empty(), "should be empty"); 51 pq.push_no_check(make_payload(3, 13)); 52 assert_eq!(13, pq.get_num_bytes(), "total bytes mismatch"); 53 pq.push_no_check(make_payload(4, 14)); 54 assert_eq!(27, pq.get_num_bytes(), "total bytes mismatch"); 55 56 for i in 3..5 { 57 assert!(!pq.sorted.is_empty(), "should not be empty"); 58 let c = pq.pop(i); 59 assert!(c.is_some(), "pop should succeed"); 60 if let Some(c) = c { 61 assert_eq!(i, c.tsn, "TSN should match"); 62 } 63 } 64 65 assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch"); 66 assert_eq!(0, pq.len(), "item count mismatch"); 67 68 Ok(()) 69 } 70 71 #[test] 72 fn test_payload_queue_get_gap_ack_block() -> Result<()> { 73 let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0))); 74 75 pq.push(make_payload(1, 0), 0); 76 pq.push(make_payload(2, 0), 0); 77 pq.push(make_payload(3, 0), 0); 78 pq.push(make_payload(4, 0), 0); 79 pq.push(make_payload(5, 0), 0); 80 pq.push(make_payload(6, 0), 0); 81 82 let gab1 = vec![GapAckBlock { start: 1, end: 6 }]; 83 let gab2 = pq.get_gap_ack_blocks(0); 84 assert!(!gab2.is_empty()); 85 assert_eq!(gab2.len(), 1); 86 87 assert_eq!(gab1[0].start, gab2[0].start); 88 assert_eq!(gab1[0].end, gab2[0].end); 89 90 pq.push(make_payload(8, 0), 0); 91 pq.push(make_payload(9, 0), 0); 92 93 let gab1 = vec![ 94 GapAckBlock { start: 1, end: 6 }, 95 GapAckBlock { start: 8, end: 9 }, 96 ]; 97 let gab2 = pq.get_gap_ack_blocks(0); 98 assert!(!gab2.is_empty()); 99 assert_eq!(gab2.len(), 2); 100 101 assert_eq!(gab1[0].start, gab2[0].start); 102 assert_eq!(gab1[0].end, gab2[0].end); 103 assert_eq!(gab1[1].start, gab2[1].start); 104 assert_eq!(gab1[1].end, gab2[1].end); 105 106 Ok(()) 107 } 108 109 #[test] 110 fn test_payload_queue_get_last_tsn_received() -> Result<()> { 111 let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0))); 112 113 // empty queie should return false 114 let ok = pq.get_last_tsn_received(); 115 assert!(ok.is_none(), "should be none"); 116 117 let ok = pq.push(make_payload(20, 0), 0); 118 assert!(ok, "should be true"); 119 let tsn = pq.get_last_tsn_received(); 120 assert!(tsn.is_some(), "should be false"); 121 assert_eq!(Some(&20), tsn, "should match"); 122 123 // append should work 124 let ok = pq.push(make_payload(21, 0), 0); 125 assert!(ok, "should be true"); 126 let tsn = pq.get_last_tsn_received(); 127 assert!(tsn.is_some(), "should be false"); 128 assert_eq!(Some(&21), tsn, "should match"); 129 130 // check if sorting applied 131 let ok = pq.push(make_payload(19, 0), 0); 132 assert!(ok, "should be true"); 133 let tsn = pq.get_last_tsn_received(); 134 assert!(tsn.is_some(), "should be false"); 135 assert_eq!(Some(&21), tsn, "should match"); 136 137 Ok(()) 138 } 139 140 #[test] 141 fn test_payload_queue_mark_all_to_retrasmit() -> Result<()> { 142 let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0))); 143 144 for i in 0..3 { 145 pq.push(make_payload(i + 1, 10), 0); 146 } 147 pq.mark_as_acked(2); 148 pq.mark_all_to_retrasmit(); 149 150 let c = pq.get(1); 151 assert!(c.is_some(), "should be true"); 152 assert!(c.unwrap().retransmit, "should be marked as retransmit"); 153 let c = pq.get(2); 154 assert!(c.is_some(), "should be true"); 155 assert!(!c.unwrap().retransmit, "should NOT be marked as retransmit"); 156 let c = pq.get(3); 157 assert!(c.is_some(), "should be true"); 158 assert!(c.unwrap().retransmit, "should be marked as retransmit"); 159 160 Ok(()) 161 } 162 163 #[test] 164 fn test_payload_queue_reset_retransmit_flag_on_ack() -> Result<()> { 165 let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0))); 166 167 for i in 0..4 { 168 pq.push(make_payload(i + 1, 10), 0); 169 } 170 171 pq.mark_all_to_retrasmit(); 172 pq.mark_as_acked(2); // should cancel retransmission for TSN 2 173 pq.mark_as_acked(4); // should cancel retransmission for TSN 4 174 175 let c = pq.get(1); 176 assert!(c.is_some(), "should be true"); 177 assert!(c.unwrap().retransmit, "should be marked as retransmit"); 178 let c = pq.get(2); 179 assert!(c.is_some(), "should be true"); 180 assert!(!c.unwrap().retransmit, "should NOT be marked as retransmit"); 181 let c = pq.get(3); 182 assert!(c.is_some(), "should be true"); 183 assert!(c.unwrap().retransmit, "should be marked as retransmit"); 184 let c = pq.get(4); 185 assert!(c.is_some(), "should be true"); 186 assert!(!c.unwrap().retransmit, "should NOT be marked as retransmit"); 187 188 Ok(()) 189 } 190 191 /////////////////////////////////////////////////////////////////// 192 //pending_queue_test 193 /////////////////////////////////////////////////////////////////// 194 use super::pending_queue::*; 195 196 const NO_FRAGMENT: usize = 0; 197 const FRAG_BEGIN: usize = 1; 198 const FRAG_MIDDLE: usize = 2; 199 const FRAG_END: usize = 3; 200 201 fn make_data_chunk(tsn: u32, unordered: bool, frag: usize) -> ChunkPayloadData { 202 let mut b = false; 203 let mut e = false; 204 205 match frag { 206 NO_FRAGMENT => { 207 b = true; 208 e = true; 209 } 210 FRAG_BEGIN => { 211 b = true; 212 } 213 FRAG_END => e = true, 214 _ => {} 215 }; 216 217 ChunkPayloadData { 218 tsn, 219 unordered, 220 beginning_fragment: b, 221 ending_fragment: e, 222 user_data: { 223 let mut b = BytesMut::new(); 224 b.resize(10, 0); // always 10 bytes 225 b.freeze() 226 }, 227 ..Default::default() 228 } 229 } 230 231 #[test] 232 fn test_pending_base_queue_push_and_pop() -> Result<()> { 233 let mut pq = PendingBaseQueue::new(); 234 pq.push_back(make_data_chunk(0, false, NO_FRAGMENT)); 235 pq.push_back(make_data_chunk(1, false, NO_FRAGMENT)); 236 pq.push_back(make_data_chunk(2, false, NO_FRAGMENT)); 237 238 for i in 0..3 { 239 let c = pq.get(i); 240 assert!(c.is_some(), "should not be none"); 241 assert_eq!(i as u32, c.unwrap().tsn, "TSN should match"); 242 } 243 244 for i in 0..3 { 245 let c = pq.pop_front(); 246 assert!(c.is_some(), "should not be none"); 247 assert_eq!(i, c.unwrap().tsn, "TSN should match"); 248 } 249 250 pq.push_back(make_data_chunk(3, false, NO_FRAGMENT)); 251 pq.push_back(make_data_chunk(4, false, NO_FRAGMENT)); 252 253 for i in 3..5 { 254 let c = pq.pop_front(); 255 assert!(c.is_some(), "should not be none"); 256 assert_eq!(i, c.unwrap().tsn, "TSN should match"); 257 } 258 Ok(()) 259 } 260 261 #[test] 262 fn test_pending_base_queue_out_of_bounce() -> Result<()> { 263 let mut pq = PendingBaseQueue::new(); 264 assert!(pq.pop_front().is_none(), "should be none"); 265 assert!(pq.get(0).is_none(), "should be none"); 266 267 pq.push_back(make_data_chunk(0, false, NO_FRAGMENT)); 268 assert!(pq.get(1).is_none(), "should be none"); 269 270 Ok(()) 271 } 272 273 // NOTE: TSN is not used in pendingQueue in the actual usage. 274 // Following tests use TSN field as a chunk ID. 275 #[test] 276 fn test_pending_queue_push_and_pop() -> Result<()> { 277 let pq = PendingQueue::new(); 278 pq.push(make_data_chunk(0, false, NO_FRAGMENT)); 279 assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch"); 280 pq.push(make_data_chunk(1, false, NO_FRAGMENT)); 281 assert_eq!(20, pq.get_num_bytes(), "total bytes mismatch"); 282 pq.push(make_data_chunk(2, false, NO_FRAGMENT)); 283 assert_eq!(30, pq.get_num_bytes(), "total bytes mismatch"); 284 285 for i in 0..3 { 286 let c = pq.peek(); 287 assert!(c.is_some(), "peek error"); 288 let c = c.unwrap(); 289 assert_eq!(i, c.tsn, "TSN should match"); 290 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 291 292 let result = pq.pop(beginning_fragment, unordered); 293 assert!(result.is_some(), "should not error: {}", i); 294 } 295 296 assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch"); 297 298 pq.push(make_data_chunk(3, false, NO_FRAGMENT)); 299 assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch"); 300 pq.push(make_data_chunk(4, false, NO_FRAGMENT)); 301 assert_eq!(20, pq.get_num_bytes(), "total bytes mismatch"); 302 303 for i in 3..5 { 304 let c = pq.peek(); 305 assert!(c.is_some(), "peek error"); 306 let c = c.unwrap(); 307 assert_eq!(i, c.tsn, "TSN should match"); 308 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 309 310 let result = pq.pop(beginning_fragment, unordered); 311 assert!(result.is_some(), "should not error: {}", i); 312 } 313 314 assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch"); 315 316 Ok(()) 317 } 318 319 #[test] 320 fn test_pending_queue_unordered_wins() -> Result<()> { 321 let pq = PendingQueue::new(); 322 323 pq.push(make_data_chunk(0, false, NO_FRAGMENT)); 324 assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch"); 325 pq.push(make_data_chunk(1, true, NO_FRAGMENT)); 326 assert_eq!(20, pq.get_num_bytes(), "total bytes mismatch"); 327 pq.push(make_data_chunk(2, false, NO_FRAGMENT)); 328 assert_eq!(30, pq.get_num_bytes(), "total bytes mismatch"); 329 pq.push(make_data_chunk(3, true, NO_FRAGMENT)); 330 assert_eq!(40, pq.get_num_bytes(), "total bytes mismatch"); 331 332 let c = pq.peek(); 333 assert!(c.is_some(), "peek error"); 334 let c = c.unwrap(); 335 assert_eq!(1, c.tsn, "TSN should match"); 336 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 337 let result = pq.pop(beginning_fragment, unordered); 338 assert!(result.is_some(), "should not error"); 339 340 let c = pq.peek(); 341 assert!(c.is_some(), "peek error"); 342 let c = c.unwrap(); 343 assert_eq!(3, c.tsn, "TSN should match"); 344 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 345 let result = pq.pop(beginning_fragment, unordered); 346 assert!(result.is_some(), "should not error"); 347 348 let c = pq.peek(); 349 assert!(c.is_some(), "peek error"); 350 let c = c.unwrap(); 351 assert_eq!(0, c.tsn, "TSN should match"); 352 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 353 let result = pq.pop(beginning_fragment, unordered); 354 assert!(result.is_some(), "should not error"); 355 356 let c = pq.peek(); 357 assert!(c.is_some(), "peek error"); 358 let c = c.unwrap(); 359 assert_eq!(2, c.tsn, "TSN should match"); 360 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 361 let result = pq.pop(beginning_fragment, unordered); 362 assert!(result.is_some(), "should not error"); 363 364 assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch"); 365 366 Ok(()) 367 } 368 369 #[test] 370 fn test_pending_queue_fragments() -> Result<()> { 371 let pq = PendingQueue::new(); 372 pq.push(make_data_chunk(0, false, FRAG_BEGIN)); 373 pq.push(make_data_chunk(1, false, FRAG_MIDDLE)); 374 pq.push(make_data_chunk(2, false, FRAG_END)); 375 pq.push(make_data_chunk(3, true, FRAG_BEGIN)); 376 pq.push(make_data_chunk(4, true, FRAG_MIDDLE)); 377 pq.push(make_data_chunk(5, true, FRAG_END)); 378 379 let expects = vec![3, 4, 5, 0, 1, 2]; 380 381 for exp in expects { 382 let c = pq.peek(); 383 assert!(c.is_some(), "peek error"); 384 let c = c.unwrap(); 385 assert_eq!(exp, c.tsn, "TSN should match"); 386 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 387 let result = pq.pop(beginning_fragment, unordered); 388 assert!(result.is_some(), "should not error: {}", exp); 389 } 390 391 Ok(()) 392 } 393 394 // Once decided ordered or unordered, the decision should persist until 395 // it pops a chunk with ending_fragment flags set to true. 396 #[test] 397 fn test_pending_queue_selection_persistence() -> Result<()> { 398 let pq = PendingQueue::new(); 399 pq.push(make_data_chunk(0, false, FRAG_BEGIN)); 400 401 let c = pq.peek(); 402 assert!(c.is_some(), "peek error"); 403 let c = c.unwrap(); 404 assert_eq!(0, c.tsn, "TSN should match"); 405 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 406 let result = pq.pop(beginning_fragment, unordered); 407 assert!(result.is_some(), "should not error: {}", 0); 408 409 pq.push(make_data_chunk(1, true, NO_FRAGMENT)); 410 pq.push(make_data_chunk(2, false, FRAG_MIDDLE)); 411 pq.push(make_data_chunk(3, false, FRAG_END)); 412 413 let expects = vec![2, 3, 1]; 414 415 for exp in expects { 416 let c = pq.peek(); 417 assert!(c.is_some(), "peek error"); 418 let c = c.unwrap(); 419 assert_eq!(exp, c.tsn, "TSN should match"); 420 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 421 let result = pq.pop(beginning_fragment, unordered); 422 assert!(result.is_some(), "should not error: {}", exp); 423 } 424 425 Ok(()) 426 } 427 428 #[tokio::test] 429 async fn test_pending_queue_append() -> Result<()> { 430 let pq = PendingQueue::new(); 431 pq.append(vec![ 432 make_data_chunk(0, false, NO_FRAGMENT), 433 make_data_chunk(1, false, NO_FRAGMENT), 434 make_data_chunk(3, false, NO_FRAGMENT), 435 ]); 436 assert_eq!(30, pq.get_num_bytes(), "total bytes mismatch"); 437 assert_eq!(3, pq.len(), "len mismatch"); 438 439 Ok(()) 440 } 441 442 /////////////////////////////////////////////////////////////////// 443 //reassembly_queue_test 444 /////////////////////////////////////////////////////////////////// 445 use super::reassembly_queue::*; 446 use std::sync::atomic::AtomicUsize; 447 use std::sync::Arc; 448 449 #[test] 450 fn test_reassembly_queue_ordered_fragments() -> Result<()> { 451 let mut rq = ReassemblyQueue::new(0); 452 453 let org_ppi = PayloadProtocolIdentifier::Binary; 454 455 let chunk = ChunkPayloadData { 456 payload_type: org_ppi, 457 beginning_fragment: true, 458 tsn: 1, 459 stream_sequence_number: 0, 460 user_data: Bytes::from_static(b"ABC"), 461 ..Default::default() 462 }; 463 464 let complete = rq.push(chunk); 465 assert!(!complete, "chunk set should not be complete yet"); 466 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 467 468 let chunk = ChunkPayloadData { 469 payload_type: org_ppi, 470 ending_fragment: true, 471 tsn: 2, 472 stream_sequence_number: 0, 473 user_data: Bytes::from_static(b"DEFG"), 474 ..Default::default() 475 }; 476 477 let complete = rq.push(chunk); 478 assert!(complete, "chunk set should be complete"); 479 assert_eq!(7, rq.get_num_bytes(), "num bytes mismatch"); 480 481 let mut buf = vec![0u8; 16]; 482 483 let (n, ppi) = rq.read(&mut buf)?; 484 assert_eq!(7, n, "should received 7 bytes"); 485 assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch"); 486 assert_eq!(ppi, org_ppi, "should have valid ppi"); 487 assert_eq!(&buf[..n], b"ABCDEFG", "data should match"); 488 489 Ok(()) 490 } 491 492 #[test] 493 fn test_reassembly_queue_unordered_fragments() -> Result<()> { 494 let mut rq = ReassemblyQueue::new(0); 495 496 let org_ppi = PayloadProtocolIdentifier::Binary; 497 498 let chunk = ChunkPayloadData { 499 payload_type: org_ppi, 500 unordered: true, 501 beginning_fragment: true, 502 tsn: 1, 503 stream_sequence_number: 0, 504 user_data: Bytes::from_static(b"ABC"), 505 ..Default::default() 506 }; 507 508 let complete = rq.push(chunk); 509 assert!(!complete, "chunk set should not be complete yet"); 510 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 511 512 let chunk = ChunkPayloadData { 513 payload_type: org_ppi, 514 unordered: true, 515 tsn: 2, 516 stream_sequence_number: 0, 517 user_data: Bytes::from_static(b"DEFG"), 518 ..Default::default() 519 }; 520 521 let complete = rq.push(chunk); 522 assert!(!complete, "chunk set should not be complete yet"); 523 assert_eq!(7, rq.get_num_bytes(), "num bytes mismatch"); 524 525 let chunk = ChunkPayloadData { 526 payload_type: org_ppi, 527 unordered: true, 528 ending_fragment: true, 529 tsn: 3, 530 stream_sequence_number: 0, 531 user_data: Bytes::from_static(b"H"), 532 ..Default::default() 533 }; 534 535 let complete = rq.push(chunk); 536 assert!(complete, "chunk set should be complete"); 537 assert_eq!(8, rq.get_num_bytes(), "num bytes mismatch"); 538 539 let mut buf = vec![0u8; 16]; 540 541 let (n, ppi) = rq.read(&mut buf)?; 542 assert_eq!(8, n, "should received 8 bytes"); 543 assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch"); 544 assert_eq!(ppi, org_ppi, "should have valid ppi"); 545 assert_eq!(&buf[..n], b"ABCDEFGH", "data should match"); 546 547 Ok(()) 548 } 549 550 #[test] 551 fn test_reassembly_queue_ordered_and_unordered_fragments() -> Result<()> { 552 let mut rq = ReassemblyQueue::new(0); 553 let org_ppi = PayloadProtocolIdentifier::Binary; 554 let chunk = ChunkPayloadData { 555 payload_type: org_ppi, 556 beginning_fragment: true, 557 ending_fragment: true, 558 tsn: 1, 559 stream_sequence_number: 0, 560 user_data: Bytes::from_static(b"ABC"), 561 ..Default::default() 562 }; 563 564 let complete = rq.push(chunk); 565 assert!(complete, "chunk set should be complete"); 566 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 567 568 let chunk = ChunkPayloadData { 569 payload_type: org_ppi, 570 unordered: true, 571 beginning_fragment: true, 572 ending_fragment: true, 573 tsn: 2, 574 stream_sequence_number: 1, 575 user_data: Bytes::from_static(b"DEF"), 576 ..Default::default() 577 }; 578 579 let complete = rq.push(chunk); 580 assert!(complete, "chunk set should be complete"); 581 assert_eq!(6, rq.get_num_bytes(), "num bytes mismatch"); 582 583 // 584 // Now we have two complete chunks ready to read in the reassemblyQueue. 585 // 586 587 let mut buf = vec![0u8; 16]; 588 589 // Should read unordered chunks first 590 let (n, ppi) = rq.read(&mut buf)?; 591 assert_eq!(3, n, "should received 3 bytes"); 592 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 593 assert_eq!(ppi, org_ppi, "should have valid ppi"); 594 assert_eq!(&buf[..n], b"DEF", "data should match"); 595 596 // Next should read ordered chunks 597 let (n, ppi) = rq.read(&mut buf)?; 598 assert_eq!(3, n, "should received 3 bytes"); 599 assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch"); 600 assert_eq!(ppi, org_ppi, "should have valid ppi"); 601 assert_eq!(&buf[..n], b"ABC", "data should match"); 602 603 Ok(()) 604 } 605 606 #[test] 607 fn test_reassembly_queue_unordered_complete_skips_incomplete() -> Result<()> { 608 let mut rq = ReassemblyQueue::new(0); 609 610 let org_ppi = PayloadProtocolIdentifier::Binary; 611 612 let chunk = ChunkPayloadData { 613 payload_type: org_ppi, 614 unordered: true, 615 beginning_fragment: true, 616 tsn: 10, 617 stream_sequence_number: 0, 618 user_data: Bytes::from_static(b"IN"), 619 ..Default::default() 620 }; 621 622 let complete = rq.push(chunk); 623 assert!(!complete, "chunk set should not be complete yet"); 624 assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch"); 625 626 let chunk = ChunkPayloadData { 627 payload_type: org_ppi, 628 unordered: true, 629 ending_fragment: true, 630 tsn: 12, // <- incongiguous 631 stream_sequence_number: 1, 632 user_data: Bytes::from_static(b"COMPLETE"), 633 ..Default::default() 634 }; 635 636 let complete = rq.push(chunk); 637 assert!(!complete, "chunk set should not be complete yet"); 638 assert_eq!(10, rq.get_num_bytes(), "num bytes mismatch"); 639 640 let chunk = ChunkPayloadData { 641 payload_type: org_ppi, 642 unordered: true, 643 beginning_fragment: true, 644 ending_fragment: true, 645 tsn: 13, 646 stream_sequence_number: 1, 647 user_data: Bytes::from_static(b"GOOD"), 648 ..Default::default() 649 }; 650 651 let complete = rq.push(chunk); 652 assert!(complete, "chunk set should be complete"); 653 assert_eq!(14, rq.get_num_bytes(), "num bytes mismatch"); 654 655 // 656 // Now we have two complete chunks ready to read in the reassemblyQueue. 657 // 658 659 let mut buf = vec![0u8; 16]; 660 661 // Should pick the one that has "GOOD" 662 let (n, ppi) = rq.read(&mut buf)?; 663 assert_eq!(4, n, "should receive 4 bytes"); 664 assert_eq!(10, rq.get_num_bytes(), "num bytes mismatch"); 665 assert_eq!(ppi, org_ppi, "should have valid ppi"); 666 assert_eq!(&buf[..n], b"GOOD", "data should match"); 667 668 Ok(()) 669 } 670 671 #[test] 672 fn test_reassembly_queue_ignores_chunk_with_wrong_si() -> Result<()> { 673 let mut rq = ReassemblyQueue::new(123); 674 675 let org_ppi = PayloadProtocolIdentifier::Binary; 676 677 let chunk = ChunkPayloadData { 678 payload_type: org_ppi, 679 stream_identifier: 124, 680 beginning_fragment: true, 681 ending_fragment: true, 682 tsn: 10, 683 stream_sequence_number: 0, 684 user_data: Bytes::from_static(b"IN"), 685 ..Default::default() 686 }; 687 688 let complete = rq.push(chunk); 689 assert!(!complete, "chunk should be ignored"); 690 assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch"); 691 Ok(()) 692 } 693 694 #[test] 695 fn test_reassembly_queue_ignores_chunk_with_stale_ssn() -> Result<()> { 696 let mut rq = ReassemblyQueue::new(0); 697 rq.next_ssn = 7; // forcibly set expected SSN to 7 698 699 let org_ppi = PayloadProtocolIdentifier::Binary; 700 701 let chunk = ChunkPayloadData { 702 payload_type: org_ppi, 703 beginning_fragment: true, 704 ending_fragment: true, 705 tsn: 10, 706 stream_sequence_number: 6, // <-- stale 707 user_data: Bytes::from_static(b"IN"), 708 ..Default::default() 709 }; 710 711 let complete = rq.push(chunk); 712 assert!(!complete, "chunk should not be ignored"); 713 assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch"); 714 715 Ok(()) 716 } 717 718 #[test] 719 fn test_reassembly_queue_should_fail_to_read_incomplete_chunk() -> Result<()> { 720 let mut rq = ReassemblyQueue::new(0); 721 722 let org_ppi = PayloadProtocolIdentifier::Binary; 723 724 let chunk = ChunkPayloadData { 725 payload_type: org_ppi, 726 beginning_fragment: true, 727 tsn: 123, 728 stream_sequence_number: 0, 729 user_data: Bytes::from_static(b"IN"), 730 ..Default::default() 731 }; 732 733 let complete = rq.push(chunk); 734 assert!(!complete, "the set should not be complete"); 735 assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch"); 736 737 let mut buf = vec![0u8; 16]; 738 let result = rq.read(&mut buf); 739 assert!(result.is_err(), "read() should not succeed"); 740 assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch"); 741 742 Ok(()) 743 } 744 745 #[test] 746 fn test_reassembly_queue_should_fail_to_read_if_the_nex_ssn_is_not_ready() -> Result<()> { 747 let mut rq = ReassemblyQueue::new(0); 748 749 let org_ppi = PayloadProtocolIdentifier::Binary; 750 751 let chunk = ChunkPayloadData { 752 payload_type: org_ppi, 753 beginning_fragment: true, 754 ending_fragment: true, 755 tsn: 123, 756 stream_sequence_number: 1, 757 user_data: Bytes::from_static(b"IN"), 758 ..Default::default() 759 }; 760 761 let complete = rq.push(chunk); 762 assert!(complete, "the set should be complete"); 763 assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch"); 764 765 let mut buf = vec![0u8; 16]; 766 let result = rq.read(&mut buf); 767 assert!(result.is_err(), "read() should not succeed"); 768 assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch"); 769 770 Ok(()) 771 } 772 773 #[test] 774 fn test_reassembly_queue_detect_buffer_too_short() -> Result<()> { 775 let mut rq = ReassemblyQueue::new(0); 776 777 let org_ppi = PayloadProtocolIdentifier::Binary; 778 779 let chunk = ChunkPayloadData { 780 payload_type: org_ppi, 781 beginning_fragment: true, 782 ending_fragment: true, 783 tsn: 123, 784 stream_sequence_number: 0, 785 user_data: Bytes::from_static(b"0123456789"), 786 ..Default::default() 787 }; 788 789 let complete = rq.push(chunk); 790 assert!(complete, "the set should be complete"); 791 assert_eq!(10, rq.get_num_bytes(), "num bytes mismatch"); 792 793 let mut buf = vec![0u8; 8]; // <- passing buffer too short 794 let result = rq.read(&mut buf); 795 assert!(result.is_err(), "read() should not succeed"); 796 if let Err(err) = result { 797 assert_eq!(Error::ErrShortBuffer, err, "read() should not succeed"); 798 } 799 assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch"); 800 801 Ok(()) 802 } 803 804 #[test] 805 fn test_reassembly_queue_forward_tsn_for_ordered_framents() -> Result<()> { 806 let mut rq = ReassemblyQueue::new(0); 807 808 let org_ppi = PayloadProtocolIdentifier::Binary; 809 810 let ssn_complete = 5u16; 811 let ssn_dropped = 6u16; 812 813 let chunk = ChunkPayloadData { 814 payload_type: org_ppi, 815 beginning_fragment: true, 816 ending_fragment: true, 817 tsn: 10, 818 stream_sequence_number: ssn_complete, 819 user_data: Bytes::from_static(b"123"), 820 ..Default::default() 821 }; 822 823 let complete = rq.push(chunk); 824 assert!(complete, "chunk set should be complete"); 825 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 826 827 let chunk = ChunkPayloadData { 828 payload_type: org_ppi, 829 beginning_fragment: true, 830 tsn: 11, 831 stream_sequence_number: ssn_dropped, 832 user_data: Bytes::from_static(b"ABC"), 833 ..Default::default() 834 }; 835 836 let complete = rq.push(chunk); 837 assert!(!complete, "chunk set should not be complete yet"); 838 assert_eq!(6, rq.get_num_bytes(), "num bytes mismatch"); 839 840 let chunk = ChunkPayloadData { 841 payload_type: org_ppi, 842 tsn: 12, 843 stream_sequence_number: ssn_dropped, 844 user_data: Bytes::from_static(b"DEF"), 845 ..Default::default() 846 }; 847 848 let complete = rq.push(chunk); 849 assert!(!complete, "chunk set should not be complete yet"); 850 assert_eq!(9, rq.get_num_bytes(), "num bytes mismatch"); 851 852 rq.forward_tsn_for_ordered(ssn_dropped); 853 854 assert_eq!(1, rq.ordered.len(), "there should be one chunk left"); 855 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 856 857 Ok(()) 858 } 859 860 #[test] 861 fn test_reassembly_queue_forward_tsn_for_unordered_framents() -> Result<()> { 862 let mut rq = ReassemblyQueue::new(0); 863 864 let org_ppi = PayloadProtocolIdentifier::Binary; 865 866 let ssn_dropped = 6u16; 867 let ssn_kept = 7u16; 868 869 let chunk = ChunkPayloadData { 870 payload_type: org_ppi, 871 unordered: true, 872 beginning_fragment: true, 873 tsn: 11, 874 stream_sequence_number: ssn_dropped, 875 user_data: Bytes::from_static(b"ABC"), 876 ..Default::default() 877 }; 878 879 let complete = rq.push(chunk); 880 assert!(!complete, "chunk set should not be complete yet"); 881 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 882 883 let chunk = ChunkPayloadData { 884 payload_type: org_ppi, 885 unordered: true, 886 tsn: 12, 887 stream_sequence_number: ssn_dropped, 888 user_data: Bytes::from_static(b"DEF"), 889 ..Default::default() 890 }; 891 892 let complete = rq.push(chunk); 893 assert!(!complete, "chunk set should not be complete yet"); 894 assert_eq!(6, rq.get_num_bytes(), "num bytes mismatch"); 895 896 let chunk = ChunkPayloadData { 897 payload_type: org_ppi, 898 unordered: true, 899 tsn: 14, 900 beginning_fragment: true, 901 stream_sequence_number: ssn_kept, 902 user_data: Bytes::from_static(b"SOS"), 903 ..Default::default() 904 }; 905 906 let complete = rq.push(chunk); 907 assert!(!complete, "chunk set should not be complete yet"); 908 assert_eq!(9, rq.get_num_bytes(), "num bytes mismatch"); 909 910 // At this point, there are 3 chunks in the rq.unorderedChunks. 911 // This call should remove chunks with tsn equals to 13 or older. 912 rq.forward_tsn_for_unordered(13); 913 914 // As a result, there should be one chunk (tsn=14) 915 assert_eq!( 916 1, 917 rq.unordered_chunks.len(), 918 "there should be one chunk kept" 919 ); 920 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 921 922 Ok(()) 923 } 924 925 #[test] 926 fn test_chunk_set_empty_chunk_set() -> Result<()> { 927 let cset = ChunkSet::new(0, PayloadProtocolIdentifier::default()); 928 assert!(!cset.is_complete(), "empty chunkSet cannot be complete"); 929 Ok(()) 930 } 931 932 #[test] 933 fn test_chunk_set_push_dup_chunks_to_chunk_set() -> Result<()> { 934 let mut cset = ChunkSet::new(0, PayloadProtocolIdentifier::default()); 935 cset.push(ChunkPayloadData { 936 tsn: 100, 937 beginning_fragment: true, 938 ..Default::default() 939 }); 940 let complete = cset.push(ChunkPayloadData { 941 tsn: 100, 942 ending_fragment: true, 943 ..Default::default() 944 }); 945 assert!(!complete, "chunk with dup TSN is not complete"); 946 assert_eq!(1, cset.chunks.len(), "chunk with dup TSN should be ignored"); 947 Ok(()) 948 } 949 950 #[test] 951 fn test_chunk_set_incomplete_chunk_set_no_beginning() -> Result<()> { 952 let cset = ChunkSet { 953 ssn: 0, 954 ppi: PayloadProtocolIdentifier::default(), 955 chunks: vec![], 956 }; 957 assert!( 958 !cset.is_complete(), 959 "chunkSet not starting with B=1 cannot be complete" 960 ); 961 Ok(()) 962 } 963 964 #[test] 965 fn test_chunk_set_incomplete_chunk_set_no_contiguous_tsn() -> Result<()> { 966 let cset = ChunkSet { 967 ssn: 0, 968 ppi: PayloadProtocolIdentifier::default(), 969 chunks: vec![ 970 ChunkPayloadData { 971 tsn: 100, 972 beginning_fragment: true, 973 ..Default::default() 974 }, 975 ChunkPayloadData { 976 tsn: 101, 977 ..Default::default() 978 }, 979 ChunkPayloadData { 980 tsn: 103, 981 ending_fragment: true, 982 ..Default::default() 983 }, 984 ], 985 }; 986 assert!( 987 !cset.is_complete(), 988 "chunkSet not starting with incontiguous tsn cannot be complete" 989 ); 990 Ok(()) 991 } 992