1 use super::*;
2 use crate::mock::mock_stream::MockStream;
3 use crate::mock::mock_time::MockTime;
4 use bytes::Bytes;
5 use chrono::prelude::*;
6 use rtp::extension::abs_send_time_extension::unix2ntp;
7 
8 #[tokio::test]
test_sender_interceptor_before_any_packet() -> Result<()>9 async fn test_sender_interceptor_before_any_packet() -> Result<()> {
10     let mt = Arc::new(MockTime::default());
11     let time_gen = {
12         let mt = Arc::clone(&mt);
13         Arc::new(move || mt.now())
14     };
15 
16     let icpr: Arc<dyn Interceptor + Send + Sync> = SenderReport::builder()
17         .with_interval(Duration::from_millis(50))
18         .with_now_fn(time_gen)
19         .build("")?;
20 
21     let stream = MockStream::new(
22         &StreamInfo {
23             ssrc: 123456,
24             clock_rate: 90000,
25             ..Default::default()
26         },
27         icpr,
28     )
29     .await;
30 
31     let dt = Utc.with_ymd_and_hms(2009, 10, 23, 0, 0, 0).unwrap();
32     mt.set_now(dt.into());
33 
34     let pkts = stream.written_rtcp().await.unwrap();
35     assert_eq!(pkts.len(), 1);
36     if let Some(sr) = pkts[0]
37         .as_any()
38         .downcast_ref::<rtcp::sender_report::SenderReport>()
39     {
40         assert_eq!(
41             sr,
42             &rtcp::sender_report::SenderReport {
43                 ssrc: 123456,
44                 ntp_time: unix2ntp(mt.now()),
45                 rtp_time: 4294967295, // pion: 2269117121,
46                 packet_count: 0,
47                 octet_count: 0,
48                 ..Default::default()
49             }
50         )
51     } else {
52         panic!();
53     }
54 
55     stream.close().await?;
56 
57     Ok(())
58 }
59 
60 #[tokio::test]
test_sender_interceptor_after_rtp_packets() -> Result<()>61 async fn test_sender_interceptor_after_rtp_packets() -> Result<()> {
62     let mt = Arc::new(MockTime::default());
63     let time_gen = {
64         let mt = Arc::clone(&mt);
65         Arc::new(move || mt.now())
66     };
67 
68     let icpr: Arc<dyn Interceptor + Send + Sync> = SenderReport::builder()
69         .with_interval(Duration::from_millis(50))
70         .with_now_fn(time_gen)
71         .build("")?;
72 
73     let stream = MockStream::new(
74         &StreamInfo {
75             ssrc: 123456,
76             clock_rate: 90000,
77             ..Default::default()
78         },
79         icpr,
80     )
81     .await;
82 
83     for i in 0..10u16 {
84         stream
85             .write_rtp(&rtp::packet::Packet {
86                 header: rtp::header::Header {
87                     sequence_number: i,
88                     ..Default::default()
89                 },
90                 payload: Bytes::from_static(b"\x00\x00"),
91             })
92             .await?;
93     }
94 
95     let dt = Utc.with_ymd_and_hms(2009, 10, 23, 0, 0, 0).unwrap();
96     mt.set_now(dt.into());
97 
98     let pkts = stream.written_rtcp().await.unwrap();
99     assert_eq!(pkts.len(), 1);
100     if let Some(sr) = pkts[0]
101         .as_any()
102         .downcast_ref::<rtcp::sender_report::SenderReport>()
103     {
104         assert_eq!(
105             sr,
106             &rtcp::sender_report::SenderReport {
107                 ssrc: 123456,
108                 ntp_time: unix2ntp(mt.now()),
109                 rtp_time: 4294967295, // pion: 2269117121,
110                 packet_count: 10,
111                 octet_count: 20,
112                 ..Default::default()
113             }
114         )
115     } else {
116         panic!();
117     }
118 
119     stream.close().await?;
120 
121     Ok(())
122 }
123 
124 #[tokio::test]
test_sender_interceptor_after_rtp_packets_overflow() -> Result<()>125 async fn test_sender_interceptor_after_rtp_packets_overflow() -> Result<()> {
126     let mt = Arc::new(MockTime::default());
127     let time_gen = {
128         let mt = Arc::clone(&mt);
129         Arc::new(move || mt.now())
130     };
131 
132     let icpr: Arc<dyn Interceptor + Send + Sync> = SenderReport::builder()
133         .with_interval(Duration::from_millis(50))
134         .with_now_fn(time_gen)
135         .build("")?;
136 
137     let stream = MockStream::new(
138         &StreamInfo {
139             ssrc: 123456,
140             clock_rate: 90000,
141             ..Default::default()
142         },
143         icpr,
144     )
145     .await;
146 
147     stream
148         .write_rtp(&rtp::packet::Packet {
149             header: rtp::header::Header {
150                 sequence_number: 0xfffd,
151                 ..Default::default()
152             },
153             payload: Bytes::from_static(b"\x00\x00"),
154         })
155         .await?;
156 
157     stream
158         .write_rtp(&rtp::packet::Packet {
159             header: rtp::header::Header {
160                 sequence_number: 0xfffe,
161                 ..Default::default()
162             },
163             payload: Bytes::from_static(b"\x00\x00"),
164         })
165         .await?;
166 
167     stream
168         .write_rtp(&rtp::packet::Packet {
169             header: rtp::header::Header {
170                 sequence_number: 0xffff,
171                 ..Default::default()
172             },
173             payload: Bytes::from_static(b"\x00\x00"),
174         })
175         .await?;
176 
177     stream
178         .write_rtp(&rtp::packet::Packet {
179             header: rtp::header::Header {
180                 sequence_number: 0,
181                 ..Default::default()
182             },
183             payload: Bytes::from_static(b"\x00\x00"),
184         })
185         .await?;
186 
187     stream
188         .write_rtp(&rtp::packet::Packet {
189             header: rtp::header::Header {
190                 sequence_number: 1,
191                 ..Default::default()
192             },
193             payload: Bytes::from_static(b"\x00\x00"),
194         })
195         .await?;
196 
197     let dt = Utc.with_ymd_and_hms(2009, 10, 23, 0, 0, 0).unwrap();
198     mt.set_now(dt.into());
199 
200     let pkts = stream.written_rtcp().await.unwrap();
201     assert_eq!(pkts.len(), 1);
202     if let Some(sr) = pkts[0]
203         .as_any()
204         .downcast_ref::<rtcp::sender_report::SenderReport>()
205     {
206         assert_eq!(
207             sr,
208             &rtcp::sender_report::SenderReport {
209                 ssrc: 123456,
210                 ntp_time: unix2ntp(mt.now()),
211                 rtp_time: 4294967295, // pion: 2269117121,
212                 packet_count: 5,
213                 octet_count: 10,
214                 ..Default::default()
215             }
216         )
217     } else {
218         panic!();
219     }
220 
221     stream.close().await?;
222 
223     Ok(())
224 }
225 
226 #[tokio::test]
test_stream_counters_initially_zero() -> Result<()>227 async fn test_stream_counters_initially_zero() -> Result<()> {
228     let counters = sender_stream::Counters::default();
229     assert_eq!(counters.octet_count(), 0);
230     assert_eq!(counters.packet_count(), 0);
231     Ok(())
232 }
233 
234 #[tokio::test]
test_stream_packet_counter_wraps_on_overflow() -> Result<()>235 async fn test_stream_packet_counter_wraps_on_overflow() -> Result<()> {
236     let mut counters = sender_stream::Counters::mock(u32::MAX, 0);
237     for _ in 0..3 {
238         counters.increment_packets();
239     }
240     assert_eq!(counters.packet_count(), 2);
241     Ok(())
242 }
243 
244 #[tokio::test]
test_stream_octet_counter_wraps_on_overflow() -> Result<()>245 async fn test_stream_octet_counter_wraps_on_overflow() -> Result<()> {
246     let mut counters = sender_stream::Counters::default();
247     counters.count_octets(u32::MAX as usize);
248     counters.count_octets(3);
249     assert_eq!(counters.octet_count(), 2);
250     Ok(())
251 }
252 
253 #[tokio::test]
test_stream_octet_counter_saturates_u32_from_usize() -> Result<()>254 async fn test_stream_octet_counter_saturates_u32_from_usize() -> Result<()> {
255     let mut counters = sender_stream::Counters::default();
256     counters.count_octets(0xabcdef01234567_usize);
257     assert_eq!(counters.octet_count(), 0xffffffff_u32);
258     Ok(())
259 }
260