1 // Silence warning on `for i in 0..vec.len() { … }`: 2 #![allow(clippy::needless_range_loop)] 3 // Silence warning on `..Default::default()` with no effect: 4 #![allow(clippy::needless_update)] 5 6 use async_trait::async_trait; 7 use tokio::sync::Mutex; 8 use tokio::time::{sleep, Duration}; 9 10 use std::sync::atomic::{AtomicU32, Ordering}; 11 use std::sync::Arc; 12 13 /////////////////////////////////////////////////////////////////// 14 //ack_timer_test 15 /////////////////////////////////////////////////////////////////// 16 use super::ack_timer::*; 17 18 mod test_ack_timer { 19 use crate::error::Result; 20 21 use super::*; 22 23 struct TestAckTimerObserver { 24 ncbs: Arc<AtomicU32>, 25 } 26 27 #[async_trait] 28 impl AckTimerObserver for TestAckTimerObserver { on_ack_timeout(&mut self)29 async fn on_ack_timeout(&mut self) { 30 log::trace!("ack timed out"); 31 self.ncbs.fetch_add(1, Ordering::SeqCst); 32 } 33 } 34 35 #[tokio::test] test_ack_timer_start_and_stop() -> Result<()>36 async fn test_ack_timer_start_and_stop() -> Result<()> { 37 let ncbs = Arc::new(AtomicU32::new(0)); 38 let obs = Arc::new(Mutex::new(TestAckTimerObserver { ncbs: ncbs.clone() })); 39 40 let mut rt = AckTimer::new(Arc::downgrade(&obs), ACK_INTERVAL); 41 42 // should start ok 43 let ok = rt.start(); 44 assert!(ok, "start() should succeed"); 45 assert!(rt.is_running(), "should be running"); 46 47 // stop immedidately 48 rt.stop(); 49 assert!(!rt.is_running(), "should not be running"); 50 51 // Sleep more than 200msec of interval to test if it never times out 52 sleep(ACK_INTERVAL + Duration::from_millis(50)).await; 53 54 assert_eq!( 55 ncbs.load(Ordering::SeqCst), 56 0, 57 "should not be timed out (actual: {})", 58 ncbs.load(Ordering::SeqCst) 59 ); 60 61 // can start again 62 let ok = rt.start(); 63 assert!(ok, "start() should succeed again"); 64 assert!(rt.is_running(), "should be running"); 65 66 // should close ok 67 rt.stop(); 68 assert!(!rt.is_running(), "should not be running"); 69 70 Ok(()) 71 } 72 } 73 74 /////////////////////////////////////////////////////////////////// 75 //rtx_timer_test 76 /////////////////////////////////////////////////////////////////// 77 use super::rtx_timer::*; 78 79 mod test_rto_manager { 80 use crate::error::Result; 81 82 use super::*; 83 84 #[tokio::test] test_rto_manager_initial_values() -> Result<()>85 async fn test_rto_manager_initial_values() -> Result<()> { 86 let m = RtoManager::new(); 87 assert_eq!(m.rto, RTO_INITIAL, "should be rtoInitial"); 88 assert_eq!(m.get_rto(), RTO_INITIAL, "should be rtoInitial"); 89 assert_eq!(m.srtt, 0, "should be 0"); 90 assert_eq!(m.rttvar, 0.0, "should be 0.0"); 91 92 Ok(()) 93 } 94 95 #[tokio::test] test_rto_manager_rto_calculation_small_rtt() -> Result<()>96 async fn test_rto_manager_rto_calculation_small_rtt() -> Result<()> { 97 let mut m = RtoManager::new(); 98 let exp = vec![ 99 1800, 1500, 1275, 1106, 1000, // capped at RTO.Min 100 ]; 101 102 for i in 0..5 { 103 m.set_new_rtt(600); 104 let rto = m.get_rto(); 105 assert_eq!(rto, exp[i], "should be equal: {i}"); 106 } 107 108 Ok(()) 109 } 110 111 #[tokio::test] test_rto_manager_rto_calculation_large_rtt() -> Result<()>112 async fn test_rto_manager_rto_calculation_large_rtt() -> Result<()> { 113 let mut m = RtoManager::new(); 114 let exp = vec![ 115 60000, // capped at RTO.Max 116 60000, // capped at RTO.Max 117 60000, // capped at RTO.Max 118 55312, 48984, 119 ]; 120 121 for i in 0..5 { 122 m.set_new_rtt(30000); 123 let rto = m.get_rto(); 124 assert_eq!(rto, exp[i], "should be equal: {i}"); 125 } 126 127 Ok(()) 128 } 129 130 #[tokio::test] test_rto_manager_calculate_next_timeout() -> Result<()>131 async fn test_rto_manager_calculate_next_timeout() -> Result<()> { 132 let rto = calculate_next_timeout(1, 0); 133 assert_eq!(rto, 1, "should match"); 134 let rto = calculate_next_timeout(1, 1); 135 assert_eq!(rto, 2, "should match"); 136 let rto = calculate_next_timeout(1, 2); 137 assert_eq!(rto, 4, "should match"); 138 let rto = calculate_next_timeout(1, 30); 139 assert_eq!(rto, 60000, "should match"); 140 let rto = calculate_next_timeout(1, 63); 141 assert_eq!(rto, 60000, "should match"); 142 let rto = calculate_next_timeout(1, 64); 143 assert_eq!(rto, 60000, "should match"); 144 145 Ok(()) 146 } 147 148 #[tokio::test] test_rto_manager_reset() -> Result<()>149 async fn test_rto_manager_reset() -> Result<()> { 150 let mut m = RtoManager::new(); 151 for _ in 0..10 { 152 m.set_new_rtt(200); 153 } 154 155 m.reset(); 156 assert_eq!(m.get_rto(), RTO_INITIAL, "should be rtoInitial"); 157 assert_eq!(m.srtt, 0, "should be 0"); 158 assert_eq!(m.rttvar, 0.0, "should be 0"); 159 160 Ok(()) 161 } 162 } 163 164 //TODO: remove this conditional test 165 #[cfg(not(any(target_os = "macos", target_os = "windows")))] 166 mod test_rtx_timer { 167 use super::*; 168 use crate::association::RtxTimerId; 169 use crate::error::Result; 170 171 use std::time::SystemTime; 172 use tokio::sync::mpsc; 173 174 struct TestTimerObserver { 175 ncbs: Arc<AtomicU32>, 176 timer_id: RtxTimerId, 177 done_tx: Option<mpsc::Sender<SystemTime>>, 178 max_rtos: usize, 179 } 180 181 impl Default for TestTimerObserver { default() -> Self182 fn default() -> Self { 183 TestTimerObserver { 184 ncbs: Arc::new(AtomicU32::new(0)), 185 timer_id: RtxTimerId::T1Init, 186 done_tx: None, 187 max_rtos: 0, 188 } 189 } 190 } 191 192 #[async_trait] 193 impl RtxTimerObserver for TestTimerObserver { on_retransmission_timeout(&mut self, timer_id: RtxTimerId, n_rtos: usize)194 async fn on_retransmission_timeout(&mut self, timer_id: RtxTimerId, n_rtos: usize) { 195 self.ncbs.fetch_add(1, Ordering::SeqCst); 196 // 30 : 1 (30) 197 // 60 : 2 (90) 198 // 120: 3 (210) 199 // 240: 4 (550) <== expected in 650 msec 200 assert_eq!(self.timer_id, timer_id, "unexpected timer ID: {timer_id}"); 201 if (self.max_rtos > 0 && n_rtos == self.max_rtos) || self.max_rtos == usize::MAX { 202 if let Some(done) = &self.done_tx { 203 let elapsed = SystemTime::now(); 204 let _ = done.send(elapsed).await; 205 } 206 } 207 } 208 on_retransmission_failure(&mut self, timer_id: RtxTimerId)209 async fn on_retransmission_failure(&mut self, timer_id: RtxTimerId) { 210 if self.max_rtos == 0 { 211 if let Some(done) = &self.done_tx { 212 assert_eq!(self.timer_id, timer_id, "unexpted timer ID: {timer_id}"); 213 let elapsed = SystemTime::now(); 214 //t.Logf("onRtxFailure: elapsed=%.03f\n", elapsed) 215 let _ = done.send(elapsed).await; 216 } 217 } else { 218 panic!("timer should not fail"); 219 } 220 } 221 } 222 223 #[tokio::test] test_rtx_timer_callback_interval() -> Result<()>224 async fn test_rtx_timer_callback_interval() -> Result<()> { 225 let timer_id = RtxTimerId::T1Init; 226 let ncbs = Arc::new(AtomicU32::new(0)); 227 let obs = Arc::new(Mutex::new(TestTimerObserver { 228 ncbs: ncbs.clone(), 229 timer_id, 230 ..Default::default() 231 })); 232 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 233 234 assert!(!rt.is_running().await, "should not be running"); 235 236 // since := time.Now() 237 let ok = rt.start(30).await; 238 assert!(ok, "should be true"); 239 assert!(rt.is_running().await, "should be running"); 240 241 sleep(Duration::from_millis(650)).await; 242 rt.stop().await; 243 assert!(!rt.is_running().await, "should not be running"); 244 245 assert_eq!(ncbs.load(Ordering::SeqCst), 4, "should be called 4 times"); 246 247 Ok(()) 248 } 249 250 #[tokio::test] test_rtx_timer_last_start_wins() -> Result<()>251 async fn test_rtx_timer_last_start_wins() -> Result<()> { 252 let timer_id = RtxTimerId::T3RTX; 253 let ncbs = Arc::new(AtomicU32::new(0)); 254 let obs = Arc::new(Mutex::new(TestTimerObserver { 255 ncbs: ncbs.clone(), 256 timer_id, 257 ..Default::default() 258 })); 259 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 260 261 let interval = 30; 262 let ok = rt.start(interval).await; 263 assert!(ok, "should be accepted"); 264 let ok = rt.start(interval * 99).await; // should ignored 265 assert!(!ok, "should be ignored"); 266 let ok = rt.start(interval * 99).await; // should ignored 267 assert!(!ok, "should be ignored"); 268 269 sleep(Duration::from_millis((interval * 3) / 2)).await; 270 rt.stop().await; 271 272 assert!(!rt.is_running().await, "should not be running"); 273 assert_eq!(ncbs.load(Ordering::SeqCst), 1, "must be called once"); 274 275 Ok(()) 276 } 277 278 #[tokio::test] test_rtx_timer_stop_right_after_start() -> Result<()>279 async fn test_rtx_timer_stop_right_after_start() -> Result<()> { 280 let timer_id = RtxTimerId::T3RTX; 281 let ncbs = Arc::new(AtomicU32::new(0)); 282 let obs = Arc::new(Mutex::new(TestTimerObserver { 283 ncbs: ncbs.clone(), 284 timer_id, 285 ..Default::default() 286 })); 287 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 288 289 let interval = 30; 290 let ok = rt.start(interval).await; 291 assert!(ok, "should be accepted"); 292 rt.stop().await; 293 294 sleep(Duration::from_millis((interval * 3) / 2)).await; 295 rt.stop().await; 296 297 assert!(!rt.is_running().await, "should not be running"); 298 assert_eq!(ncbs.load(Ordering::SeqCst), 0, "no callback should be made"); 299 300 Ok(()) 301 } 302 303 #[tokio::test] test_rtx_timer_start_stop_then_start() -> Result<()>304 async fn test_rtx_timer_start_stop_then_start() -> Result<()> { 305 let timer_id = RtxTimerId::T1Cookie; 306 let ncbs = Arc::new(AtomicU32::new(0)); 307 let obs = Arc::new(Mutex::new(TestTimerObserver { 308 ncbs: ncbs.clone(), 309 timer_id, 310 ..Default::default() 311 })); 312 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 313 314 let interval = 30; 315 let ok = rt.start(interval).await; 316 assert!(ok, "should be accepted"); 317 rt.stop().await; 318 assert!(!rt.is_running().await, "should NOT be running"); 319 let ok = rt.start(interval).await; 320 assert!(ok, "should be accepted"); 321 assert!(rt.is_running().await, "should be running"); 322 323 sleep(Duration::from_millis((interval * 3) / 2)).await; 324 rt.stop().await; 325 326 assert!(!rt.is_running().await, "should NOT be running"); 327 assert_eq!(ncbs.load(Ordering::SeqCst), 1, "must be called once"); 328 329 Ok(()) 330 } 331 332 #[tokio::test] test_rtx_timer_start_and_stop_in_atight_loop() -> Result<()>333 async fn test_rtx_timer_start_and_stop_in_atight_loop() -> Result<()> { 334 let timer_id = RtxTimerId::T2Shutdown; 335 let ncbs = Arc::new(AtomicU32::new(0)); 336 let obs = Arc::new(Mutex::new(TestTimerObserver { 337 ncbs: ncbs.clone(), 338 timer_id, 339 ..Default::default() 340 })); 341 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 342 343 for _ in 0..1000 { 344 let ok = rt.start(30).await; 345 assert!(ok, "should be accepted"); 346 assert!(rt.is_running().await, "should be running"); 347 rt.stop().await; 348 assert!(!rt.is_running().await, "should NOT be running"); 349 } 350 351 assert_eq!(ncbs.load(Ordering::SeqCst), 0, "no callback should be made"); 352 353 Ok(()) 354 } 355 356 #[tokio::test] test_rtx_timer_should_stop_after_rtx_failure() -> Result<()>357 async fn test_rtx_timer_should_stop_after_rtx_failure() -> Result<()> { 358 let (done_tx, mut done_rx) = mpsc::channel(1); 359 360 let timer_id = RtxTimerId::Reconfig; 361 let ncbs = Arc::new(AtomicU32::new(0)); 362 let obs = Arc::new(Mutex::new(TestTimerObserver { 363 ncbs: ncbs.clone(), 364 timer_id, 365 done_tx: Some(done_tx), 366 ..Default::default() 367 })); 368 369 let since = SystemTime::now(); 370 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 371 372 // RTO(msec) Total(msec) 373 // 10 10 1st RTO 374 // 20 30 2nd RTO 375 // 40 70 3rd RTO 376 // 80 150 4th RTO 377 // 160 310 5th RTO (== Path.Max.Retrans) 378 // 320 630 Failure 379 380 let interval = 10; 381 let ok = rt.start(interval).await; 382 assert!(ok, "should be accepted"); 383 assert!(rt.is_running().await, "should be running"); 384 385 let elapsed = done_rx.recv().await; 386 387 assert!(!rt.is_running().await, "should not be running"); 388 assert_eq!(ncbs.load(Ordering::SeqCst), 5, "should be called 5 times"); 389 390 if let Some(elapsed) = elapsed { 391 let diff = elapsed.duration_since(since).unwrap(); 392 assert!( 393 diff > Duration::from_millis(600), 394 "must have taken more than 600 msec" 395 ); 396 assert!( 397 diff < Duration::from_millis(700), 398 "must fail in less than 700 msec" 399 ); 400 } 401 402 Ok(()) 403 } 404 405 #[tokio::test] test_rtx_timer_should_not_stop_if_max_retrans_is_zero() -> Result<()>406 async fn test_rtx_timer_should_not_stop_if_max_retrans_is_zero() -> Result<()> { 407 let (done_tx, mut done_rx) = mpsc::channel(1); 408 409 let timer_id = RtxTimerId::Reconfig; 410 let max_rtos = 6; 411 let ncbs = Arc::new(AtomicU32::new(0)); 412 let obs = Arc::new(Mutex::new(TestTimerObserver { 413 ncbs: ncbs.clone(), 414 timer_id, 415 done_tx: Some(done_tx), 416 max_rtos, 417 ..Default::default() 418 })); 419 420 let since = SystemTime::now(); 421 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, 0); 422 423 // RTO(msec) Total(msec) 424 // 10 10 1st RTO 425 // 20 30 2nd RTO 426 // 40 70 3rd RTO 427 // 80 150 4th RTO 428 // 160 310 5th RTO 429 // 320 630 6th RTO => exit test (timer should still be running) 430 431 let interval = 10; 432 let ok = rt.start(interval).await; 433 assert!(ok, "should be accepted"); 434 assert!(rt.is_running().await, "should be running"); 435 436 let elapsed = done_rx.recv().await; 437 438 assert!(rt.is_running().await, "should still be running"); 439 assert_eq!(ncbs.load(Ordering::SeqCst), 6, "should be called 6 times"); 440 441 if let Some(elapsed) = elapsed { 442 let diff = elapsed.duration_since(since).unwrap(); 443 assert!( 444 diff > Duration::from_millis(600), 445 "must have taken more than 600 msec" 446 ); 447 assert!( 448 diff < Duration::from_millis(700), 449 "must fail in less than 700 msec" 450 ); 451 } 452 453 rt.stop().await; 454 455 Ok(()) 456 } 457 458 #[tokio::test] test_rtx_timer_stop_timer_that_is_not_running_is_noop() -> Result<()>459 async fn test_rtx_timer_stop_timer_that_is_not_running_is_noop() -> Result<()> { 460 let (done_tx, mut done_rx) = mpsc::channel(1); 461 462 let timer_id = RtxTimerId::Reconfig; 463 let obs = Arc::new(Mutex::new(TestTimerObserver { 464 timer_id, 465 done_tx: Some(done_tx), 466 max_rtos: usize::MAX, 467 ..Default::default() 468 })); 469 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 470 471 for _ in 0..10 { 472 rt.stop().await; 473 } 474 475 let ok = rt.start(20).await; 476 assert!(ok, "should be accepted"); 477 assert!(rt.is_running().await, "must be running"); 478 479 let _ = done_rx.recv().await; 480 rt.stop().await; 481 assert!(!rt.is_running().await, "must be false"); 482 483 Ok(()) 484 } 485 486 #[tokio::test] test_rtx_timer_closed_timer_wont_start() -> Result<()>487 async fn test_rtx_timer_closed_timer_wont_start() -> Result<()> { 488 let timer_id = RtxTimerId::Reconfig; 489 let ncbs = Arc::new(AtomicU32::new(0)); 490 let obs = Arc::new(Mutex::new(TestTimerObserver { 491 ncbs: ncbs.clone(), 492 timer_id, 493 ..Default::default() 494 })); 495 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 496 497 let ok = rt.start(20).await; 498 assert!(ok, "should be accepted"); 499 assert!(rt.is_running().await, "must be running"); 500 501 rt.stop().await; 502 assert!(!rt.is_running().await, "must be false"); 503 504 //let ok = rt.start(obs.clone(), 20).await; 505 //assert!(!ok, "should not start"); 506 assert!(!rt.is_running().await, "must not be running"); 507 508 sleep(Duration::from_millis(100)).await; 509 assert_eq!(ncbs.load(Ordering::SeqCst), 0, "RTO should not occur"); 510 511 Ok(()) 512 } 513 } 514