1 // Silence warning on `..Default::default()` with no effect:
2 #![allow(clippy::needless_update)]
3
4 use super::*;
5 use crate::mock::mock_stream::MockStream;
6 use crate::stream_info::RTPHeaderExtension;
7 use rtcp::transport_feedbacks::transport_layer_cc::{
8 PacketStatusChunk, RunLengthChunk, StatusChunkTypeTcc, StatusVectorChunk, SymbolSizeTypeTcc,
9 SymbolTypeTcc, TransportLayerCc,
10 };
11 use util::Marshal;
12
13 #[tokio::test]
test_twcc_receiver_interceptor_before_any_packets() -> Result<()>14 async fn test_twcc_receiver_interceptor_before_any_packets() -> Result<()> {
15 let builder = Receiver::builder();
16 let icpr = builder.build("")?;
17
18 let stream = MockStream::new(
19 &StreamInfo {
20 ssrc: 1,
21 rtp_header_extensions: vec![RTPHeaderExtension {
22 uri: TRANSPORT_CC_URI.to_owned(),
23 id: 1,
24 ..Default::default()
25 }],
26 ..Default::default()
27 },
28 icpr,
29 )
30 .await;
31
32 tokio::select! {
33 pkts = stream.written_rtcp() => {
34 assert!(pkts.map(|p| p.is_empty()).unwrap_or(true), "Should not have sent an RTCP packet before receiving the first RTP packets")
35 }
36 _ = tokio::time::sleep(Duration::from_millis(300)) => {
37 // All good
38 }
39 }
40
41 stream.close().await?;
42
43 Ok(())
44 }
45
46 #[tokio::test]
test_twcc_receiver_interceptor_after_rtp_packets() -> Result<()>47 async fn test_twcc_receiver_interceptor_after_rtp_packets() -> Result<()> {
48 let builder = Receiver::builder();
49 let icpr = builder.build("")?;
50
51 let stream = MockStream::new(
52 &StreamInfo {
53 ssrc: 1,
54 rtp_header_extensions: vec![RTPHeaderExtension {
55 uri: TRANSPORT_CC_URI.to_owned(),
56 id: 1,
57 ..Default::default()
58 }],
59 ..Default::default()
60 },
61 icpr,
62 )
63 .await;
64
65 for i in 0..10 {
66 let mut hdr = rtp::header::Header::default();
67 let tcc = TransportCcExtension {
68 transport_sequence: i,
69 }
70 .marshal()?;
71 hdr.set_extension(1, tcc)?;
72 stream
73 .receive_rtp(rtp::packet::Packet {
74 header: hdr,
75 ..Default::default()
76 })
77 .await;
78 }
79
80 let pkts = stream.written_rtcp().await.unwrap();
81 assert_eq!(pkts.len(), 1);
82 if let Some(cc) = pkts[0].as_any().downcast_ref::<TransportLayerCc>() {
83 assert_eq!(cc.media_ssrc, 1);
84 assert_eq!(cc.base_sequence_number, 0);
85 assert_eq!(
86 cc.packet_chunks,
87 vec![PacketStatusChunk::RunLengthChunk(RunLengthChunk {
88 type_tcc: StatusChunkTypeTcc::RunLengthChunk,
89 packet_status_symbol: SymbolTypeTcc::PacketReceivedSmallDelta,
90 run_length: 10,
91 })]
92 );
93 } else {
94 panic!();
95 }
96
97 stream.close().await?;
98
99 Ok(())
100 }
101
102 #[tokio::test(start_paused = true)]
test_twcc_receiver_interceptor_different_delays_between_rtp_packets() -> Result<()>103 async fn test_twcc_receiver_interceptor_different_delays_between_rtp_packets() -> Result<()> {
104 let builder = Receiver::builder().with_interval(Duration::from_millis(500));
105 let icpr = builder.build("")?;
106
107 let stream = MockStream::new(
108 &StreamInfo {
109 ssrc: 1,
110 rtp_header_extensions: vec![RTPHeaderExtension {
111 uri: TRANSPORT_CC_URI.to_owned(),
112 id: 1,
113 ..Default::default()
114 }],
115 ..Default::default()
116 },
117 icpr,
118 )
119 .await;
120
121 let delays = vec![0, 10, 100, 200];
122 for (i, d) in delays.iter().enumerate() {
123 tokio::time::advance(Duration::from_millis(*d)).await;
124
125 let mut hdr = rtp::header::Header::default();
126 let tcc = TransportCcExtension {
127 transport_sequence: i as u16,
128 }
129 .marshal()?;
130
131 hdr.set_extension(1, tcc)?;
132 stream
133 .receive_rtp(rtp::packet::Packet {
134 header: hdr,
135 ..Default::default()
136 })
137 .await;
138
139 // Yield so this packet can be processed
140 tokio::task::yield_now().await;
141 }
142
143 // Force a packet to be generated
144 tokio::time::advance(Duration::from_millis(2001)).await;
145 tokio::task::yield_now().await;
146
147 let pkts = stream.written_rtcp().await.unwrap();
148
149 assert_eq!(pkts.len(), 1);
150 if let Some(cc) = pkts[0].as_any().downcast_ref::<TransportLayerCc>() {
151 assert_eq!(cc.base_sequence_number, 0);
152 assert_eq!(
153 cc.packet_chunks,
154 vec![PacketStatusChunk::StatusVectorChunk(StatusVectorChunk {
155 type_tcc: StatusChunkTypeTcc::StatusVectorChunk,
156 symbol_size: SymbolSizeTypeTcc::TwoBit,
157 symbol_list: vec![
158 SymbolTypeTcc::PacketReceivedSmallDelta,
159 SymbolTypeTcc::PacketReceivedSmallDelta,
160 SymbolTypeTcc::PacketReceivedLargeDelta,
161 SymbolTypeTcc::PacketReceivedLargeDelta,
162 ],
163 })]
164 );
165 } else {
166 panic!();
167 }
168
169 stream.close().await?;
170
171 Ok(())
172 }
173
174 #[tokio::test(start_paused = true)]
test_twcc_receiver_interceptor_packet_loss() -> Result<()>175 async fn test_twcc_receiver_interceptor_packet_loss() -> Result<()> {
176 let builder = Receiver::builder().with_interval(Duration::from_secs(2));
177 let icpr = builder.build("")?;
178
179 let stream = MockStream::new(
180 &StreamInfo {
181 ssrc: 1,
182 rtp_header_extensions: vec![RTPHeaderExtension {
183 uri: TRANSPORT_CC_URI.to_owned(),
184 id: 1,
185 ..Default::default()
186 }],
187 ..Default::default()
188 },
189 icpr,
190 )
191 .await;
192
193 let sequence_number_to_delay = &[
194 (0, 0),
195 (1, 10),
196 (4, 100),
197 (8, 200),
198 (9, 20),
199 (10, 20),
200 (30, 300),
201 ];
202
203 for (i, d) in sequence_number_to_delay {
204 tokio::time::advance(Duration::from_millis(*d)).await;
205 let mut hdr = rtp::header::Header::default();
206 let tcc = TransportCcExtension {
207 transport_sequence: *i,
208 }
209 .marshal()?;
210 hdr.set_extension(1, tcc)?;
211 stream
212 .receive_rtp(rtp::packet::Packet {
213 header: hdr,
214 ..Default::default()
215 })
216 .await;
217
218 // Yield so this packet can be processed
219 tokio::task::yield_now().await;
220 }
221
222 // Force a packet to be generated
223 tokio::time::advance(Duration::from_millis(2001)).await;
224 tokio::task::yield_now().await;
225
226 let pkts = stream.written_rtcp().await.unwrap();
227
228 assert_eq!(pkts.len(), 1);
229 if let Some(cc) = pkts[0].as_any().downcast_ref::<TransportLayerCc>() {
230 assert_eq!(cc.base_sequence_number, 0);
231 assert_eq!(
232 cc.packet_chunks,
233 vec![
234 PacketStatusChunk::StatusVectorChunk(StatusVectorChunk {
235 type_tcc: StatusChunkTypeTcc::StatusVectorChunk,
236 symbol_size: SymbolSizeTypeTcc::TwoBit,
237 symbol_list: vec![
238 SymbolTypeTcc::PacketReceivedSmallDelta,
239 SymbolTypeTcc::PacketReceivedSmallDelta,
240 SymbolTypeTcc::PacketNotReceived,
241 SymbolTypeTcc::PacketNotReceived,
242 SymbolTypeTcc::PacketReceivedLargeDelta,
243 SymbolTypeTcc::PacketNotReceived,
244 SymbolTypeTcc::PacketNotReceived,
245 ],
246 }),
247 PacketStatusChunk::StatusVectorChunk(StatusVectorChunk {
248 type_tcc: StatusChunkTypeTcc::StatusVectorChunk,
249 symbol_size: SymbolSizeTypeTcc::TwoBit,
250 symbol_list: vec![
251 SymbolTypeTcc::PacketNotReceived,
252 SymbolTypeTcc::PacketReceivedLargeDelta,
253 SymbolTypeTcc::PacketReceivedSmallDelta,
254 SymbolTypeTcc::PacketReceivedSmallDelta,
255 SymbolTypeTcc::PacketNotReceived,
256 SymbolTypeTcc::PacketNotReceived,
257 SymbolTypeTcc::PacketNotReceived,
258 ],
259 }),
260 PacketStatusChunk::RunLengthChunk(RunLengthChunk {
261 type_tcc: StatusChunkTypeTcc::RunLengthChunk,
262 packet_status_symbol: SymbolTypeTcc::PacketNotReceived,
263 run_length: 16,
264 }),
265 PacketStatusChunk::RunLengthChunk(RunLengthChunk {
266 type_tcc: StatusChunkTypeTcc::RunLengthChunk,
267 packet_status_symbol: SymbolTypeTcc::PacketReceivedLargeDelta,
268 run_length: 1,
269 }),
270 ]
271 );
272 } else {
273 panic!();
274 }
275
276 stream.close().await?;
277
278 Ok(())
279 }
280
281 #[tokio::test]
test_twcc_receiver_interceptor_overflow() -> Result<()>282 async fn test_twcc_receiver_interceptor_overflow() -> Result<()> {
283 let builder = Receiver::builder();
284 let icpr = builder.build("")?;
285
286 let stream = MockStream::new(
287 &StreamInfo {
288 ssrc: 1,
289 rtp_header_extensions: vec![RTPHeaderExtension {
290 uri: TRANSPORT_CC_URI.to_owned(),
291 id: 1,
292 ..Default::default()
293 }],
294 ..Default::default()
295 },
296 icpr,
297 )
298 .await;
299
300 for i in [65530, 65534, 65535, 1, 2, 10] {
301 let mut hdr = rtp::header::Header::default();
302 let tcc = TransportCcExtension {
303 transport_sequence: i,
304 }
305 .marshal()?;
306 hdr.set_extension(1, tcc)?;
307 stream
308 .receive_rtp(rtp::packet::Packet {
309 header: hdr,
310 ..Default::default()
311 })
312 .await;
313 }
314
315 let pkts = stream.written_rtcp().await.unwrap();
316 assert_eq!(pkts.len(), 1);
317 if let Some(cc) = pkts[0].as_any().downcast_ref::<TransportLayerCc>() {
318 assert_eq!(cc.base_sequence_number, 65530);
319 assert_eq!(
320 cc.packet_chunks,
321 vec![
322 PacketStatusChunk::StatusVectorChunk(StatusVectorChunk {
323 type_tcc: StatusChunkTypeTcc::StatusVectorChunk,
324 symbol_size: SymbolSizeTypeTcc::OneBit,
325 symbol_list: vec![
326 SymbolTypeTcc::PacketReceivedSmallDelta,
327 SymbolTypeTcc::PacketNotReceived,
328 SymbolTypeTcc::PacketNotReceived,
329 SymbolTypeTcc::PacketNotReceived,
330 SymbolTypeTcc::PacketReceivedSmallDelta,
331 SymbolTypeTcc::PacketReceivedSmallDelta,
332 SymbolTypeTcc::PacketNotReceived,
333 SymbolTypeTcc::PacketReceivedSmallDelta,
334 SymbolTypeTcc::PacketReceivedSmallDelta,
335 SymbolTypeTcc::PacketNotReceived,
336 SymbolTypeTcc::PacketNotReceived,
337 SymbolTypeTcc::PacketNotReceived,
338 SymbolTypeTcc::PacketNotReceived,
339 SymbolTypeTcc::PacketNotReceived,
340 ],
341 }),
342 PacketStatusChunk::StatusVectorChunk(StatusVectorChunk {
343 type_tcc: StatusChunkTypeTcc::StatusVectorChunk,
344 symbol_size: SymbolSizeTypeTcc::TwoBit,
345 symbol_list: vec![
346 SymbolTypeTcc::PacketNotReceived,
347 SymbolTypeTcc::PacketNotReceived,
348 SymbolTypeTcc::PacketReceivedSmallDelta,
349 ],
350 }),
351 ]
352 );
353 } else {
354 panic!();
355 }
356
357 stream.close().await?;
358
359 Ok(())
360 }
361