xref: /webrtc/util/src/conn/conn_pipe.rs (revision 5b79f08a)
1 use super::*;
2 
3 use std::io::{Error, ErrorKind};
4 use std::str::FromStr;
5 use tokio::sync::{mpsc, Mutex};
6 
/// One endpoint of an in-memory connection: it reads byte buffers from one
/// tokio mpsc channel and writes byte buffers to another.
///
/// Endpoints are created in connected pairs by `pipe()`.
struct Pipe {
    // Receiving half of the channel this endpoint reads from; `recv` locks it
    // for the duration of each read.
    rd_rx: Mutex<mpsc::Receiver<Vec<u8>>>,
    // Sending half of the channel this endpoint writes to; `send` locks it
    // for the duration of each write.
    wr_tx: Mutex<mpsc::Sender<Vec<u8>>>,
}
11 
pipe() -> (impl Conn, impl Conn)12 pub fn pipe() -> (impl Conn, impl Conn) {
13     let (cb1_tx, cb1_rx) = mpsc::channel(16);
14     let (cb2_tx, cb2_rx) = mpsc::channel(16);
15 
16     let p1 = Pipe {
17         rd_rx: Mutex::new(cb1_rx),
18         wr_tx: Mutex::new(cb2_tx),
19     };
20 
21     let p2 = Pipe {
22         rd_rx: Mutex::new(cb2_rx),
23         wr_tx: Mutex::new(cb1_tx),
24     };
25 
26     (p1, p2)
27 }
28 
29 #[async_trait]
30 impl Conn for Pipe {
connect(&self, _addr: SocketAddr) -> Result<()>31     async fn connect(&self, _addr: SocketAddr) -> Result<()> {
32         Err(Error::new(ErrorKind::Other, "Not applicable").into())
33     }
34 
recv(&self, b: &mut [u8]) -> Result<usize>35     async fn recv(&self, b: &mut [u8]) -> Result<usize> {
36         let mut rd_rx = self.rd_rx.lock().await;
37         let v = match rd_rx.recv().await {
38             Some(v) => v,
39             None => return Err(Error::new(ErrorKind::UnexpectedEof, "Unexpected EOF").into()),
40         };
41         let l = std::cmp::min(v.len(), b.len());
42         b[..l].copy_from_slice(&v[..l]);
43         Ok(l)
44     }
45 
recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)>46     async fn recv_from(&self, buf: &mut [u8]) -> Result<(usize, SocketAddr)> {
47         let n = self.recv(buf).await?;
48         Ok((n, SocketAddr::from_str("0.0.0.0:0")?))
49     }
50 
send(&self, b: &[u8]) -> Result<usize>51     async fn send(&self, b: &[u8]) -> Result<usize> {
52         let wr_tx = self.wr_tx.lock().await;
53         match wr_tx.send(b.to_vec()).await {
54             Ok(_) => {}
55             Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string()).into()),
56         };
57         Ok(b.len())
58     }
59 
send_to(&self, _buf: &[u8], _target: SocketAddr) -> Result<usize>60     async fn send_to(&self, _buf: &[u8], _target: SocketAddr) -> Result<usize> {
61         Err(Error::new(ErrorKind::Other, "Not applicable").into())
62     }
63 
local_addr(&self) -> Result<SocketAddr>64     fn local_addr(&self) -> Result<SocketAddr> {
65         Err(Error::new(ErrorKind::AddrNotAvailable, "Addr Not Available").into())
66     }
67 
remote_addr(&self) -> Option<SocketAddr>68     fn remote_addr(&self) -> Option<SocketAddr> {
69         None
70     }
71 
close(&self) -> Result<()>72     async fn close(&self) -> Result<()> {
73         Ok(())
74     }
75 }
76