1 use super::*; 2 use crate::error::*; 3 use crate::network_type::*; 4 use crate::udp_network::UDPNetwork; 5 use crate::url::{ProtoType, SchemeType, Url}; 6 use crate::util::*; 7 8 use util::{vnet::net::*, Conn}; 9 10 use crate::candidate::candidate_base::CandidateBaseConfig; 11 use crate::candidate::candidate_host::CandidateHostConfig; 12 use crate::candidate::candidate_relay::CandidateRelayConfig; 13 use crate::candidate::candidate_server_reflexive::CandidateServerReflexiveConfig; 14 use crate::candidate::*; 15 use std::net::{Ipv4Addr, Ipv6Addr}; 16 use std::str::FromStr; 17 use std::sync::Arc; 18 use waitgroup::WaitGroup; 19 20 const STUN_GATHER_TIMEOUT: Duration = Duration::from_secs(5); 21 22 pub(crate) struct GatherCandidatesInternalParams { 23 pub(crate) udp_network: UDPNetwork, 24 pub(crate) candidate_types: Vec<CandidateType>, 25 pub(crate) urls: Vec<Url>, 26 pub(crate) network_types: Vec<NetworkType>, 27 pub(crate) mdns_mode: MulticastDnsMode, 28 pub(crate) mdns_name: String, 29 pub(crate) net: Arc<Net>, 30 pub(crate) interface_filter: Arc<Option<InterfaceFilterFn>>, 31 pub(crate) ip_filter: Arc<Option<IpFilterFn>>, 32 pub(crate) ext_ip_mapper: Arc<Option<ExternalIpMapper>>, 33 pub(crate) agent_internal: Arc<AgentInternal>, 34 pub(crate) gathering_state: Arc<AtomicU8>, 35 pub(crate) chan_candidate_tx: ChanCandidateTx, 36 } 37 38 struct GatherCandidatesLocalParams { 39 udp_network: UDPNetwork, 40 network_types: Vec<NetworkType>, 41 mdns_mode: MulticastDnsMode, 42 mdns_name: String, 43 interface_filter: Arc<Option<InterfaceFilterFn>>, 44 ip_filter: Arc<Option<IpFilterFn>>, 45 ext_ip_mapper: Arc<Option<ExternalIpMapper>>, 46 net: Arc<Net>, 47 agent_internal: Arc<AgentInternal>, 48 } 49 50 struct GatherCandidatesLocalUDPMuxParams { 51 network_types: Vec<NetworkType>, 52 interface_filter: Arc<Option<InterfaceFilterFn>>, 53 ip_filter: Arc<Option<IpFilterFn>>, 54 ext_ip_mapper: Arc<Option<ExternalIpMapper>>, 55 net: Arc<Net>, 56 agent_internal: Arc<AgentInternal>, 57 udp_mux: Arc<dyn UDPMux + Send + Sync>, 58 } 59 60 struct GatherCandidatesSrflxMappedParasm { 61 network_types: Vec<NetworkType>, 62 port_max: u16, 63 port_min: u16, 64 ext_ip_mapper: Arc<Option<ExternalIpMapper>>, 65 net: Arc<Net>, 66 agent_internal: Arc<AgentInternal>, 67 } 68 69 struct GatherCandidatesSrflxParams { 70 urls: Vec<Url>, 71 network_types: Vec<NetworkType>, 72 port_max: u16, 73 port_min: u16, 74 net: Arc<Net>, 75 agent_internal: Arc<AgentInternal>, 76 } 77 78 impl Agent { gather_candidates_internal(params: GatherCandidatesInternalParams)79 pub(crate) async fn gather_candidates_internal(params: GatherCandidatesInternalParams) { 80 Self::set_gathering_state( 81 ¶ms.chan_candidate_tx, 82 ¶ms.gathering_state, 83 GatheringState::Gathering, 84 ) 85 .await; 86 87 let wg = WaitGroup::new(); 88 89 for t in ¶ms.candidate_types { 90 match t { 91 CandidateType::Host => { 92 let local_params = GatherCandidatesLocalParams { 93 udp_network: params.udp_network.clone(), 94 network_types: params.network_types.clone(), 95 mdns_mode: params.mdns_mode, 96 mdns_name: params.mdns_name.clone(), 97 interface_filter: Arc::clone(¶ms.interface_filter), 98 ip_filter: Arc::clone(¶ms.ip_filter), 99 ext_ip_mapper: Arc::clone(¶ms.ext_ip_mapper), 100 net: Arc::clone(¶ms.net), 101 agent_internal: Arc::clone(¶ms.agent_internal), 102 }; 103 104 let w = wg.worker(); 105 tokio::spawn(async move { 106 let _d = w; 107 108 Self::gather_candidates_local(local_params).await; 109 }); 110 } 111 CandidateType::ServerReflexive => { 112 let ephemeral_config = match ¶ms.udp_network { 113 UDPNetwork::Ephemeral(e) => e, 114 // No server reflexive for muxxed connections 115 UDPNetwork::Muxed(_) => continue, 116 }; 117 118 let srflx_params = GatherCandidatesSrflxParams { 119 urls: params.urls.clone(), 120 network_types: params.network_types.clone(), 121 port_max: ephemeral_config.port_max(), 122 port_min: ephemeral_config.port_min(), 123 net: Arc::clone(¶ms.net), 124 agent_internal: Arc::clone(¶ms.agent_internal), 125 }; 126 let w1 = wg.worker(); 127 tokio::spawn(async move { 128 let _d = w1; 129 130 Self::gather_candidates_srflx(srflx_params).await; 131 }); 132 if let Some(ext_ip_mapper) = &*params.ext_ip_mapper { 133 if ext_ip_mapper.candidate_type == CandidateType::ServerReflexive { 134 let srflx_mapped_params = GatherCandidatesSrflxMappedParasm { 135 network_types: params.network_types.clone(), 136 port_max: ephemeral_config.port_max(), 137 port_min: ephemeral_config.port_min(), 138 ext_ip_mapper: Arc::clone(¶ms.ext_ip_mapper), 139 net: Arc::clone(¶ms.net), 140 agent_internal: Arc::clone(¶ms.agent_internal), 141 }; 142 let w2 = wg.worker(); 143 tokio::spawn(async move { 144 let _d = w2; 145 146 Self::gather_candidates_srflx_mapped(srflx_mapped_params).await; 147 }); 148 } 149 } 150 } 151 CandidateType::Relay => { 152 let urls = params.urls.clone(); 153 let net = Arc::clone(¶ms.net); 154 let agent_internal = Arc::clone(¶ms.agent_internal); 155 let w = wg.worker(); 156 tokio::spawn(async move { 157 let _d = w; 158 159 Self::gather_candidates_relay(urls, net, agent_internal).await; 160 }); 161 } 162 _ => {} 163 } 164 } 165 166 // Block until all STUN and TURN URLs have been gathered (or timed out) 167 wg.wait().await; 168 169 Self::set_gathering_state( 170 ¶ms.chan_candidate_tx, 171 ¶ms.gathering_state, 172 GatheringState::Complete, 173 ) 174 .await; 175 } 176 set_gathering_state( chan_candidate_tx: &ChanCandidateTx, gathering_state: &Arc<AtomicU8>, new_state: GatheringState, )177 async fn set_gathering_state( 178 chan_candidate_tx: &ChanCandidateTx, 179 gathering_state: &Arc<AtomicU8>, 180 new_state: GatheringState, 181 ) { 182 if GatheringState::from(gathering_state.load(Ordering::SeqCst)) != new_state 183 && new_state == GatheringState::Complete 184 { 185 let cand_tx = chan_candidate_tx.lock().await; 186 if let Some(tx) = &*cand_tx { 187 let _ = tx.send(None).await; 188 } 189 } 190 191 gathering_state.store(new_state as u8, Ordering::SeqCst); 192 } 193 gather_candidates_local(params: GatherCandidatesLocalParams)194 async fn gather_candidates_local(params: GatherCandidatesLocalParams) { 195 let GatherCandidatesLocalParams { 196 udp_network, 197 network_types, 198 mdns_mode, 199 mdns_name, 200 interface_filter, 201 ip_filter, 202 ext_ip_mapper, 203 net, 204 agent_internal, 205 } = params; 206 207 // If we wanna use UDP mux, do so 208 // FIXME: We still need to support TCP in combination with this option 209 if let UDPNetwork::Muxed(udp_mux) = udp_network { 210 let result = Self::gather_candidates_local_udp_mux(GatherCandidatesLocalUDPMuxParams { 211 network_types, 212 interface_filter, 213 ip_filter, 214 ext_ip_mapper, 215 net, 216 agent_internal, 217 udp_mux, 218 }) 219 .await; 220 221 if let Err(err) = result { 222 log::error!("Failed to gather local candidates using UDP mux: {}", err); 223 } 224 225 return; 226 } 227 228 let ips = local_interfaces(&net, &interface_filter, &ip_filter, &network_types).await; 229 for ip in ips { 230 let mut mapped_ip = ip; 231 232 if mdns_mode != MulticastDnsMode::QueryAndGather && ext_ip_mapper.is_some() { 233 if let Some(ext_ip_mapper2) = ext_ip_mapper.as_ref() { 234 if ext_ip_mapper2.candidate_type == CandidateType::Host { 235 if let Ok(mi) = ext_ip_mapper2.find_external_ip(&ip.to_string()) { 236 mapped_ip = mi; 237 } else { 238 log::warn!( 239 "[{}]: 1:1 NAT mapping is enabled but no external IP is found for {}", 240 agent_internal.get_name(), 241 ip 242 ); 243 } 244 } 245 } 246 } 247 248 let address = if mdns_mode == MulticastDnsMode::QueryAndGather { 249 mdns_name.clone() 250 } else { 251 mapped_ip.to_string() 252 }; 253 254 //TODO: for network in networks 255 let network = UDP.to_owned(); 256 if let UDPNetwork::Ephemeral(ephemeral_config) = &udp_network { 257 /*TODO:switch network { 258 case tcp: 259 // Handle ICE TCP passive mode 260 261 a.log.Debugf("GetConn by ufrag: %s\n", a.localUfrag) 262 conn, err = a.tcpMux.GetConnByUfrag(a.localUfrag) 263 if err != nil { 264 if !errors.Is(err, ErrTCPMuxNotInitialized) { 265 a.log.Warnf("error getting tcp conn by ufrag: %s %s %s\n", network, ip, a.localUfrag) 266 } 267 continue 268 } 269 port = conn.LocalAddr().(*net.TCPAddr).Port 270 tcpType = TCPTypePassive 271 // is there a way to verify that the listen address is even 272 // accessible from the current interface. 273 case udp:*/ 274 275 let conn: Arc<dyn Conn + Send + Sync> = match listen_udp_in_port_range( 276 &net, 277 ephemeral_config.port_max(), 278 ephemeral_config.port_min(), 279 SocketAddr::new(ip, 0), 280 ) 281 .await 282 { 283 Ok(conn) => conn, 284 Err(err) => { 285 log::warn!( 286 "[{}]: could not listen {} {}: {}", 287 agent_internal.get_name(), 288 network, 289 ip, 290 err 291 ); 292 continue; 293 } 294 }; 295 296 let port = match conn.local_addr() { 297 Ok(addr) => addr.port(), 298 Err(err) => { 299 log::warn!( 300 "[{}]: could not get local addr: {}", 301 agent_internal.get_name(), 302 err 303 ); 304 continue; 305 } 306 }; 307 308 let host_config = CandidateHostConfig { 309 base_config: CandidateBaseConfig { 310 network: network.clone(), 311 address, 312 port, 313 component: COMPONENT_RTP, 314 conn: Some(conn), 315 ..CandidateBaseConfig::default() 316 }, 317 ..CandidateHostConfig::default() 318 }; 319 320 let candidate: Arc<dyn Candidate + Send + Sync> = 321 match host_config.new_candidate_host() { 322 Ok(candidate) => { 323 if mdns_mode == MulticastDnsMode::QueryAndGather { 324 if let Err(err) = candidate.set_ip(&ip) { 325 log::warn!( 326 "[{}]: Failed to create host candidate: {} {} {}: {:?}", 327 agent_internal.get_name(), 328 network, 329 mapped_ip, 330 port, 331 err 332 ); 333 continue; 334 } 335 } 336 Arc::new(candidate) 337 } 338 Err(err) => { 339 log::warn!( 340 "[{}]: Failed to create host candidate: {} {} {}: {}", 341 agent_internal.get_name(), 342 network, 343 mapped_ip, 344 port, 345 err 346 ); 347 continue; 348 } 349 }; 350 351 { 352 if let Err(err) = agent_internal.add_candidate(&candidate).await { 353 if let Err(close_err) = candidate.close().await { 354 log::warn!( 355 "[{}]: Failed to close candidate: {}", 356 agent_internal.get_name(), 357 close_err 358 ); 359 } 360 log::warn!( 361 "[{}]: Failed to append to localCandidates and run onCandidateHdlr: {}", 362 agent_internal.get_name(), 363 err 364 ); 365 } 366 } 367 } 368 } 369 } 370 gather_candidates_local_udp_mux( params: GatherCandidatesLocalUDPMuxParams, ) -> Result<()>371 async fn gather_candidates_local_udp_mux( 372 params: GatherCandidatesLocalUDPMuxParams, 373 ) -> Result<()> { 374 let GatherCandidatesLocalUDPMuxParams { 375 network_types, 376 interface_filter, 377 ip_filter, 378 ext_ip_mapper, 379 net, 380 agent_internal, 381 udp_mux, 382 } = params; 383 384 // Filter out non UDP network types 385 let relevant_network_types: Vec<_> = 386 network_types.into_iter().filter(|n| n.is_udp()).collect(); 387 388 let udp_mux = Arc::clone(&udp_mux); 389 390 // There's actually only one, but `local_interfaces` requires a slice. 391 let local_ips = 392 local_interfaces(&net, &interface_filter, &ip_filter, &relevant_network_types).await; 393 394 let candidate_ip = ext_ip_mapper 395 .as_ref() // Arc 396 .as_ref() // Option 397 .and_then(|mapper| { 398 if mapper.candidate_type != CandidateType::Host { 399 return None; 400 } 401 402 local_ips 403 .iter() 404 .find_map(|ip| match mapper.find_external_ip(&ip.to_string()) { 405 Ok(ip) => Some(ip), 406 Err(err) => { 407 log::warn!( 408 "1:1 NAT mapping is enabled but not external IP is found for {}: {}", 409 ip, 410 err 411 ); 412 None 413 } 414 }) 415 }) 416 .or_else(|| local_ips.iter().copied().next()); 417 418 let candidate_ip = match candidate_ip { 419 None => return Err(Error::ErrCandidateIpNotFound), 420 Some(ip) => ip, 421 }; 422 423 let ufrag = { 424 let ufrag_pwd = agent_internal.ufrag_pwd.lock().await; 425 426 ufrag_pwd.local_ufrag.clone() 427 }; 428 429 let conn = udp_mux.get_conn(&ufrag).await?; 430 let port = conn.local_addr()?.port(); 431 432 let host_config = CandidateHostConfig { 433 base_config: CandidateBaseConfig { 434 network: UDP.to_owned(), 435 address: candidate_ip.to_string(), 436 port, 437 conn: Some(conn), 438 component: COMPONENT_RTP, 439 ..Default::default() 440 }, 441 tcp_type: TcpType::Unspecified, 442 }; 443 444 let candidate: Arc<dyn Candidate + Send + Sync> = 445 Arc::new(host_config.new_candidate_host()?); 446 447 agent_internal.add_candidate(&candidate).await?; 448 449 Ok(()) 450 } 451 gather_candidates_srflx_mapped(params: GatherCandidatesSrflxMappedParasm)452 async fn gather_candidates_srflx_mapped(params: GatherCandidatesSrflxMappedParasm) { 453 let GatherCandidatesSrflxMappedParasm { 454 network_types, 455 port_max, 456 port_min, 457 ext_ip_mapper, 458 net, 459 agent_internal, 460 } = params; 461 462 let wg = WaitGroup::new(); 463 464 for network_type in network_types { 465 if network_type.is_tcp() { 466 continue; 467 } 468 469 let network = network_type.to_string(); 470 let net2 = Arc::clone(&net); 471 let agent_internal2 = Arc::clone(&agent_internal); 472 let ext_ip_mapper2 = Arc::clone(&ext_ip_mapper); 473 474 let w = wg.worker(); 475 tokio::spawn(async move { 476 let _d = w; 477 478 let conn: Arc<dyn Conn + Send + Sync> = match listen_udp_in_port_range( 479 &net2, 480 port_max, 481 port_min, 482 if network_type.is_ipv4() { 483 SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0) 484 } else { 485 SocketAddr::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into(), 0) 486 }, 487 ) 488 .await 489 { 490 Ok(conn) => conn, 491 Err(err) => { 492 log::warn!( 493 "[{}]: Failed to listen {}: {}", 494 agent_internal2.get_name(), 495 network, 496 err 497 ); 498 return Ok(()); 499 } 500 }; 501 502 let laddr = conn.local_addr()?; 503 let mapped_ip = { 504 if let Some(ext_ip_mapper3) = &*ext_ip_mapper2 { 505 match ext_ip_mapper3.find_external_ip(&laddr.ip().to_string()) { 506 Ok(ip) => ip, 507 Err(err) => { 508 log::warn!( 509 "[{}]: 1:1 NAT mapping is enabled but no external IP is found for {}: {}", 510 agent_internal2.get_name(), 511 laddr, 512 err 513 ); 514 return Ok(()); 515 } 516 } 517 } else { 518 log::error!( 519 "[{}]: ext_ip_mapper is None in gather_candidates_srflx_mapped", 520 agent_internal2.get_name(), 521 ); 522 return Ok(()); 523 } 524 }; 525 526 let srflx_config = CandidateServerReflexiveConfig { 527 base_config: CandidateBaseConfig { 528 network: network.clone(), 529 address: mapped_ip.to_string(), 530 port: laddr.port(), 531 component: COMPONENT_RTP, 532 conn: Some(conn), 533 ..CandidateBaseConfig::default() 534 }, 535 rel_addr: laddr.ip().to_string(), 536 rel_port: laddr.port(), 537 }; 538 539 let candidate: Arc<dyn Candidate + Send + Sync> = 540 match srflx_config.new_candidate_server_reflexive() { 541 Ok(candidate) => Arc::new(candidate), 542 Err(err) => { 543 log::warn!( 544 "[{}]: Failed to create server reflexive candidate: {} {} {}: {}", 545 agent_internal2.get_name(), 546 network, 547 mapped_ip, 548 laddr.port(), 549 err 550 ); 551 return Ok(()); 552 } 553 }; 554 555 { 556 if let Err(err) = agent_internal2.add_candidate(&candidate).await { 557 if let Err(close_err) = candidate.close().await { 558 log::warn!( 559 "[{}]: Failed to close candidate: {}", 560 agent_internal2.get_name(), 561 close_err 562 ); 563 } 564 log::warn!( 565 "[{}]: Failed to append to localCandidates and run onCandidateHdlr: {}", 566 agent_internal2.get_name(), 567 err 568 ); 569 } 570 } 571 572 Result::<()>::Ok(()) 573 }); 574 } 575 576 wg.wait().await; 577 } 578 gather_candidates_srflx(params: GatherCandidatesSrflxParams)579 async fn gather_candidates_srflx(params: GatherCandidatesSrflxParams) { 580 let GatherCandidatesSrflxParams { 581 urls, 582 network_types, 583 port_max, 584 port_min, 585 net, 586 agent_internal, 587 } = params; 588 589 let wg = WaitGroup::new(); 590 for network_type in network_types { 591 if network_type.is_tcp() { 592 continue; 593 } 594 595 for url in &urls { 596 let network = network_type.to_string(); 597 let is_ipv4 = network_type.is_ipv4(); 598 let url = url.clone(); 599 let net2 = Arc::clone(&net); 600 let agent_internal2 = Arc::clone(&agent_internal); 601 602 let w = wg.worker(); 603 tokio::spawn(async move { 604 let _d = w; 605 606 let host_port = format!("{}:{}", url.host, url.port); 607 let server_addr = match net2.resolve_addr(is_ipv4, &host_port).await { 608 Ok(addr) => addr, 609 Err(err) => { 610 log::warn!( 611 "[{}]: failed to resolve stun host: {}: {}", 612 agent_internal2.get_name(), 613 host_port, 614 err 615 ); 616 return Ok(()); 617 } 618 }; 619 620 let conn: Arc<dyn Conn + Send + Sync> = match listen_udp_in_port_range( 621 &net2, 622 port_max, 623 port_min, 624 if is_ipv4 { 625 SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0) 626 } else { 627 SocketAddr::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into(), 0) 628 }, 629 ) 630 .await 631 { 632 Ok(conn) => conn, 633 Err(err) => { 634 log::warn!( 635 "[{}]: Failed to listen for {}: {}", 636 agent_internal2.get_name(), 637 server_addr, 638 err 639 ); 640 return Ok(()); 641 } 642 }; 643 644 let xoraddr = 645 match get_xormapped_addr(&conn, server_addr, STUN_GATHER_TIMEOUT).await { 646 Ok(xoraddr) => xoraddr, 647 Err(err) => { 648 log::warn!( 649 "[{}]: could not get server reflexive address {} {}: {}", 650 agent_internal2.get_name(), 651 network, 652 url, 653 err 654 ); 655 return Ok(()); 656 } 657 }; 658 659 let (ip, port) = (xoraddr.ip, xoraddr.port); 660 661 let laddr = conn.local_addr()?; 662 let srflx_config = CandidateServerReflexiveConfig { 663 base_config: CandidateBaseConfig { 664 network: network.clone(), 665 address: ip.to_string(), 666 port, 667 component: COMPONENT_RTP, 668 conn: Some(conn), 669 ..CandidateBaseConfig::default() 670 }, 671 rel_addr: laddr.ip().to_string(), 672 rel_port: laddr.port(), 673 }; 674 675 let candidate: Arc<dyn Candidate + Send + Sync> = 676 match srflx_config.new_candidate_server_reflexive() { 677 Ok(candidate) => Arc::new(candidate), 678 Err(err) => { 679 log::warn!( 680 "[{}]: Failed to create server reflexive candidate: {} {} {}: {:?}", 681 agent_internal2.get_name(), 682 network, 683 ip, 684 port, 685 err 686 ); 687 return Ok(()); 688 } 689 }; 690 691 { 692 if let Err(err) = agent_internal2.add_candidate(&candidate).await { 693 if let Err(close_err) = candidate.close().await { 694 log::warn!( 695 "[{}]: Failed to close candidate: {}", 696 agent_internal2.get_name(), 697 close_err 698 ); 699 } 700 log::warn!( 701 "[{}]: Failed to append to localCandidates and run onCandidateHdlr: {}", 702 agent_internal2.get_name(), 703 err 704 ); 705 } 706 } 707 708 Result::<()>::Ok(()) 709 }); 710 } 711 } 712 713 wg.wait().await; 714 } 715 gather_candidates_relay( urls: Vec<Url>, net: Arc<Net>, agent_internal: Arc<AgentInternal>, )716 pub(crate) async fn gather_candidates_relay( 717 urls: Vec<Url>, 718 net: Arc<Net>, 719 agent_internal: Arc<AgentInternal>, 720 ) { 721 let wg = WaitGroup::new(); 722 723 for url in urls { 724 if url.scheme != SchemeType::Turn && url.scheme != SchemeType::Turns { 725 continue; 726 } 727 if url.username.is_empty() { 728 log::error!( 729 "[{}]:Failed to gather relay candidates: {:?}", 730 agent_internal.get_name(), 731 Error::ErrUsernameEmpty 732 ); 733 return; 734 } 735 if url.password.is_empty() { 736 log::error!( 737 "[{}]: Failed to gather relay candidates: {:?}", 738 agent_internal.get_name(), 739 Error::ErrPasswordEmpty 740 ); 741 return; 742 } 743 744 let network = NetworkType::Udp4.to_string(); 745 let net2 = Arc::clone(&net); 746 let agent_internal2 = Arc::clone(&agent_internal); 747 748 let w = wg.worker(); 749 tokio::spawn(async move { 750 let _d = w; 751 752 let turn_server_addr = format!("{}:{}", url.host, url.port); 753 754 let (loc_conn, rel_addr, rel_port) = 755 if url.proto == ProtoType::Udp && url.scheme == SchemeType::Turn { 756 let loc_conn = match net2.bind(SocketAddr::from_str("0.0.0.0:0")?).await { 757 Ok(c) => c, 758 Err(err) => { 759 log::warn!( 760 "[{}]: Failed to listen due to error: {}", 761 agent_internal2.get_name(), 762 err 763 ); 764 return Ok(()); 765 } 766 }; 767 768 let local_addr = loc_conn.local_addr()?; 769 let rel_addr = local_addr.ip().to_string(); 770 let rel_port = local_addr.port(); 771 (loc_conn, rel_addr, rel_port) 772 /*TODO: case url.proto == ProtoType::UDP && url.scheme == SchemeType::TURNS{ 773 case a.proxyDialer != nil && url.Proto == ProtoTypeTCP && (url.Scheme == SchemeTypeTURN || url.Scheme == SchemeTypeTURNS): 774 case url.Proto == ProtoTypeTCP && url.Scheme == SchemeTypeTURN: 775 case url.Proto == ProtoTypeTCP && url.Scheme == SchemeTypeTURNS:*/ 776 } else { 777 log::warn!( 778 "[{}]: Unable to handle URL in gather_candidates_relay {}", 779 agent_internal2.get_name(), 780 url 781 ); 782 return Ok(()); 783 }; 784 785 let cfg = turn::client::ClientConfig { 786 stun_serv_addr: String::new(), 787 turn_serv_addr: turn_server_addr.clone(), 788 username: url.username, 789 password: url.password, 790 realm: String::new(), 791 software: String::new(), 792 rto_in_ms: 0, 793 conn: loc_conn, 794 vnet: Some(Arc::clone(&net2)), 795 }; 796 let client = match turn::client::Client::new(cfg).await { 797 Ok(client) => Arc::new(client), 798 Err(err) => { 799 log::warn!( 800 "[{}]: Failed to build new turn.Client {} {}\n", 801 agent_internal2.get_name(), 802 turn_server_addr, 803 err 804 ); 805 return Ok(()); 806 } 807 }; 808 if let Err(err) = client.listen().await { 809 let _ = client.close().await; 810 log::warn!( 811 "[{}]: Failed to listen on turn.Client {} {}", 812 agent_internal2.get_name(), 813 turn_server_addr, 814 err 815 ); 816 return Ok(()); 817 } 818 819 let relay_conn = match client.allocate().await { 820 Ok(conn) => conn, 821 Err(err) => { 822 let _ = client.close().await; 823 log::warn!( 824 "[{}]: Failed to allocate on turn.Client {} {}", 825 agent_internal2.get_name(), 826 turn_server_addr, 827 err 828 ); 829 return Ok(()); 830 } 831 }; 832 833 let raddr = relay_conn.local_addr()?; 834 let relay_config = CandidateRelayConfig { 835 base_config: CandidateBaseConfig { 836 network: network.clone(), 837 address: raddr.ip().to_string(), 838 port: raddr.port(), 839 component: COMPONENT_RTP, 840 conn: Some(Arc::new(relay_conn)), 841 ..CandidateBaseConfig::default() 842 }, 843 rel_addr, 844 rel_port, 845 relay_client: Some(Arc::clone(&client)), 846 }; 847 848 let candidate: Arc<dyn Candidate + Send + Sync> = 849 match relay_config.new_candidate_relay() { 850 Ok(candidate) => Arc::new(candidate), 851 Err(err) => { 852 let _ = client.close().await; 853 log::warn!( 854 "[{}]: Failed to create relay candidate: {} {}: {}", 855 agent_internal2.get_name(), 856 network, 857 raddr, 858 err 859 ); 860 return Ok(()); 861 } 862 }; 863 864 { 865 if let Err(err) = agent_internal2.add_candidate(&candidate).await { 866 if let Err(close_err) = candidate.close().await { 867 log::warn!( 868 "[{}]: Failed to close candidate: {}", 869 agent_internal2.get_name(), 870 close_err 871 ); 872 } 873 log::warn!( 874 "[{}]: Failed to append to localCandidates and run onCandidateHdlr: {}", 875 agent_internal2.get_name(), 876 err 877 ); 878 } 879 } 880 881 Result::<()>::Ok(()) 882 }); 883 } 884 885 wg.wait().await; 886 } 887 } 888