1 mod receiver_stream; 2 #[cfg(test)] 3 mod receiver_test; 4 5 use super::*; 6 use crate::error::Error; 7 use crate::*; 8 use receiver_stream::ReceiverStream; 9 10 use std::collections::HashMap; 11 use std::time::{Duration, SystemTime}; 12 use tokio::sync::{mpsc, Mutex}; 13 use waitgroup::WaitGroup; 14 15 pub(crate) struct ReceiverReportInternal { 16 pub(crate) interval: Duration, 17 pub(crate) now: Option<FnTimeGen>, 18 pub(crate) streams: Mutex<HashMap<u32, Arc<ReceiverStream>>>, 19 pub(crate) close_rx: Mutex<Option<mpsc::Receiver<()>>>, 20 } 21 22 pub(crate) struct ReceiverReportRtcpReader { 23 pub(crate) internal: Arc<ReceiverReportInternal>, 24 pub(crate) parent_rtcp_reader: Arc<dyn RTCPReader + Send + Sync>, 25 } 26 27 #[async_trait] 28 impl RTCPReader for ReceiverReportRtcpReader { read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)>29 async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> { 30 let (n, attr) = self.parent_rtcp_reader.read(buf, a).await?; 31 32 let mut b = &buf[..n]; 33 let pkts = rtcp::packet::unmarshal(&mut b)?; 34 35 let now = if let Some(f) = &self.internal.now { 36 f() 37 } else { 38 SystemTime::now() 39 }; 40 41 for p in &pkts { 42 if let Some(sr) = p 43 .as_any() 44 .downcast_ref::<rtcp::sender_report::SenderReport>() 45 { 46 let stream = { 47 let m = self.internal.streams.lock().await; 48 m.get(&sr.ssrc).cloned() 49 }; 50 if let Some(stream) = stream { 51 stream.process_sender_report(now, sr); 52 } 53 } 54 } 55 56 Ok((n, attr)) 57 } 58 } 59 60 /// ReceiverReport interceptor generates receiver reports. 61 pub struct ReceiverReport { 62 pub(crate) internal: Arc<ReceiverReportInternal>, 63 64 pub(crate) wg: Mutex<Option<WaitGroup>>, 65 pub(crate) close_tx: Mutex<Option<mpsc::Sender<()>>>, 66 } 67 68 impl ReceiverReport { 69 /// builder returns a new ReportBuilder. builder() -> ReportBuilder70 pub fn builder() -> ReportBuilder { 71 ReportBuilder { 72 is_rr: true, 73 ..Default::default() 74 } 75 } 76 is_closed(&self) -> bool77 async fn is_closed(&self) -> bool { 78 let close_tx = self.close_tx.lock().await; 79 close_tx.is_none() 80 } 81 run( rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>, internal: Arc<ReceiverReportInternal>, ) -> Result<()>82 async fn run( 83 rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>, 84 internal: Arc<ReceiverReportInternal>, 85 ) -> Result<()> { 86 let mut ticker = tokio::time::interval(internal.interval); 87 let mut close_rx = { 88 let mut close_rx = internal.close_rx.lock().await; 89 if let Some(close) = close_rx.take() { 90 close 91 } else { 92 return Err(Error::ErrInvalidCloseRx); 93 } 94 }; 95 96 loop { 97 tokio::select! { 98 _ = ticker.tick() =>{ 99 // TODO(cancel safety): This branch isn't cancel safe 100 101 let now = if let Some(f) = &internal.now { 102 f() 103 } else { 104 SystemTime::now() 105 }; 106 let streams:Vec<Arc<ReceiverStream>> = { 107 let m = internal.streams.lock().await; 108 m.values().cloned().collect() 109 }; 110 for stream in streams { 111 let pkt = stream.generate_report(now); 112 113 let a = Attributes::new(); 114 if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{ 115 log::warn!("failed sending: {}", err); 116 } 117 } 118 } 119 _ = close_rx.recv() =>{ 120 return Ok(()); 121 } 122 } 123 } 124 } 125 } 126 127 #[async_trait] 128 impl Interceptor for ReceiverReport { 129 /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might 130 /// change in the future. The returned method will be called once per packet batch. bind_rtcp_reader( &self, reader: Arc<dyn RTCPReader + Send + Sync>, ) -> Arc<dyn RTCPReader + Send + Sync>131 async fn bind_rtcp_reader( 132 &self, 133 reader: Arc<dyn RTCPReader + Send + Sync>, 134 ) -> Arc<dyn RTCPReader + Send + Sync> { 135 Arc::new(ReceiverReportRtcpReader { 136 internal: Arc::clone(&self.internal), 137 parent_rtcp_reader: reader, 138 }) 139 } 140 141 /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method 142 /// will be called once per packet batch. bind_rtcp_writer( &self, writer: Arc<dyn RTCPWriter + Send + Sync>, ) -> Arc<dyn RTCPWriter + Send + Sync>143 async fn bind_rtcp_writer( 144 &self, 145 writer: Arc<dyn RTCPWriter + Send + Sync>, 146 ) -> Arc<dyn RTCPWriter + Send + Sync> { 147 if self.is_closed().await { 148 return writer; 149 } 150 151 let mut w = { 152 let wait_group = self.wg.lock().await; 153 wait_group.as_ref().map(|wg| wg.worker()) 154 }; 155 let writer2 = Arc::clone(&writer); 156 let internal = Arc::clone(&self.internal); 157 tokio::spawn(async move { 158 let _d = w.take(); 159 if let Err(err) = ReceiverReport::run(writer2, internal).await { 160 log::warn!("bind_rtcp_writer ReceiverReport::run got error: {}", err); 161 } 162 }); 163 164 writer 165 } 166 167 /// bind_local_stream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method 168 /// will be called once per rtp packet. bind_local_stream( &self, _info: &StreamInfo, writer: Arc<dyn RTPWriter + Send + Sync>, ) -> Arc<dyn RTPWriter + Send + Sync>169 async fn bind_local_stream( 170 &self, 171 _info: &StreamInfo, 172 writer: Arc<dyn RTPWriter + Send + Sync>, 173 ) -> Arc<dyn RTPWriter + Send + Sync> { 174 writer 175 } 176 177 /// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track. unbind_local_stream(&self, _info: &StreamInfo)178 async fn unbind_local_stream(&self, _info: &StreamInfo) {} 179 180 /// bind_remote_stream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method 181 /// will be called once per rtp packet. bind_remote_stream( &self, info: &StreamInfo, reader: Arc<dyn RTPReader + Send + Sync>, ) -> Arc<dyn RTPReader + Send + Sync>182 async fn bind_remote_stream( 183 &self, 184 info: &StreamInfo, 185 reader: Arc<dyn RTPReader + Send + Sync>, 186 ) -> Arc<dyn RTPReader + Send + Sync> { 187 let stream = Arc::new(ReceiverStream::new( 188 info.ssrc, 189 info.clock_rate, 190 reader, 191 self.internal.now.clone(), 192 )); 193 { 194 let mut streams = self.internal.streams.lock().await; 195 streams.insert(info.ssrc, Arc::clone(&stream)); 196 } 197 198 stream 199 } 200 201 /// unbind_remote_stream is called when the Stream is removed. It can be used to clean up any data related to that track. unbind_remote_stream(&self, info: &StreamInfo)202 async fn unbind_remote_stream(&self, info: &StreamInfo) { 203 let mut streams = self.internal.streams.lock().await; 204 streams.remove(&info.ssrc); 205 } 206 207 /// close closes the Interceptor, cleaning up any data if necessary. close(&self) -> Result<()>208 async fn close(&self) -> Result<()> { 209 { 210 let mut close_tx = self.close_tx.lock().await; 211 close_tx.take(); 212 } 213 214 { 215 let mut wait_group = self.wg.lock().await; 216 if let Some(wg) = wait_group.take() { 217 wg.wait().await; 218 } 219 } 220 221 Ok(()) 222 } 223 } 224