xref: /webrtc/interceptor/src/report/sender/mod.rs (revision ffe74184)
1 mod sender_stream;
2 #[cfg(test)]
3 mod sender_test;
4 
5 use super::*;
6 use crate::error::Error;
7 use crate::*;
8 use sender_stream::SenderStream;
9 
10 use std::collections::HashMap;
11 use std::time::{Duration, SystemTime};
12 use tokio::sync::{mpsc, Mutex};
13 use waitgroup::WaitGroup;
14 
15 pub(crate) struct SenderReportInternal {
16     pub(crate) interval: Duration,
17     pub(crate) now: Option<FnTimeGen>,
18     pub(crate) streams: Mutex<HashMap<u32, Arc<SenderStream>>>,
19     pub(crate) close_rx: Mutex<Option<mpsc::Receiver<()>>>,
20 }
21 
22 /// SenderReport interceptor generates sender reports.
23 pub struct SenderReport {
24     pub(crate) internal: Arc<SenderReportInternal>,
25 
26     pub(crate) wg: Mutex<Option<WaitGroup>>,
27     pub(crate) close_tx: Mutex<Option<mpsc::Sender<()>>>,
28 }
29 
30 impl SenderReport {
31     /// builder returns a new ReportBuilder.
builder() -> ReportBuilder32     pub fn builder() -> ReportBuilder {
33         ReportBuilder {
34             is_rr: false,
35             ..Default::default()
36         }
37     }
38 
is_closed(&self) -> bool39     async fn is_closed(&self) -> bool {
40         let close_tx = self.close_tx.lock().await;
41         close_tx.is_none()
42     }
43 
run( rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>, internal: Arc<SenderReportInternal>, ) -> Result<()>44     async fn run(
45         rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>,
46         internal: Arc<SenderReportInternal>,
47     ) -> Result<()> {
48         let mut ticker = tokio::time::interval(internal.interval);
49         let mut close_rx = {
50             let mut close_rx = internal.close_rx.lock().await;
51             if let Some(close) = close_rx.take() {
52                 close
53             } else {
54                 return Err(Error::ErrInvalidCloseRx);
55             }
56         };
57 
58         loop {
59             tokio::select! {
60                 _ = ticker.tick() =>{
61                     // TODO(cancel safety): This branch isn't cancel safe
62                     let now = if let Some(f) = &internal.now {
63                         f()
64                     } else {
65                         SystemTime::now()
66                     };
67                     let streams:Vec<Arc<SenderStream>> = {
68                         let m = internal.streams.lock().await;
69                         m.values().cloned().collect()
70                     };
71                     for stream in streams {
72                         let pkt = stream.generate_report(now).await;
73 
74                         let a = Attributes::new();
75                         if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{
76                             log::warn!("failed sending: {}", err);
77                         }
78                     }
79                 }
80                 _ = close_rx.recv() =>{
81                     return Ok(());
82                 }
83             }
84         }
85     }
86 }
87 
88 #[async_trait]
89 impl Interceptor for SenderReport {
90     /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
91     /// change in the future. The returned method will be called once per packet batch.
bind_rtcp_reader( &self, reader: Arc<dyn RTCPReader + Send + Sync>, ) -> Arc<dyn RTCPReader + Send + Sync>92     async fn bind_rtcp_reader(
93         &self,
94         reader: Arc<dyn RTCPReader + Send + Sync>,
95     ) -> Arc<dyn RTCPReader + Send + Sync> {
96         reader
97     }
98 
99     /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
100     /// will be called once per packet batch.
bind_rtcp_writer( &self, writer: Arc<dyn RTCPWriter + Send + Sync>, ) -> Arc<dyn RTCPWriter + Send + Sync>101     async fn bind_rtcp_writer(
102         &self,
103         writer: Arc<dyn RTCPWriter + Send + Sync>,
104     ) -> Arc<dyn RTCPWriter + Send + Sync> {
105         if self.is_closed().await {
106             return writer;
107         }
108 
109         let mut w = {
110             let wait_group = self.wg.lock().await;
111             wait_group.as_ref().map(|wg| wg.worker())
112         };
113         let writer2 = Arc::clone(&writer);
114         let internal = Arc::clone(&self.internal);
115         tokio::spawn(async move {
116             let _d = w.take();
117             if let Err(err) = SenderReport::run(writer2, internal).await {
118                 log::warn!("bind_rtcp_writer Generator::run got error: {}", err);
119             }
120         });
121 
122         writer
123     }
124 
125     /// bind_local_stream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
126     /// will be called once per rtp packet.
bind_local_stream( &self, info: &StreamInfo, writer: Arc<dyn RTPWriter + Send + Sync>, ) -> Arc<dyn RTPWriter + Send + Sync>127     async fn bind_local_stream(
128         &self,
129         info: &StreamInfo,
130         writer: Arc<dyn RTPWriter + Send + Sync>,
131     ) -> Arc<dyn RTPWriter + Send + Sync> {
132         let stream = Arc::new(SenderStream::new(
133             info.ssrc,
134             info.clock_rate,
135             writer,
136             self.internal.now.clone(),
137         ));
138         {
139             let mut streams = self.internal.streams.lock().await;
140             streams.insert(info.ssrc, Arc::clone(&stream));
141         }
142 
143         stream
144     }
145 
146     /// unbind_local_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
unbind_local_stream(&self, info: &StreamInfo)147     async fn unbind_local_stream(&self, info: &StreamInfo) {
148         let mut streams = self.internal.streams.lock().await;
149         streams.remove(&info.ssrc);
150     }
151 
152     /// bind_remote_stream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
153     /// will be called once per rtp packet.
bind_remote_stream( &self, _info: &StreamInfo, reader: Arc<dyn RTPReader + Send + Sync>, ) -> Arc<dyn RTPReader + Send + Sync>154     async fn bind_remote_stream(
155         &self,
156         _info: &StreamInfo,
157         reader: Arc<dyn RTPReader + Send + Sync>,
158     ) -> Arc<dyn RTPReader + Send + Sync> {
159         reader
160     }
161 
162     /// unbind_remote_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
unbind_remote_stream(&self, _info: &StreamInfo)163     async fn unbind_remote_stream(&self, _info: &StreamInfo) {}
164 
165     /// close closes the Interceptor, cleaning up any data if necessary.
close(&self) -> Result<()>166     async fn close(&self) -> Result<()> {
167         {
168             let mut close_tx = self.close_tx.lock().await;
169             close_tx.take();
170         }
171 
172         {
173             let mut wait_group = self.wg.lock().await;
174             if let Some(wg) = wait_group.take() {
175                 wg.wait().await;
176             }
177         }
178 
179         Ok(())
180     }
181 }
182