xref: /webrtc/sctp/src/timer/timer_test.rs (revision 529e41be)
1 // Silence warning on `for i in 0..vec.len() { … }`:
2 #![allow(clippy::needless_range_loop)]
3 // Silence warning on `..Default::default()` with no effect:
4 #![allow(clippy::needless_update)]
5 
6 use async_trait::async_trait;
7 use tokio::sync::Mutex;
8 use tokio::time::{sleep, Duration};
9 
10 use std::sync::atomic::{AtomicU32, Ordering};
11 use std::sync::Arc;
12 
13 ///////////////////////////////////////////////////////////////////
14 //ack_timer_test
15 ///////////////////////////////////////////////////////////////////
16 use super::ack_timer::*;
17 
18 mod test_ack_timer {
19     use crate::error::Result;
20 
21     use super::*;
22 
23     struct TestAckTimerObserver {
24         ncbs: Arc<AtomicU32>,
25     }
26 
27     #[async_trait]
28     impl AckTimerObserver for TestAckTimerObserver {
29         async fn on_ack_timeout(&mut self) {
30             log::trace!("ack timed out");
31             self.ncbs.fetch_add(1, Ordering::SeqCst);
32         }
33     }
34 
35     #[tokio::test]
36     async fn test_ack_timer_start_and_stop() -> Result<()> {
37         let ncbs = Arc::new(AtomicU32::new(0));
38         let obs = Arc::new(Mutex::new(TestAckTimerObserver { ncbs: ncbs.clone() }));
39 
40         let mut rt = AckTimer::new(Arc::downgrade(&obs), ACK_INTERVAL);
41 
42         // should start ok
43         let ok = rt.start();
44         assert!(ok, "start() should succeed");
45         assert!(rt.is_running(), "should be running");
46 
47         // stop immedidately
48         rt.stop();
49         assert!(!rt.is_running(), "should not be running");
50 
51         // Sleep more than 200msec of interval to test if it never times out
52         sleep(ACK_INTERVAL + Duration::from_millis(50)).await;
53 
54         assert_eq!(
55             ncbs.load(Ordering::SeqCst),
56             0,
57             "should not be timed out (actual: {})",
58             ncbs.load(Ordering::SeqCst)
59         );
60 
61         // can start again
62         let ok = rt.start();
63         assert!(ok, "start() should succeed again");
64         assert!(rt.is_running(), "should be running");
65 
66         // should close ok
67         rt.stop();
68         assert!(!rt.is_running(), "should not be running");
69 
70         Ok(())
71     }
72 }
73 
74 ///////////////////////////////////////////////////////////////////
75 //rtx_timer_test
76 ///////////////////////////////////////////////////////////////////
77 use super::rtx_timer::*;
78 
79 mod test_rto_manager {
80     use crate::error::Result;
81 
82     use super::*;
83 
84     #[tokio::test]
85     async fn test_rto_manager_initial_values() -> Result<()> {
86         let m = RtoManager::new();
87         assert_eq!(m.rto, RTO_INITIAL, "should be rtoInitial");
88         assert_eq!(m.get_rto(), RTO_INITIAL, "should be rtoInitial");
89         assert_eq!(m.srtt, 0, "should be 0");
90         assert_eq!(m.rttvar, 0.0, "should be 0.0");
91 
92         Ok(())
93     }
94 
95     #[tokio::test]
96     async fn test_rto_manager_rto_calculation_small_rtt() -> Result<()> {
97         let mut m = RtoManager::new();
98         let exp = vec![
99             1800, 1500, 1275, 1106, 1000, // capped at RTO.Min
100         ];
101 
102         for i in 0..5 {
103             m.set_new_rtt(600);
104             let rto = m.get_rto();
105             assert_eq!(rto, exp[i], "should be equal: {}", i);
106         }
107 
108         Ok(())
109     }
110 
111     #[tokio::test]
112     async fn test_rto_manager_rto_calculation_large_rtt() -> Result<()> {
113         let mut m = RtoManager::new();
114         let exp = vec![
115             60000, // capped at RTO.Max
116             60000, // capped at RTO.Max
117             60000, // capped at RTO.Max
118             55312, 48984,
119         ];
120 
121         for i in 0..5 {
122             m.set_new_rtt(30000);
123             let rto = m.get_rto();
124             assert_eq!(rto, exp[i], "should be equal: {}", i);
125         }
126 
127         Ok(())
128     }
129 
130     #[tokio::test]
131     async fn test_rto_manager_calculate_next_timeout() -> Result<()> {
132         let rto = calculate_next_timeout(1, 0);
133         assert_eq!(rto, 1, "should match");
134         let rto = calculate_next_timeout(1, 1);
135         assert_eq!(rto, 2, "should match");
136         let rto = calculate_next_timeout(1, 2);
137         assert_eq!(rto, 4, "should match");
138         let rto = calculate_next_timeout(1, 30);
139         assert_eq!(rto, 60000, "should match");
140         let rto = calculate_next_timeout(1, 63);
141         assert_eq!(rto, 60000, "should match");
142         let rto = calculate_next_timeout(1, 64);
143         assert_eq!(rto, 60000, "should match");
144 
145         Ok(())
146     }
147 
148     #[tokio::test]
149     async fn test_rto_manager_reset() -> Result<()> {
150         let mut m = RtoManager::new();
151         for _ in 0..10 {
152             m.set_new_rtt(200);
153         }
154 
155         m.reset();
156         assert_eq!(m.get_rto(), RTO_INITIAL, "should be rtoInitial");
157         assert_eq!(m.srtt, 0, "should be 0");
158         assert_eq!(m.rttvar, 0.0, "should be 0");
159 
160         Ok(())
161     }
162 }
163 
164 //TODO: remove this conditional test
165 #[cfg(not(any(target_os = "macos", target_os = "windows")))]
166 mod test_rtx_timer {
167     use super::*;
168     use crate::association::RtxTimerId;
169     use crate::error::Result;
170 
171     use std::time::SystemTime;
172     use tokio::sync::mpsc;
173 
174     struct TestTimerObserver {
175         ncbs: Arc<AtomicU32>,
176         timer_id: RtxTimerId,
177         done_tx: Option<mpsc::Sender<SystemTime>>,
178         max_rtos: usize,
179     }
180 
181     impl Default for TestTimerObserver {
182         fn default() -> Self {
183             TestTimerObserver {
184                 ncbs: Arc::new(AtomicU32::new(0)),
185                 timer_id: RtxTimerId::T1Init,
186                 done_tx: None,
187                 max_rtos: 0,
188             }
189         }
190     }
191 
192     #[async_trait]
193     impl RtxTimerObserver for TestTimerObserver {
194         async fn on_retransmission_timeout(&mut self, timer_id: RtxTimerId, n_rtos: usize) {
195             self.ncbs.fetch_add(1, Ordering::SeqCst);
196             // 30 : 1 (30)
197             // 60 : 2 (90)
198             // 120: 3 (210)
199             // 240: 4 (550) <== expected in 650 msec
200             assert_eq!(self.timer_id, timer_id, "unexpected timer ID: {}", timer_id);
201             if (self.max_rtos > 0 && n_rtos == self.max_rtos) || self.max_rtos == usize::MAX {
202                 if let Some(done) = &self.done_tx {
203                     let elapsed = SystemTime::now();
204                     let _ = done.send(elapsed).await;
205                 }
206             }
207         }
208 
209         async fn on_retransmission_failure(&mut self, timer_id: RtxTimerId) {
210             if self.max_rtos == 0 {
211                 if let Some(done) = &self.done_tx {
212                     assert_eq!(self.timer_id, timer_id, "unexpted timer ID: {}", timer_id);
213                     let elapsed = SystemTime::now();
214                     //t.Logf("onRtxFailure: elapsed=%.03f\n", elapsed)
215                     let _ = done.send(elapsed).await;
216                 }
217             } else {
218                 panic!("timer should not fail");
219             }
220         }
221     }
222 
223     #[tokio::test]
224     async fn test_rtx_timer_callback_interval() -> Result<()> {
225         let timer_id = RtxTimerId::T1Init;
226         let ncbs = Arc::new(AtomicU32::new(0));
227         let obs = Arc::new(Mutex::new(TestTimerObserver {
228             ncbs: ncbs.clone(),
229             timer_id,
230             ..Default::default()
231         }));
232         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
233 
234         assert!(!rt.is_running().await, "should not be running");
235 
236         // since := time.Now()
237         let ok = rt.start(30).await;
238         assert!(ok, "should be true");
239         assert!(rt.is_running().await, "should be running");
240 
241         sleep(Duration::from_millis(650)).await;
242         rt.stop().await;
243         assert!(!rt.is_running().await, "should not be running");
244 
245         assert_eq!(ncbs.load(Ordering::SeqCst), 4, "should be called 4 times");
246 
247         Ok(())
248     }
249 
250     #[tokio::test]
251     async fn test_rtx_timer_last_start_wins() -> Result<()> {
252         let timer_id = RtxTimerId::T3RTX;
253         let ncbs = Arc::new(AtomicU32::new(0));
254         let obs = Arc::new(Mutex::new(TestTimerObserver {
255             ncbs: ncbs.clone(),
256             timer_id,
257             ..Default::default()
258         }));
259         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
260 
261         let interval = 30;
262         let ok = rt.start(interval).await;
263         assert!(ok, "should be accepted");
264         let ok = rt.start(interval * 99).await; // should ignored
265         assert!(!ok, "should be ignored");
266         let ok = rt.start(interval * 99).await; // should ignored
267         assert!(!ok, "should be ignored");
268 
269         sleep(Duration::from_millis((interval * 3) / 2)).await;
270         rt.stop().await;
271 
272         assert!(!rt.is_running().await, "should not be running");
273         assert_eq!(ncbs.load(Ordering::SeqCst), 1, "must be called once");
274 
275         Ok(())
276     }
277 
278     #[tokio::test]
279     async fn test_rtx_timer_stop_right_after_start() -> Result<()> {
280         let timer_id = RtxTimerId::T3RTX;
281         let ncbs = Arc::new(AtomicU32::new(0));
282         let obs = Arc::new(Mutex::new(TestTimerObserver {
283             ncbs: ncbs.clone(),
284             timer_id,
285             ..Default::default()
286         }));
287         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
288 
289         let interval = 30;
290         let ok = rt.start(interval).await;
291         assert!(ok, "should be accepted");
292         rt.stop().await;
293 
294         sleep(Duration::from_millis((interval * 3) / 2)).await;
295         rt.stop().await;
296 
297         assert!(!rt.is_running().await, "should not be running");
298         assert_eq!(ncbs.load(Ordering::SeqCst), 0, "no callback should be made");
299 
300         Ok(())
301     }
302 
303     #[tokio::test]
304     async fn test_rtx_timer_start_stop_then_start() -> Result<()> {
305         let timer_id = RtxTimerId::T1Cookie;
306         let ncbs = Arc::new(AtomicU32::new(0));
307         let obs = Arc::new(Mutex::new(TestTimerObserver {
308             ncbs: ncbs.clone(),
309             timer_id,
310             ..Default::default()
311         }));
312         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
313 
314         let interval = 30;
315         let ok = rt.start(interval).await;
316         assert!(ok, "should be accepted");
317         rt.stop().await;
318         assert!(!rt.is_running().await, "should NOT be running");
319         let ok = rt.start(interval).await;
320         assert!(ok, "should be accepted");
321         assert!(rt.is_running().await, "should be running");
322 
323         sleep(Duration::from_millis((interval * 3) / 2)).await;
324         rt.stop().await;
325 
326         assert!(!rt.is_running().await, "should NOT be running");
327         assert_eq!(ncbs.load(Ordering::SeqCst), 1, "must be called once");
328 
329         Ok(())
330     }
331 
332     #[tokio::test]
333     async fn test_rtx_timer_start_and_stop_in_atight_loop() -> Result<()> {
334         let timer_id = RtxTimerId::T2Shutdown;
335         let ncbs = Arc::new(AtomicU32::new(0));
336         let obs = Arc::new(Mutex::new(TestTimerObserver {
337             ncbs: ncbs.clone(),
338             timer_id,
339             ..Default::default()
340         }));
341         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
342 
343         for _ in 0..1000 {
344             let ok = rt.start(30).await;
345             assert!(ok, "should be accepted");
346             assert!(rt.is_running().await, "should be running");
347             rt.stop().await;
348             assert!(!rt.is_running().await, "should NOT be running");
349         }
350 
351         assert_eq!(ncbs.load(Ordering::SeqCst), 0, "no callback should be made");
352 
353         Ok(())
354     }
355 
356     #[tokio::test]
357     async fn test_rtx_timer_should_stop_after_rtx_failure() -> Result<()> {
358         let (done_tx, mut done_rx) = mpsc::channel(1);
359 
360         let timer_id = RtxTimerId::Reconfig;
361         let ncbs = Arc::new(AtomicU32::new(0));
362         let obs = Arc::new(Mutex::new(TestTimerObserver {
363             ncbs: ncbs.clone(),
364             timer_id,
365             done_tx: Some(done_tx),
366             ..Default::default()
367         }));
368 
369         let since = SystemTime::now();
370         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
371 
372         // RTO(msec) Total(msec)
373         //  10          10    1st RTO
374         //  20          30    2nd RTO
375         //  40          70    3rd RTO
376         //  80         150    4th RTO
377         // 160         310    5th RTO (== Path.Max.Retrans)
378         // 320         630    Failure
379 
380         let interval = 10;
381         let ok = rt.start(interval).await;
382         assert!(ok, "should be accepted");
383         assert!(rt.is_running().await, "should be running");
384 
385         let elapsed = done_rx.recv().await;
386 
387         assert!(!rt.is_running().await, "should not be running");
388         assert_eq!(ncbs.load(Ordering::SeqCst), 5, "should be called 5 times");
389 
390         if let Some(elapsed) = elapsed {
391             let diff = elapsed.duration_since(since).unwrap();
392             assert!(
393                 diff > Duration::from_millis(600),
394                 "must have taken more than 600 msec"
395             );
396             assert!(
397                 diff < Duration::from_millis(700),
398                 "must fail in less than 700 msec"
399             );
400         }
401 
402         Ok(())
403     }
404 
405     #[tokio::test]
406     async fn test_rtx_timer_should_not_stop_if_max_retrans_is_zero() -> Result<()> {
407         let (done_tx, mut done_rx) = mpsc::channel(1);
408 
409         let timer_id = RtxTimerId::Reconfig;
410         let max_rtos = 6;
411         let ncbs = Arc::new(AtomicU32::new(0));
412         let obs = Arc::new(Mutex::new(TestTimerObserver {
413             ncbs: ncbs.clone(),
414             timer_id,
415             done_tx: Some(done_tx),
416             max_rtos,
417             ..Default::default()
418         }));
419 
420         let since = SystemTime::now();
421         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, 0);
422 
423         // RTO(msec) Total(msec)
424         //  10          10    1st RTO
425         //  20          30    2nd RTO
426         //  40          70    3rd RTO
427         //  80         150    4th RTO
428         // 160         310    5th RTO
429         // 320         630    6th RTO => exit test (timer should still be running)
430 
431         let interval = 10;
432         let ok = rt.start(interval).await;
433         assert!(ok, "should be accepted");
434         assert!(rt.is_running().await, "should be running");
435 
436         let elapsed = done_rx.recv().await;
437 
438         assert!(rt.is_running().await, "should still be running");
439         assert_eq!(ncbs.load(Ordering::SeqCst), 6, "should be called 6 times");
440 
441         if let Some(elapsed) = elapsed {
442             let diff = elapsed.duration_since(since).unwrap();
443             assert!(
444                 diff > Duration::from_millis(600),
445                 "must have taken more than 600 msec"
446             );
447             assert!(
448                 diff < Duration::from_millis(700),
449                 "must fail in less than 700 msec"
450             );
451         }
452 
453         rt.stop().await;
454 
455         Ok(())
456     }
457 
458     #[tokio::test]
459     async fn test_rtx_timer_stop_timer_that_is_not_running_is_noop() -> Result<()> {
460         let (done_tx, mut done_rx) = mpsc::channel(1);
461 
462         let timer_id = RtxTimerId::Reconfig;
463         let obs = Arc::new(Mutex::new(TestTimerObserver {
464             timer_id,
465             done_tx: Some(done_tx),
466             max_rtos: usize::MAX,
467             ..Default::default()
468         }));
469         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
470 
471         for _ in 0..10 {
472             rt.stop().await;
473         }
474 
475         let ok = rt.start(20).await;
476         assert!(ok, "should be accepted");
477         assert!(rt.is_running().await, "must be running");
478 
479         let _ = done_rx.recv().await;
480         rt.stop().await;
481         assert!(!rt.is_running().await, "must be false");
482 
483         Ok(())
484     }
485 
486     #[tokio::test]
487     async fn test_rtx_timer_closed_timer_wont_start() -> Result<()> {
488         let timer_id = RtxTimerId::Reconfig;
489         let ncbs = Arc::new(AtomicU32::new(0));
490         let obs = Arc::new(Mutex::new(TestTimerObserver {
491             ncbs: ncbs.clone(),
492             timer_id,
493             ..Default::default()
494         }));
495         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
496 
497         let ok = rt.start(20).await;
498         assert!(ok, "should be accepted");
499         assert!(rt.is_running().await, "must be running");
500 
501         rt.stop().await;
502         assert!(!rt.is_running().await, "must be false");
503 
504         //let ok = rt.start(obs.clone(), 20).await;
505         //assert!(!ok, "should not start");
506         assert!(!rt.is_running().await, "must not be running");
507 
508         sleep(Duration::from_millis(100)).await;
509         assert_eq!(ncbs.load(Ordering::SeqCst), 0, "RTO should not occur");
510 
511         Ok(())
512     }
513 }
514