1 use super::*;
2 use crate::mock::mock_stream::MockStream;
3 use crate::mock::mock_time::MockTime;
4 //use bytes::Bytes;
5 use chrono::prelude::*;
6 use rtp::extension::abs_send_time_extension::unix2ntp;
7
8 #[tokio::test]
test_receiver_interceptor_before_any_packet() -> Result<()>9 async fn test_receiver_interceptor_before_any_packet() -> Result<()> {
10 let mt = Arc::new(MockTime::default());
11 let time_gen = {
12 let mt = Arc::clone(&mt);
13 Arc::new(move || mt.now())
14 };
15
16 let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
17 .with_interval(Duration::from_millis(50))
18 .with_now_fn(time_gen)
19 .build("")?;
20
21 let stream = MockStream::new(
22 &StreamInfo {
23 ssrc: 123456,
24 clock_rate: 90000,
25 ..Default::default()
26 },
27 icpr,
28 )
29 .await;
30
31 let pkts = stream.written_rtcp().await.unwrap();
32 assert_eq!(pkts.len(), 1);
33
34 if let Some(rr) = pkts[0]
35 .as_any()
36 .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
37 {
38 assert_eq!(rr.reports.len(), 1);
39 assert_eq!(
40 rr.reports[0],
41 rtcp::reception_report::ReceptionReport {
42 ssrc: 123456,
43 last_sequence_number: 0,
44 last_sender_report: 0,
45 fraction_lost: 0,
46 total_lost: 0,
47 delay: 0,
48 jitter: 0,
49 }
50 )
51 } else {
52 panic!();
53 }
54
55 stream.close().await?;
56
57 Ok(())
58 }
59
60 #[tokio::test]
test_receiver_interceptor_after_rtp_packets() -> Result<()>61 async fn test_receiver_interceptor_after_rtp_packets() -> Result<()> {
62 let mt = Arc::new(MockTime::default());
63 let time_gen = {
64 let mt = Arc::clone(&mt);
65 Arc::new(move || mt.now())
66 };
67
68 let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
69 .with_interval(Duration::from_millis(50))
70 .with_now_fn(time_gen)
71 .build("")?;
72
73 let stream = MockStream::new(
74 &StreamInfo {
75 ssrc: 123456,
76 clock_rate: 90000,
77 ..Default::default()
78 },
79 icpr,
80 )
81 .await;
82
83 for i in 0..10u16 {
84 stream
85 .receive_rtp(rtp::packet::Packet {
86 header: rtp::header::Header {
87 sequence_number: i,
88 ..Default::default()
89 },
90 ..Default::default()
91 })
92 .await;
93 }
94
95 let pkts = stream.written_rtcp().await.unwrap();
96 assert_eq!(pkts.len(), 1);
97 if let Some(rr) = pkts[0]
98 .as_any()
99 .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
100 {
101 assert_eq!(rr.reports.len(), 1);
102 assert_eq!(
103 rr.reports[0],
104 rtcp::reception_report::ReceptionReport {
105 ssrc: 123456,
106 last_sequence_number: 9,
107 last_sender_report: 0,
108 fraction_lost: 0,
109 total_lost: 0,
110 delay: 0,
111 jitter: 0,
112 }
113 )
114 } else {
115 panic!();
116 }
117
118 stream.close().await?;
119
120 Ok(())
121 }
122
123 #[tokio::test]
test_receiver_interceptor_after_rtp_and_rtcp_packets() -> Result<()>124 async fn test_receiver_interceptor_after_rtp_and_rtcp_packets() -> Result<()> {
125 let rtp_time: SystemTime = Utc.with_ymd_and_hms(2009, 10, 23, 0, 0, 0).unwrap().into();
126
127 let mt = Arc::new(MockTime::default());
128 let time_gen = {
129 let mt = Arc::clone(&mt);
130 Arc::new(move || mt.now())
131 };
132
133 let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
134 .with_interval(Duration::from_millis(50))
135 .with_now_fn(time_gen)
136 .build("")?;
137
138 let stream = MockStream::new(
139 &StreamInfo {
140 ssrc: 123456,
141 clock_rate: 90000,
142 ..Default::default()
143 },
144 icpr,
145 )
146 .await;
147
148 for i in 0..10u16 {
149 stream
150 .receive_rtp(rtp::packet::Packet {
151 header: rtp::header::Header {
152 sequence_number: i,
153 ..Default::default()
154 },
155 ..Default::default()
156 })
157 .await;
158 }
159
160 let now: SystemTime = Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 1).unwrap().into();
161 let rt = 987654321u32.wrapping_add(
162 (now.duration_since(rtp_time)
163 .unwrap_or(Duration::from_secs(0))
164 .as_secs_f64()
165 * 90000.0) as u32,
166 );
167 stream
168 .receive_rtcp(vec![Box::new(rtcp::sender_report::SenderReport {
169 ssrc: 123456,
170 ntp_time: unix2ntp(now),
171 rtp_time: rt,
172 packet_count: 10,
173 octet_count: 0,
174 ..Default::default()
175 })])
176 .await;
177
178 let pkts = stream.written_rtcp().await.unwrap();
179 assert_eq!(pkts.len(), 1);
180 if let Some(rr) = pkts[0]
181 .as_any()
182 .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
183 {
184 assert_eq!(rr.reports.len(), 1);
185 assert_eq!(
186 rr.reports[0],
187 rtcp::reception_report::ReceptionReport {
188 ssrc: 123456,
189 last_sequence_number: 9,
190 last_sender_report: 1861287936,
191 fraction_lost: 0,
192 total_lost: 0,
193 delay: rr.reports[0].delay,
194 jitter: 0,
195 }
196 )
197 } else {
198 panic!();
199 }
200
201 stream.close().await?;
202
203 Ok(())
204 }
205
206 #[tokio::test]
test_receiver_interceptor_overflow() -> Result<()>207 async fn test_receiver_interceptor_overflow() -> Result<()> {
208 #![allow(clippy::identity_op)]
209
210 let mt = Arc::new(MockTime::default());
211 let _mt2 = Arc::clone(&mt);
212 let time_gen = {
213 let mt = Arc::clone(&mt);
214 Arc::new(move || mt.now())
215 };
216
217 let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
218 .with_interval(Duration::from_millis(50))
219 .with_now_fn(time_gen)
220 .build("")?;
221
222 let stream = MockStream::new(
223 &StreamInfo {
224 ssrc: 123456,
225 clock_rate: 90000,
226 ..Default::default()
227 },
228 icpr,
229 )
230 .await;
231
232 stream
233 .receive_rtp(rtp::packet::Packet {
234 header: rtp::header::Header {
235 sequence_number: 0xffff,
236 ..Default::default()
237 },
238 ..Default::default()
239 })
240 .await;
241
242 stream
243 .receive_rtp(rtp::packet::Packet {
244 header: rtp::header::Header {
245 sequence_number: 0,
246 ..Default::default()
247 },
248 ..Default::default()
249 })
250 .await;
251
252 let pkts = stream.written_rtcp().await.unwrap();
253 assert_eq!(pkts.len(), 1);
254 if let Some(rr) = pkts[0]
255 .as_any()
256 .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
257 {
258 assert_eq!(rr.reports.len(), 1);
259 assert_eq!(
260 rr.reports[0],
261 rtcp::reception_report::ReceptionReport {
262 ssrc: 123456,
263 last_sequence_number: {
264 // most significant bits: 1 << 16
265 // least significant bits: 0x0000
266 (1 << 16) | 0x0000
267 },
268 last_sender_report: 0,
269 fraction_lost: 0,
270 total_lost: 0,
271 delay: rr.reports[0].delay,
272 jitter: 0,
273 }
274 )
275 } else {
276 panic!();
277 }
278
279 stream.close().await?;
280 Ok(())
281 }
282
283 #[tokio::test]
test_receiver_interceptor_overflow_five_pkts() -> Result<()>284 async fn test_receiver_interceptor_overflow_five_pkts() -> Result<()> {
285 let mt = Arc::new(MockTime::default());
286 let time_gen = {
287 let mt = Arc::clone(&mt);
288 Arc::new(move || mt.now())
289 };
290
291 let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
292 .with_interval(Duration::from_millis(50))
293 .with_now_fn(time_gen)
294 .build("")?;
295
296 let stream = MockStream::new(
297 &StreamInfo {
298 ssrc: 123456,
299 clock_rate: 90000,
300 ..Default::default()
301 },
302 icpr,
303 )
304 .await;
305
306 stream
307 .receive_rtp(rtp::packet::Packet {
308 header: rtp::header::Header {
309 sequence_number: 0xfffd,
310 ..Default::default()
311 },
312 ..Default::default()
313 })
314 .await;
315
316 stream
317 .receive_rtp(rtp::packet::Packet {
318 header: rtp::header::Header {
319 sequence_number: 0xfffe,
320 ..Default::default()
321 },
322 ..Default::default()
323 })
324 .await;
325
326 stream
327 .receive_rtp(rtp::packet::Packet {
328 header: rtp::header::Header {
329 sequence_number: 0xffff,
330 ..Default::default()
331 },
332 ..Default::default()
333 })
334 .await;
335
336 stream
337 .receive_rtp(rtp::packet::Packet {
338 header: rtp::header::Header {
339 sequence_number: 0,
340 ..Default::default()
341 },
342 ..Default::default()
343 })
344 .await;
345
346 stream
347 .receive_rtp(rtp::packet::Packet {
348 header: rtp::header::Header {
349 sequence_number: 1,
350 ..Default::default()
351 },
352 ..Default::default()
353 })
354 .await;
355
356 let pkts = stream.written_rtcp().await.unwrap();
357 assert_eq!(pkts.len(), 1);
358 if let Some(rr) = pkts[0]
359 .as_any()
360 .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
361 {
362 assert_eq!(rr.reports.len(), 1);
363 assert_eq!(
364 rr.reports[0],
365 rtcp::reception_report::ReceptionReport {
366 ssrc: 123456,
367 last_sequence_number: (1 << 16) | 0x0001,
368 last_sender_report: 0,
369 fraction_lost: 0,
370 total_lost: 0,
371 delay: rr.reports[0].delay,
372 jitter: 0,
373 }
374 )
375 } else {
376 panic!();
377 }
378
379 stream.close().await?;
380 Ok(())
381 }
382
383 #[tokio::test]
test_receiver_interceptor_packet_loss() -> Result<()>384 async fn test_receiver_interceptor_packet_loss() -> Result<()> {
385 let rtp_time: SystemTime = Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 0).unwrap().into();
386
387 let mt = Arc::new(MockTime::default());
388 let time_gen = {
389 let mt = Arc::clone(&mt);
390 Arc::new(move || mt.now())
391 };
392
393 let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
394 .with_interval(Duration::from_millis(50))
395 .with_now_fn(time_gen)
396 .build("")?;
397
398 let stream = MockStream::new(
399 &StreamInfo {
400 ssrc: 123456,
401 clock_rate: 90000,
402 ..Default::default()
403 },
404 icpr,
405 )
406 .await;
407
408 stream
409 .receive_rtp(rtp::packet::Packet {
410 header: rtp::header::Header {
411 sequence_number: 0x01,
412 ..Default::default()
413 },
414 ..Default::default()
415 })
416 .await;
417
418 stream
419 .receive_rtp(rtp::packet::Packet {
420 header: rtp::header::Header {
421 sequence_number: 0x03,
422 ..Default::default()
423 },
424 ..Default::default()
425 })
426 .await;
427
428 let pkts = stream.written_rtcp().await.unwrap();
429 assert_eq!(pkts.len(), 1);
430 if let Some(rr) = pkts[0]
431 .as_any()
432 .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
433 {
434 assert_eq!(rr.reports.len(), 1);
435 assert_eq!(
436 rr.reports[0],
437 rtcp::reception_report::ReceptionReport {
438 ssrc: 123456,
439 last_sequence_number: 0x03,
440 last_sender_report: 0,
441 fraction_lost: ((1u16 << 8) / 3) as u8,
442 total_lost: 1,
443 delay: 0,
444 jitter: 0,
445 }
446 )
447 } else {
448 panic!();
449 }
450
451 let now: SystemTime = Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 1).unwrap().into();
452 let rt = 987654321u32.wrapping_add(
453 (now.duration_since(rtp_time)
454 .unwrap_or(Duration::from_secs(0))
455 .as_secs_f64()
456 * 90000.0) as u32,
457 );
458 stream
459 .receive_rtcp(vec![Box::new(rtcp::sender_report::SenderReport {
460 ssrc: 123456,
461 ntp_time: unix2ntp(now),
462 rtp_time: rt,
463 packet_count: 10,
464 octet_count: 0,
465 ..Default::default()
466 })])
467 .await;
468
469 let pkts = stream.written_rtcp().await.unwrap();
470 assert_eq!(pkts.len(), 1);
471 if let Some(rr) = pkts[0]
472 .as_any()
473 .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
474 {
475 assert_eq!(rr.reports.len(), 1);
476 assert_eq!(
477 rr.reports[0],
478 rtcp::reception_report::ReceptionReport {
479 ssrc: 123456,
480 last_sequence_number: 0x03,
481 last_sender_report: 1861287936,
482 fraction_lost: 0,
483 total_lost: 1,
484 delay: rr.reports[0].delay,
485 jitter: 0,
486 }
487 )
488 } else {
489 panic!();
490 }
491
492 stream.close().await?;
493 Ok(())
494 }
495
496 #[tokio::test]
test_receiver_interceptor_overflow_and_packet_loss() -> Result<()>497 async fn test_receiver_interceptor_overflow_and_packet_loss() -> Result<()> {
498 let mt = Arc::new(MockTime::default());
499 let time_gen = {
500 let mt = Arc::clone(&mt);
501 Arc::new(move || mt.now())
502 };
503
504 let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
505 .with_interval(Duration::from_millis(50))
506 .with_now_fn(time_gen)
507 .build("")?;
508
509 let stream = MockStream::new(
510 &StreamInfo {
511 ssrc: 123456,
512 clock_rate: 90000,
513 ..Default::default()
514 },
515 icpr,
516 )
517 .await;
518
519 stream
520 .receive_rtp(rtp::packet::Packet {
521 header: rtp::header::Header {
522 sequence_number: 0xffff,
523 ..Default::default()
524 },
525 ..Default::default()
526 })
527 .await;
528
529 stream
530 .receive_rtp(rtp::packet::Packet {
531 header: rtp::header::Header {
532 sequence_number: 0x01,
533 ..Default::default()
534 },
535 ..Default::default()
536 })
537 .await;
538
539 let pkts = stream.written_rtcp().await.unwrap();
540 assert_eq!(pkts.len(), 1);
541 if let Some(rr) = pkts[0]
542 .as_any()
543 .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
544 {
545 assert_eq!(rr.reports.len(), 1);
546 assert_eq!(
547 rr.reports[0],
548 rtcp::reception_report::ReceptionReport {
549 ssrc: 123456,
550 last_sequence_number: 1 << 16 | 0x01,
551 last_sender_report: 0,
552 fraction_lost: ((1u16 << 8) / 3) as u8,
553 total_lost: 1,
554 delay: 0,
555 jitter: 0,
556 }
557 )
558 } else {
559 panic!();
560 }
561
562 stream.close().await?;
563 Ok(())
564 }
565
566 #[tokio::test]
test_receiver_interceptor_reordered_packets() -> Result<()>567 async fn test_receiver_interceptor_reordered_packets() -> Result<()> {
568 let mt = Arc::new(MockTime::default());
569 let time_gen = {
570 let mt = Arc::clone(&mt);
571 Arc::new(move || mt.now())
572 };
573
574 let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
575 .with_interval(Duration::from_millis(50))
576 .with_now_fn(time_gen)
577 .build("")?;
578
579 let stream = MockStream::new(
580 &StreamInfo {
581 ssrc: 123456,
582 clock_rate: 90000,
583 ..Default::default()
584 },
585 icpr,
586 )
587 .await;
588
589 for sequence_number in [0x01, 0x03, 0x02, 0x04] {
590 stream
591 .receive_rtp(rtp::packet::Packet {
592 header: rtp::header::Header {
593 sequence_number,
594 ..Default::default()
595 },
596 ..Default::default()
597 })
598 .await;
599 }
600
601 let pkts = stream.written_rtcp().await.unwrap();
602 assert_eq!(pkts.len(), 1);
603 if let Some(rr) = pkts[0]
604 .as_any()
605 .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
606 {
607 assert_eq!(rr.reports.len(), 1);
608 assert_eq!(
609 rr.reports[0],
610 rtcp::reception_report::ReceptionReport {
611 ssrc: 123456,
612 last_sequence_number: 0x04,
613 last_sender_report: 0,
614 fraction_lost: 0,
615 total_lost: 0,
616 delay: 0,
617 jitter: 0,
618 }
619 )
620 } else {
621 panic!();
622 }
623
624 stream.close().await?;
625 Ok(())
626 }
627
628 #[tokio::test(start_paused = true)]
test_receiver_interceptor_jitter() -> Result<()>629 async fn test_receiver_interceptor_jitter() -> Result<()> {
630 let mt = Arc::new(MockTime::default());
631 let time_gen = {
632 let mt = Arc::clone(&mt);
633 Arc::new(move || mt.now())
634 };
635
636 let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
637 .with_interval(Duration::from_millis(50))
638 .with_now_fn(time_gen)
639 .build("")?;
640
641 let stream = MockStream::new(
642 &StreamInfo {
643 ssrc: 123456,
644 clock_rate: 90000,
645 ..Default::default()
646 },
647 icpr,
648 )
649 .await;
650
651 mt.set_now(Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 0).unwrap().into());
652 stream
653 .receive_rtp(rtp::packet::Packet {
654 header: rtp::header::Header {
655 sequence_number: 0x01,
656 timestamp: 42378934,
657 ..Default::default()
658 },
659 ..Default::default()
660 })
661 .await;
662 stream.read_rtp().await;
663
664 mt.set_now(Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 1).unwrap().into());
665 stream
666 .receive_rtp(rtp::packet::Packet {
667 header: rtp::header::Header {
668 sequence_number: 0x02,
669 timestamp: 42378934 + 60000,
670 ..Default::default()
671 },
672 ..Default::default()
673 })
674 .await;
675
676 // Advance the time to generate a report
677 tokio::time::advance(Duration::from_millis(60)).await;
678 // Yield to let the reporting task run
679 tokio::task::yield_now().await;
680
681 let pkts = stream.last_written_rtcp().await.unwrap();
682 assert_eq!(pkts.len(), 1);
683 if let Some(rr) = pkts[0]
684 .as_any()
685 .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
686 {
687 assert_eq!(rr.reports.len(), 1);
688 assert_eq!(
689 rr.reports[0],
690 rtcp::reception_report::ReceptionReport {
691 ssrc: 123456,
692 last_sequence_number: 0x02,
693 last_sender_report: 0,
694 fraction_lost: 0,
695 total_lost: 0,
696 delay: 0,
697 jitter: 30000 / 16,
698 }
699 )
700 } else {
701 panic!();
702 }
703
704 stream.close().await?;
705 Ok(())
706 }
707
708 #[tokio::test]
test_receiver_interceptor_delay() -> Result<()>709 async fn test_receiver_interceptor_delay() -> Result<()> {
710 let mt = Arc::new(MockTime::default());
711 let time_gen = {
712 let mt = Arc::clone(&mt);
713 Arc::new(move || mt.now())
714 };
715
716 let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
717 .with_interval(Duration::from_millis(50))
718 .with_now_fn(time_gen)
719 .build("")?;
720
721 let stream = MockStream::new(
722 &StreamInfo {
723 ssrc: 123456,
724 clock_rate: 90000,
725 ..Default::default()
726 },
727 icpr,
728 )
729 .await;
730
731 mt.set_now(Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 0).unwrap().into());
732 stream
733 .receive_rtcp(vec![Box::new(rtcp::sender_report::SenderReport {
734 ssrc: 123456,
735 ntp_time: unix2ntp(Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 0).unwrap().into()),
736 rtp_time: 987654321,
737 packet_count: 0,
738 octet_count: 0,
739 ..Default::default()
740 })])
741 .await;
742 stream.read_rtcp().await;
743
744 mt.set_now(Utc.with_ymd_and_hms(2009, 11, 10, 23, 0, 1).unwrap().into());
745
746 let pkts = stream.written_rtcp().await.unwrap();
747 assert_eq!(pkts.len(), 1);
748 if let Some(rr) = pkts[0]
749 .as_any()
750 .downcast_ref::<rtcp::receiver_report::ReceiverReport>()
751 {
752 assert_eq!(rr.reports.len(), 1);
753 assert_eq!(
754 rr.reports[0],
755 rtcp::reception_report::ReceptionReport {
756 ssrc: 123456,
757 last_sequence_number: 0,
758 last_sender_report: 1861222400,
759 fraction_lost: 0,
760 total_lost: 0,
761 delay: 65536,
762 jitter: 0,
763 }
764 )
765 } else {
766 panic!();
767 }
768
769 stream.close().await?;
770 Ok(())
771 }
772