1 use crate::error::{Error, Result}; 2 3 use bytes::{Bytes, BytesMut}; 4 5 /////////////////////////////////////////////////////////////////// 6 //payload_queue_test 7 /////////////////////////////////////////////////////////////////// 8 use super::payload_queue::*; 9 use crate::chunk::chunk_payload_data::{ChunkPayloadData, PayloadProtocolIdentifier}; 10 use crate::chunk::chunk_selective_ack::GapAckBlock; 11 12 fn make_payload(tsn: u32, n_bytes: usize) -> ChunkPayloadData { 13 ChunkPayloadData { 14 tsn, 15 user_data: { 16 let mut b = BytesMut::new(); 17 b.resize(n_bytes, 0); 18 b.freeze() 19 }, 20 ..Default::default() 21 } 22 } 23 24 #[test] 25 fn test_payload_queue_push_no_check() -> Result<()> { 26 let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0))); 27 28 pq.push_no_check(make_payload(0, 10)); 29 assert_eq!(pq.get_num_bytes(), 10, "total bytes mismatch"); 30 assert_eq!(pq.len(), 1, "item count mismatch"); 31 pq.push_no_check(make_payload(1, 11)); 32 assert_eq!(pq.get_num_bytes(), 21, "total bytes mismatch"); 33 assert_eq!(pq.len(), 2, "item count mismatch"); 34 pq.push_no_check(make_payload(2, 12)); 35 assert_eq!(pq.get_num_bytes(), 33, "total bytes mismatch"); 36 assert_eq!(pq.len(), 3, "item count mismatch"); 37 38 for i in 0..3 { 39 assert!(!pq.sorted.is_empty(), "should not be empty"); 40 let c = pq.pop(i); 41 assert!(c.is_some(), "pop should succeed"); 42 if let Some(c) = c { 43 assert_eq!(c.tsn, i, "TSN should match"); 44 } 45 } 46 47 assert_eq!(pq.get_num_bytes(), 0, "total bytes mismatch"); 48 assert_eq!(pq.len(), 0, "item count mismatch"); 49 50 assert!(pq.sorted.is_empty(), "should be empty"); 51 pq.push_no_check(make_payload(3, 13)); 52 assert_eq!(pq.get_num_bytes(), 13, "total bytes mismatch"); 53 pq.push_no_check(make_payload(4, 14)); 54 assert_eq!(pq.get_num_bytes(), 27, "total bytes mismatch"); 55 56 for i in 3..5 { 57 assert!(!pq.sorted.is_empty(), "should not be empty"); 58 let c = pq.pop(i); 59 assert!(c.is_some(), "pop should succeed"); 60 if let Some(c) = c { 61 assert_eq!(c.tsn, i, "TSN should match"); 62 } 63 } 64 65 assert_eq!(pq.get_num_bytes(), 0, "total bytes mismatch"); 66 assert_eq!(pq.len(), 0, "item count mismatch"); 67 68 Ok(()) 69 } 70 71 #[test] 72 fn test_payload_queue_get_gap_ack_block() -> Result<()> { 73 let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0))); 74 75 pq.push(make_payload(1, 0), 0); 76 pq.push(make_payload(2, 0), 0); 77 pq.push(make_payload(3, 0), 0); 78 pq.push(make_payload(4, 0), 0); 79 pq.push(make_payload(5, 0), 0); 80 pq.push(make_payload(6, 0), 0); 81 82 let gab1 = vec![GapAckBlock { start: 1, end: 6 }]; 83 let gab2 = pq.get_gap_ack_blocks(0); 84 assert!(!gab2.is_empty()); 85 assert_eq!(gab2.len(), 1); 86 87 assert_eq!(gab2[0].start, gab1[0].start); 88 assert_eq!(gab2[0].end, gab1[0].end); 89 90 pq.push(make_payload(8, 0), 0); 91 pq.push(make_payload(9, 0), 0); 92 93 let gab1 = vec![ 94 GapAckBlock { start: 1, end: 6 }, 95 GapAckBlock { start: 8, end: 9 }, 96 ]; 97 let gab2 = pq.get_gap_ack_blocks(0); 98 assert!(!gab2.is_empty()); 99 assert_eq!(gab2.len(), 2); 100 101 assert_eq!(gab2[0].start, gab1[0].start); 102 assert_eq!(gab2[0].end, gab1[0].end); 103 assert_eq!(gab2[1].start, gab1[1].start); 104 assert_eq!(gab2[1].end, gab1[1].end); 105 106 Ok(()) 107 } 108 109 #[test] 110 fn test_payload_queue_get_last_tsn_received() -> Result<()> { 111 let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0))); 112 113 // empty queie should return false 114 let ok = pq.get_last_tsn_received(); 115 assert!(ok.is_none(), "should be none"); 116 117 let ok = pq.push(make_payload(20, 0), 0); 118 assert!(ok, "should be true"); 119 let tsn = pq.get_last_tsn_received(); 120 assert!(tsn.is_some(), "should be false"); 121 assert_eq!(tsn, Some(&20), "should match"); 122 123 // append should work 124 let ok = pq.push(make_payload(21, 0), 0); 125 assert!(ok, "should be true"); 126 let tsn = pq.get_last_tsn_received(); 127 assert!(tsn.is_some(), "should be false"); 128 assert_eq!(tsn, Some(&21), "should match"); 129 130 // check if sorting applied 131 let ok = pq.push(make_payload(19, 0), 0); 132 assert!(ok, "should be true"); 133 let tsn = pq.get_last_tsn_received(); 134 assert!(tsn.is_some(), "should be false"); 135 assert_eq!(tsn, Some(&21), "should match"); 136 137 Ok(()) 138 } 139 140 #[test] 141 fn test_payload_queue_mark_all_to_retrasmit() -> Result<()> { 142 let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0))); 143 144 for i in 0..3 { 145 pq.push(make_payload(i + 1, 10), 0); 146 } 147 pq.mark_as_acked(2); 148 pq.mark_all_to_retrasmit(); 149 150 let c = pq.get(1); 151 assert!(c.is_some(), "should be true"); 152 assert!(c.unwrap().retransmit, "should be marked as retransmit"); 153 let c = pq.get(2); 154 assert!(c.is_some(), "should be true"); 155 assert!(!c.unwrap().retransmit, "should NOT be marked as retransmit"); 156 let c = pq.get(3); 157 assert!(c.is_some(), "should be true"); 158 assert!(c.unwrap().retransmit, "should be marked as retransmit"); 159 160 Ok(()) 161 } 162 163 #[test] 164 fn test_payload_queue_reset_retransmit_flag_on_ack() -> Result<()> { 165 let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0))); 166 167 for i in 0..4 { 168 pq.push(make_payload(i + 1, 10), 0); 169 } 170 171 pq.mark_all_to_retrasmit(); 172 pq.mark_as_acked(2); // should cancel retransmission for TSN 2 173 pq.mark_as_acked(4); // should cancel retransmission for TSN 4 174 175 let c = pq.get(1); 176 assert!(c.is_some(), "should be true"); 177 assert!(c.unwrap().retransmit, "should be marked as retransmit"); 178 let c = pq.get(2); 179 assert!(c.is_some(), "should be true"); 180 assert!(!c.unwrap().retransmit, "should NOT be marked as retransmit"); 181 let c = pq.get(3); 182 assert!(c.is_some(), "should be true"); 183 assert!(c.unwrap().retransmit, "should be marked as retransmit"); 184 let c = pq.get(4); 185 assert!(c.is_some(), "should be true"); 186 assert!(!c.unwrap().retransmit, "should NOT be marked as retransmit"); 187 188 Ok(()) 189 } 190 191 /////////////////////////////////////////////////////////////////// 192 //pending_queue_test 193 /////////////////////////////////////////////////////////////////// 194 use super::pending_queue::*; 195 196 const NO_FRAGMENT: usize = 0; 197 const FRAG_BEGIN: usize = 1; 198 const FRAG_MIDDLE: usize = 2; 199 const FRAG_END: usize = 3; 200 201 fn make_data_chunk(tsn: u32, unordered: bool, frag: usize) -> ChunkPayloadData { 202 let mut b = false; 203 let mut e = false; 204 205 match frag { 206 NO_FRAGMENT => { 207 b = true; 208 e = true; 209 } 210 FRAG_BEGIN => { 211 b = true; 212 } 213 FRAG_END => e = true, 214 _ => {} 215 }; 216 217 ChunkPayloadData { 218 tsn, 219 unordered, 220 beginning_fragment: b, 221 ending_fragment: e, 222 user_data: { 223 let mut b = BytesMut::new(); 224 b.resize(10, 0); // always 10 bytes 225 b.freeze() 226 }, 227 ..Default::default() 228 } 229 } 230 231 #[test] 232 fn test_pending_base_queue_push_and_pop() -> Result<()> { 233 let mut pq = PendingBaseQueue::new(); 234 pq.push_back(make_data_chunk(0, false, NO_FRAGMENT)); 235 pq.push_back(make_data_chunk(1, false, NO_FRAGMENT)); 236 pq.push_back(make_data_chunk(2, false, NO_FRAGMENT)); 237 238 for i in 0..3 { 239 let c = pq.get(i); 240 assert!(c.is_some(), "should not be none"); 241 assert_eq!(c.unwrap().tsn, i as u32, "TSN should match"); 242 } 243 244 for i in 0..3 { 245 let c = pq.pop_front(); 246 assert!(c.is_some(), "should not be none"); 247 assert_eq!(c.unwrap().tsn, i, "TSN should match"); 248 } 249 250 pq.push_back(make_data_chunk(3, false, NO_FRAGMENT)); 251 pq.push_back(make_data_chunk(4, false, NO_FRAGMENT)); 252 253 for i in 3..5 { 254 let c = pq.pop_front(); 255 assert!(c.is_some(), "should not be none"); 256 assert_eq!(c.unwrap().tsn, i, "TSN should match"); 257 } 258 Ok(()) 259 } 260 261 #[test] 262 fn test_pending_base_queue_out_of_bounce() -> Result<()> { 263 let mut pq = PendingBaseQueue::new(); 264 assert!(pq.pop_front().is_none(), "should be none"); 265 assert!(pq.get(0).is_none(), "should be none"); 266 267 pq.push_back(make_data_chunk(0, false, NO_FRAGMENT)); 268 assert!(pq.get(1).is_none(), "should be none"); 269 270 Ok(()) 271 } 272 273 // NOTE: TSN is not used in pendingQueue in the actual usage. 274 // Following tests use TSN field as a chunk ID. 275 #[tokio::test] 276 async fn test_pending_queue_push_and_pop() -> Result<()> { 277 let pq = PendingQueue::new(); 278 pq.push(make_data_chunk(0, false, NO_FRAGMENT)).await; 279 assert_eq!(pq.get_num_bytes(), 10, "total bytes mismatch"); 280 pq.push(make_data_chunk(1, false, NO_FRAGMENT)).await; 281 assert_eq!(pq.get_num_bytes(), 20, "total bytes mismatch"); 282 pq.push(make_data_chunk(2, false, NO_FRAGMENT)).await; 283 assert_eq!(pq.get_num_bytes(), 30, "total bytes mismatch"); 284 285 for i in 0..3 { 286 let c = pq.peek(); 287 assert!(c.is_some(), "peek error"); 288 let c = c.unwrap(); 289 assert_eq!(c.tsn, i, "TSN should match"); 290 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 291 292 let result = pq.pop(beginning_fragment, unordered); 293 assert!(result.is_some(), "should not error: {i}"); 294 } 295 296 assert_eq!(pq.get_num_bytes(), 0, "total bytes mismatch"); 297 298 pq.push(make_data_chunk(3, false, NO_FRAGMENT)).await; 299 assert_eq!(pq.get_num_bytes(), 10, "total bytes mismatch"); 300 pq.push(make_data_chunk(4, false, NO_FRAGMENT)).await; 301 assert_eq!(pq.get_num_bytes(), 20, "total bytes mismatch"); 302 303 for i in 3..5 { 304 let c = pq.peek(); 305 assert!(c.is_some(), "peek error"); 306 let c = c.unwrap(); 307 assert_eq!(c.tsn, i, "TSN should match"); 308 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 309 310 let result = pq.pop(beginning_fragment, unordered); 311 assert!(result.is_some(), "should not error: {i}"); 312 } 313 314 assert_eq!(pq.get_num_bytes(), 0, "total bytes mismatch"); 315 316 Ok(()) 317 } 318 319 #[tokio::test] 320 async fn test_pending_queue_unordered_wins() -> Result<()> { 321 let pq = PendingQueue::new(); 322 323 pq.push(make_data_chunk(0, false, NO_FRAGMENT)).await; 324 assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch"); 325 pq.push(make_data_chunk(1, true, NO_FRAGMENT)).await; 326 assert_eq!(20, pq.get_num_bytes(), "total bytes mismatch"); 327 pq.push(make_data_chunk(2, false, NO_FRAGMENT)).await; 328 assert_eq!(30, pq.get_num_bytes(), "total bytes mismatch"); 329 pq.push(make_data_chunk(3, true, NO_FRAGMENT)).await; 330 assert_eq!(40, pq.get_num_bytes(), "total bytes mismatch"); 331 332 let c = pq.peek(); 333 assert!(c.is_some(), "peek error"); 334 let c = c.unwrap(); 335 assert_eq!(c.tsn, 1, "TSN should match"); 336 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 337 let result = pq.pop(beginning_fragment, unordered); 338 assert!(result.is_some(), "should not error"); 339 340 let c = pq.peek(); 341 assert!(c.is_some(), "peek error"); 342 let c = c.unwrap(); 343 assert_eq!(c.tsn, 3, "TSN should match"); 344 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 345 let result = pq.pop(beginning_fragment, unordered); 346 assert!(result.is_some(), "should not error"); 347 348 let c = pq.peek(); 349 assert!(c.is_some(), "peek error"); 350 let c = c.unwrap(); 351 assert_eq!(c.tsn, 0, "TSN should match"); 352 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 353 let result = pq.pop(beginning_fragment, unordered); 354 assert!(result.is_some(), "should not error"); 355 356 let c = pq.peek(); 357 assert!(c.is_some(), "peek error"); 358 let c = c.unwrap(); 359 assert_eq!(c.tsn, 2, "TSN should match"); 360 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 361 let result = pq.pop(beginning_fragment, unordered); 362 assert!(result.is_some(), "should not error"); 363 364 assert_eq!(pq.get_num_bytes(), 0, "total bytes mismatch"); 365 366 Ok(()) 367 } 368 369 #[tokio::test] 370 async fn test_pending_queue_fragments() -> Result<()> { 371 let pq = PendingQueue::new(); 372 pq.push(make_data_chunk(0, false, FRAG_BEGIN)).await; 373 pq.push(make_data_chunk(1, false, FRAG_MIDDLE)).await; 374 pq.push(make_data_chunk(2, false, FRAG_END)).await; 375 pq.push(make_data_chunk(3, true, FRAG_BEGIN)).await; 376 pq.push(make_data_chunk(4, true, FRAG_MIDDLE)).await; 377 pq.push(make_data_chunk(5, true, FRAG_END)).await; 378 379 let expects = vec![3, 4, 5, 0, 1, 2]; 380 381 for exp in expects { 382 let c = pq.peek(); 383 assert!(c.is_some(), "peek error"); 384 let c = c.unwrap(); 385 assert_eq!(c.tsn, exp, "TSN should match"); 386 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 387 let result = pq.pop(beginning_fragment, unordered); 388 assert!(result.is_some(), "should not error: {exp}"); 389 } 390 391 Ok(()) 392 } 393 394 // Once decided ordered or unordered, the decision should persist until 395 // it pops a chunk with ending_fragment flags set to true. 396 #[tokio::test] 397 async fn test_pending_queue_selection_persistence() -> Result<()> { 398 let pq = PendingQueue::new(); 399 pq.push(make_data_chunk(0, false, FRAG_BEGIN)).await; 400 401 let c = pq.peek(); 402 assert!(c.is_some(), "peek error"); 403 let c = c.unwrap(); 404 assert_eq!(c.tsn, 0, "TSN should match"); 405 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 406 let result = pq.pop(beginning_fragment, unordered); 407 assert!(result.is_some(), "should not error: {}", 0); 408 409 pq.push(make_data_chunk(1, true, NO_FRAGMENT)).await; 410 pq.push(make_data_chunk(2, false, FRAG_MIDDLE)).await; 411 pq.push(make_data_chunk(3, false, FRAG_END)).await; 412 413 let expects = vec![2, 3, 1]; 414 415 for exp in expects { 416 let c = pq.peek(); 417 assert!(c.is_some(), "peek error"); 418 let c = c.unwrap(); 419 assert_eq!(c.tsn, exp, "TSN should match"); 420 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered); 421 let result = pq.pop(beginning_fragment, unordered); 422 assert!(result.is_some(), "should not error: {exp}"); 423 } 424 425 Ok(()) 426 } 427 428 #[tokio::test] 429 async fn test_pending_queue_append() -> Result<()> { 430 let pq = PendingQueue::new(); 431 pq.append(vec![ 432 make_data_chunk(0, false, NO_FRAGMENT), 433 make_data_chunk(1, false, NO_FRAGMENT), 434 make_data_chunk(3, false, NO_FRAGMENT), 435 ]) 436 .await; 437 assert_eq!(pq.get_num_bytes(), 30, "total bytes mismatch"); 438 assert_eq!(pq.len(), 3, "len mismatch"); 439 440 Ok(()) 441 } 442 443 /////////////////////////////////////////////////////////////////// 444 //reassembly_queue_test 445 /////////////////////////////////////////////////////////////////// 446 use super::reassembly_queue::*; 447 use std::sync::atomic::AtomicUsize; 448 use std::sync::Arc; 449 450 #[test] 451 fn test_reassembly_queue_ordered_fragments() -> Result<()> { 452 let mut rq = ReassemblyQueue::new(0); 453 454 let org_ppi = PayloadProtocolIdentifier::Binary; 455 456 let chunk = ChunkPayloadData { 457 payload_type: org_ppi, 458 beginning_fragment: true, 459 tsn: 1, 460 stream_sequence_number: 0, 461 user_data: Bytes::from_static(b"ABC"), 462 ..Default::default() 463 }; 464 465 let complete = rq.push(chunk); 466 assert!(!complete, "chunk set should not be complete yet"); 467 assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch"); 468 469 let chunk = ChunkPayloadData { 470 payload_type: org_ppi, 471 ending_fragment: true, 472 tsn: 2, 473 stream_sequence_number: 0, 474 user_data: Bytes::from_static(b"DEFG"), 475 ..Default::default() 476 }; 477 478 let complete = rq.push(chunk); 479 assert!(complete, "chunk set should be complete"); 480 assert_eq!(7, rq.get_num_bytes(), "num bytes mismatch"); 481 482 let mut buf = vec![0u8; 16]; 483 484 let (n, ppi) = rq.read(&mut buf)?; 485 assert_eq!(n, 7, "should received 7 bytes"); 486 assert_eq!(rq.get_num_bytes(), 0, "num bytes mismatch"); 487 assert_eq!(ppi, org_ppi, "should have valid ppi"); 488 assert_eq!(&buf[..n], b"ABCDEFG", "data should match"); 489 490 Ok(()) 491 } 492 493 #[test] 494 fn test_reassembly_queue_unordered_fragments() -> Result<()> { 495 let mut rq = ReassemblyQueue::new(0); 496 497 let org_ppi = PayloadProtocolIdentifier::Binary; 498 499 let chunk = ChunkPayloadData { 500 payload_type: org_ppi, 501 unordered: true, 502 beginning_fragment: true, 503 tsn: 1, 504 stream_sequence_number: 0, 505 user_data: Bytes::from_static(b"ABC"), 506 ..Default::default() 507 }; 508 509 let complete = rq.push(chunk); 510 assert!(!complete, "chunk set should not be complete yet"); 511 assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch"); 512 513 let chunk = ChunkPayloadData { 514 payload_type: org_ppi, 515 unordered: true, 516 tsn: 2, 517 stream_sequence_number: 0, 518 user_data: Bytes::from_static(b"DEFG"), 519 ..Default::default() 520 }; 521 522 let complete = rq.push(chunk); 523 assert!(!complete, "chunk set should not be complete yet"); 524 assert_eq!(rq.get_num_bytes(), 7, "num bytes mismatch"); 525 526 let chunk = ChunkPayloadData { 527 payload_type: org_ppi, 528 unordered: true, 529 ending_fragment: true, 530 tsn: 3, 531 stream_sequence_number: 0, 532 user_data: Bytes::from_static(b"H"), 533 ..Default::default() 534 }; 535 536 let complete = rq.push(chunk); 537 assert!(complete, "chunk set should be complete"); 538 assert_eq!(rq.get_num_bytes(), 8, "num bytes mismatch"); 539 540 let mut buf = vec![0u8; 16]; 541 542 let (n, ppi) = rq.read(&mut buf)?; 543 assert_eq!(n, 8, "should received 8 bytes"); 544 assert_eq!(rq.get_num_bytes(), 0, "num bytes mismatch"); 545 assert_eq!(ppi, org_ppi, "should have valid ppi"); 546 assert_eq!(&buf[..n], b"ABCDEFGH", "data should match"); 547 548 Ok(()) 549 } 550 551 #[test] 552 fn test_reassembly_queue_ordered_and_unordered_fragments() -> Result<()> { 553 let mut rq = ReassemblyQueue::new(0); 554 let org_ppi = PayloadProtocolIdentifier::Binary; 555 let chunk = ChunkPayloadData { 556 payload_type: org_ppi, 557 beginning_fragment: true, 558 ending_fragment: true, 559 tsn: 1, 560 stream_sequence_number: 0, 561 user_data: Bytes::from_static(b"ABC"), 562 ..Default::default() 563 }; 564 565 let complete = rq.push(chunk); 566 assert!(complete, "chunk set should be complete"); 567 assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch"); 568 569 let chunk = ChunkPayloadData { 570 payload_type: org_ppi, 571 unordered: true, 572 beginning_fragment: true, 573 ending_fragment: true, 574 tsn: 2, 575 stream_sequence_number: 1, 576 user_data: Bytes::from_static(b"DEF"), 577 ..Default::default() 578 }; 579 580 let complete = rq.push(chunk); 581 assert!(complete, "chunk set should be complete"); 582 assert_eq!(rq.get_num_bytes(), 6, "num bytes mismatch"); 583 584 // 585 // Now we have two complete chunks ready to read in the reassemblyQueue. 586 // 587 588 let mut buf = vec![0u8; 16]; 589 590 // Should read unordered chunks first 591 let (n, ppi) = rq.read(&mut buf)?; 592 assert_eq!(n, 3, "should received 3 bytes"); 593 assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch"); 594 assert_eq!(ppi, org_ppi, "should have valid ppi"); 595 assert_eq!(&buf[..n], b"DEF", "data should match"); 596 597 // Next should read ordered chunks 598 let (n, ppi) = rq.read(&mut buf)?; 599 assert_eq!(n, 3, "should received 3 bytes"); 600 assert_eq!(rq.get_num_bytes(), 0, "num bytes mismatch"); 601 assert_eq!(ppi, org_ppi, "should have valid ppi"); 602 assert_eq!(&buf[..n], b"ABC", "data should match"); 603 604 Ok(()) 605 } 606 607 #[test] 608 fn test_reassembly_queue_unordered_complete_skips_incomplete() -> Result<()> { 609 let mut rq = ReassemblyQueue::new(0); 610 611 let org_ppi = PayloadProtocolIdentifier::Binary; 612 613 let chunk = ChunkPayloadData { 614 payload_type: org_ppi, 615 unordered: true, 616 beginning_fragment: true, 617 tsn: 10, 618 stream_sequence_number: 0, 619 user_data: Bytes::from_static(b"IN"), 620 ..Default::default() 621 }; 622 623 let complete = rq.push(chunk); 624 assert!(!complete, "chunk set should not be complete yet"); 625 assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch"); 626 627 let chunk = ChunkPayloadData { 628 payload_type: org_ppi, 629 unordered: true, 630 ending_fragment: true, 631 tsn: 12, // <- incongiguous 632 stream_sequence_number: 1, 633 user_data: Bytes::from_static(b"COMPLETE"), 634 ..Default::default() 635 }; 636 637 let complete = rq.push(chunk); 638 assert!(!complete, "chunk set should not be complete yet"); 639 assert_eq!(rq.get_num_bytes(), 10, "num bytes mismatch"); 640 641 let chunk = ChunkPayloadData { 642 payload_type: org_ppi, 643 unordered: true, 644 beginning_fragment: true, 645 ending_fragment: true, 646 tsn: 13, 647 stream_sequence_number: 1, 648 user_data: Bytes::from_static(b"GOOD"), 649 ..Default::default() 650 }; 651 652 let complete = rq.push(chunk); 653 assert!(complete, "chunk set should be complete"); 654 assert_eq!(rq.get_num_bytes(), 14, "num bytes mismatch"); 655 656 // 657 // Now we have two complete chunks ready to read in the reassemblyQueue. 658 // 659 660 let mut buf = vec![0u8; 16]; 661 662 // Should pick the one that has "GOOD" 663 let (n, ppi) = rq.read(&mut buf)?; 664 assert_eq!(n, 4, "should receive 4 bytes"); 665 assert_eq!(rq.get_num_bytes(), 10, "num bytes mismatch"); 666 assert_eq!(ppi, org_ppi, "should have valid ppi"); 667 assert_eq!(&buf[..n], b"GOOD", "data should match"); 668 669 Ok(()) 670 } 671 672 #[test] 673 fn test_reassembly_queue_ignores_chunk_with_wrong_si() -> Result<()> { 674 let mut rq = ReassemblyQueue::new(123); 675 676 let org_ppi = PayloadProtocolIdentifier::Binary; 677 678 let chunk = ChunkPayloadData { 679 payload_type: org_ppi, 680 stream_identifier: 124, 681 beginning_fragment: true, 682 ending_fragment: true, 683 tsn: 10, 684 stream_sequence_number: 0, 685 user_data: Bytes::from_static(b"IN"), 686 ..Default::default() 687 }; 688 689 let complete = rq.push(chunk); 690 assert!(!complete, "chunk should be ignored"); 691 assert_eq!(rq.get_num_bytes(), 0, "num bytes mismatch"); 692 Ok(()) 693 } 694 695 #[test] 696 fn test_reassembly_queue_ignores_chunk_with_stale_ssn() -> Result<()> { 697 let mut rq = ReassemblyQueue::new(0); 698 rq.next_ssn = 7; // forcibly set expected SSN to 7 699 700 let org_ppi = PayloadProtocolIdentifier::Binary; 701 702 let chunk = ChunkPayloadData { 703 payload_type: org_ppi, 704 beginning_fragment: true, 705 ending_fragment: true, 706 tsn: 10, 707 stream_sequence_number: 6, // <-- stale 708 user_data: Bytes::from_static(b"IN"), 709 ..Default::default() 710 }; 711 712 let complete = rq.push(chunk); 713 assert!(!complete, "chunk should not be ignored"); 714 assert_eq!(rq.get_num_bytes(), 0, "num bytes mismatch"); 715 716 Ok(()) 717 } 718 719 #[test] 720 fn test_reassembly_queue_should_fail_to_read_incomplete_chunk() -> Result<()> { 721 let mut rq = ReassemblyQueue::new(0); 722 723 let org_ppi = PayloadProtocolIdentifier::Binary; 724 725 let chunk = ChunkPayloadData { 726 payload_type: org_ppi, 727 beginning_fragment: true, 728 tsn: 123, 729 stream_sequence_number: 0, 730 user_data: Bytes::from_static(b"IN"), 731 ..Default::default() 732 }; 733 734 let complete = rq.push(chunk); 735 assert!(!complete, "the set should not be complete"); 736 assert_eq!(rq.get_num_bytes(), 2, "num bytes mismatch"); 737 738 let mut buf = vec![0u8; 16]; 739 let result = rq.read(&mut buf); 740 assert!(result.is_err(), "read() should not succeed"); 741 assert_eq!(rq.get_num_bytes(), 2, "num bytes mismatch"); 742 743 Ok(()) 744 } 745 746 #[test] 747 fn test_reassembly_queue_should_fail_to_read_if_the_nex_ssn_is_not_ready() -> Result<()> { 748 let mut rq = ReassemblyQueue::new(0); 749 750 let org_ppi = PayloadProtocolIdentifier::Binary; 751 752 let chunk = ChunkPayloadData { 753 payload_type: org_ppi, 754 beginning_fragment: true, 755 ending_fragment: true, 756 tsn: 123, 757 stream_sequence_number: 1, 758 user_data: Bytes::from_static(b"IN"), 759 ..Default::default() 760 }; 761 762 let complete = rq.push(chunk); 763 assert!(complete, "the set should be complete"); 764 assert_eq!(rq.get_num_bytes(), 2, "num bytes mismatch"); 765 766 let mut buf = vec![0u8; 16]; 767 let result = rq.read(&mut buf); 768 assert!(result.is_err(), "read() should not succeed"); 769 assert_eq!(rq.get_num_bytes(), 2, "num bytes mismatch"); 770 771 Ok(()) 772 } 773 774 #[test] 775 fn test_reassembly_queue_detect_buffer_too_short() -> Result<()> { 776 let mut rq = ReassemblyQueue::new(0); 777 778 let org_ppi = PayloadProtocolIdentifier::Binary; 779 780 let chunk = ChunkPayloadData { 781 payload_type: org_ppi, 782 beginning_fragment: true, 783 ending_fragment: true, 784 tsn: 123, 785 stream_sequence_number: 0, 786 user_data: Bytes::from_static(b"0123456789"), 787 ..Default::default() 788 }; 789 790 let complete = rq.push(chunk); 791 assert!(complete, "the set should be complete"); 792 assert_eq!(rq.get_num_bytes(), 10, "num bytes mismatch"); 793 794 let mut buf = vec![0u8; 8]; // <- passing buffer too short 795 let result = rq.read(&mut buf); 796 assert!(result.is_err(), "read() should not succeed"); 797 if let Err(err) = result { 798 assert_eq!(err, Error::ErrShortBuffer, "read() should not succeed"); 799 } 800 assert_eq!(rq.get_num_bytes(), 0, "num bytes mismatch"); 801 802 Ok(()) 803 } 804 805 #[test] 806 fn test_reassembly_queue_forward_tsn_for_ordered_framents() -> Result<()> { 807 let mut rq = ReassemblyQueue::new(0); 808 809 let org_ppi = PayloadProtocolIdentifier::Binary; 810 811 let ssn_complete = 5u16; 812 let ssn_dropped = 6u16; 813 814 let chunk = ChunkPayloadData { 815 payload_type: org_ppi, 816 beginning_fragment: true, 817 ending_fragment: true, 818 tsn: 10, 819 stream_sequence_number: ssn_complete, 820 user_data: Bytes::from_static(b"123"), 821 ..Default::default() 822 }; 823 824 let complete = rq.push(chunk); 825 assert!(complete, "chunk set should be complete"); 826 assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch"); 827 828 let chunk = ChunkPayloadData { 829 payload_type: org_ppi, 830 beginning_fragment: true, 831 tsn: 11, 832 stream_sequence_number: ssn_dropped, 833 user_data: Bytes::from_static(b"ABC"), 834 ..Default::default() 835 }; 836 837 let complete = rq.push(chunk); 838 assert!(!complete, "chunk set should not be complete yet"); 839 assert_eq!(rq.get_num_bytes(), 6, "num bytes mismatch"); 840 841 let chunk = ChunkPayloadData { 842 payload_type: org_ppi, 843 tsn: 12, 844 stream_sequence_number: ssn_dropped, 845 user_data: Bytes::from_static(b"DEF"), 846 ..Default::default() 847 }; 848 849 let complete = rq.push(chunk); 850 assert!(!complete, "chunk set should not be complete yet"); 851 assert_eq!(rq.get_num_bytes(), 9, "num bytes mismatch"); 852 853 rq.forward_tsn_for_ordered(ssn_dropped); 854 855 assert_eq!(rq.ordered.len(), 1, "there should be one chunk left"); 856 assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch"); 857 858 Ok(()) 859 } 860 861 #[test] 862 fn test_reassembly_queue_forward_tsn_for_unordered_framents() -> Result<()> { 863 let mut rq = ReassemblyQueue::new(0); 864 865 let org_ppi = PayloadProtocolIdentifier::Binary; 866 867 let ssn_dropped = 6u16; 868 let ssn_kept = 7u16; 869 870 let chunk = ChunkPayloadData { 871 payload_type: org_ppi, 872 unordered: true, 873 beginning_fragment: true, 874 tsn: 11, 875 stream_sequence_number: ssn_dropped, 876 user_data: Bytes::from_static(b"ABC"), 877 ..Default::default() 878 }; 879 880 let complete = rq.push(chunk); 881 assert!(!complete, "chunk set should not be complete yet"); 882 assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch"); 883 884 let chunk = ChunkPayloadData { 885 payload_type: org_ppi, 886 unordered: true, 887 tsn: 12, 888 stream_sequence_number: ssn_dropped, 889 user_data: Bytes::from_static(b"DEF"), 890 ..Default::default() 891 }; 892 893 let complete = rq.push(chunk); 894 assert!(!complete, "chunk set should not be complete yet"); 895 assert_eq!(rq.get_num_bytes(), 6, "num bytes mismatch"); 896 897 let chunk = ChunkPayloadData { 898 payload_type: org_ppi, 899 unordered: true, 900 tsn: 14, 901 beginning_fragment: true, 902 stream_sequence_number: ssn_kept, 903 user_data: Bytes::from_static(b"SOS"), 904 ..Default::default() 905 }; 906 907 let complete = rq.push(chunk); 908 assert!(!complete, "chunk set should not be complete yet"); 909 assert_eq!(rq.get_num_bytes(), 9, "num bytes mismatch"); 910 911 // At this point, there are 3 chunks in the rq.unorderedChunks. 912 // This call should remove chunks with tsn equals to 13 or older. 913 rq.forward_tsn_for_unordered(13); 914 915 // As a result, there should be one chunk (tsn=14) 916 assert_eq!( 917 rq.unordered_chunks.len(), 918 1, 919 "there should be one chunk kept" 920 ); 921 assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch"); 922 923 Ok(()) 924 } 925 926 #[test] 927 fn test_chunk_set_empty_chunk_set() -> Result<()> { 928 let cset = ChunkSet::new(0, PayloadProtocolIdentifier::default()); 929 assert!(!cset.is_complete(), "empty chunkSet cannot be complete"); 930 Ok(()) 931 } 932 933 #[test] 934 fn test_chunk_set_push_dup_chunks_to_chunk_set() -> Result<()> { 935 let mut cset = ChunkSet::new(0, PayloadProtocolIdentifier::default()); 936 cset.push(ChunkPayloadData { 937 tsn: 100, 938 beginning_fragment: true, 939 ..Default::default() 940 }); 941 let complete = cset.push(ChunkPayloadData { 942 tsn: 100, 943 ending_fragment: true, 944 ..Default::default() 945 }); 946 assert!(!complete, "chunk with dup TSN is not complete"); 947 assert_eq!(cset.chunks.len(), 1, "chunk with dup TSN should be ignored"); 948 Ok(()) 949 } 950 951 #[test] 952 fn test_chunk_set_incomplete_chunk_set_no_beginning() -> Result<()> { 953 let cset = ChunkSet { 954 ssn: 0, 955 ppi: PayloadProtocolIdentifier::default(), 956 chunks: vec![], 957 }; 958 assert!( 959 !cset.is_complete(), 960 "chunkSet not starting with B=1 cannot be complete" 961 ); 962 Ok(()) 963 } 964 965 #[test] 966 fn test_chunk_set_incomplete_chunk_set_no_contiguous_tsn() -> Result<()> { 967 let cset = ChunkSet { 968 ssn: 0, 969 ppi: PayloadProtocolIdentifier::default(), 970 chunks: vec![ 971 ChunkPayloadData { 972 tsn: 100, 973 beginning_fragment: true, 974 ..Default::default() 975 }, 976 ChunkPayloadData { 977 tsn: 101, 978 ..Default::default() 979 }, 980 ChunkPayloadData { 981 tsn: 103, 982 ending_fragment: true, 983 ..Default::default() 984 }, 985 ], 986 }; 987 assert!( 988 !cset.is_complete(), 989 "chunkSet not starting with incontiguous tsn cannot be complete" 990 ); 991 Ok(()) 992 } 993