1 use std::collections::HashMap; 2 use std::sync::Arc; 3 use std::time::{Duration, SystemTime}; 4 use tokio::sync::{mpsc, Mutex}; 5 use waitgroup::WaitGroup; 6 7 pub mod receiver; 8 pub mod sender; 9 10 use crate::error::Result; 11 use crate::{Interceptor, InterceptorBuilder}; 12 use receiver::{ReceiverReport, ReceiverReportInternal}; 13 use sender::{SenderReport, SenderReportInternal}; 14 15 type FnTimeGen = Arc<dyn Fn() -> SystemTime + Sync + 'static + Send>; 16 17 /// ReceiverBuilder can be used to configure ReceiverReport Interceptor. 18 #[derive(Default)] 19 pub struct ReportBuilder { 20 is_rr: bool, 21 interval: Option<Duration>, 22 now: Option<FnTimeGen>, 23 } 24 25 impl ReportBuilder { 26 /// with_interval sets send interval for the interceptor. with_interval(mut self, interval: Duration) -> ReportBuilder27 pub fn with_interval(mut self, interval: Duration) -> ReportBuilder { 28 self.interval = Some(interval); 29 self 30 } 31 32 /// with_now_fn sets an alternative for the time.Now function. with_now_fn(mut self, now: FnTimeGen) -> ReportBuilder33 pub fn with_now_fn(mut self, now: FnTimeGen) -> ReportBuilder { 34 self.now = Some(now); 35 self 36 } 37 build_rr(&self) -> ReceiverReport38 fn build_rr(&self) -> ReceiverReport { 39 let (close_tx, close_rx) = mpsc::channel(1); 40 ReceiverReport { 41 internal: Arc::new(ReceiverReportInternal { 42 interval: if let Some(interval) = &self.interval { 43 *interval 44 } else { 45 Duration::from_secs(1) 46 }, 47 now: self.now.clone(), 48 streams: Mutex::new(HashMap::new()), 49 close_rx: Mutex::new(Some(close_rx)), 50 }), 51 52 wg: Mutex::new(Some(WaitGroup::new())), 53 close_tx: Mutex::new(Some(close_tx)), 54 } 55 } 56 build_sr(&self) -> SenderReport57 fn build_sr(&self) -> SenderReport { 58 let (close_tx, close_rx) = mpsc::channel(1); 59 SenderReport { 60 internal: Arc::new(SenderReportInternal { 61 interval: if let Some(interval) = &self.interval { 62 *interval 63 } else { 64 Duration::from_secs(1) 65 }, 66 now: self.now.clone(), 67 streams: Mutex::new(HashMap::new()), 68 close_rx: Mutex::new(Some(close_rx)), 69 }), 70 71 wg: Mutex::new(Some(WaitGroup::new())), 72 close_tx: Mutex::new(Some(close_tx)), 73 } 74 } 75 } 76 77 impl InterceptorBuilder for ReportBuilder { build(&self, _id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>>78 fn build(&self, _id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>> { 79 if self.is_rr { 80 Ok(Arc::new(self.build_rr())) 81 } else { 82 Ok(Arc::new(self.build_sr())) 83 } 84 } 85 } 86