xref: /webrtc/sctp/src/timer/rtx_timer.rs (revision 630c46fe)
1 use crate::association::RtxTimerId;
2 use async_trait::async_trait;
3 use std::sync::{Arc, Weak};
4 use tokio::sync::{mpsc, Mutex};
5 use tokio::time::Duration;
6 
7 pub(crate) const RTO_INITIAL: u64 = 3000; // msec
8 pub(crate) const RTO_MIN: u64 = 1000; // msec
9 pub(crate) const RTO_MAX: u64 = 60000; // msec
10 pub(crate) const RTO_ALPHA: u64 = 1;
11 pub(crate) const RTO_BETA: u64 = 2;
12 pub(crate) const RTO_BASE: u64 = 8;
13 pub(crate) const MAX_INIT_RETRANS: usize = 8;
14 pub(crate) const PATH_MAX_RETRANS: usize = 5;
15 pub(crate) const NO_MAX_RETRANS: usize = 0;
16 
17 /// rtoManager manages Rtx timeout values.
18 /// This is an implementation of RFC 4960 sec 6.3.1.
19 #[derive(Default, Debug)]
20 pub(crate) struct RtoManager {
21     pub(crate) srtt: u64,
22     pub(crate) rttvar: f64,
23     pub(crate) rto: u64,
24     pub(crate) no_update: bool,
25 }
26 
27 impl RtoManager {
28     /// newRTOManager creates a new rtoManager.
new() -> Self29     pub(crate) fn new() -> Self {
30         RtoManager {
31             rto: RTO_INITIAL,
32             ..Default::default()
33         }
34     }
35 
36     /// set_new_rtt takes a newly measured RTT then adjust the RTO in msec.
set_new_rtt(&mut self, rtt: u64) -> u6437     pub(crate) fn set_new_rtt(&mut self, rtt: u64) -> u64 {
38         if self.no_update {
39             return self.srtt;
40         }
41 
42         if self.srtt == 0 {
43             // First measurement
44             self.srtt = rtt;
45             self.rttvar = rtt as f64 / 2.0;
46         } else {
47             // Subsequent rtt measurement
48             self.rttvar = ((RTO_BASE - RTO_BETA) as f64 * self.rttvar
49                 + RTO_BETA as f64 * (self.srtt as i64 - rtt as i64).abs() as f64)
50                 / RTO_BASE as f64;
51             self.srtt = ((RTO_BASE - RTO_ALPHA) * self.srtt + RTO_ALPHA * rtt) / RTO_BASE;
52         }
53 
54         self.rto = (self.srtt + (4.0 * self.rttvar) as u64).clamp(RTO_MIN, RTO_MAX);
55 
56         self.srtt
57     }
58 
59     /// get_rto simply returns the current RTO in msec.
get_rto(&self) -> u6460     pub(crate) fn get_rto(&self) -> u64 {
61         self.rto
62     }
63 
64     /// reset resets the RTO variables to the initial values.
reset(&mut self)65     pub(crate) fn reset(&mut self) {
66         if self.no_update {
67             return;
68         }
69 
70         self.srtt = 0;
71         self.rttvar = 0.0;
72         self.rto = RTO_INITIAL;
73     }
74 
75     /// set RTO value for testing
set_rto(&mut self, rto: u64, no_update: bool)76     pub(crate) fn set_rto(&mut self, rto: u64, no_update: bool) {
77         self.rto = rto;
78         self.no_update = no_update;
79     }
80 }
81 
calculate_next_timeout(rto: u64, n_rtos: usize) -> u6482 pub(crate) fn calculate_next_timeout(rto: u64, n_rtos: usize) -> u64 {
83     // RFC 4096 sec 6.3.3.  Handle T3-rtx Expiration
84     //   E2)  For the destination address for which the timer expires, set RTO
85     //        <- RTO * 2 ("back off the timer").  The maximum value discussed
86     //        in rule C7 above (RTO.max) may be used to provide an upper bound
87     //        to this doubling operation.
88     if n_rtos < 31 {
89         std::cmp::min(rto << n_rtos, RTO_MAX)
90     } else {
91         RTO_MAX
92     }
93 }
94 
95 /// rtxTimerObserver is the inteface to a timer observer.
96 /// NOTE: Observers MUST NOT call start() or stop() method on rtxTimer
97 /// from within these callbacks.
98 #[async_trait]
99 pub(crate) trait RtxTimerObserver {
on_retransmission_timeout(&mut self, timer_id: RtxTimerId, n: usize)100     async fn on_retransmission_timeout(&mut self, timer_id: RtxTimerId, n: usize);
on_retransmission_failure(&mut self, timer_id: RtxTimerId)101     async fn on_retransmission_failure(&mut self, timer_id: RtxTimerId);
102 }
103 
104 /// rtxTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1
105 #[derive(Default, Debug)]
106 pub(crate) struct RtxTimer<T: 'static + RtxTimerObserver + Send> {
107     pub(crate) timeout_observer: Weak<Mutex<T>>,
108     pub(crate) id: RtxTimerId,
109     pub(crate) max_retrans: usize,
110     pub(crate) close_tx: Arc<Mutex<Option<mpsc::Sender<()>>>>,
111 }
112 
113 impl<T: 'static + RtxTimerObserver + Send> RtxTimer<T> {
114     /// newRTXTimer creates a new retransmission timer.
115     /// if max_retrans is set to 0, it will keep retransmitting until stop() is called.
116     /// (it will never make on_retransmission_failure() callback.
new( timeout_observer: Weak<Mutex<T>>, id: RtxTimerId, max_retrans: usize, ) -> Self117     pub(crate) fn new(
118         timeout_observer: Weak<Mutex<T>>,
119         id: RtxTimerId,
120         max_retrans: usize,
121     ) -> Self {
122         RtxTimer {
123             timeout_observer,
124             id,
125             max_retrans,
126             close_tx: Arc::new(Mutex::new(None)),
127         }
128     }
129 
130     /// start starts the timer.
start(&self, rto: u64) -> bool131     pub(crate) async fn start(&self, rto: u64) -> bool {
132         // Note: rto value is intentionally not capped by RTO.Min to allow
133         // fast timeout for the tests. Non-test code should pass in the
134         // rto generated by rtoManager get_rto() method which caps the
135         // value at RTO.Min or at RTO.Max.
136 
137         // this timer is already closed
138         let mut close_rx = {
139             let mut close = self.close_tx.lock().await;
140             if close.is_some() {
141                 return false;
142             }
143 
144             let (close_tx, close_rx) = mpsc::channel(1);
145             *close = Some(close_tx);
146             close_rx
147         };
148 
149         let id = self.id;
150         let max_retrans = self.max_retrans;
151         let close_tx = Arc::clone(&self.close_tx);
152         let timeout_observer = self.timeout_observer.clone();
153 
154         tokio::spawn(async move {
155             let mut n_rtos = 0;
156 
157             loop {
158                 let interval = calculate_next_timeout(rto, n_rtos);
159                 let timer = tokio::time::sleep(Duration::from_millis(interval));
160                 tokio::pin!(timer);
161 
162                 tokio::select! {
163                     _ = timer.as_mut() => {
164                         n_rtos+=1;
165 
166                         let failure = {
167                             if let Some(observer) = timeout_observer.upgrade(){
168                                 let mut observer = observer.lock().await;
169                                 if max_retrans == 0 || n_rtos <= max_retrans {
170                                     observer.on_retransmission_timeout(id, n_rtos).await;
171                                     false
172                                 } else {
173                                     observer.on_retransmission_failure(id).await;
174                                     true
175                                 }
176                             }else{
177                                 true
178                             }
179                         };
180                         if failure {
181                             let mut close = close_tx.lock().await;
182                             *close = None;
183                             break;
184                         }
185                     }
186                     _ = close_rx.recv() => break,
187                 }
188             }
189         });
190 
191         true
192     }
193 
194     /// stop stops the timer.
stop(&self)195     pub(crate) async fn stop(&self) {
196         let mut close_tx = self.close_tx.lock().await;
197         close_tx.take();
198     }
199 
200     /// isRunning tests if the timer is running.
201     /// Debug purpose only
is_running(&self) -> bool202     pub(crate) async fn is_running(&self) -> bool {
203         let close_tx = self.close_tx.lock().await;
204         close_tx.is_some()
205     }
206 }
207