1 #[cfg(test)] 2 mod server_test; 3 4 pub mod config; 5 pub mod request; 6 7 use crate::{ 8 allocation::{allocation_manager::*, five_tuple::FiveTuple, AllocationInfo}, 9 auth::AuthHandler, 10 error::*, 11 proto::lifetime::DEFAULT_LIFETIME, 12 }; 13 use config::*; 14 use request::*; 15 16 use std::{collections::HashMap, sync::Arc}; 17 18 use tokio::{ 19 sync::{ 20 broadcast::{self, error::RecvError}, 21 mpsc, oneshot, Mutex, 22 }, 23 time::{Duration, Instant}, 24 }; 25 use util::Conn; 26 27 const INBOUND_MTU: usize = 1500; 28 29 /// Server is an instance of the TURN Server 30 pub struct Server { 31 auth_handler: Arc<dyn AuthHandler + Send + Sync>, 32 realm: String, 33 channel_bind_timeout: Duration, 34 pub(crate) nonces: Arc<Mutex<HashMap<String, Instant>>>, 35 command_tx: Mutex<Option<broadcast::Sender<Command>>>, 36 } 37 38 impl Server { 39 /// creates the TURN server new(config: ServerConfig) -> Result<Self>40 pub async fn new(config: ServerConfig) -> Result<Self> { 41 config.validate()?; 42 43 let (command_tx, _) = broadcast::channel(16); 44 let mut s = Server { 45 auth_handler: config.auth_handler, 46 realm: config.realm, 47 channel_bind_timeout: config.channel_bind_timeout, 48 nonces: Arc::new(Mutex::new(HashMap::new())), 49 command_tx: Mutex::new(Some(command_tx.clone())), 50 }; 51 52 if s.channel_bind_timeout == Duration::from_secs(0) { 53 s.channel_bind_timeout = DEFAULT_LIFETIME; 54 } 55 56 for p in config.conn_configs.into_iter() { 57 let nonces = Arc::clone(&s.nonces); 58 let auth_handler = Arc::clone(&s.auth_handler); 59 let realm = s.realm.clone(); 60 let channel_bind_timeout = s.channel_bind_timeout; 61 let handle_rx = command_tx.subscribe(); 62 let conn = p.conn; 63 let allocation_manager = Arc::new(Manager::new(ManagerConfig { 64 relay_addr_generator: p.relay_addr_generator, 65 alloc_close_notify: config.alloc_close_notify.clone(), 66 })); 67 68 tokio::spawn(Server::read_loop( 69 conn, 70 allocation_manager, 71 nonces, 72 auth_handler, 73 realm, 74 channel_bind_timeout, 75 handle_rx, 76 )); 77 } 78 79 Ok(s) 80 } 81 82 /// Deletes all existing [`crate::allocation::Allocation`]s by the provided `username`. delete_allocations_by_username(&self, username: String) -> Result<()>83 pub async fn delete_allocations_by_username(&self, username: String) -> Result<()> { 84 let tx = { 85 let command_tx = self.command_tx.lock().await; 86 command_tx.clone() 87 }; 88 if let Some(tx) = tx { 89 let (closed_tx, closed_rx) = mpsc::channel(1); 90 tx.send(Command::DeleteAllocations(username, Arc::new(closed_rx))) 91 .map_err(|_| Error::ErrClosed)?; 92 93 closed_tx.closed().await; 94 95 Ok(()) 96 } else { 97 Err(Error::ErrClosed) 98 } 99 } 100 101 /// Get information of [`Allocation`]s by specified [`FiveTuple`]s. 102 /// 103 /// If `five_tuples` is: 104 /// - [`None`]: It returns information about the all 105 /// [`Allocation`]s. 106 /// - [`Some`] and not empty: It returns information about 107 /// the [`Allocation`]s associated with 108 /// the specified [`FiveTuples`]. 109 /// - [`Some`], but empty: It returns an empty [`HashMap`]. 110 /// 111 /// [`Allocation`]: crate::allocation::Allocation get_allocations_info( &self, five_tuples: Option<Vec<FiveTuple>>, ) -> Result<HashMap<FiveTuple, AllocationInfo>>112 pub async fn get_allocations_info( 113 &self, 114 five_tuples: Option<Vec<FiveTuple>>, 115 ) -> Result<HashMap<FiveTuple, AllocationInfo>> { 116 if let Some(five_tuples) = &five_tuples { 117 if five_tuples.is_empty() { 118 return Ok(HashMap::new()); 119 } 120 } 121 122 let tx = { 123 let command_tx = self.command_tx.lock().await; 124 command_tx.clone() 125 }; 126 if let Some(tx) = tx { 127 let (infos_tx, mut infos_rx) = mpsc::channel(1); 128 tx.send(Command::GetAllocationsInfo(five_tuples, infos_tx)) 129 .map_err(|_| Error::ErrClosed)?; 130 131 let mut info: HashMap<FiveTuple, AllocationInfo> = HashMap::new(); 132 133 for _ in 0..tx.receiver_count() { 134 info.extend(infos_rx.recv().await.ok_or(Error::ErrClosed)?); 135 } 136 137 Ok(info) 138 } else { 139 Err(Error::ErrClosed) 140 } 141 } 142 read_loop( conn: Arc<dyn Conn + Send + Sync>, allocation_manager: Arc<Manager>, nonces: Arc<Mutex<HashMap<String, Instant>>>, auth_handler: Arc<dyn AuthHandler + Send + Sync>, realm: String, channel_bind_timeout: Duration, mut handle_rx: broadcast::Receiver<Command>, )143 async fn read_loop( 144 conn: Arc<dyn Conn + Send + Sync>, 145 allocation_manager: Arc<Manager>, 146 nonces: Arc<Mutex<HashMap<String, Instant>>>, 147 auth_handler: Arc<dyn AuthHandler + Send + Sync>, 148 realm: String, 149 channel_bind_timeout: Duration, 150 mut handle_rx: broadcast::Receiver<Command>, 151 ) { 152 let mut buf = vec![0u8; INBOUND_MTU]; 153 154 let (mut close_tx, mut close_rx) = oneshot::channel::<()>(); 155 156 tokio::spawn({ 157 let allocation_manager = Arc::clone(&allocation_manager); 158 159 async move { 160 loop { 161 match handle_rx.recv().await { 162 Ok(Command::DeleteAllocations(name, _)) => { 163 allocation_manager 164 .delete_allocations_by_username(name.as_str()) 165 .await; 166 continue; 167 } 168 Ok(Command::GetAllocationsInfo(five_tuples, tx)) => { 169 let infos = allocation_manager.get_allocations_info(five_tuples).await; 170 let _ = tx.send(infos).await; 171 172 continue; 173 } 174 Err(RecvError::Closed) | Ok(Command::Close(_)) => { 175 close_rx.close(); 176 break; 177 } 178 Err(RecvError::Lagged(n)) => { 179 log::warn!("Turn server has lagged by {} messages", n); 180 continue; 181 } 182 } 183 } 184 } 185 }); 186 187 loop { 188 let (n, addr) = tokio::select! { 189 v = conn.recv_from(&mut buf) => { 190 match v { 191 Ok(v) => v, 192 Err(err) => { 193 log::debug!("exit read loop on error: {}", err); 194 break; 195 } 196 } 197 }, 198 _ = close_tx.closed() => break 199 }; 200 201 let mut r = Request { 202 conn: Arc::clone(&conn), 203 src_addr: addr, 204 buff: buf[..n].to_vec(), 205 allocation_manager: Arc::clone(&allocation_manager), 206 nonces: Arc::clone(&nonces), 207 auth_handler: Arc::clone(&auth_handler), 208 realm: realm.clone(), 209 channel_bind_timeout, 210 }; 211 212 if let Err(err) = r.handle_request().await { 213 log::error!("error when handling datagram: {}", err); 214 } 215 } 216 217 let _ = allocation_manager.close().await; 218 let _ = conn.close().await; 219 } 220 221 /// Close stops the TURN Server. It cleans up any associated state and closes all connections it is managing close(&self) -> Result<()>222 pub async fn close(&self) -> Result<()> { 223 let tx = { 224 let mut command_tx = self.command_tx.lock().await; 225 command_tx.take() 226 }; 227 228 if let Some(tx) = tx { 229 if tx.receiver_count() == 0 { 230 return Ok(()); 231 } 232 233 let (closed_tx, closed_rx) = mpsc::channel(1); 234 let _ = tx.send(Command::Close(Arc::new(closed_rx))); 235 closed_tx.closed().await 236 } 237 238 Ok(()) 239 } 240 } 241 242 /// The protocol to communicate between the [`Server`]'s public methods 243 /// and the tasks spawned in the [`read_loop`] method. 244 #[derive(Clone)] 245 enum Command { 246 /// Command to delete [`crate::allocation::Allocation`] by provided 247 /// `username`. 248 DeleteAllocations(String, Arc<mpsc::Receiver<()>>), 249 250 /// Command to get information of [`Allocation`]s by provided [`FiveTuple`]s. 251 /// 252 /// [`Allocation`]: [`crate::allocation::Allocation`] 253 GetAllocationsInfo( 254 Option<Vec<FiveTuple>>, 255 mpsc::Sender<HashMap<FiveTuple, AllocationInfo>>, 256 ), 257 258 /// Command to close the [`Server`]. 259 Close(Arc<mpsc::Receiver<()>>), 260 } 261