xref: /webrtc/sctp/src/timer/rtx_timer.rs (revision 04f0bd9e)
1 use crate::association::RtxTimerId;
2 use async_trait::async_trait;
3 use std::sync::{Arc, Weak};
4 use tokio::sync::{mpsc, Mutex};
5 use tokio::time::Duration;
6 
/// Retransmission timeout used before any RTT has been measured, in msec.
pub(crate) const RTO_INITIAL: u64 = 3_000;
/// Lower bound applied to a computed RTO, in msec.
pub(crate) const RTO_MIN: u64 = 1_000;
/// Upper bound applied to a computed or backed-off RTO, in msec.
pub(crate) const RTO_MAX: u64 = 60_000;
/// Numerator of the SRTT smoothing weight (the weight is RTO_ALPHA / RTO_BASE).
pub(crate) const RTO_ALPHA: u64 = 1;
/// Numerator of the RTTVAR smoothing weight (the weight is RTO_BETA / RTO_BASE).
pub(crate) const RTO_BETA: u64 = 2;
/// Common denominator of the RTO_ALPHA and RTO_BETA weights.
pub(crate) const RTO_BASE: u64 = 8;
/// Retransmission limit; presumably for INIT chunks (consumed outside this file).
pub(crate) const MAX_INIT_RETRANS: usize = 8;
/// Retransmission limit; presumably per path (consumed outside this file).
pub(crate) const PATH_MAX_RETRANS: usize = 5;
/// A `max_retrans` of 0 makes RtxTimer retransmit until stop() is called.
pub(crate) const NO_MAX_RETRANS: usize = 0;
16 
17 /// rtoManager manages Rtx timeout values.
18 /// This is an implementation of RFC 4960 sec 6.3.1.
19 #[derive(Default, Debug)]
20 pub(crate) struct RtoManager {
21     pub(crate) srtt: u64,
22     pub(crate) rttvar: f64,
23     pub(crate) rto: u64,
24     pub(crate) no_update: bool,
25 }
26 
27 impl RtoManager {
28     /// newRTOManager creates a new rtoManager.
29     pub(crate) fn new() -> Self {
30         RtoManager {
31             rto: RTO_INITIAL,
32             ..Default::default()
33         }
34     }
35 
36     /// set_new_rtt takes a newly measured RTT then adjust the RTO in msec.
37     pub(crate) fn set_new_rtt(&mut self, rtt: u64) -> u64 {
38         if self.no_update {
39             return self.srtt;
40         }
41 
42         if self.srtt == 0 {
43             // First measurement
44             self.srtt = rtt;
45             self.rttvar = rtt as f64 / 2.0;
46         } else {
47             // Subsequent rtt measurement
48             self.rttvar = ((RTO_BASE - RTO_BETA) as f64 * self.rttvar
49                 + RTO_BETA as f64 * (self.srtt as i64 - rtt as i64).abs() as f64)
50                 / RTO_BASE as f64;
51             self.srtt = ((RTO_BASE - RTO_ALPHA) * self.srtt + RTO_ALPHA * rtt) / RTO_BASE;
52         }
53 
54         self.rto = std::cmp::min(
55             std::cmp::max(self.srtt + (4.0 * self.rttvar) as u64, RTO_MIN),
56             RTO_MAX,
57         );
58 
59         self.srtt
60     }
61 
62     /// get_rto simply returns the current RTO in msec.
63     pub(crate) fn get_rto(&self) -> u64 {
64         self.rto
65     }
66 
67     /// reset resets the RTO variables to the initial values.
68     pub(crate) fn reset(&mut self) {
69         if self.no_update {
70             return;
71         }
72 
73         self.srtt = 0;
74         self.rttvar = 0.0;
75         self.rto = RTO_INITIAL;
76     }
77 
78     /// set RTO value for testing
79     pub(crate) fn set_rto(&mut self, rto: u64, no_update: bool) {
80         self.rto = rto;
81         self.no_update = no_update;
82     }
83 }
84 
85 pub(crate) fn calculate_next_timeout(rto: u64, n_rtos: usize) -> u64 {
86     // RFC 4096 sec 6.3.3.  Handle T3-rtx Expiration
87     //   E2)  For the destination address for which the timer expires, set RTO
88     //        <- RTO * 2 ("back off the timer").  The maximum value discussed
89     //        in rule C7 above (RTO.max) may be used to provide an upper bound
90     //        to this doubling operation.
91     if n_rtos < 31 {
92         std::cmp::min(rto << n_rtos, RTO_MAX)
93     } else {
94         RTO_MAX
95     }
96 }
97 
/// rtxTimerObserver is the interface to a timer observer.
/// RtxTimer invokes these callbacks from its spawned timer task while holding
/// the observer's mutex.
/// NOTE: Observers MUST NOT call start() or stop() method on rtxTimer
/// from within these callbacks.
#[async_trait]
pub(crate) trait RtxTimerObserver {
    /// Called when the timer `timer_id` expires and retransmission should be
    /// attempted. `n` is the number of expirations since the timer was
    /// started (1 on the first expiry).
    async fn on_retransmission_timeout(&mut self, timer_id: RtxTimerId, n: usize);
    /// Called when the timer `timer_id` expires after its non-zero
    /// `max_retrans` limit has been exceeded; the timer stops itself afterwards.
    async fn on_retransmission_failure(&mut self, timer_id: RtxTimerId);
}
106 
107 /// rtxTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1
108 #[derive(Default, Debug)]
109 pub(crate) struct RtxTimer<T: 'static + RtxTimerObserver + Send> {
110     pub(crate) timeout_observer: Weak<Mutex<T>>,
111     pub(crate) id: RtxTimerId,
112     pub(crate) max_retrans: usize,
113     pub(crate) close_tx: Arc<Mutex<Option<mpsc::Sender<()>>>>,
114 }
115 
116 impl<T: 'static + RtxTimerObserver + Send> RtxTimer<T> {
117     /// newRTXTimer creates a new retransmission timer.
118     /// if max_retrans is set to 0, it will keep retransmitting until stop() is called.
119     /// (it will never make on_retransmission_failure() callback.
120     pub(crate) fn new(
121         timeout_observer: Weak<Mutex<T>>,
122         id: RtxTimerId,
123         max_retrans: usize,
124     ) -> Self {
125         RtxTimer {
126             timeout_observer,
127             id,
128             max_retrans,
129             close_tx: Arc::new(Mutex::new(None)),
130         }
131     }
132 
133     /// start starts the timer.
134     pub(crate) async fn start(&self, rto: u64) -> bool {
135         // Note: rto value is intentionally not capped by RTO.Min to allow
136         // fast timeout for the tests. Non-test code should pass in the
137         // rto generated by rtoManager get_rto() method which caps the
138         // value at RTO.Min or at RTO.Max.
139 
140         // this timer is already closed
141         let mut close_rx = {
142             let mut close = self.close_tx.lock().await;
143             if close.is_some() {
144                 return false;
145             }
146 
147             let (close_tx, close_rx) = mpsc::channel(1);
148             *close = Some(close_tx);
149             close_rx
150         };
151 
152         let id = self.id;
153         let max_retrans = self.max_retrans;
154         let close_tx = Arc::clone(&self.close_tx);
155         let timeout_observer = self.timeout_observer.clone();
156 
157         tokio::spawn(async move {
158             let mut n_rtos = 0;
159 
160             loop {
161                 let interval = calculate_next_timeout(rto, n_rtos);
162                 let timer = tokio::time::sleep(Duration::from_millis(interval));
163                 tokio::pin!(timer);
164 
165                 tokio::select! {
166                     _ = timer.as_mut() => {
167                         n_rtos+=1;
168 
169                         let failure = {
170                             if let Some(observer) = timeout_observer.upgrade(){
171                                 let mut observer = observer.lock().await;
172                                 if max_retrans == 0 || n_rtos <= max_retrans {
173                                     observer.on_retransmission_timeout(id, n_rtos).await;
174                                     false
175                                 } else {
176                                     observer.on_retransmission_failure(id).await;
177                                     true
178                                 }
179                             }else{
180                                 true
181                             }
182                         };
183                         if failure {
184                             let mut close = close_tx.lock().await;
185                             *close = None;
186                             break;
187                         }
188                     }
189                     _ = close_rx.recv() => break,
190                 }
191             }
192         });
193 
194         true
195     }
196 
197     /// stop stops the timer.
198     pub(crate) async fn stop(&self) {
199         let mut close_tx = self.close_tx.lock().await;
200         close_tx.take();
201     }
202 
203     /// isRunning tests if the timer is running.
204     /// Debug purpose only
205     pub(crate) async fn is_running(&self) -> bool {
206         let close_tx = self.close_tx.lock().await;
207         close_tx.is_some()
208     }
209 }
210