xref: /webrtc/interceptor/src/report/receiver/mod.rs (revision ce164036)
1 mod receiver_stream;
2 #[cfg(test)]
3 mod receiver_test;
4 
5 use super::*;
6 use crate::error::Error;
7 use crate::*;
8 use receiver_stream::ReceiverStream;
9 
10 use std::collections::HashMap;
11 use std::time::{Duration, SystemTime};
12 use tokio::sync::{mpsc, Mutex};
13 use waitgroup::WaitGroup;
14 
15 pub(crate) struct ReceiverReportInternal {
16     pub(crate) interval: Duration,
17     pub(crate) now: Option<FnTimeGen>,
18     pub(crate) streams: Mutex<HashMap<u32, Arc<ReceiverStream>>>,
19     pub(crate) close_rx: Mutex<Option<mpsc::Receiver<()>>>,
20 }
21 
22 pub(crate) struct ReceiverReportRtcpReader {
23     pub(crate) internal: Arc<ReceiverReportInternal>,
24     pub(crate) parent_rtcp_reader: Arc<dyn RTCPReader + Send + Sync>,
25 }
26 
27 #[async_trait]
28 impl RTCPReader for ReceiverReportRtcpReader {
read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)>29     async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
30         let (n, attr) = self.parent_rtcp_reader.read(buf, a).await?;
31 
32         let mut b = &buf[..n];
33         let pkts = rtcp::packet::unmarshal(&mut b)?;
34 
35         let now = if let Some(f) = &self.internal.now {
36             f()
37         } else {
38             SystemTime::now()
39         };
40 
41         for p in &pkts {
42             if let Some(sr) = p
43                 .as_any()
44                 .downcast_ref::<rtcp::sender_report::SenderReport>()
45             {
46                 let stream = {
47                     let m = self.internal.streams.lock().await;
48                     m.get(&sr.ssrc).cloned()
49                 };
50                 if let Some(stream) = stream {
51                     stream.process_sender_report(now, sr);
52                 }
53             }
54         }
55 
56         Ok((n, attr))
57     }
58 }
59 
60 /// ReceiverReport interceptor generates receiver reports.
61 pub struct ReceiverReport {
62     pub(crate) internal: Arc<ReceiverReportInternal>,
63 
64     pub(crate) wg: Mutex<Option<WaitGroup>>,
65     pub(crate) close_tx: Mutex<Option<mpsc::Sender<()>>>,
66 }
67 
68 impl ReceiverReport {
69     /// builder returns a new ReportBuilder.
builder() -> ReportBuilder70     pub fn builder() -> ReportBuilder {
71         ReportBuilder {
72             is_rr: true,
73             ..Default::default()
74         }
75     }
76 
is_closed(&self) -> bool77     async fn is_closed(&self) -> bool {
78         let close_tx = self.close_tx.lock().await;
79         close_tx.is_none()
80     }
81 
run( rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>, internal: Arc<ReceiverReportInternal>, ) -> Result<()>82     async fn run(
83         rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>,
84         internal: Arc<ReceiverReportInternal>,
85     ) -> Result<()> {
86         let mut ticker = tokio::time::interval(internal.interval);
87         let mut close_rx = {
88             let mut close_rx = internal.close_rx.lock().await;
89             if let Some(close) = close_rx.take() {
90                 close
91             } else {
92                 return Err(Error::ErrInvalidCloseRx);
93             }
94         };
95 
96         loop {
97             tokio::select! {
98                 _ = ticker.tick() =>{
99                     // TODO(cancel safety): This branch isn't cancel safe
100 
101                     let now = if let Some(f) = &internal.now {
102                         f()
103                     } else {
104                         SystemTime::now()
105                     };
106                     let streams:Vec<Arc<ReceiverStream>> = {
107                         let m = internal.streams.lock().await;
108                         m.values().cloned().collect()
109                     };
110                     for stream in streams {
111                         let pkt = stream.generate_report(now);
112 
113                         let a = Attributes::new();
114                         if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{
115                             log::warn!("failed sending: {}", err);
116                         }
117                     }
118                 }
119                 _ = close_rx.recv() =>{
120                     return Ok(());
121                 }
122             }
123         }
124     }
125 }
126 
127 #[async_trait]
128 impl Interceptor for ReceiverReport {
129     /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
130     /// change in the future. The returned method will be called once per packet batch.
bind_rtcp_reader( &self, reader: Arc<dyn RTCPReader + Send + Sync>, ) -> Arc<dyn RTCPReader + Send + Sync>131     async fn bind_rtcp_reader(
132         &self,
133         reader: Arc<dyn RTCPReader + Send + Sync>,
134     ) -> Arc<dyn RTCPReader + Send + Sync> {
135         Arc::new(ReceiverReportRtcpReader {
136             internal: Arc::clone(&self.internal),
137             parent_rtcp_reader: reader,
138         })
139     }
140 
141     /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
142     /// will be called once per packet batch.
bind_rtcp_writer( &self, writer: Arc<dyn RTCPWriter + Send + Sync>, ) -> Arc<dyn RTCPWriter + Send + Sync>143     async fn bind_rtcp_writer(
144         &self,
145         writer: Arc<dyn RTCPWriter + Send + Sync>,
146     ) -> Arc<dyn RTCPWriter + Send + Sync> {
147         if self.is_closed().await {
148             return writer;
149         }
150 
151         let mut w = {
152             let wait_group = self.wg.lock().await;
153             wait_group.as_ref().map(|wg| wg.worker())
154         };
155         let writer2 = Arc::clone(&writer);
156         let internal = Arc::clone(&self.internal);
157         tokio::spawn(async move {
158             let _d = w.take();
159             if let Err(err) = ReceiverReport::run(writer2, internal).await {
160                 log::warn!("bind_rtcp_writer ReceiverReport::run got error: {}", err);
161             }
162         });
163 
164         writer
165     }
166 
167     /// bind_local_stream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
168     /// will be called once per rtp packet.
bind_local_stream( &self, _info: &StreamInfo, writer: Arc<dyn RTPWriter + Send + Sync>, ) -> Arc<dyn RTPWriter + Send + Sync>169     async fn bind_local_stream(
170         &self,
171         _info: &StreamInfo,
172         writer: Arc<dyn RTPWriter + Send + Sync>,
173     ) -> Arc<dyn RTPWriter + Send + Sync> {
174         writer
175     }
176 
177     /// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
unbind_local_stream(&self, _info: &StreamInfo)178     async fn unbind_local_stream(&self, _info: &StreamInfo) {}
179 
180     /// bind_remote_stream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
181     /// will be called once per rtp packet.
bind_remote_stream( &self, info: &StreamInfo, reader: Arc<dyn RTPReader + Send + Sync>, ) -> Arc<dyn RTPReader + Send + Sync>182     async fn bind_remote_stream(
183         &self,
184         info: &StreamInfo,
185         reader: Arc<dyn RTPReader + Send + Sync>,
186     ) -> Arc<dyn RTPReader + Send + Sync> {
187         let stream = Arc::new(ReceiverStream::new(
188             info.ssrc,
189             info.clock_rate,
190             reader,
191             self.internal.now.clone(),
192         ));
193         {
194             let mut streams = self.internal.streams.lock().await;
195             streams.insert(info.ssrc, Arc::clone(&stream));
196         }
197 
198         stream
199     }
200 
201     /// unbind_remote_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
unbind_remote_stream(&self, info: &StreamInfo)202     async fn unbind_remote_stream(&self, info: &StreamInfo) {
203         let mut streams = self.internal.streams.lock().await;
204         streams.remove(&info.ssrc);
205     }
206 
207     /// close closes the Interceptor, cleaning up any data if necessary.
close(&self) -> Result<()>208     async fn close(&self) -> Result<()> {
209         {
210             let mut close_tx = self.close_tx.lock().await;
211             close_tx.take();
212         }
213 
214         {
215             let mut wait_group = self.wg.lock().await;
216             if let Some(wg) = wait_group.take() {
217                 wg.wait().await;
218             }
219         }
220 
221         Ok(())
222     }
223 }
224