xref: /webrtc/sctp/src/timer/timer_test.rs (revision 4ee96d0c)
1 use async_trait::async_trait;
2 use tokio::sync::Mutex;
3 use tokio::time::{sleep, Duration};
4 
5 use std::sync::atomic::{AtomicU32, Ordering};
6 use std::sync::Arc;
7 
8 ///////////////////////////////////////////////////////////////////
9 //ack_timer_test
10 ///////////////////////////////////////////////////////////////////
11 use super::ack_timer::*;
12 
13 mod test_ack_timer {
14     use crate::error::Result;
15 
16     use super::*;
17 
18     struct TestAckTimerObserver {
19         ncbs: Arc<AtomicU32>,
20     }
21 
22     #[async_trait]
23     impl AckTimerObserver for TestAckTimerObserver {
24         async fn on_ack_timeout(&mut self) {
25             log::trace!("ack timed out");
26             self.ncbs.fetch_add(1, Ordering::SeqCst);
27         }
28     }
29 
30     #[tokio::test]
31     async fn test_ack_timer_start_and_stop() -> Result<()> {
32         let ncbs = Arc::new(AtomicU32::new(0));
33         let obs = Arc::new(Mutex::new(TestAckTimerObserver { ncbs: ncbs.clone() }));
34 
35         let mut rt = AckTimer::new(Arc::downgrade(&obs), ACK_INTERVAL);
36 
37         // should start ok
38         let ok = rt.start();
39         assert!(ok, "start() should succeed");
40         assert!(rt.is_running(), "should be running");
41 
42         // stop immedidately
43         rt.stop();
44         assert!(!rt.is_running(), "should not be running");
45 
46         // Sleep more than 200msec of interval to test if it never times out
47         sleep(ACK_INTERVAL + Duration::from_millis(50)).await;
48 
49         assert_eq!(
50             ncbs.load(Ordering::SeqCst),
51             0,
52             "should not be timed out (actual: {})",
53             ncbs.load(Ordering::SeqCst)
54         );
55 
56         // can start again
57         let ok = rt.start();
58         assert!(ok, "start() should succeed again");
59         assert!(rt.is_running(), "should be running");
60 
61         // should close ok
62         rt.stop();
63         assert!(!rt.is_running(), "should not be running");
64 
65         Ok(())
66     }
67 }
68 
69 ///////////////////////////////////////////////////////////////////
70 //rtx_timer_test
71 ///////////////////////////////////////////////////////////////////
72 use super::rtx_timer::*;
73 
74 mod test_rto_manager {
75     use crate::error::Result;
76 
77     use super::*;
78 
79     #[tokio::test]
80     async fn test_rto_manager_initial_values() -> Result<()> {
81         let m = RtoManager::new();
82         assert_eq!(m.rto, RTO_INITIAL, "should be rtoInitial");
83         assert_eq!(m.get_rto(), RTO_INITIAL, "should be rtoInitial");
84         assert_eq!(m.srtt, 0, "should be 0");
85         assert_eq!(m.rttvar, 0.0, "should be 0.0");
86 
87         Ok(())
88     }
89 
90     #[tokio::test]
91     async fn test_rto_manager_rto_calculation_small_rtt() -> Result<()> {
92         let mut m = RtoManager::new();
93         let exp = vec![
94             1800, 1500, 1275, 1106, 1000, // capped at RTO.Min
95         ];
96 
97         for i in 0..5 {
98             m.set_new_rtt(600);
99             let rto = m.get_rto();
100             assert_eq!(rto, exp[i], "should be equal: {}", i);
101         }
102 
103         Ok(())
104     }
105 
106     #[tokio::test]
107     async fn test_rto_manager_rto_calculation_large_rtt() -> Result<()> {
108         let mut m = RtoManager::new();
109         let exp = vec![
110             60000, // capped at RTO.Max
111             60000, // capped at RTO.Max
112             60000, // capped at RTO.Max
113             55312, 48984,
114         ];
115 
116         for i in 0..5 {
117             m.set_new_rtt(30000);
118             let rto = m.get_rto();
119             assert_eq!(rto, exp[i], "should be equal: {}", i);
120         }
121 
122         Ok(())
123     }
124 
125     #[tokio::test]
126     async fn test_rto_manager_calculate_next_timeout() -> Result<()> {
127         let rto = calculate_next_timeout(1, 0);
128         assert_eq!(rto, 1, "should match");
129         let rto = calculate_next_timeout(1, 1);
130         assert_eq!(rto, 2, "should match");
131         let rto = calculate_next_timeout(1, 2);
132         assert_eq!(rto, 4, "should match");
133         let rto = calculate_next_timeout(1, 30);
134         assert_eq!(rto, 60000, "should match");
135         let rto = calculate_next_timeout(1, 63);
136         assert_eq!(rto, 60000, "should match");
137         let rto = calculate_next_timeout(1, 64);
138         assert_eq!(rto, 60000, "should match");
139 
140         Ok(())
141     }
142 
143     #[tokio::test]
144     async fn test_rto_manager_reset() -> Result<()> {
145         let mut m = RtoManager::new();
146         for _ in 0..10 {
147             m.set_new_rtt(200);
148         }
149 
150         m.reset();
151         assert_eq!(m.get_rto(), RTO_INITIAL, "should be rtoInitial");
152         assert_eq!(m.srtt, 0, "should be 0");
153         assert_eq!(m.rttvar, 0.0, "should be 0");
154 
155         Ok(())
156     }
157 }
158 
159 //TODO: remove this conditional test
160 #[cfg(not(any(target_os = "macos", target_os = "windows")))]
161 mod test_rtx_timer {
162     use super::*;
163     use crate::association::RtxTimerId;
164     use crate::error::Result;
165 
166     use std::time::SystemTime;
167     use tokio::sync::mpsc;
168 
169     struct TestTimerObserver {
170         ncbs: Arc<AtomicU32>,
171         timer_id: RtxTimerId,
172         done_tx: Option<mpsc::Sender<SystemTime>>,
173         max_rtos: usize,
174     }
175 
176     impl Default for TestTimerObserver {
177         fn default() -> Self {
178             TestTimerObserver {
179                 ncbs: Arc::new(AtomicU32::new(0)),
180                 timer_id: RtxTimerId::T1Init,
181                 done_tx: None,
182                 max_rtos: 0,
183             }
184         }
185     }
186 
187     #[async_trait]
188     impl RtxTimerObserver for TestTimerObserver {
189         async fn on_retransmission_timeout(&mut self, timer_id: RtxTimerId, n_rtos: usize) {
190             self.ncbs.fetch_add(1, Ordering::SeqCst);
191             // 30 : 1 (30)
192             // 60 : 2 (90)
193             // 120: 3 (210)
194             // 240: 4 (550) <== expected in 650 msec
195             assert_eq!(self.timer_id, timer_id, "unexpected timer ID: {}", timer_id);
196             if (self.max_rtos > 0 && n_rtos == self.max_rtos) || self.max_rtos == usize::MAX {
197                 if let Some(done) = &self.done_tx {
198                     let elapsed = SystemTime::now();
199                     let _ = done.send(elapsed).await;
200                 }
201             }
202         }
203 
204         async fn on_retransmission_failure(&mut self, timer_id: RtxTimerId) {
205             if self.max_rtos == 0 {
206                 if let Some(done) = &self.done_tx {
207                     assert_eq!(self.timer_id, timer_id, "unexpted timer ID: {}", timer_id);
208                     let elapsed = SystemTime::now();
209                     //t.Logf("onRtxFailure: elapsed=%.03f\n", elapsed)
210                     let _ = done.send(elapsed).await;
211                 }
212             } else {
213                 panic!("timer should not fail");
214             }
215         }
216     }
217 
218     #[tokio::test]
219     async fn test_rtx_timer_callback_interval() -> Result<()> {
220         let timer_id = RtxTimerId::T1Init;
221         let ncbs = Arc::new(AtomicU32::new(0));
222         let obs = Arc::new(Mutex::new(TestTimerObserver {
223             ncbs: ncbs.clone(),
224             timer_id,
225             ..Default::default()
226         }));
227         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
228 
229         assert!(!rt.is_running().await, "should not be running");
230 
231         // since := time.Now()
232         let ok = rt.start(30).await;
233         assert!(ok, "should be true");
234         assert!(rt.is_running().await, "should be running");
235 
236         sleep(Duration::from_millis(650)).await;
237         rt.stop().await;
238         assert!(!rt.is_running().await, "should not be running");
239 
240         assert_eq!(ncbs.load(Ordering::SeqCst), 4, "should be called 4 times");
241 
242         Ok(())
243     }
244 
245     #[tokio::test]
246     async fn test_rtx_timer_last_start_wins() -> Result<()> {
247         let timer_id = RtxTimerId::T3RTX;
248         let ncbs = Arc::new(AtomicU32::new(0));
249         let obs = Arc::new(Mutex::new(TestTimerObserver {
250             ncbs: ncbs.clone(),
251             timer_id,
252             ..Default::default()
253         }));
254         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
255 
256         let interval = 30;
257         let ok = rt.start(interval).await;
258         assert!(ok, "should be accepted");
259         let ok = rt.start(interval * 99).await; // should ignored
260         assert!(!ok, "should be ignored");
261         let ok = rt.start(interval * 99).await; // should ignored
262         assert!(!ok, "should be ignored");
263 
264         sleep(Duration::from_millis((interval * 3) / 2)).await;
265         rt.stop().await;
266 
267         assert!(!rt.is_running().await, "should not be running");
268         assert_eq!(ncbs.load(Ordering::SeqCst), 1, "must be called once");
269 
270         Ok(())
271     }
272 
273     #[tokio::test]
274     async fn test_rtx_timer_stop_right_after_start() -> Result<()> {
275         let timer_id = RtxTimerId::T3RTX;
276         let ncbs = Arc::new(AtomicU32::new(0));
277         let obs = Arc::new(Mutex::new(TestTimerObserver {
278             ncbs: ncbs.clone(),
279             timer_id,
280             ..Default::default()
281         }));
282         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
283 
284         let interval = 30;
285         let ok = rt.start(interval).await;
286         assert!(ok, "should be accepted");
287         rt.stop().await;
288 
289         sleep(Duration::from_millis((interval * 3) / 2)).await;
290         rt.stop().await;
291 
292         assert!(!rt.is_running().await, "should not be running");
293         assert_eq!(ncbs.load(Ordering::SeqCst), 0, "no callback should be made");
294 
295         Ok(())
296     }
297 
298     #[tokio::test]
299     async fn test_rtx_timer_start_stop_then_start() -> Result<()> {
300         let timer_id = RtxTimerId::T1Cookie;
301         let ncbs = Arc::new(AtomicU32::new(0));
302         let obs = Arc::new(Mutex::new(TestTimerObserver {
303             ncbs: ncbs.clone(),
304             timer_id,
305             ..Default::default()
306         }));
307         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
308 
309         let interval = 30;
310         let ok = rt.start(interval).await;
311         assert!(ok, "should be accepted");
312         rt.stop().await;
313         assert!(!rt.is_running().await, "should NOT be running");
314         let ok = rt.start(interval).await;
315         assert!(ok, "should be accepted");
316         assert!(rt.is_running().await, "should be running");
317 
318         sleep(Duration::from_millis((interval * 3) / 2)).await;
319         rt.stop().await;
320 
321         assert!(!rt.is_running().await, "should NOT be running");
322         assert_eq!(ncbs.load(Ordering::SeqCst), 1, "must be called once");
323 
324         Ok(())
325     }
326 
327     #[tokio::test]
328     async fn test_rtx_timer_start_and_stop_in_atight_loop() -> Result<()> {
329         let timer_id = RtxTimerId::T2Shutdown;
330         let ncbs = Arc::new(AtomicU32::new(0));
331         let obs = Arc::new(Mutex::new(TestTimerObserver {
332             ncbs: ncbs.clone(),
333             timer_id,
334             ..Default::default()
335         }));
336         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
337 
338         for _ in 0..1000 {
339             let ok = rt.start(30).await;
340             assert!(ok, "should be accepted");
341             assert!(rt.is_running().await, "should be running");
342             rt.stop().await;
343             assert!(!rt.is_running().await, "should NOT be running");
344         }
345 
346         assert_eq!(ncbs.load(Ordering::SeqCst), 0, "no callback should be made");
347 
348         Ok(())
349     }
350 
351     #[tokio::test]
352     async fn test_rtx_timer_should_stop_after_rtx_failure() -> Result<()> {
353         let (done_tx, mut done_rx) = mpsc::channel(1);
354 
355         let timer_id = RtxTimerId::Reconfig;
356         let ncbs = Arc::new(AtomicU32::new(0));
357         let obs = Arc::new(Mutex::new(TestTimerObserver {
358             ncbs: ncbs.clone(),
359             timer_id,
360             done_tx: Some(done_tx),
361             ..Default::default()
362         }));
363 
364         let since = SystemTime::now();
365         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
366 
367         // RTO(msec) Total(msec)
368         //  10          10    1st RTO
369         //  20          30    2nd RTO
370         //  40          70    3rd RTO
371         //  80         150    4th RTO
372         // 160         310    5th RTO (== Path.Max.Retrans)
373         // 320         630    Failure
374 
375         let interval = 10;
376         let ok = rt.start(interval).await;
377         assert!(ok, "should be accepted");
378         assert!(rt.is_running().await, "should be running");
379 
380         let elapsed = done_rx.recv().await;
381 
382         assert!(!rt.is_running().await, "should not be running");
383         assert_eq!(ncbs.load(Ordering::SeqCst), 5, "should be called 5 times");
384 
385         if let Some(elapsed) = elapsed {
386             let diff = elapsed.duration_since(since).unwrap();
387             assert!(
388                 diff > Duration::from_millis(600),
389                 "must have taken more than 600 msec"
390             );
391             assert!(
392                 diff < Duration::from_millis(700),
393                 "must fail in less than 700 msec"
394             );
395         }
396 
397         Ok(())
398     }
399 
400     #[tokio::test]
401     async fn test_rtx_timer_should_not_stop_if_max_retrans_is_zero() -> Result<()> {
402         let (done_tx, mut done_rx) = mpsc::channel(1);
403 
404         let timer_id = RtxTimerId::Reconfig;
405         let max_rtos = 6;
406         let ncbs = Arc::new(AtomicU32::new(0));
407         let obs = Arc::new(Mutex::new(TestTimerObserver {
408             ncbs: ncbs.clone(),
409             timer_id,
410             done_tx: Some(done_tx),
411             max_rtos,
412             ..Default::default()
413         }));
414 
415         let since = SystemTime::now();
416         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, 0);
417 
418         // RTO(msec) Total(msec)
419         //  10          10    1st RTO
420         //  20          30    2nd RTO
421         //  40          70    3rd RTO
422         //  80         150    4th RTO
423         // 160         310    5th RTO
424         // 320         630    6th RTO => exit test (timer should still be running)
425 
426         let interval = 10;
427         let ok = rt.start(interval).await;
428         assert!(ok, "should be accepted");
429         assert!(rt.is_running().await, "should be running");
430 
431         let elapsed = done_rx.recv().await;
432 
433         assert!(rt.is_running().await, "should still be running");
434         assert_eq!(ncbs.load(Ordering::SeqCst), 6, "should be called 6 times");
435 
436         if let Some(elapsed) = elapsed {
437             let diff = elapsed.duration_since(since).unwrap();
438             assert!(
439                 diff > Duration::from_millis(600),
440                 "must have taken more than 600 msec"
441             );
442             assert!(
443                 diff < Duration::from_millis(700),
444                 "must fail in less than 700 msec"
445             );
446         }
447 
448         rt.stop().await;
449 
450         Ok(())
451     }
452 
453     #[tokio::test]
454     async fn test_rtx_timer_stop_timer_that_is_not_running_is_noop() -> Result<()> {
455         let (done_tx, mut done_rx) = mpsc::channel(1);
456 
457         let timer_id = RtxTimerId::Reconfig;
458         let obs = Arc::new(Mutex::new(TestTimerObserver {
459             timer_id,
460             done_tx: Some(done_tx),
461             max_rtos: usize::MAX,
462             ..Default::default()
463         }));
464         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
465 
466         for _ in 0..10 {
467             rt.stop().await;
468         }
469 
470         let ok = rt.start(20).await;
471         assert!(ok, "should be accepted");
472         assert!(rt.is_running().await, "must be running");
473 
474         let _ = done_rx.recv().await;
475         rt.stop().await;
476         assert!(!rt.is_running().await, "must be false");
477 
478         Ok(())
479     }
480 
481     #[tokio::test]
482     async fn test_rtx_timer_closed_timer_wont_start() -> Result<()> {
483         let timer_id = RtxTimerId::Reconfig;
484         let ncbs = Arc::new(AtomicU32::new(0));
485         let obs = Arc::new(Mutex::new(TestTimerObserver {
486             ncbs: ncbs.clone(),
487             timer_id,
488             ..Default::default()
489         }));
490         let rt = RtxTimer::new(Arc::downgrade(&obs), timer_id, PATH_MAX_RETRANS);
491 
492         let ok = rt.start(20).await;
493         assert!(ok, "should be accepted");
494         assert!(rt.is_running().await, "must be running");
495 
496         rt.stop().await;
497         assert!(!rt.is_running().await, "must be false");
498 
499         //let ok = rt.start(obs.clone(), 20).await;
500         //assert!(!ok, "should not start");
501         assert!(!rt.is_running().await, "must not be running");
502 
503         sleep(Duration::from_millis(100)).await;
504         assert_eq!(ncbs.load(Ordering::SeqCst), 0, "RTO should not occur");
505 
506         Ok(())
507     }
508 }
509