1 use super::*; 2 3 use tokio::time::{sleep, Duration}; 4 use tokio_test::assert_ok; 5 6 #[tokio::test] 7 async fn test_buffer() { 8 let mut buffer = Buffer::new(0, 0); 9 let mut packet: Vec<u8> = vec![0; 4]; 10 11 // Write once 12 let n = assert_ok!(buffer.write(&[0, 1]).await); 13 assert_eq!(n, 2, "n must be 2"); 14 15 // Read once 16 let n = assert_ok!(buffer.read(&mut packet, None).await); 17 assert_eq!(n, 2, "n must be 2"); 18 assert_eq!(&[0, 1], &packet[..n]); 19 20 // Read deadline 21 let result = buffer.read(&mut packet, Some(Duration::new(0, 1))).await; 22 assert!(result.is_err()); 23 assert_eq!(result.unwrap_err(), ERR_TIMEOUT.clone()); 24 25 // Write twice 26 let n = assert_ok!(buffer.write(&[2, 3, 4]).await); 27 assert_eq!(n, 3, "n must be 3"); 28 29 let n = assert_ok!(buffer.write(&[5, 6, 7]).await); 30 assert_eq!(n, 3, "n must be 3"); 31 32 // Read twice 33 let n = assert_ok!(buffer.read(&mut packet, None).await); 34 assert_eq!(n, 3, "n must be 3"); 35 assert_eq!(&[2, 3, 4], &packet[..n]); 36 37 let n = assert_ok!(buffer.read(&mut packet, None).await); 38 assert_eq!(n, 3, "n must be 3"); 39 assert_eq!(&[5, 6, 7], &packet[..n]); 40 41 // Write once prior to close. 42 let n = assert_ok!(buffer.write(&[3]).await); 43 assert_eq!(n, 1, "n must be 1"); 44 45 // Close 46 buffer.close().await; 47 48 // Future writes will error 49 let result = buffer.write(&[4]).await; 50 assert!(result.is_err()); 51 52 // But we can read the remaining data. 53 let n = assert_ok!(buffer.read(&mut packet, None).await); 54 assert_eq!(n, 1, "n must be 1"); 55 assert_eq!(&[3], &packet[..n]); 56 57 // Until EOF 58 let result = buffer.read(&mut packet, None).await; 59 assert!(result.is_err()); 60 assert_eq!(result.unwrap_err(), ERR_BUFFER_CLOSED.clone()); 61 } 62 63 async fn test_wraparound(grow: bool) { 64 let mut buffer = Buffer::new(0, 0); 65 { 66 let mut b = buffer.buffer.lock().await; 67 let result = b.grow(); 68 assert!(result.is_ok()); 69 70 b.head = b.data.len() - 13; 71 b.tail = b.head; 72 } 73 74 let p1 = vec![1, 2, 3]; 75 let p2 = vec![4, 5, 6]; 76 let p3 = vec![7, 8, 9]; 77 let p4 = vec![10, 11, 12]; 78 79 assert_ok!(buffer.write(&p1).await); 80 assert_ok!(buffer.write(&p2).await); 81 assert_ok!(buffer.write(&p3).await); 82 83 let mut p = vec![0; 10]; 84 85 let n = assert_ok!(buffer.read(&mut p, None).await); 86 assert_eq!(&p1[..], &p[..n]); 87 88 if grow { 89 let mut b = buffer.buffer.lock().await; 90 let result = b.grow(); 91 assert!(result.is_ok()); 92 } 93 94 let n = assert_ok!(buffer.read(&mut p, None).await); 95 assert_eq!(&p2[..], &p[..n]); 96 97 assert_ok!(buffer.write(&p4).await); 98 99 let n = assert_ok!(buffer.read(&mut p, None).await); 100 assert_eq!(&p3[..], &p[..n]); 101 let n = assert_ok!(buffer.read(&mut p, None).await); 102 assert_eq!(&p4[..], &p[..n]); 103 104 { 105 let b = buffer.buffer.lock().await; 106 if !grow { 107 assert_eq!(b.data.len(), MIN_SIZE); 108 } else { 109 assert_eq!(b.data.len(), 2 * MIN_SIZE); 110 } 111 } 112 } 113 114 #[tokio::test] 115 async fn test_buffer_wraparound() { 116 test_wraparound(false).await; 117 } 118 119 #[tokio::test] 120 async fn test_buffer_wraparound_grow() { 121 test_wraparound(true).await; 122 } 123 124 #[tokio::test] 125 async fn test_buffer_async() { 126 let mut buffer = Buffer::new(0, 0); 127 128 let (done_tx, mut done_rx) = mpsc::channel::<()>(1); 129 130 let mut buffer2 = buffer.clone(); 131 tokio::spawn(async move { 132 let mut packet: Vec<u8> = vec![0; 4]; 133 134 let n = assert_ok!(buffer2.read(&mut packet, None).await); 135 assert_eq!(n, 2, "n must be 2"); 136 assert_eq!(&[0, 1], &packet[..n]); 137 138 let result = buffer2.read(&mut packet, None).await; 139 assert!(result.is_err()); 140 assert_eq!(result.unwrap_err(), ERR_BUFFER_CLOSED.clone()); 141 142 drop(done_tx); 143 }); 144 145 // Wait for the reader to start reading. 146 sleep(Duration::from_micros(1)).await; 147 148 // Write once 149 let n = assert_ok!(buffer.write(&[0, 1]).await); 150 assert_eq!(n, 2, "n must be 2"); 151 152 // Wait for the reader to start reading again. 153 sleep(Duration::from_micros(1)).await; 154 155 // Close will unblock the reader. 156 buffer.close().await; 157 158 done_rx.recv().await; 159 } 160 161 #[tokio::test] 162 async fn test_buffer_limit_count() { 163 let mut buffer = Buffer::new(2, 0); 164 165 assert_eq!(0, buffer.count().await); 166 167 // Write twice 168 let n = assert_ok!(buffer.write(&[0, 1]).await); 169 assert_eq!(n, 2, "n must be 2"); 170 assert_eq!(1, buffer.count().await); 171 172 let n = assert_ok!(buffer.write(&[2, 3]).await); 173 assert_eq!(n, 2, "n must be 2"); 174 assert_eq!(2, buffer.count().await); 175 176 // Over capacity 177 let result = buffer.write(&[4, 5]).await; 178 assert!(result.is_err()); 179 if let Err(err) = result { 180 assert_eq!(err, ERR_BUFFER_FULL.clone()); 181 } 182 assert_eq!(2, buffer.count().await); 183 184 // Read once 185 let mut packet: Vec<u8> = vec![0; 4]; 186 let n = assert_ok!(buffer.read(&mut packet, None).await); 187 assert_eq!(n, 2, "n must be 2"); 188 assert_eq!(&[0, 1], &packet[..n]); 189 assert_eq!(1, buffer.count().await); 190 191 // Write once 192 let n = assert_ok!(buffer.write(&[6, 7]).await); 193 assert_eq!(n, 2, "n must be 2"); 194 assert_eq!(2, buffer.count().await); 195 196 // Over capacity 197 let result = buffer.write(&[8, 9]).await; 198 assert!(result.is_err()); 199 if let Err(err) = result { 200 assert_eq!(err, ERR_BUFFER_FULL.clone()); 201 } 202 assert_eq!(2, buffer.count().await); 203 204 // Read twice 205 let n = assert_ok!(buffer.read(&mut packet, None).await); 206 assert_eq!(n, 2, "n must be 2"); 207 assert_eq!(&[2, 3], &packet[..n]); 208 assert_eq!(1, buffer.count().await); 209 210 let n = assert_ok!(buffer.read(&mut packet, None).await); 211 assert_eq!(n, 2, "n must be 2"); 212 assert_eq!(&[6, 7], &packet[..n]); 213 assert_eq!(0, buffer.count().await); 214 215 // Nothing left. 216 buffer.close().await; 217 } 218 219 #[tokio::test] 220 async fn test_buffer_limit_size() { 221 let mut buffer = Buffer::new(0, 11); 222 223 assert_eq!(0, buffer.size().await); 224 225 // Write twice 226 let n = assert_ok!(buffer.write(&[0, 1]).await); 227 assert_eq!(n, 2, "n must be 2"); 228 assert_eq!(4, buffer.size().await); 229 230 let n = assert_ok!(buffer.write(&[2, 3]).await); 231 assert_eq!(n, 2, "n must be 2"); 232 assert_eq!(8, buffer.size().await); 233 234 // Over capacity 235 let result = buffer.write(&[4, 5]).await; 236 assert!(result.is_err()); 237 if let Err(err) = result { 238 assert_eq!(err, ERR_BUFFER_FULL.clone()); 239 } 240 assert_eq!(8, buffer.size().await); 241 242 // Cheeky write at exact size. 243 let n = assert_ok!(buffer.write(&[6]).await); 244 assert_eq!(n, 1, "n must be 1"); 245 assert_eq!(11, buffer.size().await); 246 247 // Read once 248 let mut packet: Vec<u8> = vec![0; 4]; 249 let n = assert_ok!(buffer.read(&mut packet, None).await); 250 assert_eq!(n, 2, "n must be 2"); 251 assert_eq!(&[0, 1], &packet[..n]); 252 assert_eq!(7, buffer.size().await); 253 254 // Write once 255 let n = assert_ok!(buffer.write(&[7, 8]).await); 256 assert_eq!(n, 2, "n must be 2"); 257 assert_eq!(11, buffer.size().await); 258 259 // Over capacity 260 let result = buffer.write(&[9, 10]).await; 261 assert!(result.is_err()); 262 if let Err(err) = result { 263 assert_eq!(err, ERR_BUFFER_FULL.clone()); 264 } 265 assert_eq!(11, buffer.size().await); 266 267 // Read everything 268 let n = assert_ok!(buffer.read(&mut packet, None).await); 269 assert_eq!(n, 2, "n must be 2"); 270 assert_eq!(&[2, 3], &packet[..n]); 271 assert_eq!(7, buffer.size().await); 272 273 let n = assert_ok!(buffer.read(&mut packet, None).await); 274 assert_eq!(n, 1, "n must be 1"); 275 assert_eq!(&[6], &packet[..n]); 276 assert_eq!(4, buffer.size().await); 277 278 let n = assert_ok!(buffer.read(&mut packet, None).await); 279 assert_eq!(n, 2, "n must be 2"); 280 assert_eq!(&[7, 8], &packet[..n]); 281 assert_eq!(0, buffer.size().await); 282 283 // Nothing left. 284 buffer.close().await; 285 } 286 287 #[tokio::test] 288 async fn test_buffer_limit_sizes() { 289 let sizes = vec![ 290 128 * 1024, 291 1024 * 1024, 292 8 * 1024 * 1024, 293 0, // default 294 ]; 295 const HEADER_SIZE: usize = 2; 296 const PACKET_SIZE: usize = 0x8000; 297 298 for mut size in sizes { 299 let mut name = "default".to_owned(); 300 if size > 0 { 301 name = format!("{}kbytes", size / 1024); 302 } 303 304 let mut buffer = Buffer::new(0, 0); 305 if size == 0 { 306 size = MAX_SIZE; 307 } else { 308 buffer.set_limit_size(size + HEADER_SIZE).await; 309 } 310 311 //assert.NoError(buffer.SetReadDeadline(now.Add(5 * time.Second))) // Set deadline to avoid test deadlock 312 313 let n_packets = size / (PACKET_SIZE + HEADER_SIZE); 314 let pkt = vec![0; PACKET_SIZE]; 315 for _ in 0..n_packets { 316 assert_ok!(buffer.write(&pkt).await); 317 } 318 319 // Next write is expected to be errored. 320 let result = buffer.write(&pkt).await; 321 assert!(result.is_err(), "{}", name); 322 assert_eq!(result.unwrap_err(), ERR_BUFFER_FULL.clone(), "{}", name); 323 324 let mut packet = vec![0; size]; 325 for _ in 0..n_packets { 326 let n = assert_ok!(buffer.read(&mut packet, Some(Duration::new(5, 0))).await); 327 assert_eq!(PACKET_SIZE, n, "{}", name); 328 } 329 } 330 } 331 332 #[tokio::test] 333 async fn test_buffer_misc() { 334 let mut buffer = Buffer::new(0, 0); 335 336 // Write once 337 let n = assert_ok!(buffer.write(&[0, 1, 2, 3]).await); 338 assert_eq!(n, 4, "n must be 4"); 339 340 // Try to read with a short buffer 341 let mut packet: Vec<u8> = vec![0; 3]; 342 let result = buffer.read(&mut packet, None).await; 343 assert!(result.is_err()); 344 if let Err(err) = result { 345 assert_eq!(err, ERR_BUFFER_SHORT.clone()); 346 } 347 348 // Close 349 buffer.close().await; 350 351 // check is_close 352 assert!(buffer.is_closed().await); 353 354 // Make sure you can Close twice 355 buffer.close().await; 356 } 357