xref: /webrtc/sctp/src/timer/ack_timer.rs (revision ffe74184)
1*ffe74184SMartin Algesten use async_trait::async_trait;
2*ffe74184SMartin Algesten use std::sync::Weak;
3*ffe74184SMartin Algesten use tokio::sync::{mpsc, Mutex};
4*ffe74184SMartin Algesten use tokio::time::Duration;
5*ffe74184SMartin Algesten 
/// Ack timer interval: 200 milliseconds.
///
/// NOTE(review): presumably the value passed as `interval` to
/// `AckTimer::new` by the association code — confirm against callers.
pub(crate) const ACK_INTERVAL: Duration = Duration::from_millis(200);
7*ffe74184SMartin Algesten 
8*ffe74184SMartin Algesten /// ackTimerObserver is the inteface to an ack timer observer.
9*ffe74184SMartin Algesten #[async_trait]
10*ffe74184SMartin Algesten pub(crate) trait AckTimerObserver {
on_ack_timeout(&mut self)11*ffe74184SMartin Algesten     async fn on_ack_timeout(&mut self);
12*ffe74184SMartin Algesten }
13*ffe74184SMartin Algesten 
14*ffe74184SMartin Algesten /// ackTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1
15*ffe74184SMartin Algesten #[derive(Default, Debug)]
16*ffe74184SMartin Algesten pub(crate) struct AckTimer<T: 'static + AckTimerObserver + Send> {
17*ffe74184SMartin Algesten     pub(crate) timeout_observer: Weak<Mutex<T>>,
18*ffe74184SMartin Algesten     pub(crate) interval: Duration,
19*ffe74184SMartin Algesten     pub(crate) close_tx: Option<mpsc::Sender<()>>,
20*ffe74184SMartin Algesten }
21*ffe74184SMartin Algesten 
22*ffe74184SMartin Algesten impl<T: 'static + AckTimerObserver + Send> AckTimer<T> {
23*ffe74184SMartin Algesten     /// newAckTimer creates a new acknowledgement timer used to enable delayed ack.
new(timeout_observer: Weak<Mutex<T>>, interval: Duration) -> Self24*ffe74184SMartin Algesten     pub(crate) fn new(timeout_observer: Weak<Mutex<T>>, interval: Duration) -> Self {
25*ffe74184SMartin Algesten         AckTimer {
26*ffe74184SMartin Algesten             timeout_observer,
27*ffe74184SMartin Algesten             interval,
28*ffe74184SMartin Algesten             close_tx: None,
29*ffe74184SMartin Algesten         }
30*ffe74184SMartin Algesten     }
31*ffe74184SMartin Algesten 
32*ffe74184SMartin Algesten     /// start starts the timer.
start(&mut self) -> bool33*ffe74184SMartin Algesten     pub(crate) fn start(&mut self) -> bool {
34*ffe74184SMartin Algesten         // this timer is already closed
35*ffe74184SMartin Algesten         if self.close_tx.is_some() {
36*ffe74184SMartin Algesten             return false;
37*ffe74184SMartin Algesten         }
38*ffe74184SMartin Algesten 
39*ffe74184SMartin Algesten         let (close_tx, mut close_rx) = mpsc::channel(1);
40*ffe74184SMartin Algesten         let interval = self.interval;
41*ffe74184SMartin Algesten         let timeout_observer = self.timeout_observer.clone();
42*ffe74184SMartin Algesten 
43*ffe74184SMartin Algesten         tokio::spawn(async move {
44*ffe74184SMartin Algesten             let timer = tokio::time::sleep(interval);
45*ffe74184SMartin Algesten             tokio::pin!(timer);
46*ffe74184SMartin Algesten 
47*ffe74184SMartin Algesten             tokio::select! {
48*ffe74184SMartin Algesten                 _ = timer.as_mut() => {
49*ffe74184SMartin Algesten                     if let Some(observer) = timeout_observer.upgrade(){
50*ffe74184SMartin Algesten                         let mut observer = observer.lock().await;
51*ffe74184SMartin Algesten                         observer.on_ack_timeout().await;
52*ffe74184SMartin Algesten                     }
53*ffe74184SMartin Algesten                  }
54*ffe74184SMartin Algesten                 _ = close_rx.recv() => {},
55*ffe74184SMartin Algesten             }
56*ffe74184SMartin Algesten         });
57*ffe74184SMartin Algesten 
58*ffe74184SMartin Algesten         self.close_tx = Some(close_tx);
59*ffe74184SMartin Algesten         true
60*ffe74184SMartin Algesten     }
61*ffe74184SMartin Algesten 
62*ffe74184SMartin Algesten     /// stops the timer. this is similar to stop() but subsequent start() call
63*ffe74184SMartin Algesten     /// will fail (the timer is no longer usable)
stop(&mut self)64*ffe74184SMartin Algesten     pub(crate) fn stop(&mut self) {
65*ffe74184SMartin Algesten         self.close_tx.take();
66*ffe74184SMartin Algesten     }
67*ffe74184SMartin Algesten 
68*ffe74184SMartin Algesten     /// isRunning tests if the timer is running.
69*ffe74184SMartin Algesten     /// Debug purpose only
is_running(&self) -> bool70*ffe74184SMartin Algesten     pub(crate) fn is_running(&self) -> bool {
71*ffe74184SMartin Algesten         self.close_tx.is_some()
72*ffe74184SMartin Algesten     }
73*ffe74184SMartin Algesten }
74