xref: /webrtc/interceptor/src/mock/mock_stream.rs (revision 98f581dd)
1 use crate::error::{Error, Result};
2 use crate::stream_info::StreamInfo;
3 use crate::{Attributes, Interceptor, RTCPReader, RTCPWriter, RTPReader, RTPWriter};
4 
5 use async_trait::async_trait;
6 use std::sync::Arc;
7 use tokio::sync::{mpsc, Mutex};
8 use util::{Marshal, Unmarshal};
9 
/// A batch of boxed RTCP packets, the unit carried by the mock's RTCP channels.
type RTCPPackets = Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>;
11 
/// MockStream is a helper struct for testing interceptors.
///
/// It registers itself with the interceptor as the innermost RTP/RTCP reader and writer, and
/// exposes channels so a test can feed packets in and observe what comes out on the other side
/// of the interceptor.
pub struct MockStream {
    /// The interceptor under test; closed by `close`.
    interceptor: Arc<dyn Interceptor + Send + Sync>,

    /// Writer returned by the interceptor's `bind_rtcp_writer`; `None` until `new` stores it.
    rtcp_writer: Mutex<Option<Arc<dyn RTCPWriter + Send + Sync>>>,
    /// Writer returned by the interceptor's `bind_local_stream`; `None` until `new` stores it.
    rtp_writer: Mutex<Option<Arc<dyn RTPWriter + Send + Sync>>>,

    /// Fed by this stream's `RTCPWriter::write`; drained through `rtcp_out_modified_rx`.
    rtcp_out_modified_tx: mpsc::Sender<RTCPPackets>,
    /// Fed by this stream's `RTPWriter::write`; drained through `rtp_out_modified_rx`.
    rtp_out_modified_tx: mpsc::Sender<rtp::packet::Packet>,
    /// Batches queued by `receive_rtcp`; consumed by this stream's `RTCPReader::read`.
    rtcp_in_rx: Mutex<mpsc::Receiver<RTCPPackets>>,
    /// Packets queued by `receive_rtp`; consumed by this stream's `RTPReader::read`.
    rtp_in_rx: Mutex<mpsc::Receiver<rtp::packet::Packet>>,

    /// Receiving half read by `written_rtcp` and `last_written_rtcp`.
    rtcp_out_modified_rx: Mutex<mpsc::Receiver<RTCPPackets>>,
    /// Receiving half read by `written_rtp`.
    rtp_out_modified_rx: Mutex<mpsc::Receiver<rtp::packet::Packet>>,
    /// Sending half used by `receive_rtcp`; wrapped in `Option` so `close` can drop it.
    rtcp_in_tx: Mutex<Option<mpsc::Sender<RTCPPackets>>>,
    /// Sending half used by `receive_rtp`; wrapped in `Option` so `close` can drop it.
    rtp_in_tx: Mutex<Option<mpsc::Sender<rtp::packet::Packet>>>,

    /// Results produced by the RTCP reader task spawned in `new`; read by `read_rtcp`.
    rtcp_in_modified_rx: Mutex<mpsc::Receiver<Result<RTCPPackets>>>,
    /// Results produced by the RTP reader task spawned in `new`; read by `read_rtp`.
    rtp_in_modified_rx: Mutex<mpsc::Receiver<Result<rtp::packet::Packet>>>,
}
32 
33 impl MockStream {
34     /// new creates a new MockStream
new( info: &StreamInfo, interceptor: Arc<dyn Interceptor + Send + Sync>, ) -> Arc<Self>35     pub async fn new(
36         info: &StreamInfo,
37         interceptor: Arc<dyn Interceptor + Send + Sync>,
38     ) -> Arc<Self> {
39         let (rtcp_in_tx, rtcp_in_rx) = mpsc::channel(1000);
40         let (rtp_in_tx, rtp_in_rx) = mpsc::channel(1000);
41         let (rtcp_out_modified_tx, rtcp_out_modified_rx) = mpsc::channel(1000);
42         let (rtp_out_modified_tx, rtp_out_modified_rx) = mpsc::channel(1000);
43         let (rtcp_in_modified_tx, rtcp_in_modified_rx) = mpsc::channel(1000);
44         let (rtp_in_modified_tx, rtp_in_modified_rx) = mpsc::channel(1000);
45 
46         let stream = Arc::new(MockStream {
47             interceptor: Arc::clone(&interceptor),
48 
49             rtcp_writer: Mutex::new(None),
50             rtp_writer: Mutex::new(None),
51 
52             rtcp_in_tx: Mutex::new(Some(rtcp_in_tx)),
53             rtp_in_tx: Mutex::new(Some(rtp_in_tx)),
54             rtcp_in_rx: Mutex::new(rtcp_in_rx),
55             rtp_in_rx: Mutex::new(rtp_in_rx),
56 
57             rtcp_out_modified_tx,
58             rtp_out_modified_tx,
59             rtcp_out_modified_rx: Mutex::new(rtcp_out_modified_rx),
60             rtp_out_modified_rx: Mutex::new(rtp_out_modified_rx),
61 
62             rtcp_in_modified_rx: Mutex::new(rtcp_in_modified_rx),
63             rtp_in_modified_rx: Mutex::new(rtp_in_modified_rx),
64         });
65 
66         let rtcp_writer = interceptor
67             .bind_rtcp_writer(Arc::clone(&stream) as Arc<dyn RTCPWriter + Send + Sync>)
68             .await;
69         {
70             let mut rw = stream.rtcp_writer.lock().await;
71             *rw = Some(rtcp_writer);
72         }
73         let rtp_writer = interceptor
74             .bind_local_stream(
75                 info,
76                 Arc::clone(&stream) as Arc<dyn RTPWriter + Send + Sync>,
77             )
78             .await;
79         {
80             let mut rw = stream.rtp_writer.lock().await;
81             *rw = Some(rtp_writer);
82         }
83 
84         let rtcp_reader = interceptor
85             .bind_rtcp_reader(Arc::clone(&stream) as Arc<dyn RTCPReader + Send + Sync>)
86             .await;
87         tokio::spawn(async move {
88             let mut buf = vec![0u8; 1500];
89             let a = Attributes::new();
90             loop {
91                 let n = match rtcp_reader.read(&mut buf, &a).await {
92                     Ok((n, _)) => n,
93                     Err(err) => {
94                         if Error::ErrIoEOF != err {
95                             let _ = rtcp_in_modified_tx.send(Err(err)).await;
96                         }
97                         break;
98                     }
99                 };
100 
101                 let mut b = &buf[..n];
102                 let pkt = match rtcp::packet::unmarshal(&mut b) {
103                     Ok(pkt) => pkt,
104                     Err(err) => {
105                         let _ = rtcp_in_modified_tx.send(Err(err.into())).await;
106                         break;
107                     }
108                 };
109 
110                 let _ = rtcp_in_modified_tx.send(Ok(pkt)).await;
111             }
112         });
113 
114         let rtp_reader = interceptor
115             .bind_remote_stream(
116                 info,
117                 Arc::clone(&stream) as Arc<dyn RTPReader + Send + Sync>,
118             )
119             .await;
120         tokio::spawn(async move {
121             let mut buf = vec![0u8; 1500];
122             let a = Attributes::new();
123             loop {
124                 let n = match rtp_reader.read(&mut buf, &a).await {
125                     Ok((n, _)) => n,
126                     Err(err) => {
127                         if Error::ErrIoEOF != err {
128                             let _ = rtp_in_modified_tx.send(Err(err)).await;
129                         }
130                         break;
131                     }
132                 };
133 
134                 let mut b = &buf[..n];
135                 let pkt = match rtp::packet::Packet::unmarshal(&mut b) {
136                     Ok(pkt) => pkt,
137                     Err(err) => {
138                         let _ = rtp_in_modified_tx.send(Err(err.into())).await;
139                         break;
140                     }
141                 };
142 
143                 let _ = rtp_in_modified_tx.send(Ok(pkt)).await;
144             }
145         });
146 
147         stream
148     }
149 
150     /// write_rtcp writes a batch of rtcp packet to the stream, using the interceptor
write_rtcp( &self, pkt: &[Box<dyn rtcp::packet::Packet + Send + Sync>], ) -> Result<usize>151     pub async fn write_rtcp(
152         &self,
153         pkt: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
154     ) -> Result<usize> {
155         let a = Attributes::new();
156         let rtcp_writer = self.rtcp_writer.lock().await;
157         if let Some(writer) = &*rtcp_writer {
158             writer.write(pkt, &a).await
159         } else {
160             Err(Error::Other("invalid rtcp_writer".to_owned()))
161         }
162     }
163 
164     /// write_rtp writes an rtp packet to the stream, using the interceptor
write_rtp(&self, pkt: &rtp::packet::Packet) -> Result<usize>165     pub async fn write_rtp(&self, pkt: &rtp::packet::Packet) -> Result<usize> {
166         let a = Attributes::new();
167         let rtp_writer = self.rtp_writer.lock().await;
168         if let Some(writer) = &*rtp_writer {
169             writer.write(pkt, &a).await
170         } else {
171             Err(Error::Other("invalid rtp_writer".to_owned()))
172         }
173     }
174 
175     /// receive_rtcp schedules a new rtcp batch, so it can be read be the stream
receive_rtcp(&self, pkts: Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>)176     pub async fn receive_rtcp(&self, pkts: Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>) {
177         let rtcp_in_tx = self.rtcp_in_tx.lock().await;
178         if let Some(tx) = &*rtcp_in_tx {
179             let _ = tx.send(pkts).await;
180         }
181     }
182 
183     /// receive_rtp schedules a rtp packet, so it can be read be the stream
receive_rtp(&self, pkt: rtp::packet::Packet)184     pub async fn receive_rtp(&self, pkt: rtp::packet::Packet) {
185         let rtp_in_tx = self.rtp_in_tx.lock().await;
186         if let Some(tx) = &*rtp_in_tx {
187             let _ = tx.send(pkt).await;
188         }
189     }
190 
191     /// written_rtcp returns a channel containing the rtcp batches written, modified by the interceptor
written_rtcp(&self) -> Option<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>>192     pub async fn written_rtcp(&self) -> Option<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>> {
193         let mut rtcp_out_modified_rx = self.rtcp_out_modified_rx.lock().await;
194         rtcp_out_modified_rx.recv().await
195     }
196 
197     /// Returns the last rtcp packet bacth that was written, modified by the interceptor.
198     ///
199     /// NB: This method discards all other previously recoreded packet batches.
last_written_rtcp( &self, ) -> Option<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>>200     pub async fn last_written_rtcp(
201         &self,
202     ) -> Option<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>> {
203         let mut last = None;
204         let mut rtcp_out_modified_rx = self.rtcp_out_modified_rx.lock().await;
205 
206         while let Ok(v) = rtcp_out_modified_rx.try_recv() {
207             last = Some(v);
208         }
209 
210         last
211     }
212 
213     /// written_rtp returns a channel containing rtp packets written, modified by the interceptor
written_rtp(&self) -> Option<rtp::packet::Packet>214     pub async fn written_rtp(&self) -> Option<rtp::packet::Packet> {
215         let mut rtp_out_modified_rx = self.rtp_out_modified_rx.lock().await;
216         rtp_out_modified_rx.recv().await
217     }
218 
219     /// read_rtcp returns a channel containing the rtcp batched read, modified by the interceptor
read_rtcp( &self, ) -> Option<Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>>>220     pub async fn read_rtcp(
221         &self,
222     ) -> Option<Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>>> {
223         let mut rtcp_in_modified_rx = self.rtcp_in_modified_rx.lock().await;
224         rtcp_in_modified_rx.recv().await
225     }
226 
227     /// read_rtp returns a channel containing the rtp packets read, modified by the interceptor
read_rtp(&self) -> Option<Result<rtp::packet::Packet>>228     pub async fn read_rtp(&self) -> Option<Result<rtp::packet::Packet>> {
229         let mut rtp_in_modified_rx = self.rtp_in_modified_rx.lock().await;
230         rtp_in_modified_rx.recv().await
231     }
232 
233     /// cose closes the stream and the underlying interceptor
close(&self) -> Result<()>234     pub async fn close(&self) -> Result<()> {
235         {
236             let mut rtcp_in_tx = self.rtcp_in_tx.lock().await;
237             rtcp_in_tx.take();
238         }
239         {
240             let mut rtp_in_tx = self.rtp_in_tx.lock().await;
241             rtp_in_tx.take();
242         }
243         self.interceptor.close().await
244     }
245 }
246 
247 #[async_trait]
248 impl RTCPWriter for MockStream {
write( &self, pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>], _attributes: &Attributes, ) -> Result<usize>249     async fn write(
250         &self,
251         pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>],
252         _attributes: &Attributes,
253     ) -> Result<usize> {
254         let _ = self.rtcp_out_modified_tx.send(pkts.to_vec()).await;
255 
256         Ok(0)
257     }
258 }
259 
260 #[async_trait]
261 impl RTCPReader for MockStream {
read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)>262     async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
263         let pkts = {
264             let mut rtcp_in = self.rtcp_in_rx.lock().await;
265             rtcp_in.recv().await.ok_or(Error::ErrIoEOF)?
266         };
267 
268         let marshaled = rtcp::packet::marshal(&pkts)?;
269         let n = marshaled.len();
270         if n > buf.len() {
271             return Err(Error::ErrShortBuffer);
272         }
273 
274         buf[..n].copy_from_slice(&marshaled);
275         Ok((n, a.clone()))
276     }
277 }
278 
279 #[async_trait]
280 impl RTPWriter for MockStream {
write(&self, pkt: &rtp::packet::Packet, _a: &Attributes) -> Result<usize>281     async fn write(&self, pkt: &rtp::packet::Packet, _a: &Attributes) -> Result<usize> {
282         let _ = self.rtp_out_modified_tx.send(pkt.clone()).await;
283         Ok(0)
284     }
285 }
286 
287 #[async_trait]
288 impl RTPReader for MockStream {
read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)>289     async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
290         let pkt = {
291             let mut rtp_in = self.rtp_in_rx.lock().await;
292             rtp_in.recv().await.ok_or(Error::ErrIoEOF)?
293         };
294 
295         let marshaled = pkt.marshal()?;
296         let n = marshaled.len();
297         if n > buf.len() {
298             return Err(Error::ErrShortBuffer);
299         }
300 
301         buf[..n].copy_from_slice(&marshaled);
302         Ok((n, a.clone()))
303     }
304 }
305 
#[cfg(test)]
mod test {
    use super::*;
    use crate::noop::NoOp;
    use crate::test::timeout_or_fail;
    use tokio::time::Duration;

    /// Round-trips one RTCP batch and one RTP packet in each direction through a `NoOp`
    /// interceptor and checks that exactly one item comes out on the other side.
    #[tokio::test]
    async fn test_mock_stream() -> Result<()> {
        use rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication;

        let s = MockStream::new(&StreamInfo::default(), Arc::new(NoOp)).await;

        // Outgoing RTCP: one write must surface exactly once.
        s.write_rtcp(&[Box::<PictureLossIndication>::default()])
            .await?;
        timeout_or_fail(Duration::from_millis(10), s.written_rtcp()).await;
        let result = tokio::time::timeout(Duration::from_millis(10), s.written_rtcp()).await;
        assert!(
            result.is_err(),
            "single rtcp packet written, but multiple found"
        );

        // Outgoing RTP: one write must surface exactly once.
        s.write_rtp(&rtp::packet::Packet::default()).await?;
        timeout_or_fail(Duration::from_millis(10), s.written_rtp()).await;
        let result = tokio::time::timeout(Duration::from_millis(10), s.written_rtp()).await;
        assert!(
            result.is_err(),
            "single rtp packet written, but multiple found"
        );

        // Incoming RTCP: the batch must be readable without error, exactly once.
        s.receive_rtcp(vec![Box::<PictureLossIndication>::default()])
            .await;
        // Match on the inner `Result` too: `is_some()` alone would accept `Some(Err(_))`.
        assert!(
            matches!(
                timeout_or_fail(Duration::from_millis(10), s.read_rtcp()).await,
                Some(Ok(_))
            ),
            "read rtcp returned error",
        );
        let result = tokio::time::timeout(Duration::from_millis(10), s.read_rtcp()).await;
        assert!(
            result.is_err(),
            "single rtcp packet received, but multiple read"
        );

        // Incoming RTP: the packet must be readable without error, exactly once.
        s.receive_rtp(rtp::packet::Packet::default()).await;
        assert!(
            matches!(
                timeout_or_fail(Duration::from_millis(10), s.read_rtp()).await,
                Some(Ok(_))
            ),
            "read rtp returned error",
        );
        let result = tokio::time::timeout(Duration::from_millis(10), s.read_rtp()).await;
        assert!(
            result.is_err(),
            "single rtp packet received, but multiple read"
        );

        s.close().await?;

        Ok(())
    }
}
368