1 mod sender_stream; 2 #[cfg(test)] 3 mod sender_test; 4 5 use super::*; 6 use crate::error::Error; 7 use crate::*; 8 use sender_stream::SenderStream; 9 10 use std::collections::HashMap; 11 use std::time::{Duration, SystemTime}; 12 use tokio::sync::{mpsc, Mutex}; 13 use waitgroup::WaitGroup; 14 15 pub(crate) struct SenderReportInternal { 16 pub(crate) interval: Duration, 17 pub(crate) now: Option<FnTimeGen>, 18 pub(crate) streams: Mutex<HashMap<u32, Arc<SenderStream>>>, 19 pub(crate) close_rx: Mutex<Option<mpsc::Receiver<()>>>, 20 } 21 22 /// SenderReport interceptor generates sender reports. 23 pub struct SenderReport { 24 pub(crate) internal: Arc<SenderReportInternal>, 25 26 pub(crate) wg: Mutex<Option<WaitGroup>>, 27 pub(crate) close_tx: Mutex<Option<mpsc::Sender<()>>>, 28 } 29 30 impl SenderReport { 31 /// builder returns a new ReportBuilder. builder() -> ReportBuilder32 pub fn builder() -> ReportBuilder { 33 ReportBuilder { 34 is_rr: false, 35 ..Default::default() 36 } 37 } 38 is_closed(&self) -> bool39 async fn is_closed(&self) -> bool { 40 let close_tx = self.close_tx.lock().await; 41 close_tx.is_none() 42 } 43 run( rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>, internal: Arc<SenderReportInternal>, ) -> Result<()>44 async fn run( 45 rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>, 46 internal: Arc<SenderReportInternal>, 47 ) -> Result<()> { 48 let mut ticker = tokio::time::interval(internal.interval); 49 let mut close_rx = { 50 let mut close_rx = internal.close_rx.lock().await; 51 if let Some(close) = close_rx.take() { 52 close 53 } else { 54 return Err(Error::ErrInvalidCloseRx); 55 } 56 }; 57 58 loop { 59 tokio::select! { 60 _ = ticker.tick() =>{ 61 // TODO(cancel safety): This branch isn't cancel safe 62 let now = if let Some(f) = &internal.now { 63 f() 64 } else { 65 SystemTime::now() 66 }; 67 let streams:Vec<Arc<SenderStream>> = { 68 let m = internal.streams.lock().await; 69 m.values().cloned().collect() 70 }; 71 for stream in streams { 72 let pkt = stream.generate_report(now).await; 73 74 let a = Attributes::new(); 75 if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{ 76 log::warn!("failed sending: {}", err); 77 } 78 } 79 } 80 _ = close_rx.recv() =>{ 81 return Ok(()); 82 } 83 } 84 } 85 } 86 } 87 88 #[async_trait] 89 impl Interceptor for SenderReport { 90 /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might 91 /// change in the future. The returned method will be called once per packet batch. bind_rtcp_reader( &self, reader: Arc<dyn RTCPReader + Send + Sync>, ) -> Arc<dyn RTCPReader + Send + Sync>92 async fn bind_rtcp_reader( 93 &self, 94 reader: Arc<dyn RTCPReader + Send + Sync>, 95 ) -> Arc<dyn RTCPReader + Send + Sync> { 96 reader 97 } 98 99 /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method 100 /// will be called once per packet batch. bind_rtcp_writer( &self, writer: Arc<dyn RTCPWriter + Send + Sync>, ) -> Arc<dyn RTCPWriter + Send + Sync>101 async fn bind_rtcp_writer( 102 &self, 103 writer: Arc<dyn RTCPWriter + Send + Sync>, 104 ) -> Arc<dyn RTCPWriter + Send + Sync> { 105 if self.is_closed().await { 106 return writer; 107 } 108 109 let mut w = { 110 let wait_group = self.wg.lock().await; 111 wait_group.as_ref().map(|wg| wg.worker()) 112 }; 113 let writer2 = Arc::clone(&writer); 114 let internal = Arc::clone(&self.internal); 115 tokio::spawn(async move { 116 let _d = w.take(); 117 if let Err(err) = SenderReport::run(writer2, internal).await { 118 log::warn!("bind_rtcp_writer Generator::run got error: {}", err); 119 } 120 }); 121 122 writer 123 } 124 125 /// bind_local_stream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method 126 /// will be called once per rtp packet. bind_local_stream( &self, info: &StreamInfo, writer: Arc<dyn RTPWriter + Send + Sync>, ) -> Arc<dyn RTPWriter + Send + Sync>127 async fn bind_local_stream( 128 &self, 129 info: &StreamInfo, 130 writer: Arc<dyn RTPWriter + Send + Sync>, 131 ) -> Arc<dyn RTPWriter + Send + Sync> { 132 let stream = Arc::new(SenderStream::new( 133 info.ssrc, 134 info.clock_rate, 135 writer, 136 self.internal.now.clone(), 137 )); 138 { 139 let mut streams = self.internal.streams.lock().await; 140 streams.insert(info.ssrc, Arc::clone(&stream)); 141 } 142 143 stream 144 } 145 146 /// unbind_local_stream is called when the Stream is removed. It can be used to clean up any data related to that track. unbind_local_stream(&self, info: &StreamInfo)147 async fn unbind_local_stream(&self, info: &StreamInfo) { 148 let mut streams = self.internal.streams.lock().await; 149 streams.remove(&info.ssrc); 150 } 151 152 /// bind_remote_stream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method 153 /// will be called once per rtp packet. bind_remote_stream( &self, _info: &StreamInfo, reader: Arc<dyn RTPReader + Send + Sync>, ) -> Arc<dyn RTPReader + Send + Sync>154 async fn bind_remote_stream( 155 &self, 156 _info: &StreamInfo, 157 reader: Arc<dyn RTPReader + Send + Sync>, 158 ) -> Arc<dyn RTPReader + Send + Sync> { 159 reader 160 } 161 162 /// unbind_remote_stream is called when the Stream is removed. It can be used to clean up any data related to that track. unbind_remote_stream(&self, _info: &StreamInfo)163 async fn unbind_remote_stream(&self, _info: &StreamInfo) {} 164 165 /// close closes the Interceptor, cleaning up any data if necessary. close(&self) -> Result<()>166 async fn close(&self) -> Result<()> { 167 { 168 let mut close_tx = self.close_tx.lock().await; 169 close_tx.take(); 170 } 171 172 { 173 let mut wait_group = self.wg.lock().await; 174 if let Some(wg) = wait_group.take() { 175 wg.wait().await; 176 } 177 } 178 179 Ok(()) 180 } 181 } 182