1*46f4d48bSningnao use std::net::SocketAddr; 2*46f4d48bSningnao use std::time::Duration; 388325f54SHarlanC 4*46f4d48bSningnao use async_trait::async_trait; 58e71d710SHarlan use bytes::BufMut; 688325f54SHarlanC use bytes::Bytes; 788325f54SHarlanC use bytes::BytesMut; 888325f54SHarlanC use futures::SinkExt; 9*46f4d48bSningnao use futures::StreamExt; 10*46f4d48bSningnao use tokio::net::TcpStream; 11*46f4d48bSningnao use tokio::net::UdpSocket; 1288325f54SHarlanC use tokio_util::codec::BytesCodec; 1388325f54SHarlanC use tokio_util::codec::Framed; 1488325f54SHarlanC 15*46f4d48bSningnao use super::bytesio_errors::{BytesIOError, BytesIOErrorValue}; 168e71d710SHarlan 1713bac29aSHarlan pub enum NetType { 1813bac29aSHarlan TCP, 1913bac29aSHarlan UDP, 2013bac29aSHarlan } 2113bac29aSHarlan 228e71d710SHarlan #[async_trait] 238e71d710SHarlan pub trait TNetIO: Send + Sync { write(&mut self, bytes: Bytes) -> Result<(), BytesIOError>248e71d710SHarlan async fn write(&mut self, bytes: Bytes) -> Result<(), BytesIOError>; read(&mut self) -> Result<BytesMut, BytesIOError>258e71d710SHarlan async fn read(&mut self) -> Result<BytesMut, BytesIOError>; read_timeout(&mut self, duration: Duration) -> Result<BytesMut, BytesIOError>268e71d710SHarlan async fn read_timeout(&mut self, duration: Duration) -> Result<BytesMut, BytesIOError>; get_net_type(&self) -> NetType2713bac29aSHarlan fn get_net_type(&self) -> NetType; 2888325f54SHarlanC } 2988325f54SHarlanC 308e71d710SHarlan pub struct UdpIO { 318e71d710SHarlan socket: UdpSocket, 328e71d710SHarlan } 338e71d710SHarlan 348e71d710SHarlan impl UdpIO { new(remote_domain: String, remote_port: u16, local_port: u16) -> Option<Self>3513bac29aSHarlan pub async fn new(remote_domain: String, remote_port: u16, local_port: u16) -> Option<Self> { 36b36cf5daSHarlan let remote_address = format!("{remote_domain}:{remote_port}"); 3713bac29aSHarlan log::info!("remote address: {}", remote_address); 38b36cf5daSHarlan let local_address = format!("0.0.0.0:{local_port}"); 3913bac29aSHarlan if let Ok(local_socket) = UdpSocket::bind(local_address).await { 408e71d710SHarlan if let Ok(remote_socket_addr) = remote_address.parse::<SocketAddr>() { 418e71d710SHarlan if let Err(err) = local_socket.connect(remote_socket_addr).await { 428e71d710SHarlan log::info!("connect to remote udp socket error: {}", err); 438e71d710SHarlan } 448e71d710SHarlan } 458e71d710SHarlan return Some(Self { 468e71d710SHarlan socket: local_socket, 478e71d710SHarlan }); 488e71d710SHarlan } 498e71d710SHarlan 508e71d710SHarlan None 518e71d710SHarlan } get_local_port(&self) -> Option<u16>528e71d710SHarlan pub fn get_local_port(&self) -> Option<u16> { 538e71d710SHarlan if let Ok(local_addr) = self.socket.local_addr() { 5413bac29aSHarlan log::info!("local address: {}", local_addr); 558e71d710SHarlan return Some(local_addr.port()); 568e71d710SHarlan } 578e71d710SHarlan 588e71d710SHarlan None 5988325f54SHarlanC } 6088325f54SHarlanC } 6188325f54SHarlanC 628e71d710SHarlan #[async_trait] 638e71d710SHarlan impl TNetIO for UdpIO { get_net_type(&self) -> NetType6413bac29aSHarlan fn get_net_type(&self) -> NetType { 6513bac29aSHarlan NetType::UDP 6613bac29aSHarlan } 6713bac29aSHarlan write(&mut self, bytes: Bytes) -> Result<(), BytesIOError>688e71d710SHarlan async fn write(&mut self, bytes: Bytes) -> Result<(), BytesIOError> { 698e71d710SHarlan self.socket.send(bytes.as_ref()).await?; 7088325f54SHarlanC Ok(()) 7188325f54SHarlanC } 7288325f54SHarlanC read_timeout(&mut self, duration: Duration) -> Result<BytesMut, BytesIOError>738e71d710SHarlan async fn read_timeout(&mut self, duration: Duration) -> Result<BytesMut, BytesIOError> { 74*46f4d48bSningnao match tokio::time::timeout(duration, self.read()).await { 75*46f4d48bSningnao Ok(data) => data, 76*46f4d48bSningnao Err(err) => Err(BytesIOError { 77*46f4d48bSningnao value: BytesIOErrorValue::TimeoutError(err), 78*46f4d48bSningnao }) 7988325f54SHarlanC } 8088325f54SHarlanC } 8188325f54SHarlanC read(&mut self) -> Result<BytesMut, BytesIOError>828e71d710SHarlan async fn read(&mut self) -> Result<BytesMut, BytesIOError> { 838e71d710SHarlan let mut buf = vec![0; 4096]; 848e71d710SHarlan let len = self.socket.recv(&mut buf).await?; 858e71d710SHarlan let mut rv = BytesMut::new(); 868e71d710SHarlan rv.put(&buf[..len]); 878e71d710SHarlan 888e71d710SHarlan Ok(rv) 898e71d710SHarlan } 908e71d710SHarlan } 918e71d710SHarlan 928e71d710SHarlan pub struct TcpIO { 938e71d710SHarlan stream: Framed<TcpStream, BytesCodec>, 948e71d710SHarlan //timeout: Duration, 958e71d710SHarlan } 968e71d710SHarlan 978e71d710SHarlan impl TcpIO { new(stream: TcpStream) -> Self988e71d710SHarlan pub fn new(stream: TcpStream) -> Self { 998e71d710SHarlan Self { 1008e71d710SHarlan stream: Framed::new(stream, BytesCodec::new()), 1018e71d710SHarlan // timeout: ms, 1028e71d710SHarlan } 1038e71d710SHarlan } 1048e71d710SHarlan } 1058e71d710SHarlan 1068e71d710SHarlan #[async_trait] 1078e71d710SHarlan impl TNetIO for TcpIO { get_net_type(&self) -> NetType10813bac29aSHarlan fn get_net_type(&self) -> NetType { 10913bac29aSHarlan NetType::TCP 11013bac29aSHarlan } 11113bac29aSHarlan write(&mut self, bytes: Bytes) -> Result<(), BytesIOError>1128e71d710SHarlan async fn write(&mut self, bytes: Bytes) -> Result<(), BytesIOError> { 1138e71d710SHarlan self.stream.send(bytes).await?; 1148e71d710SHarlan 1158e71d710SHarlan Ok(()) 1168e71d710SHarlan } 1178e71d710SHarlan read_timeout(&mut self, duration: Duration) -> Result<BytesMut, BytesIOError>1188e71d710SHarlan async fn read_timeout(&mut self, duration: Duration) -> Result<BytesMut, BytesIOError> { 1192e1aad50Sningnao match tokio::time::timeout(duration, self.read()).await { 120*46f4d48bSningnao Ok(data) => data, 121*46f4d48bSningnao Err(err) => Err(BytesIOError { 122*46f4d48bSningnao value: BytesIOErrorValue::TimeoutError(err), 1232e1aad50Sningnao }) 1248e71d710SHarlan } 1258e71d710SHarlan } 1268e71d710SHarlan read(&mut self) -> Result<BytesMut, BytesIOError>1278e71d710SHarlan async fn read(&mut self) -> Result<BytesMut, BytesIOError> { 12888325f54SHarlanC let message = self.stream.next().await; 12988325f54SHarlanC 13088325f54SHarlanC match message { 13188325f54SHarlanC Some(data) => match data { 1328e71d710SHarlan Ok(bytes) => Ok(bytes), 1338e71d710SHarlan Err(err) => Err(BytesIOError { 13488325f54SHarlanC value: BytesIOErrorValue::IOError(err), 1358e71d710SHarlan }), 13688325f54SHarlanC }, 1378e71d710SHarlan None => Err(BytesIOError { 13888325f54SHarlanC value: BytesIOErrorValue::NoneReturn, 1398e71d710SHarlan }), 14088325f54SHarlanC } 14188325f54SHarlanC } 14288325f54SHarlanC } 143