xref: /webrtc/util/src/buffer/buffer_test.rs (revision 529e41be)
1 use super::*;
2 use crate::error::Error;
3 
4 use tokio::sync::mpsc;
5 use tokio::time::{sleep, Duration};
6 use tokio_test::assert_ok;
7 
8 #[tokio::test]
9 async fn test_buffer() {
10     let buffer = Buffer::new(0, 0);
11     let mut packet: Vec<u8> = vec![0; 4];
12 
13     // Write once
14     let n = assert_ok!(buffer.write(&[0, 1]).await);
15     assert_eq!(n, 2, "n must be 2");
16 
17     // Read once
18     let n = assert_ok!(buffer.read(&mut packet, None).await);
19     assert_eq!(n, 2, "n must be 2");
20     assert_eq!(&packet[..n], &[0, 1]);
21 
22     // Read deadline
23     let result = buffer.read(&mut packet, Some(Duration::new(0, 1))).await;
24     assert!(result.is_err());
25     assert_eq!(result.unwrap_err(), Error::ErrTimeout);
26 
27     // Write twice
28     let n = assert_ok!(buffer.write(&[2, 3, 4]).await);
29     assert_eq!(n, 3, "n must be 3");
30 
31     let n = assert_ok!(buffer.write(&[5, 6, 7]).await);
32     assert_eq!(n, 3, "n must be 3");
33 
34     // Read twice
35     let n = assert_ok!(buffer.read(&mut packet, None).await);
36     assert_eq!(n, 3, "n must be 3");
37     assert_eq!(&packet[..n], &[2, 3, 4]);
38 
39     let n = assert_ok!(buffer.read(&mut packet, None).await);
40     assert_eq!(n, 3, "n must be 3");
41     assert_eq!(&packet[..n], &[5, 6, 7]);
42 
43     // Write once prior to close.
44     let n = assert_ok!(buffer.write(&[3]).await);
45     assert_eq!(n, 1, "n must be 1");
46 
47     // Close
48     buffer.close().await;
49 
50     // Future writes will error
51     let result = buffer.write(&[4]).await;
52     assert!(result.is_err());
53 
54     // But we can read the remaining data.
55     let n = assert_ok!(buffer.read(&mut packet, None).await);
56     assert_eq!(n, 1, "n must be 1");
57     assert_eq!(&packet[..n], &[3]);
58 
59     // Until EOF
60     let result = buffer.read(&mut packet, None).await;
61     assert!(result.is_err());
62     assert_eq!(Error::ErrBufferClosed, result.unwrap_err());
63 }
64 
65 async fn test_wraparound(grow: bool) {
66     let buffer = Buffer::new(0, 0);
67     {
68         let mut b = buffer.buffer.lock().await;
69         let result = b.grow();
70         assert!(result.is_ok());
71 
72         b.head = b.data.len() - 13;
73         b.tail = b.head;
74     }
75 
76     let p1 = vec![1, 2, 3];
77     let p2 = vec![4, 5, 6];
78     let p3 = vec![7, 8, 9];
79     let p4 = vec![10, 11, 12];
80 
81     assert_ok!(buffer.write(&p1).await);
82     assert_ok!(buffer.write(&p2).await);
83     assert_ok!(buffer.write(&p3).await);
84 
85     let mut p = vec![0; 10];
86 
87     let n = assert_ok!(buffer.read(&mut p, None).await);
88     assert_eq!(&p1[..], &p[..n]);
89 
90     if grow {
91         let mut b = buffer.buffer.lock().await;
92         let result = b.grow();
93         assert!(result.is_ok());
94     }
95 
96     let n = assert_ok!(buffer.read(&mut p, None).await);
97     assert_eq!(&p2[..], &p[..n]);
98 
99     assert_ok!(buffer.write(&p4).await);
100 
101     let n = assert_ok!(buffer.read(&mut p, None).await);
102     assert_eq!(&p3[..], &p[..n]);
103     let n = assert_ok!(buffer.read(&mut p, None).await);
104     assert_eq!(&p4[..], &p[..n]);
105 
106     {
107         let b = buffer.buffer.lock().await;
108         if !grow {
109             assert_eq!(b.data.len(), MIN_SIZE);
110         } else {
111             assert_eq!(b.data.len(), 2 * MIN_SIZE);
112         }
113     }
114 }
115 
116 #[tokio::test]
117 async fn test_buffer_wraparound() {
118     test_wraparound(false).await;
119 }
120 
121 #[tokio::test]
122 async fn test_buffer_wraparound_grow() {
123     test_wraparound(true).await;
124 }
125 
126 #[tokio::test]
127 async fn test_buffer_async() {
128     let buffer = Buffer::new(0, 0);
129 
130     let (done_tx, mut done_rx) = mpsc::channel::<()>(1);
131 
132     let buffer2 = buffer.clone();
133     tokio::spawn(async move {
134         let mut packet: Vec<u8> = vec![0; 4];
135 
136         let n = assert_ok!(buffer2.read(&mut packet, None).await);
137         assert_eq!(n, 2, "n must be 2");
138         assert_eq!(&packet[..n], &[0, 1]);
139 
140         let result = buffer2.read(&mut packet, None).await;
141         assert!(result.is_err());
142         assert_eq!(result.unwrap_err(), Error::ErrBufferClosed);
143 
144         drop(done_tx);
145     });
146 
147     // Wait for the reader to start reading.
148     sleep(Duration::from_micros(1)).await;
149 
150     // Write once
151     let n = assert_ok!(buffer.write(&[0, 1]).await);
152     assert_eq!(n, 2, "n must be 2");
153 
154     // Wait for the reader to start reading again.
155     sleep(Duration::from_micros(1)).await;
156 
157     // Close will unblock the reader.
158     buffer.close().await;
159 
160     done_rx.recv().await;
161 }
162 
163 #[tokio::test]
164 async fn test_buffer_limit_count() {
165     let buffer = Buffer::new(2, 0);
166 
167     assert_eq!(buffer.count().await, 0);
168 
169     // Write twice
170     let n = assert_ok!(buffer.write(&[0, 1]).await);
171     assert_eq!(n, 2, "n must be 2");
172     assert_eq!(buffer.count().await, 1);
173 
174     let n = assert_ok!(buffer.write(&[2, 3]).await);
175     assert_eq!(n, 2, "n must be 2");
176     assert_eq!(buffer.count().await, 2);
177 
178     // Over capacity
179     let result = buffer.write(&[4, 5]).await;
180     assert!(result.is_err());
181     if let Err(err) = result {
182         assert_eq!(err, Error::ErrBufferFull);
183     }
184     assert_eq!(buffer.count().await, 2);
185 
186     // Read once
187     let mut packet: Vec<u8> = vec![0; 4];
188     let n = assert_ok!(buffer.read(&mut packet, None).await);
189     assert_eq!(n, 2, "n must be 2");
190     assert_eq!(&packet[..n], &[0, 1]);
191     assert_eq!(buffer.count().await, 1);
192 
193     // Write once
194     let n = assert_ok!(buffer.write(&[6, 7]).await);
195     assert_eq!(n, 2, "n must be 2");
196     assert_eq!(buffer.count().await, 2);
197 
198     // Over capacity
199     let result = buffer.write(&[8, 9]).await;
200     assert!(result.is_err());
201     if let Err(err) = result {
202         assert_eq!(Error::ErrBufferFull, err);
203     }
204     assert_eq!(buffer.count().await, 2);
205 
206     // Read twice
207     let n = assert_ok!(buffer.read(&mut packet, None).await);
208     assert_eq!(n, 2, "n must be 2");
209     assert_eq!(&packet[..n], &[2, 3]);
210     assert_eq!(buffer.count().await, 1);
211 
212     let n = assert_ok!(buffer.read(&mut packet, None).await);
213     assert_eq!(n, 2, "n must be 2");
214     assert_eq!(&packet[..n], &[6, 7]);
215     assert_eq!(buffer.count().await, 0);
216 
217     // Nothing left.
218     buffer.close().await;
219 }
220 
221 #[tokio::test]
222 async fn test_buffer_limit_size() {
223     let buffer = Buffer::new(0, 11);
224 
225     assert_eq!(buffer.size().await, 0);
226 
227     // Write twice
228     let n = assert_ok!(buffer.write(&[0, 1]).await);
229     assert_eq!(n, 2, "n must be 2");
230     assert_eq!(buffer.size().await, 4);
231 
232     let n = assert_ok!(buffer.write(&[2, 3]).await);
233     assert_eq!(n, 2, "n must be 2");
234     assert_eq!(buffer.size().await, 8);
235 
236     // Over capacity
237     let result = buffer.write(&[4, 5]).await;
238     assert!(result.is_err());
239     if let Err(err) = result {
240         assert_eq!(Error::ErrBufferFull, err);
241     }
242     assert_eq!(buffer.size().await, 8);
243 
244     // Cheeky write at exact size.
245     let n = assert_ok!(buffer.write(&[6]).await);
246     assert_eq!(n, 1, "n must be 1");
247     assert_eq!(buffer.size().await, 11);
248 
249     // Read once
250     let mut packet: Vec<u8> = vec![0; 4];
251     let n = assert_ok!(buffer.read(&mut packet, None).await);
252     assert_eq!(n, 2, "n must be 2");
253     assert_eq!(&packet[..n], &[0, 1]);
254     assert_eq!(buffer.size().await, 7);
255 
256     // Write once
257     let n = assert_ok!(buffer.write(&[7, 8]).await);
258     assert_eq!(n, 2, "n must be 2");
259     assert_eq!(buffer.size().await, 11);
260 
261     // Over capacity
262     let result = buffer.write(&[9, 10]).await;
263     assert!(result.is_err());
264     if let Err(err) = result {
265         assert_eq!(Error::ErrBufferFull, err);
266     }
267     assert_eq!(buffer.size().await, 11);
268 
269     // Read everything
270     let n = assert_ok!(buffer.read(&mut packet, None).await);
271     assert_eq!(n, 2, "n must be 2");
272     assert_eq!(&packet[..n], &[2, 3]);
273     assert_eq!(buffer.size().await, 7);
274 
275     let n = assert_ok!(buffer.read(&mut packet, None).await);
276     assert_eq!(n, 1, "n must be 1");
277     assert_eq!(&packet[..n], &[6]);
278     assert_eq!(buffer.size().await, 4);
279 
280     let n = assert_ok!(buffer.read(&mut packet, None).await);
281     assert_eq!(n, 2, "n must be 2");
282     assert_eq!(&packet[..n], &[7, 8]);
283     assert_eq!(buffer.size().await, 0);
284 
285     // Nothing left.
286     buffer.close().await;
287 }
288 
289 #[tokio::test]
290 async fn test_buffer_limit_sizes() {
291     let sizes = vec![
292         128 * 1024,
293         1024 * 1024,
294         8 * 1024 * 1024,
295         0, // default
296     ];
297     const HEADER_SIZE: usize = 2;
298     const PACKET_SIZE: usize = 0x8000;
299 
300     for mut size in sizes {
301         let mut name = "default".to_owned();
302         if size > 0 {
303             name = format!("{}kbytes", size / 1024);
304         }
305 
306         let buffer = Buffer::new(0, 0);
307         if size == 0 {
308             size = MAX_SIZE;
309         } else {
310             buffer.set_limit_size(size + HEADER_SIZE).await;
311         }
312 
313         //assert.NoError(buffer.SetReadDeadline(now.Add(5 * time.Second))) // Set deadline to avoid test deadlock
314 
315         let n_packets = size / (PACKET_SIZE + HEADER_SIZE);
316         let pkt = vec![0; PACKET_SIZE];
317         for _ in 0..n_packets {
318             assert_ok!(buffer.write(&pkt).await);
319         }
320 
321         // Next write is expected to be errored.
322         let result = buffer.write(&pkt).await;
323         assert!(result.is_err(), "{}", name);
324         assert_eq!(result.unwrap_err(), Error::ErrBufferFull, "{}", name);
325 
326         let mut packet = vec![0; size];
327         for _ in 0..n_packets {
328             let n = assert_ok!(buffer.read(&mut packet, Some(Duration::new(5, 0))).await);
329             assert_eq!(n, PACKET_SIZE, "{}", name);
330         }
331     }
332 }
333 
334 #[tokio::test]
335 async fn test_buffer_misc() {
336     let buffer = Buffer::new(0, 0);
337 
338     // Write once
339     let n = assert_ok!(buffer.write(&[0, 1, 2, 3]).await);
340     assert_eq!(n, 4, "n must be 4");
341 
342     // Try to read with a short buffer
343     let mut packet: Vec<u8> = vec![0; 3];
344     let result = buffer.read(&mut packet, None).await;
345     assert!(result.is_err());
346     if let Err(err) = result {
347         assert_eq!(err, Error::ErrBufferShort);
348     }
349 
350     // Close
351     buffer.close().await;
352 
353     // check is_close
354     assert!(buffer.is_closed().await);
355 
356     // Make sure you can Close twice
357     buffer.close().await;
358 }
359