xref: /webrtc/sctp/src/queue/queue_test.rs (revision 065cac58)
1 use crate::error::{Error, Result};
2 
3 use bytes::{Bytes, BytesMut};
4 
5 ///////////////////////////////////////////////////////////////////
6 //payload_queue_test
7 ///////////////////////////////////////////////////////////////////
8 use super::payload_queue::*;
9 use crate::chunk::chunk_payload_data::{ChunkPayloadData, PayloadProtocolIdentifier};
10 use crate::chunk::chunk_selective_ack::GapAckBlock;
11 
12 fn make_payload(tsn: u32, n_bytes: usize) -> ChunkPayloadData {
13     ChunkPayloadData {
14         tsn,
15         user_data: {
16             let mut b = BytesMut::new();
17             b.resize(n_bytes, 0);
18             b.freeze()
19         },
20         ..Default::default()
21     }
22 }
23 
// Pushes chunks with `push_no_check`, pops them back by TSN, and checks the
// byte total and item count reported by the queue along the way.
#[test]
fn test_payload_queue_push_no_check() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    // (tsn, payload size, expected running byte total, expected item count)
    let first_batch = [(0, 10, 10, 1), (1, 11, 21, 2), (2, 12, 33, 3)];
    for &(tsn, size, total, count) in &first_batch {
        pq.push_no_check(make_payload(tsn, size));
        assert_eq!(total, pq.get_num_bytes(), "total bytes mismatch");
        assert_eq!(count, pq.len(), "item count mismatch");
    }

    for tsn in 0..3 {
        assert!(!pq.sorted.is_empty(), "should not be empty");
        let popped = pq.pop(tsn).expect("pop should succeed");
        assert_eq!(tsn, popped.tsn, "TSN should match");
    }

    assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch");
    assert_eq!(0, pq.len(), "item count mismatch");

    // The drained queue must accept new chunks again.
    assert!(pq.sorted.is_empty(), "should be empty");

    // (tsn, payload size, expected running byte total)
    let second_batch = [(3, 13, 13), (4, 14, 27)];
    for &(tsn, size, total) in &second_batch {
        pq.push_no_check(make_payload(tsn, size));
        assert_eq!(total, pq.get_num_bytes(), "total bytes mismatch");
    }

    for tsn in 3..5 {
        assert!(!pq.sorted.is_empty(), "should not be empty");
        let popped = pq.pop(tsn).expect("pop should succeed");
        assert_eq!(tsn, popped.tsn, "TSN should match");
    }

    assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch");
    assert_eq!(0, pq.len(), "item count mismatch");

    Ok(())
}
70 
// Checks the gap-ack blocks reported (relative to cumulative TSN 0) for one
// contiguous run of TSNs, then for two runs separated by a missing TSN.
#[test]
fn test_payload_queue_get_gap_ack_block() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    // TSNs 1 through 6 form a single contiguous run.
    for tsn in 1..=6 {
        pq.push(make_payload(tsn, 0), 0);
    }

    let expected = vec![GapAckBlock { start: 1, end: 6 }];
    let actual = pq.get_gap_ack_blocks(0);
    assert!(!actual.is_empty());
    assert_eq!(actual.len(), 1);

    for (want, got) in expected.iter().zip(actual.iter()) {
        assert_eq!(want.start, got.start);
        assert_eq!(want.end, got.end);
    }

    // TSN 7 is skipped, so 8 and 9 start a second run.
    for tsn in 8..=9 {
        pq.push(make_payload(tsn, 0), 0);
    }

    let expected = vec![
        GapAckBlock { start: 1, end: 6 },
        GapAckBlock { start: 8, end: 9 },
    ];
    let actual = pq.get_gap_ack_blocks(0);
    assert!(!actual.is_empty());
    assert_eq!(actual.len(), 2);

    for (want, got) in expected.iter().zip(actual.iter()) {
        assert_eq!(want.start, got.start);
        assert_eq!(want.end, got.end);
    }

    Ok(())
}
108 
// Verifies `get_last_tsn_received`: `None` for an empty queue, and the
// highest pushed TSN afterwards, regardless of the order of the pushes.
#[test]
fn test_payload_queue_get_last_tsn_received() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    // An empty queue has no last TSN.
    let last = pq.get_last_tsn_received();
    assert!(last.is_none(), "should be none");

    let ok = pq.push(make_payload(20, 0), 0);
    assert!(ok, "should be true");
    let tsn = pq.get_last_tsn_received();
    assert!(tsn.is_some(), "should be some");
    assert_eq!(Some(&20), tsn, "should match");

    // Appending a higher TSN moves the last TSN forward.
    let ok = pq.push(make_payload(21, 0), 0);
    assert!(ok, "should be true");
    let tsn = pq.get_last_tsn_received();
    assert!(tsn.is_some(), "should be some");
    assert_eq!(Some(&21), tsn, "should match");

    // Pushing a lower TSN must not change the last TSN (sorting is applied).
    let ok = pq.push(make_payload(19, 0), 0);
    assert!(ok, "should be true");
    let tsn = pq.get_last_tsn_received();
    assert!(tsn.is_some(), "should be some");
    assert_eq!(Some(&21), tsn, "should match");

    Ok(())
}
139 
// A chunk acked before `mark_all_to_retrasmit` must not be flagged for
// retransmission, while the un-acked chunks must be.
#[test]
fn test_payload_queue_mark_all_to_retrasmit() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    for tsn in 1..=3 {
        pq.push(make_payload(tsn, 10), 0);
    }
    pq.mark_as_acked(2);
    pq.mark_all_to_retrasmit();

    // (tsn, whether the retransmit flag is expected to be set)
    for &(tsn, want_retransmit) in &[(1, true), (2, false), (3, true)] {
        let chunk = pq.get(tsn).expect("should be true");
        if want_retransmit {
            assert!(chunk.retransmit, "should be marked as retransmit");
        } else {
            assert!(!chunk.retransmit, "should NOT be marked as retransmit");
        }
    }

    Ok(())
}
162 
// Acking a chunk after everything was marked for retransmission must clear
// the retransmit flag on that chunk only.
#[test]
fn test_payload_queue_reset_retransmit_flag_on_ack() -> Result<()> {
    let mut pq = PayloadQueue::new(Arc::new(AtomicUsize::new(0)));

    for tsn in 1..=4 {
        pq.push(make_payload(tsn, 10), 0);
    }

    pq.mark_all_to_retrasmit();
    // Each ack should cancel the pending retransmission of that TSN.
    pq.mark_as_acked(2);
    pq.mark_as_acked(4);

    // (tsn, whether the retransmit flag is expected to be set)
    let expectations = [(1, true), (2, false), (3, true), (4, false)];
    for &(tsn, want_retransmit) in &expectations {
        let chunk = pq.get(tsn).expect("should be true");
        if want_retransmit {
            assert!(chunk.retransmit, "should be marked as retransmit");
        } else {
            assert!(!chunk.retransmit, "should NOT be marked as retransmit");
        }
    }

    Ok(())
}
190 
191 ///////////////////////////////////////////////////////////////////
192 //pending_queue_test
193 ///////////////////////////////////////////////////////////////////
194 use super::pending_queue::*;
195 
196 const NO_FRAGMENT: usize = 0;
197 const FRAG_BEGIN: usize = 1;
198 const FRAG_MIDDLE: usize = 2;
199 const FRAG_END: usize = 3;
200 
201 fn make_data_chunk(tsn: u32, unordered: bool, frag: usize) -> ChunkPayloadData {
202     let mut b = false;
203     let mut e = false;
204 
205     match frag {
206         NO_FRAGMENT => {
207             b = true;
208             e = true;
209         }
210         FRAG_BEGIN => {
211             b = true;
212         }
213         FRAG_END => e = true,
214         _ => {}
215     };
216 
217     ChunkPayloadData {
218         tsn,
219         unordered,
220         beginning_fragment: b,
221         ending_fragment: e,
222         user_data: {
223             let mut b = BytesMut::new();
224             b.resize(10, 0); // always 10 bytes
225             b.freeze()
226         },
227         ..Default::default()
228     }
229 }
230 
// Chunks pushed to the back of a `PendingBaseQueue` are visible by index and
// come out of the front in insertion order.
#[test]
fn test_pending_base_queue_push_and_pop() -> Result<()> {
    let mut pq = PendingBaseQueue::new();
    for tsn in 0..3 {
        pq.push_back(make_data_chunk(tsn, false, NO_FRAGMENT));
    }

    // Indexed access does not remove anything.
    for idx in 0..3 {
        let chunk = pq.get(idx).expect("should not be none");
        assert_eq!(idx as u32, chunk.tsn, "TSN should match");
    }

    for tsn in 0..3 {
        let chunk = pq.pop_front().expect("should not be none");
        assert_eq!(tsn, chunk.tsn, "TSN should match");
    }

    // The emptied queue keeps working for further pushes.
    for tsn in 3..5 {
        pq.push_back(make_data_chunk(tsn, false, NO_FRAGMENT));
    }

    for tsn in 3..5 {
        let chunk = pq.pop_front().expect("should not be none");
        assert_eq!(tsn, chunk.tsn, "TSN should match");
    }

    Ok(())
}
260 
// Popping from an empty queue and indexing past the end both yield `None`.
#[test]
fn test_pending_base_queue_out_of_bounce() -> Result<()> {
    let mut pq = PendingBaseQueue::new();

    // Nothing queued yet.
    let popped = pq.pop_front();
    assert!(popped.is_none(), "should be none");
    let missing = pq.get(0).is_none();
    assert!(missing, "should be none");

    // One element queued: index 1 is past the end.
    pq.push_back(make_data_chunk(0, false, NO_FRAGMENT));
    let missing = pq.get(1).is_none();
    assert!(missing, "should be none");

    Ok(())
}
272 
273 // NOTE: TSN is not used in pendingQueue in the actual usage.
274 //       Following tests use TSN field as a chunk ID.
275 #[tokio::test]
276 async fn test_pending_queue_push_and_pop() -> Result<()> {
277     let pq = PendingQueue::new();
278     pq.push(make_data_chunk(0, false, NO_FRAGMENT)).await;
279     assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch");
280     pq.push(make_data_chunk(1, false, NO_FRAGMENT)).await;
281     assert_eq!(20, pq.get_num_bytes(), "total bytes mismatch");
282     pq.push(make_data_chunk(2, false, NO_FRAGMENT)).await;
283     assert_eq!(30, pq.get_num_bytes(), "total bytes mismatch");
284 
285     for i in 0..3 {
286         let c = pq.peek().await;
287         assert!(c.is_some(), "peek error");
288         let c = c.unwrap();
289         assert_eq!(i, c.tsn, "TSN should match");
290         let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
291 
292         let result = pq.pop(beginning_fragment, unordered).await;
293         assert!(result.is_some(), "should not error: {}", i);
294     }
295 
296     assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch");
297 
298     pq.push(make_data_chunk(3, false, NO_FRAGMENT)).await;
299     assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch");
300     pq.push(make_data_chunk(4, false, NO_FRAGMENT)).await;
301     assert_eq!(20, pq.get_num_bytes(), "total bytes mismatch");
302 
303     for i in 3..5 {
304         let c = pq.peek().await;
305         assert!(c.is_some(), "peek error");
306         let c = c.unwrap();
307         assert_eq!(i, c.tsn, "TSN should match");
308         let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
309 
310         let result = pq.pop(beginning_fragment, unordered).await;
311         assert!(result.is_some(), "should not error: {}", i);
312     }
313 
314     assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch");
315 
316     Ok(())
317 }
318 
319 #[tokio::test]
320 async fn test_pending_queue_unordered_wins() -> Result<()> {
321     let pq = PendingQueue::new();
322 
323     pq.push(make_data_chunk(0, false, NO_FRAGMENT)).await;
324     assert_eq!(10, pq.get_num_bytes(), "total bytes mismatch");
325     pq.push(make_data_chunk(1, true, NO_FRAGMENT)).await;
326     assert_eq!(20, pq.get_num_bytes(), "total bytes mismatch");
327     pq.push(make_data_chunk(2, false, NO_FRAGMENT)).await;
328     assert_eq!(30, pq.get_num_bytes(), "total bytes mismatch");
329     pq.push(make_data_chunk(3, true, NO_FRAGMENT)).await;
330     assert_eq!(40, pq.get_num_bytes(), "total bytes mismatch");
331 
332     let c = pq.peek().await;
333     assert!(c.is_some(), "peek error");
334     let c = c.unwrap();
335     assert_eq!(1, c.tsn, "TSN should match");
336     let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
337     let result = pq.pop(beginning_fragment, unordered).await;
338     assert!(result.is_some(), "should not error");
339 
340     let c = pq.peek().await;
341     assert!(c.is_some(), "peek error");
342     let c = c.unwrap();
343     assert_eq!(3, c.tsn, "TSN should match");
344     let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
345     let result = pq.pop(beginning_fragment, unordered).await;
346     assert!(result.is_some(), "should not error");
347 
348     let c = pq.peek().await;
349     assert!(c.is_some(), "peek error");
350     let c = c.unwrap();
351     assert_eq!(0, c.tsn, "TSN should match");
352     let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
353     let result = pq.pop(beginning_fragment, unordered).await;
354     assert!(result.is_some(), "should not error");
355 
356     let c = pq.peek().await;
357     assert!(c.is_some(), "peek error");
358     let c = c.unwrap();
359     assert_eq!(2, c.tsn, "TSN should match");
360     let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
361     let result = pq.pop(beginning_fragment, unordered).await;
362     assert!(result.is_some(), "should not error");
363 
364     assert_eq!(0, pq.get_num_bytes(), "total bytes mismatch");
365 
366     Ok(())
367 }
368 
369 #[tokio::test]
370 async fn test_pending_queue_fragments() -> Result<()> {
371     let pq = PendingQueue::new();
372     pq.push(make_data_chunk(0, false, FRAG_BEGIN)).await;
373     pq.push(make_data_chunk(1, false, FRAG_MIDDLE)).await;
374     pq.push(make_data_chunk(2, false, FRAG_END)).await;
375     pq.push(make_data_chunk(3, true, FRAG_BEGIN)).await;
376     pq.push(make_data_chunk(4, true, FRAG_MIDDLE)).await;
377     pq.push(make_data_chunk(5, true, FRAG_END)).await;
378 
379     let expects = vec![3, 4, 5, 0, 1, 2];
380 
381     for exp in expects {
382         let c = pq.peek().await;
383         assert!(c.is_some(), "peek error");
384         let c = c.unwrap();
385         assert_eq!(exp, c.tsn, "TSN should match");
386         let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
387         let result = pq.pop(beginning_fragment, unordered).await;
388         assert!(result.is_some(), "should not error: {}", exp);
389     }
390 
391     Ok(())
392 }
393 
394 // Once decided ordered or unordered, the decision should persist until
395 // it pops a chunk with ending_fragment flags set to true.
396 #[tokio::test]
397 async fn test_pending_queue_selection_persistence() -> Result<()> {
398     let pq = PendingQueue::new();
399     pq.push(make_data_chunk(0, false, FRAG_BEGIN)).await;
400 
401     let c = pq.peek().await;
402     assert!(c.is_some(), "peek error");
403     let c = c.unwrap();
404     assert_eq!(0, c.tsn, "TSN should match");
405     let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
406     let result = pq.pop(beginning_fragment, unordered).await;
407     assert!(result.is_some(), "should not error: {}", 0);
408 
409     pq.push(make_data_chunk(1, true, NO_FRAGMENT)).await;
410     pq.push(make_data_chunk(2, false, FRAG_MIDDLE)).await;
411     pq.push(make_data_chunk(3, false, FRAG_END)).await;
412 
413     let expects = vec![2, 3, 1];
414 
415     for exp in expects {
416         let c = pq.peek().await;
417         assert!(c.is_some(), "peek error");
418         let c = c.unwrap();
419         assert_eq!(exp, c.tsn, "TSN should match");
420         let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);
421         let result = pq.pop(beginning_fragment, unordered).await;
422         assert!(result.is_some(), "should not error: {}", exp);
423     }
424 
425     Ok(())
426 }
427 
428 ///////////////////////////////////////////////////////////////////
429 //reassembly_queue_test
430 ///////////////////////////////////////////////////////////////////
431 use super::reassembly_queue::*;
432 use std::sync::atomic::AtomicUsize;
433 use std::sync::Arc;
434 
// Two ordered fragments with the same SSN are reassembled into one message
// that a single `read` returns.
#[test]
fn test_reassembly_queue_ordered_fragments() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    // Beginning fragment only: the set is still open.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 1,
        stream_sequence_number: 0,
        payload_type: org_ppi,
        beginning_fragment: true,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    });
    assert!(!is_complete, "chunk set should not be complete yet");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    // Ending fragment with the next TSN closes the set.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 2,
        stream_sequence_number: 0,
        payload_type: org_ppi,
        ending_fragment: true,
        user_data: Bytes::from_static(b"DEFG"),
        ..Default::default()
    });
    assert!(is_complete, "chunk set should be complete");
    assert_eq!(7, rq.get_num_bytes(), "num bytes mismatch");

    let mut out = vec![0u8; 16];
    let (len, ppi) = rq.read(&mut out)?;
    assert_eq!(7, len, "should received 7 bytes");
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");
    assert_eq!(ppi, org_ppi, "should have valid ppi");
    assert_eq!(&out[..len], b"ABCDEFG", "data should match");

    Ok(())
}
477 
// Three unordered fragments with contiguous TSNs are reassembled into one
// message that a single `read` returns.
#[test]
fn test_reassembly_queue_unordered_fragments() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    // Beginning fragment.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 1,
        stream_sequence_number: 0,
        payload_type: org_ppi,
        unordered: true,
        beginning_fragment: true,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    });
    assert!(!is_complete, "chunk set should not be complete yet");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    // Middle fragment (neither B nor E set).
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 2,
        stream_sequence_number: 0,
        payload_type: org_ppi,
        unordered: true,
        user_data: Bytes::from_static(b"DEFG"),
        ..Default::default()
    });
    assert!(!is_complete, "chunk set should not be complete yet");
    assert_eq!(7, rq.get_num_bytes(), "num bytes mismatch");

    // Ending fragment completes the message.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 3,
        stream_sequence_number: 0,
        payload_type: org_ppi,
        unordered: true,
        ending_fragment: true,
        user_data: Bytes::from_static(b"H"),
        ..Default::default()
    });
    assert!(is_complete, "chunk set should be complete");
    assert_eq!(8, rq.get_num_bytes(), "num bytes mismatch");

    let mut out = vec![0u8; 16];
    let (len, ppi) = rq.read(&mut out)?;
    assert_eq!(8, len, "should received 8 bytes");
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");
    assert_eq!(ppi, org_ppi, "should have valid ppi");
    assert_eq!(&out[..len], b"ABCDEFGH", "data should match");

    Ok(())
}
535 
// With one complete ordered and one complete unordered message queued,
// `read` hands out the unordered message first.
#[test]
fn test_reassembly_queue_ordered_and_unordered_fragments() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    // Complete (B=1, E=1) ordered message.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 1,
        stream_sequence_number: 0,
        payload_type: org_ppi,
        beginning_fragment: true,
        ending_fragment: true,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    });
    assert!(is_complete, "chunk set should be complete");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    // Complete (B=1, E=1) unordered message.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 2,
        stream_sequence_number: 1,
        payload_type: org_ppi,
        unordered: true,
        beginning_fragment: true,
        ending_fragment: true,
        user_data: Bytes::from_static(b"DEF"),
        ..Default::default()
    });
    assert!(is_complete, "chunk set should be complete");
    assert_eq!(6, rq.get_num_bytes(), "num bytes mismatch");

    // Both messages are now ready to be read from the reassembly queue.
    let mut out = vec![0u8; 16];

    // The unordered message is delivered first.
    let (len, ppi) = rq.read(&mut out)?;
    assert_eq!(3, len, "should received 3 bytes");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");
    assert_eq!(ppi, org_ppi, "should have valid ppi");
    assert_eq!(&out[..len], b"DEF", "data should match");

    // The ordered message follows.
    let (len, ppi) = rq.read(&mut out)?;
    assert_eq!(3, len, "should received 3 bytes");
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");
    assert_eq!(ppi, org_ppi, "should have valid ppi");
    assert_eq!(&out[..len], b"ABC", "data should match");

    Ok(())
}
591 
// A complete unordered message can be read even while earlier unordered
// fragments that never formed a complete set are still queued.
#[test]
fn test_reassembly_queue_unordered_complete_skips_incomplete() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    // Beginning fragment at TSN 10.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 10,
        stream_sequence_number: 0,
        payload_type: org_ppi,
        unordered: true,
        beginning_fragment: true,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    });
    assert!(!is_complete, "chunk set should not be complete yet");
    assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch");

    // Ending fragment at TSN 12: not contiguous with TSN 10 (11 is missing).
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 12,
        stream_sequence_number: 1,
        payload_type: org_ppi,
        unordered: true,
        ending_fragment: true,
        user_data: Bytes::from_static(b"COMPLETE"),
        ..Default::default()
    });
    assert!(!is_complete, "chunk set should not be complete yet");
    assert_eq!(10, rq.get_num_bytes(), "num bytes mismatch");

    // Self-contained (B=1, E=1) message at TSN 13.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 13,
        stream_sequence_number: 1,
        payload_type: org_ppi,
        unordered: true,
        beginning_fragment: true,
        ending_fragment: true,
        user_data: Bytes::from_static(b"GOOD"),
        ..Default::default()
    });
    assert!(is_complete, "chunk set should be complete");
    assert_eq!(14, rq.get_num_bytes(), "num bytes mismatch");

    // Only the "GOOD" message is readable; the 10 bytes of the two stray
    // fragments stay in the queue.
    let mut out = vec![0u8; 16];
    let (len, ppi) = rq.read(&mut out)?;
    assert_eq!(4, len, "should receive 4 bytes");
    assert_eq!(10, rq.get_num_bytes(), "num bytes mismatch");
    assert_eq!(ppi, org_ppi, "should have valid ppi");
    assert_eq!(&out[..len], b"GOOD", "data should match");

    Ok(())
}
656 
// A chunk whose stream identifier differs from the queue's is not accepted.
#[test]
fn test_reassembly_queue_ignores_chunk_with_wrong_si() -> Result<()> {
    // The queue is created for stream 123; the chunk below targets 124.
    let mut rq = ReassemblyQueue::new(123);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    let is_complete = rq.push(ChunkPayloadData {
        tsn: 10,
        stream_identifier: 124,
        stream_sequence_number: 0,
        payload_type: org_ppi,
        beginning_fragment: true,
        ending_fragment: true,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    });
    assert!(!is_complete, "chunk should be ignored");
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
679 
// An ordered chunk whose SSN is older than the queue's next expected SSN is
// dropped: `push` reports no complete set and no bytes are accounted.
#[test]
fn test_reassembly_queue_ignores_chunk_with_stale_ssn() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    rq.next_ssn = 7; // forcibly set expected SSN to 7

    let org_ppi = PayloadProtocolIdentifier::Binary;

    let chunk = ChunkPayloadData {
        payload_type: org_ppi,
        beginning_fragment: true,
        ending_fragment: true,
        tsn: 10,
        stream_sequence_number: 6, // <-- stale: older than next_ssn
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    };

    let complete = rq.push(chunk);
    // The original message said "should not be ignored", contradicting the check.
    assert!(!complete, "chunk should be ignored");
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
703 
// `read` fails while only an incomplete chunk set is queued, and leaves the
// queued bytes untouched.
#[test]
fn test_reassembly_queue_should_fail_to_read_incomplete_chunk() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    // Beginning fragment without a matching ending fragment.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 123,
        stream_sequence_number: 0,
        payload_type: org_ppi,
        beginning_fragment: true,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    });
    assert!(!is_complete, "the set should not be complete");
    assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch");

    let mut out = vec![0u8; 16];
    let read_failed = rq.read(&mut out).is_err();
    assert!(read_failed, "read() should not succeed");
    assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
730 
// A complete ordered message with SSN 1 cannot be read from a fresh queue:
// `read` fails and the bytes stay queued.
#[test]
fn test_reassembly_queue_should_fail_to_read_if_the_nex_ssn_is_not_ready() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    // Complete (B=1, E=1) ordered message carrying SSN 1.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 123,
        stream_sequence_number: 1,
        payload_type: org_ppi,
        beginning_fragment: true,
        ending_fragment: true,
        user_data: Bytes::from_static(b"IN"),
        ..Default::default()
    });
    assert!(is_complete, "the set should be complete");
    assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch");

    let mut out = vec![0u8; 16];
    let read_failed = rq.read(&mut out).is_err();
    assert!(read_failed, "read() should not succeed");
    assert_eq!(2, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
758 
// Reading a 10-byte message into an 8-byte buffer fails with
// `Error::ErrShortBuffer`; afterwards the queue reports zero bytes.
#[test]
fn test_reassembly_queue_detect_buffer_too_short() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    let is_complete = rq.push(ChunkPayloadData {
        tsn: 123,
        stream_sequence_number: 0,
        payload_type: org_ppi,
        beginning_fragment: true,
        ending_fragment: true,
        user_data: Bytes::from_static(b"0123456789"),
        ..Default::default()
    });
    assert!(is_complete, "the set should be complete");
    assert_eq!(10, rq.get_num_bytes(), "num bytes mismatch");

    // Deliberately two bytes shorter than the queued message.
    let mut short_buf = vec![0u8; 8];
    match rq.read(&mut short_buf) {
        Ok(_) => panic!("read() should not succeed"),
        Err(err) => assert_eq!(Error::ErrShortBuffer, err, "read() should not succeed"),
    }
    assert_eq!(0, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
789 
// `forward_tsn_for_ordered` drops the incomplete ordered fragments of the
// forwarded SSN and keeps the complete message queued under another SSN.
#[test]
fn test_reassembly_queue_forward_tsn_for_ordered_framents() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    let ssn_complete = 5u16;
    let ssn_dropped = 6u16;

    // Complete (B=1, E=1) message under `ssn_complete`.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 10,
        stream_sequence_number: ssn_complete,
        payload_type: org_ppi,
        beginning_fragment: true,
        ending_fragment: true,
        user_data: Bytes::from_static(b"123"),
        ..Default::default()
    });
    assert!(is_complete, "chunk set should be complete");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    // Beginning fragment under `ssn_dropped`.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 11,
        stream_sequence_number: ssn_dropped,
        payload_type: org_ppi,
        beginning_fragment: true,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    });
    assert!(!is_complete, "chunk set should not be complete yet");
    assert_eq!(6, rq.get_num_bytes(), "num bytes mismatch");

    // Middle fragment under `ssn_dropped`; no ending fragment is ever pushed.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 12,
        stream_sequence_number: ssn_dropped,
        payload_type: org_ppi,
        user_data: Bytes::from_static(b"DEF"),
        ..Default::default()
    });
    assert!(!is_complete, "chunk set should not be complete yet");
    assert_eq!(9, rq.get_num_bytes(), "num bytes mismatch");

    rq.forward_tsn_for_ordered(ssn_dropped);

    let remaining_sets = rq.ordered.len();
    assert_eq!(1, remaining_sets, "there should be one chunk left");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
845 
// `forward_tsn_for_unordered(13)` removes the unordered fragments with TSN 11
// and 12 and keeps the one with TSN 14.
#[test]
fn test_reassembly_queue_forward_tsn_for_unordered_framents() -> Result<()> {
    let mut rq = ReassemblyQueue::new(0);
    let org_ppi = PayloadProtocolIdentifier::Binary;

    let ssn_dropped = 6u16;
    let ssn_kept = 7u16;

    // Beginning fragment at TSN 11.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 11,
        stream_sequence_number: ssn_dropped,
        payload_type: org_ppi,
        unordered: true,
        beginning_fragment: true,
        user_data: Bytes::from_static(b"ABC"),
        ..Default::default()
    });
    assert!(!is_complete, "chunk set should not be complete yet");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    // Middle fragment at TSN 12.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 12,
        stream_sequence_number: ssn_dropped,
        payload_type: org_ppi,
        unordered: true,
        user_data: Bytes::from_static(b"DEF"),
        ..Default::default()
    });
    assert!(!is_complete, "chunk set should not be complete yet");
    assert_eq!(6, rq.get_num_bytes(), "num bytes mismatch");

    // Beginning fragment of another message at TSN 14.
    let is_complete = rq.push(ChunkPayloadData {
        tsn: 14,
        stream_sequence_number: ssn_kept,
        payload_type: org_ppi,
        unordered: true,
        beginning_fragment: true,
        user_data: Bytes::from_static(b"SOS"),
        ..Default::default()
    });
    assert!(!is_complete, "chunk set should not be complete yet");
    assert_eq!(9, rq.get_num_bytes(), "num bytes mismatch");

    // Three chunks are held in `rq.unordered_chunks` at this point.
    // Forwarding to TSN 13 should discard every chunk with TSN <= 13.
    rq.forward_tsn_for_unordered(13);

    // Only the chunk with TSN 14 survives.
    let remaining_chunks = rq.unordered_chunks.len();
    assert_eq!(1, remaining_chunks, "there should be one chunk kept");
    assert_eq!(3, rq.get_num_bytes(), "num bytes mismatch");

    Ok(())
}
910 
// A freshly created `ChunkSet` holds no chunks and is never complete.
#[test]
fn test_chunk_set_empty_chunk_set() -> Result<()> {
    let empty_set = ChunkSet::new(0, PayloadProtocolIdentifier::default());
    let is_complete = empty_set.is_complete();
    assert!(!is_complete, "empty chunkSet cannot be complete");

    Ok(())
}
917 
// Pushing a second chunk with an already-present TSN is ignored: the set
// keeps one chunk and does not become complete.
#[test]
fn test_chunk_set_push_dup_chunks_to_chunk_set() -> Result<()> {
    let mut cset = ChunkSet::new(0, PayloadProtocolIdentifier::default());

    let first = ChunkPayloadData {
        tsn: 100,
        beginning_fragment: true,
        ..Default::default()
    };
    // Same TSN as `first`, but flagged as the ending fragment.
    let duplicate = ChunkPayloadData {
        tsn: 100,
        ending_fragment: true,
        ..Default::default()
    };

    cset.push(first);
    let is_complete = cset.push(duplicate);

    assert!(!is_complete, "chunk with dup TSN is not complete");
    assert_eq!(1, cset.chunks.len(), "chunk with dup TSN should be ignored");

    Ok(())
}
935 
// A chunk set whose first chunk lacks the beginning-fragment flag (B=0) must
// not be reported as complete.
#[test]
fn test_chunk_set_incomplete_chunk_set_no_beginning() -> Result<()> {
    let cset = ChunkSet {
        ssn: 0,
        ppi: PayloadProtocolIdentifier::default(),
        // The original used an empty vector here, which only repeated the
        // empty-set test. A single chunk with E=1 but B=0 makes the missing
        // beginning flag the only reason the set is incomplete.
        chunks: vec![ChunkPayloadData {
            tsn: 100,
            ending_fragment: true,
            ..Default::default()
        }],
    };
    assert!(
        !cset.is_complete(),
        "chunkSet not starting with B=1 cannot be complete"
    );
    Ok(())
}
949 
// A chunk set that starts with B=1 and ends with E=1 but has a gap in its
// TSN sequence (100, 101, 103) must not be reported as complete.
#[test]
fn test_chunk_set_incomplete_chunk_set_no_contiguous_tsn() -> Result<()> {
    let cset = ChunkSet {
        ssn: 0,
        ppi: PayloadProtocolIdentifier::default(),
        chunks: vec![
            ChunkPayloadData {
                tsn: 100,
                beginning_fragment: true,
                ..Default::default()
            },
            ChunkPayloadData {
                tsn: 101,
                ..Default::default()
            },
            // TSN 102 is missing.
            ChunkPayloadData {
                tsn: 103,
                ending_fragment: true,
                ..Default::default()
            },
        ],
    };
    // The original message ("not starting with incontiguous tsn") was garbled.
    assert!(
        !cset.is_complete(),
        "chunkSet with non-contiguous TSNs cannot be complete"
    );
    Ok(())
}
978