1 use super::*; 2 use crate::error::Error; 3 4 use tokio::sync::mpsc; 5 use tokio::time::{sleep, Duration}; 6 use tokio_test::assert_ok; 7 8 #[tokio::test] 9 async fn test_buffer() { 10 let buffer = Buffer::new(0, 0); 11 let mut packet: Vec<u8> = vec![0; 4]; 12 13 // Write once 14 let n = assert_ok!(buffer.write(&[0, 1]).await); 15 assert_eq!(n, 2, "n must be 2"); 16 17 // Read once 18 let n = assert_ok!(buffer.read(&mut packet, None).await); 19 assert_eq!(n, 2, "n must be 2"); 20 assert_eq!(&[0, 1], &packet[..n]); 21 22 // Read deadline 23 let result = buffer.read(&mut packet, Some(Duration::new(0, 1))).await; 24 assert!(result.is_err()); 25 assert_eq!(Error::ErrTimeout, result.unwrap_err()); 26 27 // Write twice 28 let n = assert_ok!(buffer.write(&[2, 3, 4]).await); 29 assert_eq!(n, 3, "n must be 3"); 30 31 let n = assert_ok!(buffer.write(&[5, 6, 7]).await); 32 assert_eq!(n, 3, "n must be 3"); 33 34 // Read twice 35 let n = assert_ok!(buffer.read(&mut packet, None).await); 36 assert_eq!(n, 3, "n must be 3"); 37 assert_eq!(&[2, 3, 4], &packet[..n]); 38 39 let n = assert_ok!(buffer.read(&mut packet, None).await); 40 assert_eq!(n, 3, "n must be 3"); 41 assert_eq!(&[5, 6, 7], &packet[..n]); 42 43 // Write once prior to close. 44 let n = assert_ok!(buffer.write(&[3]).await); 45 assert_eq!(n, 1, "n must be 1"); 46 47 // Close 48 buffer.close().await; 49 50 // Future writes will error 51 let result = buffer.write(&[4]).await; 52 assert!(result.is_err()); 53 54 // But we can read the remaining data. 55 let n = assert_ok!(buffer.read(&mut packet, None).await); 56 assert_eq!(n, 1, "n must be 1"); 57 assert_eq!(&[3], &packet[..n]); 58 59 // Until EOF 60 let result = buffer.read(&mut packet, None).await; 61 assert!(result.is_err()); 62 assert_eq!(Error::ErrBufferClosed, result.unwrap_err()); 63 } 64 65 async fn test_wraparound(grow: bool) { 66 let buffer = Buffer::new(0, 0); 67 { 68 let mut b = buffer.buffer.lock().await; 69 let result = b.grow(); 70 assert!(result.is_ok()); 71 72 b.head = b.data.len() - 13; 73 b.tail = b.head; 74 } 75 76 let p1 = vec![1, 2, 3]; 77 let p2 = vec![4, 5, 6]; 78 let p3 = vec![7, 8, 9]; 79 let p4 = vec![10, 11, 12]; 80 81 assert_ok!(buffer.write(&p1).await); 82 assert_ok!(buffer.write(&p2).await); 83 assert_ok!(buffer.write(&p3).await); 84 85 let mut p = vec![0; 10]; 86 87 let n = assert_ok!(buffer.read(&mut p, None).await); 88 assert_eq!(&p1[..], &p[..n]); 89 90 if grow { 91 let mut b = buffer.buffer.lock().await; 92 let result = b.grow(); 93 assert!(result.is_ok()); 94 } 95 96 let n = assert_ok!(buffer.read(&mut p, None).await); 97 assert_eq!(&p2[..], &p[..n]); 98 99 assert_ok!(buffer.write(&p4).await); 100 101 let n = assert_ok!(buffer.read(&mut p, None).await); 102 assert_eq!(&p3[..], &p[..n]); 103 let n = assert_ok!(buffer.read(&mut p, None).await); 104 assert_eq!(&p4[..], &p[..n]); 105 106 { 107 let b = buffer.buffer.lock().await; 108 if !grow { 109 assert_eq!(b.data.len(), MIN_SIZE); 110 } else { 111 assert_eq!(b.data.len(), 2 * MIN_SIZE); 112 } 113 } 114 } 115 116 #[tokio::test] 117 async fn test_buffer_wraparound() { 118 test_wraparound(false).await; 119 } 120 121 #[tokio::test] 122 async fn test_buffer_wraparound_grow() { 123 test_wraparound(true).await; 124 } 125 126 #[tokio::test] 127 async fn test_buffer_async() { 128 let buffer = Buffer::new(0, 0); 129 130 let (done_tx, mut done_rx) = mpsc::channel::<()>(1); 131 132 let buffer2 = buffer.clone(); 133 tokio::spawn(async move { 134 let mut packet: Vec<u8> = vec![0; 4]; 135 136 let n = assert_ok!(buffer2.read(&mut packet, None).await); 137 assert_eq!(n, 2, "n must be 2"); 138 assert_eq!(&[0, 1], &packet[..n]); 139 140 let result = buffer2.read(&mut packet, None).await; 141 assert!(result.is_err()); 142 assert_eq!(Error::ErrBufferClosed, result.unwrap_err()); 143 144 drop(done_tx); 145 }); 146 147 // Wait for the reader to start reading. 148 sleep(Duration::from_micros(1)).await; 149 150 // Write once 151 let n = assert_ok!(buffer.write(&[0, 1]).await); 152 assert_eq!(n, 2, "n must be 2"); 153 154 // Wait for the reader to start reading again. 155 sleep(Duration::from_micros(1)).await; 156 157 // Close will unblock the reader. 158 buffer.close().await; 159 160 done_rx.recv().await; 161 } 162 163 #[tokio::test] 164 async fn test_buffer_limit_count() { 165 let buffer = Buffer::new(2, 0); 166 167 assert_eq!(0, buffer.count().await); 168 169 // Write twice 170 let n = assert_ok!(buffer.write(&[0, 1]).await); 171 assert_eq!(n, 2, "n must be 2"); 172 assert_eq!(1, buffer.count().await); 173 174 let n = assert_ok!(buffer.write(&[2, 3]).await); 175 assert_eq!(n, 2, "n must be 2"); 176 assert_eq!(2, buffer.count().await); 177 178 // Over capacity 179 let result = buffer.write(&[4, 5]).await; 180 assert!(result.is_err()); 181 if let Err(err) = result { 182 assert_eq!(Error::ErrBufferFull, err); 183 } 184 assert_eq!(2, buffer.count().await); 185 186 // Read once 187 let mut packet: Vec<u8> = vec![0; 4]; 188 let n = assert_ok!(buffer.read(&mut packet, None).await); 189 assert_eq!(n, 2, "n must be 2"); 190 assert_eq!(&[0, 1], &packet[..n]); 191 assert_eq!(1, buffer.count().await); 192 193 // Write once 194 let n = assert_ok!(buffer.write(&[6, 7]).await); 195 assert_eq!(n, 2, "n must be 2"); 196 assert_eq!(2, buffer.count().await); 197 198 // Over capacity 199 let result = buffer.write(&[8, 9]).await; 200 assert!(result.is_err()); 201 if let Err(err) = result { 202 assert_eq!(Error::ErrBufferFull, err); 203 } 204 assert_eq!(2, buffer.count().await); 205 206 // Read twice 207 let n = assert_ok!(buffer.read(&mut packet, None).await); 208 assert_eq!(n, 2, "n must be 2"); 209 assert_eq!(&[2, 3], &packet[..n]); 210 assert_eq!(1, buffer.count().await); 211 212 let n = assert_ok!(buffer.read(&mut packet, None).await); 213 assert_eq!(n, 2, "n must be 2"); 214 assert_eq!(&[6, 7], &packet[..n]); 215 assert_eq!(0, buffer.count().await); 216 217 // Nothing left. 218 buffer.close().await; 219 } 220 221 #[tokio::test] 222 async fn test_buffer_limit_size() { 223 let buffer = Buffer::new(0, 11); 224 225 assert_eq!(0, buffer.size().await); 226 227 // Write twice 228 let n = assert_ok!(buffer.write(&[0, 1]).await); 229 assert_eq!(n, 2, "n must be 2"); 230 assert_eq!(4, buffer.size().await); 231 232 let n = assert_ok!(buffer.write(&[2, 3]).await); 233 assert_eq!(n, 2, "n must be 2"); 234 assert_eq!(8, buffer.size().await); 235 236 // Over capacity 237 let result = buffer.write(&[4, 5]).await; 238 assert!(result.is_err()); 239 if let Err(err) = result { 240 assert_eq!(Error::ErrBufferFull, err); 241 } 242 assert_eq!(8, buffer.size().await); 243 244 // Cheeky write at exact size. 245 let n = assert_ok!(buffer.write(&[6]).await); 246 assert_eq!(n, 1, "n must be 1"); 247 assert_eq!(11, buffer.size().await); 248 249 // Read once 250 let mut packet: Vec<u8> = vec![0; 4]; 251 let n = assert_ok!(buffer.read(&mut packet, None).await); 252 assert_eq!(n, 2, "n must be 2"); 253 assert_eq!(&[0, 1], &packet[..n]); 254 assert_eq!(7, buffer.size().await); 255 256 // Write once 257 let n = assert_ok!(buffer.write(&[7, 8]).await); 258 assert_eq!(n, 2, "n must be 2"); 259 assert_eq!(11, buffer.size().await); 260 261 // Over capacity 262 let result = buffer.write(&[9, 10]).await; 263 assert!(result.is_err()); 264 if let Err(err) = result { 265 assert_eq!(Error::ErrBufferFull, err); 266 } 267 assert_eq!(11, buffer.size().await); 268 269 // Read everything 270 let n = assert_ok!(buffer.read(&mut packet, None).await); 271 assert_eq!(n, 2, "n must be 2"); 272 assert_eq!(&[2, 3], &packet[..n]); 273 assert_eq!(7, buffer.size().await); 274 275 let n = assert_ok!(buffer.read(&mut packet, None).await); 276 assert_eq!(n, 1, "n must be 1"); 277 assert_eq!(&[6], &packet[..n]); 278 assert_eq!(4, buffer.size().await); 279 280 let n = assert_ok!(buffer.read(&mut packet, None).await); 281 assert_eq!(n, 2, "n must be 2"); 282 assert_eq!(&[7, 8], &packet[..n]); 283 assert_eq!(0, buffer.size().await); 284 285 // Nothing left. 286 buffer.close().await; 287 } 288 289 #[tokio::test] 290 async fn test_buffer_limit_sizes() { 291 let sizes = vec![ 292 128 * 1024, 293 1024 * 1024, 294 8 * 1024 * 1024, 295 0, // default 296 ]; 297 const HEADER_SIZE: usize = 2; 298 const PACKET_SIZE: usize = 0x8000; 299 300 for mut size in sizes { 301 let mut name = "default".to_owned(); 302 if size > 0 { 303 name = format!("{}kbytes", size / 1024); 304 } 305 306 let buffer = Buffer::new(0, 0); 307 if size == 0 { 308 size = MAX_SIZE; 309 } else { 310 buffer.set_limit_size(size + HEADER_SIZE).await; 311 } 312 313 //assert.NoError(buffer.SetReadDeadline(now.Add(5 * time.Second))) // Set deadline to avoid test deadlock 314 315 let n_packets = size / (PACKET_SIZE + HEADER_SIZE); 316 let pkt = vec![0; PACKET_SIZE]; 317 for _ in 0..n_packets { 318 assert_ok!(buffer.write(&pkt).await); 319 } 320 321 // Next write is expected to be errored. 322 let result = buffer.write(&pkt).await; 323 assert!(result.is_err(), "{}", name); 324 assert_eq!(Error::ErrBufferFull, result.unwrap_err(), "{}", name); 325 326 let mut packet = vec![0; size]; 327 for _ in 0..n_packets { 328 let n = assert_ok!(buffer.read(&mut packet, Some(Duration::new(5, 0))).await); 329 assert_eq!(PACKET_SIZE, n, "{}", name); 330 } 331 } 332 } 333 334 #[tokio::test] 335 async fn test_buffer_misc() { 336 let buffer = Buffer::new(0, 0); 337 338 // Write once 339 let n = assert_ok!(buffer.write(&[0, 1, 2, 3]).await); 340 assert_eq!(n, 4, "n must be 4"); 341 342 // Try to read with a short buffer 343 let mut packet: Vec<u8> = vec![0; 3]; 344 let result = buffer.read(&mut packet, None).await; 345 assert!(result.is_err()); 346 if let Err(err) = result { 347 assert_eq!(Error::ErrBufferShort, err); 348 } 349 350 // Close 351 buffer.close().await; 352 353 // check is_close 354 assert!(buffer.is_closed().await); 355 356 // Make sure you can Close twice 357 buffer.close().await; 358 } 359