1 use super::*;
2 use crate::mock::mock_stream::MockStream;
3 use crate::mock::mock_time::MockTime;
4 //use bytes::Bytes;
5 use chrono::prelude::*;
6 use rtp::extension::abs_send_time_extension::unix2ntp;
7 
8 #[tokio::test]
test_receiver_interceptor_before_any_packet() -> Result<()>9 async fn test_receiver_interceptor_before_any_packet() -> Result<()> {
10     let mt = Arc::new(MockTime::default());
11     let time_gen = {
12         let mt = Arc::clone(&mt);
13         Arc::new(move || mt.now())
14     };
15 
16     let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
17         .with_interval(Duration::from_millis(50))
18         .with_now_fn(time_gen)
19         .build("")?;
20 
21     let stream = MockStream::new(
22         &StreamInfo {
23             ssrc: 123456,
24             clock_rate: 90000,
25             ..Default::default()
26         },
27         icpr,
28     )
29     .await;
30 
31     let pkts = stream.written_rtcp().await.unwrap();
32     assert_eq!(pkts.len(), 1);
33 
34     if let Some(rr) = pkts[0]
35         .as_any()
36         .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
37     {
38         assert_eq!(rr.reports.len(), 1);
39         assert_eq!(
40             rr.reports[0],
41             rtcp::reception_report::ReceptionReport {
42                 ssrc: 123456,
43                 last_sequence_number: 0,
44                 last_sender_report: 0,
45                 fraction_lost: 0,
46                 total_lost: 0,
47                 delay: 0,
48                 jitter: 0,
49             }
50         )
51     } else {
52         panic!();
53     }
54 
55     stream.close().await?;
56 
57     Ok(())
58 }
59 
60 #[tokio::test]
test_receiver_interceptor_after_rtp_packets() -> Result<()>61 async fn test_receiver_interceptor_after_rtp_packets() -> Result<()> {
62     let mt = Arc::new(MockTime::default());
63     let time_gen = {
64         let mt = Arc::clone(&mt);
65         Arc::new(move || mt.now())
66     };
67 
68     let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
69         .with_interval(Duration::from_millis(50))
70         .with_now_fn(time_gen)
71         .build("")?;
72 
73     let stream = MockStream::new(
74         &StreamInfo {
75             ssrc: 123456,
76             clock_rate: 90000,
77             ..Default::default()
78         },
79         icpr,
80     )
81     .await;
82 
83     for i in 0..10u16 {
84         stream
85             .receive_rtp(rtp::packet::Packet {
86                 header: rtp::header::Header {
87                     sequence_number: i,
88                     ..Default::default()
89                 },
90                 ..Default::default()
91             })
92             .await;
93     }
94 
95     let pkts = stream.written_rtcp().await.unwrap();
96     assert_eq!(pkts.len(), 1);
97     if let Some(rr) = pkts[0]
98         .as_any()
99         .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
100     {
101         assert_eq!(rr.reports.len(), 1);
102         assert_eq!(
103             rr.reports[0],
104             rtcp::reception_report::ReceptionReport {
105                 ssrc: 123456,
106                 last_sequence_number: 9,
107                 last_sender_report: 0,
108                 fraction_lost: 0,
109                 total_lost: 0,
110                 delay: 0,
111                 jitter: 0,
112             }
113         )
114     } else {
115         panic!();
116     }
117 
118     stream.close().await?;
119 
120     Ok(())
121 }
122 
123 #[tokio::test]
test_receiver_interceptor_after_rtp_and_rtcp_packets() -> Result<()>124 async fn test_receiver_interceptor_after_rtp_and_rtcp_packets() -> Result<()> {
125     let rtp_time: SystemTime = Utc.with_ymd_and_hms(2009, 10, 23, 0, 0, 0).unwrap().into();
126 
127     let mt = Arc::new(MockTime::default());
128     let time_gen = {
129         let mt = Arc::clone(&mt);
130         Arc::new(move || mt.now())
131     };
132 
133     let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
134         .with_interval(Duration::from_millis(50))
135         .with_now_fn(time_gen)
136         .build("")?;
137 
138     let stream = MockStream::new(
139         &StreamInfo {
140             ssrc: 123456,
141             clock_rate: 90000,
142             ..Default::default()
143         },
144         icpr,
145     )
146     .await;
147 
148     for i in 0..10u16 {
149         stream
150             .receive_rtp(rtp::packet::Packet {
151                 header: rtp::header::Header {
152                     sequence_number: i,
153                     ..Default::default()
154                 },
155                 ..Default::default()
156             })
157             .await;
158     }
159 
160     let now: SystemTime = Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 1).unwrap().into();
161     let rt = 987654321u32.wrapping_add(
162         (now.duration_since(rtp_time)
163             .unwrap_or(Duration::from_secs(0))
164             .as_secs_f64()
165             * 90000.0) as u32,
166     );
167     stream
168         .receive_rtcp(vec![Box::new(rtcp::sender_report::SenderReport {
169             ssrc: 123456,
170             ntp_time: unix2ntp(now),
171             rtp_time: rt,
172             packet_count: 10,
173             octet_count: 0,
174             ..Default::default()
175         })])
176         .await;
177 
178     let pkts = stream.written_rtcp().await.unwrap();
179     assert_eq!(pkts.len(), 1);
180     if let Some(rr) = pkts[0]
181         .as_any()
182         .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
183     {
184         assert_eq!(rr.reports.len(), 1);
185         assert_eq!(
186             rr.reports[0],
187             rtcp::reception_report::ReceptionReport {
188                 ssrc: 123456,
189                 last_sequence_number: 9,
190                 last_sender_report: 1861287936,
191                 fraction_lost: 0,
192                 total_lost: 0,
193                 delay: rr.reports[0].delay,
194                 jitter: 0,
195             }
196         )
197     } else {
198         panic!();
199     }
200 
201     stream.close().await?;
202 
203     Ok(())
204 }
205 
206 #[tokio::test]
test_receiver_interceptor_overflow() -> Result<()>207 async fn test_receiver_interceptor_overflow() -> Result<()> {
208     #![allow(clippy::identity_op)]
209 
210     let mt = Arc::new(MockTime::default());
211     let _mt2 = Arc::clone(&mt);
212     let time_gen = {
213         let mt = Arc::clone(&mt);
214         Arc::new(move || mt.now())
215     };
216 
217     let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
218         .with_interval(Duration::from_millis(50))
219         .with_now_fn(time_gen)
220         .build("")?;
221 
222     let stream = MockStream::new(
223         &StreamInfo {
224             ssrc: 123456,
225             clock_rate: 90000,
226             ..Default::default()
227         },
228         icpr,
229     )
230     .await;
231 
232     stream
233         .receive_rtp(rtp::packet::Packet {
234             header: rtp::header::Header {
235                 sequence_number: 0xffff,
236                 ..Default::default()
237             },
238             ..Default::default()
239         })
240         .await;
241 
242     stream
243         .receive_rtp(rtp::packet::Packet {
244             header: rtp::header::Header {
245                 sequence_number: 0,
246                 ..Default::default()
247             },
248             ..Default::default()
249         })
250         .await;
251 
252     let pkts = stream.written_rtcp().await.unwrap();
253     assert_eq!(pkts.len(), 1);
254     if let Some(rr) = pkts[0]
255         .as_any()
256         .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
257     {
258         assert_eq!(rr.reports.len(), 1);
259         assert_eq!(
260             rr.reports[0],
261             rtcp::reception_report::ReceptionReport {
262                 ssrc: 123456,
263                 last_sequence_number: {
264                     // most significant bits: 1 << 16
265                     // least significant bits: 0x0000
266                     (1 << 16) | 0x0000
267                 },
268                 last_sender_report: 0,
269                 fraction_lost: 0,
270                 total_lost: 0,
271                 delay: rr.reports[0].delay,
272                 jitter: 0,
273             }
274         )
275     } else {
276         panic!();
277     }
278 
279     stream.close().await?;
280     Ok(())
281 }
282 
283 #[tokio::test]
test_receiver_interceptor_overflow_five_pkts() -> Result<()>284 async fn test_receiver_interceptor_overflow_five_pkts() -> Result<()> {
285     let mt = Arc::new(MockTime::default());
286     let time_gen = {
287         let mt = Arc::clone(&mt);
288         Arc::new(move || mt.now())
289     };
290 
291     let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
292         .with_interval(Duration::from_millis(50))
293         .with_now_fn(time_gen)
294         .build("")?;
295 
296     let stream = MockStream::new(
297         &StreamInfo {
298             ssrc: 123456,
299             clock_rate: 90000,
300             ..Default::default()
301         },
302         icpr,
303     )
304     .await;
305 
306     stream
307         .receive_rtp(rtp::packet::Packet {
308             header: rtp::header::Header {
309                 sequence_number: 0xfffd,
310                 ..Default::default()
311             },
312             ..Default::default()
313         })
314         .await;
315 
316     stream
317         .receive_rtp(rtp::packet::Packet {
318             header: rtp::header::Header {
319                 sequence_number: 0xfffe,
320                 ..Default::default()
321             },
322             ..Default::default()
323         })
324         .await;
325 
326     stream
327         .receive_rtp(rtp::packet::Packet {
328             header: rtp::header::Header {
329                 sequence_number: 0xffff,
330                 ..Default::default()
331             },
332             ..Default::default()
333         })
334         .await;
335 
336     stream
337         .receive_rtp(rtp::packet::Packet {
338             header: rtp::header::Header {
339                 sequence_number: 0,
340                 ..Default::default()
341             },
342             ..Default::default()
343         })
344         .await;
345 
346     stream
347         .receive_rtp(rtp::packet::Packet {
348             header: rtp::header::Header {
349                 sequence_number: 1,
350                 ..Default::default()
351             },
352             ..Default::default()
353         })
354         .await;
355 
356     let pkts = stream.written_rtcp().await.unwrap();
357     assert_eq!(pkts.len(), 1);
358     if let Some(rr) = pkts[0]
359         .as_any()
360         .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
361     {
362         assert_eq!(rr.reports.len(), 1);
363         assert_eq!(
364             rr.reports[0],
365             rtcp::reception_report::ReceptionReport {
366                 ssrc: 123456,
367                 last_sequence_number: (1 << 16) | 0x0001,
368                 last_sender_report: 0,
369                 fraction_lost: 0,
370                 total_lost: 0,
371                 delay: rr.reports[0].delay,
372                 jitter: 0,
373             }
374         )
375     } else {
376         panic!();
377     }
378 
379     stream.close().await?;
380     Ok(())
381 }
382 
383 #[tokio::test]
test_receiver_interceptor_packet_loss() -> Result<()>384 async fn test_receiver_interceptor_packet_loss() -> Result<()> {
385     let rtp_time: SystemTime = Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 0).unwrap().into();
386 
387     let mt = Arc::new(MockTime::default());
388     let time_gen = {
389         let mt = Arc::clone(&mt);
390         Arc::new(move || mt.now())
391     };
392 
393     let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
394         .with_interval(Duration::from_millis(50))
395         .with_now_fn(time_gen)
396         .build("")?;
397 
398     let stream = MockStream::new(
399         &StreamInfo {
400             ssrc: 123456,
401             clock_rate: 90000,
402             ..Default::default()
403         },
404         icpr,
405     )
406     .await;
407 
408     stream
409         .receive_rtp(rtp::packet::Packet {
410             header: rtp::header::Header {
411                 sequence_number: 0x01,
412                 ..Default::default()
413             },
414             ..Default::default()
415         })
416         .await;
417 
418     stream
419         .receive_rtp(rtp::packet::Packet {
420             header: rtp::header::Header {
421                 sequence_number: 0x03,
422                 ..Default::default()
423             },
424             ..Default::default()
425         })
426         .await;
427 
428     let pkts = stream.written_rtcp().await.unwrap();
429     assert_eq!(pkts.len(), 1);
430     if let Some(rr) = pkts[0]
431         .as_any()
432         .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
433     {
434         assert_eq!(rr.reports.len(), 1);
435         assert_eq!(
436             rr.reports[0],
437             rtcp::reception_report::ReceptionReport {
438                 ssrc: 123456,
439                 last_sequence_number: 0x03,
440                 last_sender_report: 0,
441                 fraction_lost: ((1u16 << 8) / 3) as u8,
442                 total_lost: 1,
443                 delay: 0,
444                 jitter: 0,
445             }
446         )
447     } else {
448         panic!();
449     }
450 
451     let now: SystemTime = Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 1).unwrap().into();
452     let rt = 987654321u32.wrapping_add(
453         (now.duration_since(rtp_time)
454             .unwrap_or(Duration::from_secs(0))
455             .as_secs_f64()
456             * 90000.0) as u32,
457     );
458     stream
459         .receive_rtcp(vec![Box::new(rtcp::sender_report::SenderReport {
460             ssrc: 123456,
461             ntp_time: unix2ntp(now),
462             rtp_time: rt,
463             packet_count: 10,
464             octet_count: 0,
465             ..Default::default()
466         })])
467         .await;
468 
469     let pkts = stream.written_rtcp().await.unwrap();
470     assert_eq!(pkts.len(), 1);
471     if let Some(rr) = pkts[0]
472         .as_any()
473         .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
474     {
475         assert_eq!(rr.reports.len(), 1);
476         assert_eq!(
477             rr.reports[0],
478             rtcp::reception_report::ReceptionReport {
479                 ssrc: 123456,
480                 last_sequence_number: 0x03,
481                 last_sender_report: 1861287936,
482                 fraction_lost: 0,
483                 total_lost: 1,
484                 delay: rr.reports[0].delay,
485                 jitter: 0,
486             }
487         )
488     } else {
489         panic!();
490     }
491 
492     stream.close().await?;
493     Ok(())
494 }
495 
496 #[tokio::test]
test_receiver_interceptor_overflow_and_packet_loss() -> Result<()>497 async fn test_receiver_interceptor_overflow_and_packet_loss() -> Result<()> {
498     let mt = Arc::new(MockTime::default());
499     let time_gen = {
500         let mt = Arc::clone(&mt);
501         Arc::new(move || mt.now())
502     };
503 
504     let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
505         .with_interval(Duration::from_millis(50))
506         .with_now_fn(time_gen)
507         .build("")?;
508 
509     let stream = MockStream::new(
510         &StreamInfo {
511             ssrc: 123456,
512             clock_rate: 90000,
513             ..Default::default()
514         },
515         icpr,
516     )
517     .await;
518 
519     stream
520         .receive_rtp(rtp::packet::Packet {
521             header: rtp::header::Header {
522                 sequence_number: 0xffff,
523                 ..Default::default()
524             },
525             ..Default::default()
526         })
527         .await;
528 
529     stream
530         .receive_rtp(rtp::packet::Packet {
531             header: rtp::header::Header {
532                 sequence_number: 0x01,
533                 ..Default::default()
534             },
535             ..Default::default()
536         })
537         .await;
538 
539     let pkts = stream.written_rtcp().await.unwrap();
540     assert_eq!(pkts.len(), 1);
541     if let Some(rr) = pkts[0]
542         .as_any()
543         .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
544     {
545         assert_eq!(rr.reports.len(), 1);
546         assert_eq!(
547             rr.reports[0],
548             rtcp::reception_report::ReceptionReport {
549                 ssrc: 123456,
550                 last_sequence_number: 1 << 16 | 0x01,
551                 last_sender_report: 0,
552                 fraction_lost: ((1u16 << 8) / 3) as u8,
553                 total_lost: 1,
554                 delay: 0,
555                 jitter: 0,
556             }
557         )
558     } else {
559         panic!();
560     }
561 
562     stream.close().await?;
563     Ok(())
564 }
565 
566 #[tokio::test]
test_receiver_interceptor_reordered_packets() -> Result<()>567 async fn test_receiver_interceptor_reordered_packets() -> Result<()> {
568     let mt = Arc::new(MockTime::default());
569     let time_gen = {
570         let mt = Arc::clone(&mt);
571         Arc::new(move || mt.now())
572     };
573 
574     let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
575         .with_interval(Duration::from_millis(50))
576         .with_now_fn(time_gen)
577         .build("")?;
578 
579     let stream = MockStream::new(
580         &StreamInfo {
581             ssrc: 123456,
582             clock_rate: 90000,
583             ..Default::default()
584         },
585         icpr,
586     )
587     .await;
588 
589     for sequence_number in [0x01, 0x03, 0x02, 0x04] {
590         stream
591             .receive_rtp(rtp::packet::Packet {
592                 header: rtp::header::Header {
593                     sequence_number,
594                     ..Default::default()
595                 },
596                 ..Default::default()
597             })
598             .await;
599     }
600 
601     let pkts = stream.written_rtcp().await.unwrap();
602     assert_eq!(pkts.len(), 1);
603     if let Some(rr) = pkts[0]
604         .as_any()
605         .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
606     {
607         assert_eq!(rr.reports.len(), 1);
608         assert_eq!(
609             rr.reports[0],
610             rtcp::reception_report::ReceptionReport {
611                 ssrc: 123456,
612                 last_sequence_number: 0x04,
613                 last_sender_report: 0,
614                 fraction_lost: 0,
615                 total_lost: 0,
616                 delay: 0,
617                 jitter: 0,
618             }
619         )
620     } else {
621         panic!();
622     }
623 
624     stream.close().await?;
625     Ok(())
626 }
627 
628 #[tokio::test(start_paused = true)]
test_receiver_interceptor_jitter() -> Result<()>629 async fn test_receiver_interceptor_jitter() -> Result<()> {
630     let mt = Arc::new(MockTime::default());
631     let time_gen = {
632         let mt = Arc::clone(&mt);
633         Arc::new(move || mt.now())
634     };
635 
636     let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
637         .with_interval(Duration::from_millis(50))
638         .with_now_fn(time_gen)
639         .build("")?;
640 
641     let stream = MockStream::new(
642         &StreamInfo {
643             ssrc: 123456,
644             clock_rate: 90000,
645             ..Default::default()
646         },
647         icpr,
648     )
649     .await;
650 
651     mt.set_now(Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 0).unwrap().into());
652     stream
653         .receive_rtp(rtp::packet::Packet {
654             header: rtp::header::Header {
655                 sequence_number: 0x01,
656                 timestamp: 42378934,
657                 ..Default::default()
658             },
659             ..Default::default()
660         })
661         .await;
662     stream.read_rtp().await;
663 
664     mt.set_now(Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 1).unwrap().into());
665     stream
666         .receive_rtp(rtp::packet::Packet {
667             header: rtp::header::Header {
668                 sequence_number: 0x02,
669                 timestamp: 42378934 + 60000,
670                 ..Default::default()
671             },
672             ..Default::default()
673         })
674         .await;
675 
676     // Advance the time to generate a report
677     tokio::time::advance(Duration::from_millis(60)).await;
678     // Yield to let the reporting task run
679     tokio::task::yield_now().await;
680 
681     let pkts = stream.last_written_rtcp().await.unwrap();
682     assert_eq!(pkts.len(), 1);
683     if let Some(rr) = pkts[0]
684         .as_any()
685         .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
686     {
687         assert_eq!(rr.reports.len(), 1);
688         assert_eq!(
689             rr.reports[0],
690             rtcp::reception_report::ReceptionReport {
691                 ssrc: 123456,
692                 last_sequence_number: 0x02,
693                 last_sender_report: 0,
694                 fraction_lost: 0,
695                 total_lost: 0,
696                 delay: 0,
697                 jitter: 30000 / 16,
698             }
699         )
700     } else {
701         panic!();
702     }
703 
704     stream.close().await?;
705     Ok(())
706 }
707 
708 #[tokio::test]
test_receiver_interceptor_delay() -> Result<()>709 async fn test_receiver_interceptor_delay() -> Result<()> {
710     let mt = Arc::new(MockTime::default());
711     let time_gen = {
712         let mt = Arc::clone(&mt);
713         Arc::new(move || mt.now())
714     };
715 
716     let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
717         .with_interval(Duration::from_millis(50))
718         .with_now_fn(time_gen)
719         .build("")?;
720 
721     let stream = MockStream::new(
722         &StreamInfo {
723             ssrc: 123456,
724             clock_rate: 90000,
725             ..Default::default()
726         },
727         icpr,
728     )
729     .await;
730 
731     mt.set_now(Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 0).unwrap().into());
732     stream
733         .receive_rtcp(vec![Box::new(rtcp::sender_report::SenderReport {
734             ssrc: 123456,
735             ntp_time: unix2ntp(Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 0).unwrap().into()),
736             rtp_time: 987654321,
737             packet_count: 0,
738             octet_count: 0,
739             ..Default::default()
740         })])
741         .await;
742     stream.read_rtcp().await;
743 
744     mt.set_now(Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 1).unwrap().into());
745 
746     let pkts = stream.written_rtcp().await.unwrap();
747     assert_eq!(pkts.len(), 1);
748     if let Some(rr) = pkts[0]
749         .as_any()
750         .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
751     {
752         assert_eq!(rr.reports.len(), 1);
753         assert_eq!(
754             rr.reports[0],
755             rtcp::reception_report::ReceptionReport {
756                 ssrc: 123456,
757                 last_sequence_number: 0,
758                 last_sender_report: 1861222400,
759                 fraction_lost: 0,
760                 total_lost: 0,
761                 delay: 65536,
762                 jitter: 0,
763             }
764         )
765     } else {
766         panic!();
767     }
768 
769     stream.close().await?;
770     Ok(())
771 }
772