xref: /webrtc/util/src/buffer/buffer_test.rs (revision a65328a8)
1 use super::*;
2 
3 use tokio::prelude::*;
4 use tokio_test::assert_ok;
5 use tokio::timer::delay;
6 
7 use std::time::Duration;
8 
9 #[tokio::test]
10 async fn test_buffer() {
11     let mut buffer = Buffer::new(0, 0);
12     let mut packet: Vec<u8> = vec![0; 4];
13 
14     // Write once
15     let n = assert_ok!(buffer.write(&[0, 1]).await);
16     assert_eq!(n, 2, "n must be 2");
17 
18     // Read once
19     let n = assert_ok!(buffer.read(&mut packet).await);
20     assert_eq!(n, 2, "n must be 2");
21     assert_eq!(&[0, 1], &packet[..n]);
22 
23     // Write twice
24     let n = assert_ok!(buffer.write(&[2, 3, 4]).await);
25     assert_eq!(n, 3, "n must be 3");
26 
27     let n = assert_ok!(buffer.write(&[5, 6, 7]).await);
28     assert_eq!(n, 3, "n must be 3");
29 
30     // Read twice
31     let n = assert_ok!(buffer.read(&mut packet).await);
32     assert_eq!(n, 3, "n must be 3");
33     assert_eq!(&[2, 3, 4], &packet[..n]);
34 
35     let n = assert_ok!(buffer.read(&mut packet).await);
36     assert_eq!(n, 3, "n must be 3");
37     assert_eq!(&[5, 6, 7], &packet[..n]);
38 
39     // Write once prior to close.
40     let n = assert_ok!(buffer.write(&[3]).await);
41     assert_eq!(n, 1, "n must be 1");
42 
43     // Close
44     buffer.close().await;
45 
46     // Future writes will error
47     let result = buffer.write(&[4]).await;
48     assert!(result.is_err());
49 
50     // But we can read the remaining data.
51     let n = assert_ok!(buffer.read(&mut packet).await);
52     assert_eq!(n, 1, "n must be 1");
53     assert_eq!(&[3], &packet[..n]);
54 
55     // Until EOF
56     let result = buffer.read(&mut packet).await;
57     assert!(result.is_err());
58     if let Err(err) = result {
59         assert_eq!(err, ERR_BUFFER_CLOSED.clone());
60     }
61 }
62 
63 #[tokio::test]
64 async fn test_buffer_async() {
65     let mut buffer = Buffer::new(0, 0);
66 
67     let (done_tx, mut done_rx) = mpsc::channel::<()>(1);
68 
69     let mut buffer2 = buffer.clone();
70     tokio::spawn(async move {
71         let mut packet:Vec<u8> = vec![0; 4];
72 
73         let n = assert_ok!(buffer2.read(&mut packet).await);
74         assert_eq!(n, 2, "n must be 2");
75         assert_eq!(&[0, 1], &packet[..n]);
76 
77         let result = buffer2.read(&mut packet).await;
78         assert!(result.is_err());
79         if let Err(err) = result {
80             assert_eq!(err, ERR_BUFFER_CLOSED.clone());
81         }
82 
83         drop(done_tx);
84     });
85 
86     // Wait for the reader to start reading.
87     let when = tokio::clock::now() + Duration::from_millis(1);
88     delay(when).await;
89 
90     // Write once
91     let n = assert_ok!(buffer.write(&[0, 1]).await);
92     assert_eq!(n, 2, "n must be 2");
93 
94     // Wait for the reader to start reading again.
95     let when = tokio::clock::now() + Duration::from_millis(1);
96     delay(when).await;
97 
98     // Close will unblock the reader.
99     buffer.close().await;
100 
101     done_rx.recv().await;
102 }