1*ffe74184SMartin Algesten use async_trait::async_trait; 2*ffe74184SMartin Algesten use std::sync::Weak; 3*ffe74184SMartin Algesten use tokio::sync::{mpsc, Mutex}; 4*ffe74184SMartin Algesten use tokio::time::Duration; 5*ffe74184SMartin Algesten 6*ffe74184SMartin Algesten pub(crate) const ACK_INTERVAL: Duration = Duration::from_millis(200); 7*ffe74184SMartin Algesten 8*ffe74184SMartin Algesten /// ackTimerObserver is the inteface to an ack timer observer. 9*ffe74184SMartin Algesten #[async_trait] 10*ffe74184SMartin Algesten pub(crate) trait AckTimerObserver { on_ack_timeout(&mut self)11*ffe74184SMartin Algesten async fn on_ack_timeout(&mut self); 12*ffe74184SMartin Algesten } 13*ffe74184SMartin Algesten 14*ffe74184SMartin Algesten /// ackTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1 15*ffe74184SMartin Algesten #[derive(Default, Debug)] 16*ffe74184SMartin Algesten pub(crate) struct AckTimer<T: 'static + AckTimerObserver + Send> { 17*ffe74184SMartin Algesten pub(crate) timeout_observer: Weak<Mutex<T>>, 18*ffe74184SMartin Algesten pub(crate) interval: Duration, 19*ffe74184SMartin Algesten pub(crate) close_tx: Option<mpsc::Sender<()>>, 20*ffe74184SMartin Algesten } 21*ffe74184SMartin Algesten 22*ffe74184SMartin Algesten impl<T: 'static + AckTimerObserver + Send> AckTimer<T> { 23*ffe74184SMartin Algesten /// newAckTimer creates a new acknowledgement timer used to enable delayed ack. new(timeout_observer: Weak<Mutex<T>>, interval: Duration) -> Self24*ffe74184SMartin Algesten pub(crate) fn new(timeout_observer: Weak<Mutex<T>>, interval: Duration) -> Self { 25*ffe74184SMartin Algesten AckTimer { 26*ffe74184SMartin Algesten timeout_observer, 27*ffe74184SMartin Algesten interval, 28*ffe74184SMartin Algesten close_tx: None, 29*ffe74184SMartin Algesten } 30*ffe74184SMartin Algesten } 31*ffe74184SMartin Algesten 32*ffe74184SMartin Algesten /// start starts the timer. start(&mut self) -> bool33*ffe74184SMartin Algesten pub(crate) fn start(&mut self) -> bool { 34*ffe74184SMartin Algesten // this timer is already closed 35*ffe74184SMartin Algesten if self.close_tx.is_some() { 36*ffe74184SMartin Algesten return false; 37*ffe74184SMartin Algesten } 38*ffe74184SMartin Algesten 39*ffe74184SMartin Algesten let (close_tx, mut close_rx) = mpsc::channel(1); 40*ffe74184SMartin Algesten let interval = self.interval; 41*ffe74184SMartin Algesten let timeout_observer = self.timeout_observer.clone(); 42*ffe74184SMartin Algesten 43*ffe74184SMartin Algesten tokio::spawn(async move { 44*ffe74184SMartin Algesten let timer = tokio::time::sleep(interval); 45*ffe74184SMartin Algesten tokio::pin!(timer); 46*ffe74184SMartin Algesten 47*ffe74184SMartin Algesten tokio::select! { 48*ffe74184SMartin Algesten _ = timer.as_mut() => { 49*ffe74184SMartin Algesten if let Some(observer) = timeout_observer.upgrade(){ 50*ffe74184SMartin Algesten let mut observer = observer.lock().await; 51*ffe74184SMartin Algesten observer.on_ack_timeout().await; 52*ffe74184SMartin Algesten } 53*ffe74184SMartin Algesten } 54*ffe74184SMartin Algesten _ = close_rx.recv() => {}, 55*ffe74184SMartin Algesten } 56*ffe74184SMartin Algesten }); 57*ffe74184SMartin Algesten 58*ffe74184SMartin Algesten self.close_tx = Some(close_tx); 59*ffe74184SMartin Algesten true 60*ffe74184SMartin Algesten } 61*ffe74184SMartin Algesten 62*ffe74184SMartin Algesten /// stops the timer. this is similar to stop() but subsequent start() call 63*ffe74184SMartin Algesten /// will fail (the timer is no longer usable) stop(&mut self)64*ffe74184SMartin Algesten pub(crate) fn stop(&mut self) { 65*ffe74184SMartin Algesten self.close_tx.take(); 66*ffe74184SMartin Algesten } 67*ffe74184SMartin Algesten 68*ffe74184SMartin Algesten /// isRunning tests if the timer is running. 69*ffe74184SMartin Algesten /// Debug purpose only is_running(&self) -> bool70*ffe74184SMartin Algesten pub(crate) fn is_running(&self) -> bool { 71*ffe74184SMartin Algesten self.close_tx.is_some() 72*ffe74184SMartin Algesten } 73*ffe74184SMartin Algesten } 74