xref: /webrtc/util/src/buffer/buffer_test.rs (revision 099848e8)
1 use super::*;
2 
3 use tokio::time::{sleep, Duration};
4 use tokio_test::assert_ok;
5 
6 #[tokio::test]
7 async fn test_buffer() {
8     let mut buffer = Buffer::new(0, 0);
9     let mut packet: Vec<u8> = vec![0; 4];
10 
11     // Write once
12     let n = assert_ok!(buffer.write(&[0, 1]).await);
13     assert_eq!(n, 2, "n must be 2");
14 
15     // Read once
16     let n = assert_ok!(buffer.read(&mut packet, None).await);
17     assert_eq!(n, 2, "n must be 2");
18     assert_eq!(&[0, 1], &packet[..n]);
19 
20     // Read deadline
21     let result = buffer.read(&mut packet, Some(Duration::new(0, 1))).await;
22     assert!(result.is_err());
23     assert_eq!(result.unwrap_err(), ERR_TIMEOUT.clone());
24 
25     // Write twice
26     let n = assert_ok!(buffer.write(&[2, 3, 4]).await);
27     assert_eq!(n, 3, "n must be 3");
28 
29     let n = assert_ok!(buffer.write(&[5, 6, 7]).await);
30     assert_eq!(n, 3, "n must be 3");
31 
32     // Read twice
33     let n = assert_ok!(buffer.read(&mut packet, None).await);
34     assert_eq!(n, 3, "n must be 3");
35     assert_eq!(&[2, 3, 4], &packet[..n]);
36 
37     let n = assert_ok!(buffer.read(&mut packet, None).await);
38     assert_eq!(n, 3, "n must be 3");
39     assert_eq!(&[5, 6, 7], &packet[..n]);
40 
41     // Write once prior to close.
42     let n = assert_ok!(buffer.write(&[3]).await);
43     assert_eq!(n, 1, "n must be 1");
44 
45     // Close
46     buffer.close().await;
47 
48     // Future writes will error
49     let result = buffer.write(&[4]).await;
50     assert!(result.is_err());
51 
52     // But we can read the remaining data.
53     let n = assert_ok!(buffer.read(&mut packet, None).await);
54     assert_eq!(n, 1, "n must be 1");
55     assert_eq!(&[3], &packet[..n]);
56 
57     // Until EOF
58     let result = buffer.read(&mut packet, None).await;
59     assert!(result.is_err());
60     assert_eq!(result.unwrap_err(), ERR_BUFFER_CLOSED.clone());
61 }
62 
63 async fn test_wraparound(grow: bool) {
64     let mut buffer = Buffer::new(0, 0);
65     {
66         let mut b = buffer.buffer.lock().await;
67         let result = b.grow();
68         assert!(result.is_ok());
69 
70         b.head = b.data.len() - 13;
71         b.tail = b.head;
72     }
73 
74     let p1 = vec![1, 2, 3];
75     let p2 = vec![4, 5, 6];
76     let p3 = vec![7, 8, 9];
77     let p4 = vec![10, 11, 12];
78 
79     assert_ok!(buffer.write(&p1).await);
80     assert_ok!(buffer.write(&p2).await);
81     assert_ok!(buffer.write(&p3).await);
82 
83     let mut p = vec![0; 10];
84 
85     let n = assert_ok!(buffer.read(&mut p, None).await);
86     assert_eq!(&p1[..], &p[..n]);
87 
88     if grow {
89         let mut b = buffer.buffer.lock().await;
90         let result = b.grow();
91         assert!(result.is_ok());
92     }
93 
94     let n = assert_ok!(buffer.read(&mut p, None).await);
95     assert_eq!(&p2[..], &p[..n]);
96 
97     assert_ok!(buffer.write(&p4).await);
98 
99     let n = assert_ok!(buffer.read(&mut p, None).await);
100     assert_eq!(&p3[..], &p[..n]);
101     let n = assert_ok!(buffer.read(&mut p, None).await);
102     assert_eq!(&p4[..], &p[..n]);
103 
104     {
105         let b = buffer.buffer.lock().await;
106         if !grow {
107             assert_eq!(b.data.len(), MIN_SIZE);
108         } else {
109             assert_eq!(b.data.len(), 2 * MIN_SIZE);
110         }
111     }
112 }
113 
114 #[tokio::test]
115 async fn test_buffer_wraparound() {
116     test_wraparound(false).await;
117 }
118 
119 #[tokio::test]
120 async fn test_buffer_wraparound_grow() {
121     test_wraparound(true).await;
122 }
123 
124 #[tokio::test]
125 async fn test_buffer_async() {
126     let mut buffer = Buffer::new(0, 0);
127 
128     let (done_tx, mut done_rx) = mpsc::channel::<()>(1);
129 
130     let mut buffer2 = buffer.clone();
131     tokio::spawn(async move {
132         let mut packet: Vec<u8> = vec![0; 4];
133 
134         let n = assert_ok!(buffer2.read(&mut packet, None).await);
135         assert_eq!(n, 2, "n must be 2");
136         assert_eq!(&[0, 1], &packet[..n]);
137 
138         let result = buffer2.read(&mut packet, None).await;
139         assert!(result.is_err());
140         assert_eq!(result.unwrap_err(), ERR_BUFFER_CLOSED.clone());
141 
142         drop(done_tx);
143     });
144 
145     // Wait for the reader to start reading.
146     sleep(Duration::from_micros(1)).await;
147 
148     // Write once
149     let n = assert_ok!(buffer.write(&[0, 1]).await);
150     assert_eq!(n, 2, "n must be 2");
151 
152     // Wait for the reader to start reading again.
153     sleep(Duration::from_micros(1)).await;
154 
155     // Close will unblock the reader.
156     buffer.close().await;
157 
158     done_rx.recv().await;
159 }
160 
161 #[tokio::test]
162 async fn test_buffer_limit_count() {
163     let mut buffer = Buffer::new(2, 0);
164 
165     assert_eq!(0, buffer.count().await);
166 
167     // Write twice
168     let n = assert_ok!(buffer.write(&[0, 1]).await);
169     assert_eq!(n, 2, "n must be 2");
170     assert_eq!(1, buffer.count().await);
171 
172     let n = assert_ok!(buffer.write(&[2, 3]).await);
173     assert_eq!(n, 2, "n must be 2");
174     assert_eq!(2, buffer.count().await);
175 
176     // Over capacity
177     let result = buffer.write(&[4, 5]).await;
178     assert!(result.is_err());
179     if let Err(err) = result {
180         assert_eq!(err, ERR_BUFFER_FULL.clone());
181     }
182     assert_eq!(2, buffer.count().await);
183 
184     // Read once
185     let mut packet: Vec<u8> = vec![0; 4];
186     let n = assert_ok!(buffer.read(&mut packet, None).await);
187     assert_eq!(n, 2, "n must be 2");
188     assert_eq!(&[0, 1], &packet[..n]);
189     assert_eq!(1, buffer.count().await);
190 
191     // Write once
192     let n = assert_ok!(buffer.write(&[6, 7]).await);
193     assert_eq!(n, 2, "n must be 2");
194     assert_eq!(2, buffer.count().await);
195 
196     // Over capacity
197     let result = buffer.write(&[8, 9]).await;
198     assert!(result.is_err());
199     if let Err(err) = result {
200         assert_eq!(err, ERR_BUFFER_FULL.clone());
201     }
202     assert_eq!(2, buffer.count().await);
203 
204     // Read twice
205     let n = assert_ok!(buffer.read(&mut packet, None).await);
206     assert_eq!(n, 2, "n must be 2");
207     assert_eq!(&[2, 3], &packet[..n]);
208     assert_eq!(1, buffer.count().await);
209 
210     let n = assert_ok!(buffer.read(&mut packet, None).await);
211     assert_eq!(n, 2, "n must be 2");
212     assert_eq!(&[6, 7], &packet[..n]);
213     assert_eq!(0, buffer.count().await);
214 
215     // Nothing left.
216     buffer.close().await;
217 }
218 
219 #[tokio::test]
220 async fn test_buffer_limit_size() {
221     let mut buffer = Buffer::new(0, 11);
222 
223     assert_eq!(0, buffer.size().await);
224 
225     // Write twice
226     let n = assert_ok!(buffer.write(&[0, 1]).await);
227     assert_eq!(n, 2, "n must be 2");
228     assert_eq!(4, buffer.size().await);
229 
230     let n = assert_ok!(buffer.write(&[2, 3]).await);
231     assert_eq!(n, 2, "n must be 2");
232     assert_eq!(8, buffer.size().await);
233 
234     // Over capacity
235     let result = buffer.write(&[4, 5]).await;
236     assert!(result.is_err());
237     if let Err(err) = result {
238         assert_eq!(err, ERR_BUFFER_FULL.clone());
239     }
240     assert_eq!(8, buffer.size().await);
241 
242     // Cheeky write at exact size.
243     let n = assert_ok!(buffer.write(&[6]).await);
244     assert_eq!(n, 1, "n must be 1");
245     assert_eq!(11, buffer.size().await);
246 
247     // Read once
248     let mut packet: Vec<u8> = vec![0; 4];
249     let n = assert_ok!(buffer.read(&mut packet, None).await);
250     assert_eq!(n, 2, "n must be 2");
251     assert_eq!(&[0, 1], &packet[..n]);
252     assert_eq!(7, buffer.size().await);
253 
254     // Write once
255     let n = assert_ok!(buffer.write(&[7, 8]).await);
256     assert_eq!(n, 2, "n must be 2");
257     assert_eq!(11, buffer.size().await);
258 
259     // Over capacity
260     let result = buffer.write(&[9, 10]).await;
261     assert!(result.is_err());
262     if let Err(err) = result {
263         assert_eq!(err, ERR_BUFFER_FULL.clone());
264     }
265     assert_eq!(11, buffer.size().await);
266 
267     // Read everything
268     let n = assert_ok!(buffer.read(&mut packet, None).await);
269     assert_eq!(n, 2, "n must be 2");
270     assert_eq!(&[2, 3], &packet[..n]);
271     assert_eq!(7, buffer.size().await);
272 
273     let n = assert_ok!(buffer.read(&mut packet, None).await);
274     assert_eq!(n, 1, "n must be 1");
275     assert_eq!(&[6], &packet[..n]);
276     assert_eq!(4, buffer.size().await);
277 
278     let n = assert_ok!(buffer.read(&mut packet, None).await);
279     assert_eq!(n, 2, "n must be 2");
280     assert_eq!(&[7, 8], &packet[..n]);
281     assert_eq!(0, buffer.size().await);
282 
283     // Nothing left.
284     buffer.close().await;
285 }
286 
287 #[tokio::test]
288 async fn test_buffer_limit_sizes() {
289     let sizes = vec![
290         128 * 1024,
291         1024 * 1024,
292         8 * 1024 * 1024,
293         0, // default
294     ];
295     const HEADER_SIZE: usize = 2;
296     const PACKET_SIZE: usize = 0x8000;
297 
298     for mut size in sizes {
299         let mut name = "default".to_owned();
300         if size > 0 {
301             name = format!("{}kbytes", size / 1024);
302         }
303 
304         let mut buffer = Buffer::new(0, 0);
305         if size == 0 {
306             size = MAX_SIZE;
307         } else {
308             buffer.set_limit_size(size + HEADER_SIZE).await;
309         }
310 
311         //assert.NoError(buffer.SetReadDeadline(now.Add(5 * time.Second))) // Set deadline to avoid test deadlock
312 
313         let n_packets = size / (PACKET_SIZE + HEADER_SIZE);
314         let pkt = vec![0; PACKET_SIZE];
315         for _ in 0..n_packets {
316             assert_ok!(buffer.write(&pkt).await);
317         }
318 
319         // Next write is expected to be errored.
320         let result = buffer.write(&pkt).await;
321         assert!(result.is_err(), "{}", name);
322         assert_eq!(result.unwrap_err(), ERR_BUFFER_FULL.clone(), "{}", name);
323 
324         let mut packet = vec![0; size];
325         for _ in 0..n_packets {
326             let n = assert_ok!(buffer.read(&mut packet, Some(Duration::new(5, 0))).await);
327             assert_eq!(PACKET_SIZE, n, "{}", name);
328         }
329     }
330 }
331 
332 #[tokio::test]
333 async fn test_buffer_misc() {
334     let mut buffer = Buffer::new(0, 0);
335 
336     // Write once
337     let n = assert_ok!(buffer.write(&[0, 1, 2, 3]).await);
338     assert_eq!(n, 4, "n must be 4");
339 
340     // Try to read with a short buffer
341     let mut packet: Vec<u8> = vec![0; 3];
342     let result = buffer.read(&mut packet, None).await;
343     assert!(result.is_err());
344     if let Err(err) = result {
345         assert_eq!(err, ERR_BUFFER_SHORT.clone());
346     }
347 
348     // Close
349     buffer.close().await;
350 
351     // check is_close
352     assert!(buffer.is_closed().await);
353 
354     // Make sure you can Close twice
355     buffer.close().await;
356 }
357