1 #![warn(rust_2018_idioms)] 2 #![allow(dead_code)] 3 4 pub mod utilities; 5 6 use std::collections::HashMap; 7 use std::io::{BufRead, BufReader}; 8 use std::sync::Arc; 9 use tokio::sync::Mutex; 10 use util::Conn; 11 12 use dtls::Error; 13 14 const BUF_SIZE: usize = 8192; 15 16 /// Hub is a helper to handle one to many chat 17 #[derive(Default)] 18 pub struct Hub { 19 conns: Arc<Mutex<HashMap<String, Arc<dyn Conn + Send + Sync>>>>, 20 } 21 22 impl Hub { 23 /// new builds a new hub new() -> Self24 pub fn new() -> Self { 25 Hub { 26 conns: Arc::new(Mutex::new(HashMap::new())), 27 } 28 } 29 30 /// register adds a new conn to the Hub register(&self, conn: Arc<dyn Conn + Send + Sync>)31 pub async fn register(&self, conn: Arc<dyn Conn + Send + Sync>) { 32 println!("Connected to {}", conn.remote_addr().unwrap()); 33 34 if let Some(remote_addr) = conn.remote_addr() { 35 let mut conns = self.conns.lock().await; 36 conns.insert(remote_addr.to_string(), Arc::clone(&conn)); 37 } 38 39 let conns = Arc::clone(&self.conns); 40 tokio::spawn(async move { 41 let _ = Hub::read_loop(conns, conn).await; 42 }); 43 } 44 read_loop( conns: Arc<Mutex<HashMap<String, Arc<dyn Conn + Send + Sync>>>>, conn: Arc<dyn Conn + Send + Sync>, ) -> Result<(), Error>45 async fn read_loop( 46 conns: Arc<Mutex<HashMap<String, Arc<dyn Conn + Send + Sync>>>>, 47 conn: Arc<dyn Conn + Send + Sync>, 48 ) -> Result<(), Error> { 49 let mut b = vec![0u8; BUF_SIZE]; 50 51 while let Ok(n) = conn.recv(&mut b).await { 52 let msg = String::from_utf8(b[..n].to_vec())?; 53 print!("Got message: {msg}"); 54 } 55 56 Hub::unregister(conns, conn).await 57 } 58 unregister( conns: Arc<Mutex<HashMap<String, Arc<dyn Conn + Send + Sync>>>>, conn: Arc<dyn Conn + Send + Sync>, ) -> Result<(), Error>59 async fn unregister( 60 conns: Arc<Mutex<HashMap<String, Arc<dyn Conn + Send + Sync>>>>, 61 conn: Arc<dyn Conn + Send + Sync>, 62 ) -> Result<(), Error> { 63 if let Some(remote_addr) = conn.remote_addr() { 64 { 65 let mut cs = conns.lock().await; 66 cs.remove(&remote_addr.to_string()); 67 } 68 69 if let Err(err) = conn.close().await { 70 println!("Failed to disconnect: {remote_addr} with err {err}"); 71 } else { 72 println!("Disconnected: {remote_addr} "); 73 } 74 } 75 76 Ok(()) 77 } 78 broadcast(&self, msg: &[u8])79 async fn broadcast(&self, msg: &[u8]) { 80 let conns = self.conns.lock().await; 81 for conn in conns.values() { 82 if let Err(err) = conn.send(msg).await { 83 println!( 84 "Failed to write message to {:?}: {}", 85 conn.remote_addr(), 86 err 87 ); 88 } 89 } 90 } 91 92 /// Chat starts the stdin readloop to dispatch messages to the hub chat(&self)93 pub async fn chat(&self) { 94 let input = std::io::stdin(); 95 let mut reader = BufReader::new(input.lock()); 96 loop { 97 let mut msg = String::new(); 98 match reader.read_line(&mut msg) { 99 Ok(0) => return, 100 Err(err) => { 101 println!("stdin read err: {err}"); 102 return; 103 } 104 _ => {} 105 }; 106 if msg.trim() == "exit" { 107 return; 108 } 109 self.broadcast(msg.as_bytes()).await; 110 } 111 } 112 } 113