xref: /webrtc/turn/src/client/mod.rs (revision 5b79f08a)
1 #[cfg(test)]
2 mod client_test;
3 
4 pub mod binding;
5 pub mod periodic_timer;
6 pub mod permission;
7 pub mod relay_conn;
8 pub mod transaction;
9 
10 use crate::error::*;
11 use crate::proto::{
12     chandata::*, data::*, lifetime::*, peeraddr::*, relayaddr::*, reqtrans::*, PROTO_UDP,
13 };
14 use binding::*;
15 use relay_conn::*;
16 use transaction::*;
17 
18 use std::net::SocketAddr;
19 use std::str::FromStr;
20 use std::sync::Arc;
21 use stun::agent::*;
22 use stun::attributes::*;
23 use stun::error_code::*;
24 use stun::fingerprint::*;
25 use stun::integrity::*;
26 use stun::message::*;
27 use stun::textattrs::*;
28 use stun::xoraddr::*;
29 use tokio::sync::{mpsc, Mutex};
30 use util::{conn::*, vnet::net::*};
31 
32 use async_trait::async_trait;
33 
/// Fallback retransmission interval in milliseconds; `ClientInternal::new`
/// substitutes it when `ClientConfig::rto_in_ms` is 0.
const DEFAULT_RTO_IN_MS: u16 = 200;
/// Size in bytes of the receive buffer allocated by the `listen` read loop.
const MAX_DATA_BUFFER_SIZE: usize = u16::MAX as usize; // message size limit for Chromium
/// Capacity of the mpsc channel that `allocate` creates for inbound relayed data.
const MAX_READ_QUEUE_SIZE: usize = 1024;
37 
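// Example retransmission schedule (attempt: elapsed time, then the wait
// before the next attempt), shown for an initial interval of 500 ms that
// doubles after every attempt:
//
//              interval [msec]
// 0: 0 ms      +500
// 1: 500 ms    +1000
// 2: 1500 ms   +2000
// 3: 3500 ms   +4000
// 4: 7500 ms   +8000
// 5: 15500 ms  +16000
// 6: 31500 ms  +32000
// -: 63500 ms  failed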
47 
/// ClientConfig is a bag of config parameters for Client.
pub struct ClientConfig {
    /// STUN server address (e.g. "stun.abc.com:3478"). When empty, no STUN
    /// server address is resolved.
    pub stun_serv_addr: String,
    /// TURN server address (e.g. "turn.abc.com:3478"). When empty, no TURN
    /// server address is resolved.
    pub turn_serv_addr: String,
    /// Text of the USERNAME attribute built for this client.
    pub username: String,
    /// Password used together with username and realm to build the
    /// long-term message integrity during allocation.
    pub password: String,
    /// Initial text of the REALM attribute; `allocate` replaces the realm
    /// with the one found in the server's response.
    pub realm: String,
    /// Text of the SOFTWARE attribute; only added to binding requests when
    /// non-empty.
    pub software: String,
    /// Transaction interval in milliseconds; 0 selects `DEFAULT_RTO_IN_MS`.
    pub rto_in_ms: u16,
    /// Base socket the client sends on and (via `listen`) receives from.
    pub conn: Arc<dyn Conn + Send + Sync>,
    /// Optional network used to resolve the server addresses; when `None`
    /// a `Net::new(None)` is created instead.
    pub vnet: Option<Arc<Net>>,
}
60 
/// State behind `Client`; it also implements `RelayConnObserver`.
struct ClientInternal {
    /// Base socket used for every send and for the `listen` read loop.
    conn: Arc<dyn Conn + Send + Sync>,
    /// Resolved STUN server address as text; empty when none was configured.
    stun_serv_addr: String,
    /// Resolved TURN server address as text; empty when none was configured.
    turn_serv_addr: String,
    /// USERNAME attribute built from the configured username.
    username: Username,
    /// Password used by `allocate` to build the long-term integrity.
    password: String,
    /// REALM attribute; overwritten by `allocate` with the server's realm.
    realm: Realm,
    /// Message integrity; starts as a short-term integrity over an empty
    /// string and is replaced by `allocate`.
    integrity: MessageIntegrity,
    /// SOFTWARE attribute built from the configured software string.
    software: Software,
    /// In-flight transactions, keyed by the base64-encoded transaction id.
    tr_map: Arc<Mutex<TransactionMap>>,
    /// Channel bindings; shared with the `RelayConnConfig` returned by `allocate`.
    binding_mgr: Arc<Mutex<BindingManager>>,
    /// Interval in milliseconds handed to every new `Transaction`.
    rto_in_ms: u16,
    /// Sender for inbound relayed data; `None` until `allocate` succeeds and
    /// again after `close`.
    read_ch_tx: Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
}
75 
76 #[async_trait]
77 impl RelayConnObserver for ClientInternal {
78     // turn_server_addr return the TURN server address
turn_server_addr(&self) -> String79     fn turn_server_addr(&self) -> String {
80         self.turn_serv_addr.clone()
81     }
82 
83     // username returns username
username(&self) -> Username84     fn username(&self) -> Username {
85         self.username.clone()
86     }
87 
88     // realm return realm
realm(&self) -> Realm89     fn realm(&self) -> Realm {
90         self.realm.clone()
91     }
92 
93     // WriteTo sends data to the specified destination using the base socket.
write_to(&self, data: &[u8], to: &str) -> std::result::Result<usize, util::Error>94     async fn write_to(&self, data: &[u8], to: &str) -> std::result::Result<usize, util::Error> {
95         let n = self.conn.send_to(data, SocketAddr::from_str(to)?).await?;
96         Ok(n)
97     }
98 
99     // PerformTransaction performs STUN transaction
perform_transaction( &mut self, msg: &Message, to: &str, ignore_result: bool, ) -> Result<TransactionResult>100     async fn perform_transaction(
101         &mut self,
102         msg: &Message,
103         to: &str,
104         ignore_result: bool,
105     ) -> Result<TransactionResult> {
106         let tr_key = base64::encode(msg.transaction_id.0);
107 
108         let mut tr = Transaction::new(TransactionConfig {
109             key: tr_key.clone(),
110             raw: msg.raw.clone(),
111             to: to.to_string(),
112             interval: self.rto_in_ms,
113             ignore_result,
114         });
115         let result_ch_rx = tr.get_result_channel();
116 
117         log::trace!("start {} transaction {} to {}", msg.typ, tr_key, tr.to);
118         {
119             let mut tm = self.tr_map.lock().await;
120             tm.insert(tr_key.clone(), tr);
121         }
122 
123         self.conn
124             .send_to(&msg.raw, SocketAddr::from_str(to)?)
125             .await?;
126 
127         let conn2 = Arc::clone(&self.conn);
128         let tr_map2 = Arc::clone(&self.tr_map);
129         {
130             let mut tm = self.tr_map.lock().await;
131             if let Some(tr) = tm.get(&tr_key) {
132                 tr.start_rtx_timer(conn2, tr_map2).await;
133             }
134         }
135 
136         // If dontWait is true, get the transaction going and return immediately
137         if ignore_result {
138             return Ok(TransactionResult::default());
139         }
140 
141         // wait_for_result waits for the transaction result
142         if let Some(mut result_ch_rx) = result_ch_rx {
143             match result_ch_rx.recv().await {
144                 Some(tr) => Ok(tr),
145                 None => Err(Error::ErrTransactionClosed),
146             }
147         } else {
148             Err(Error::ErrWaitForResultOnNonResultTransaction)
149         }
150     }
151 }
152 
153 impl ClientInternal {
154     // new returns a new Client instance. listeningAddress is the address and port to listen on, default "0.0.0.0:0"
new(config: ClientConfig) -> Result<Self>155     async fn new(config: ClientConfig) -> Result<Self> {
156         let net = if let Some(vnet) = config.vnet {
157             if vnet.is_virtual() {
158                 log::warn!("vnet is enabled");
159             }
160             vnet
161         } else {
162             Arc::new(Net::new(None))
163         };
164 
165         let stun_serv_addr = if config.stun_serv_addr.is_empty() {
166             String::new()
167         } else {
168             log::debug!("resolving {}", config.stun_serv_addr);
169             let local_addr = config.conn.local_addr()?;
170             let stun_serv = net
171                 .resolve_addr(local_addr.is_ipv4(), &config.stun_serv_addr)
172                 .await?;
173             log::debug!("stunServ: {}", stun_serv);
174             stun_serv.to_string()
175         };
176 
177         let turn_serv_addr = if config.turn_serv_addr.is_empty() {
178             String::new()
179         } else {
180             log::debug!("resolving {}", config.turn_serv_addr);
181             let local_addr = config.conn.local_addr()?;
182             let turn_serv = net
183                 .resolve_addr(local_addr.is_ipv4(), &config.turn_serv_addr)
184                 .await?;
185             log::debug!("turnServ: {}", turn_serv);
186             turn_serv.to_string()
187         };
188 
189         Ok(ClientInternal {
190             conn: Arc::clone(&config.conn),
191             stun_serv_addr,
192             turn_serv_addr,
193             username: Username::new(ATTR_USERNAME, config.username),
194             password: config.password,
195             realm: Realm::new(ATTR_REALM, config.realm),
196             software: Software::new(ATTR_SOFTWARE, config.software),
197             tr_map: Arc::new(Mutex::new(TransactionMap::new())),
198             binding_mgr: Arc::new(Mutex::new(BindingManager::new())),
199             rto_in_ms: if config.rto_in_ms != 0 {
200                 config.rto_in_ms
201             } else {
202                 DEFAULT_RTO_IN_MS
203             },
204             integrity: MessageIntegrity::new_short_term_integrity(String::new()),
205             read_ch_tx: Arc::new(Mutex::new(None)),
206         })
207     }
208 
209     // stun_server_addr return the STUN server address
stun_server_addr(&self) -> String210     fn stun_server_addr(&self) -> String {
211         self.stun_serv_addr.clone()
212     }
213 
214     // Listen will have this client start listening on the relay_conn provided via the config.
215     // This is optional. If not used, you will need to call handle_inbound method
216     // to supply incoming data, instead.
listen(&self) -> Result<()>217     async fn listen(&self) -> Result<()> {
218         let conn = Arc::clone(&self.conn);
219         let stun_serv_str = self.stun_serv_addr.clone();
220         let tr_map = Arc::clone(&self.tr_map);
221         let read_ch_tx = Arc::clone(&self.read_ch_tx);
222         let binding_mgr = Arc::clone(&self.binding_mgr);
223 
224         tokio::spawn(async move {
225             let mut buf = vec![0u8; MAX_DATA_BUFFER_SIZE];
226             loop {
227                 //TODO: gracefully exit loop
228                 let (n, from) = match conn.recv_from(&mut buf).await {
229                     Ok((n, from)) => (n, from),
230                     Err(err) => {
231                         log::debug!("exiting read loop: {}", err);
232                         break;
233                     }
234                 };
235 
236                 log::debug!("received {} bytes of udp from {}", n, from);
237 
238                 if let Err(err) = ClientInternal::handle_inbound(
239                     &read_ch_tx,
240                     &buf[..n],
241                     from,
242                     &stun_serv_str,
243                     &tr_map,
244                     &binding_mgr,
245                 )
246                 .await
247                 {
248                     log::debug!("exiting read loop: {}", err);
249                     break;
250                 }
251             }
252         });
253 
254         Ok(())
255     }
256 
257     // handle_inbound handles data received.
258     // This method handles incoming packet demultiplex it by the source address
259     // and the types of the message.
260     // This return a booleen (handled or not) and if there was an error.
261     // Caller should check if the packet was handled by this client or not.
262     // If not handled, it is assumed that the packet is application data.
263     // If an error is returned, the caller should discard the packet regardless.
handle_inbound( read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>, data: &[u8], from: SocketAddr, stun_serv_str: &str, tr_map: &Arc<Mutex<TransactionMap>>, binding_mgr: &Arc<Mutex<BindingManager>>, ) -> Result<()>264     async fn handle_inbound(
265         read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
266         data: &[u8],
267         from: SocketAddr,
268         stun_serv_str: &str,
269         tr_map: &Arc<Mutex<TransactionMap>>,
270         binding_mgr: &Arc<Mutex<BindingManager>>,
271     ) -> Result<()> {
272         // +-------------------+-------------------------------+
273         // |   Return Values   |                               |
274         // +-------------------+       Meaning / Action        |
275         // | handled |  error  |                               |
276         // |=========+=========+===============================+
277         // |  false  |   nil   | Handle the packet as app data |
278         // |---------+---------+-------------------------------+
279         // |  true   |   nil   |        Nothing to do          |
280         // |---------+---------+-------------------------------+
281         // |  false  |  error  |     (shouldn't happen)        |
282         // |---------+---------+-------------------------------+
283         // |  true   |  error  | Error occurred while handling |
284         // +---------+---------+-------------------------------+
285         // Possible causes of the error:
286         //  - Malformed packet (parse error)
287         //  - STUN message was a request
288         //  - Non-STUN message from the STUN server
289 
290         if is_message(data) {
291             ClientInternal::handle_stun_message(tr_map, read_ch_tx, data, from).await
292         } else if ChannelData::is_channel_data(data) {
293             ClientInternal::handle_channel_data(binding_mgr, read_ch_tx, data).await
294         } else if !stun_serv_str.is_empty() && from.to_string() == *stun_serv_str {
295             // received from STUN server but it is not a STUN message
296             Err(Error::ErrNonStunmessage)
297         } else {
298             // assume, this is an application data
299             log::trace!("non-STUN/TURN packect, unhandled");
300             Ok(())
301         }
302     }
303 
handle_stun_message( tr_map: &Arc<Mutex<TransactionMap>>, read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>, data: &[u8], mut from: SocketAddr, ) -> Result<()>304     async fn handle_stun_message(
305         tr_map: &Arc<Mutex<TransactionMap>>,
306         read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
307         data: &[u8],
308         mut from: SocketAddr,
309     ) -> Result<()> {
310         let mut msg = Message::new();
311         msg.raw = data.to_vec();
312         msg.decode()?;
313 
314         if msg.typ.class == CLASS_REQUEST {
315             return Err(Error::Other(format!(
316                 "{:?} : {}",
317                 Error::ErrUnexpectedStunrequestMessage,
318                 msg
319             )));
320         }
321 
322         if msg.typ.class == CLASS_INDICATION {
323             if msg.typ.method == METHOD_DATA {
324                 let mut peer_addr = PeerAddress::default();
325                 peer_addr.get_from(&msg)?;
326                 from = SocketAddr::new(peer_addr.ip, peer_addr.port);
327 
328                 let mut data = Data::default();
329                 data.get_from(&msg)?;
330 
331                 log::debug!("data indication received from {}", from);
332 
333                 let _ = ClientInternal::handle_inbound_relay_conn(read_ch_tx, &data.0, from).await;
334             }
335 
336             return Ok(());
337         }
338 
339         // This is a STUN response message (transactional)
340         // The type is either:
341         // - stun.ClassSuccessResponse
342         // - stun.ClassErrorResponse
343 
344         let tr_key = base64::encode(msg.transaction_id.0);
345 
346         let mut tm = tr_map.lock().await;
347         if tm.find(&tr_key).is_none() {
348             // silently discard
349             log::debug!("no transaction for {}", msg);
350             return Ok(());
351         }
352 
353         if let Some(mut tr) = tm.delete(&tr_key) {
354             // End the transaction
355             tr.stop_rtx_timer();
356 
357             if !tr
358                 .write_result(TransactionResult {
359                     msg,
360                     from,
361                     retries: tr.retries(),
362                     ..Default::default()
363                 })
364                 .await
365             {
366                 log::debug!("no listener for msg.raw {:?}", data);
367             }
368         }
369 
370         Ok(())
371     }
372 
handle_channel_data( binding_mgr: &Arc<Mutex<BindingManager>>, read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>, data: &[u8], ) -> Result<()>373     async fn handle_channel_data(
374         binding_mgr: &Arc<Mutex<BindingManager>>,
375         read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
376         data: &[u8],
377     ) -> Result<()> {
378         let mut ch_data = ChannelData {
379             raw: data.to_vec(),
380             ..Default::default()
381         };
382         ch_data.decode()?;
383 
384         let addr = ClientInternal::find_addr_by_channel_number(binding_mgr, ch_data.number.0)
385             .await
386             .ok_or(Error::ErrChannelBindNotFound)?;
387 
388         log::trace!(
389             "channel data received from {} (ch={})",
390             addr,
391             ch_data.number.0
392         );
393 
394         let _ = ClientInternal::handle_inbound_relay_conn(read_ch_tx, &ch_data.data, addr).await;
395 
396         Ok(())
397     }
398 
399     // handle_inbound_relay_conn passes inbound data in RelayConn
handle_inbound_relay_conn( read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>, data: &[u8], from: SocketAddr, ) -> Result<()>400     async fn handle_inbound_relay_conn(
401         read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
402         data: &[u8],
403         from: SocketAddr,
404     ) -> Result<()> {
405         let read_ch_tx_opt = read_ch_tx.lock().await;
406         log::debug!("read_ch_tx_opt = {}", read_ch_tx_opt.is_some());
407         if let Some(tx) = &*read_ch_tx_opt {
408             log::debug!("try_send data = {:?}, from = {}", data, from);
409             if tx
410                 .try_send(InboundData {
411                     data: data.to_vec(),
412                     from,
413                 })
414                 .is_err()
415             {
416                 log::warn!("receive buffer full");
417             }
418             Ok(())
419         } else {
420             Err(Error::ErrAlreadyClosed)
421         }
422     }
423 
424     // Close closes this client
close(&mut self)425     async fn close(&mut self) {
426         {
427             let mut read_ch_tx = self.read_ch_tx.lock().await;
428             read_ch_tx.take();
429         }
430         {
431             let mut tm = self.tr_map.lock().await;
432             tm.close_and_delete_all();
433         }
434     }
435 
436     // send_binding_request_to sends a new STUN request to the given transport address
send_binding_request_to(&mut self, to: &str) -> Result<SocketAddr>437     async fn send_binding_request_to(&mut self, to: &str) -> Result<SocketAddr> {
438         let msg = {
439             let attrs: Vec<Box<dyn Setter>> = if !self.software.text.is_empty() {
440                 vec![
441                     Box::new(TransactionId::new()),
442                     Box::new(BINDING_REQUEST),
443                     Box::new(self.software.clone()),
444                 ]
445             } else {
446                 vec![Box::new(TransactionId::new()), Box::new(BINDING_REQUEST)]
447             };
448 
449             let mut msg = Message::new();
450             msg.build(&attrs)?;
451             msg
452         };
453 
454         log::debug!("client.SendBindingRequestTo call PerformTransaction 1");
455         let tr_res = self.perform_transaction(&msg, to, false).await?;
456 
457         let mut refl_addr = XorMappedAddress::default();
458         refl_addr.get_from(&tr_res.msg)?;
459 
460         Ok(SocketAddr::new(refl_addr.ip, refl_addr.port))
461     }
462 
463     // send_binding_request sends a new STUN request to the STUN server
send_binding_request(&mut self) -> Result<SocketAddr>464     async fn send_binding_request(&mut self) -> Result<SocketAddr> {
465         if self.stun_serv_addr.is_empty() {
466             Err(Error::ErrStunserverAddressNotSet)
467         } else {
468             self.send_binding_request_to(&self.stun_serv_addr.clone())
469                 .await
470         }
471     }
472 
473     // find_addr_by_channel_number returns a peer address associated with the
474     // channel number on this UDPConn
find_addr_by_channel_number( binding_mgr: &Arc<Mutex<BindingManager>>, ch_num: u16, ) -> Option<SocketAddr>475     async fn find_addr_by_channel_number(
476         binding_mgr: &Arc<Mutex<BindingManager>>,
477         ch_num: u16,
478     ) -> Option<SocketAddr> {
479         let bm = binding_mgr.lock().await;
480         bm.find_by_number(ch_num).map(|b| b.addr)
481     }
482 
483     // Allocate sends a TURN allocation request to the given transport address
allocate(&mut self) -> Result<RelayConnConfig>484     async fn allocate(&mut self) -> Result<RelayConnConfig> {
485         {
486             let read_ch_tx = self.read_ch_tx.lock().await;
487             log::debug!("allocate check: read_ch_tx_opt = {}", read_ch_tx.is_some());
488             if read_ch_tx.is_some() {
489                 return Err(Error::ErrOneAllocateOnly);
490             }
491         }
492 
493         let mut msg = Message::new();
494         msg.build(&[
495             Box::new(TransactionId::new()),
496             Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
497             Box::new(RequestedTransport {
498                 protocol: PROTO_UDP,
499             }),
500             Box::new(FINGERPRINT),
501         ])?;
502 
503         log::debug!("client.Allocate call PerformTransaction 1");
504         let tr_res = self
505             .perform_transaction(&msg, &self.turn_serv_addr.clone(), false)
506             .await?;
507         let res = tr_res.msg;
508 
509         // Anonymous allocate failed, trying to authenticate.
510         let nonce = Nonce::get_from_as(&res, ATTR_NONCE)?;
511         self.realm = Realm::get_from_as(&res, ATTR_REALM)?;
512 
513         self.integrity = MessageIntegrity::new_long_term_integrity(
514             self.username.text.clone(),
515             self.realm.text.clone(),
516             self.password.clone(),
517         );
518 
519         // Trying to authorize.
520         msg.build(&[
521             Box::new(TransactionId::new()),
522             Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)),
523             Box::new(RequestedTransport {
524                 protocol: PROTO_UDP,
525             }),
526             Box::new(self.username.clone()),
527             Box::new(self.realm.clone()),
528             Box::new(nonce.clone()),
529             Box::new(self.integrity.clone()),
530             Box::new(FINGERPRINT),
531         ])?;
532 
533         log::debug!("client.Allocate call PerformTransaction 2");
534         let tr_res = self
535             .perform_transaction(&msg, &self.turn_serv_addr.clone(), false)
536             .await?;
537         let res = tr_res.msg;
538 
539         if res.typ.class == CLASS_ERROR_RESPONSE {
540             let mut code = ErrorCodeAttribute::default();
541             let result = code.get_from(&res);
542             if result.is_err() {
543                 return Err(Error::Other(format!("{}", res.typ)));
544             } else {
545                 return Err(Error::Other(format!("{} (error {})", res.typ, code)));
546             }
547         }
548 
549         // Getting relayed addresses from response.
550         let mut relayed = RelayedAddress::default();
551         relayed.get_from(&res)?;
552         let relayed_addr = SocketAddr::new(relayed.ip, relayed.port);
553 
554         // Getting lifetime from response
555         let mut lifetime = Lifetime::default();
556         lifetime.get_from(&res)?;
557 
558         let (read_ch_tx, read_ch_rx) = mpsc::channel(MAX_READ_QUEUE_SIZE);
559         {
560             let mut read_ch_tx_opt = self.read_ch_tx.lock().await;
561             *read_ch_tx_opt = Some(read_ch_tx);
562             log::debug!("allocate: read_ch_tx_opt = {}", read_ch_tx_opt.is_some());
563         }
564 
565         Ok(RelayConnConfig {
566             relayed_addr,
567             integrity: self.integrity.clone(),
568             nonce,
569             lifetime: lifetime.0,
570             binding_mgr: Arc::clone(&self.binding_mgr),
571             read_ch_rx: Arc::new(Mutex::new(read_ch_rx)),
572         })
573     }
574 }
575 
/// Client is a STUN server client.
///
/// It is a cloneable handle: every clone shares the same `ClientInternal`
/// behind an `Arc<Mutex<..>>`.
#[derive(Clone)]
pub struct Client {
    /// Shared internal state; each public method locks it before delegating.
    client_internal: Arc<Mutex<ClientInternal>>,
}
581 
582 impl Client {
new(config: ClientConfig) -> Result<Self>583     pub async fn new(config: ClientConfig) -> Result<Self> {
584         let ci = ClientInternal::new(config).await?;
585         Ok(Client {
586             client_internal: Arc::new(Mutex::new(ci)),
587         })
588     }
589 
listen(&self) -> Result<()>590     pub async fn listen(&self) -> Result<()> {
591         let ci = self.client_internal.lock().await;
592         ci.listen().await
593     }
594 
allocate(&self) -> Result<impl Conn>595     pub async fn allocate(&self) -> Result<impl Conn> {
596         let config = {
597             let mut ci = self.client_internal.lock().await;
598             ci.allocate().await?
599         };
600 
601         Ok(RelayConn::new(Arc::clone(&self.client_internal), config).await)
602     }
603 
close(&self) -> Result<()>604     pub async fn close(&self) -> Result<()> {
605         let mut ci = self.client_internal.lock().await;
606         ci.close().await;
607         Ok(())
608     }
609 
610     // send_binding_request_to sends a new STUN request to the given transport address
send_binding_request_to(&self, to: &str) -> Result<SocketAddr>611     pub async fn send_binding_request_to(&self, to: &str) -> Result<SocketAddr> {
612         let mut ci = self.client_internal.lock().await;
613         ci.send_binding_request_to(to).await
614     }
615 
616     // send_binding_request sends a new STUN request to the STUN server
send_binding_request(&self) -> Result<SocketAddr>617     pub async fn send_binding_request(&self) -> Result<SocketAddr> {
618         let mut ci = self.client_internal.lock().await;
619         ci.send_binding_request().await
620     }
621 }
622