xref: /webrtc/interceptor/src/report/mod.rs (revision ffe74184)
1 use std::collections::HashMap;
2 use std::sync::Arc;
3 use std::time::{Duration, SystemTime};
4 use tokio::sync::{mpsc, Mutex};
5 use waitgroup::WaitGroup;
6 
7 pub mod receiver;
8 pub mod sender;
9 
10 use crate::error::Result;
11 use crate::{Interceptor, InterceptorBuilder};
12 use receiver::{ReceiverReport, ReceiverReportInternal};
13 use sender::{SenderReport, SenderReportInternal};
14 
/// Shared, thread-safe closure that yields a `SystemTime`.
///
/// Stored by [`ReportBuilder`] as an optional replacement time source and
/// cloned into each interceptor it builds.
type FnTimeGen = Arc<dyn Fn() -> SystemTime + Send + Sync + 'static>;
16 
17 /// ReceiverBuilder can be used to configure ReceiverReport Interceptor.
18 #[derive(Default)]
19 pub struct ReportBuilder {
20     is_rr: bool,
21     interval: Option<Duration>,
22     now: Option<FnTimeGen>,
23 }
24 
25 impl ReportBuilder {
26     /// with_interval sets send interval for the interceptor.
with_interval(mut self, interval: Duration) -> ReportBuilder27     pub fn with_interval(mut self, interval: Duration) -> ReportBuilder {
28         self.interval = Some(interval);
29         self
30     }
31 
32     /// with_now_fn sets an alternative for the time.Now function.
with_now_fn(mut self, now: FnTimeGen) -> ReportBuilder33     pub fn with_now_fn(mut self, now: FnTimeGen) -> ReportBuilder {
34         self.now = Some(now);
35         self
36     }
37 
build_rr(&self) -> ReceiverReport38     fn build_rr(&self) -> ReceiverReport {
39         let (close_tx, close_rx) = mpsc::channel(1);
40         ReceiverReport {
41             internal: Arc::new(ReceiverReportInternal {
42                 interval: if let Some(interval) = &self.interval {
43                     *interval
44                 } else {
45                     Duration::from_secs(1)
46                 },
47                 now: self.now.clone(),
48                 streams: Mutex::new(HashMap::new()),
49                 close_rx: Mutex::new(Some(close_rx)),
50             }),
51 
52             wg: Mutex::new(Some(WaitGroup::new())),
53             close_tx: Mutex::new(Some(close_tx)),
54         }
55     }
56 
build_sr(&self) -> SenderReport57     fn build_sr(&self) -> SenderReport {
58         let (close_tx, close_rx) = mpsc::channel(1);
59         SenderReport {
60             internal: Arc::new(SenderReportInternal {
61                 interval: if let Some(interval) = &self.interval {
62                     *interval
63                 } else {
64                     Duration::from_secs(1)
65                 },
66                 now: self.now.clone(),
67                 streams: Mutex::new(HashMap::new()),
68                 close_rx: Mutex::new(Some(close_rx)),
69             }),
70 
71             wg: Mutex::new(Some(WaitGroup::new())),
72             close_tx: Mutex::new(Some(close_tx)),
73         }
74     }
75 }
76 
77 impl InterceptorBuilder for ReportBuilder {
build(&self, _id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>>78     fn build(&self, _id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>> {
79         if self.is_rr {
80             Ok(Arc::new(self.build_rr()))
81         } else {
82             Ok(Arc::new(self.build_sr()))
83         }
84     }
85 }
86