1 use super::*;
2 use crate::error::Error;
3
4 use tokio::sync::mpsc;
5 use tokio::time::{sleep, Duration};
6 use tokio_test::assert_ok;
7
8 #[tokio::test]
test_buffer()9 async fn test_buffer() {
10 let buffer = Buffer::new(0, 0);
11 let mut packet: Vec<u8> = vec![0; 4];
12
13 // Write once
14 let n = assert_ok!(buffer.write(&[0, 1]).await);
15 assert_eq!(n, 2, "n must be 2");
16
17 // Read once
18 let n = assert_ok!(buffer.read(&mut packet, None).await);
19 assert_eq!(n, 2, "n must be 2");
20 assert_eq!(&packet[..n], &[0, 1]);
21
22 // Read deadline
23 let result = buffer.read(&mut packet, Some(Duration::new(0, 1))).await;
24 assert!(result.is_err());
25 assert_eq!(result.unwrap_err(), Error::ErrTimeout);
26
27 // Write twice
28 let n = assert_ok!(buffer.write(&[2, 3, 4]).await);
29 assert_eq!(n, 3, "n must be 3");
30
31 let n = assert_ok!(buffer.write(&[5, 6, 7]).await);
32 assert_eq!(n, 3, "n must be 3");
33
34 // Read twice
35 let n = assert_ok!(buffer.read(&mut packet, None).await);
36 assert_eq!(n, 3, "n must be 3");
37 assert_eq!(&packet[..n], &[2, 3, 4]);
38
39 let n = assert_ok!(buffer.read(&mut packet, None).await);
40 assert_eq!(n, 3, "n must be 3");
41 assert_eq!(&packet[..n], &[5, 6, 7]);
42
43 // Write once prior to close.
44 let n = assert_ok!(buffer.write(&[3]).await);
45 assert_eq!(n, 1, "n must be 1");
46
47 // Close
48 buffer.close().await;
49
50 // Future writes will error
51 let result = buffer.write(&[4]).await;
52 assert!(result.is_err());
53
54 // But we can read the remaining data.
55 let n = assert_ok!(buffer.read(&mut packet, None).await);
56 assert_eq!(n, 1, "n must be 1");
57 assert_eq!(&packet[..n], &[3]);
58
59 // Until EOF
60 let result = buffer.read(&mut packet, None).await;
61 assert!(result.is_err());
62 assert_eq!(Error::ErrBufferClosed, result.unwrap_err());
63 }
64
test_wraparound(grow: bool)65 async fn test_wraparound(grow: bool) {
66 let buffer = Buffer::new(0, 0);
67 {
68 let mut b = buffer.buffer.lock().await;
69 let result = b.grow();
70 assert!(result.is_ok());
71
72 b.head = b.data.len() - 13;
73 b.tail = b.head;
74 }
75
76 let p1 = vec![1, 2, 3];
77 let p2 = vec![4, 5, 6];
78 let p3 = vec![7, 8, 9];
79 let p4 = vec![10, 11, 12];
80
81 assert_ok!(buffer.write(&p1).await);
82 assert_ok!(buffer.write(&p2).await);
83 assert_ok!(buffer.write(&p3).await);
84
85 let mut p = vec![0; 10];
86
87 let n = assert_ok!(buffer.read(&mut p, None).await);
88 assert_eq!(&p1[..], &p[..n]);
89
90 if grow {
91 let mut b = buffer.buffer.lock().await;
92 let result = b.grow();
93 assert!(result.is_ok());
94 }
95
96 let n = assert_ok!(buffer.read(&mut p, None).await);
97 assert_eq!(&p2[..], &p[..n]);
98
99 assert_ok!(buffer.write(&p4).await);
100
101 let n = assert_ok!(buffer.read(&mut p, None).await);
102 assert_eq!(&p3[..], &p[..n]);
103 let n = assert_ok!(buffer.read(&mut p, None).await);
104 assert_eq!(&p4[..], &p[..n]);
105
106 {
107 let b = buffer.buffer.lock().await;
108 if !grow {
109 assert_eq!(b.data.len(), MIN_SIZE);
110 } else {
111 assert_eq!(b.data.len(), 2 * MIN_SIZE);
112 }
113 }
114 }
115
116 #[tokio::test]
test_buffer_wraparound()117 async fn test_buffer_wraparound() {
118 test_wraparound(false).await;
119 }
120
121 #[tokio::test]
test_buffer_wraparound_grow()122 async fn test_buffer_wraparound_grow() {
123 test_wraparound(true).await;
124 }
125
126 #[tokio::test]
test_buffer_async()127 async fn test_buffer_async() {
128 let buffer = Buffer::new(0, 0);
129
130 let (done_tx, mut done_rx) = mpsc::channel::<()>(1);
131
132 let buffer2 = buffer.clone();
133 tokio::spawn(async move {
134 let mut packet: Vec<u8> = vec![0; 4];
135
136 let n = assert_ok!(buffer2.read(&mut packet, None).await);
137 assert_eq!(n, 2, "n must be 2");
138 assert_eq!(&packet[..n], &[0, 1]);
139
140 let result = buffer2.read(&mut packet, None).await;
141 assert!(result.is_err());
142 assert_eq!(result.unwrap_err(), Error::ErrBufferClosed);
143
144 drop(done_tx);
145 });
146
147 // Wait for the reader to start reading.
148 sleep(Duration::from_micros(1)).await;
149
150 // Write once
151 let n = assert_ok!(buffer.write(&[0, 1]).await);
152 assert_eq!(n, 2, "n must be 2");
153
154 // Wait for the reader to start reading again.
155 sleep(Duration::from_micros(1)).await;
156
157 // Close will unblock the reader.
158 buffer.close().await;
159
160 done_rx.recv().await;
161 }
162
163 #[tokio::test]
test_buffer_limit_count()164 async fn test_buffer_limit_count() {
165 let buffer = Buffer::new(2, 0);
166
167 assert_eq!(buffer.count().await, 0);
168
169 // Write twice
170 let n = assert_ok!(buffer.write(&[0, 1]).await);
171 assert_eq!(n, 2, "n must be 2");
172 assert_eq!(buffer.count().await, 1);
173
174 let n = assert_ok!(buffer.write(&[2, 3]).await);
175 assert_eq!(n, 2, "n must be 2");
176 assert_eq!(buffer.count().await, 2);
177
178 // Over capacity
179 let result = buffer.write(&[4, 5]).await;
180 assert!(result.is_err());
181 if let Err(err) = result {
182 assert_eq!(err, Error::ErrBufferFull);
183 }
184 assert_eq!(buffer.count().await, 2);
185
186 // Read once
187 let mut packet: Vec<u8> = vec![0; 4];
188 let n = assert_ok!(buffer.read(&mut packet, None).await);
189 assert_eq!(n, 2, "n must be 2");
190 assert_eq!(&packet[..n], &[0, 1]);
191 assert_eq!(buffer.count().await, 1);
192
193 // Write once
194 let n = assert_ok!(buffer.write(&[6, 7]).await);
195 assert_eq!(n, 2, "n must be 2");
196 assert_eq!(buffer.count().await, 2);
197
198 // Over capacity
199 let result = buffer.write(&[8, 9]).await;
200 assert!(result.is_err());
201 if let Err(err) = result {
202 assert_eq!(Error::ErrBufferFull, err);
203 }
204 assert_eq!(buffer.count().await, 2);
205
206 // Read twice
207 let n = assert_ok!(buffer.read(&mut packet, None).await);
208 assert_eq!(n, 2, "n must be 2");
209 assert_eq!(&packet[..n], &[2, 3]);
210 assert_eq!(buffer.count().await, 1);
211
212 let n = assert_ok!(buffer.read(&mut packet, None).await);
213 assert_eq!(n, 2, "n must be 2");
214 assert_eq!(&packet[..n], &[6, 7]);
215 assert_eq!(buffer.count().await, 0);
216
217 // Nothing left.
218 buffer.close().await;
219 }
220
221 #[tokio::test]
test_buffer_limit_size()222 async fn test_buffer_limit_size() {
223 let buffer = Buffer::new(0, 11);
224
225 assert_eq!(buffer.size().await, 0);
226
227 // Write twice
228 let n = assert_ok!(buffer.write(&[0, 1]).await);
229 assert_eq!(n, 2, "n must be 2");
230 assert_eq!(buffer.size().await, 4);
231
232 let n = assert_ok!(buffer.write(&[2, 3]).await);
233 assert_eq!(n, 2, "n must be 2");
234 assert_eq!(buffer.size().await, 8);
235
236 // Over capacity
237 let result = buffer.write(&[4, 5]).await;
238 assert!(result.is_err());
239 if let Err(err) = result {
240 assert_eq!(Error::ErrBufferFull, err);
241 }
242 assert_eq!(buffer.size().await, 8);
243
244 // Cheeky write at exact size.
245 let n = assert_ok!(buffer.write(&[6]).await);
246 assert_eq!(n, 1, "n must be 1");
247 assert_eq!(buffer.size().await, 11);
248
249 // Read once
250 let mut packet: Vec<u8> = vec![0; 4];
251 let n = assert_ok!(buffer.read(&mut packet, None).await);
252 assert_eq!(n, 2, "n must be 2");
253 assert_eq!(&packet[..n], &[0, 1]);
254 assert_eq!(buffer.size().await, 7);
255
256 // Write once
257 let n = assert_ok!(buffer.write(&[7, 8]).await);
258 assert_eq!(n, 2, "n must be 2");
259 assert_eq!(buffer.size().await, 11);
260
261 // Over capacity
262 let result = buffer.write(&[9, 10]).await;
263 assert!(result.is_err());
264 if let Err(err) = result {
265 assert_eq!(Error::ErrBufferFull, err);
266 }
267 assert_eq!(buffer.size().await, 11);
268
269 // Read everything
270 let n = assert_ok!(buffer.read(&mut packet, None).await);
271 assert_eq!(n, 2, "n must be 2");
272 assert_eq!(&packet[..n], &[2, 3]);
273 assert_eq!(buffer.size().await, 7);
274
275 let n = assert_ok!(buffer.read(&mut packet, None).await);
276 assert_eq!(n, 1, "n must be 1");
277 assert_eq!(&packet[..n], &[6]);
278 assert_eq!(buffer.size().await, 4);
279
280 let n = assert_ok!(buffer.read(&mut packet, None).await);
281 assert_eq!(n, 2, "n must be 2");
282 assert_eq!(&packet[..n], &[7, 8]);
283 assert_eq!(buffer.size().await, 0);
284
285 // Nothing left.
286 buffer.close().await;
287 }
288
289 #[tokio::test]
test_buffer_limit_sizes()290 async fn test_buffer_limit_sizes() {
291 let sizes = vec![
292 128 * 1024,
293 1024 * 1024,
294 8 * 1024 * 1024,
295 0, // default
296 ];
297 const HEADER_SIZE: usize = 2;
298 const PACKET_SIZE: usize = 0x8000;
299
300 for mut size in sizes {
301 let mut name = "default".to_owned();
302 if size > 0 {
303 name = format!("{}kbytes", size / 1024);
304 }
305
306 let buffer = Buffer::new(0, 0);
307 if size == 0 {
308 size = MAX_SIZE;
309 } else {
310 buffer.set_limit_size(size + HEADER_SIZE).await;
311 }
312
313 //assert.NoError(buffer.SetReadDeadline(now.Add(5 * time.Second))) // Set deadline to avoid test deadlock
314
315 let n_packets = size / (PACKET_SIZE + HEADER_SIZE);
316 let pkt = vec![0; PACKET_SIZE];
317 for _ in 0..n_packets {
318 assert_ok!(buffer.write(&pkt).await);
319 }
320
321 // Next write is expected to be errored.
322 let result = buffer.write(&pkt).await;
323 assert!(result.is_err(), "{}", name);
324 assert_eq!(result.unwrap_err(), Error::ErrBufferFull, "{name}");
325
326 let mut packet = vec![0; size];
327 for _ in 0..n_packets {
328 let n = assert_ok!(buffer.read(&mut packet, Some(Duration::new(5, 0))).await);
329 assert_eq!(n, PACKET_SIZE, "{name}");
330 }
331 }
332 }
333
334 #[tokio::test]
test_buffer_misc()335 async fn test_buffer_misc() {
336 let buffer = Buffer::new(0, 0);
337
338 // Write once
339 let n = assert_ok!(buffer.write(&[0, 1, 2, 3]).await);
340 assert_eq!(n, 4, "n must be 4");
341
342 // Try to read with a short buffer
343 let mut packet: Vec<u8> = vec![0; 3];
344 let result = buffer.read(&mut packet, None).await;
345 assert!(result.is_err());
346 if let Err(err) = result {
347 assert_eq!(err, Error::ErrBufferShort);
348 }
349
350 // Close
351 buffer.close().await;
352
353 // check is_close
354 assert!(buffer.is_closed().await);
355
356 // Make sure you can Close twice
357 buffer.close().await;
358 }
359