1 use crate::error::{Error, Result}; 2 3 use bytes::{Bytes, BytesMut}; 4 5 /////////////////////////////////////////////////////////////////// 6 //payload_queue_test 7 /////////////////////////////////////////////////////////////////// 8 use super::payload_queue::*; 9 use crate::chunk::chunk_payload_data::{ChunkPayloadData, PayloadProtocolIdentifier}; 10 use crate::chunk::chunk_selective_ack::GapAckBlock; 11 12 fn make_payload(tsn: u32, n_bytes: usize) -> ChunkPayloadData { 13 ChunkPayloadData { 14 tsn, 15 user_data: { 16 let mut b = BytesMut::new(); 17 b.resize(n_bytes, 0); 18 b.freeze() 19 }, 20 ..Default::default() 21 } 22 } 23 24 #[test] 25 fn test_payload_queue_push_no_check() -> Result<()> { 26 let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0))); 27 28 pq.push_no_check(make_payload(0, 10)); 29 assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch"); 30 assert_eq!(1, pq.len(), "item count mismatch"); 31 pq.push_no_check(make_payload(1, 11)); 32 assert_eq!(21, pq.get_num_bytes(), "total bytes mismatch"); 33 assert_eq!(2, pq.len(), "item count mismatch"); 34 pq.push_no_check(make_payload(2, 12)); 35 assert_eq!(33, pq.get_num_bytes(), "total bytes mismatch"); 36 assert_eq!(3, pq.len(), "item count mismatch"); 37 38 for i in 0..3 { 39 assert!(!pq.sorted.is_empty(), "should not be empty"); 40 let c = pq.pop(i); 41 assert!(c.is_some(), "pop should succeed"); 42 if let Some(c) = c { 43 assert_eq!(i, c.tsn, "TSN should match"); 44 } 45 } 46 47 assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch"); 48 assert_eq!(0, pq.len(), "item count mismatch"); 49 50 assert!(pq.sorted.is_empty(), "should be empty"); 51 pq.push_no_check(make_payload(3, 13)); 52 assert_eq!(13, pq.get_num_bytes(), "total bytes mismatch"); 53 pq.push_no_check(make_payload(4, 14)); 54 assert_eq!(27, pq.get_num_bytes(), "total bytes mismatch"); 55 56 for i in 3..5 { 57 assert!(!pq.sorted.is_empty(), "should not be empty"); 58 let c = pq.pop(i); 59 assert!(c.is_some(), "pop should succeed"); 60 if let Some(c) = c { 61 assert_eq!(i, c.tsn, "TSN should match"); 62 } 63 } 64 65 assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch"); 66 assert_eq!(0, pq.len(), "item count mismatch"); 67 68 Ok(()) 69 } 70 71 #[test] 72 fn test_payload_queue_get_gap_ack_block() -> Result<()> { 73 let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0))); 74 75 pq.push(make_payload(1, 0), 0); 76 pq.push(make_payload(2, 0), 0); 77 pq.push(make_payload(3, 0), 0); 78 pq.push(make_payload(4, 0), 0); 79 pq.push(make_payload(5, 0), 0); 80 pq.push(make_payload(6, 0), 0); 81 82 let gab1 = vec![GapAckBlock { start: 1, end: 6 }]; 83 let gab2 = pq.get_gap_ack_blocks(0); 84 assert!(!gab2.is_empty()); 85 assert_eq!(gab2.len(), 1); 86 87 assert_eq!(gab1[0].start, gab2[0].start); 88 assert_eq!(gab1[0].end, gab2[0].end); 89 90 pq.push(make_payload(8, 0), 0); 91 pq.push(make_payload(9, 0), 0); 92 93 let gab1 = vec![ 94 GapAckBlock { start: 1, end: 6 }, 95 GapAckBlock { start: 8, end: 9 }, 96 ]; 97 let gab2 = pq.get_gap_ack_blocks(0); 98 assert!(!gab2.is_empty()); 99 assert_eq!(gab2.len(), 2); 100 101 assert_eq!(gab1[0].start, gab2[0].start); 102 assert_eq!(gab1[0].end, gab2[0].end); 103 assert_eq!(gab1[1].start, gab2[1].start); 104 assert_eq!(gab1[1].end, gab2[1].end); 105 106 Ok(()) 107 } 108 109 #[test] 110 fn test_payload_queue_get_last_tsn_received() -> Result<()> { 111 let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0))); 112 113 // empty queie should return false 114 let ok = pq.get_last_tsn_received(); 115 assert!(ok.is_none(), "should be none"); 116 117 let ok = pq.push(make_payload(20, 0), 0); 118 assert!(ok, "should be true"); 119 let tsn = pq.get_last_tsn_received(); 120 assert!(tsn.is_some(), "should be false"); 121 assert_eq!(Some(&20), tsn, "should match"); 122 123 // append should work 124 let ok = pq.push(make_payload(21, 0), 0); 125 assert!(ok, "should be true"); 126 let tsn = pq.get_last_tsn_received(); 127 assert!(tsn.is_some(), "should be false"); 128 assert_eq!(Some(&21), tsn, "should match"); 129 130 // check if sorting applied 131 let ok = pq.push(make_payload(19, 0), 0); 132 assert!(ok, "should be true"); 133 let tsn = pq.get_last_tsn_received(); 134 assert!(tsn.is_some(), "should be false"); 135 assert_eq!(Some(&21), tsn, "should match"); 136 137 Ok(()) 138 } 139 140 #[test] 141 fn test_payload_queue_mark_all_to_retrasmit() -> Result<()> { 142 let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0))); 143 144 for i in 0..3 { 145 pq.push(make_payload(i + 1, 10), 0); 146 } 147 pq.mark_as_acked(2); 148 pq.mark_all_to_retrasmit(); 149 150 let c = pq.get(1); 151 assert!(c.is_some(), "should be true"); 152 assert!(c.unwrap().retransmit, "should be marked as retransmit"); 153 let c = pq.get(2); 154 assert!(c.is_some(), "should be true"); 155 assert!(!c.unwrap().retransmit, "should NOT be marked as retransmit"); 156 let c = pq.get(3); 157 assert!(c.is_some(), "should be true"); 158 assert!(c.unwrap().retransmit, "should be marked as retransmit"); 159 160 Ok(()) 161 } 162 163 #[test] 164 fn test_payload_queue_reset_retransmit_flag_on_ack() -> Result<()> { 165 let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0))); 166 167 for i in 0..4 { 168 pq.push(make_payload(i + 1, 10), 0); 169 } 170 171 pq.mark_all_to_retrasmit(); 172 pq.mark_as_acked(2); // should cancel retransmission for TSN 2 173 pq.mark_as_acked(4); // should cancel retransmission for TSN 4 174 175 let c = pq.get(1); 176 assert!(c.is_some(), "should be true"); 177 assert!(c.unwrap().retransmit, "should be marked as retransmit"); 178 let c = pq.get(2); 179 assert!(c.is_some(), "should be true"); 180 assert!(!c.unwrap().retransmit, "should NOT be marked as retransmit"); 181 let c = pq.get(3); 182 assert!(c.is_some(), "should be true"); 183 assert!(c.unwrap().retransmit, "should be marked as retransmit"); 184 let c = pq.get(4); 185 assert!(c.is_some(), "should be true"); 186 assert!(!c.unwrap().retransmit, "should NOT be marked as retransmit"); 187 188 Ok(()) 189 } 190 191 /////////////////////////////////////////////////////////////////// 192 //pending_queue_test 193 /////////////////////////////////////////////////////////////////// 194 use super::pending_queue::*; 195 196 const NO_FRAGMENT: usize = 0; 197 const FRAG_BEGIN: usize = 1; 198 const FRAG_MIDDLE: usize = 2; 199 const FRAG_END: usize = 3; 200 201 fn make_data_chunk(tsn: u32, unordered: bool, frag: usize) -> ChunkPayloadData { 202 let mut b = false; 203 let mut e = false; 204 205 match frag { 206 NO_FRAGMENT => { 207 b = true; 208 e = true; 209 } 210 FRAG_BEGIN => { 211 b = true; 212 } 213 FRAG_END => e = true, 214 _ => {} 215 }; 216 217 ChunkPayloadData { 218 tsn, 219 unordered, 220 beginning_fragment: b, 221 ending_fragment: e, 222 user_data: { 223 let mut b = BytesMut::new(); 224 b.resize(10, 0); // always 10 bytes 225 b.freeze() 226 }, 227 ..Default::default() 228 } 229 } 230 231 #[test] 232 fn test_pending_base_queue_push_and_pop() -> Result<()> { 233 let mut pq = PendingBaseQueue::new(); 234 pq.push_back(make_data_chunk(0, false, NO_FRAGMENT)); 235 pq.push_back(make_data_chunk(1, false, NO_FRAGMENT)); 236 pq.push_back(make_data_chunk(2, false, NO_FRAGMENT)); 237 238 for i in 0..3 { 239 let c = pq.get(i); 240 assert!(c.is_some(), "should not be none"); 241 assert_eq!(i as u32, c.unwrap().tsn, "TSN should match"); 242 } 243 244 for i in 0..3 { 245 let c = pq.pop_front(); 246 assert!(c.is_some(), "should not be none"); 247 assert_eq!(i, c.unwrap().tsn, "TSN should match"); 248 } 249 250 pq.push_back(make_data_chunk(3, false, NO_FRAGMENT)); 251 pq.push_back(make_data_chunk(4, false, NO_FRAGMENT)); 252 253 for i in 3..5 { 254 let c = pq.pop_front(); 255 assert!(c.is_some(), "should not be none"); 256 assert_eq!(i, c.unwrap().tsn, "TSN should match"); 257 } 258 Ok(()) 259 } 260 261 #[test] 262 fn test_pending_base_queue_out_of_bounce() -> Result<()> { 263 let mut pq = PendingBaseQueue::new(); 264 assert!(pq.pop_front().is_none(), "should be none"); 265 assert!(pq.get(0).is_none(), "should be none"); 266 267 pq.push_back(make_data_chunk(0, false, NO_FRAGMENT)); 268 assert!(pq.get(1).is_none(), "should be none"); 269 270 Ok(()) 271 } 272 273 // NOTE: TSN is not used in pendingQueue in the actual usage. 274 // Following tests use TSN field as a chunk ID. 275 #[tokio::test] 276 async fn test_pending_queue_push_and_pop() -> Result<()> { 277 let pq = PendingQueue::new(); 278 pq.push(make_data_chunk(0, false, NO_FRAGMENT)).await; 279 assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch"); 280 pq.push(make_data_chunk(1, false, NO_FRAGMENT)).await; 281 assert_eq!(20, pq.get_num_bytes(), "total bytes mismatch"); 282 pq.push(make_data_chunk(2, false, NO_FRAGMENT)).await; 283 assert_eq!(30, pq.get_num_bytes(), "total bytes mismatch"); 284 285 for i in 0..3 { 286 let c = pq.peek().await; 287 assert!(c.is_some(), "peek error"); 288 let c = c.unwrap(); 289 assert_eq!(i, c.tsn, "TSN should match"); 290 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 291 292 let result = pq.pop(beginning_fragment, unordered).await; 293 assert!(result.is_some(), "should not error: {}", i); 294 } 295 296 assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch"); 297 298 pq.push(make_data_chunk(3, false, NO_FRAGMENT)).await; 299 assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch"); 300 pq.push(make_data_chunk(4, false, NO_FRAGMENT)).await; 301 assert_eq!(20, pq.get_num_bytes(), "total bytes mismatch"); 302 303 for i in 3..5 { 304 let c = pq.peek().await; 305 assert!(c.is_some(), "peek error"); 306 let c = c.unwrap(); 307 assert_eq!(i, c.tsn, "TSN should match"); 308 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 309 310 let result = pq.pop(beginning_fragment, unordered).await; 311 assert!(result.is_some(), "should not error: {}", i); 312 } 313 314 assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch"); 315 316 Ok(()) 317 } 318 319 #[tokio::test] 320 async fn test_pending_queue_unordered_wins() -> Result<()> { 321 let pq = PendingQueue::new(); 322 323 pq.push(make_data_chunk(0, false, NO_FRAGMENT)).await; 324 assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch"); 325 pq.push(make_data_chunk(1, true, NO_FRAGMENT)).await; 326 assert_eq!(20, pq.get_num_bytes(), "total bytes mismatch"); 327 pq.push(make_data_chunk(2, false, NO_FRAGMENT)).await; 328 assert_eq!(30, pq.get_num_bytes(), "total bytes mismatch"); 329 pq.push(make_data_chunk(3, true, NO_FRAGMENT)).await; 330 assert_eq!(40, pq.get_num_bytes(), "total bytes mismatch"); 331 332 let c = pq.peek().await; 333 assert!(c.is_some(), "peek error"); 334 let c = c.unwrap(); 335 assert_eq!(1, c.tsn, "TSN should match"); 336 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 337 let result = pq.pop(beginning_fragment, unordered).await; 338 assert!(result.is_some(), "should not error"); 339 340 let c = pq.peek().await; 341 assert!(c.is_some(), "peek error"); 342 let c = c.unwrap(); 343 assert_eq!(3, c.tsn, "TSN should match"); 344 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 345 let result = pq.pop(beginning_fragment, unordered).await; 346 assert!(result.is_some(), "should not error"); 347 348 let c = pq.peek().await; 349 assert!(c.is_some(), "peek error"); 350 let c = c.unwrap(); 351 assert_eq!(0, c.tsn, "TSN should match"); 352 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 353 let result = pq.pop(beginning_fragment, unordered).await; 354 assert!(result.is_some(), "should not error"); 355 356 let c = pq.peek().await; 357 assert!(c.is_some(), "peek error"); 358 let c = c.unwrap(); 359 assert_eq!(2, c.tsn, "TSN should match"); 360 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 361 let result = pq.pop(beginning_fragment, unordered).await; 362 assert!(result.is_some(), "should not error"); 363 364 assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch"); 365 366 Ok(()) 367 } 368 369 #[tokio::test] 370 async fn test_pending_queue_fragments() -> Result<()> { 371 let pq = PendingQueue::new(); 372 pq.push(make_data_chunk(0, false, FRAG_BEGIN)).await; 373 pq.push(make_data_chunk(1, false, FRAG_MIDDLE)).await; 374 pq.push(make_data_chunk(2, false, FRAG_END)).await; 375 pq.push(make_data_chunk(3, true, FRAG_BEGIN)).await; 376 pq.push(make_data_chunk(4, true, FRAG_MIDDLE)).await; 377 pq.push(make_data_chunk(5, true, FRAG_END)).await; 378 379 let expects = vec![3, 4, 5, 0, 1, 2]; 380 381 for exp in expects { 382 let c = pq.peek().await; 383 assert!(c.is_some(), "peek error"); 384 let c = c.unwrap(); 385 assert_eq!(exp, c.tsn, "TSN should match"); 386 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 387 let result = pq.pop(beginning_fragment, unordered).await; 388 assert!(result.is_some(), "should not error: {}", exp); 389 } 390 391 Ok(()) 392 } 393 394 // Once decided ordered or unordered, the decision should persist until 395 // it pops a chunk with ending_fragment flags set to true. 396 #[tokio::test] 397 async fn test_pending_queue_selection_persistence() -> Result<()> { 398 let pq = PendingQueue::new(); 399 pq.push(make_data_chunk(0, false, FRAG_BEGIN)).await; 400 401 let c = pq.peek().await; 402 assert!(c.is_some(), "peek error"); 403 let c = c.unwrap(); 404 assert_eq!(0, c.tsn, "TSN should match"); 405 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 406 let result = pq.pop(beginning_fragment, unordered).await; 407 assert!(result.is_some(), "should not error: {}", 0); 408 409 pq.push(make_data_chunk(1, true, NO_FRAGMENT)).await; 410 pq.push(make_data_chunk(2, false, FRAG_MIDDLE)).await; 411 pq.push(make_data_chunk(3, false, FRAG_END)).await; 412 413 let expects = vec![2, 3, 1]; 414 415 for exp in expects { 416 let c = pq.peek().await; 417 assert!(c.is_some(), "peek error"); 418 let c = c.unwrap(); 419 assert_eq!(exp, c.tsn, "TSN should match"); 420 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 421 let result = pq.pop(beginning_fragment, unordered).await; 422 assert!(result.is_some(), "should not error: {}", exp); 423 } 424 425 Ok(()) 426 } 427 428 /////////////////////////////////////////////////////////////////// 429 //reassembly_queue_test 430 /////////////////////////////////////////////////////////////////// 431 use super::reassembly_queue::*; 432 use std::sync::atomic::AtomicUsize; 433 use std::sync::Arc; 434 435 #[test] 436 fn test_reassembly_queue_ordered_fragments() -> Result<()> { 437 let mut rq = ReassemblyQueue::new(0); 438 439 let org_ppi = PayloadProtocolIdentifier::Binary; 440 441 let chunk = ChunkPayloadData { 442 payload_type: org_ppi, 443 beginning_fragment: true, 444 tsn: 1, 445 stream_sequence_number: 0, 446 user_data: Bytes::from_static(b"ABC"), 447 ..Default::default() 448 }; 449 450 let complete = rq.push(chunk); 451 assert!(!complete, "chunk set should not be complete yet"); 452 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 453 454 let chunk = ChunkPayloadData { 455 payload_type: org_ppi, 456 ending_fragment: true, 457 tsn: 2, 458 stream_sequence_number: 0, 459 user_data: Bytes::from_static(b"DEFG"), 460 ..Default::default() 461 }; 462 463 let complete = rq.push(chunk); 464 assert!(complete, "chunk set should be complete"); 465 assert_eq!(7, rq.get_num_bytes(), "num bytes mismatch"); 466 467 let mut buf = vec![0u8; 16]; 468 469 let (n, ppi) = rq.read(&mut buf)?; 470 assert_eq!(7, n, "should received 7 bytes"); 471 assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch"); 472 assert_eq!(ppi, org_ppi, "should have valid ppi"); 473 assert_eq!(&buf[..n], b"ABCDEFG", "data should match"); 474 475 Ok(()) 476 } 477 478 #[test] 479 fn test_reassembly_queue_unordered_fragments() -> Result<()> { 480 let mut rq = ReassemblyQueue::new(0); 481 482 let org_ppi = PayloadProtocolIdentifier::Binary; 483 484 let chunk = ChunkPayloadData { 485 payload_type: org_ppi, 486 unordered: true, 487 beginning_fragment: true, 488 tsn: 1, 489 stream_sequence_number: 0, 490 user_data: Bytes::from_static(b"ABC"), 491 ..Default::default() 492 }; 493 494 let complete = rq.push(chunk); 495 assert!(!complete, "chunk set should not be complete yet"); 496 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 497 498 let chunk = ChunkPayloadData { 499 payload_type: org_ppi, 500 unordered: true, 501 tsn: 2, 502 stream_sequence_number: 0, 503 user_data: Bytes::from_static(b"DEFG"), 504 ..Default::default() 505 }; 506 507 let complete = rq.push(chunk); 508 assert!(!complete, "chunk set should not be complete yet"); 509 assert_eq!(7, rq.get_num_bytes(), "num bytes mismatch"); 510 511 let chunk = ChunkPayloadData { 512 payload_type: org_ppi, 513 unordered: true, 514 ending_fragment: true, 515 tsn: 3, 516 stream_sequence_number: 0, 517 user_data: Bytes::from_static(b"H"), 518 ..Default::default() 519 }; 520 521 let complete = rq.push(chunk); 522 assert!(complete, "chunk set should be complete"); 523 assert_eq!(8, rq.get_num_bytes(), "num bytes mismatch"); 524 525 let mut buf = vec![0u8; 16]; 526 527 let (n, ppi) = rq.read(&mut buf)?; 528 assert_eq!(8, n, "should received 8 bytes"); 529 assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch"); 530 assert_eq!(ppi, org_ppi, "should have valid ppi"); 531 assert_eq!(&buf[..n], b"ABCDEFGH", "data should match"); 532 533 Ok(()) 534 } 535 536 #[test] 537 fn test_reassembly_queue_ordered_and_unordered_fragments() -> Result<()> { 538 let mut rq = ReassemblyQueue::new(0); 539 let org_ppi = PayloadProtocolIdentifier::Binary; 540 let chunk = ChunkPayloadData { 541 payload_type: org_ppi, 542 beginning_fragment: true, 543 ending_fragment: true, 544 tsn: 1, 545 stream_sequence_number: 0, 546 user_data: Bytes::from_static(b"ABC"), 547 ..Default::default() 548 }; 549 550 let complete = rq.push(chunk); 551 assert!(complete, "chunk set should be complete"); 552 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 553 554 let chunk = ChunkPayloadData { 555 payload_type: org_ppi, 556 unordered: true, 557 beginning_fragment: true, 558 ending_fragment: true, 559 tsn: 2, 560 stream_sequence_number: 1, 561 user_data: Bytes::from_static(b"DEF"), 562 ..Default::default() 563 }; 564 565 let complete = rq.push(chunk); 566 assert!(complete, "chunk set should be complete"); 567 assert_eq!(6, rq.get_num_bytes(), "num bytes mismatch"); 568 569 // 570 // Now we have two complete chunks ready to read in the reassemblyQueue. 571 // 572 573 let mut buf = vec![0u8; 16]; 574 575 // Should read unordered chunks first 576 let (n, ppi) = rq.read(&mut buf)?; 577 assert_eq!(3, n, "should received 3 bytes"); 578 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 579 assert_eq!(ppi, org_ppi, "should have valid ppi"); 580 assert_eq!(&buf[..n], b"DEF", "data should match"); 581 582 // Next should read ordered chunks 583 let (n, ppi) = rq.read(&mut buf)?; 584 assert_eq!(3, n, "should received 3 bytes"); 585 assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch"); 586 assert_eq!(ppi, org_ppi, "should have valid ppi"); 587 assert_eq!(&buf[..n], b"ABC", "data should match"); 588 589 Ok(()) 590 } 591 592 #[test] 593 fn test_reassembly_queue_unordered_complete_skips_incomplete() -> Result<()> { 594 let mut rq = ReassemblyQueue::new(0); 595 596 let org_ppi = PayloadProtocolIdentifier::Binary; 597 598 let chunk = ChunkPayloadData { 599 payload_type: org_ppi, 600 unordered: true, 601 beginning_fragment: true, 602 tsn: 10, 603 stream_sequence_number: 0, 604 user_data: Bytes::from_static(b"IN"), 605 ..Default::default() 606 }; 607 608 let complete = rq.push(chunk); 609 assert!(!complete, "chunk set should not be complete yet"); 610 assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch"); 611 612 let chunk = ChunkPayloadData { 613 payload_type: org_ppi, 614 unordered: true, 615 ending_fragment: true, 616 tsn: 12, // <- incongiguous 617 stream_sequence_number: 1, 618 user_data: Bytes::from_static(b"COMPLETE"), 619 ..Default::default() 620 }; 621 622 let complete = rq.push(chunk); 623 assert!(!complete, "chunk set should not be complete yet"); 624 assert_eq!(10, rq.get_num_bytes(), "num bytes mismatch"); 625 626 let chunk = ChunkPayloadData { 627 payload_type: org_ppi, 628 unordered: true, 629 beginning_fragment: true, 630 ending_fragment: true, 631 tsn: 13, 632 stream_sequence_number: 1, 633 user_data: Bytes::from_static(b"GOOD"), 634 ..Default::default() 635 }; 636 637 let complete = rq.push(chunk); 638 assert!(complete, "chunk set should be complete"); 639 assert_eq!(14, rq.get_num_bytes(), "num bytes mismatch"); 640 641 // 642 // Now we have two complete chunks ready to read in the reassemblyQueue. 643 // 644 645 let mut buf = vec![0u8; 16]; 646 647 // Should pick the one that has "GOOD" 648 let (n, ppi) = rq.read(&mut buf)?; 649 assert_eq!(4, n, "should receive 4 bytes"); 650 assert_eq!(10, rq.get_num_bytes(), "num bytes mismatch"); 651 assert_eq!(ppi, org_ppi, "should have valid ppi"); 652 assert_eq!(&buf[..n], b"GOOD", "data should match"); 653 654 Ok(()) 655 } 656 657 #[test] 658 fn test_reassembly_queue_ignores_chunk_with_wrong_si() -> Result<()> { 659 let mut rq = ReassemblyQueue::new(123); 660 661 let org_ppi = PayloadProtocolIdentifier::Binary; 662 663 let chunk = ChunkPayloadData { 664 payload_type: org_ppi, 665 stream_identifier: 124, 666 beginning_fragment: true, 667 ending_fragment: true, 668 tsn: 10, 669 stream_sequence_number: 0, 670 user_data: Bytes::from_static(b"IN"), 671 ..Default::default() 672 }; 673 674 let complete = rq.push(chunk); 675 assert!(!complete, "chunk should be ignored"); 676 assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch"); 677 Ok(()) 678 } 679 680 #[test] 681 fn test_reassembly_queue_ignores_chunk_with_stale_ssn() -> Result<()> { 682 let mut rq = ReassemblyQueue::new(0); 683 rq.next_ssn = 7; // forcibly set expected SSN to 7 684 685 let org_ppi = PayloadProtocolIdentifier::Binary; 686 687 let chunk = ChunkPayloadData { 688 payload_type: org_ppi, 689 beginning_fragment: true, 690 ending_fragment: true, 691 tsn: 10, 692 stream_sequence_number: 6, // <-- stale 693 user_data: Bytes::from_static(b"IN"), 694 ..Default::default() 695 }; 696 697 let complete = rq.push(chunk); 698 assert!(!complete, "chunk should not be ignored"); 699 assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch"); 700 701 Ok(()) 702 } 703 704 #[test] 705 fn test_reassembly_queue_should_fail_to_read_incomplete_chunk() -> Result<()> { 706 let mut rq = ReassemblyQueue::new(0); 707 708 let org_ppi = PayloadProtocolIdentifier::Binary; 709 710 let chunk = ChunkPayloadData { 711 payload_type: org_ppi, 712 beginning_fragment: true, 713 tsn: 123, 714 stream_sequence_number: 0, 715 user_data: Bytes::from_static(b"IN"), 716 ..Default::default() 717 }; 718 719 let complete = rq.push(chunk); 720 assert!(!complete, "the set should not be complete"); 721 assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch"); 722 723 let mut buf = vec![0u8; 16]; 724 let result = rq.read(&mut buf); 725 assert!(result.is_err(), "read() should not succeed"); 726 assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch"); 727 728 Ok(()) 729 } 730 731 #[test] 732 fn test_reassembly_queue_should_fail_to_read_if_the_nex_ssn_is_not_ready() -> Result<()> { 733 let mut rq = ReassemblyQueue::new(0); 734 735 let org_ppi = PayloadProtocolIdentifier::Binary; 736 737 let chunk = ChunkPayloadData { 738 payload_type: org_ppi, 739 beginning_fragment: true, 740 ending_fragment: true, 741 tsn: 123, 742 stream_sequence_number: 1, 743 user_data: Bytes::from_static(b"IN"), 744 ..Default::default() 745 }; 746 747 let complete = rq.push(chunk); 748 assert!(complete, "the set should be complete"); 749 assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch"); 750 751 let mut buf = vec![0u8; 16]; 752 let result = rq.read(&mut buf); 753 assert!(result.is_err(), "read() should not succeed"); 754 assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch"); 755 756 Ok(()) 757 } 758 759 #[test] 760 fn test_reassembly_queue_detect_buffer_too_short() -> Result<()> { 761 let mut rq = ReassemblyQueue::new(0); 762 763 let org_ppi = PayloadProtocolIdentifier::Binary; 764 765 let chunk = ChunkPayloadData { 766 payload_type: org_ppi, 767 beginning_fragment: true, 768 ending_fragment: true, 769 tsn: 123, 770 stream_sequence_number: 0, 771 user_data: Bytes::from_static(b"0123456789"), 772 ..Default::default() 773 }; 774 775 let complete = rq.push(chunk); 776 assert!(complete, "the set should be complete"); 777 assert_eq!(10, rq.get_num_bytes(), "num bytes mismatch"); 778 779 let mut buf = vec![0u8; 8]; // <- passing buffer too short 780 let result = rq.read(&mut buf); 781 assert!(result.is_err(), "read() should not succeed"); 782 if let Err(err) = result { 783 assert_eq!(Error::ErrShortBuffer, err, "read() should not succeed"); 784 } 785 assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch"); 786 787 Ok(()) 788 } 789 790 #[test] 791 fn test_reassembly_queue_forward_tsn_for_ordered_framents() -> Result<()> { 792 let mut rq = ReassemblyQueue::new(0); 793 794 let org_ppi = PayloadProtocolIdentifier::Binary; 795 796 let ssn_complete = 5u16; 797 let ssn_dropped = 6u16; 798 799 let chunk = ChunkPayloadData { 800 payload_type: org_ppi, 801 beginning_fragment: true, 802 ending_fragment: true, 803 tsn: 10, 804 stream_sequence_number: ssn_complete, 805 user_data: Bytes::from_static(b"123"), 806 ..Default::default() 807 }; 808 809 let complete = rq.push(chunk); 810 assert!(complete, "chunk set should be complete"); 811 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 812 813 let chunk = ChunkPayloadData { 814 payload_type: org_ppi, 815 beginning_fragment: true, 816 tsn: 11, 817 stream_sequence_number: ssn_dropped, 818 user_data: Bytes::from_static(b"ABC"), 819 ..Default::default() 820 }; 821 822 let complete = rq.push(chunk); 823 assert!(!complete, "chunk set should not be complete yet"); 824 assert_eq!(6, rq.get_num_bytes(), "num bytes mismatch"); 825 826 let chunk = ChunkPayloadData { 827 payload_type: org_ppi, 828 tsn: 12, 829 stream_sequence_number: ssn_dropped, 830 user_data: Bytes::from_static(b"DEF"), 831 ..Default::default() 832 }; 833 834 let complete = rq.push(chunk); 835 assert!(!complete, "chunk set should not be complete yet"); 836 assert_eq!(9, rq.get_num_bytes(), "num bytes mismatch"); 837 838 rq.forward_tsn_for_ordered(ssn_dropped); 839 840 assert_eq!(1, rq.ordered.len(), "there should be one chunk left"); 841 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 842 843 Ok(()) 844 } 845 846 #[test] 847 fn test_reassembly_queue_forward_tsn_for_unordered_framents() -> Result<()> { 848 let mut rq = ReassemblyQueue::new(0); 849 850 let org_ppi = PayloadProtocolIdentifier::Binary; 851 852 let ssn_dropped = 6u16; 853 let ssn_kept = 7u16; 854 855 let chunk = ChunkPayloadData { 856 payload_type: org_ppi, 857 unordered: true, 858 beginning_fragment: true, 859 tsn: 11, 860 stream_sequence_number: ssn_dropped, 861 user_data: Bytes::from_static(b"ABC"), 862 ..Default::default() 863 }; 864 865 let complete = rq.push(chunk); 866 assert!(!complete, "chunk set should not be complete yet"); 867 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 868 869 let chunk = ChunkPayloadData { 870 payload_type: org_ppi, 871 unordered: true, 872 tsn: 12, 873 stream_sequence_number: ssn_dropped, 874 user_data: Bytes::from_static(b"DEF"), 875 ..Default::default() 876 }; 877 878 let complete = rq.push(chunk); 879 assert!(!complete, "chunk set should not be complete yet"); 880 assert_eq!(6, rq.get_num_bytes(), "num bytes mismatch"); 881 882 let chunk = ChunkPayloadData { 883 payload_type: org_ppi, 884 unordered: true, 885 tsn: 14, 886 beginning_fragment: true, 887 stream_sequence_number: ssn_kept, 888 user_data: Bytes::from_static(b"SOS"), 889 ..Default::default() 890 }; 891 892 let complete = rq.push(chunk); 893 assert!(!complete, "chunk set should not be complete yet"); 894 assert_eq!(9, rq.get_num_bytes(), "num bytes mismatch"); 895 896 // At this point, there are 3 chunks in the rq.unorderedChunks. 897 // This call should remove chunks with tsn equals to 13 or older. 898 rq.forward_tsn_for_unordered(13); 899 900 // As a result, there should be one chunk (tsn=14) 901 assert_eq!( 902 1, 903 rq.unordered_chunks.len(), 904 "there should be one chunk kept" 905 ); 906 assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch"); 907 908 Ok(()) 909 } 910 911 #[test] 912 fn test_chunk_set_empty_chunk_set() -> Result<()> { 913 let cset = ChunkSet::new(0, PayloadProtocolIdentifier::default()); 914 assert!(!cset.is_complete(), "empty chunkSet cannot be complete"); 915 Ok(()) 916 } 917 918 #[test] 919 fn test_chunk_set_push_dup_chunks_to_chunk_set() -> Result<()> { 920 let mut cset = ChunkSet::new(0, PayloadProtocolIdentifier::default()); 921 cset.push(ChunkPayloadData { 922 tsn: 100, 923 beginning_fragment: true, 924 ..Default::default() 925 }); 926 let complete = cset.push(ChunkPayloadData { 927 tsn: 100, 928 ending_fragment: true, 929 ..Default::default() 930 }); 931 assert!(!complete, "chunk with dup TSN is not complete"); 932 assert_eq!(1, cset.chunks.len(), "chunk with dup TSN should be ignored"); 933 Ok(()) 934 } 935 936 #[test] 937 fn test_chunk_set_incomplete_chunk_set_no_beginning() -> Result<()> { 938 let cset = ChunkSet { 939 ssn: 0, 940 ppi: PayloadProtocolIdentifier::default(), 941 chunks: vec![], 942 }; 943 assert!( 944 !cset.is_complete(), 945 "chunkSet not starting with B=1 cannot be complete" 946 ); 947 Ok(()) 948 } 949 950 #[test] 951 fn test_chunk_set_incomplete_chunk_set_no_contiguous_tsn() -> Result<()> { 952 let cset = ChunkSet { 953 ssn: 0, 954 ppi: PayloadProtocolIdentifier::default(), 955 chunks: vec![ 956 ChunkPayloadData { 957 tsn: 100, 958 beginning_fragment: true, 959 ..Default::default() 960 }, 961 ChunkPayloadData { 962 tsn: 101, 963 ..Default::default() 964 }, 965 ChunkPayloadData { 966 tsn: 103, 967 ending_fragment: true, 968 ..Default::default() 969 }, 970 ], 971 }; 972 assert!( 973 !cset.is_complete(), 974 "chunkSet not starting with incontiguous tsn cannot be complete" 975 ); 976 Ok(()) 977 } 978