xref: /webrtc/sctp/src/timer/ack_timer.rs (revision ffe74184)
1 use async_trait::async_trait;
2 use std::sync::Weak;
3 use tokio::sync::{mpsc, Mutex};
4 use tokio::time::Duration;
5 
6 pub(crate) const ACK_INTERVAL: Duration = Duration::from_millis(200);
7 
8 /// ackTimerObserver is the inteface to an ack timer observer.
9 #[async_trait]
10 pub(crate) trait AckTimerObserver {
on_ack_timeout(&mut self)11     async fn on_ack_timeout(&mut self);
12 }
13 
14 /// ackTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1
15 #[derive(Default, Debug)]
16 pub(crate) struct AckTimer<T: 'static + AckTimerObserver + Send> {
17     pub(crate) timeout_observer: Weak<Mutex<T>>,
18     pub(crate) interval: Duration,
19     pub(crate) close_tx: Option<mpsc::Sender<()>>,
20 }
21 
22 impl<T: 'static + AckTimerObserver + Send> AckTimer<T> {
23     /// newAckTimer creates a new acknowledgement timer used to enable delayed ack.
new(timeout_observer: Weak<Mutex<T>>, interval: Duration) -> Self24     pub(crate) fn new(timeout_observer: Weak<Mutex<T>>, interval: Duration) -> Self {
25         AckTimer {
26             timeout_observer,
27             interval,
28             close_tx: None,
29         }
30     }
31 
32     /// start starts the timer.
start(&mut self) -> bool33     pub(crate) fn start(&mut self) -> bool {
34         // this timer is already closed
35         if self.close_tx.is_some() {
36             return false;
37         }
38 
39         let (close_tx, mut close_rx) = mpsc::channel(1);
40         let interval = self.interval;
41         let timeout_observer = self.timeout_observer.clone();
42 
43         tokio::spawn(async move {
44             let timer = tokio::time::sleep(interval);
45             tokio::pin!(timer);
46 
47             tokio::select! {
48                 _ = timer.as_mut() => {
49                     if let Some(observer) = timeout_observer.upgrade(){
50                         let mut observer = observer.lock().await;
51                         observer.on_ack_timeout().await;
52                     }
53                  }
54                 _ = close_rx.recv() => {},
55             }
56         });
57 
58         self.close_tx = Some(close_tx);
59         true
60     }
61 
62     /// stops the timer. this is similar to stop() but subsequent start() call
63     /// will fail (the timer is no longer usable)
stop(&mut self)64     pub(crate) fn stop(&mut self) {
65         self.close_tx.take();
66     }
67 
68     /// isRunning tests if the timer is running.
69     /// Debug purpose only
is_running(&self) -> bool70     pub(crate) fn is_running(&self) -> bool {
71         self.close_tx.is_some()
72     }
73 }
74