1 use crate::error::{Error, Result}; 2 use crate::stream_info::StreamInfo; 3 use crate::{Attributes, Interceptor, RTCPReader, RTCPWriter, RTPReader, RTPWriter}; 4 5 use async_trait::async_trait; 6 use std::sync::Arc; 7 use tokio::sync::{mpsc, Mutex}; 8 use util::{Marshal, Unmarshal}; 9 10 type RTCPPackets = Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>; 11 12 /// MockStream is a helper struct for testing interceptors. 13 pub struct MockStream { 14 interceptor: Arc<dyn Interceptor + Send + Sync>, 15 16 rtcp_writer: Mutex<Option<Arc<dyn RTCPWriter + Send + Sync>>>, 17 rtp_writer: Mutex<Option<Arc<dyn RTPWriter + Send + Sync>>>, 18 19 rtcp_out_modified_tx: mpsc::Sender<RTCPPackets>, 20 rtp_out_modified_tx: mpsc::Sender<rtp::packet::Packet>, 21 rtcp_in_rx: Mutex<mpsc::Receiver<RTCPPackets>>, 22 rtp_in_rx: Mutex<mpsc::Receiver<rtp::packet::Packet>>, 23 24 rtcp_out_modified_rx: Mutex<mpsc::Receiver<RTCPPackets>>, 25 rtp_out_modified_rx: Mutex<mpsc::Receiver<rtp::packet::Packet>>, 26 rtcp_in_tx: Mutex<Option<mpsc::Sender<RTCPPackets>>>, 27 rtp_in_tx: Mutex<Option<mpsc::Sender<rtp::packet::Packet>>>, 28 29 rtcp_in_modified_rx: Mutex<mpsc::Receiver<Result<RTCPPackets>>>, 30 rtp_in_modified_rx: Mutex<mpsc::Receiver<Result<rtp::packet::Packet>>>, 31 } 32 33 impl MockStream { 34 /// new creates a new MockStream new( info: &StreamInfo, interceptor: Arc<dyn Interceptor + Send + Sync>, ) -> Arc<Self>35 pub async fn new( 36 info: &StreamInfo, 37 interceptor: Arc<dyn Interceptor + Send + Sync>, 38 ) -> Arc<Self> { 39 let (rtcp_in_tx, rtcp_in_rx) = mpsc::channel(1000); 40 let (rtp_in_tx, rtp_in_rx) = mpsc::channel(1000); 41 let (rtcp_out_modified_tx, rtcp_out_modified_rx) = mpsc::channel(1000); 42 let (rtp_out_modified_tx, rtp_out_modified_rx) = mpsc::channel(1000); 43 let (rtcp_in_modified_tx, rtcp_in_modified_rx) = mpsc::channel(1000); 44 let (rtp_in_modified_tx, rtp_in_modified_rx) = mpsc::channel(1000); 45 46 let stream = Arc::new(MockStream { 47 interceptor: Arc::clone(&interceptor), 48 49 rtcp_writer: Mutex::new(None), 50 rtp_writer: Mutex::new(None), 51 52 rtcp_in_tx: Mutex::new(Some(rtcp_in_tx)), 53 rtp_in_tx: Mutex::new(Some(rtp_in_tx)), 54 rtcp_in_rx: Mutex::new(rtcp_in_rx), 55 rtp_in_rx: Mutex::new(rtp_in_rx), 56 57 rtcp_out_modified_tx, 58 rtp_out_modified_tx, 59 rtcp_out_modified_rx: Mutex::new(rtcp_out_modified_rx), 60 rtp_out_modified_rx: Mutex::new(rtp_out_modified_rx), 61 62 rtcp_in_modified_rx: Mutex::new(rtcp_in_modified_rx), 63 rtp_in_modified_rx: Mutex::new(rtp_in_modified_rx), 64 }); 65 66 let rtcp_writer = interceptor 67 .bind_rtcp_writer(Arc::clone(&stream) as Arc<dyn RTCPWriter + Send + Sync>) 68 .await; 69 { 70 let mut rw = stream.rtcp_writer.lock().await; 71 *rw = Some(rtcp_writer); 72 } 73 let rtp_writer = interceptor 74 .bind_local_stream( 75 info, 76 Arc::clone(&stream) as Arc<dyn RTPWriter + Send + Sync>, 77 ) 78 .await; 79 { 80 let mut rw = stream.rtp_writer.lock().await; 81 *rw = Some(rtp_writer); 82 } 83 84 let rtcp_reader = interceptor 85 .bind_rtcp_reader(Arc::clone(&stream) as Arc<dyn RTCPReader + Send + Sync>) 86 .await; 87 tokio::spawn(async move { 88 let mut buf = vec![0u8; 1500]; 89 let a = Attributes::new(); 90 loop { 91 let n = match rtcp_reader.read(&mut buf, &a).await { 92 Ok((n, _)) => n, 93 Err(err) => { 94 if Error::ErrIoEOF != err { 95 let _ = rtcp_in_modified_tx.send(Err(err)).await; 96 } 97 break; 98 } 99 }; 100 101 let mut b = &buf[..n]; 102 let pkt = match rtcp::packet::unmarshal(&mut b) { 103 Ok(pkt) => pkt, 104 Err(err) => { 105 let _ = rtcp_in_modified_tx.send(Err(err.into())).await; 106 break; 107 } 108 }; 109 110 let _ = rtcp_in_modified_tx.send(Ok(pkt)).await; 111 } 112 }); 113 114 let rtp_reader = interceptor 115 .bind_remote_stream( 116 info, 117 Arc::clone(&stream) as Arc<dyn RTPReader + Send + Sync>, 118 ) 119 .await; 120 tokio::spawn(async move { 121 let mut buf = vec![0u8; 1500]; 122 let a = Attributes::new(); 123 loop { 124 let n = match rtp_reader.read(&mut buf, &a).await { 125 Ok((n, _)) => n, 126 Err(err) => { 127 if Error::ErrIoEOF != err { 128 let _ = rtp_in_modified_tx.send(Err(err)).await; 129 } 130 break; 131 } 132 }; 133 134 let mut b = &buf[..n]; 135 let pkt = match rtp::packet::Packet::unmarshal(&mut b) { 136 Ok(pkt) => pkt, 137 Err(err) => { 138 let _ = rtp_in_modified_tx.send(Err(err.into())).await; 139 break; 140 } 141 }; 142 143 let _ = rtp_in_modified_tx.send(Ok(pkt)).await; 144 } 145 }); 146 147 stream 148 } 149 150 /// write_rtcp writes a batch of rtcp packet to the stream, using the interceptor write_rtcp( &self, pkt: &[Box<dyn rtcp::packet::Packet + Send + Sync>], ) -> Result<usize>151 pub async fn write_rtcp( 152 &self, 153 pkt: &[Box<dyn rtcp::packet::Packet + Send + Sync>], 154 ) -> Result<usize> { 155 let a = Attributes::new(); 156 let rtcp_writer = self.rtcp_writer.lock().await; 157 if let Some(writer) = &*rtcp_writer { 158 writer.write(pkt, &a).await 159 } else { 160 Err(Error::Other("invalid rtcp_writer".to_owned())) 161 } 162 } 163 164 /// write_rtp writes an rtp packet to the stream, using the interceptor write_rtp(&self, pkt: &rtp::packet::Packet) -> Result<usize>165 pub async fn write_rtp(&self, pkt: &rtp::packet::Packet) -> Result<usize> { 166 let a = Attributes::new(); 167 let rtp_writer = self.rtp_writer.lock().await; 168 if let Some(writer) = &*rtp_writer { 169 writer.write(pkt, &a).await 170 } else { 171 Err(Error::Other("invalid rtp_writer".to_owned())) 172 } 173 } 174 175 /// receive_rtcp schedules a new rtcp batch, so it can be read be the stream receive_rtcp(&self, pkts: Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>)176 pub async fn receive_rtcp(&self, pkts: Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>) { 177 let rtcp_in_tx = self.rtcp_in_tx.lock().await; 178 if let Some(tx) = &*rtcp_in_tx { 179 let _ = tx.send(pkts).await; 180 } 181 } 182 183 /// receive_rtp schedules a rtp packet, so it can be read be the stream receive_rtp(&self, pkt: rtp::packet::Packet)184 pub async fn receive_rtp(&self, pkt: rtp::packet::Packet) { 185 let rtp_in_tx = self.rtp_in_tx.lock().await; 186 if let Some(tx) = &*rtp_in_tx { 187 let _ = tx.send(pkt).await; 188 } 189 } 190 191 /// written_rtcp returns a channel containing the rtcp batches written, modified by the interceptor written_rtcp(&self) -> Option<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>>192 pub async fn written_rtcp(&self) -> Option<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>> { 193 let mut rtcp_out_modified_rx = self.rtcp_out_modified_rx.lock().await; 194 rtcp_out_modified_rx.recv().await 195 } 196 197 /// Returns the last rtcp packet bacth that was written, modified by the interceptor. 198 /// 199 /// NB: This method discards all other previously recoreded packet batches. last_written_rtcp( &self, ) -> Option<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>>200 pub async fn last_written_rtcp( 201 &self, 202 ) -> Option<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>> { 203 let mut last = None; 204 let mut rtcp_out_modified_rx = self.rtcp_out_modified_rx.lock().await; 205 206 while let Ok(v) = rtcp_out_modified_rx.try_recv() { 207 last = Some(v); 208 } 209 210 last 211 } 212 213 /// written_rtp returns a channel containing rtp packets written, modified by the interceptor written_rtp(&self) -> Option<rtp::packet::Packet>214 pub async fn written_rtp(&self) -> Option<rtp::packet::Packet> { 215 let mut rtp_out_modified_rx = self.rtp_out_modified_rx.lock().await; 216 rtp_out_modified_rx.recv().await 217 } 218 219 /// read_rtcp returns a channel containing the rtcp batched read, modified by the interceptor read_rtcp( &self, ) -> Option<Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>>>220 pub async fn read_rtcp( 221 &self, 222 ) -> Option<Result<Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>>> { 223 let mut rtcp_in_modified_rx = self.rtcp_in_modified_rx.lock().await; 224 rtcp_in_modified_rx.recv().await 225 } 226 227 /// read_rtp returns a channel containing the rtp packets read, modified by the interceptor read_rtp(&self) -> Option<Result<rtp::packet::Packet>>228 pub async fn read_rtp(&self) -> Option<Result<rtp::packet::Packet>> { 229 let mut rtp_in_modified_rx = self.rtp_in_modified_rx.lock().await; 230 rtp_in_modified_rx.recv().await 231 } 232 233 /// cose closes the stream and the underlying interceptor close(&self) -> Result<()>234 pub async fn close(&self) -> Result<()> { 235 { 236 let mut rtcp_in_tx = self.rtcp_in_tx.lock().await; 237 rtcp_in_tx.take(); 238 } 239 { 240 let mut rtp_in_tx = self.rtp_in_tx.lock().await; 241 rtp_in_tx.take(); 242 } 243 self.interceptor.close().await 244 } 245 } 246 247 #[async_trait] 248 impl RTCPWriter for MockStream { write( &self, pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>], _attributes: &Attributes, ) -> Result<usize>249 async fn write( 250 &self, 251 pkts: &[Box<dyn rtcp::packet::Packet + Send + Sync>], 252 _attributes: &Attributes, 253 ) -> Result<usize> { 254 let _ = self.rtcp_out_modified_tx.send(pkts.to_vec()).await; 255 256 Ok(0) 257 } 258 } 259 260 #[async_trait] 261 impl RTCPReader for MockStream { read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)>262 async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> { 263 let pkts = { 264 let mut rtcp_in = self.rtcp_in_rx.lock().await; 265 rtcp_in.recv().await.ok_or(Error::ErrIoEOF)? 266 }; 267 268 let marshaled = rtcp::packet::marshal(&pkts)?; 269 let n = marshaled.len(); 270 if n > buf.len() { 271 return Err(Error::ErrShortBuffer); 272 } 273 274 buf[..n].copy_from_slice(&marshaled); 275 Ok((n, a.clone())) 276 } 277 } 278 279 #[async_trait] 280 impl RTPWriter for MockStream { write(&self, pkt: &rtp::packet::Packet, _a: &Attributes) -> Result<usize>281 async fn write(&self, pkt: &rtp::packet::Packet, _a: &Attributes) -> Result<usize> { 282 let _ = self.rtp_out_modified_tx.send(pkt.clone()).await; 283 Ok(0) 284 } 285 } 286 287 #[async_trait] 288 impl RTPReader for MockStream { read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)>289 async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> { 290 let pkt = { 291 let mut rtp_in = self.rtp_in_rx.lock().await; 292 rtp_in.recv().await.ok_or(Error::ErrIoEOF)? 293 }; 294 295 let marshaled = pkt.marshal()?; 296 let n = marshaled.len(); 297 if n > buf.len() { 298 return Err(Error::ErrShortBuffer); 299 } 300 301 buf[..n].copy_from_slice(&marshaled); 302 Ok((n, a.clone())) 303 } 304 } 305 306 #[cfg(test)] 307 mod test { 308 use super::*; 309 use crate::noop::NoOp; 310 use crate::test::timeout_or_fail; 311 use tokio::time::Duration; 312 313 #[tokio::test] test_mock_stream() -> Result<()>314 async fn test_mock_stream() -> Result<()> { 315 use rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication; 316 317 let s = MockStream::new(&StreamInfo::default(), Arc::new(NoOp)).await; 318 319 s.write_rtcp(&[Box::<PictureLossIndication>::default()]) 320 .await?; 321 timeout_or_fail(Duration::from_millis(10), s.written_rtcp()).await; 322 let result = tokio::time::timeout(Duration::from_millis(10), s.written_rtcp()).await; 323 assert!( 324 result.is_err(), 325 "single rtcp packet written, but multiple found" 326 ); 327 328 s.write_rtp(&rtp::packet::Packet::default()).await?; 329 timeout_or_fail(Duration::from_millis(10), s.written_rtp()).await; 330 let result = tokio::time::timeout(Duration::from_millis(10), s.written_rtp()).await; 331 assert!( 332 result.is_err(), 333 "single rtp packet written, but multiple found" 334 ); 335 336 s.receive_rtcp(vec![Box::<PictureLossIndication>::default()]) 337 .await; 338 assert!( 339 timeout_or_fail(Duration::from_millis(10), s.read_rtcp()) 340 .await 341 .is_some(), 342 "read rtcp returned error", 343 ); 344 let result = tokio::time::timeout(Duration::from_millis(10), s.read_rtcp()).await; 345 assert!( 346 result.is_err(), 347 "single rtcp packet written, but multiple found" 348 ); 349 350 s.receive_rtp(rtp::packet::Packet::default()).await; 351 assert!( 352 timeout_or_fail(Duration::from_millis(10), s.read_rtp()) 353 .await 354 .is_some(), 355 "read rtp returned error", 356 ); 357 let result = tokio::time::timeout(Duration::from_millis(10), s.read_rtp()).await; 358 assert!( 359 result.is_err(), 360 "single rtp packet written, but multiple found" 361 ); 362 363 s.close().await?; 364 365 Ok(()) 366 } 367 } 368