1 use async_trait::async_trait; 2 use std::sync::Weak; 3 use tokio::sync::{mpsc, Mutex}; 4 use tokio::time::Duration; 5 6 pub(crate) const ACK_INTERVAL: Duration = Duration::from_millis(200); 7 8 /// ackTimerObserver is the inteface to an ack timer observer. 9 #[async_trait] 10 pub(crate) trait AckTimerObserver { on_ack_timeout(&mut self)11 async fn on_ack_timeout(&mut self); 12 } 13 14 /// ackTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1 15 #[derive(Default, Debug)] 16 pub(crate) struct AckTimer<T: 'static + AckTimerObserver + Send> { 17 pub(crate) timeout_observer: Weak<Mutex<T>>, 18 pub(crate) interval: Duration, 19 pub(crate) close_tx: Option<mpsc::Sender<()>>, 20 } 21 22 impl<T: 'static + AckTimerObserver + Send> AckTimer<T> { 23 /// newAckTimer creates a new acknowledgement timer used to enable delayed ack. new(timeout_observer: Weak<Mutex<T>>, interval: Duration) -> Self24 pub(crate) fn new(timeout_observer: Weak<Mutex<T>>, interval: Duration) -> Self { 25 AckTimer { 26 timeout_observer, 27 interval, 28 close_tx: None, 29 } 30 } 31 32 /// start starts the timer. start(&mut self) -> bool33 pub(crate) fn start(&mut self) -> bool { 34 // this timer is already closed 35 if self.close_tx.is_some() { 36 return false; 37 } 38 39 let (close_tx, mut close_rx) = mpsc::channel(1); 40 let interval = self.interval; 41 let timeout_observer = self.timeout_observer.clone(); 42 43 tokio::spawn(async move { 44 let timer = tokio::time::sleep(interval); 45 tokio::pin!(timer); 46 47 tokio::select! { 48 _ = timer.as_mut() => { 49 if let Some(observer) = timeout_observer.upgrade(){ 50 let mut observer = observer.lock().await; 51 observer.on_ack_timeout().await; 52 } 53 } 54 _ = close_rx.recv() => {}, 55 } 56 }); 57 58 self.close_tx = Some(close_tx); 59 true 60 } 61 62 /// stops the timer. this is similar to stop() but subsequent start() call 63 /// will fail (the timer is no longer usable) stop(&mut self)64 pub(crate) fn stop(&mut self) { 65 self.close_tx.take(); 66 } 67 68 /// isRunning tests if the timer is running. 69 /// Debug purpose only is_running(&self) -> bool70 pub(crate) fn is_running(&self) -> bool { 71 self.close_tx.is_some() 72 } 73 } 74