1 use crate::association::RtxTimerId;
2 use async_trait::async_trait;
3 use std::sync::{Arc, Weak};
4 use tokio::sync::{mpsc, Mutex};
5 use tokio::time::Duration;
6
/// Initial retransmission timeout, in milliseconds.
pub(crate) const RTO_INITIAL: u64 = 3_000;
/// Lower bound applied to a computed RTO, in milliseconds.
pub(crate) const RTO_MIN: u64 = 1_000;
/// Upper bound applied to a computed or backed-off RTO, in milliseconds.
pub(crate) const RTO_MAX: u64 = 60_000;
/// Numerator of the SRTT smoothing gain (the gain is `RTO_ALPHA / RTO_BASE`).
pub(crate) const RTO_ALPHA: u64 = 1;
/// Numerator of the RTTVAR smoothing gain (the gain is `RTO_BETA / RTO_BASE`).
pub(crate) const RTO_BETA: u64 = 2;
/// Common denominator of the `RTO_ALPHA` and `RTO_BETA` gains.
pub(crate) const RTO_BASE: u64 = 8;
/// Retransmission limit; not referenced in this file.
/// NOTE(review): presumably the cap for INIT retransmissions — confirm at the call site.
pub(crate) const MAX_INIT_RETRANS: usize = 8;
/// Retransmission limit; not referenced in this file.
/// NOTE(review): presumably the per-path retransmission cap — confirm at the call site.
pub(crate) const PATH_MAX_RETRANS: usize = 5;
/// `max_retrans` value for which `RtxTimer` never reports a retransmission failure.
pub(crate) const NO_MAX_RETRANS: usize = 0;
16
17 /// rtoManager manages Rtx timeout values.
18 /// This is an implementation of RFC 4960 sec 6.3.1.
19 #[derive(Default, Debug)]
20 pub(crate) struct RtoManager {
21 pub(crate) srtt: u64,
22 pub(crate) rttvar: f64,
23 pub(crate) rto: u64,
24 pub(crate) no_update: bool,
25 }
26
27 impl RtoManager {
28 /// newRTOManager creates a new rtoManager.
new() -> Self29 pub(crate) fn new() -> Self {
30 RtoManager {
31 rto: RTO_INITIAL,
32 ..Default::default()
33 }
34 }
35
36 /// set_new_rtt takes a newly measured RTT then adjust the RTO in msec.
set_new_rtt(&mut self, rtt: u64) -> u6437 pub(crate) fn set_new_rtt(&mut self, rtt: u64) -> u64 {
38 if self.no_update {
39 return self.srtt;
40 }
41
42 if self.srtt == 0 {
43 // First measurement
44 self.srtt = rtt;
45 self.rttvar = rtt as f64 / 2.0;
46 } else {
47 // Subsequent rtt measurement
48 self.rttvar = ((RTO_BASE - RTO_BETA) as f64 * self.rttvar
49 + RTO_BETA as f64 * (self.srtt as i64 - rtt as i64).abs() as f64)
50 / RTO_BASE as f64;
51 self.srtt = ((RTO_BASE - RTO_ALPHA) * self.srtt + RTO_ALPHA * rtt) / RTO_BASE;
52 }
53
54 self.rto = (self.srtt + (4.0 * self.rttvar) as u64).clamp(RTO_MIN, RTO_MAX);
55
56 self.srtt
57 }
58
59 /// get_rto simply returns the current RTO in msec.
get_rto(&self) -> u6460 pub(crate) fn get_rto(&self) -> u64 {
61 self.rto
62 }
63
64 /// reset resets the RTO variables to the initial values.
reset(&mut self)65 pub(crate) fn reset(&mut self) {
66 if self.no_update {
67 return;
68 }
69
70 self.srtt = 0;
71 self.rttvar = 0.0;
72 self.rto = RTO_INITIAL;
73 }
74
75 /// set RTO value for testing
set_rto(&mut self, rto: u64, no_update: bool)76 pub(crate) fn set_rto(&mut self, rto: u64, no_update: bool) {
77 self.rto = rto;
78 self.no_update = no_update;
79 }
80 }
81
calculate_next_timeout(rto: u64, n_rtos: usize) -> u6482 pub(crate) fn calculate_next_timeout(rto: u64, n_rtos: usize) -> u64 {
83 // RFC 4096 sec 6.3.3. Handle T3-rtx Expiration
84 // E2) For the destination address for which the timer expires, set RTO
85 // <- RTO * 2 ("back off the timer"). The maximum value discussed
86 // in rule C7 above (RTO.max) may be used to provide an upper bound
87 // to this doubling operation.
88 if n_rtos < 31 {
89 std::cmp::min(rto << n_rtos, RTO_MAX)
90 } else {
91 RTO_MAX
92 }
93 }
94
95 /// rtxTimerObserver is the inteface to a timer observer.
96 /// NOTE: Observers MUST NOT call start() or stop() method on rtxTimer
97 /// from within these callbacks.
98 #[async_trait]
99 pub(crate) trait RtxTimerObserver {
on_retransmission_timeout(&mut self, timer_id: RtxTimerId, n: usize)100 async fn on_retransmission_timeout(&mut self, timer_id: RtxTimerId, n: usize);
on_retransmission_failure(&mut self, timer_id: RtxTimerId)101 async fn on_retransmission_failure(&mut self, timer_id: RtxTimerId);
102 }
103
104 /// rtxTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1
105 #[derive(Default, Debug)]
106 pub(crate) struct RtxTimer<T: 'static + RtxTimerObserver + Send> {
107 pub(crate) timeout_observer: Weak<Mutex<T>>,
108 pub(crate) id: RtxTimerId,
109 pub(crate) max_retrans: usize,
110 pub(crate) close_tx: Arc<Mutex<Option<mpsc::Sender<()>>>>,
111 }
112
113 impl<T: 'static + RtxTimerObserver + Send> RtxTimer<T> {
114 /// newRTXTimer creates a new retransmission timer.
115 /// if max_retrans is set to 0, it will keep retransmitting until stop() is called.
116 /// (it will never make on_retransmission_failure() callback.
new( timeout_observer: Weak<Mutex<T>>, id: RtxTimerId, max_retrans: usize, ) -> Self117 pub(crate) fn new(
118 timeout_observer: Weak<Mutex<T>>,
119 id: RtxTimerId,
120 max_retrans: usize,
121 ) -> Self {
122 RtxTimer {
123 timeout_observer,
124 id,
125 max_retrans,
126 close_tx: Arc::new(Mutex::new(None)),
127 }
128 }
129
130 /// start starts the timer.
start(&self, rto: u64) -> bool131 pub(crate) async fn start(&self, rto: u64) -> bool {
132 // Note: rto value is intentionally not capped by RTO.Min to allow
133 // fast timeout for the tests. Non-test code should pass in the
134 // rto generated by rtoManager get_rto() method which caps the
135 // value at RTO.Min or at RTO.Max.
136
137 // this timer is already closed
138 let mut close_rx = {
139 let mut close = self.close_tx.lock().await;
140 if close.is_some() {
141 return false;
142 }
143
144 let (close_tx, close_rx) = mpsc::channel(1);
145 *close = Some(close_tx);
146 close_rx
147 };
148
149 let id = self.id;
150 let max_retrans = self.max_retrans;
151 let close_tx = Arc::clone(&self.close_tx);
152 let timeout_observer = self.timeout_observer.clone();
153
154 tokio::spawn(async move {
155 let mut n_rtos = 0;
156
157 loop {
158 let interval = calculate_next_timeout(rto, n_rtos);
159 let timer = tokio::time::sleep(Duration::from_millis(interval));
160 tokio::pin!(timer);
161
162 tokio::select! {
163 _ = timer.as_mut() => {
164 n_rtos+=1;
165
166 let failure = {
167 if let Some(observer) = timeout_observer.upgrade(){
168 let mut observer = observer.lock().await;
169 if max_retrans == 0 || n_rtos <= max_retrans {
170 observer.on_retransmission_timeout(id, n_rtos).await;
171 false
172 } else {
173 observer.on_retransmission_failure(id).await;
174 true
175 }
176 }else{
177 true
178 }
179 };
180 if failure {
181 let mut close = close_tx.lock().await;
182 *close = None;
183 break;
184 }
185 }
186 _ = close_rx.recv() => break,
187 }
188 }
189 });
190
191 true
192 }
193
194 /// stop stops the timer.
stop(&self)195 pub(crate) async fn stop(&self) {
196 let mut close_tx = self.close_tx.lock().await;
197 close_tx.take();
198 }
199
200 /// isRunning tests if the timer is running.
201 /// Debug purpose only
is_running(&self) -> bool202 pub(crate) async fn is_running(&self) -> bool {
203 let close_tx = self.close_tx.lock().await;
204 close_tx.is_some()
205 }
206 }
207