1 use super::*; 2 3 use tokio::prelude::*; 4 use tokio_test::assert_ok; 5 use tokio::timer::delay; 6 7 use std::time::Duration; 8 9 #[tokio::test] 10 async fn test_buffer() { 11 let mut buffer = Buffer::new(0, 0); 12 let mut packet: Vec<u8> = vec![0; 4]; 13 14 // Write once 15 let n = assert_ok!(buffer.write(&[0, 1]).await); 16 assert_eq!(n, 2, "n must be 2"); 17 18 // Read once 19 let n = assert_ok!(buffer.read(&mut packet).await); 20 assert_eq!(n, 2, "n must be 2"); 21 assert_eq!(&[0, 1], &packet[..n]); 22 23 // Write twice 24 let n = assert_ok!(buffer.write(&[2, 3, 4]).await); 25 assert_eq!(n, 3, "n must be 3"); 26 27 let n = assert_ok!(buffer.write(&[5, 6, 7]).await); 28 assert_eq!(n, 3, "n must be 3"); 29 30 // Read twice 31 let n = assert_ok!(buffer.read(&mut packet).await); 32 assert_eq!(n, 3, "n must be 3"); 33 assert_eq!(&[2, 3, 4], &packet[..n]); 34 35 let n = assert_ok!(buffer.read(&mut packet).await); 36 assert_eq!(n, 3, "n must be 3"); 37 assert_eq!(&[5, 6, 7], &packet[..n]); 38 39 // Write once prior to close. 40 let n = assert_ok!(buffer.write(&[3]).await); 41 assert_eq!(n, 1, "n must be 1"); 42 43 // Close 44 buffer.close().await; 45 46 // Future writes will error 47 let result = buffer.write(&[4]).await; 48 assert!(result.is_err()); 49 50 // But we can read the remaining data. 51 let n = assert_ok!(buffer.read(&mut packet).await); 52 assert_eq!(n, 1, "n must be 1"); 53 assert_eq!(&[3], &packet[..n]); 54 55 // Until EOF 56 let result = buffer.read(&mut packet).await; 57 assert!(result.is_err()); 58 if let Err(err) = result { 59 assert_eq!(err, ERR_BUFFER_CLOSED.clone()); 60 } 61 } 62 63 #[tokio::test] 64 async fn test_buffer_async() { 65 let mut buffer = Buffer::new(0, 0); 66 67 let (done_tx, mut done_rx) = mpsc::channel::<()>(1); 68 69 let mut buffer2 = buffer.clone(); 70 tokio::spawn(async move { 71 let mut packet:Vec<u8> = vec![0; 4]; 72 73 let n = assert_ok!(buffer2.read(&mut packet).await); 74 assert_eq!(n, 2, "n must be 2"); 75 assert_eq!(&[0, 1], &packet[..n]); 76 77 let result = buffer2.read(&mut packet).await; 78 assert!(result.is_err()); 79 if let Err(err) = result { 80 assert_eq!(err, ERR_BUFFER_CLOSED.clone()); 81 } 82 83 drop(done_tx); 84 }); 85 86 // Wait for the reader to start reading. 87 let when = tokio::clock::now() + Duration::from_millis(1); 88 delay(when).await; 89 90 // Write once 91 let n = assert_ok!(buffer.write(&[0, 1]).await); 92 assert_eq!(n, 2, "n must be 2"); 93 94 // Wait for the reader to start reading again. 95 let when = tokio::clock::now() + Duration::from_millis(1); 96 delay(when).await; 97 98 // Close will unblock the reader. 99 buffer.close().await; 100 101 done_rx.recv().await; 102 }