1 use super::*;
2
3 use std::io::{Error, ErrorKind};
4 use std::str::FromStr;
5 use tokio::sync::{mpsc, Mutex};
6
7 struct Pipe {
8 rd_rx: Mutex<mpsc::Receiver<Vec<u8>>>,
9 wr_tx: Mutex<mpsc::Sender<Vec<u8>>>,
10 }
11
pipe() -> (impl Conn, impl Conn)12 pub fn pipe() -> (impl Conn, impl Conn) {
13 let (cb1_tx, cb1_rx) = mpsc::channel(16);
14 let (cb2_tx, cb2_rx) = mpsc::channel(16);
15
16 let p1 = Pipe {
17 rd_rx: Mutex::new(cb1_rx),
18 wr_tx: Mutex::new(cb2_tx),
19 };
20
21 let p2 = Pipe {
22 rd_rx: Mutex::new(cb2_rx),
23 wr_tx: Mutex::new(cb1_tx),
24 };
25
26 (p1, p2)
27 }
28
29 #[async_trait]
30 impl Conn for Pipe {
connect(&self, _addr: SocketAddr) -> Result<()>31 async fn connect(&self, _addr: SocketAddr) -> Result<()> {
32 Err(Error::new(ErrorKind::Other, "Not applicable").into())
33 }
34
recv(&self, b: &mut [u8]) -> Result<usize>35 async fn recv(&self, b: &mut [u8]) -> Result<usize> {
36 let mut rd_rx = self.rd_rx.lock().await;
37 let v = match rd_rx.recv().await {
38 Some(v) => v,
39 None => return Err(Error::new(ErrorKind::UnexpectedEof, "Unexpected EOF").into()),
40 };
41 let l = std::cmp::min(v.len(), b.len());
42 b[..l].copy_from_slice(&v[..l]);
43 Ok(l)
44 }
45
recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)>46 async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> {
47 let n = self.recv(buf).await?;
48 Ok((n, SocketAddr::from_str("0.0.0.0:0")?))
49 }
50
send(&self, b: &[u8]) -> Result<usize>51 async fn send(&self, b: &[u8]) -> Result<usize> {
52 let wr_tx = self.wr_tx.lock().await;
53 match wr_tx.send(b.to_vec()).await {
54 Ok(_) => {}
55 Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string()).into()),
56 };
57 Ok(b.len())
58 }
59
send_to(&self, _buf: &[u8], _target: SocketAddr) -> Result<usize>60 async fn send_to(&self, _buf: &[u8], _target: SocketAddr) -> Result<usize> {
61 Err(Error::new(ErrorKind::Other, "Not applicable").into())
62 }
63
local_addr(&self) -> Result<SocketAddr>64 fn local_addr(&self) -> Result<SocketAddr> {
65 Err(Error::new(ErrorKind::AddrNotAvailable, "Addr Not Available").into())
66 }
67
remote_addr(&self) -> Option<SocketAddr>68 fn remote_addr(&self) -> Option<SocketAddr> {
69 None
70 }
71
close(&self) -> Result<()>72 async fn close(&self) -> Result<()> {
73 Ok(())
74 }
75 }
76