xref: /webrtc/sctp/src/queue/queue_test.rs (revision f45000cd)
1 use crate::error::{Error, Result};
2 
3 use bytes::{Bytes, BytesMut};
4 
5 ///////////////////////////////////////////////////////////////////
6 //payload_queue_test
7 ///////////////////////////////////////////////////////////////////
8 use super::payload_queue::*;
9 use crate::chunk::chunk_payload_data::{ChunkPayloadData, PayloadProtocolIdentifier};
10 use crate::chunk::chunk_selective_ack::GapAckBlock;
11 
12 fn make_payload(tsn: u32, n_bytes: usize) -> ChunkPayloadData {
13     ChunkPayloadData {
14         tsn,
15         user_data: {
16             let mut b = BytesMut::new();
17             b.resize(n_bytes, 0);
18             b.freeze()
19         },
20         ..Default::default()
21     }
22 }
23 
/// Pushes chunks with `push_no_check` and verifies the byte/item accounting
/// while the queue is filled, drained, refilled and drained again.
#[test]
fn test_payload_queue_push_no_check() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    // (tsn, payload size, expected running byte total, expected item count)
    let first_batch = [(0, 10, 10, 1), (1, 11, 21, 2), (2, 12, 33, 3)];
    for &(tsn, size, total, count) in first_batch.iter() {
        pq.push_no_check(make_payload(tsn, size));
        assert_eq!(total, pq.get_num_bytes(), "total bytes mismatch");
        assert_eq!(count, pq.len(), "item count mismatch");
    }

    // Drain the first batch by TSN.
    for tsn in 0..3 {
        assert!(!pq.sorted.is_empty(), "should not be empty");
        let popped = pq.pop(tsn);
        assert!(popped.is_some(), "pop should succeed");
        assert_eq!(tsn, popped.unwrap().tsn, "TSN should match");
    }

    assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch");
    assert_eq!(0, pq.len(), "item count mismatch");

    assert!(pq.sorted.is_empty(), "should be empty");

    // (tsn, payload size, expected running byte total)
    let second_batch = [(3, 13, 13), (4, 14, 27)];
    for &(tsn, size, total) in second_batch.iter() {
        pq.push_no_check(make_payload(tsn, size));
        assert_eq!(total, pq.get_num_bytes(), "total bytes mismatch");
    }

    // Drain the second batch by TSN.
    for tsn in 3..5 {
        assert!(!pq.sorted.is_empty(), "should not be empty");
        let popped = pq.pop(tsn);
        assert!(popped.is_some(), "pop should succeed");
        assert_eq!(tsn, popped.unwrap().tsn, "TSN should match");
    }

    assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch");
    assert_eq!(0, pq.len(), "item count mismatch");

    Ok(())
}
70 
/// Checks the gap-ack blocks reported for cumulative TSN 0: first for one
/// contiguous run (1..=6), then for two runs separated by the missing TSN 7.
#[test]
fn test_payload_queue_get_gap_ack_block() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    for tsn in 1..=6 {
        pq.push(make_payload(tsn, 0), 0);
    }

    let expected = vec![GapAckBlock { start: 1, end: 6 }];
    let actual = pq.get_gap_ack_blocks(0);
    assert!(!actual.is_empty());
    assert_eq!(actual.len(), 1);

    assert_eq!(expected[0].start, actual[0].start);
    assert_eq!(expected[0].end, actual[0].end);

    // TSN 7 is skipped, so 8 and 9 must form a second block.
    for tsn in 8..=9 {
        pq.push(make_payload(tsn, 0), 0);
    }

    let expected = vec![
        GapAckBlock { start: 1, end: 6 },
        GapAckBlock { start: 8, end: 9 },
    ];
    let actual = pq.get_gap_ack_blocks(0);
    assert!(!actual.is_empty());
    assert_eq!(actual.len(), 2);

    for (want, got) in expected.iter().zip(actual.iter()) {
        assert_eq!(want.start, got.start);
        assert_eq!(want.end, got.end);
    }

    Ok(())
}
108 
/// Verifies `get_last_tsn_received`: it is `None` for an empty queue and
/// otherwise reports the last TSN, even when a smaller TSN is pushed
/// afterwards.
#[test]
fn test_payload_queue_get_last_tsn_received() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    // An empty queue has no last TSN.
    let tsn = pq.get_last_tsn_received();
    assert!(tsn.is_none(), "should be none");

    let ok = pq.push(make_payload(20, 0), 0);
    assert!(ok, "should be true");
    let tsn = pq.get_last_tsn_received();
    assert!(tsn.is_some(), "should be some");
    assert_eq!(Some(&20), tsn, "should match");

    // Appending a larger TSN moves the last TSN forward.
    let ok = pq.push(make_payload(21, 0), 0);
    assert!(ok, "should be true");
    let tsn = pq.get_last_tsn_received();
    assert!(tsn.is_some(), "should be some");
    assert_eq!(Some(&21), tsn, "should match");

    // Pushing a smaller TSN must not change the last TSN.
    let ok = pq.push(make_payload(19, 0), 0);
    assert!(ok, "should be true");
    let tsn = pq.get_last_tsn_received();
    assert!(tsn.is_some(), "should be some");
    assert_eq!(Some(&21), tsn, "should match");

    Ok(())
}
139 
/// After TSN 2 is acked, `mark_all_to_retrasmit` must flag only the
/// remaining chunks (TSN 1 and 3) for retransmission.
#[test]
fn test_payload_queue_mark_all_to_retrasmit() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    for tsn in 1..=3 {
        pq.push(make_payload(tsn, 10), 0);
    }
    pq.mark_as_acked(2);
    pq.mark_all_to_retrasmit();

    // (tsn, whether the retransmit flag is expected to be set)
    let expectations = [(1, true), (2, false), (3, true)];
    for &(tsn, want_retransmit) in expectations.iter() {
        let chunk = pq.get(tsn);
        assert!(chunk.is_some(), "should be true");
        let retransmit = chunk.unwrap().retransmit;
        if want_retransmit {
            assert!(retransmit, "should be marked as retransmit");
        } else {
            assert!(!retransmit, "should NOT be marked as retransmit");
        }
    }

    Ok(())
}
162 
/// Acking a chunk that was already marked for retransmission must clear
/// its retransmit flag, leaving the un-acked chunks flagged.
#[test]
fn test_payload_queue_reset_retransmit_flag_on_ack() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    for tsn in 1..=4 {
        pq.push(make_payload(tsn, 10), 0);
    }

    pq.mark_all_to_retrasmit();
    // Acks for TSN 2 and TSN 4 should cancel their retransmission.
    pq.mark_as_acked(2);
    pq.mark_as_acked(4);

    // (tsn, whether the retransmit flag is expected to be set)
    let expectations = [(1, true), (2, false), (3, true), (4, false)];
    for &(tsn, want_retransmit) in expectations.iter() {
        let chunk = pq.get(tsn);
        assert!(chunk.is_some(), "should be true");
        let retransmit = chunk.unwrap().retransmit;
        if want_retransmit {
            assert!(retransmit, "should be marked as retransmit");
        } else {
            assert!(!retransmit, "should NOT be marked as retransmit");
        }
    }

    Ok(())
}
190 
191 ///////////////////////////////////////////////////////////////////
192 //pending_queue_test
193 ///////////////////////////////////////////////////////////////////
194 use super::pending_queue::*;
195 
// Fragment-position selectors accepted by `make_data_chunk`.
// Complete message: both the beginning and ending flags are set.
const NO_FRAGMENT: usize = 0;
// First fragment: only the beginning flag is set.
const FRAG_BEGIN: usize = 1;
// Middle fragment: neither flag is set.
const FRAG_MIDDLE: usize = 2;
// Last fragment: only the ending flag is set.
const FRAG_END: usize = 3;
200 
201 fn make_data_chunk(tsn: u32, unordered: bool, frag: usize) -> ChunkPayloadData {
202     let mut b = false;
203     let mut e = false;
204 
205     match frag {
206         NO_FRAGMENT => {
207             b = true;
208             e = true;
209         }
210         FRAG_BEGIN => {
211             b = true;
212         }
213         FRAG_END => e = true,
214         _ => {}
215     };
216 
217     ChunkPayloadData {
218         tsn,
219         unordered,
220         beginning_fragment: b,
221         ending_fragment: e,
222         user_data: {
223             let mut b = BytesMut::new();
224             b.resize(10, 0); // always 10 bytes
225             b.freeze()
226         },
227         ..Default::default()
228     }
229 }
230 
/// Exercises FIFO behaviour of `PendingBaseQueue`: indexed `get`, then
/// `pop_front` in insertion order, across two fill/drain rounds.
#[test]
fn test_pending_base_queue_push_and_pop() -> Result<()> {
    // Pops the front chunk and checks that it carries the expected TSN.
    fn expect_front(queue: &mut PendingBaseQueue, tsn: u32) {
        let front = queue.pop_front();
        assert!(front.is_some(), "should not be none");
        assert_eq!(tsn, front.unwrap().tsn, "TSN should match");
    }

    let mut queue = PendingBaseQueue::new();
    for tsn in 0..3 {
        queue.push_back(make_data_chunk(tsn, false, NO_FRAGMENT));
    }

    for idx in 0..3 {
        let chunk = queue.get(idx);
        assert!(chunk.is_some(), "should not be none");
        assert_eq!(idx as u32, chunk.unwrap().tsn, "TSN should match");
    }

    for tsn in 0..3 {
        expect_front(&mut queue, tsn);
    }

    for tsn in 3..5 {
        queue.push_back(make_data_chunk(tsn, false, NO_FRAGMENT));
    }

    for tsn in 3..5 {
        expect_front(&mut queue, tsn);
    }
    Ok(())
}
260 
/// Out-of-range accesses on `PendingBaseQueue` must yield `None` rather
/// than panic, both when empty and when the index is past the end.
#[test]
fn test_pending_base_queue_out_of_bounce() -> Result<()> {
    let mut queue = PendingBaseQueue::new();

    // Empty queue: nothing to pop, nothing at index 0.
    let popped = queue.pop_front();
    assert!(popped.is_none(), "should be none");
    let first = queue.get(0);
    assert!(first.is_none(), "should be none");

    // One element: index 1 is past the end.
    queue.push_back(make_data_chunk(0, false, NO_FRAGMENT));
    let past_end = queue.get(1);
    assert!(past_end.is_none(), "should be none");

    Ok(())
}
272 
273 // NOTE: TSN is not used in pendingQueue in the actual usage.
274 //       Following tests use TSN field as a chunk ID.
275 #[tokio::test]
276 async fn test_pending_queue_push_and_pop() -> Result<()> {
277     let pq = PendingQueue::new();
278     pq.push(make_data_chunk(0, false, NO_FRAGMENT)).await;
279     assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch");
280     pq.push(make_data_chunk(1, false, NO_FRAGMENT)).await;
281     assert_eq!(20, pq.get_num_bytes(), "total bytes mismatch");
282     pq.push(make_data_chunk(2, false, NO_FRAGMENT)).await;
283     assert_eq!(30, pq.get_num_bytes(), "total bytes mismatch");
284 
285     for i in 0..3 {
286         let c = pq.peek();
287         assert!(c.is_some(), "peek error");
288         let c = c.unwrap();
289         assert_eq!(i, c.tsn, "TSN should match");
290         let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
291 
292         let result = pq.pop(beginning_fragment, unordered);
293         assert!(result.is_some(), "should not error: {}", i);
294     }
295 
296     assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch");
297 
298     pq.push(make_data_chunk(3, false, NO_FRAGMENT)).await;
299     assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch");
300     pq.push(make_data_chunk(4, false, NO_FRAGMENT)).await;
301     assert_eq!(20, pq.get_num_bytes(), "total bytes mismatch");
302 
303     for i in 3..5 {
304         let c = pq.peek();
305         assert!(c.is_some(), "peek error");
306         let c = c.unwrap();
307         assert_eq!(i, c.tsn, "TSN should match");
308         let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
309 
310         let result = pq.pop(beginning_fragment, unordered);
311         assert!(result.is_some(), "should not error: {}", i);
312     }
313 
314     assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch");
315 
316     Ok(())
317 }
318 
319 #[tokio::test]
320 async fn test_pending_queue_unordered_wins() -> Result<()> {
321     let pq = PendingQueue::new();
322 
323     pq.push(make_data_chunk(0, false, NO_FRAGMENT)).await;
324     assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch");
325     pq.push(make_data_chunk(1, true, NO_FRAGMENT)).await;
326     assert_eq!(20, pq.get_num_bytes(), "total bytes mismatch");
327     pq.push(make_data_chunk(2, false, NO_FRAGMENT)).await;
328     assert_eq!(30, pq.get_num_bytes(), "total bytes mismatch");
329     pq.push(make_data_chunk(3, true, NO_FRAGMENT)).await;
330     assert_eq!(40, pq.get_num_bytes(), "total bytes mismatch");
331 
332     let c = pq.peek();
333     assert!(c.is_some(), "peek error");
334     let c = c.unwrap();
335     assert_eq!(1, c.tsn, "TSN should match");
336     let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
337     let result = pq.pop(beginning_fragment, unordered);
338     assert!(result.is_some(), "should not error");
339 
340     let c = pq.peek();
341     assert!(c.is_some(), "peek error");
342     let c = c.unwrap();
343     assert_eq!(3, c.tsn, "TSN should match");
344     let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
345     let result = pq.pop(beginning_fragment, unordered);
346     assert!(result.is_some(), "should not error");
347 
348     let c = pq.peek();
349     assert!(c.is_some(), "peek error");
350     let c = c.unwrap();
351     assert_eq!(0, c.tsn, "TSN should match");
352     let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
353     let result = pq.pop(beginning_fragment, unordered);
354     assert!(result.is_some(), "should not error");
355 
356     let c = pq.peek();
357     assert!(c.is_some(), "peek error");
358     let c = c.unwrap();
359     assert_eq!(2, c.tsn, "TSN should match");
360     let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
361     let result = pq.pop(beginning_fragment, unordered);
362     assert!(result.is_some(), "should not error");
363 
364     assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch");
365 
366     Ok(())
367 }
368 
369 #[tokio::test]
370 async fn test_pending_queue_fragments() -> Result<()> {
371     let pq = PendingQueue::new();
372     pq.push(make_data_chunk(0, false, FRAG_BEGIN)).await;
373     pq.push(make_data_chunk(1, false, FRAG_MIDDLE)).await;
374     pq.push(make_data_chunk(2, false, FRAG_END)).await;
375     pq.push(make_data_chunk(3, true, FRAG_BEGIN)).await;
376     pq.push(make_data_chunk(4, true, FRAG_MIDDLE)).await;
377     pq.push(make_data_chunk(5, true, FRAG_END)).await;
378 
379     let expects = vec![3, 4, 5, 0, 1, 2];
380 
381     for exp in expects {
382         let c = pq.peek();
383         assert!(c.is_some(), "peek error");
384         let c = c.unwrap();
385         assert_eq!(exp, c.tsn, "TSN should match");
386         let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
387         let result = pq.pop(beginning_fragment, unordered);
388         assert!(result.is_some(), "should not error: {}", exp);
389     }
390 
391     Ok(())
392 }
393 
394 // Once decided ordered or unordered, the decision should persist until
395 // it pops a chunk with ending_fragment flags set to true.
396 #[tokio::test]
397 async fn test_pending_queue_selection_persistence() -> Result<()> {
398     let pq = PendingQueue::new();
399     pq.push(make_data_chunk(0, false, FRAG_BEGIN)).await;
400 
401     let c = pq.peek();
402     assert!(c.is_some(), "peek error");
403     let c = c.unwrap();
404     assert_eq!(0, c.tsn, "TSN should match");
405     let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
406     let result = pq.pop(beginning_fragment, unordered);
407     assert!(result.is_some(), "should not error: {}", 0);
408 
409     pq.push(make_data_chunk(1, true, NO_FRAGMENT)).await;
410     pq.push(make_data_chunk(2, false, FRAG_MIDDLE)).await;
411     pq.push(make_data_chunk(3, false, FRAG_END)).await;
412 
413     let expects = vec![2, 3, 1];
414 
415     for exp in expects {
416         let c = pq.peek();
417         assert!(c.is_some(), "peek error");
418         let c = c.unwrap();
419         assert_eq!(exp, c.tsn, "TSN should match");
420         let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
421         let result = pq.pop(beginning_fragment, unordered);
422         assert!(result.is_some(), "should not error: {}", exp);
423     }
424 
425     Ok(())
426 }
427 
428 #[tokio::test]
429 async fn test_pending_queue_append() -> Result<()> {
430     let pq = PendingQueue::new();
431     pq.append(vec![
432         make_data_chunk(0, false, NO_FRAGMENT),
433         make_data_chunk(1, false, NO_FRAGMENT),
434         make_data_chunk(3, false, NO_FRAGMENT),
435     ])
436     .await;
437     assert_eq!(30, pq.get_num_bytes(), "total bytes mismatch");
438     assert_eq!(3, pq.len(), "len mismatch");
439 
440     Ok(())
441 }
442 
443 ///////////////////////////////////////////////////////////////////
444 //reassembly_queue_test
445 ///////////////////////////////////////////////////////////////////
446 use super::reassembly_queue::*;
447 use std::sync::atomic::AtomicUsize;
448 use std::sync::Arc;
449 
/// Two ordered fragments of SSN 0 ("ABC" + "DEFG") must reassemble into a
/// single 7-byte message that `read` returns with the original PPI.
#[test]
fn test_reassembly_queue_ordered_fragments() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(0);

    // Beginning fragment only: the message is still incomplete.
    let head = ChunkPayloadData {
        tsn: 1,
        stream_sequence_number: 0,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    };
    assert!(!rq.push(head), "chunk set should not be complete yet");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    // Ending fragment with the next TSN completes the message.
    let tail = ChunkPayloadData {
        tsn: 2,
        stream_sequence_number: 0,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEFG"),
        ..Default::default()
    };
    assert!(rq.push(tail), "chunk set should be complete");
    assert_eq!(7, rq.get_num_bytes(), "num bytes mismatch");

    let mut buf = vec![0u8; 16];

    let (len, read_ppi) = rq.read(&mut buf)?;
    assert_eq!(7, len, "should received 7 bytes");
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");
    assert_eq!(read_ppi, org_ppi, "should have valid ppi");
    assert_eq!(&buf[..len], b"ABCDEFG", "data should match");

    Ok(())
}
492 
/// Three unordered fragments with contiguous TSNs (1, 2, 3) must reassemble
/// into one 8-byte message once the ending fragment arrives.
#[test]
fn test_reassembly_queue_unordered_fragments() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(0);

    // Beginning fragment.
    let first = ChunkPayloadData {
        tsn: 1,
        stream_sequence_number: 0,
        unordered: true,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    };
    assert!(!rq.push(first), "chunk set should not be complete yet");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    // Middle fragment (neither B nor E set).
    let middle = ChunkPayloadData {
        tsn: 2,
        stream_sequence_number: 0,
        unordered: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEFG"),
        ..Default::default()
    };
    assert!(!rq.push(middle), "chunk set should not be complete yet");
    assert_eq!(7, rq.get_num_bytes(), "num bytes mismatch");

    // Ending fragment completes the message.
    let last = ChunkPayloadData {
        tsn: 3,
        stream_sequence_number: 0,
        unordered: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"H"),
        ..Default::default()
    };
    assert!(rq.push(last), "chunk set should be complete");
    assert_eq!(8, rq.get_num_bytes(), "num bytes mismatch");

    let mut buf = vec![0u8; 16];

    let (len, read_ppi) = rq.read(&mut buf)?;
    assert_eq!(8, len, "should received 8 bytes");
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");
    assert_eq!(read_ppi, org_ppi, "should have valid ppi");
    assert_eq!(&buf[..len], b"ABCDEFGH", "data should match");

    Ok(())
}
550 
/// With one complete ordered message and one complete unordered message
/// queued, `read` must deliver the unordered one first.
#[test]
fn test_reassembly_queue_ordered_and_unordered_fragments() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(0);

    // Complete ordered message (B and E both set).
    let ordered_msg = ChunkPayloadData {
        tsn: 1,
        stream_sequence_number: 0,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    };
    assert!(rq.push(ordered_msg), "chunk set should be complete");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    // Complete unordered message (B and E both set).
    let unordered_msg = ChunkPayloadData {
        tsn: 2,
        stream_sequence_number: 1,
        unordered: true,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEF"),
        ..Default::default()
    };
    assert!(rq.push(unordered_msg), "chunk set should be complete");
    assert_eq!(6, rq.get_num_bytes(), "num bytes mismatch");

    // Both complete messages are now waiting in the reassembly queue.
    let mut buf = vec![0u8; 16];

    // The unordered message comes out first ...
    let (len, read_ppi) = rq.read(&mut buf)?;
    assert_eq!(3, len, "should received 3 bytes");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");
    assert_eq!(read_ppi, org_ppi, "should have valid ppi");
    assert_eq!(&buf[..len], b"DEF", "data should match");

    // ... followed by the ordered one.
    let (len, read_ppi) = rq.read(&mut buf)?;
    assert_eq!(3, len, "should received 3 bytes");
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");
    assert_eq!(read_ppi, org_ppi, "should have valid ppi");
    assert_eq!(&buf[..len], b"ABC", "data should match");

    Ok(())
}
606 
/// A complete unordered message must be readable even while earlier,
/// non-contiguous unordered fragments are still waiting in the queue.
#[test]
fn test_reassembly_queue_unordered_complete_skips_incomplete() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(0);

    // Beginning fragment at TSN 10.
    let dangling_begin = ChunkPayloadData {
        tsn: 10,
        stream_sequence_number: 0,
        unordered: true,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };
    assert!(
        !rq.push(dangling_begin),
        "chunk set should not be complete yet"
    );
    assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch");

    // Ending fragment at TSN 12: not contiguous with TSN 10, so the pair
    // does not form a complete message.
    let dangling_end = ChunkPayloadData {
        tsn: 12,
        stream_sequence_number: 1,
        unordered: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"COMPLETE"),
        ..Default::default()
    };
    assert!(
        !rq.push(dangling_end),
        "chunk set should not be complete yet"
    );
    assert_eq!(10, rq.get_num_bytes(), "num bytes mismatch");

    // Self-contained message (B and E both set) at TSN 13.
    let whole = ChunkPayloadData {
        tsn: 13,
        stream_sequence_number: 1,
        unordered: true,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"GOOD"),
        ..Default::default()
    };
    assert!(rq.push(whole), "chunk set should be complete");
    assert_eq!(14, rq.get_num_bytes(), "num bytes mismatch");

    // Only the "GOOD" message is complete; the 10 bytes of dangling
    // fragments must stay in the queue after the read.
    let mut buf = vec![0u8; 16];

    let (len, read_ppi) = rq.read(&mut buf)?;
    assert_eq!(4, len, "should receive 4 bytes");
    assert_eq!(10, rq.get_num_bytes(), "num bytes mismatch");
    assert_eq!(read_ppi, org_ppi, "should have valid ppi");
    assert_eq!(&buf[..len], b"GOOD", "data should match");

    Ok(())
}
671 
/// A chunk whose stream identifier (124) differs from the queue's (123)
/// must be dropped without being counted.
#[test]
fn test_reassembly_queue_ignores_chunk_with_wrong_si() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(123);

    let foreign = ChunkPayloadData {
        stream_identifier: 124,
        tsn: 10,
        stream_sequence_number: 0,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };

    assert!(!rq.push(foreign), "chunk should be ignored");
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");
    Ok(())
}
694 
/// An ordered chunk whose SSN (6) is older than the next expected SSN (7)
/// must be ignored: `push` reports no complete set and no bytes are
/// accounted for.
#[test]
fn test_reassembly_queue_ignores_chunk_with_stale_ssn() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    rq.next_ssn = 7; // forcibly set expected SSN to 7

    let org_ppi = PayloadProtocolIdentifier::Binary;

    let chunk = ChunkPayloadData {
        payload_type: org_ppi,
        beginning_fragment: true,
        ending_fragment: true,
        tsn: 10,
        stream_sequence_number: 6, // <-- stale
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };

    let complete = rq.push(chunk);
    // The old message said "should not be ignored", the opposite of what
    // this assertion checks.
    assert!(!complete, "chunk should be ignored");
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
718 
/// `read` must fail, and leave the queued bytes untouched, while only the
/// beginning fragment of a message has arrived.
#[test]
fn test_reassembly_queue_should_fail_to_read_incomplete_chunk() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(0);

    let lone_begin = ChunkPayloadData {
        tsn: 123,
        stream_sequence_number: 0,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };

    assert!(!rq.push(lone_begin), "the set should not be complete");
    assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch");

    let mut buf = vec![0u8; 16];
    let outcome = rq.read(&mut buf);
    assert!(outcome.is_err(), "read() should not succeed");
    assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
745 
/// A complete ordered message with SSN 1 must not be readable on a fresh
/// queue: `read` fails and the bytes stay queued.
#[test]
fn test_reassembly_queue_should_fail_to_read_if_the_nex_ssn_is_not_ready() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(0);

    let early = ChunkPayloadData {
        tsn: 123,
        stream_sequence_number: 1,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };

    assert!(rq.push(early), "the set should be complete");
    assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch");

    let mut buf = vec![0u8; 16];
    let outcome = rq.read(&mut buf);
    assert!(outcome.is_err(), "read() should not succeed");
    assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
773 
/// Reading a 10-byte message into an 8-byte buffer must fail with
/// `Error::ErrShortBuffer`; the test also expects the message to be gone
/// from the queue afterwards (0 bytes left).
#[test]
fn test_reassembly_queue_detect_buffer_too_short() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(0);

    let ten_bytes = ChunkPayloadData {
        tsn: 123,
        stream_sequence_number: 0,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"0123456789"),
        ..Default::default()
    };

    assert!(rq.push(ten_bytes), "the set should be complete");
    assert_eq!(10, rq.get_num_bytes(), "num bytes mismatch");

    // Deliberately two bytes smaller than the queued message.
    let mut short_buf = vec![0u8; 8];
    let outcome = rq.read(&mut short_buf);
    assert!(outcome.is_err(), "read() should not succeed");
    if let Some(err) = outcome.err() {
        assert_eq!(Error::ErrShortBuffer, err, "read() should not succeed");
    }
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
804 
/// `forward_tsn_for_ordered(6)` must drop the incomplete ordered message
/// with SSN 6 while keeping the complete message with SSN 5.
#[test]
fn test_reassembly_queue_forward_tsn_for_ordered_framents() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let ssn_complete = 5u16;
    let ssn_dropped = 6u16;

    let mut rq = ReassemblyQueue::new(0);

    // Complete message under `ssn_complete`.
    let whole = ChunkPayloadData {
        tsn: 10,
        stream_sequence_number: ssn_complete,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"123"),
        ..Default::default()
    };
    assert!(rq.push(whole), "chunk set should be complete");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    // Beginning fragment under `ssn_dropped`.
    let partial_begin = ChunkPayloadData {
        tsn: 11,
        stream_sequence_number: ssn_dropped,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    };
    assert!(
        !rq.push(partial_begin),
        "chunk set should not be complete yet"
    );
    assert_eq!(6, rq.get_num_bytes(), "num bytes mismatch");

    // Middle fragment under `ssn_dropped`; the ending fragment never arrives.
    let partial_middle = ChunkPayloadData {
        tsn: 12,
        stream_sequence_number: ssn_dropped,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEF"),
        ..Default::default()
    };
    assert!(
        !rq.push(partial_middle),
        "chunk set should not be complete yet"
    );
    assert_eq!(9, rq.get_num_bytes(), "num bytes mismatch");

    rq.forward_tsn_for_ordered(ssn_dropped);

    assert_eq!(1, rq.ordered.len(), "there should be one chunk left");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
860 
/// `forward_tsn_for_unordered(13)` must discard the unordered fragments
/// with TSN 11 and 12 and keep the one with TSN 14.
#[test]
fn test_reassembly_queue_forward_tsn_for_unordered_framents() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let ssn_dropped = 6u16;
    let ssn_kept = 7u16;

    let mut rq = ReassemblyQueue::new(0);

    // Beginning fragment that will be dropped.
    let stale_begin = ChunkPayloadData {
        tsn: 11,
        stream_sequence_number: ssn_dropped,
        unordered: true,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    };
    assert!(!rq.push(stale_begin), "chunk set should not be complete yet");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    // Middle fragment that will be dropped.
    let stale_middle = ChunkPayloadData {
        tsn: 12,
        stream_sequence_number: ssn_dropped,
        unordered: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEF"),
        ..Default::default()
    };
    assert!(
        !rq.push(stale_middle),
        "chunk set should not be complete yet"
    );
    assert_eq!(6, rq.get_num_bytes(), "num bytes mismatch");

    // Beginning fragment of a newer message that must survive.
    let fresh_begin = ChunkPayloadData {
        tsn: 14,
        stream_sequence_number: ssn_kept,
        unordered: true,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"SOS"),
        ..Default::default()
    };
    assert!(!rq.push(fresh_begin), "chunk set should not be complete yet");
    assert_eq!(9, rq.get_num_bytes(), "num bytes mismatch");

    // Three chunks have been pushed as unordered fragments. Forwarding to
    // TSN 13 should remove every chunk whose TSN is 13 or older.
    rq.forward_tsn_for_unordered(13);

    // Only the chunk with tsn=14 should remain.
    assert_eq!(
        1,
        rq.unordered_chunks.len(),
        "there should be one chunk kept"
    );
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
925 
/// A freshly created `ChunkSet` holds no chunks and must not report
/// itself as complete.
#[test]
fn test_chunk_set_empty_chunk_set() -> Result<()> {
    let empty = ChunkSet::new(0, PayloadProtocolIdentifier::default());
    let complete = empty.is_complete();
    assert!(!complete, "empty chunkSet cannot be complete");
    Ok(())
}
932 
/// Pushing a second chunk with an already-present TSN must be ignored:
/// the set stays at one chunk and is not reported complete.
#[test]
fn test_chunk_set_push_dup_chunks_to_chunk_set() -> Result<()> {
    let original = ChunkPayloadData {
        tsn: 100,
        beginning_fragment: true,
        ..Default::default()
    };
    // Same TSN as `original`, but carrying the ending flag.
    let duplicate = ChunkPayloadData {
        tsn: 100,
        ending_fragment: true,
        ..Default::default()
    };

    let mut cset = ChunkSet::new(0, PayloadProtocolIdentifier::default());
    cset.push(original);
    let complete = cset.push(duplicate);

    assert!(!complete, "chunk with dup TSN is not complete");
    assert_eq!(1, cset.chunks.len(), "chunk with dup TSN should be ignored");
    Ok(())
}
950 
/// A chunk set whose first chunk lacks the beginning-fragment flag must
/// not be reported complete, even if that chunk carries the ending flag.
#[test]
fn test_chunk_set_incomplete_chunk_set_no_beginning() -> Result<()> {
    let cset = ChunkSet {
        ssn: 0,
        ppi: PayloadProtocolIdentifier::default(),
        // The set must be non-empty, otherwise this only repeats the
        // empty-set test and never reaches the B=1 check.
        chunks: vec![ChunkPayloadData {
            tsn: 100,
            ending_fragment: true,
            ..Default::default()
        }],
    };
    assert!(
        !cset.is_complete(),
        "chunkSet not starting with B=1 cannot be complete"
    );
    Ok(())
}
964 
/// A chunk set with B and E fragments but a hole in the TSN sequence
/// (100, 101, 103) must not be reported complete.
#[test]
fn test_chunk_set_incomplete_chunk_set_no_contiguous_tsn() -> Result<()> {
    let begin = ChunkPayloadData {
        tsn: 100,
        beginning_fragment: true,
        ..Default::default()
    };
    let middle = ChunkPayloadData {
        tsn: 101,
        ..Default::default()
    };
    // TSN 102 is missing on purpose.
    let end = ChunkPayloadData {
        tsn: 103,
        ending_fragment: true,
        ..Default::default()
    };

    let cset = ChunkSet {
        ssn: 0,
        ppi: PayloadProtocolIdentifier::default(),
        chunks: vec![begin, middle, end],
    };
    let complete = cset.is_complete();
    assert!(
        !complete,
        "chunkSet not starting with incontiguous tsn cannot be complete"
    );
    Ok(())
}
993