1 // Silence warning on `..Default::default()` with no effect:
2 #![allow(clippy::needless_update)]
3 
4 use super::*;
5 use crate::mock::mock_stream::MockStream;
6 use crate::stream_info::RTPHeaderExtension;
7 use rtcp::transport_feedbacks::transport_layer_cc::{
8     PacketStatusChunk, RunLengthChunk, StatusChunkTypeTcc, StatusVectorChunk, SymbolSizeTypeTcc,
9     SymbolTypeTcc, TransportLayerCc,
10 };
11 use util::Marshal;
12 
13 #[tokio::test]
test_twcc_receiver_interceptor_before_any_packets() -> Result<()>14 async fn test_twcc_receiver_interceptor_before_any_packets() -> Result<()> {
15     let builder = Receiver::builder();
16     let icpr = builder.build("")?;
17 
18     let stream = MockStream::new(
19         &StreamInfo {
20             ssrc: 1,
21             rtp_header_extensions: vec![RTPHeaderExtension {
22                 uri: TRANSPORT_CC_URI.to_owned(),
23                 id: 1,
24                 ..Default::default()
25             }],
26             ..Default::default()
27         },
28         icpr,
29     )
30     .await;
31 
32     tokio::select! {
33         pkts = stream.written_rtcp() => {
34             assert!(pkts.map(|p| p.is_empty()).unwrap_or(true), "Should not have sent an RTCP packet before receiving the first RTP packets")
35         }
36         _ = tokio::time::sleep(Duration::from_millis(300)) => {
37             // All good
38         }
39     }
40 
41     stream.close().await?;
42 
43     Ok(())
44 }
45 
46 #[tokio::test]
test_twcc_receiver_interceptor_after_rtp_packets() -> Result<()>47 async fn test_twcc_receiver_interceptor_after_rtp_packets() -> Result<()> {
48     let builder = Receiver::builder();
49     let icpr = builder.build("")?;
50 
51     let stream = MockStream::new(
52         &StreamInfo {
53             ssrc: 1,
54             rtp_header_extensions: vec![RTPHeaderExtension {
55                 uri: TRANSPORT_CC_URI.to_owned(),
56                 id: 1,
57                 ..Default::default()
58             }],
59             ..Default::default()
60         },
61         icpr,
62     )
63     .await;
64 
65     for i in 0..10 {
66         let mut hdr = rtp::header::Header::default();
67         let tcc = TransportCcExtension {
68             transport_sequence: i,
69         }
70         .marshal()?;
71         hdr.set_extension(1, tcc)?;
72         stream
73             .receive_rtp(rtp::packet::Packet {
74                 header: hdr,
75                 ..Default::default()
76             })
77             .await;
78     }
79 
80     let pkts = stream.written_rtcp().await.unwrap();
81     assert_eq!(pkts.len(), 1);
82     if let Some(cc) = pkts[0].as_any().downcast_ref::<TransportLayerCc>() {
83         assert_eq!(cc.media_ssrc, 1);
84         assert_eq!(cc.base_sequence_number, 0);
85         assert_eq!(
86             cc.packet_chunks,
87             vec![PacketStatusChunk::RunLengthChunk(RunLengthChunk {
88                 type_tcc: StatusChunkTypeTcc::RunLengthChunk,
89                 packet_status_symbol: SymbolTypeTcc::PacketReceivedSmallDelta,
90                 run_length: 10,
91             })]
92         );
93     } else {
94         panic!();
95     }
96 
97     stream.close().await?;
98 
99     Ok(())
100 }
101 
102 #[tokio::test(start_paused = true)]
test_twcc_receiver_interceptor_different_delays_between_rtp_packets() -> Result<()>103 async fn test_twcc_receiver_interceptor_different_delays_between_rtp_packets() -> Result<()> {
104     let builder = Receiver::builder().with_interval(Duration::from_millis(500));
105     let icpr = builder.build("")?;
106 
107     let stream = MockStream::new(
108         &StreamInfo {
109             ssrc: 1,
110             rtp_header_extensions: vec![RTPHeaderExtension {
111                 uri: TRANSPORT_CC_URI.to_owned(),
112                 id: 1,
113                 ..Default::default()
114             }],
115             ..Default::default()
116         },
117         icpr,
118     )
119     .await;
120 
121     let delays = vec![0, 10, 100, 200];
122     for (i, d) in delays.iter().enumerate() {
123         tokio::time::advance(Duration::from_millis(*d)).await;
124 
125         let mut hdr = rtp::header::Header::default();
126         let tcc = TransportCcExtension {
127             transport_sequence: i as u16,
128         }
129         .marshal()?;
130 
131         hdr.set_extension(1, tcc)?;
132         stream
133             .receive_rtp(rtp::packet::Packet {
134                 header: hdr,
135                 ..Default::default()
136             })
137             .await;
138 
139         // Yield so this packet can be processed
140         tokio::task::yield_now().await;
141     }
142 
143     // Force a packet to be generated
144     tokio::time::advance(Duration::from_millis(2001)).await;
145     tokio::task::yield_now().await;
146 
147     let pkts = stream.written_rtcp().await.unwrap();
148 
149     assert_eq!(pkts.len(), 1);
150     if let Some(cc) = pkts[0].as_any().downcast_ref::<TransportLayerCc>() {
151         assert_eq!(cc.base_sequence_number, 0);
152         assert_eq!(
153             cc.packet_chunks,
154             vec![PacketStatusChunk::StatusVectorChunk(StatusVectorChunk {
155                 type_tcc: StatusChunkTypeTcc::StatusVectorChunk,
156                 symbol_size: SymbolSizeTypeTcc::TwoBit,
157                 symbol_list: vec![
158                     SymbolTypeTcc::PacketReceivedSmallDelta,
159                     SymbolTypeTcc::PacketReceivedSmallDelta,
160                     SymbolTypeTcc::PacketReceivedLargeDelta,
161                     SymbolTypeTcc::PacketReceivedLargeDelta,
162                 ],
163             })]
164         );
165     } else {
166         panic!();
167     }
168 
169     stream.close().await?;
170 
171     Ok(())
172 }
173 
174 #[tokio::test(start_paused = true)]
test_twcc_receiver_interceptor_packet_loss() -> Result<()>175 async fn test_twcc_receiver_interceptor_packet_loss() -> Result<()> {
176     let builder = Receiver::builder().with_interval(Duration::from_secs(2));
177     let icpr = builder.build("")?;
178 
179     let stream = MockStream::new(
180         &StreamInfo {
181             ssrc: 1,
182             rtp_header_extensions: vec![RTPHeaderExtension {
183                 uri: TRANSPORT_CC_URI.to_owned(),
184                 id: 1,
185                 ..Default::default()
186             }],
187             ..Default::default()
188         },
189         icpr,
190     )
191     .await;
192 
193     let sequence_number_to_delay = &[
194         (0, 0),
195         (1, 10),
196         (4, 100),
197         (8, 200),
198         (9, 20),
199         (10, 20),
200         (30, 300),
201     ];
202 
203     for (i, d) in sequence_number_to_delay {
204         tokio::time::advance(Duration::from_millis(*d)).await;
205         let mut hdr = rtp::header::Header::default();
206         let tcc = TransportCcExtension {
207             transport_sequence: *i,
208         }
209         .marshal()?;
210         hdr.set_extension(1, tcc)?;
211         stream
212             .receive_rtp(rtp::packet::Packet {
213                 header: hdr,
214                 ..Default::default()
215             })
216             .await;
217 
218         // Yield so this packet can be processed
219         tokio::task::yield_now().await;
220     }
221 
222     // Force a packet to be generated
223     tokio::time::advance(Duration::from_millis(2001)).await;
224     tokio::task::yield_now().await;
225 
226     let pkts = stream.written_rtcp().await.unwrap();
227 
228     assert_eq!(pkts.len(), 1);
229     if let Some(cc) = pkts[0].as_any().downcast_ref::<TransportLayerCc>() {
230         assert_eq!(cc.base_sequence_number, 0);
231         assert_eq!(
232             cc.packet_chunks,
233             vec![
234                 PacketStatusChunk::StatusVectorChunk(StatusVectorChunk {
235                     type_tcc: StatusChunkTypeTcc::StatusVectorChunk,
236                     symbol_size: SymbolSizeTypeTcc::TwoBit,
237                     symbol_list: vec![
238                         SymbolTypeTcc::PacketReceivedSmallDelta,
239                         SymbolTypeTcc::PacketReceivedSmallDelta,
240                         SymbolTypeTcc::PacketNotReceived,
241                         SymbolTypeTcc::PacketNotReceived,
242                         SymbolTypeTcc::PacketReceivedLargeDelta,
243                         SymbolTypeTcc::PacketNotReceived,
244                         SymbolTypeTcc::PacketNotReceived,
245                     ],
246                 }),
247                 PacketStatusChunk::StatusVectorChunk(StatusVectorChunk {
248                     type_tcc: StatusChunkTypeTcc::StatusVectorChunk,
249                     symbol_size: SymbolSizeTypeTcc::TwoBit,
250                     symbol_list: vec![
251                         SymbolTypeTcc::PacketNotReceived,
252                         SymbolTypeTcc::PacketReceivedLargeDelta,
253                         SymbolTypeTcc::PacketReceivedSmallDelta,
254                         SymbolTypeTcc::PacketReceivedSmallDelta,
255                         SymbolTypeTcc::PacketNotReceived,
256                         SymbolTypeTcc::PacketNotReceived,
257                         SymbolTypeTcc::PacketNotReceived,
258                     ],
259                 }),
260                 PacketStatusChunk::RunLengthChunk(RunLengthChunk {
261                     type_tcc: StatusChunkTypeTcc::RunLengthChunk,
262                     packet_status_symbol: SymbolTypeTcc::PacketNotReceived,
263                     run_length: 16,
264                 }),
265                 PacketStatusChunk::RunLengthChunk(RunLengthChunk {
266                     type_tcc: StatusChunkTypeTcc::RunLengthChunk,
267                     packet_status_symbol: SymbolTypeTcc::PacketReceivedLargeDelta,
268                     run_length: 1,
269                 }),
270             ]
271         );
272     } else {
273         panic!();
274     }
275 
276     stream.close().await?;
277 
278     Ok(())
279 }
280 
281 #[tokio::test]
test_twcc_receiver_interceptor_overflow() -> Result<()>282 async fn test_twcc_receiver_interceptor_overflow() -> Result<()> {
283     let builder = Receiver::builder();
284     let icpr = builder.build("")?;
285 
286     let stream = MockStream::new(
287         &StreamInfo {
288             ssrc: 1,
289             rtp_header_extensions: vec![RTPHeaderExtension {
290                 uri: TRANSPORT_CC_URI.to_owned(),
291                 id: 1,
292                 ..Default::default()
293             }],
294             ..Default::default()
295         },
296         icpr,
297     )
298     .await;
299 
300     for i in [65530, 65534, 65535, 1, 2, 10] {
301         let mut hdr = rtp::header::Header::default();
302         let tcc = TransportCcExtension {
303             transport_sequence: i,
304         }
305         .marshal()?;
306         hdr.set_extension(1, tcc)?;
307         stream
308             .receive_rtp(rtp::packet::Packet {
309                 header: hdr,
310                 ..Default::default()
311             })
312             .await;
313     }
314 
315     let pkts = stream.written_rtcp().await.unwrap();
316     assert_eq!(pkts.len(), 1);
317     if let Some(cc) = pkts[0].as_any().downcast_ref::<TransportLayerCc>() {
318         assert_eq!(cc.base_sequence_number, 65530);
319         assert_eq!(
320             cc.packet_chunks,
321             vec![
322                 PacketStatusChunk::StatusVectorChunk(StatusVectorChunk {
323                     type_tcc: StatusChunkTypeTcc::StatusVectorChunk,
324                     symbol_size: SymbolSizeTypeTcc::OneBit,
325                     symbol_list: vec![
326                         SymbolTypeTcc::PacketReceivedSmallDelta,
327                         SymbolTypeTcc::PacketNotReceived,
328                         SymbolTypeTcc::PacketNotReceived,
329                         SymbolTypeTcc::PacketNotReceived,
330                         SymbolTypeTcc::PacketReceivedSmallDelta,
331                         SymbolTypeTcc::PacketReceivedSmallDelta,
332                         SymbolTypeTcc::PacketNotReceived,
333                         SymbolTypeTcc::PacketReceivedSmallDelta,
334                         SymbolTypeTcc::PacketReceivedSmallDelta,
335                         SymbolTypeTcc::PacketNotReceived,
336                         SymbolTypeTcc::PacketNotReceived,
337                         SymbolTypeTcc::PacketNotReceived,
338                         SymbolTypeTcc::PacketNotReceived,
339                         SymbolTypeTcc::PacketNotReceived,
340                     ],
341                 }),
342                 PacketStatusChunk::StatusVectorChunk(StatusVectorChunk {
343                     type_tcc: StatusChunkTypeTcc::StatusVectorChunk,
344                     symbol_size: SymbolSizeTypeTcc::TwoBit,
345                     symbol_list: vec![
346                         SymbolTypeTcc::PacketNotReceived,
347                         SymbolTypeTcc::PacketNotReceived,
348                         SymbolTypeTcc::PacketReceivedSmallDelta,
349                     ],
350                 }),
351             ]
352         );
353     } else {
354         panic!();
355     }
356 
357     stream.close().await?;
358 
359     Ok(())
360 }
361