1 #[cfg(test)] 2 mod client_test; 3 4 pub mod binding; 5 pub mod periodic_timer; 6 pub mod permission; 7 pub mod relay_conn; 8 pub mod transaction; 9 10 use crate::error::*; 11 use crate::proto::{ 12 chandata::*, data::*, lifetime::*, peeraddr::*, relayaddr::*, reqtrans::*, PROTO_UDP, 13 }; 14 use binding::*; 15 use relay_conn::*; 16 use transaction::*; 17 18 use std::net::SocketAddr; 19 use std::str::FromStr; 20 use std::sync::Arc; 21 use stun::agent::*; 22 use stun::attributes::*; 23 use stun::error_code::*; 24 use stun::fingerprint::*; 25 use stun::integrity::*; 26 use stun::message::*; 27 use stun::textattrs::*; 28 use stun::xoraddr::*; 29 use tokio::sync::{mpsc, Mutex}; 30 use util::{conn::*, vnet::net::*}; 31 32 use async_trait::async_trait; 33 34 const DEFAULT_RTO_IN_MS: u16 = 200; 35 const MAX_DATA_BUFFER_SIZE: usize = u16::MAX as usize; // message size limit for Chromium 36 const MAX_READ_QUEUE_SIZE: usize = 1024; 37 38 // interval [msec] 39 // 0: 0 ms +500 40 // 1: 500 ms +1000 41 // 2: 1500 ms +2000 42 // 3: 3500 ms +4000 43 // 4: 7500 ms +8000 44 // 5: 15500 ms +16000 45 // 6: 31500 ms +32000 46 // -: 63500 ms failed 47 48 // ClientConfig is a bag of config parameters for Client. 49 pub struct ClientConfig { 50 pub stun_serv_addr: String, // STUN server address (e.g. "stun.abc.com:3478") 51 pub turn_serv_addr: String, // TURN server addrees (e.g. "turn.abc.com:3478") 52 pub username: String, 53 pub password: String, 54 pub realm: String, 55 pub software: String, 56 pub rto_in_ms: u16, 57 pub conn: Arc<dyn Conn + Send + Sync>, 58 pub vnet: Option<Arc<Net>>, 59 } 60 61 struct ClientInternal { 62 conn: Arc<dyn Conn + Send + Sync>, 63 stun_serv_addr: String, 64 turn_serv_addr: String, 65 username: Username, 66 password: String, 67 realm: Realm, 68 integrity: MessageIntegrity, 69 software: Software, 70 tr_map: Arc<Mutex<TransactionMap>>, 71 binding_mgr: Arc<Mutex<BindingManager>>, 72 rto_in_ms: u16, 73 read_ch_tx: Arc<Mutex<Option<mpsc::Sender<InboundData>>>>, 74 } 75 76 #[async_trait] 77 impl RelayConnObserver for ClientInternal { 78 // turn_server_addr return the TURN server address turn_server_addr(&self) -> String79 fn turn_server_addr(&self) -> String { 80 self.turn_serv_addr.clone() 81 } 82 83 // username returns username username(&self) -> Username84 fn username(&self) -> Username { 85 self.username.clone() 86 } 87 88 // realm return realm realm(&self) -> Realm89 fn realm(&self) -> Realm { 90 self.realm.clone() 91 } 92 93 // WriteTo sends data to the specified destination using the base socket. write_to(&self, data: &[u8], to: &str) -> std::result::Result<usize, util::Error>94 async fn write_to(&self, data: &[u8], to: &str) -> std::result::Result<usize, util::Error> { 95 let n = self.conn.send_to(data, SocketAddr::from_str(to)?).await?; 96 Ok(n) 97 } 98 99 // PerformTransaction performs STUN transaction perform_transaction( &mut self, msg: &Message, to: &str, ignore_result: bool, ) -> Result<TransactionResult>100 async fn perform_transaction( 101 &mut self, 102 msg: &Message, 103 to: &str, 104 ignore_result: bool, 105 ) -> Result<TransactionResult> { 106 let tr_key = base64::encode(msg.transaction_id.0); 107 108 let mut tr = Transaction::new(TransactionConfig { 109 key: tr_key.clone(), 110 raw: msg.raw.clone(), 111 to: to.to_string(), 112 interval: self.rto_in_ms, 113 ignore_result, 114 }); 115 let result_ch_rx = tr.get_result_channel(); 116 117 log::trace!("start {} transaction {} to {}", msg.typ, tr_key, tr.to); 118 { 119 let mut tm = self.tr_map.lock().await; 120 tm.insert(tr_key.clone(), tr); 121 } 122 123 self.conn 124 .send_to(&msg.raw, SocketAddr::from_str(to)?) 125 .await?; 126 127 let conn2 = Arc::clone(&self.conn); 128 let tr_map2 = Arc::clone(&self.tr_map); 129 { 130 let mut tm = self.tr_map.lock().await; 131 if let Some(tr) = tm.get(&tr_key) { 132 tr.start_rtx_timer(conn2, tr_map2).await; 133 } 134 } 135 136 // If dontWait is true, get the transaction going and return immediately 137 if ignore_result { 138 return Ok(TransactionResult::default()); 139 } 140 141 // wait_for_result waits for the transaction result 142 if let Some(mut result_ch_rx) = result_ch_rx { 143 match result_ch_rx.recv().await { 144 Some(tr) => Ok(tr), 145 None => Err(Error::ErrTransactionClosed), 146 } 147 } else { 148 Err(Error::ErrWaitForResultOnNonResultTransaction) 149 } 150 } 151 } 152 153 impl ClientInternal { 154 // new returns a new Client instance. listeningAddress is the address and port to listen on, default "0.0.0.0:0" new(config: ClientConfig) -> Result<Self>155 async fn new(config: ClientConfig) -> Result<Self> { 156 let net = if let Some(vnet) = config.vnet { 157 if vnet.is_virtual() { 158 log::warn!("vnet is enabled"); 159 } 160 vnet 161 } else { 162 Arc::new(Net::new(None)) 163 }; 164 165 let stun_serv_addr = if config.stun_serv_addr.is_empty() { 166 String::new() 167 } else { 168 log::debug!("resolving {}", config.stun_serv_addr); 169 let local_addr = config.conn.local_addr()?; 170 let stun_serv = net 171 .resolve_addr(local_addr.is_ipv4(), &config.stun_serv_addr) 172 .await?; 173 log::debug!("stunServ: {}", stun_serv); 174 stun_serv.to_string() 175 }; 176 177 let turn_serv_addr = if config.turn_serv_addr.is_empty() { 178 String::new() 179 } else { 180 log::debug!("resolving {}", config.turn_serv_addr); 181 let local_addr = config.conn.local_addr()?; 182 let turn_serv = net 183 .resolve_addr(local_addr.is_ipv4(), &config.turn_serv_addr) 184 .await?; 185 log::debug!("turnServ: {}", turn_serv); 186 turn_serv.to_string() 187 }; 188 189 Ok(ClientInternal { 190 conn: Arc::clone(&config.conn), 191 stun_serv_addr, 192 turn_serv_addr, 193 username: Username::new(ATTR_USERNAME, config.username), 194 password: config.password, 195 realm: Realm::new(ATTR_REALM, config.realm), 196 software: Software::new(ATTR_SOFTWARE, config.software), 197 tr_map: Arc::new(Mutex::new(TransactionMap::new())), 198 binding_mgr: Arc::new(Mutex::new(BindingManager::new())), 199 rto_in_ms: if config.rto_in_ms != 0 { 200 config.rto_in_ms 201 } else { 202 DEFAULT_RTO_IN_MS 203 }, 204 integrity: MessageIntegrity::new_short_term_integrity(String::new()), 205 read_ch_tx: Arc::new(Mutex::new(None)), 206 }) 207 } 208 209 // stun_server_addr return the STUN server address stun_server_addr(&self) -> String210 fn stun_server_addr(&self) -> String { 211 self.stun_serv_addr.clone() 212 } 213 214 // Listen will have this client start listening on the relay_conn provided via the config. 215 // This is optional. If not used, you will need to call handle_inbound method 216 // to supply incoming data, instead. listen(&self) -> Result<()>217 async fn listen(&self) -> Result<()> { 218 let conn = Arc::clone(&self.conn); 219 let stun_serv_str = self.stun_serv_addr.clone(); 220 let tr_map = Arc::clone(&self.tr_map); 221 let read_ch_tx = Arc::clone(&self.read_ch_tx); 222 let binding_mgr = Arc::clone(&self.binding_mgr); 223 224 tokio::spawn(async move { 225 let mut buf = vec![0u8; MAX_DATA_BUFFER_SIZE]; 226 loop { 227 //TODO: gracefully exit loop 228 let (n, from) = match conn.recv_from(&mut buf).await { 229 Ok((n, from)) => (n, from), 230 Err(err) => { 231 log::debug!("exiting read loop: {}", err); 232 break; 233 } 234 }; 235 236 log::debug!("received {} bytes of udp from {}", n, from); 237 238 if let Err(err) = ClientInternal::handle_inbound( 239 &read_ch_tx, 240 &buf[..n], 241 from, 242 &stun_serv_str, 243 &tr_map, 244 &binding_mgr, 245 ) 246 .await 247 { 248 log::debug!("exiting read loop: {}", err); 249 break; 250 } 251 } 252 }); 253 254 Ok(()) 255 } 256 257 // handle_inbound handles data received. 258 // This method handles incoming packet demultiplex it by the source address 259 // and the types of the message. 260 // This return a booleen (handled or not) and if there was an error. 261 // Caller should check if the packet was handled by this client or not. 262 // If not handled, it is assumed that the packet is application data. 263 // If an error is returned, the caller should discard the packet regardless. handle_inbound( read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>, data: &[u8], from: SocketAddr, stun_serv_str: &str, tr_map: &Arc<Mutex<TransactionMap>>, binding_mgr: &Arc<Mutex<BindingManager>>, ) -> Result<()>264 async fn handle_inbound( 265 read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>, 266 data: &[u8], 267 from: SocketAddr, 268 stun_serv_str: &str, 269 tr_map: &Arc<Mutex<TransactionMap>>, 270 binding_mgr: &Arc<Mutex<BindingManager>>, 271 ) -> Result<()> { 272 // +-------------------+-------------------------------+ 273 // | Return Values | | 274 // +-------------------+ Meaning / Action | 275 // | handled | error | | 276 // |=========+=========+===============================+ 277 // | false | nil | Handle the packet as app data | 278 // |---------+---------+-------------------------------+ 279 // | true | nil | Nothing to do | 280 // |---------+---------+-------------------------------+ 281 // | false | error | (shouldn't happen) | 282 // |---------+---------+-------------------------------+ 283 // | true | error | Error occurred while handling | 284 // +---------+---------+-------------------------------+ 285 // Possible causes of the error: 286 // - Malformed packet (parse error) 287 // - STUN message was a request 288 // - Non-STUN message from the STUN server 289 290 if is_message(data) { 291 ClientInternal::handle_stun_message(tr_map, read_ch_tx, data, from).await 292 } else if ChannelData::is_channel_data(data) { 293 ClientInternal::handle_channel_data(binding_mgr, read_ch_tx, data).await 294 } else if !stun_serv_str.is_empty() && from.to_string() == *stun_serv_str { 295 // received from STUN server but it is not a STUN message 296 Err(Error::ErrNonStunmessage) 297 } else { 298 // assume, this is an application data 299 log::trace!("non-STUN/TURN packect, unhandled"); 300 Ok(()) 301 } 302 } 303 handle_stun_message( tr_map: &Arc<Mutex<TransactionMap>>, read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>, data: &[u8], mut from: SocketAddr, ) -> Result<()>304 async fn handle_stun_message( 305 tr_map: &Arc<Mutex<TransactionMap>>, 306 read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>, 307 data: &[u8], 308 mut from: SocketAddr, 309 ) -> Result<()> { 310 let mut msg = Message::new(); 311 msg.raw = data.to_vec(); 312 msg.decode()?; 313 314 if msg.typ.class == CLASS_REQUEST { 315 return Err(Error::Other(format!( 316 "{:?} : {}", 317 Error::ErrUnexpectedStunrequestMessage, 318 msg 319 ))); 320 } 321 322 if msg.typ.class == CLASS_INDICATION { 323 if msg.typ.method == METHOD_DATA { 324 let mut peer_addr = PeerAddress::default(); 325 peer_addr.get_from(&msg)?; 326 from = SocketAddr::new(peer_addr.ip, peer_addr.port); 327 328 let mut data = Data::default(); 329 data.get_from(&msg)?; 330 331 log::debug!("data indication received from {}", from); 332 333 let _ = ClientInternal::handle_inbound_relay_conn(read_ch_tx, &data.0, from).await; 334 } 335 336 return Ok(()); 337 } 338 339 // This is a STUN response message (transactional) 340 // The type is either: 341 // - stun.ClassSuccessResponse 342 // - stun.ClassErrorResponse 343 344 let tr_key = base64::encode(msg.transaction_id.0); 345 346 let mut tm = tr_map.lock().await; 347 if tm.find(&tr_key).is_none() { 348 // silently discard 349 log::debug!("no transaction for {}", msg); 350 return Ok(()); 351 } 352 353 if let Some(mut tr) = tm.delete(&tr_key) { 354 // End the transaction 355 tr.stop_rtx_timer(); 356 357 if !tr 358 .write_result(TransactionResult { 359 msg, 360 from, 361 retries: tr.retries(), 362 ..Default::default() 363 }) 364 .await 365 { 366 log::debug!("no listener for msg.raw {:?}", data); 367 } 368 } 369 370 Ok(()) 371 } 372 handle_channel_data( binding_mgr: &Arc<Mutex<BindingManager>>, read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>, data: &[u8], ) -> Result<()>373 async fn handle_channel_data( 374 binding_mgr: &Arc<Mutex<BindingManager>>, 375 read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>, 376 data: &[u8], 377 ) -> Result<()> { 378 let mut ch_data = ChannelData { 379 raw: data.to_vec(), 380 ..Default::default() 381 }; 382 ch_data.decode()?; 383 384 let addr = ClientInternal::find_addr_by_channel_number(binding_mgr, ch_data.number.0) 385 .await 386 .ok_or(Error::ErrChannelBindNotFound)?; 387 388 log::trace!( 389 "channel data received from {} (ch={})", 390 addr, 391 ch_data.number.0 392 ); 393 394 let _ = ClientInternal::handle_inbound_relay_conn(read_ch_tx, &ch_data.data, addr).await; 395 396 Ok(()) 397 } 398 399 // handle_inbound_relay_conn passes inbound data in RelayConn handle_inbound_relay_conn( read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>, data: &[u8], from: SocketAddr, ) -> Result<()>400 async fn handle_inbound_relay_conn( 401 read_ch_tx: &Arc<Mutex<Option<mpsc::Sender<InboundData>>>>, 402 data: &[u8], 403 from: SocketAddr, 404 ) -> Result<()> { 405 let read_ch_tx_opt = read_ch_tx.lock().await; 406 log::debug!("read_ch_tx_opt = {}", read_ch_tx_opt.is_some()); 407 if let Some(tx) = &*read_ch_tx_opt { 408 log::debug!("try_send data = {:?}, from = {}", data, from); 409 if tx 410 .try_send(InboundData { 411 data: data.to_vec(), 412 from, 413 }) 414 .is_err() 415 { 416 log::warn!("receive buffer full"); 417 } 418 Ok(()) 419 } else { 420 Err(Error::ErrAlreadyClosed) 421 } 422 } 423 424 // Close closes this client close(&mut self)425 async fn close(&mut self) { 426 { 427 let mut read_ch_tx = self.read_ch_tx.lock().await; 428 read_ch_tx.take(); 429 } 430 { 431 let mut tm = self.tr_map.lock().await; 432 tm.close_and_delete_all(); 433 } 434 } 435 436 // send_binding_request_to sends a new STUN request to the given transport address send_binding_request_to(&mut self, to: &str) -> Result<SocketAddr>437 async fn send_binding_request_to(&mut self, to: &str) -> Result<SocketAddr> { 438 let msg = { 439 let attrs: Vec<Box<dyn Setter>> = if !self.software.text.is_empty() { 440 vec![ 441 Box::new(TransactionId::new()), 442 Box::new(BINDING_REQUEST), 443 Box::new(self.software.clone()), 444 ] 445 } else { 446 vec![Box::new(TransactionId::new()), Box::new(BINDING_REQUEST)] 447 }; 448 449 let mut msg = Message::new(); 450 msg.build(&attrs)?; 451 msg 452 }; 453 454 log::debug!("client.SendBindingRequestTo call PerformTransaction 1"); 455 let tr_res = self.perform_transaction(&msg, to, false).await?; 456 457 let mut refl_addr = XorMappedAddress::default(); 458 refl_addr.get_from(&tr_res.msg)?; 459 460 Ok(SocketAddr::new(refl_addr.ip, refl_addr.port)) 461 } 462 463 // send_binding_request sends a new STUN request to the STUN server send_binding_request(&mut self) -> Result<SocketAddr>464 async fn send_binding_request(&mut self) -> Result<SocketAddr> { 465 if self.stun_serv_addr.is_empty() { 466 Err(Error::ErrStunserverAddressNotSet) 467 } else { 468 self.send_binding_request_to(&self.stun_serv_addr.clone()) 469 .await 470 } 471 } 472 473 // find_addr_by_channel_number returns a peer address associated with the 474 // channel number on this UDPConn find_addr_by_channel_number( binding_mgr: &Arc<Mutex<BindingManager>>, ch_num: u16, ) -> Option<SocketAddr>475 async fn find_addr_by_channel_number( 476 binding_mgr: &Arc<Mutex<BindingManager>>, 477 ch_num: u16, 478 ) -> Option<SocketAddr> { 479 let bm = binding_mgr.lock().await; 480 bm.find_by_number(ch_num).map(|b| b.addr) 481 } 482 483 // Allocate sends a TURN allocation request to the given transport address allocate(&mut self) -> Result<RelayConnConfig>484 async fn allocate(&mut self) -> Result<RelayConnConfig> { 485 { 486 let read_ch_tx = self.read_ch_tx.lock().await; 487 log::debug!("allocate check: read_ch_tx_opt = {}", read_ch_tx.is_some()); 488 if read_ch_tx.is_some() { 489 return Err(Error::ErrOneAllocateOnly); 490 } 491 } 492 493 let mut msg = Message::new(); 494 msg.build(&[ 495 Box::new(TransactionId::new()), 496 Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)), 497 Box::new(RequestedTransport { 498 protocol: PROTO_UDP, 499 }), 500 Box::new(FINGERPRINT), 501 ])?; 502 503 log::debug!("client.Allocate call PerformTransaction 1"); 504 let tr_res = self 505 .perform_transaction(&msg, &self.turn_serv_addr.clone(), false) 506 .await?; 507 let res = tr_res.msg; 508 509 // Anonymous allocate failed, trying to authenticate. 510 let nonce = Nonce::get_from_as(&res, ATTR_NONCE)?; 511 self.realm = Realm::get_from_as(&res, ATTR_REALM)?; 512 513 self.integrity = MessageIntegrity::new_long_term_integrity( 514 self.username.text.clone(), 515 self.realm.text.clone(), 516 self.password.clone(), 517 ); 518 519 // Trying to authorize. 520 msg.build(&[ 521 Box::new(TransactionId::new()), 522 Box::new(MessageType::new(METHOD_ALLOCATE, CLASS_REQUEST)), 523 Box::new(RequestedTransport { 524 protocol: PROTO_UDP, 525 }), 526 Box::new(self.username.clone()), 527 Box::new(self.realm.clone()), 528 Box::new(nonce.clone()), 529 Box::new(self.integrity.clone()), 530 Box::new(FINGERPRINT), 531 ])?; 532 533 log::debug!("client.Allocate call PerformTransaction 2"); 534 let tr_res = self 535 .perform_transaction(&msg, &self.turn_serv_addr.clone(), false) 536 .await?; 537 let res = tr_res.msg; 538 539 if res.typ.class == CLASS_ERROR_RESPONSE { 540 let mut code = ErrorCodeAttribute::default(); 541 let result = code.get_from(&res); 542 if result.is_err() { 543 return Err(Error::Other(format!("{}", res.typ))); 544 } else { 545 return Err(Error::Other(format!("{} (error {})", res.typ, code))); 546 } 547 } 548 549 // Getting relayed addresses from response. 550 let mut relayed = RelayedAddress::default(); 551 relayed.get_from(&res)?; 552 let relayed_addr = SocketAddr::new(relayed.ip, relayed.port); 553 554 // Getting lifetime from response 555 let mut lifetime = Lifetime::default(); 556 lifetime.get_from(&res)?; 557 558 let (read_ch_tx, read_ch_rx) = mpsc::channel(MAX_READ_QUEUE_SIZE); 559 { 560 let mut read_ch_tx_opt = self.read_ch_tx.lock().await; 561 *read_ch_tx_opt = Some(read_ch_tx); 562 log::debug!("allocate: read_ch_tx_opt = {}", read_ch_tx_opt.is_some()); 563 } 564 565 Ok(RelayConnConfig { 566 relayed_addr, 567 integrity: self.integrity.clone(), 568 nonce, 569 lifetime: lifetime.0, 570 binding_mgr: Arc::clone(&self.binding_mgr), 571 read_ch_rx: Arc::new(Mutex::new(read_ch_rx)), 572 }) 573 } 574 } 575 576 // Client is a STUN server client 577 #[derive(Clone)] 578 pub struct Client { 579 client_internal: Arc<Mutex<ClientInternal>>, 580 } 581 582 impl Client { new(config: ClientConfig) -> Result<Self>583 pub async fn new(config: ClientConfig) -> Result<Self> { 584 let ci = ClientInternal::new(config).await?; 585 Ok(Client { 586 client_internal: Arc::new(Mutex::new(ci)), 587 }) 588 } 589 listen(&self) -> Result<()>590 pub async fn listen(&self) -> Result<()> { 591 let ci = self.client_internal.lock().await; 592 ci.listen().await 593 } 594 allocate(&self) -> Result<impl Conn>595 pub async fn allocate(&self) -> Result<impl Conn> { 596 let config = { 597 let mut ci = self.client_internal.lock().await; 598 ci.allocate().await? 599 }; 600 601 Ok(RelayConn::new(Arc::clone(&self.client_internal), config).await) 602 } 603 close(&self) -> Result<()>604 pub async fn close(&self) -> Result<()> { 605 let mut ci = self.client_internal.lock().await; 606 ci.close().await; 607 Ok(()) 608 } 609 610 // send_binding_request_to sends a new STUN request to the given transport address send_binding_request_to(&self, to: &str) -> Result<SocketAddr>611 pub async fn send_binding_request_to(&self, to: &str) -> Result<SocketAddr> { 612 let mut ci = self.client_internal.lock().await; 613 ci.send_binding_request_to(to).await 614 } 615 616 // send_binding_request sends a new STUN request to the STUN server send_binding_request(&self) -> Result<SocketAddr>617 pub async fn send_binding_request(&self) -> Result<SocketAddr> { 618 let mut ci = self.client_internal.lock().await; 619 ci.send_binding_request().await 620 } 621 } 622