1 use crate::association::RtxTimerId; 2 use async_trait::async_trait; 3 use std::sync::{Arc, Weak}; 4 use tokio::sync::{mpsc, Mutex}; 5 use tokio::time::Duration; 6 7 pub(crate) const RTO_INITIAL: u64 = 3000; // msec 8 pub(crate) const RTO_MIN: u64 = 1000; // msec 9 pub(crate) const RTO_MAX: u64 = 60000; // msec 10 pub(crate) const RTO_ALPHA: u64 = 1; 11 pub(crate) const RTO_BETA: u64 = 2; 12 pub(crate) const RTO_BASE: u64 = 8; 13 pub(crate) const MAX_INIT_RETRANS: usize = 8; 14 pub(crate) const PATH_MAX_RETRANS: usize = 5; 15 pub(crate) const NO_MAX_RETRANS: usize = 0; 16 17 /// rtoManager manages Rtx timeout values. 18 /// This is an implementation of RFC 4960 sec 6.3.1. 19 #[derive(Default, Debug)] 20 pub(crate) struct RtoManager { 21 pub(crate) srtt: u64, 22 pub(crate) rttvar: f64, 23 pub(crate) rto: u64, 24 pub(crate) no_update: bool, 25 } 26 27 impl RtoManager { 28 /// newRTOManager creates a new rtoManager. 29 pub(crate) fn new() -> Self { 30 RtoManager { 31 rto: RTO_INITIAL, 32 ..Default::default() 33 } 34 } 35 36 /// set_new_rtt takes a newly measured RTT then adjust the RTO in msec. 37 pub(crate) fn set_new_rtt(&mut self, rtt: u64) -> u64 { 38 if self.no_update { 39 return self.srtt; 40 } 41 42 if self.srtt == 0 { 43 // First measurement 44 self.srtt = rtt; 45 self.rttvar = rtt as f64 / 2.0; 46 } else { 47 // Subsequent rtt measurement 48 self.rttvar = ((RTO_BASE - RTO_BETA) as f64 * self.rttvar 49 + RTO_BETA as f64 * (self.srtt as i64 - rtt as i64).abs() as f64) 50 / RTO_BASE as f64; 51 self.srtt = ((RTO_BASE - RTO_ALPHA) * self.srtt + RTO_ALPHA * rtt) / RTO_BASE; 52 } 53 54 self.rto = std::cmp::min( 55 std::cmp::max(self.srtt + (4.0 * self.rttvar) as u64, RTO_MIN), 56 RTO_MAX, 57 ); 58 59 self.srtt 60 } 61 62 /// get_rto simply returns the current RTO in msec. 63 pub(crate) fn get_rto(&self) -> u64 { 64 self.rto 65 } 66 67 /// reset resets the RTO variables to the initial values. 68 pub(crate) fn reset(&mut self) { 69 if self.no_update { 70 return; 71 } 72 73 self.srtt = 0; 74 self.rttvar = 0.0; 75 self.rto = RTO_INITIAL; 76 } 77 78 /// set RTO value for testing 79 pub(crate) fn set_rto(&mut self, rto: u64, no_update: bool) { 80 self.rto = rto; 81 self.no_update = no_update; 82 } 83 } 84 85 pub(crate) fn calculate_next_timeout(rto: u64, n_rtos: usize) -> u64 { 86 // RFC 4096 sec 6.3.3. Handle T3-rtx Expiration 87 // E2) For the destination address for which the timer expires, set RTO 88 // <- RTO * 2 ("back off the timer"). The maximum value discussed 89 // in rule C7 above (RTO.max) may be used to provide an upper bound 90 // to this doubling operation. 91 if n_rtos < 31 { 92 std::cmp::min(rto << n_rtos, RTO_MAX) 93 } else { 94 RTO_MAX 95 } 96 } 97 98 /// rtxTimerObserver is the inteface to a timer observer. 99 /// NOTE: Observers MUST NOT call start() or stop() method on rtxTimer 100 /// from within these callbacks. 101 #[async_trait] 102 pub(crate) trait RtxTimerObserver { 103 async fn on_retransmission_timeout(&mut self, timer_id: RtxTimerId, n: usize); 104 async fn on_retransmission_failure(&mut self, timer_id: RtxTimerId); 105 } 106 107 /// rtxTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1 108 #[derive(Default, Debug)] 109 pub(crate) struct RtxTimer<T: 'static + RtxTimerObserver + Send> { 110 pub(crate) timeout_observer: Weak<Mutex<T>>, 111 pub(crate) id: RtxTimerId, 112 pub(crate) max_retrans: usize, 113 pub(crate) close_tx: Arc<Mutex<Option<mpsc::Sender<()>>>>, 114 } 115 116 impl<T: 'static + RtxTimerObserver + Send> RtxTimer<T> { 117 /// newRTXTimer creates a new retransmission timer. 118 /// if max_retrans is set to 0, it will keep retransmitting until stop() is called. 119 /// (it will never make on_retransmission_failure() callback. 120 pub(crate) fn new( 121 timeout_observer: Weak<Mutex<T>>, 122 id: RtxTimerId, 123 max_retrans: usize, 124 ) -> Self { 125 RtxTimer { 126 timeout_observer, 127 id, 128 max_retrans, 129 close_tx: Arc::new(Mutex::new(None)), 130 } 131 } 132 133 /// start starts the timer. 134 pub(crate) async fn start(&self, rto: u64) -> bool { 135 // Note: rto value is intentionally not capped by RTO.Min to allow 136 // fast timeout for the tests. Non-test code should pass in the 137 // rto generated by rtoManager get_rto() method which caps the 138 // value at RTO.Min or at RTO.Max. 139 140 // this timer is already closed 141 let mut close_rx = { 142 let mut close = self.close_tx.lock().await; 143 if close.is_some() { 144 return false; 145 } 146 147 let (close_tx, close_rx) = mpsc::channel(1); 148 *close = Some(close_tx); 149 close_rx 150 }; 151 152 let id = self.id; 153 let max_retrans = self.max_retrans; 154 let close_tx = Arc::clone(&self.close_tx); 155 let timeout_observer = self.timeout_observer.clone(); 156 157 tokio::spawn(async move { 158 let mut n_rtos = 0; 159 160 loop { 161 let interval = calculate_next_timeout(rto, n_rtos); 162 let timer = tokio::time::sleep(Duration::from_millis(interval)); 163 tokio::pin!(timer); 164 165 tokio::select! { 166 _ = timer.as_mut() => { 167 n_rtos+=1; 168 169 let failure = { 170 if let Some(observer) = timeout_observer.upgrade(){ 171 let mut observer = observer.lock().await; 172 if max_retrans == 0 || n_rtos <= max_retrans { 173 observer.on_retransmission_timeout(id, n_rtos).await; 174 false 175 } else { 176 observer.on_retransmission_failure(id).await; 177 true 178 } 179 }else{ 180 true 181 } 182 }; 183 if failure { 184 let mut close = close_tx.lock().await; 185 *close = None; 186 break; 187 } 188 } 189 _ = close_rx.recv() => break, 190 } 191 } 192 }); 193 194 true 195 } 196 197 /// stop stops the timer. 198 pub(crate) async fn stop(&self) { 199 let mut close_tx = self.close_tx.lock().await; 200 close_tx.take(); 201 } 202 203 /// isRunning tests if the timer is running. 204 /// Debug purpose only 205 pub(crate) async fn is_running(&self) -> bool { 206 let close_tx = self.close_tx.lock().await; 207 close_tx.is_some() 208 } 209 } 210