xref: /webrtc/sctp/src/queue/queue_test.rs (revision 254a17e2)
1 use crate::error::{Error, Result};
2 
3 use bytes::{Bytes, BytesMut};
4 
5 ///////////////////////////////////////////////////////////////////
6 //payload_queue_test
7 ///////////////////////////////////////////////////////////////////
8 use super::payload_queue::*;
9 use crate::chunk::chunk_payload_data::{ChunkPayloadData, PayloadProtocolIdentifier};
10 use crate::chunk::chunk_selective_ack::GapAckBlock;
11 
12 fn make_payload(tsn: u32, n_bytes: usize) -> ChunkPayloadData {
13     ChunkPayloadData {
14         tsn,
15         user_data: {
16             let mut b = BytesMut::new();
17             b.resize(n_bytes, 0);
18             b.freeze()
19         },
20         ..Default::default()
21     }
22 }
23 
/// Pushes payloads with `push_no_check`, checking the byte and item counters
/// after each push, then pops every chunk back by TSN. The cycle is run twice
/// to confirm the queue is reusable once drained.
#[test]
fn test_payload_queue_push_no_check() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    // (tsn, payload size, expected byte total, expected item count)
    for (tsn, size, want_bytes, want_items) in [(0, 10, 10, 1), (1, 11, 21, 2), (2, 12, 33, 3)] {
        pq.push_no_check(make_payload(tsn, size));
        assert_eq!(want_bytes, pq.get_num_bytes(), "total bytes mismatch");
        assert_eq!(want_items, pq.len(), "item count mismatch");
    }

    for tsn in 0..3 {
        assert!(!pq.sorted.is_empty(), "should not be empty");
        match pq.pop(tsn) {
            Some(chunk) => assert_eq!(tsn, chunk.tsn, "TSN should match"),
            None => panic!("pop should succeed"),
        }
    }

    assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch");
    assert_eq!(0, pq.len(), "item count mismatch");
    assert!(pq.sorted.is_empty(), "should be empty");

    // Second round: only the byte counter is checked while pushing.
    for (tsn, size, want_bytes) in [(3, 13, 13), (4, 14, 27)] {
        pq.push_no_check(make_payload(tsn, size));
        assert_eq!(want_bytes, pq.get_num_bytes(), "total bytes mismatch");
    }

    for tsn in 3..5 {
        assert!(!pq.sorted.is_empty(), "should not be empty");
        match pq.pop(tsn) {
            Some(chunk) => assert_eq!(tsn, chunk.tsn, "TSN should match"),
            None => panic!("pop should succeed"),
        }
    }

    assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch");
    assert_eq!(0, pq.len(), "item count mismatch");

    Ok(())
}
70 
/// Checks `get_gap_ack_blocks(0)`: TSNs 1..=6 are reported as one block, and
/// after adding TSNs 8 and 9 (leaving 7 missing) a second block 8..9 appears.
#[test]
fn test_payload_queue_get_gap_ack_block() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    for tsn in 1..=6 {
        pq.push(make_payload(tsn, 0), 0);
    }

    let expected = [GapAckBlock { start: 1, end: 6 }];
    let actual = pq.get_gap_ack_blocks(0);
    assert!(!actual.is_empty());
    assert_eq!(actual.len(), 1);
    for (want, got) in expected.iter().zip(actual.iter()) {
        assert_eq!(want.start, got.start);
        assert_eq!(want.end, got.end);
    }

    // Skip TSN 7 so that a second, separate block is produced.
    for tsn in 8..=9 {
        pq.push(make_payload(tsn, 0), 0);
    }

    let expected = [
        GapAckBlock { start: 1, end: 6 },
        GapAckBlock { start: 8, end: 9 },
    ];
    let actual = pq.get_gap_ack_blocks(0);
    assert!(!actual.is_empty());
    assert_eq!(actual.len(), 2);
    for (want, got) in expected.iter().zip(actual.iter()) {
        assert_eq!(want.start, got.start);
        assert_eq!(want.end, got.end);
    }

    Ok(())
}
108 
/// Verifies `get_last_tsn_received`: it yields `None` on an empty queue and
/// otherwise the highest TSN pushed so far, regardless of push order.
#[test]
fn test_payload_queue_get_last_tsn_received() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    // empty queue should return none
    let tsn = pq.get_last_tsn_received();
    assert!(tsn.is_none(), "should be none");

    let ok = pq.push(make_payload(20, 0), 0);
    assert!(ok, "should be true");
    let tsn = pq.get_last_tsn_received();
    assert!(tsn.is_some(), "should be some");
    assert_eq!(Some(&20), tsn, "should match");

    // append should work
    let ok = pq.push(make_payload(21, 0), 0);
    assert!(ok, "should be true");
    let tsn = pq.get_last_tsn_received();
    assert!(tsn.is_some(), "should be some");
    assert_eq!(Some(&21), tsn, "should match");

    // check if sorting applied: pushing a lower TSN must not change the last TSN
    let ok = pq.push(make_payload(19, 0), 0);
    assert!(ok, "should be true");
    let tsn = pq.get_last_tsn_received();
    assert!(tsn.is_some(), "should be some");
    assert_eq!(Some(&21), tsn, "should match");

    Ok(())
}
139 
/// After acking TSN 2 and then calling `mark_all_to_retrasmit`, TSNs 1 and 3
/// must carry the retransmit flag while the acked TSN 2 must not.
#[test]
fn test_payload_queue_mark_all_to_retrasmit() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    for tsn in 1..=3 {
        pq.push(make_payload(tsn, 10), 0);
    }
    pq.mark_as_acked(2);
    pq.mark_all_to_retrasmit();

    // (tsn, whether the retransmit flag is expected to be set)
    for (tsn, want_retransmit) in [(1, true), (2, false), (3, true)] {
        let chunk = pq.get(tsn);
        assert!(chunk.is_some(), "should be true");
        let flagged = chunk.unwrap().retransmit;
        if want_retransmit {
            assert!(flagged, "should be marked as retransmit");
        } else {
            assert!(!flagged, "should NOT be marked as retransmit");
        }
    }

    Ok(())
}
162 
/// Marks every chunk for retransmission and then acks TSNs 2 and 4; the ack
/// must clear the retransmit flag on those two while 1 and 3 keep it.
#[test]
fn test_payload_queue_reset_retransmit_flag_on_ack() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    for tsn in 1..=4 {
        pq.push(make_payload(tsn, 10), 0);
    }

    pq.mark_all_to_retrasmit();
    // Acking is expected to cancel the pending retransmission of each TSN.
    for acked in [2, 4] {
        pq.mark_as_acked(acked);
    }

    // (tsn, whether the retransmit flag is expected to be set)
    for (tsn, want_retransmit) in [(1, true), (2, false), (3, true), (4, false)] {
        let chunk = pq.get(tsn);
        assert!(chunk.is_some(), "should be true");
        let flagged = chunk.unwrap().retransmit;
        if want_retransmit {
            assert!(flagged, "should be marked as retransmit");
        } else {
            assert!(!flagged, "should NOT be marked as retransmit");
        }
    }

    Ok(())
}
190 
191 ///////////////////////////////////////////////////////////////////
192 //pending_queue_test
193 ///////////////////////////////////////////////////////////////////
194 use super::pending_queue::*;
195 
196 const NO_FRAGMENT: usize = 0;
197 const FRAG_BEGIN: usize = 1;
198 const FRAG_MIDDLE: usize = 2;
199 const FRAG_END: usize = 3;
200 
201 fn make_data_chunk(tsn: u32, unordered: bool, frag: usize) -> ChunkPayloadData {
202     let mut b = false;
203     let mut e = false;
204 
205     match frag {
206         NO_FRAGMENT => {
207             b = true;
208             e = true;
209         }
210         FRAG_BEGIN => {
211             b = true;
212         }
213         FRAG_END => e = true,
214         _ => {}
215     };
216 
217     ChunkPayloadData {
218         tsn,
219         unordered,
220         beginning_fragment: b,
221         ending_fragment: e,
222         user_data: {
223             let mut b = BytesMut::new();
224             b.resize(10, 0); // always 10 bytes
225             b.freeze()
226         },
227         ..Default::default()
228     }
229 }
230 
/// Checks that `PendingBaseQueue` exposes chunks by index in push order and
/// hands them back front-first, including after the queue has been drained.
#[test]
fn test_pending_base_queue_push_and_pop() -> Result<()> {
    let mut pq = PendingBaseQueue::new();
    for tsn in 0..3 {
        pq.push_back(make_data_chunk(tsn, false, NO_FRAGMENT));
    }

    // Indexed access must not remove anything.
    for idx in 0..3 {
        match pq.get(idx) {
            Some(chunk) => assert_eq!(idx as u32, chunk.tsn, "TSN should match"),
            None => panic!("should not be none"),
        }
    }

    for tsn in 0..3 {
        match pq.pop_front() {
            Some(chunk) => assert_eq!(tsn, chunk.tsn, "TSN should match"),
            None => panic!("should not be none"),
        }
    }

    for tsn in 3..5 {
        pq.push_back(make_data_chunk(tsn, false, NO_FRAGMENT));
    }

    for tsn in 3..5 {
        match pq.pop_front() {
            Some(chunk) => assert_eq!(tsn, chunk.tsn, "TSN should match"),
            None => panic!("should not be none"),
        }
    }
    Ok(())
}
260 
/// Out-of-range access on `PendingBaseQueue` must yield `None`: popping or
/// indexing an empty queue, and indexing one past the only element.
#[test]
fn test_pending_base_queue_out_of_bounce() -> Result<()> {
    let mut pq = PendingBaseQueue::new();

    let popped = pq.pop_front();
    assert!(popped.is_none(), "should be none");
    let first = pq.get(0);
    assert!(first.is_none(), "should be none");

    pq.push_back(make_data_chunk(0, false, NO_FRAGMENT));
    let past_end = pq.get(1);
    assert!(past_end.is_none(), "should be none");

    Ok(())
}
272 
273 // NOTE: TSN is not used in pendingQueue in the actual usage.
274 //       Following tests use TSN field as a chunk ID.
/// Pushes ordered, unfragmented chunks into `PendingQueue` and checks the
/// byte counter after every push; then peeks/pops them in push order and
/// checks the counter returns to zero. The cycle is run twice.
#[test]
fn test_pending_queue_push_and_pop() -> Result<()> {
    let pq = PendingQueue::new();

    // Peeks the head, checks its TSN, then pops it using the head's own flags.
    let pop_expecting = |tsn: u32| {
        let head = pq.peek();
        assert!(head.is_some(), "peek error");
        let head = head.unwrap();
        assert_eq!(tsn, head.tsn, "TSN should match");

        let popped = pq.pop(head.beginning_fragment, head.unordered);
        assert!(popped.is_some(), "should not error: {}", tsn);
    };

    // (tsn, expected byte total after the push)
    for (tsn, want_bytes) in [(0, 10), (1, 20), (2, 30)] {
        pq.push(make_data_chunk(tsn, false, NO_FRAGMENT));
        assert_eq!(want_bytes, pq.get_num_bytes(), "total bytes mismatch");
    }

    for tsn in 0..3 {
        pop_expecting(tsn);
    }

    assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch");

    for (tsn, want_bytes) in [(3, 10), (4, 20)] {
        pq.push(make_data_chunk(tsn, false, NO_FRAGMENT));
        assert_eq!(want_bytes, pq.get_num_bytes(), "total bytes mismatch");
    }

    for tsn in 3..5 {
        pop_expecting(tsn);
    }

    assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch");

    Ok(())
}
318 
/// Interleaves ordered (TSN 0, 2) and unordered (TSN 1, 3) chunks and checks
/// that peek/pop serves the unordered ones first: 1, 3, then 0, 2.
#[test]
fn test_pending_queue_unordered_wins() -> Result<()> {
    let pq = PendingQueue::new();

    // (tsn, unordered flag, expected byte total after the push)
    for (tsn, unordered, want_bytes) in [(0, false, 10), (1, true, 20), (2, false, 30), (3, true, 40)] {
        pq.push(make_data_chunk(tsn, unordered, NO_FRAGMENT));
        assert_eq!(want_bytes, pq.get_num_bytes(), "total bytes mismatch");
    }

    // Expected service order: unordered chunks before ordered ones.
    for want_tsn in [1, 3, 0, 2] {
        let head = pq.peek();
        assert!(head.is_some(), "peek error");
        let head = head.unwrap();
        assert_eq!(want_tsn, head.tsn, "TSN should match");

        let popped = pq.pop(head.beginning_fragment, head.unordered);
        assert!(popped.is_some(), "should not error");
    }

    assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch");

    Ok(())
}
368 
/// Queues a three-fragment ordered message (TSN 0..=2) followed by a
/// three-fragment unordered message (TSN 3..=5) and checks that the unordered
/// fragments are served first, each message keeping its fragment order.
#[test]
fn test_pending_queue_fragments() -> Result<()> {
    let pq = PendingQueue::new();

    // (tsn, unordered flag, fragment kind)
    let fragments = [
        (0, false, FRAG_BEGIN),
        (1, false, FRAG_MIDDLE),
        (2, false, FRAG_END),
        (3, true, FRAG_BEGIN),
        (4, true, FRAG_MIDDLE),
        (5, true, FRAG_END),
    ];
    for (tsn, unordered, frag) in fragments {
        pq.push(make_data_chunk(tsn, unordered, frag));
    }

    for want_tsn in [3, 4, 5, 0, 1, 2] {
        let head = pq.peek();
        assert!(head.is_some(), "peek error");
        let head = head.unwrap();
        assert_eq!(want_tsn, head.tsn, "TSN should match");

        let popped = pq.pop(head.beginning_fragment, head.unordered);
        assert!(popped.is_some(), "should not error: {}", want_tsn);
    }

    Ok(())
}
393 
394 // Once decided ordered or unordered, the decision should persist until
395 // it pops a chunk with ending_fragment flags set to true.
/// Pops the first fragment of an ordered message, then queues an unordered
/// chunk (TSN 1) ahead of the remaining ordered fragments (TSN 2, 3). The
/// ordered fragments must still come out first: 2, 3, then 1.
#[test]
fn test_pending_queue_selection_persistence() -> Result<()> {
    let pq = PendingQueue::new();

    // Peeks the head, checks its TSN, then pops it using the head's own flags.
    let pop_expecting = |tsn: u32| {
        let head = pq.peek();
        assert!(head.is_some(), "peek error");
        let head = head.unwrap();
        assert_eq!(tsn, head.tsn, "TSN should match");

        let popped = pq.pop(head.beginning_fragment, head.unordered);
        assert!(popped.is_some(), "should not error: {}", tsn);
    };

    pq.push(make_data_chunk(0, false, FRAG_BEGIN));
    pop_expecting(0);

    pq.push(make_data_chunk(1, true, NO_FRAGMENT));
    pq.push(make_data_chunk(2, false, FRAG_MIDDLE));
    pq.push(make_data_chunk(3, false, FRAG_END));

    for want_tsn in [2, 3, 1] {
        pop_expecting(want_tsn);
    }

    Ok(())
}
427 
428 #[tokio::test]
429 async fn test_pending_queue_append() -> Result<()> {
430     let pq = PendingQueue::new();
431     pq.append(vec![
432         make_data_chunk(0, false, NO_FRAGMENT),
433         make_data_chunk(1, false, NO_FRAGMENT),
434         make_data_chunk(3, false, NO_FRAGMENT),
435     ]);
436     assert_eq!(30, pq.get_num_bytes(), "total bytes mismatch");
437     assert_eq!(3, pq.len(), "len mismatch");
438 
439     Ok(())
440 }
441 
442 ///////////////////////////////////////////////////////////////////
443 //reassembly_queue_test
444 ///////////////////////////////////////////////////////////////////
445 use super::reassembly_queue::*;
446 use std::sync::atomic::AtomicUsize;
447 use std::sync::Arc;
448 
/// Pushes a two-fragment ordered message ("ABC" + "DEFG", SSN 0) and checks
/// that it only becomes complete with the ending fragment, and that `read`
/// returns the joined payload with the original PPI and drains the counter.
#[test]
fn test_reassembly_queue_ordered_fragments() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(0);

    // Beginning fragment: the set is still open.
    let head = ChunkPayloadData {
        tsn: 1,
        stream_sequence_number: 0,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    };
    assert!(!rq.push(head), "chunk set should not be complete yet");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    // Ending fragment with the next TSN closes the set.
    let tail = ChunkPayloadData {
        tsn: 2,
        stream_sequence_number: 0,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEFG"),
        ..Default::default()
    };
    assert!(rq.push(tail), "chunk set should be complete");
    assert_eq!(7, rq.get_num_bytes(), "num bytes mismatch");

    let mut scratch = vec![0u8; 16];
    let (n_read, ppi) = rq.read(&mut scratch)?;
    assert_eq!(7, n_read, "should received 7 bytes");
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");
    assert_eq!(ppi, org_ppi, "should have valid ppi");
    assert_eq!(&scratch[..n_read], b"ABCDEFG", "data should match");

    Ok(())
}
491 
/// Pushes a three-fragment unordered message ("ABC", "DEFG", "H" on TSNs
/// 1..=3) and checks that it completes only with the ending fragment and
/// that `read` returns the joined 8 bytes with the original PPI.
#[test]
fn test_reassembly_queue_unordered_fragments() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(0);

    let first = ChunkPayloadData {
        tsn: 1,
        stream_sequence_number: 0,
        unordered: true,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    };
    assert!(!rq.push(first), "chunk set should not be complete yet");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    // Middle fragment: neither B nor E is set.
    let middle = ChunkPayloadData {
        tsn: 2,
        stream_sequence_number: 0,
        unordered: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEFG"),
        ..Default::default()
    };
    assert!(!rq.push(middle), "chunk set should not be complete yet");
    assert_eq!(7, rq.get_num_bytes(), "num bytes mismatch");

    let last = ChunkPayloadData {
        tsn: 3,
        stream_sequence_number: 0,
        unordered: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"H"),
        ..Default::default()
    };
    assert!(rq.push(last), "chunk set should be complete");
    assert_eq!(8, rq.get_num_bytes(), "num bytes mismatch");

    let mut scratch = vec![0u8; 16];
    let (n_read, ppi) = rq.read(&mut scratch)?;
    assert_eq!(8, n_read, "should received 8 bytes");
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");
    assert_eq!(ppi, org_ppi, "should have valid ppi");
    assert_eq!(&scratch[..n_read], b"ABCDEFGH", "data should match");

    Ok(())
}
549 
/// Queues one complete ordered message ("ABC") and one complete unordered
/// message ("DEF"), then checks that `read` serves the unordered one first.
#[test]
fn test_reassembly_queue_ordered_and_unordered_fragments() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(0);

    let ordered = ChunkPayloadData {
        tsn: 1,
        stream_sequence_number: 0,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    };
    assert!(rq.push(ordered), "chunk set should be complete");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    let unordered = ChunkPayloadData {
        tsn: 2,
        stream_sequence_number: 1,
        unordered: true,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEF"),
        ..Default::default()
    };
    assert!(rq.push(unordered), "chunk set should be complete");
    assert_eq!(6, rq.get_num_bytes(), "num bytes mismatch");

    // Both messages are now complete and waiting to be read.
    let mut scratch = vec![0u8; 16];

    // The unordered message comes out first ...
    let (n_read, ppi) = rq.read(&mut scratch)?;
    assert_eq!(3, n_read, "should received 3 bytes");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");
    assert_eq!(ppi, org_ppi, "should have valid ppi");
    assert_eq!(&scratch[..n_read], b"DEF", "data should match");

    // ... followed by the ordered one.
    let (n_read, ppi) = rq.read(&mut scratch)?;
    assert_eq!(3, n_read, "should received 3 bytes");
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");
    assert_eq!(ppi, org_ppi, "should have valid ppi");
    assert_eq!(&scratch[..n_read], b"ABC", "data should match");

    Ok(())
}
605 
/// Queues two unordered fragments that do not form a complete message
/// (a beginning at TSN 10 and an ending at TSN 12) followed by a complete
/// single-chunk message ("GOOD"), and checks that `read` returns "GOOD"
/// while the 10 bytes of the incomplete fragments stay queued.
#[test]
fn test_reassembly_queue_unordered_complete_skips_incomplete() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(0);

    let dangling_begin = ChunkPayloadData {
        tsn: 10,
        stream_sequence_number: 0,
        unordered: true,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };
    assert!(!rq.push(dangling_begin), "chunk set should not be complete yet");
    assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch");

    // TSN 12 is not contiguous with TSN 10, so nothing completes here.
    let dangling_end = ChunkPayloadData {
        tsn: 12,
        stream_sequence_number: 1,
        unordered: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"COMPLETE"),
        ..Default::default()
    };
    assert!(!rq.push(dangling_end), "chunk set should not be complete yet");
    assert_eq!(10, rq.get_num_bytes(), "num bytes mismatch");

    let whole = ChunkPayloadData {
        tsn: 13,
        stream_sequence_number: 1,
        unordered: true,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"GOOD"),
        ..Default::default()
    };
    assert!(rq.push(whole), "chunk set should be complete");
    assert_eq!(14, rq.get_num_bytes(), "num bytes mismatch");

    let mut scratch = vec![0u8; 16];

    // Only the message carrying "GOOD" is readable.
    let (n_read, ppi) = rq.read(&mut scratch)?;
    assert_eq!(4, n_read, "should receive 4 bytes");
    assert_eq!(10, rq.get_num_bytes(), "num bytes mismatch");
    assert_eq!(ppi, org_ppi, "should have valid ppi");
    assert_eq!(&scratch[..n_read], b"GOOD", "data should match");

    Ok(())
}
670 
/// A queue created with 123 must reject a chunk whose `stream_identifier`
/// is 124: `push` reports not-complete and no bytes are accounted for.
#[test]
fn test_reassembly_queue_ignores_chunk_with_wrong_si() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(123);

    let foreign = ChunkPayloadData {
        stream_identifier: 124,
        tsn: 10,
        stream_sequence_number: 0,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };

    assert!(!rq.push(foreign), "chunk should be ignored");
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");
    Ok(())
}
693 
/// With the expected SSN forced to 7, an ordered chunk carrying SSN 6 is
/// stale: `push` must report not-complete and no bytes may be accounted for.
#[test]
fn test_reassembly_queue_ignores_chunk_with_stale_ssn() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    rq.next_ssn = 7; // forcibly set expected SSN to 7

    let org_ppi = PayloadProtocolIdentifier::Binary;

    let chunk = ChunkPayloadData {
        payload_type: org_ppi,
        beginning_fragment: true,
        ending_fragment: true,
        tsn: 10,
        stream_sequence_number: 6, // <-- stale
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };

    let complete = rq.push(chunk);
    // The previous message ("should not be ignored") contradicted the check.
    assert!(!complete, "chunk should be ignored");
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
717 
/// With only a beginning fragment queued, `read` must return an error and
/// leave the 2 queued bytes in place.
#[test]
fn test_reassembly_queue_should_fail_to_read_incomplete_chunk() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(0);

    let lone_begin = ChunkPayloadData {
        tsn: 123,
        stream_sequence_number: 0,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };
    assert!(!rq.push(lone_begin), "the set should not be complete");
    assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch");

    let mut scratch = vec![0u8; 16];
    let outcome = rq.read(&mut scratch);
    assert!(outcome.is_err(), "read() should not succeed");
    assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
744 
/// Queues a complete ordered message with SSN 1 on a fresh queue and checks
/// that `read` fails and keeps the 2 bytes queued (presumably because the
/// queue still expects SSN 0 first).
#[test]
fn test_reassembly_queue_should_fail_to_read_if_the_nex_ssn_is_not_ready() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(0);

    let early = ChunkPayloadData {
        tsn: 123,
        stream_sequence_number: 1,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };
    assert!(rq.push(early), "the set should be complete");
    assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch");

    let mut scratch = vec![0u8; 16];
    let outcome = rq.read(&mut scratch);
    assert!(outcome.is_err(), "read() should not succeed");
    assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
772 
/// Reading a complete 10-byte message into an 8-byte buffer must fail with
/// `Error::ErrShortBuffer`; the test also expects the byte counter to drop
/// to zero after the failed read.
#[test]
fn test_reassembly_queue_detect_buffer_too_short() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let mut rq = ReassemblyQueue::new(0);

    let ten_bytes = ChunkPayloadData {
        tsn: 123,
        stream_sequence_number: 0,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"0123456789"),
        ..Default::default()
    };
    assert!(rq.push(ten_bytes), "the set should be complete");
    assert_eq!(10, rq.get_num_bytes(), "num bytes mismatch");

    // Deliberately smaller than the queued message.
    let mut scratch = vec![0u8; 8];
    match rq.read(&mut scratch) {
        Err(err) => assert_eq!(Error::ErrShortBuffer, err, "read() should not succeed"),
        Ok(_) => panic!("read() should not succeed"),
    }
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
803 
/// Queues a complete ordered message (SSN 5) and two fragments of an
/// incomplete one (SSN 6), then calls `forward_tsn_for_ordered(6)` and
/// checks that one entry with 3 bytes remains in `rq.ordered`.
#[test]
fn test_reassembly_queue_forward_tsn_for_ordered_framents() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let ssn_complete = 5u16;
    let ssn_dropped = 6u16;

    let mut rq = ReassemblyQueue::new(0);

    let whole = ChunkPayloadData {
        tsn: 10,
        stream_sequence_number: ssn_complete,
        beginning_fragment: true,
        ending_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"123"),
        ..Default::default()
    };
    assert!(rq.push(whole), "chunk set should be complete");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    let partial_begin = ChunkPayloadData {
        tsn: 11,
        stream_sequence_number: ssn_dropped,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    };
    assert!(!rq.push(partial_begin), "chunk set should not be complete yet");
    assert_eq!(6, rq.get_num_bytes(), "num bytes mismatch");

    // Middle fragment of the same SSN; the ending fragment never arrives.
    let partial_middle = ChunkPayloadData {
        tsn: 12,
        stream_sequence_number: ssn_dropped,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEF"),
        ..Default::default()
    };
    assert!(!rq.push(partial_middle), "chunk set should not be complete yet");
    assert_eq!(9, rq.get_num_bytes(), "num bytes mismatch");

    rq.forward_tsn_for_ordered(ssn_dropped);

    assert_eq!(1, rq.ordered.len(), "there should be one chunk left");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
859 
/// Queues three unordered fragments (TSN 11 and 12 of one message, TSN 14
/// beginning another), then calls `forward_tsn_for_unordered(13)` and checks
/// that only one chunk with 3 bytes remains in `rq.unordered_chunks`.
#[test]
fn test_reassembly_queue_forward_tsn_for_unordered_framents() -> Result<()> {
    let org_ppi = PayloadProtocolIdentifier::Binary;
    let ssn_dropped = 6u16;
    let ssn_kept = 7u16;

    let mut rq = ReassemblyQueue::new(0);

    let dropped_begin = ChunkPayloadData {
        tsn: 11,
        stream_sequence_number: ssn_dropped,
        unordered: true,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    };
    assert!(!rq.push(dropped_begin), "chunk set should not be complete yet");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    let dropped_middle = ChunkPayloadData {
        tsn: 12,
        stream_sequence_number: ssn_dropped,
        unordered: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEF"),
        ..Default::default()
    };
    assert!(!rq.push(dropped_middle), "chunk set should not be complete yet");
    assert_eq!(6, rq.get_num_bytes(), "num bytes mismatch");

    let kept_begin = ChunkPayloadData {
        tsn: 14,
        stream_sequence_number: ssn_kept,
        unordered: true,
        beginning_fragment: true,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"SOS"),
        ..Default::default()
    };
    assert!(!rq.push(kept_begin), "chunk set should not be complete yet");
    assert_eq!(9, rq.get_num_bytes(), "num bytes mismatch");

    // Three unordered chunks are queued now; forwarding to TSN 13 is expected
    // to discard those with TSN 13 or older.
    rq.forward_tsn_for_unordered(13);

    // Only the chunk with TSN 14 should survive.
    let remaining = rq.unordered_chunks.len();
    assert_eq!(1, remaining, "there should be one chunk kept");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
924 
/// A freshly created `ChunkSet` holds no chunks and must not be complete.
#[test]
fn test_chunk_set_empty_chunk_set() -> Result<()> {
    let empty = ChunkSet::new(0, PayloadProtocolIdentifier::default());
    let complete = empty.is_complete();
    assert!(!complete, "empty chunkSet cannot be complete");
    Ok(())
}
931 
/// Pushing a second chunk that reuses TSN 100 must not complete the set and
/// must leave only the first chunk stored.
#[test]
fn test_chunk_set_push_dup_chunks_to_chunk_set() -> Result<()> {
    let mut cset = ChunkSet::new(0, PayloadProtocolIdentifier::default());

    let original = ChunkPayloadData {
        tsn: 100,
        beginning_fragment: true,
        ..Default::default()
    };
    cset.push(original);

    // Same TSN again, this time flagged as the ending fragment.
    let duplicate = ChunkPayloadData {
        tsn: 100,
        ending_fragment: true,
        ..Default::default()
    };
    let complete = cset.push(duplicate);

    assert!(!complete, "chunk with dup TSN is not complete");
    assert_eq!(1, cset.chunks.len(), "chunk with dup TSN should be ignored");
    Ok(())
}
949 
/// A chunk set whose first chunk lacks the beginning flag (B=0) must not be
/// reported as complete, even when that chunk carries the ending flag.
#[test]
fn test_chunk_set_incomplete_chunk_set_no_beginning() -> Result<()> {
    let cset = ChunkSet {
        ssn: 0,
        ppi: PayloadProtocolIdentifier::default(),
        // The set must be non-empty: with `vec![]` this test would only repeat
        // the empty-set case and never exercise the missing-B=1 condition.
        chunks: vec![ChunkPayloadData {
            tsn: 100,
            beginning_fragment: false,
            ending_fragment: true,
            ..Default::default()
        }],
    };
    assert!(
        !cset.is_complete(),
        "chunkSet not starting with B=1 cannot be complete"
    );
    Ok(())
}
963 
/// A chunk set with B and E fragments present but a hole in the TSN sequence
/// (100, 101, 103) must not be reported as complete.
#[test]
fn test_chunk_set_incomplete_chunk_set_no_contiguous_tsn() -> Result<()> {
    let begin = ChunkPayloadData {
        tsn: 100,
        beginning_fragment: true,
        ..Default::default()
    };
    let middle = ChunkPayloadData {
        tsn: 101,
        ..Default::default()
    };
    // TSN 102 is missing on purpose.
    let end = ChunkPayloadData {
        tsn: 103,
        ending_fragment: true,
        ..Default::default()
    };

    let cset = ChunkSet {
        ssn: 0,
        ppi: PayloadProtocolIdentifier::default(),
        chunks: vec![begin, middle, end],
    };

    let complete = cset.is_complete();
    assert!(
        !complete,
        "chunkSet not starting with incontiguous tsn cannot be complete"
    );
    Ok(())
}
992