xref: /webrtc/dtls/examples/hub/src/lib.rs (revision 5d8fe953)
1 #![warn(rust_2018_idioms)]
2 #![allow(dead_code)]
3 
4 pub mod utilities;
5 
6 use std::collections::HashMap;
7 use std::io::{BufRead, BufReader};
8 use std::sync::Arc;
9 use tokio::sync::Mutex;
10 use util::Conn;
11 
12 use dtls::Error;
13 
14 const BUF_SIZE: usize = 8192;
15 
16 /// Hub is a helper to handle one to many chat
17 #[derive(Default)]
18 pub struct Hub {
19     conns: Arc<Mutex<HashMap<String, Arc<dyn Conn + Send + Sync>>>>,
20 }
21 
22 impl Hub {
23     /// new builds a new hub
new() -> Self24     pub fn new() -> Self {
25         Hub {
26             conns: Arc::new(Mutex::new(HashMap::new())),
27         }
28     }
29 
30     /// register adds a new conn to the Hub
register(&self, conn: Arc<dyn Conn + Send + Sync>)31     pub async fn register(&self, conn: Arc<dyn Conn + Send + Sync>) {
32         println!("Connected to {}", conn.remote_addr().unwrap());
33 
34         if let Some(remote_addr) = conn.remote_addr() {
35             let mut conns = self.conns.lock().await;
36             conns.insert(remote_addr.to_string(), Arc::clone(&conn));
37         }
38 
39         let conns = Arc::clone(&self.conns);
40         tokio::spawn(async move {
41             let _ = Hub::read_loop(conns, conn).await;
42         });
43     }
44 
read_loop( conns: Arc<Mutex<HashMap<String, Arc<dyn Conn + Send + Sync>>>>, conn: Arc<dyn Conn + Send + Sync>, ) -> Result<(), Error>45     async fn read_loop(
46         conns: Arc<Mutex<HashMap<String, Arc<dyn Conn + Send + Sync>>>>,
47         conn: Arc<dyn Conn + Send + Sync>,
48     ) -> Result<(), Error> {
49         let mut b = vec![0u8; BUF_SIZE];
50 
51         while let Ok(n) = conn.recv(&mut b).await {
52             let msg = String::from_utf8(b[..n].to_vec())?;
53             print!("Got message: {msg}");
54         }
55 
56         Hub::unregister(conns, conn).await
57     }
58 
unregister( conns: Arc<Mutex<HashMap<String, Arc<dyn Conn + Send + Sync>>>>, conn: Arc<dyn Conn + Send + Sync>, ) -> Result<(), Error>59     async fn unregister(
60         conns: Arc<Mutex<HashMap<String, Arc<dyn Conn + Send + Sync>>>>,
61         conn: Arc<dyn Conn + Send + Sync>,
62     ) -> Result<(), Error> {
63         if let Some(remote_addr) = conn.remote_addr() {
64             {
65                 let mut cs = conns.lock().await;
66                 cs.remove(&remote_addr.to_string());
67             }
68 
69             if let Err(err) = conn.close().await {
70                 println!("Failed to disconnect: {remote_addr} with err {err}");
71             } else {
72                 println!("Disconnected: {remote_addr} ");
73             }
74         }
75 
76         Ok(())
77     }
78 
broadcast(&self, msg: &[u8])79     async fn broadcast(&self, msg: &[u8]) {
80         let conns = self.conns.lock().await;
81         for conn in conns.values() {
82             if let Err(err) = conn.send(msg).await {
83                 println!(
84                     "Failed to write message to {:?}: {}",
85                     conn.remote_addr(),
86                     err
87                 );
88             }
89         }
90     }
91 
92     /// Chat starts the stdin readloop to dispatch messages to the hub
chat(&self)93     pub async fn chat(&self) {
94         let input = std::io::stdin();
95         let mut reader = BufReader::new(input.lock());
96         loop {
97             let mut msg = String::new();
98             match reader.read_line(&mut msg) {
99                 Ok(0) => return,
100                 Err(err) => {
101                     println!("stdin read err: {err}");
102                     return;
103                 }
104                 _ => {}
105             };
106             if msg.trim() == "exit" {
107                 return;
108             }
109             self.broadcast(msg.as_bytes()).await;
110         }
111     }
112 }
113