1 use async_trait::async_trait; 2 use tokio::sync::Mutex; 3 use tokio::time::{sleep, Duration}; 4 5 use std::sync::atomic::{AtomicU32, Ordering}; 6 use std::sync::Arc; 7 8 /////////////////////////////////////////////////////////////////// 9 //ack_timer_test 10 /////////////////////////////////////////////////////////////////// 11 use super::ack_timer::*; 12 13 mod test_ack_timer { 14 use crate::error::Result; 15 16 use super::*; 17 18 struct TestAckTimerObserver { 19 ncbs: Arc<AtomicU32>, 20 } 21 22 #[async_trait] 23 impl AckTimerObserver for TestAckTimerObserver { 24 async fn on_ack_timeout(&mut self) { 25 log::trace!("ack timed out"); 26 self.ncbs.fetch_add(1, Ordering::SeqCst); 27 } 28 } 29 30 #[tokio::test] 31 async fn test_ack_timer_start_and_stop() -> Result<()> { 32 let ncbs = Arc::new(AtomicU32::new(0)); 33 let obs = Arc::new(Mutex::new(TestAckTimerObserver { ncbs: ncbs.clone() })); 34 35 let mut rt = AckTimer::new(Arc::downgrade(&obs), ACK_INTERVAL); 36 37 // should start ok 38 let ok = rt.start(); 39 assert!(ok, "start() should succeed"); 40 assert!(rt.is_running(), "should be running"); 41 42 // stop immedidately 43 rt.stop(); 44 assert!(!rt.is_running(), "should not be running"); 45 46 // Sleep more than 200msec of interval to test if it never times out 47 sleep(ACK_INTERVAL + Duration::from_millis(50)).await; 48 49 assert_eq!( 50 ncbs.load(Ordering::SeqCst), 51 0, 52 "should not be timed out (actual: {})", 53 ncbs.load(Ordering::SeqCst) 54 ); 55 56 // can start again 57 let ok = rt.start(); 58 assert!(ok, "start() should succeed again"); 59 assert!(rt.is_running(), "should be running"); 60 61 // should close ok 62 rt.stop(); 63 assert!(!rt.is_running(), "should not be running"); 64 65 Ok(()) 66 } 67 } 68 69 /////////////////////////////////////////////////////////////////// 70 //rtx_timer_test 71 /////////////////////////////////////////////////////////////////// 72 use super::rtx_timer::*; 73 74 mod test_rto_manager { 75 use crate::error::Result; 76 77 use super::*; 78 79 #[tokio::test] 80 async fn test_rto_manager_initial_values() -> Result<()> { 81 let m = RtoManager::new(); 82 assert_eq!(m.rto, RTO_INITIAL, "should be rtoInitial"); 83 assert_eq!(m.get_rto(), RTO_INITIAL, "should be rtoInitial"); 84 assert_eq!(m.srtt, 0, "should be 0"); 85 assert_eq!(m.rttvar, 0.0, "should be 0.0"); 86 87 Ok(()) 88 } 89 90 #[tokio::test] 91 async fn test_rto_manager_rto_calculation_small_rtt() -> Result<()> { 92 let mut m = RtoManager::new(); 93 let exp = vec![ 94 1800, 1500, 1275, 1106, 1000, // capped at RTO.Min 95 ]; 96 97 for i in 0..5 { 98 m.set_new_rtt(600); 99 let rto = m.get_rto(); 100 assert_eq!(rto, exp[i], "should be equal: {}", i); 101 } 102 103 Ok(()) 104 } 105 106 #[tokio::test] 107 async fn test_rto_manager_rto_calculation_large_rtt() -> Result<()> { 108 let mut m = RtoManager::new(); 109 let exp = vec![ 110 60000, // capped at RTO.Max 111 60000, // capped at RTO.Max 112 60000, // capped at RTO.Max 113 55312, 48984, 114 ]; 115 116 for i in 0..5 { 117 m.set_new_rtt(30000); 118 let rto = m.get_rto(); 119 assert_eq!(rto, exp[i], "should be equal: {}", i); 120 } 121 122 Ok(()) 123 } 124 125 #[tokio::test] 126 async fn test_rto_manager_calculate_next_timeout() -> Result<()> { 127 let rto = calculate_next_timeout(1, 0); 128 assert_eq!(rto, 1, "should match"); 129 let rto = calculate_next_timeout(1, 1); 130 assert_eq!(rto, 2, "should match"); 131 let rto = calculate_next_timeout(1, 2); 132 assert_eq!(rto, 4, "should match"); 133 let rto = calculate_next_timeout(1, 30); 134 assert_eq!(rto, 60000, "should match"); 135 let rto = calculate_next_timeout(1, 63); 136 assert_eq!(rto, 60000, "should match"); 137 let rto = calculate_next_timeout(1, 64); 138 assert_eq!(rto, 60000, "should match"); 139 140 Ok(()) 141 } 142 143 #[tokio::test] 144 async fn test_rto_manager_reset() -> Result<()> { 145 let mut m = RtoManager::new(); 146 for _ in 0..10 { 147 m.set_new_rtt(200); 148 } 149 150 m.reset(); 151 assert_eq!(m.get_rto(), RTO_INITIAL, "should be rtoInitial"); 152 assert_eq!(m.srtt, 0, "should be 0"); 153 assert_eq!(m.rttvar, 0.0, "should be 0"); 154 155 Ok(()) 156 } 157 } 158 159 //TODO: remove this conditional test 160 #[cfg(not(any(target_os = "macos", target_os = "windows")))] 161 mod test_rtx_timer { 162 use super::*; 163 use crate::association::RtxTimerId; 164 use crate::error::Result; 165 166 use std::time::SystemTime; 167 use tokio::sync::mpsc; 168 169 struct TestTimerObserver { 170 ncbs: Arc<AtomicU32>, 171 timer_id: RtxTimerId, 172 done_tx: Option<mpsc::Sender<SystemTime>>, 173 max_rtos: usize, 174 } 175 176 impl Default for TestTimerObserver { 177 fn default() -> Self { 178 TestTimerObserver { 179 ncbs: Arc::new(AtomicU32::new(0)), 180 timer_id: RtxTimerId::T1Init, 181 done_tx: None, 182 max_rtos: 0, 183 } 184 } 185 } 186 187 #[async_trait] 188 impl RtxTimerObserver for TestTimerObserver { 189 async fn on_retransmission_timeout(&mut self, timer_id: RtxTimerId, n_rtos: usize) { 190 self.ncbs.fetch_add(1, Ordering::SeqCst); 191 // 30 : 1 (30) 192 // 60 : 2 (90) 193 // 120: 3 (210) 194 // 240: 4 (550) <== expected in 650 msec 195 assert_eq!(self.timer_id, timer_id, "unexpected timer ID: {}", timer_id); 196 if (self.max_rtos > 0 && n_rtos == self.max_rtos) || self.max_rtos == usize::MAX { 197 if let Some(done) = &self.done_tx { 198 let elapsed = SystemTime::now(); 199 let _ = done.send(elapsed).await; 200 } 201 } 202 } 203 204 async fn on_retransmission_failure(&mut self, timer_id: RtxTimerId) { 205 if self.max_rtos == 0 { 206 if let Some(done) = &self.done_tx { 207 assert_eq!(self.timer_id, timer_id, "unexpted timer ID: {}", timer_id); 208 let elapsed = SystemTime::now(); 209 //t.Logf("onRtxFailure: elapsed=%.03f\n", elapsed) 210 let _ = done.send(elapsed).await; 211 } 212 } else { 213 panic!("timer should not fail"); 214 } 215 } 216 } 217 218 #[tokio::test] 219 async fn test_rtx_timer_callback_interval() -> Result<()> { 220 let timer_id = RtxTimerId::T1Init; 221 let ncbs = Arc::new(AtomicU32::new(0)); 222 let obs = Arc::new(Mutex::new(TestTimerObserver { 223 ncbs: ncbs.clone(), 224 timer_id, 225 ..Default::default() 226 })); 227 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 228 229 assert!(!rt.is_running().await, "should not be running"); 230 231 // since := time.Now() 232 let ok = rt.start(30).await; 233 assert!(ok, "should be true"); 234 assert!(rt.is_running().await, "should be running"); 235 236 sleep(Duration::from_millis(650)).await; 237 rt.stop().await; 238 assert!(!rt.is_running().await, "should not be running"); 239 240 assert_eq!(ncbs.load(Ordering::SeqCst), 4, "should be called 4 times"); 241 242 Ok(()) 243 } 244 245 #[tokio::test] 246 async fn test_rtx_timer_last_start_wins() -> Result<()> { 247 let timer_id = RtxTimerId::T3RTX; 248 let ncbs = Arc::new(AtomicU32::new(0)); 249 let obs = Arc::new(Mutex::new(TestTimerObserver { 250 ncbs: ncbs.clone(), 251 timer_id, 252 ..Default::default() 253 })); 254 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 255 256 let interval = 30; 257 let ok = rt.start(interval).await; 258 assert!(ok, "should be accepted"); 259 let ok = rt.start(interval * 99).await; // should ignored 260 assert!(!ok, "should be ignored"); 261 let ok = rt.start(interval * 99).await; // should ignored 262 assert!(!ok, "should be ignored"); 263 264 sleep(Duration::from_millis((interval * 3) / 2)).await; 265 rt.stop().await; 266 267 assert!(!rt.is_running().await, "should not be running"); 268 assert_eq!(ncbs.load(Ordering::SeqCst), 1, "must be called once"); 269 270 Ok(()) 271 } 272 273 #[tokio::test] 274 async fn test_rtx_timer_stop_right_after_start() -> Result<()> { 275 let timer_id = RtxTimerId::T3RTX; 276 let ncbs = Arc::new(AtomicU32::new(0)); 277 let obs = Arc::new(Mutex::new(TestTimerObserver { 278 ncbs: ncbs.clone(), 279 timer_id, 280 ..Default::default() 281 })); 282 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 283 284 let interval = 30; 285 let ok = rt.start(interval).await; 286 assert!(ok, "should be accepted"); 287 rt.stop().await; 288 289 sleep(Duration::from_millis((interval * 3) / 2)).await; 290 rt.stop().await; 291 292 assert!(!rt.is_running().await, "should not be running"); 293 assert_eq!(ncbs.load(Ordering::SeqCst), 0, "no callback should be made"); 294 295 Ok(()) 296 } 297 298 #[tokio::test] 299 async fn test_rtx_timer_start_stop_then_start() -> Result<()> { 300 let timer_id = RtxTimerId::T1Cookie; 301 let ncbs = Arc::new(AtomicU32::new(0)); 302 let obs = Arc::new(Mutex::new(TestTimerObserver { 303 ncbs: ncbs.clone(), 304 timer_id, 305 ..Default::default() 306 })); 307 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 308 309 let interval = 30; 310 let ok = rt.start(interval).await; 311 assert!(ok, "should be accepted"); 312 rt.stop().await; 313 assert!(!rt.is_running().await, "should NOT be running"); 314 let ok = rt.start(interval).await; 315 assert!(ok, "should be accepted"); 316 assert!(rt.is_running().await, "should be running"); 317 318 sleep(Duration::from_millis((interval * 3) / 2)).await; 319 rt.stop().await; 320 321 assert!(!rt.is_running().await, "should NOT be running"); 322 assert_eq!(ncbs.load(Ordering::SeqCst), 1, "must be called once"); 323 324 Ok(()) 325 } 326 327 #[tokio::test] 328 async fn test_rtx_timer_start_and_stop_in_atight_loop() -> Result<()> { 329 let timer_id = RtxTimerId::T2Shutdown; 330 let ncbs = Arc::new(AtomicU32::new(0)); 331 let obs = Arc::new(Mutex::new(TestTimerObserver { 332 ncbs: ncbs.clone(), 333 timer_id, 334 ..Default::default() 335 })); 336 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 337 338 for _ in 0..1000 { 339 let ok = rt.start(30).await; 340 assert!(ok, "should be accepted"); 341 assert!(rt.is_running().await, "should be running"); 342 rt.stop().await; 343 assert!(!rt.is_running().await, "should NOT be running"); 344 } 345 346 assert_eq!(ncbs.load(Ordering::SeqCst), 0, "no callback should be made"); 347 348 Ok(()) 349 } 350 351 #[tokio::test] 352 async fn test_rtx_timer_should_stop_after_rtx_failure() -> Result<()> { 353 let (done_tx, mut done_rx) = mpsc::channel(1); 354 355 let timer_id = RtxTimerId::Reconfig; 356 let ncbs = Arc::new(AtomicU32::new(0)); 357 let obs = Arc::new(Mutex::new(TestTimerObserver { 358 ncbs: ncbs.clone(), 359 timer_id, 360 done_tx: Some(done_tx), 361 ..Default::default() 362 })); 363 364 let since = SystemTime::now(); 365 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 366 367 // RTO(msec) Total(msec) 368 // 10 10 1st RTO 369 // 20 30 2nd RTO 370 // 40 70 3rd RTO 371 // 80 150 4th RTO 372 // 160 310 5th RTO (== Path.Max.Retrans) 373 // 320 630 Failure 374 375 let interval = 10; 376 let ok = rt.start(interval).await; 377 assert!(ok, "should be accepted"); 378 assert!(rt.is_running().await, "should be running"); 379 380 let elapsed = done_rx.recv().await; 381 382 assert!(!rt.is_running().await, "should not be running"); 383 assert_eq!(ncbs.load(Ordering::SeqCst), 5, "should be called 5 times"); 384 385 if let Some(elapsed) = elapsed { 386 let diff = elapsed.duration_since(since).unwrap(); 387 assert!( 388 diff > Duration::from_millis(600), 389 "must have taken more than 600 msec" 390 ); 391 assert!( 392 diff < Duration::from_millis(700), 393 "must fail in less than 700 msec" 394 ); 395 } 396 397 Ok(()) 398 } 399 400 #[tokio::test] 401 async fn test_rtx_timer_should_not_stop_if_max_retrans_is_zero() -> Result<()> { 402 let (done_tx, mut done_rx) = mpsc::channel(1); 403 404 let timer_id = RtxTimerId::Reconfig; 405 let max_rtos = 6; 406 let ncbs = Arc::new(AtomicU32::new(0)); 407 let obs = Arc::new(Mutex::new(TestTimerObserver { 408 ncbs: ncbs.clone(), 409 timer_id, 410 done_tx: Some(done_tx), 411 max_rtos, 412 ..Default::default() 413 })); 414 415 let since = SystemTime::now(); 416 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, 0); 417 418 // RTO(msec) Total(msec) 419 // 10 10 1st RTO 420 // 20 30 2nd RTO 421 // 40 70 3rd RTO 422 // 80 150 4th RTO 423 // 160 310 5th RTO 424 // 320 630 6th RTO => exit test (timer should still be running) 425 426 let interval = 10; 427 let ok = rt.start(interval).await; 428 assert!(ok, "should be accepted"); 429 assert!(rt.is_running().await, "should be running"); 430 431 let elapsed = done_rx.recv().await; 432 433 assert!(rt.is_running().await, "should still be running"); 434 assert_eq!(ncbs.load(Ordering::SeqCst), 6, "should be called 6 times"); 435 436 if let Some(elapsed) = elapsed { 437 let diff = elapsed.duration_since(since).unwrap(); 438 assert!( 439 diff > Duration::from_millis(600), 440 "must have taken more than 600 msec" 441 ); 442 assert!( 443 diff < Duration::from_millis(700), 444 "must fail in less than 700 msec" 445 ); 446 } 447 448 rt.stop().await; 449 450 Ok(()) 451 } 452 453 #[tokio::test] 454 async fn test_rtx_timer_stop_timer_that_is_not_running_is_noop() -> Result<()> { 455 let (done_tx, mut done_rx) = mpsc::channel(1); 456 457 let timer_id = RtxTimerId::Reconfig; 458 let obs = Arc::new(Mutex::new(TestTimerObserver { 459 timer_id, 460 done_tx: Some(done_tx), 461 max_rtos: usize::MAX, 462 ..Default::default() 463 })); 464 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 465 466 for _ in 0..10 { 467 rt.stop().await; 468 } 469 470 let ok = rt.start(20).await; 471 assert!(ok, "should be accepted"); 472 assert!(rt.is_running().await, "must be running"); 473 474 let _ = done_rx.recv().await; 475 rt.stop().await; 476 assert!(!rt.is_running().await, "must be false"); 477 478 Ok(()) 479 } 480 481 #[tokio::test] 482 async fn test_rtx_timer_closed_timer_wont_start() -> Result<()> { 483 let timer_id = RtxTimerId::Reconfig; 484 let ncbs = Arc::new(AtomicU32::new(0)); 485 let obs = Arc::new(Mutex::new(TestTimerObserver { 486 ncbs: ncbs.clone(), 487 timer_id, 488 ..Default::default() 489 })); 490 let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS); 491 492 let ok = rt.start(20).await; 493 assert!(ok, "should be accepted"); 494 assert!(rt.is_running().await, "must be running"); 495 496 rt.stop().await; 497 assert!(!rt.is_running().await, "must be false"); 498 499 //let ok = rt.start(obs.clone(), 20).await; 500 //assert!(!ok, "should not start"); 501 assert!(!rt.is_running().await, "must not be running"); 502 503 sleep(Duration::from_millis(100)).await; 504 assert_eq!(ncbs.load(Ordering::SeqCst), 0, "RTO should not occur"); 505 506 Ok(()) 507 } 508 } 509