xref: /xiu/library/bytesio/src/bytesio.rs (revision 46f4d48b)
1*46f4d48bSningnao use std::net::SocketAddr;
2*46f4d48bSningnao use std::time::Duration;
388325f54SHarlanC 
4*46f4d48bSningnao use async_trait::async_trait;
58e71d710SHarlan use bytes::BufMut;
688325f54SHarlanC use bytes::Bytes;
788325f54SHarlanC use bytes::BytesMut;
888325f54SHarlanC use futures::SinkExt;
9*46f4d48bSningnao use futures::StreamExt;
10*46f4d48bSningnao use tokio::net::TcpStream;
11*46f4d48bSningnao use tokio::net::UdpSocket;
1288325f54SHarlanC use tokio_util::codec::BytesCodec;
1388325f54SHarlanC use tokio_util::codec::Framed;
1488325f54SHarlanC 
15*46f4d48bSningnao use super::bytesio_errors::{BytesIOError, BytesIOErrorValue};
168e71d710SHarlan 
/// Transport protocol used by a [`TNetIO`] implementation.
///
/// The derives make the tag cheap to pass around by value, comparable with
/// `==`/`!=`, and printable in log and assertion messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NetType {
    /// Stream transport backed by a TCP connection.
    TCP,
    /// Datagram transport backed by a UDP socket.
    UDP,
}
2113bac29aSHarlan 
228e71d710SHarlan #[async_trait]
238e71d710SHarlan pub trait TNetIO: Send + Sync {
write(&mut self, bytes: Bytes) -> Result<(), BytesIOError>248e71d710SHarlan     async fn write(&mut self, bytes: Bytes) -> Result<(), BytesIOError>;
read(&mut self) -> Result<BytesMut, BytesIOError>258e71d710SHarlan     async fn read(&mut self) -> Result<BytesMut, BytesIOError>;
read_timeout(&mut self, duration: Duration) -> Result<BytesMut, BytesIOError>268e71d710SHarlan     async fn read_timeout(&mut self, duration: Duration) -> Result<BytesMut, BytesIOError>;
get_net_type(&self) -> NetType2713bac29aSHarlan     fn get_net_type(&self) -> NetType;
2888325f54SHarlanC }
2988325f54SHarlanC 
308e71d710SHarlan pub struct UdpIO {
318e71d710SHarlan     socket: UdpSocket,
328e71d710SHarlan }
338e71d710SHarlan 
348e71d710SHarlan impl UdpIO {
new(remote_domain: String, remote_port: u16, local_port: u16) -> Option<Self>3513bac29aSHarlan     pub async fn new(remote_domain: String, remote_port: u16, local_port: u16) -> Option<Self> {
36b36cf5daSHarlan         let remote_address = format!("{remote_domain}:{remote_port}");
3713bac29aSHarlan         log::info!("remote address: {}", remote_address);
38b36cf5daSHarlan         let local_address = format!("0.0.0.0:{local_port}");
3913bac29aSHarlan         if let Ok(local_socket) = UdpSocket::bind(local_address).await {
408e71d710SHarlan             if let Ok(remote_socket_addr) = remote_address.parse::<SocketAddr>() {
418e71d710SHarlan                 if let Err(err) = local_socket.connect(remote_socket_addr).await {
428e71d710SHarlan                     log::info!("connect to remote udp socket error: {}", err);
438e71d710SHarlan                 }
448e71d710SHarlan             }
458e71d710SHarlan             return Some(Self {
468e71d710SHarlan                 socket: local_socket,
478e71d710SHarlan             });
488e71d710SHarlan         }
498e71d710SHarlan 
508e71d710SHarlan         None
518e71d710SHarlan     }
get_local_port(&self) -> Option<u16>528e71d710SHarlan     pub fn get_local_port(&self) -> Option<u16> {
538e71d710SHarlan         if let Ok(local_addr) = self.socket.local_addr() {
5413bac29aSHarlan             log::info!("local address: {}", local_addr);
558e71d710SHarlan             return Some(local_addr.port());
568e71d710SHarlan         }
578e71d710SHarlan 
588e71d710SHarlan         None
5988325f54SHarlanC     }
6088325f54SHarlanC }
6188325f54SHarlanC 
628e71d710SHarlan #[async_trait]
638e71d710SHarlan impl TNetIO for UdpIO {
get_net_type(&self) -> NetType6413bac29aSHarlan     fn get_net_type(&self) -> NetType {
6513bac29aSHarlan         NetType::UDP
6613bac29aSHarlan     }
6713bac29aSHarlan 
write(&mut self, bytes: Bytes) -> Result<(), BytesIOError>688e71d710SHarlan     async fn write(&mut self, bytes: Bytes) -> Result<(), BytesIOError> {
698e71d710SHarlan         self.socket.send(bytes.as_ref()).await?;
7088325f54SHarlanC         Ok(())
7188325f54SHarlanC     }
7288325f54SHarlanC 
read_timeout(&mut self, duration: Duration) -> Result<BytesMut, BytesIOError>738e71d710SHarlan     async fn read_timeout(&mut self, duration: Duration) -> Result<BytesMut, BytesIOError> {
74*46f4d48bSningnao         match tokio::time::timeout(duration, self.read()).await {
75*46f4d48bSningnao             Ok(data) => data,
76*46f4d48bSningnao             Err(err) => Err(BytesIOError {
77*46f4d48bSningnao                 value: BytesIOErrorValue::TimeoutError(err),
78*46f4d48bSningnao             })
7988325f54SHarlanC         }
8088325f54SHarlanC     }
8188325f54SHarlanC 
read(&mut self) -> Result<BytesMut, BytesIOError>828e71d710SHarlan     async fn read(&mut self) -> Result<BytesMut, BytesIOError> {
838e71d710SHarlan         let mut buf = vec![0; 4096];
848e71d710SHarlan         let len = self.socket.recv(&mut buf).await?;
858e71d710SHarlan         let mut rv = BytesMut::new();
868e71d710SHarlan         rv.put(&buf[..len]);
878e71d710SHarlan 
888e71d710SHarlan         Ok(rv)
898e71d710SHarlan     }
908e71d710SHarlan }
918e71d710SHarlan 
928e71d710SHarlan pub struct TcpIO {
938e71d710SHarlan     stream: Framed<TcpStream, BytesCodec>,
948e71d710SHarlan     //timeout: Duration,
958e71d710SHarlan }
968e71d710SHarlan 
978e71d710SHarlan impl TcpIO {
new(stream: TcpStream) -> Self988e71d710SHarlan     pub fn new(stream: TcpStream) -> Self {
998e71d710SHarlan         Self {
1008e71d710SHarlan             stream: Framed::new(stream, BytesCodec::new()),
1018e71d710SHarlan             // timeout: ms,
1028e71d710SHarlan         }
1038e71d710SHarlan     }
1048e71d710SHarlan }
1058e71d710SHarlan 
1068e71d710SHarlan #[async_trait]
1078e71d710SHarlan impl TNetIO for TcpIO {
get_net_type(&self) -> NetType10813bac29aSHarlan     fn get_net_type(&self) -> NetType {
10913bac29aSHarlan         NetType::TCP
11013bac29aSHarlan     }
11113bac29aSHarlan 
write(&mut self, bytes: Bytes) -> Result<(), BytesIOError>1128e71d710SHarlan     async fn write(&mut self, bytes: Bytes) -> Result<(), BytesIOError> {
1138e71d710SHarlan         self.stream.send(bytes).await?;
1148e71d710SHarlan 
1158e71d710SHarlan         Ok(())
1168e71d710SHarlan     }
1178e71d710SHarlan 
read_timeout(&mut self, duration: Duration) -> Result<BytesMut, BytesIOError>1188e71d710SHarlan     async fn read_timeout(&mut self, duration: Duration) -> Result<BytesMut, BytesIOError> {
1192e1aad50Sningnao         match tokio::time::timeout(duration, self.read()).await {
120*46f4d48bSningnao             Ok(data) => data,
121*46f4d48bSningnao             Err(err) => Err(BytesIOError {
122*46f4d48bSningnao                 value: BytesIOErrorValue::TimeoutError(err),
1232e1aad50Sningnao             })
1248e71d710SHarlan         }
1258e71d710SHarlan     }
1268e71d710SHarlan 
read(&mut self) -> Result<BytesMut, BytesIOError>1278e71d710SHarlan     async fn read(&mut self) -> Result<BytesMut, BytesIOError> {
12888325f54SHarlanC         let message = self.stream.next().await;
12988325f54SHarlanC 
13088325f54SHarlanC         match message {
13188325f54SHarlanC             Some(data) => match data {
1328e71d710SHarlan                 Ok(bytes) => Ok(bytes),
1338e71d710SHarlan                 Err(err) => Err(BytesIOError {
13488325f54SHarlanC                     value: BytesIOErrorValue::IOError(err),
1358e71d710SHarlan                 }),
13688325f54SHarlanC             },
1378e71d710SHarlan             None => Err(BytesIOError {
13888325f54SHarlanC                 value: BytesIOErrorValue::NoneReturn,
1398e71d710SHarlan             }),
14088325f54SHarlanC         }
14188325f54SHarlanC     }
14288325f54SHarlanC }
143