xref: /webrtc/turn/src/server/mod.rs (revision 9ea7b2ac)
1 #[cfg(test)]
2 mod server_test;
3 
4 pub mod config;
5 pub mod request;
6 
7 use crate::{
8     allocation::{allocation_manager::*, five_tuple::FiveTuple, AllocationInfo},
9     auth::AuthHandler,
10     error::*,
11     proto::lifetime::DEFAULT_LIFETIME,
12 };
13 use config::*;
14 use request::*;
15 
16 use std::{collections::HashMap, sync::Arc};
17 
18 use tokio::{
19     sync::{
20         broadcast::{self, error::RecvError},
21         mpsc, oneshot, Mutex,
22     },
23     time::{Duration, Instant},
24 };
25 use util::Conn;
26 
/// Size, in bytes, of the buffer each read loop allocates for
/// `Conn::recv_from`.
// NOTE(review): presumably datagrams larger than this are truncated by the
// underlying `Conn` implementation — confirm.
const INBOUND_MTU: usize = 1500;
28 
/// Server is an instance of the TURN Server.
///
/// One background read loop is spawned per configured connection when the
/// server is created; the public methods talk to those loops through the
/// broadcast channel stored in `command_tx`.
pub struct Server {
    /// Authentication handler, shared with every read loop and handed to each
    /// incoming request.
    auth_handler: Arc<dyn AuthHandler + Send + Sync>,
    /// Realm string handed to each incoming request.
    realm: String,
    /// Channel-bind timeout handed to each incoming request; a configured
    /// value of zero is replaced by `DEFAULT_LIFETIME` in [`Server::new`].
    channel_bind_timeout: Duration,
    /// Nonce table shared between all read loops.
    // NOTE(review): the `Instant` is presumably the nonce creation time —
    // confirm against the request handling code.
    pub(crate) nonces: Arc<Mutex<HashMap<String, Instant>>>,
    /// Sending half of the command channel. Set to `None` by
    /// [`Server::close`], after which the other public methods return
    /// `Error::ErrClosed`.
    command_tx: Mutex<Option<broadcast::Sender<Command>>>,
}
37 
38 impl Server {
39     /// creates the TURN server
new(config: ServerConfig) -> Result<Self>40     pub async fn new(config: ServerConfig) -> Result<Self> {
41         config.validate()?;
42 
43         let (command_tx, _) = broadcast::channel(16);
44         let mut s = Server {
45             auth_handler: config.auth_handler,
46             realm: config.realm,
47             channel_bind_timeout: config.channel_bind_timeout,
48             nonces: Arc::new(Mutex::new(HashMap::new())),
49             command_tx: Mutex::new(Some(command_tx.clone())),
50         };
51 
52         if s.channel_bind_timeout == Duration::from_secs(0) {
53             s.channel_bind_timeout = DEFAULT_LIFETIME;
54         }
55 
56         for p in config.conn_configs.into_iter() {
57             let nonces = Arc::clone(&s.nonces);
58             let auth_handler = Arc::clone(&s.auth_handler);
59             let realm = s.realm.clone();
60             let channel_bind_timeout = s.channel_bind_timeout;
61             let handle_rx = command_tx.subscribe();
62             let conn = p.conn;
63             let allocation_manager = Arc::new(Manager::new(ManagerConfig {
64                 relay_addr_generator: p.relay_addr_generator,
65                 alloc_close_notify: config.alloc_close_notify.clone(),
66             }));
67 
68             tokio::spawn(Server::read_loop(
69                 conn,
70                 allocation_manager,
71                 nonces,
72                 auth_handler,
73                 realm,
74                 channel_bind_timeout,
75                 handle_rx,
76             ));
77         }
78 
79         Ok(s)
80     }
81 
82     /// Deletes all existing [`crate::allocation::Allocation`]s by the provided `username`.
delete_allocations_by_username(&self, username: String) -> Result<()>83     pub async fn delete_allocations_by_username(&self, username: String) -> Result<()> {
84         let tx = {
85             let command_tx = self.command_tx.lock().await;
86             command_tx.clone()
87         };
88         if let Some(tx) = tx {
89             let (closed_tx, closed_rx) = mpsc::channel(1);
90             tx.send(Command::DeleteAllocations(username, Arc::new(closed_rx)))
91                 .map_err(|_| Error::ErrClosed)?;
92 
93             closed_tx.closed().await;
94 
95             Ok(())
96         } else {
97             Err(Error::ErrClosed)
98         }
99     }
100 
101     /// Get information of [`Allocation`]s by specified [`FiveTuple`]s.
102     ///
103     /// If `five_tuples` is:
104     /// - [`None`]:               It returns information about the all
105     ///                           [`Allocation`]s.
106     /// - [`Some`] and not empty: It returns information about
107     ///                           the [`Allocation`]s associated with
108     ///                           the specified [`FiveTuples`].
109     /// - [`Some`], but empty:    It returns an empty [`HashMap`].
110     ///
111     /// [`Allocation`]: crate::allocation::Allocation
get_allocations_info( &self, five_tuples: Option<Vec<FiveTuple>>, ) -> Result<HashMap<FiveTuple, AllocationInfo>>112     pub async fn get_allocations_info(
113         &self,
114         five_tuples: Option<Vec<FiveTuple>>,
115     ) -> Result<HashMap<FiveTuple, AllocationInfo>> {
116         if let Some(five_tuples) = &five_tuples {
117             if five_tuples.is_empty() {
118                 return Ok(HashMap::new());
119             }
120         }
121 
122         let tx = {
123             let command_tx = self.command_tx.lock().await;
124             command_tx.clone()
125         };
126         if let Some(tx) = tx {
127             let (infos_tx, mut infos_rx) = mpsc::channel(1);
128             tx.send(Command::GetAllocationsInfo(five_tuples, infos_tx))
129                 .map_err(|_| Error::ErrClosed)?;
130 
131             let mut info: HashMap<FiveTuple, AllocationInfo> = HashMap::new();
132 
133             for _ in 0..tx.receiver_count() {
134                 info.extend(infos_rx.recv().await.ok_or(Error::ErrClosed)?);
135             }
136 
137             Ok(info)
138         } else {
139             Err(Error::ErrClosed)
140         }
141     }
142 
read_loop( conn: Arc<dyn Conn + Send + Sync>, allocation_manager: Arc<Manager>, nonces: Arc<Mutex<HashMap<String, Instant>>>, auth_handler: Arc<dyn AuthHandler + Send + Sync>, realm: String, channel_bind_timeout: Duration, mut handle_rx: broadcast::Receiver<Command>, )143     async fn read_loop(
144         conn: Arc<dyn Conn + Send + Sync>,
145         allocation_manager: Arc<Manager>,
146         nonces: Arc<Mutex<HashMap<String, Instant>>>,
147         auth_handler: Arc<dyn AuthHandler + Send + Sync>,
148         realm: String,
149         channel_bind_timeout: Duration,
150         mut handle_rx: broadcast::Receiver<Command>,
151     ) {
152         let mut buf = vec![0u8; INBOUND_MTU];
153 
154         let (mut close_tx, mut close_rx) = oneshot::channel::<()>();
155 
156         tokio::spawn({
157             let allocation_manager = Arc::clone(&allocation_manager);
158 
159             async move {
160                 loop {
161                     match handle_rx.recv().await {
162                         Ok(Command::DeleteAllocations(name, _)) => {
163                             allocation_manager
164                                 .delete_allocations_by_username(name.as_str())
165                                 .await;
166                             continue;
167                         }
168                         Ok(Command::GetAllocationsInfo(five_tuples, tx)) => {
169                             let infos = allocation_manager.get_allocations_info(five_tuples).await;
170                             let _ = tx.send(infos).await;
171 
172                             continue;
173                         }
174                         Err(RecvError::Closed) | Ok(Command::Close(_)) => {
175                             close_rx.close();
176                             break;
177                         }
178                         Err(RecvError::Lagged(n)) => {
179                             log::warn!("Turn server has lagged by {} messages", n);
180                             continue;
181                         }
182                     }
183                 }
184             }
185         });
186 
187         loop {
188             let (n, addr) = tokio::select! {
189                 v = conn.recv_from(&mut buf) => {
190                     match v {
191                         Ok(v) => v,
192                         Err(err) => {
193                             log::debug!("exit read loop on error: {}", err);
194                             break;
195                         }
196                     }
197                 },
198                 _ = close_tx.closed() => break
199             };
200 
201             let mut r = Request {
202                 conn: Arc::clone(&conn),
203                 src_addr: addr,
204                 buff: buf[..n].to_vec(),
205                 allocation_manager: Arc::clone(&allocation_manager),
206                 nonces: Arc::clone(&nonces),
207                 auth_handler: Arc::clone(&auth_handler),
208                 realm: realm.clone(),
209                 channel_bind_timeout,
210             };
211 
212             if let Err(err) = r.handle_request().await {
213                 log::error!("error when handling datagram: {}", err);
214             }
215         }
216 
217         let _ = allocation_manager.close().await;
218         let _ = conn.close().await;
219     }
220 
221     /// Close stops the TURN Server. It cleans up any associated state and closes all connections it is managing
close(&self) -> Result<()>222     pub async fn close(&self) -> Result<()> {
223         let tx = {
224             let mut command_tx = self.command_tx.lock().await;
225             command_tx.take()
226         };
227 
228         if let Some(tx) = tx {
229             if tx.receiver_count() == 0 {
230                 return Ok(());
231             }
232 
233             let (closed_tx, closed_rx) = mpsc::channel(1);
234             let _ = tx.send(Command::Close(Arc::new(closed_rx)));
235             closed_tx.closed().await
236         }
237 
238         Ok(())
239     }
240 }
241 
/// The protocol to communicate between the [`Server`]'s public methods
/// and the tasks spawned in the [`Server::read_loop`] method.
///
/// Commands are delivered over a broadcast channel, so every read loop
/// receives its own clone of each command.
#[derive(Clone)]
enum Command {
    /// Command to delete [`crate::allocation::Allocation`] by provided
    /// `username`.
    ///
    /// The receiver is never read from: the issuer waits for it to be
    /// dropped to learn that the command has been handled.
    DeleteAllocations(String, Arc<mpsc::Receiver<()>>),

    /// Command to get information of [`Allocation`]s by provided [`FiveTuple`]s.
    ///
    /// Each handler sends its result back through the provided sender.
    ///
    /// [`Allocation`]: crate::allocation::Allocation
    GetAllocationsInfo(
        Option<Vec<FiveTuple>>,
        mpsc::Sender<HashMap<FiveTuple, AllocationInfo>>,
    ),

    /// Command to close the [`Server`].
    ///
    /// As with [`Command::DeleteAllocations`], the receiver only exists so
    /// that the issuer can wait for it to be dropped.
    Close(Arc<mpsc::Receiver<()>>),
}
261