1 use crate::association::RtxTimerId; 2 use async_trait::async_trait; 3 use std::sync::{Arc, Weak}; 4 use tokio::sync::{mpsc, Mutex}; 5 use tokio::time::Duration; 6 7 pub(crate) const RTO_INITIAL: u64 = 3000; // msec 8 pub(crate) const RTO_MIN: u64 = 1000; // msec 9 pub(crate) const RTO_MAX: u64 = 60000; // msec 10 pub(crate) const RTO_ALPHA: u64 = 1; 11 pub(crate) const RTO_BETA: u64 = 2; 12 pub(crate) const RTO_BASE: u64 = 8; 13 pub(crate) const MAX_INIT_RETRANS: usize = 8; 14 pub(crate) const PATH_MAX_RETRANS: usize = 5; 15 pub(crate) const NO_MAX_RETRANS: usize = 0; 16 17 /// rtoManager manages Rtx timeout values. 18 /// This is an implementation of RFC 4960 sec 6.3.1. 19 #[derive(Default, Debug)] 20 pub(crate) struct RtoManager { 21 pub(crate) srtt: u64, 22 pub(crate) rttvar: f64, 23 pub(crate) rto: u64, 24 pub(crate) no_update: bool, 25 } 26 27 impl RtoManager { 28 /// newRTOManager creates a new rtoManager. 29 pub(crate) fn new() -> Self { 30 RtoManager { 31 rto: RTO_INITIAL, 32 ..Default::default() 33 } 34 } 35 36 /// set_new_rtt takes a newly measured RTT then adjust the RTO in msec. 37 pub(crate) fn set_new_rtt(&mut self, rtt: u64) -> u64 { 38 if self.no_update { 39 return self.srtt; 40 } 41 42 if self.srtt == 0 { 43 // First measurement 44 self.srtt = rtt; 45 self.rttvar = rtt as f64 / 2.0; 46 } else { 47 // Subsequent rtt measurement 48 self.rttvar = ((RTO_BASE - RTO_BETA) as f64 * self.rttvar 49 + RTO_BETA as f64 * (self.srtt as i64 - rtt as i64).abs() as f64) 50 / RTO_BASE as f64; 51 self.srtt = ((RTO_BASE - RTO_ALPHA) * self.srtt + RTO_ALPHA * rtt) / RTO_BASE; 52 } 53 54 self.rto = (self.srtt + (4.0 * self.rttvar) as u64).clamp(RTO_MIN, RTO_MAX); 55 56 self.srtt 57 } 58 59 /// get_rto simply returns the current RTO in msec. 60 pub(crate) fn get_rto(&self) -> u64 { 61 self.rto 62 } 63 64 /// reset resets the RTO variables to the initial values. 65 pub(crate) fn reset(&mut self) { 66 if self.no_update { 67 return; 68 } 69 70 self.srtt = 0; 71 self.rttvar = 0.0; 72 self.rto = RTO_INITIAL; 73 } 74 75 /// set RTO value for testing 76 pub(crate) fn set_rto(&mut self, rto: u64, no_update: bool) { 77 self.rto = rto; 78 self.no_update = no_update; 79 } 80 } 81 82 pub(crate) fn calculate_next_timeout(rto: u64, n_rtos: usize) -> u64 { 83 // RFC 4096 sec 6.3.3. Handle T3-rtx Expiration 84 // E2) For the destination address for which the timer expires, set RTO 85 // <- RTO * 2 ("back off the timer"). The maximum value discussed 86 // in rule C7 above (RTO.max) may be used to provide an upper bound 87 // to this doubling operation. 88 if n_rtos < 31 { 89 std::cmp::min(rto << n_rtos, RTO_MAX) 90 } else { 91 RTO_MAX 92 } 93 } 94 95 /// rtxTimerObserver is the inteface to a timer observer. 96 /// NOTE: Observers MUST NOT call start() or stop() method on rtxTimer 97 /// from within these callbacks. 98 #[async_trait] 99 pub(crate) trait RtxTimerObserver { 100 async fn on_retransmission_timeout(&mut self, timer_id: RtxTimerId, n: usize); 101 async fn on_retransmission_failure(&mut self, timer_id: RtxTimerId); 102 } 103 104 /// rtxTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1 105 #[derive(Default, Debug)] 106 pub(crate) struct RtxTimer<T: 'static + RtxTimerObserver + Send> { 107 pub(crate) timeout_observer: Weak<Mutex<T>>, 108 pub(crate) id: RtxTimerId, 109 pub(crate) max_retrans: usize, 110 pub(crate) close_tx: Arc<Mutex<Option<mpsc::Sender<()>>>>, 111 } 112 113 impl<T: 'static + RtxTimerObserver + Send> RtxTimer<T> { 114 /// newRTXTimer creates a new retransmission timer. 115 /// if max_retrans is set to 0, it will keep retransmitting until stop() is called. 116 /// (it will never make on_retransmission_failure() callback. 117 pub(crate) fn new( 118 timeout_observer: Weak<Mutex<T>>, 119 id: RtxTimerId, 120 max_retrans: usize, 121 ) -> Self { 122 RtxTimer { 123 timeout_observer, 124 id, 125 max_retrans, 126 close_tx: Arc::new(Mutex::new(None)), 127 } 128 } 129 130 /// start starts the timer. 131 pub(crate) async fn start(&self, rto: u64) -> bool { 132 // Note: rto value is intentionally not capped by RTO.Min to allow 133 // fast timeout for the tests. Non-test code should pass in the 134 // rto generated by rtoManager get_rto() method which caps the 135 // value at RTO.Min or at RTO.Max. 136 137 // this timer is already closed 138 let mut close_rx = { 139 let mut close = self.close_tx.lock().await; 140 if close.is_some() { 141 return false; 142 } 143 144 let (close_tx, close_rx) = mpsc::channel(1); 145 *close = Some(close_tx); 146 close_rx 147 }; 148 149 let id = self.id; 150 let max_retrans = self.max_retrans; 151 let close_tx = Arc::clone(&self.close_tx); 152 let timeout_observer = self.timeout_observer.clone(); 153 154 tokio::spawn(async move { 155 let mut n_rtos = 0; 156 157 loop { 158 let interval = calculate_next_timeout(rto, n_rtos); 159 let timer = tokio::time::sleep(Duration::from_millis(interval)); 160 tokio::pin!(timer); 161 162 tokio::select! { 163 _ = timer.as_mut() => { 164 n_rtos+=1; 165 166 let failure = { 167 if let Some(observer) = timeout_observer.upgrade(){ 168 let mut observer = observer.lock().await; 169 if max_retrans == 0 || n_rtos <= max_retrans { 170 observer.on_retransmission_timeout(id, n_rtos).await; 171 false 172 } else { 173 observer.on_retransmission_failure(id).await; 174 true 175 } 176 }else{ 177 true 178 } 179 }; 180 if failure { 181 let mut close = close_tx.lock().await; 182 *close = None; 183 break; 184 } 185 } 186 _ = close_rx.recv() => break, 187 } 188 } 189 }); 190 191 true 192 } 193 194 /// stop stops the timer. 195 pub(crate) async fn stop(&self) { 196 let mut close_tx = self.close_tx.lock().await; 197 close_tx.take(); 198 } 199 200 /// isRunning tests if the timer is running. 201 /// Debug purpose only 202 pub(crate) async fn is_running(&self) -> bool { 203 let close_tx = self.close_tx.lock().await; 204 close_tx.is_some() 205 } 206 } 207