1ffe74184SMartin Algesten use crate::association::RtxTimerId;
2ffe74184SMartin Algesten use async_trait::async_trait;
3ffe74184SMartin Algesten use std::sync::{Arc, Weak};
4ffe74184SMartin Algesten use tokio::sync::{mpsc, Mutex};
5ffe74184SMartin Algesten use tokio::time::Duration;
6ffe74184SMartin Algesten
/// Initial retransmission timeout (RTO.Initial), in milliseconds.
pub(crate) const RTO_INITIAL: u64 = 3000;
/// Lower bound applied to a computed RTO (RTO.Min), in milliseconds.
pub(crate) const RTO_MIN: u64 = 1000;
/// Upper bound applied to a computed or backed-off RTO (RTO.Max), in milliseconds.
pub(crate) const RTO_MAX: u64 = 60000;
/// Numerator of the SRTT smoothing gain; the gain is `RTO_ALPHA / RTO_BASE` (1/8).
pub(crate) const RTO_ALPHA: u64 = 1;
/// Numerator of the RTTVAR smoothing gain; the gain is `RTO_BETA / RTO_BASE` (2/8).
pub(crate) const RTO_BETA: u64 = 2;
/// Common denominator of the `RTO_ALPHA` and `RTO_BETA` gains.
pub(crate) const RTO_BASE: u64 = 8;
/// Retransmission limit; presumably for INIT chunks (RFC 4960 Max.Init.Retransmits).
/// NOTE(review): not used in this file, confirm against callers.
pub(crate) const MAX_INIT_RETRANS: usize = 8;
/// Retransmission limit; presumably per destination path (RFC 4960 Path.Max.Retrans).
/// NOTE(review): not used in this file, confirm against callers.
pub(crate) const PATH_MAX_RETRANS: usize = 5;
/// `max_retrans` value that disables the retransmission limit: an `RtxTimer`
/// created with it keeps firing until it is stopped.
pub(crate) const NO_MAX_RETRANS: usize = 0;
16ffe74184SMartin Algesten
17ffe74184SMartin Algesten /// rtoManager manages Rtx timeout values.
18ffe74184SMartin Algesten /// This is an implementation of RFC 4960 sec 6.3.1.
19ffe74184SMartin Algesten #[derive(Default, Debug)]
20ffe74184SMartin Algesten pub(crate) struct RtoManager {
21ffe74184SMartin Algesten pub(crate) srtt: u64,
22ffe74184SMartin Algesten pub(crate) rttvar: f64,
23ffe74184SMartin Algesten pub(crate) rto: u64,
24ffe74184SMartin Algesten pub(crate) no_update: bool,
25ffe74184SMartin Algesten }
26ffe74184SMartin Algesten
27ffe74184SMartin Algesten impl RtoManager {
28ffe74184SMartin Algesten /// newRTOManager creates a new rtoManager.
new() -> Self29ffe74184SMartin Algesten pub(crate) fn new() -> Self {
30ffe74184SMartin Algesten RtoManager {
31ffe74184SMartin Algesten rto: RTO_INITIAL,
32ffe74184SMartin Algesten ..Default::default()
33ffe74184SMartin Algesten }
34ffe74184SMartin Algesten }
35ffe74184SMartin Algesten
36ffe74184SMartin Algesten /// set_new_rtt takes a newly measured RTT then adjust the RTO in msec.
set_new_rtt(&mut self, rtt: u64) -> u6437ffe74184SMartin Algesten pub(crate) fn set_new_rtt(&mut self, rtt: u64) -> u64 {
38ffe74184SMartin Algesten if self.no_update {
39ffe74184SMartin Algesten return self.srtt;
40ffe74184SMartin Algesten }
41ffe74184SMartin Algesten
42ffe74184SMartin Algesten if self.srtt == 0 {
43ffe74184SMartin Algesten // First measurement
44ffe74184SMartin Algesten self.srtt = rtt;
45ffe74184SMartin Algesten self.rttvar = rtt as f64 / 2.0;
46ffe74184SMartin Algesten } else {
47ffe74184SMartin Algesten // Subsequent rtt measurement
48ffe74184SMartin Algesten self.rttvar = ((RTO_BASE - RTO_BETA) as f64 * self.rttvar
49ffe74184SMartin Algesten + RTO_BETA as f64 * (self.srtt as i64 - rtt as i64).abs() as f64)
50ffe74184SMartin Algesten / RTO_BASE as f64;
51ffe74184SMartin Algesten self.srtt = ((RTO_BASE - RTO_ALPHA) * self.srtt + RTO_ALPHA * rtt) / RTO_BASE;
52ffe74184SMartin Algesten }
53ffe74184SMartin Algesten
54*630c46feSHugo Tunius self.rto = (self.srtt + (4.0 * self.rttvar) as u64).clamp(RTO_MIN, RTO_MAX);
55ffe74184SMartin Algesten
56ffe74184SMartin Algesten self.srtt
57ffe74184SMartin Algesten }
58ffe74184SMartin Algesten
59ffe74184SMartin Algesten /// get_rto simply returns the current RTO in msec.
get_rto(&self) -> u6460ffe74184SMartin Algesten pub(crate) fn get_rto(&self) -> u64 {
61ffe74184SMartin Algesten self.rto
62ffe74184SMartin Algesten }
63ffe74184SMartin Algesten
64ffe74184SMartin Algesten /// reset resets the RTO variables to the initial values.
reset(&mut self)65ffe74184SMartin Algesten pub(crate) fn reset(&mut self) {
66ffe74184SMartin Algesten if self.no_update {
67ffe74184SMartin Algesten return;
68ffe74184SMartin Algesten }
69ffe74184SMartin Algesten
70ffe74184SMartin Algesten self.srtt = 0;
71ffe74184SMartin Algesten self.rttvar = 0.0;
72ffe74184SMartin Algesten self.rto = RTO_INITIAL;
73ffe74184SMartin Algesten }
74ffe74184SMartin Algesten
75ffe74184SMartin Algesten /// set RTO value for testing
set_rto(&mut self, rto: u64, no_update: bool)76ffe74184SMartin Algesten pub(crate) fn set_rto(&mut self, rto: u64, no_update: bool) {
77ffe74184SMartin Algesten self.rto = rto;
78ffe74184SMartin Algesten self.no_update = no_update;
79ffe74184SMartin Algesten }
80ffe74184SMartin Algesten }
81ffe74184SMartin Algesten
calculate_next_timeout(rto: u64, n_rtos: usize) -> u6482ffe74184SMartin Algesten pub(crate) fn calculate_next_timeout(rto: u64, n_rtos: usize) -> u64 {
83ffe74184SMartin Algesten // RFC 4096 sec 6.3.3. Handle T3-rtx Expiration
84ffe74184SMartin Algesten // E2) For the destination address for which the timer expires, set RTO
85ffe74184SMartin Algesten // <- RTO * 2 ("back off the timer"). The maximum value discussed
86ffe74184SMartin Algesten // in rule C7 above (RTO.max) may be used to provide an upper bound
87ffe74184SMartin Algesten // to this doubling operation.
88ffe74184SMartin Algesten if n_rtos < 31 {
89ffe74184SMartin Algesten std::cmp::min(rto << n_rtos, RTO_MAX)
90ffe74184SMartin Algesten } else {
91ffe74184SMartin Algesten RTO_MAX
92ffe74184SMartin Algesten }
93ffe74184SMartin Algesten }
94ffe74184SMartin Algesten
95ffe74184SMartin Algesten /// rtxTimerObserver is the inteface to a timer observer.
96ffe74184SMartin Algesten /// NOTE: Observers MUST NOT call start() or stop() method on rtxTimer
97ffe74184SMartin Algesten /// from within these callbacks.
98ffe74184SMartin Algesten #[async_trait]
99ffe74184SMartin Algesten pub(crate) trait RtxTimerObserver {
on_retransmission_timeout(&mut self, timer_id: RtxTimerId, n: usize)100ffe74184SMartin Algesten async fn on_retransmission_timeout(&mut self, timer_id: RtxTimerId, n: usize);
on_retransmission_failure(&mut self, timer_id: RtxTimerId)101ffe74184SMartin Algesten async fn on_retransmission_failure(&mut self, timer_id: RtxTimerId);
102ffe74184SMartin Algesten }
103ffe74184SMartin Algesten
104ffe74184SMartin Algesten /// rtxTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1
105ffe74184SMartin Algesten #[derive(Default, Debug)]
106ffe74184SMartin Algesten pub(crate) struct RtxTimer<T: 'static + RtxTimerObserver + Send> {
107ffe74184SMartin Algesten pub(crate) timeout_observer: Weak<Mutex<T>>,
108ffe74184SMartin Algesten pub(crate) id: RtxTimerId,
109ffe74184SMartin Algesten pub(crate) max_retrans: usize,
110ffe74184SMartin Algesten pub(crate) close_tx: Arc<Mutex<Option<mpsc::Sender<()>>>>,
111ffe74184SMartin Algesten }
112ffe74184SMartin Algesten
113ffe74184SMartin Algesten impl<T: 'static + RtxTimerObserver + Send> RtxTimer<T> {
114ffe74184SMartin Algesten /// newRTXTimer creates a new retransmission timer.
115ffe74184SMartin Algesten /// if max_retrans is set to 0, it will keep retransmitting until stop() is called.
116ffe74184SMartin Algesten /// (it will never make on_retransmission_failure() callback.
new( timeout_observer: Weak<Mutex<T>>, id: RtxTimerId, max_retrans: usize, ) -> Self117ffe74184SMartin Algesten pub(crate) fn new(
118ffe74184SMartin Algesten timeout_observer: Weak<Mutex<T>>,
119ffe74184SMartin Algesten id: RtxTimerId,
120ffe74184SMartin Algesten max_retrans: usize,
121ffe74184SMartin Algesten ) -> Self {
122ffe74184SMartin Algesten RtxTimer {
123ffe74184SMartin Algesten timeout_observer,
124ffe74184SMartin Algesten id,
125ffe74184SMartin Algesten max_retrans,
126ffe74184SMartin Algesten close_tx: Arc::new(Mutex::new(None)),
127ffe74184SMartin Algesten }
128ffe74184SMartin Algesten }
129ffe74184SMartin Algesten
130ffe74184SMartin Algesten /// start starts the timer.
start(&self, rto: u64) -> bool131ffe74184SMartin Algesten pub(crate) async fn start(&self, rto: u64) -> bool {
132ffe74184SMartin Algesten // Note: rto value is intentionally not capped by RTO.Min to allow
133ffe74184SMartin Algesten // fast timeout for the tests. Non-test code should pass in the
134ffe74184SMartin Algesten // rto generated by rtoManager get_rto() method which caps the
135ffe74184SMartin Algesten // value at RTO.Min or at RTO.Max.
136ffe74184SMartin Algesten
137ffe74184SMartin Algesten // this timer is already closed
138ffe74184SMartin Algesten let mut close_rx = {
139ffe74184SMartin Algesten let mut close = self.close_tx.lock().await;
140ffe74184SMartin Algesten if close.is_some() {
141ffe74184SMartin Algesten return false;
142ffe74184SMartin Algesten }
143ffe74184SMartin Algesten
144ffe74184SMartin Algesten let (close_tx, close_rx) = mpsc::channel(1);
145ffe74184SMartin Algesten *close = Some(close_tx);
146ffe74184SMartin Algesten close_rx
147ffe74184SMartin Algesten };
148ffe74184SMartin Algesten
149ffe74184SMartin Algesten let id = self.id;
150ffe74184SMartin Algesten let max_retrans = self.max_retrans;
151ffe74184SMartin Algesten let close_tx = Arc::clone(&self.close_tx);
152ffe74184SMartin Algesten let timeout_observer = self.timeout_observer.clone();
153ffe74184SMartin Algesten
154ffe74184SMartin Algesten tokio::spawn(async move {
155ffe74184SMartin Algesten let mut n_rtos = 0;
156ffe74184SMartin Algesten
157ffe74184SMartin Algesten loop {
158ffe74184SMartin Algesten let interval = calculate_next_timeout(rto, n_rtos);
159ffe74184SMartin Algesten let timer = tokio::time::sleep(Duration::from_millis(interval));
160ffe74184SMartin Algesten tokio::pin!(timer);
161ffe74184SMartin Algesten
162ffe74184SMartin Algesten tokio::select! {
163ffe74184SMartin Algesten _ = timer.as_mut() => {
164ffe74184SMartin Algesten n_rtos+=1;
165ffe74184SMartin Algesten
166ffe74184SMartin Algesten let failure = {
167ffe74184SMartin Algesten if let Some(observer) = timeout_observer.upgrade(){
168ffe74184SMartin Algesten let mut observer = observer.lock().await;
169ffe74184SMartin Algesten if max_retrans == 0 || n_rtos <= max_retrans {
170ffe74184SMartin Algesten observer.on_retransmission_timeout(id, n_rtos).await;
171ffe74184SMartin Algesten false
172ffe74184SMartin Algesten } else {
173ffe74184SMartin Algesten observer.on_retransmission_failure(id).await;
174ffe74184SMartin Algesten true
175ffe74184SMartin Algesten }
176ffe74184SMartin Algesten }else{
177ffe74184SMartin Algesten true
178ffe74184SMartin Algesten }
179ffe74184SMartin Algesten };
180ffe74184SMartin Algesten if failure {
181ffe74184SMartin Algesten let mut close = close_tx.lock().await;
182ffe74184SMartin Algesten *close = None;
183ffe74184SMartin Algesten break;
184ffe74184SMartin Algesten }
185ffe74184SMartin Algesten }
186ffe74184SMartin Algesten _ = close_rx.recv() => break,
187ffe74184SMartin Algesten }
188ffe74184SMartin Algesten }
189ffe74184SMartin Algesten });
190ffe74184SMartin Algesten
191ffe74184SMartin Algesten true
192ffe74184SMartin Algesten }
193ffe74184SMartin Algesten
194ffe74184SMartin Algesten /// stop stops the timer.
stop(&self)195ffe74184SMartin Algesten pub(crate) async fn stop(&self) {
196ffe74184SMartin Algesten let mut close_tx = self.close_tx.lock().await;
197ffe74184SMartin Algesten close_tx.take();
198ffe74184SMartin Algesten }
199ffe74184SMartin Algesten
200ffe74184SMartin Algesten /// isRunning tests if the timer is running.
201ffe74184SMartin Algesten /// Debug purpose only
is_running(&self) -> bool202ffe74184SMartin Algesten pub(crate) async fn is_running(&self) -> bool {
203ffe74184SMartin Algesten let close_tx = self.close_tx.lock().await;
204ffe74184SMartin Algesten close_tx.is_some()
205ffe74184SMartin Algesten }
206ffe74184SMartin Algesten }
207