1 use crate::error::{Error, Result};
2
3 use bytes::{Bytes, BytesMut};
4
5 ///////////////////////////////////////////////////////////////////
6 //payload_queue_test
7 ///////////////////////////////////////////////////////////////////
8 use super::payload_queue::*;
9 use crate::chunk::chunk_payload_data::{ChunkPayloadData, PayloadProtocolIdentifier};
10 use crate::chunk::chunk_selective_ack::GapAckBlock;
11
make_payload(tsn: u32, n_bytes: usize) -> ChunkPayloadData12 fn make_payload(tsn: u32, n_bytes: usize) -> ChunkPayloadData {
13 ChunkPayloadData {
14 tsn,
15 user_data: {
16 let mut b = BytesMut::new();
17 b.resize(n_bytes, 0);
18 b.freeze()
19 },
20 ..Default::default()
21 }
22 }
23
/// Pushes chunks with `push_no_check` and pops them back by TSN, checking
/// that the byte and item counters grow with every push and are back at
/// zero once the queue is drained. The cycle is run twice on the same queue.
#[test]
fn test_payload_queue_push_no_check() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    // (tsn, payload size, cumulative bytes, cumulative item count)
    for (tsn, size, total_bytes, count) in [(0, 10, 10, 1), (1, 11, 21, 2), (2, 12, 33, 3)] {
        pq.push_no_check(make_payload(tsn, size));
        assert_eq!(pq.get_num_bytes(), total_bytes, "total bytes mismatch");
        assert_eq!(pq.len(), count, "item count mismatch");
    }

    for tsn in 0..3 {
        assert!(!pq.sorted.is_empty(), "should not be empty");
        let popped = pq.pop(tsn).expect("pop should succeed");
        assert_eq!(popped.tsn, tsn, "TSN should match");
    }

    assert_eq!(pq.get_num_bytes(), 0, "total bytes mismatch");
    assert_eq!(pq.len(), 0, "item count mismatch");

    // Second round on the drained queue.
    assert!(pq.sorted.is_empty(), "should be empty");
    for (tsn, size, total_bytes) in [(3, 13, 13), (4, 14, 27)] {
        pq.push_no_check(make_payload(tsn, size));
        assert_eq!(pq.get_num_bytes(), total_bytes, "total bytes mismatch");
    }

    for tsn in 3..5 {
        assert!(!pq.sorted.is_empty(), "should not be empty");
        let popped = pq.pop(tsn).expect("pop should succeed");
        assert_eq!(popped.tsn, tsn, "TSN should match");
    }

    assert_eq!(pq.get_num_bytes(), 0, "total bytes mismatch");
    assert_eq!(pq.len(), 0, "item count mismatch");

    Ok(())
}
70
/// Checks `get_gap_ack_blocks(0)`: TSNs 1..=6 must be reported as a single
/// block, and adding TSNs 8 and 9 (leaving 7 missing) must yield a second
/// block.
#[test]
fn test_payload_queue_get_gap_ack_block() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    for tsn in 1..=6 {
        pq.push(make_payload(tsn, 0), 0);
    }

    let expected = [GapAckBlock { start: 1, end: 6 }];
    let actual = pq.get_gap_ack_blocks(0);
    assert!(!actual.is_empty());
    assert_eq!(actual.len(), 1);

    for (got, want) in actual.iter().zip(expected.iter()) {
        assert_eq!(got.start, want.start);
        assert_eq!(got.end, want.end);
    }

    for tsn in 8..=9 {
        pq.push(make_payload(tsn, 0), 0);
    }

    let expected = [
        GapAckBlock { start: 1, end: 6 },
        GapAckBlock { start: 8, end: 9 },
    ];
    let actual = pq.get_gap_ack_blocks(0);
    assert!(!actual.is_empty());
    assert_eq!(actual.len(), 2);

    for (got, want) in actual.iter().zip(expected.iter()) {
        assert_eq!(got.start, want.start);
        assert_eq!(got.end, want.end);
    }

    Ok(())
}
108
/// Checks `get_last_tsn_received`: it is `None` on an empty queue, follows
/// the highest pushed TSN, and is unaffected by a later push of a lower TSN.
#[test]
fn test_payload_queue_get_last_tsn_received() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    // An empty queue has no last TSN to report.
    let tsn = pq.get_last_tsn_received();
    assert!(tsn.is_none(), "should be none");

    let ok = pq.push(make_payload(20, 0), 0);
    assert!(ok, "should be true");
    let tsn = pq.get_last_tsn_received();
    assert!(tsn.is_some(), "should be some");
    assert_eq!(tsn, Some(&20), "should match");

    // Appending a higher TSN moves the last TSN forward.
    let ok = pq.push(make_payload(21, 0), 0);
    assert!(ok, "should be true");
    let tsn = pq.get_last_tsn_received();
    assert!(tsn.is_some(), "should be some");
    assert_eq!(tsn, Some(&21), "should match");

    // A lower TSN pushed afterwards must not become the last TSN.
    let ok = pq.push(make_payload(19, 0), 0);
    assert!(ok, "should be true");
    let tsn = pq.get_last_tsn_received();
    assert!(tsn.is_some(), "should be some");
    assert_eq!(tsn, Some(&21), "should match");

    Ok(())
}
139
/// After TSN 2 is acked, `mark_all_to_retrasmit` must flag TSNs 1 and 3 for
/// retransmission while leaving TSN 2 unflagged.
#[test]
fn test_payload_queue_mark_all_to_retrasmit() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    for tsn in 1..=3 {
        pq.push(make_payload(tsn, 10), 0);
    }
    pq.mark_as_acked(2);
    pq.mark_all_to_retrasmit();

    for (tsn, expect_retransmit) in [(1, true), (2, false), (3, true)] {
        let chunk = pq.get(tsn);
        assert!(chunk.is_some(), "should be true");
        let flagged = chunk.unwrap().retransmit;
        if expect_retransmit {
            assert!(flagged, "should be marked as retransmit");
        } else {
            assert!(!flagged, "should NOT be marked as retransmit");
        }
    }

    Ok(())
}
162
/// With every chunk flagged for retransmission, acking TSNs 2 and 4 must
/// clear their retransmit flags while TSNs 1 and 3 stay flagged.
#[test]
fn test_payload_queue_reset_retransmit_flag_on_ack() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    for tsn in 1..=4 {
        pq.push(make_payload(tsn, 10), 0);
    }

    pq.mark_all_to_retrasmit();
    // Acking is expected to cancel the pending retransmission of that TSN.
    pq.mark_as_acked(2);
    pq.mark_as_acked(4);

    for (tsn, expect_retransmit) in [(1, true), (2, false), (3, true), (4, false)] {
        let chunk = pq.get(tsn);
        assert!(chunk.is_some(), "should be true");
        let flagged = chunk.unwrap().retransmit;
        if expect_retransmit {
            assert!(flagged, "should be marked as retransmit");
        } else {
            assert!(!flagged, "should NOT be marked as retransmit");
        }
    }

    Ok(())
}
190
191 ///////////////////////////////////////////////////////////////////
192 //pending_queue_test
193 ///////////////////////////////////////////////////////////////////
194 use super::pending_queue::*;
195
196 const NO_FRAGMENT: usize = 0;
197 const FRAG_BEGIN: usize = 1;
198 const FRAG_MIDDLE: usize = 2;
199 const FRAG_END: usize = 3;
200
make_data_chunk(tsn: u32, unordered: bool, frag: usize) -> ChunkPayloadData201 fn make_data_chunk(tsn: u32, unordered: bool, frag: usize) -> ChunkPayloadData {
202 let mut b = false;
203 let mut e = false;
204
205 match frag {
206 NO_FRAGMENT => {
207 b = true;
208 e = true;
209 }
210 FRAG_BEGIN => {
211 b = true;
212 }
213 FRAG_END => e = true,
214 _ => {}
215 };
216
217 ChunkPayloadData {
218 tsn,
219 unordered,
220 beginning_fragment: b,
221 ending_fragment: e,
222 user_data: {
223 let mut b = BytesMut::new();
224 b.resize(10, 0); // always 10 bytes
225 b.freeze()
226 },
227 ..Default::default()
228 }
229 }
230
/// Checks that `PendingBaseQueue` hands chunks back in insertion order, both
/// through indexed `get` and through `pop_front`, including after the queue
/// has been drained and refilled.
#[test]
fn test_pending_base_queue_push_and_pop() -> Result<()> {
    let mut pq = PendingBaseQueue::new();
    for tsn in 0..3 {
        pq.push_back(make_data_chunk(tsn, false, NO_FRAGMENT));
    }

    for idx in 0..3 {
        let chunk = pq.get(idx).expect("should not be none");
        assert_eq!(chunk.tsn, idx as u32, "TSN should match");
    }

    for tsn in 0..3 {
        let chunk = pq.pop_front().expect("should not be none");
        assert_eq!(chunk.tsn, tsn, "TSN should match");
    }

    for tsn in 3..5 {
        pq.push_back(make_data_chunk(tsn, false, NO_FRAGMENT));
    }

    for tsn in 3..5 {
        let chunk = pq.pop_front().expect("should not be none");
        assert_eq!(chunk.tsn, tsn, "TSN should match");
    }

    Ok(())
}
260
/// Checks that `pop_front` and `get` return `None` on an empty
/// `PendingBaseQueue`, and that `get` returns `None` for an index past the
/// last element.
#[test]
fn test_pending_base_queue_out_of_bounce() -> Result<()> {
    let mut pq = PendingBaseQueue::new();

    let popped = pq.pop_front();
    assert!(popped.is_none(), "should be none");
    let first = pq.get(0);
    assert!(first.is_none(), "should be none");

    // With one element, index 1 is out of range.
    pq.push_back(make_data_chunk(0, false, NO_FRAGMENT));
    let second = pq.get(1);
    assert!(second.is_none(), "should be none");

    Ok(())
}
272
273 // NOTE: TSN is not used in pendingQueue in the actual usage.
274 // Following tests use TSN field as a chunk ID.
275 #[tokio::test]
test_pending_queue_push_and_pop() -> Result<()>276 async fn test_pending_queue_push_and_pop() -> Result<()> {
277 let pq = PendingQueue::new();
278 pq.push(make_data_chunk(0, false, NO_FRAGMENT)).await;
279 assert_eq!(pq.get_num_bytes(), 10, "total bytes mismatch");
280 pq.push(make_data_chunk(1, false, NO_FRAGMENT)).await;
281 assert_eq!(pq.get_num_bytes(), 20, "total bytes mismatch");
282 pq.push(make_data_chunk(2, false, NO_FRAGMENT)).await;
283 assert_eq!(pq.get_num_bytes(), 30, "total bytes mismatch");
284
285 for i in 0..3 {
286 let c = pq.peek();
287 assert!(c.is_some(), "peek error");
288 let c = c.unwrap();
289 assert_eq!(c.tsn, i, "TSN should match");
290 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
291
292 let result = pq.pop(beginning_fragment, unordered);
293 assert!(result.is_some(), "should not error: {i}");
294 }
295
296 assert_eq!(pq.get_num_bytes(), 0, "total bytes mismatch");
297
298 pq.push(make_data_chunk(3, false, NO_FRAGMENT)).await;
299 assert_eq!(pq.get_num_bytes(), 10, "total bytes mismatch");
300 pq.push(make_data_chunk(4, false, NO_FRAGMENT)).await;
301 assert_eq!(pq.get_num_bytes(), 20, "total bytes mismatch");
302
303 for i in 3..5 {
304 let c = pq.peek();
305 assert!(c.is_some(), "peek error");
306 let c = c.unwrap();
307 assert_eq!(c.tsn, i, "TSN should match");
308 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
309
310 let result = pq.pop(beginning_fragment, unordered);
311 assert!(result.is_some(), "should not error: {i}");
312 }
313
314 assert_eq!(pq.get_num_bytes(), 0, "total bytes mismatch");
315
316 Ok(())
317 }
318
319 #[tokio::test]
test_pending_queue_unordered_wins() -> Result<()>320 async fn test_pending_queue_unordered_wins() -> Result<()> {
321 let pq = PendingQueue::new();
322
323 pq.push(make_data_chunk(0, false, NO_FRAGMENT)).await;
324 assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch");
325 pq.push(make_data_chunk(1, true, NO_FRAGMENT)).await;
326 assert_eq!(20, pq.get_num_bytes(), "total bytes mismatch");
327 pq.push(make_data_chunk(2, false, NO_FRAGMENT)).await;
328 assert_eq!(30, pq.get_num_bytes(), "total bytes mismatch");
329 pq.push(make_data_chunk(3, true, NO_FRAGMENT)).await;
330 assert_eq!(40, pq.get_num_bytes(), "total bytes mismatch");
331
332 let c = pq.peek();
333 assert!(c.is_some(), "peek error");
334 let c = c.unwrap();
335 assert_eq!(c.tsn, 1, "TSN should match");
336 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
337 let result = pq.pop(beginning_fragment, unordered);
338 assert!(result.is_some(), "should not error");
339
340 let c = pq.peek();
341 assert!(c.is_some(), "peek error");
342 let c = c.unwrap();
343 assert_eq!(c.tsn, 3, "TSN should match");
344 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
345 let result = pq.pop(beginning_fragment, unordered);
346 assert!(result.is_some(), "should not error");
347
348 let c = pq.peek();
349 assert!(c.is_some(), "peek error");
350 let c = c.unwrap();
351 assert_eq!(c.tsn, 0, "TSN should match");
352 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
353 let result = pq.pop(beginning_fragment, unordered);
354 assert!(result.is_some(), "should not error");
355
356 let c = pq.peek();
357 assert!(c.is_some(), "peek error");
358 let c = c.unwrap();
359 assert_eq!(c.tsn, 2, "TSN should match");
360 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
361 let result = pq.pop(beginning_fragment, unordered);
362 assert!(result.is_some(), "should not error");
363
364 assert_eq!(pq.get_num_bytes(), 0, "total bytes mismatch");
365
366 Ok(())
367 }
368
369 #[tokio::test]
test_pending_queue_fragments() -> Result<()>370 async fn test_pending_queue_fragments() -> Result<()> {
371 let pq = PendingQueue::new();
372 pq.push(make_data_chunk(0, false, FRAG_BEGIN)).await;
373 pq.push(make_data_chunk(1, false, FRAG_MIDDLE)).await;
374 pq.push(make_data_chunk(2, false, FRAG_END)).await;
375 pq.push(make_data_chunk(3, true, FRAG_BEGIN)).await;
376 pq.push(make_data_chunk(4, true, FRAG_MIDDLE)).await;
377 pq.push(make_data_chunk(5, true, FRAG_END)).await;
378
379 let expects = vec![3, 4, 5, 0, 1, 2];
380
381 for exp in expects {
382 let c = pq.peek();
383 assert!(c.is_some(), "peek error");
384 let c = c.unwrap();
385 assert_eq!(c.tsn, exp, "TSN should match");
386 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
387 let result = pq.pop(beginning_fragment, unordered);
388 assert!(result.is_some(), "should not error: {exp}");
389 }
390
391 Ok(())
392 }
393
394 // Once decided ordered or unordered, the decision should persist until
395 // it pops a chunk with ending_fragment flags set to true.
396 #[tokio::test]
test_pending_queue_selection_persistence() -> Result<()>397 async fn test_pending_queue_selection_persistence() -> Result<()> {
398 let pq = PendingQueue::new();
399 pq.push(make_data_chunk(0, false, FRAG_BEGIN)).await;
400
401 let c = pq.peek();
402 assert!(c.is_some(), "peek error");
403 let c = c.unwrap();
404 assert_eq!(c.tsn, 0, "TSN should match");
405 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
406 let result = pq.pop(beginning_fragment, unordered);
407 assert!(result.is_some(), "should not error: {}", 0);
408
409 pq.push(make_data_chunk(1, true, NO_FRAGMENT)).await;
410 pq.push(make_data_chunk(2, false, FRAG_MIDDLE)).await;
411 pq.push(make_data_chunk(3, false, FRAG_END)).await;
412
413 let expects = vec![2, 3, 1];
414
415 for exp in expects {
416 let c = pq.peek();
417 assert!(c.is_some(), "peek error");
418 let c = c.unwrap();
419 assert_eq!(c.tsn, exp, "TSN should match");
420 let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
421 let result = pq.pop(beginning_fragment, unordered);
422 assert!(result.is_some(), "should not error: {exp}");
423 }
424
425 Ok(())
426 }
427
428 #[tokio::test]
test_pending_queue_append() -> Result<()>429 async fn test_pending_queue_append() -> Result<()> {
430 let pq = PendingQueue::new();
431 pq.append(vec![
432 make_data_chunk(0, false, NO_FRAGMENT),
433 make_data_chunk(1, false, NO_FRAGMENT),
434 make_data_chunk(3, false, NO_FRAGMENT),
435 ])
436 .await;
437 assert_eq!(pq.get_num_bytes(), 30, "total bytes mismatch");
438 assert_eq!(pq.len(), 3, "len mismatch");
439
440 Ok(())
441 }
442
443 ///////////////////////////////////////////////////////////////////
444 //reassembly_queue_test
445 ///////////////////////////////////////////////////////////////////
446 use super::reassembly_queue::*;
447 use std::sync::atomic::AtomicUsize;
448 use std::sync::Arc;
449
/// Pushes a two-fragment ordered message and checks that it is reported
/// complete only after the ending fragment, and that `read` returns the
/// concatenated payload with the original PPI and empties the byte count.
#[test]
fn test_reassembly_queue_ordered_fragments() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    let head = ChunkPayloadData {
        tsn: 1,
        stream_sequence_number: 0,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    };
    let tail = ChunkPayloadData {
        tsn: 2,
        stream_sequence_number: 0,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEFG"),
        ..Default::default()
    };

    assert!(!rq.push(head), "chunk set should not be complete yet");
    assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch");

    assert!(rq.push(tail), "chunk set should be complete");
    assert_eq!(rq.get_num_bytes(), 7, "num bytes mismatch");

    let mut buf = vec![0u8; 16];
    let (n, ppi) = rq.read(&mut buf)?;
    assert_eq!(n, 7, "should received 7 bytes");
    assert_eq!(rq.get_num_bytes(), 0, "num bytes mismatch");
    assert_eq!(ppi, org_ppi, "should have valid ppi");
    assert_eq!(&buf[..n], b"ABCDEFG", "data should match");

    Ok(())
}
492
/// Pushes a three-fragment unordered message and checks that it is reported
/// complete only after the ending fragment, and that `read` returns the
/// concatenated payload with the original PPI and empties the byte count.
#[test]
fn test_reassembly_queue_unordered_fragments() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    let head = ChunkPayloadData {
        tsn: 1,
        stream_sequence_number: 0,
        unordered: true,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    };
    let middle = ChunkPayloadData {
        tsn: 2,
        stream_sequence_number: 0,
        unordered: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEFG"),
        ..Default::default()
    };
    let tail = ChunkPayloadData {
        tsn: 3,
        stream_sequence_number: 0,
        unordered: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"H"),
        ..Default::default()
    };

    assert!(!rq.push(head), "chunk set should not be complete yet");
    assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch");

    assert!(!rq.push(middle), "chunk set should not be complete yet");
    assert_eq!(rq.get_num_bytes(), 7, "num bytes mismatch");

    assert!(rq.push(tail), "chunk set should be complete");
    assert_eq!(rq.get_num_bytes(), 8, "num bytes mismatch");

    let mut buf = vec![0u8; 16];
    let (n, ppi) = rq.read(&mut buf)?;
    assert_eq!(n, 8, "should received 8 bytes");
    assert_eq!(rq.get_num_bytes(), 0, "num bytes mismatch");
    assert_eq!(ppi, org_ppi, "should have valid ppi");
    assert_eq!(&buf[..n], b"ABCDEFGH", "data should match");

    Ok(())
}
550
/// Pushes one complete ordered message and one complete unordered message
/// and checks that `read` returns the unordered payload first and the
/// ordered payload second.
#[test]
fn test_reassembly_queue_ordered_and_unordered_fragments() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    let ordered = ChunkPayloadData {
        tsn: 1,
        stream_sequence_number: 0,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    };
    let unordered = ChunkPayloadData {
        tsn: 2,
        stream_sequence_number: 1,
        unordered: true,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEF"),
        ..Default::default()
    };

    assert!(rq.push(ordered), "chunk set should be complete");
    assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch");

    assert!(rq.push(unordered), "chunk set should be complete");
    assert_eq!(rq.get_num_bytes(), 6, "num bytes mismatch");

    // Both pushes reported a complete chunk set, so two messages are
    // waiting to be read.
    let mut buf = vec![0u8; 16];

    // The unordered message is expected first.
    let (n, ppi) = rq.read(&mut buf)?;
    assert_eq!(n, 3, "should received 3 bytes");
    assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch");
    assert_eq!(ppi, org_ppi, "should have valid ppi");
    assert_eq!(&buf[..n], b"DEF", "data should match");

    // The ordered message follows.
    let (n, ppi) = rq.read(&mut buf)?;
    assert_eq!(n, 3, "should received 3 bytes");
    assert_eq!(rq.get_num_bytes(), 0, "num bytes mismatch");
    assert_eq!(ppi, org_ppi, "should have valid ppi");
    assert_eq!(&buf[..n], b"ABC", "data should match");

    Ok(())
}
606
/// Pushes two unordered fragments that do not form a complete message
/// (TSN 10 begins, TSN 12 ends, TSN 11 is missing) followed by a
/// single-chunk unordered message, and checks that `read` returns the
/// single-chunk message while the incomplete fragments stay queued.
#[test]
fn test_reassembly_queue_unordered_complete_skips_incomplete() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    let dangling_begin = ChunkPayloadData {
        tsn: 10,
        stream_sequence_number: 0,
        unordered: true,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };
    let dangling_end = ChunkPayloadData {
        tsn: 12, // not contiguous with TSN 10
        stream_sequence_number: 1,
        unordered: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"COMPLETE"),
        ..Default::default()
    };
    let whole = ChunkPayloadData {
        tsn: 13,
        stream_sequence_number: 1,
        unordered: true,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"GOOD"),
        ..Default::default()
    };

    assert!(!rq.push(dangling_begin), "chunk set should not be complete yet");
    assert_eq!(rq.get_num_bytes(), 2, "num bytes mismatch");

    assert!(!rq.push(dangling_end), "chunk set should not be complete yet");
    assert_eq!(rq.get_num_bytes(), 10, "num bytes mismatch");

    assert!(rq.push(whole), "chunk set should be complete");
    assert_eq!(rq.get_num_bytes(), 14, "num bytes mismatch");

    // Only the last push reported a complete chunk set; the first two
    // fragments (10 bytes in total) are still pending.
    let mut buf = vec![0u8; 16];

    // The read must pick the message carrying "GOOD".
    let (n, ppi) = rq.read(&mut buf)?;
    assert_eq!(n, 4, "should receive 4 bytes");
    assert_eq!(rq.get_num_bytes(), 10, "num bytes mismatch");
    assert_eq!(ppi, org_ppi, "should have valid ppi");
    assert_eq!(&buf[..n], b"GOOD", "data should match");

    Ok(())
}
671
/// Checks that a queue created with stream identifier 123 does not accept
/// a chunk whose stream identifier is 124: `push` returns false and the
/// byte count stays at zero.
#[test]
fn test_reassembly_queue_ignores_chunk_with_wrong_si() -> Result<()> {
    let mut rq = ReassemblyQueue::new(123);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    let foreign = ChunkPayloadData {
        stream_identifier: 124,
        tsn: 10,
        stream_sequence_number: 0,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };

    assert!(!rq.push(foreign), "chunk should be ignored");
    assert_eq!(rq.get_num_bytes(), 0, "num bytes mismatch");

    Ok(())
}
694
/// Checks that an ordered chunk whose SSN (6) is older than the queue's
/// expected SSN (7) is not accepted: `push` returns false and the byte
/// count stays at zero.
#[test]
fn test_reassembly_queue_ignores_chunk_with_stale_ssn() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    rq.next_ssn = 7; // forcibly set the expected SSN to 7

    let org_ppi = PayloadProtocolIdentifier::Binary;

    let chunk = ChunkPayloadData {
        payload_type: org_ppi,
        beginning_fragment: true,
        ending_fragment: true,
        tsn: 10,
        stream_sequence_number: 6, // <-- stale
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };

    let complete = rq.push(chunk);
    // The original message claimed the opposite of what is asserted here.
    assert!(!complete, "chunk should be ignored");
    assert_eq!(rq.get_num_bytes(), 0, "num bytes mismatch");

    Ok(())
}
718
/// Pushes only a beginning fragment and checks that `read` fails and
/// leaves the queued bytes untouched.
#[test]
fn test_reassembly_queue_should_fail_to_read_incomplete_chunk() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    let lone_begin = ChunkPayloadData {
        tsn: 123,
        stream_sequence_number: 0,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };

    assert!(!rq.push(lone_begin), "the set should not be complete");
    assert_eq!(rq.get_num_bytes(), 2, "num bytes mismatch");

    let mut buf = vec![0u8; 16];
    assert!(rq.read(&mut buf).is_err(), "read() should not succeed");
    assert_eq!(rq.get_num_bytes(), 2, "num bytes mismatch");

    Ok(())
}
745
/// Pushes a complete ordered message with SSN 1 into a fresh queue and
/// checks that `read` fails and leaves the queued bytes untouched.
#[test]
fn test_reassembly_queue_should_fail_to_read_if_the_nex_ssn_is_not_ready() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    let early = ChunkPayloadData {
        tsn: 123,
        stream_sequence_number: 1,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };

    assert!(rq.push(early), "the set should be complete");
    assert_eq!(rq.get_num_bytes(), 2, "num bytes mismatch");

    let mut buf = vec![0u8; 16];
    assert!(rq.read(&mut buf).is_err(), "read() should not succeed");
    assert_eq!(rq.get_num_bytes(), 2, "num bytes mismatch");

    Ok(())
}
773
/// Reads a 10-byte message into an 8-byte buffer and checks that `read`
/// fails with `Error::ErrShortBuffer` and that the byte count is zero
/// afterwards.
#[test]
fn test_reassembly_queue_detect_buffer_too_short() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    let message = ChunkPayloadData {
        tsn: 123,
        stream_sequence_number: 0,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"0123456789"),
        ..Default::default()
    };

    assert!(rq.push(message), "the set should be complete");
    assert_eq!(rq.get_num_bytes(), 10, "num bytes mismatch");

    // Deliberately smaller than the 10-byte payload.
    let mut buf = vec![0u8; 8];
    match rq.read(&mut buf) {
        Ok(_) => panic!("read() should not succeed"),
        Err(err) => assert_eq!(err, Error::ErrShortBuffer, "read() should not succeed"),
    }
    assert_eq!(rq.get_num_bytes(), 0, "num bytes mismatch");

    Ok(())
}
804
/// Queues one complete ordered message (SSN 5) and two fragments of an
/// incomplete one (SSN 6), then checks that `forward_tsn_for_ordered(6)`
/// leaves a single entry in `rq.ordered` and 3 bytes in the queue.
#[test]
fn test_reassembly_queue_forward_tsn_for_ordered_framents() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    let ssn_complete = 5u16;
    let ssn_dropped = 6u16;

    let whole = ChunkPayloadData {
        tsn: 10,
        stream_sequence_number: ssn_complete,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"123"),
        ..Default::default()
    };
    let partial_begin = ChunkPayloadData {
        tsn: 11,
        stream_sequence_number: ssn_dropped,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    };
    let partial_middle = ChunkPayloadData {
        tsn: 12,
        stream_sequence_number: ssn_dropped,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEF"),
        ..Default::default()
    };

    assert!(rq.push(whole), "chunk set should be complete");
    assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch");

    assert!(!rq.push(partial_begin), "chunk set should not be complete yet");
    assert_eq!(rq.get_num_bytes(), 6, "num bytes mismatch");

    assert!(!rq.push(partial_middle), "chunk set should not be complete yet");
    assert_eq!(rq.get_num_bytes(), 9, "num bytes mismatch");

    rq.forward_tsn_for_ordered(ssn_dropped);

    assert_eq!(rq.ordered.len(), 1, "there should be one chunk left");
    assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch");

    Ok(())
}
860
/// Queues three unordered fragments (TSNs 11, 12 and 14) and checks that
/// `forward_tsn_for_unordered(13)` leaves a single entry in
/// `rq.unordered_chunks` and 3 bytes in the queue.
#[test]
fn test_reassembly_queue_forward_tsn_for_unordered_framents() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    let ssn_dropped = 6u16;
    let ssn_kept = 7u16;

    let dropped_begin = ChunkPayloadData {
        tsn: 11,
        stream_sequence_number: ssn_dropped,
        unordered: true,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    };
    let dropped_middle = ChunkPayloadData {
        tsn: 12,
        stream_sequence_number: ssn_dropped,
        unordered: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEF"),
        ..Default::default()
    };
    let kept_begin = ChunkPayloadData {
        tsn: 14,
        stream_sequence_number: ssn_kept,
        unordered: true,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"SOS"),
        ..Default::default()
    };

    assert!(!rq.push(dropped_begin), "chunk set should not be complete yet");
    assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch");

    assert!(!rq.push(dropped_middle), "chunk set should not be complete yet");
    assert_eq!(rq.get_num_bytes(), 6, "num bytes mismatch");

    assert!(!rq.push(kept_begin), "chunk set should not be complete yet");
    assert_eq!(rq.get_num_bytes(), 9, "num bytes mismatch");

    // Three unordered fragments are queued now. Forwarding to TSN 13 is
    // expected to discard every fragment with a TSN of 13 or lower.
    rq.forward_tsn_for_unordered(13);

    // Only the fragment with TSN 14 should remain.
    assert_eq!(
        rq.unordered_chunks.len(),
        1,
        "there should be one chunk kept"
    );
    assert_eq!(rq.get_num_bytes(), 3, "num bytes mismatch");

    Ok(())
}
925
/// Checks that a freshly created `ChunkSet` is not complete.
#[test]
fn test_chunk_set_empty_chunk_set() -> Result<()> {
    let ppi = PayloadProtocolIdentifier::default();
    let empty = ChunkSet::new(0, ppi);
    let complete = empty.is_complete();
    assert!(!complete, "empty chunkSet cannot be complete");
    Ok(())
}
932
/// Pushes two chunks with the same TSN into a `ChunkSet` and checks that
/// the second push reports the set as not complete and that only one chunk
/// is stored.
#[test]
fn test_chunk_set_push_dup_chunks_to_chunk_set() -> Result<()> {
    let mut cset = ChunkSet::new(0, PayloadProtocolIdentifier::default());

    let first = ChunkPayloadData {
        beginning_fragment: true,
        tsn: 100,
        ..Default::default()
    };
    // Same TSN as `first`, but marked as the ending fragment.
    let duplicate = ChunkPayloadData {
        ending_fragment: true,
        tsn: 100,
        ..Default::default()
    };

    cset.push(first);
    let complete = cset.push(duplicate);

    assert!(!complete, "chunk with dup TSN is not complete");
    assert_eq!(cset.chunks.len(), 1, "chunk with dup TSN should be ignored");
    Ok(())
}
950
/// Checks that a `ChunkSet` whose first chunk lacks the beginning-fragment
/// flag is not complete.
///
/// The set holds a single chunk with only the ending flag set, so the
/// missing beginning flag is the only thing that can make it incomplete.
/// With an empty `chunks` vector this test would merely repeat the
/// empty-set test.
#[test]
fn test_chunk_set_incomplete_chunk_set_no_beginning() -> Result<()> {
    let cset = ChunkSet {
        ssn: 0,
        ppi: PayloadProtocolIdentifier::default(),
        chunks: vec![ChunkPayloadData {
            tsn: 100,
            ending_fragment: true,
            ..Default::default()
        }],
    };
    assert!(
        !cset.is_complete(),
        "chunkSet not starting with B=1 cannot be complete"
    );
    Ok(())
}
964
/// Checks that a `ChunkSet` with beginning and ending fragments but a gap
/// in its TSNs (100, 101, 103) is not complete.
#[test]
fn test_chunk_set_incomplete_chunk_set_no_contiguous_tsn() -> Result<()> {
    let cset = ChunkSet {
        ssn: 0,
        ppi: PayloadProtocolIdentifier::default(),
        chunks: vec![
            ChunkPayloadData {
                tsn: 100,
                beginning_fragment: true,
                ..Default::default()
            },
            ChunkPayloadData {
                tsn: 101,
                ..Default::default()
            },
            // TSN 102 is missing on purpose.
            ChunkPayloadData {
                tsn: 103,
                ending_fragment: true,
                ..Default::default()
            },
        ],
    };
    assert!(
        !cset.is_complete(),
        "chunkSet with non-contiguous TSNs cannot be complete"
    );
    Ok(())
}
993