1 use super::*;
2 use crate::mock::mock_stream::MockStream;
3 use crate::mock::mock_time::MockTime;
4 use bytes::Bytes;
5 use chrono::prelude::*;
6 use rtp::extension::abs_send_time_extension::unix2ntp;
7
8 #[tokio::test]
test_sender_interceptor_before_any_packet() -> Result<()>9 async fn test_sender_interceptor_before_any_packet() -> Result<()> {
10 let mt = Arc::new(MockTime::default());
11 let time_gen = {
12 let mt = Arc::clone(&mt);
13 Arc::new(move || mt.now())
14 };
15
16 let icpr: Arc<dyn Interceptor + Send + Sync> = SenderReport::builder()
17 .with_interval(Duration::from_millis(50))
18 .with_now_fn(time_gen)
19 .build("")?;
20
21 let stream = MockStream::new(
22 &StreamInfo {
23 ssrc: 123456,
24 clock_rate: 90000,
25 ..Default::default()
26 },
27 icpr,
28 )
29 .await;
30
31 let dt = Utc.with_ymd_and_hms(2009, 10, 23, 0, 0, 0).unwrap();
32 mt.set_now(dt.into());
33
34 let pkts = stream.written_rtcp().await.unwrap();
35 assert_eq!(pkts.len(), 1);
36 if let Some(sr) = pkts[0]
37 .as_any()
38 .downcast_ref::<rtcp::sender_report::SenderReport>()
39 {
40 assert_eq!(
41 sr,
42 &rtcp::sender_report::SenderReport {
43 ssrc: 123456,
44 ntp_time: unix2ntp(mt.now()),
45 rtp_time: 4294967295, // pion: 2269117121,
46 packet_count: 0,
47 octet_count: 0,
48 ..Default::default()
49 }
50 )
51 } else {
52 panic!();
53 }
54
55 stream.close().await?;
56
57 Ok(())
58 }
59
60 #[tokio::test]
test_sender_interceptor_after_rtp_packets() -> Result<()>61 async fn test_sender_interceptor_after_rtp_packets() -> Result<()> {
62 let mt = Arc::new(MockTime::default());
63 let time_gen = {
64 let mt = Arc::clone(&mt);
65 Arc::new(move || mt.now())
66 };
67
68 let icpr: Arc<dyn Interceptor + Send + Sync> = SenderReport::builder()
69 .with_interval(Duration::from_millis(50))
70 .with_now_fn(time_gen)
71 .build("")?;
72
73 let stream = MockStream::new(
74 &StreamInfo {
75 ssrc: 123456,
76 clock_rate: 90000,
77 ..Default::default()
78 },
79 icpr,
80 )
81 .await;
82
83 for i in 0..10u16 {
84 stream
85 .write_rtp(&rtp::packet::Packet {
86 header: rtp::header::Header {
87 sequence_number: i,
88 ..Default::default()
89 },
90 payload: Bytes::from_static(b"\x00\x00"),
91 })
92 .await?;
93 }
94
95 let dt = Utc.with_ymd_and_hms(2009, 10, 23, 0, 0, 0).unwrap();
96 mt.set_now(dt.into());
97
98 let pkts = stream.written_rtcp().await.unwrap();
99 assert_eq!(pkts.len(), 1);
100 if let Some(sr) = pkts[0]
101 .as_any()
102 .downcast_ref::<rtcp::sender_report::SenderReport>()
103 {
104 assert_eq!(
105 sr,
106 &rtcp::sender_report::SenderReport {
107 ssrc: 123456,
108 ntp_time: unix2ntp(mt.now()),
109 rtp_time: 4294967295, // pion: 2269117121,
110 packet_count: 10,
111 octet_count: 20,
112 ..Default::default()
113 }
114 )
115 } else {
116 panic!();
117 }
118
119 stream.close().await?;
120
121 Ok(())
122 }
123
124 #[tokio::test]
test_sender_interceptor_after_rtp_packets_overflow() -> Result<()>125 async fn test_sender_interceptor_after_rtp_packets_overflow() -> Result<()> {
126 let mt = Arc::new(MockTime::default());
127 let time_gen = {
128 let mt = Arc::clone(&mt);
129 Arc::new(move || mt.now())
130 };
131
132 let icpr: Arc<dyn Interceptor + Send + Sync> = SenderReport::builder()
133 .with_interval(Duration::from_millis(50))
134 .with_now_fn(time_gen)
135 .build("")?;
136
137 let stream = MockStream::new(
138 &StreamInfo {
139 ssrc: 123456,
140 clock_rate: 90000,
141 ..Default::default()
142 },
143 icpr,
144 )
145 .await;
146
147 stream
148 .write_rtp(&rtp::packet::Packet {
149 header: rtp::header::Header {
150 sequence_number: 0xfffd,
151 ..Default::default()
152 },
153 payload: Bytes::from_static(b"\x00\x00"),
154 })
155 .await?;
156
157 stream
158 .write_rtp(&rtp::packet::Packet {
159 header: rtp::header::Header {
160 sequence_number: 0xfffe,
161 ..Default::default()
162 },
163 payload: Bytes::from_static(b"\x00\x00"),
164 })
165 .await?;
166
167 stream
168 .write_rtp(&rtp::packet::Packet {
169 header: rtp::header::Header {
170 sequence_number: 0xffff,
171 ..Default::default()
172 },
173 payload: Bytes::from_static(b"\x00\x00"),
174 })
175 .await?;
176
177 stream
178 .write_rtp(&rtp::packet::Packet {
179 header: rtp::header::Header {
180 sequence_number: 0,
181 ..Default::default()
182 },
183 payload: Bytes::from_static(b"\x00\x00"),
184 })
185 .await?;
186
187 stream
188 .write_rtp(&rtp::packet::Packet {
189 header: rtp::header::Header {
190 sequence_number: 1,
191 ..Default::default()
192 },
193 payload: Bytes::from_static(b"\x00\x00"),
194 })
195 .await?;
196
197 let dt = Utc.with_ymd_and_hms(2009, 10, 23, 0, 0, 0).unwrap();
198 mt.set_now(dt.into());
199
200 let pkts = stream.written_rtcp().await.unwrap();
201 assert_eq!(pkts.len(), 1);
202 if let Some(sr) = pkts[0]
203 .as_any()
204 .downcast_ref::<rtcp::sender_report::SenderReport>()
205 {
206 assert_eq!(
207 sr,
208 &rtcp::sender_report::SenderReport {
209 ssrc: 123456,
210 ntp_time: unix2ntp(mt.now()),
211 rtp_time: 4294967295, // pion: 2269117121,
212 packet_count: 5,
213 octet_count: 10,
214 ..Default::default()
215 }
216 )
217 } else {
218 panic!();
219 }
220
221 stream.close().await?;
222
223 Ok(())
224 }
225
226 #[tokio::test]
test_stream_counters_initially_zero() -> Result<()>227 async fn test_stream_counters_initially_zero() -> Result<()> {
228 let counters = sender_stream::Counters::default();
229 assert_eq!(counters.octet_count(), 0);
230 assert_eq!(counters.packet_count(), 0);
231 Ok(())
232 }
233
234 #[tokio::test]
test_stream_packet_counter_wraps_on_overflow() -> Result<()>235 async fn test_stream_packet_counter_wraps_on_overflow() -> Result<()> {
236 let mut counters = sender_stream::Counters::mock(u32::MAX, 0);
237 for _ in 0..3 {
238 counters.increment_packets();
239 }
240 assert_eq!(counters.packet_count(), 2);
241 Ok(())
242 }
243
244 #[tokio::test]
test_stream_octet_counter_wraps_on_overflow() -> Result<()>245 async fn test_stream_octet_counter_wraps_on_overflow() -> Result<()> {
246 let mut counters = sender_stream::Counters::default();
247 counters.count_octets(u32::MAX as usize);
248 counters.count_octets(3);
249 assert_eq!(counters.octet_count(), 2);
250 Ok(())
251 }
252
253 #[tokio::test]
test_stream_octet_counter_saturates_u32_from_usize() -> Result<()>254 async fn test_stream_octet_counter_saturates_u32_from_usize() -> Result<()> {
255 let mut counters = sender_stream::Counters::default();
256 counters.count_octets(0xabcdef01234567_usize);
257 assert_eq!(counters.octet_count(), 0xffffffff_u32);
258 Ok(())
259 }
260