xref: /webrtc/ice/src/agent/agent_gather.rs (revision 5b79f08a)
1 use super::*;
2 use crate::error::*;
3 use crate::network_type::*;
4 use crate::udp_network::UDPNetwork;
5 use crate::url::{ProtoType, SchemeType, Url};
6 use crate::util::*;
7 
8 use util::{vnet::net::*, Conn};
9 
10 use crate::candidate::candidate_base::CandidateBaseConfig;
11 use crate::candidate::candidate_host::CandidateHostConfig;
12 use crate::candidate::candidate_relay::CandidateRelayConfig;
13 use crate::candidate::candidate_server_reflexive::CandidateServerReflexiveConfig;
14 use crate::candidate::*;
15 use std::net::{Ipv4Addr, Ipv6Addr};
16 use std::str::FromStr;
17 use std::sync::Arc;
18 use waitgroup::WaitGroup;
19 
/// Timeout handed to `get_xormapped_addr` for each STUN request issued while gathering
/// server-reflexive candidates.
const STUN_GATHER_TIMEOUT: Duration = Duration::from_secs(5);
21 
22 pub(crate) struct GatherCandidatesInternalParams {
23     pub(crate) udp_network: UDPNetwork,
24     pub(crate) candidate_types: Vec<CandidateType>,
25     pub(crate) urls: Vec<Url>,
26     pub(crate) network_types: Vec<NetworkType>,
27     pub(crate) mdns_mode: MulticastDnsMode,
28     pub(crate) mdns_name: String,
29     pub(crate) net: Arc<Net>,
30     pub(crate) interface_filter: Arc<Option<InterfaceFilterFn>>,
31     pub(crate) ip_filter: Arc<Option<IpFilterFn>>,
32     pub(crate) ext_ip_mapper: Arc<Option<ExternalIpMapper>>,
33     pub(crate) agent_internal: Arc<AgentInternal>,
34     pub(crate) gathering_state: Arc<AtomicU8>,
35     pub(crate) chan_candidate_tx: ChanCandidateTx,
36 }
37 
38 struct GatherCandidatesLocalParams {
39     udp_network: UDPNetwork,
40     network_types: Vec<NetworkType>,
41     mdns_mode: MulticastDnsMode,
42     mdns_name: String,
43     interface_filter: Arc<Option<InterfaceFilterFn>>,
44     ip_filter: Arc<Option<IpFilterFn>>,
45     ext_ip_mapper: Arc<Option<ExternalIpMapper>>,
46     net: Arc<Net>,
47     agent_internal: Arc<AgentInternal>,
48 }
49 
50 struct GatherCandidatesLocalUDPMuxParams {
51     network_types: Vec<NetworkType>,
52     interface_filter: Arc<Option<InterfaceFilterFn>>,
53     ip_filter: Arc<Option<IpFilterFn>>,
54     ext_ip_mapper: Arc<Option<ExternalIpMapper>>,
55     net: Arc<Net>,
56     agent_internal: Arc<AgentInternal>,
57     udp_mux: Arc<dyn UDPMux + Send + Sync>,
58 }
59 
60 struct GatherCandidatesSrflxMappedParasm {
61     network_types: Vec<NetworkType>,
62     port_max: u16,
63     port_min: u16,
64     ext_ip_mapper: Arc<Option<ExternalIpMapper>>,
65     net: Arc<Net>,
66     agent_internal: Arc<AgentInternal>,
67 }
68 
69 struct GatherCandidatesSrflxParams {
70     urls: Vec<Url>,
71     network_types: Vec<NetworkType>,
72     port_max: u16,
73     port_min: u16,
74     net: Arc<Net>,
75     agent_internal: Arc<AgentInternal>,
76 }
77 
78 impl Agent {
gather_candidates_internal(params: GatherCandidatesInternalParams)79     pub(crate) async fn gather_candidates_internal(params: GatherCandidatesInternalParams) {
80         Self::set_gathering_state(
81             &params.chan_candidate_tx,
82             &params.gathering_state,
83             GatheringState::Gathering,
84         )
85         .await;
86 
87         let wg = WaitGroup::new();
88 
89         for t in &params.candidate_types {
90             match t {
91                 CandidateType::Host => {
92                     let local_params = GatherCandidatesLocalParams {
93                         udp_network: params.udp_network.clone(),
94                         network_types: params.network_types.clone(),
95                         mdns_mode: params.mdns_mode,
96                         mdns_name: params.mdns_name.clone(),
97                         interface_filter: Arc::clone(&params.interface_filter),
98                         ip_filter: Arc::clone(&params.ip_filter),
99                         ext_ip_mapper: Arc::clone(&params.ext_ip_mapper),
100                         net: Arc::clone(&params.net),
101                         agent_internal: Arc::clone(&params.agent_internal),
102                     };
103 
104                     let w = wg.worker();
105                     tokio::spawn(async move {
106                         let _d = w;
107 
108                         Self::gather_candidates_local(local_params).await;
109                     });
110                 }
111                 CandidateType::ServerReflexive => {
112                     let ephemeral_config = match &params.udp_network {
113                         UDPNetwork::Ephemeral(e) => e,
114                         // No server reflexive for muxxed connections
115                         UDPNetwork::Muxed(_) => continue,
116                     };
117 
118                     let srflx_params = GatherCandidatesSrflxParams {
119                         urls: params.urls.clone(),
120                         network_types: params.network_types.clone(),
121                         port_max: ephemeral_config.port_max(),
122                         port_min: ephemeral_config.port_min(),
123                         net: Arc::clone(&params.net),
124                         agent_internal: Arc::clone(&params.agent_internal),
125                     };
126                     let w1 = wg.worker();
127                     tokio::spawn(async move {
128                         let _d = w1;
129 
130                         Self::gather_candidates_srflx(srflx_params).await;
131                     });
132                     if let Some(ext_ip_mapper) = &*params.ext_ip_mapper {
133                         if ext_ip_mapper.candidate_type == CandidateType::ServerReflexive {
134                             let srflx_mapped_params = GatherCandidatesSrflxMappedParasm {
135                                 network_types: params.network_types.clone(),
136                                 port_max: ephemeral_config.port_max(),
137                                 port_min: ephemeral_config.port_min(),
138                                 ext_ip_mapper: Arc::clone(&params.ext_ip_mapper),
139                                 net: Arc::clone(&params.net),
140                                 agent_internal: Arc::clone(&params.agent_internal),
141                             };
142                             let w2 = wg.worker();
143                             tokio::spawn(async move {
144                                 let _d = w2;
145 
146                                 Self::gather_candidates_srflx_mapped(srflx_mapped_params).await;
147                             });
148                         }
149                     }
150                 }
151                 CandidateType::Relay => {
152                     let urls = params.urls.clone();
153                     let net = Arc::clone(&params.net);
154                     let agent_internal = Arc::clone(&params.agent_internal);
155                     let w = wg.worker();
156                     tokio::spawn(async move {
157                         let _d = w;
158 
159                         Self::gather_candidates_relay(urls, net, agent_internal).await;
160                     });
161                 }
162                 _ => {}
163             }
164         }
165 
166         // Block until all STUN and TURN URLs have been gathered (or timed out)
167         wg.wait().await;
168 
169         Self::set_gathering_state(
170             &params.chan_candidate_tx,
171             &params.gathering_state,
172             GatheringState::Complete,
173         )
174         .await;
175     }
176 
set_gathering_state( chan_candidate_tx: &ChanCandidateTx, gathering_state: &Arc<AtomicU8>, new_state: GatheringState, )177     async fn set_gathering_state(
178         chan_candidate_tx: &ChanCandidateTx,
179         gathering_state: &Arc<AtomicU8>,
180         new_state: GatheringState,
181     ) {
182         if GatheringState::from(gathering_state.load(Ordering::SeqCst)) != new_state
183             && new_state == GatheringState::Complete
184         {
185             let cand_tx = chan_candidate_tx.lock().await;
186             if let Some(tx) = &*cand_tx {
187                 let _ = tx.send(None).await;
188             }
189         }
190 
191         gathering_state.store(new_state as u8, Ordering::SeqCst);
192     }
193 
gather_candidates_local(params: GatherCandidatesLocalParams)194     async fn gather_candidates_local(params: GatherCandidatesLocalParams) {
195         let GatherCandidatesLocalParams {
196             udp_network,
197             network_types,
198             mdns_mode,
199             mdns_name,
200             interface_filter,
201             ip_filter,
202             ext_ip_mapper,
203             net,
204             agent_internal,
205         } = params;
206 
207         // If we wanna use UDP mux, do so
208         // FIXME: We still need to support TCP in combination with this option
209         if let UDPNetwork::Muxed(udp_mux) = udp_network {
210             let result = Self::gather_candidates_local_udp_mux(GatherCandidatesLocalUDPMuxParams {
211                 network_types,
212                 interface_filter,
213                 ip_filter,
214                 ext_ip_mapper,
215                 net,
216                 agent_internal,
217                 udp_mux,
218             })
219             .await;
220 
221             if let Err(err) = result {
222                 log::error!("Failed to gather local candidates using UDP mux: {}", err);
223             }
224 
225             return;
226         }
227 
228         let ips = local_interfaces(&net, &interface_filter, &ip_filter, &network_types).await;
229         for ip in ips {
230             let mut mapped_ip = ip;
231 
232             if mdns_mode != MulticastDnsMode::QueryAndGather && ext_ip_mapper.is_some() {
233                 if let Some(ext_ip_mapper2) = ext_ip_mapper.as_ref() {
234                     if ext_ip_mapper2.candidate_type == CandidateType::Host {
235                         if let Ok(mi) = ext_ip_mapper2.find_external_ip(&ip.to_string()) {
236                             mapped_ip = mi;
237                         } else {
238                             log::warn!(
239                                 "[{}]: 1:1 NAT mapping is enabled but no external IP is found for {}",
240                                 agent_internal.get_name(),
241                                 ip
242                             );
243                         }
244                     }
245                 }
246             }
247 
248             let address = if mdns_mode == MulticastDnsMode::QueryAndGather {
249                 mdns_name.clone()
250             } else {
251                 mapped_ip.to_string()
252             };
253 
254             //TODO: for network in networks
255             let network = UDP.to_owned();
256             if let UDPNetwork::Ephemeral(ephemeral_config) = &udp_network {
257                 /*TODO:switch network {
258                 case tcp:
259                     // Handle ICE TCP passive mode
260 
261                     a.log.Debugf("GetConn by ufrag: %s\n", a.localUfrag)
262                     conn, err = a.tcpMux.GetConnByUfrag(a.localUfrag)
263                     if err != nil {
264                         if !errors.Is(err, ErrTCPMuxNotInitialized) {
265                             a.log.Warnf("error getting tcp conn by ufrag: %s %s %s\n", network, ip, a.localUfrag)
266                         }
267                         continue
268                     }
269                     port = conn.LocalAddr().(*net.TCPAddr).Port
270                     tcpType = TCPTypePassive
271                     // is there a way to verify that the listen address is even
272                     // accessible from the current interface.
273                 case udp:*/
274 
275                 let conn: Arc<dyn Conn + Send + Sync> = match listen_udp_in_port_range(
276                     &net,
277                     ephemeral_config.port_max(),
278                     ephemeral_config.port_min(),
279                     SocketAddr::new(ip, 0),
280                 )
281                 .await
282                 {
283                     Ok(conn) => conn,
284                     Err(err) => {
285                         log::warn!(
286                             "[{}]: could not listen {} {}: {}",
287                             agent_internal.get_name(),
288                             network,
289                             ip,
290                             err
291                         );
292                         continue;
293                     }
294                 };
295 
296                 let port = match conn.local_addr() {
297                     Ok(addr) => addr.port(),
298                     Err(err) => {
299                         log::warn!(
300                             "[{}]: could not get local addr: {}",
301                             agent_internal.get_name(),
302                             err
303                         );
304                         continue;
305                     }
306                 };
307 
308                 let host_config = CandidateHostConfig {
309                     base_config: CandidateBaseConfig {
310                         network: network.clone(),
311                         address,
312                         port,
313                         component: COMPONENT_RTP,
314                         conn: Some(conn),
315                         ..CandidateBaseConfig::default()
316                     },
317                     ..CandidateHostConfig::default()
318                 };
319 
320                 let candidate: Arc<dyn Candidate + Send + Sync> =
321                     match host_config.new_candidate_host() {
322                         Ok(candidate) => {
323                             if mdns_mode == MulticastDnsMode::QueryAndGather {
324                                 if let Err(err) = candidate.set_ip(&ip) {
325                                     log::warn!(
326                                         "[{}]: Failed to create host candidate: {} {} {}: {:?}",
327                                         agent_internal.get_name(),
328                                         network,
329                                         mapped_ip,
330                                         port,
331                                         err
332                                     );
333                                     continue;
334                                 }
335                             }
336                             Arc::new(candidate)
337                         }
338                         Err(err) => {
339                             log::warn!(
340                                 "[{}]: Failed to create host candidate: {} {} {}: {}",
341                                 agent_internal.get_name(),
342                                 network,
343                                 mapped_ip,
344                                 port,
345                                 err
346                             );
347                             continue;
348                         }
349                     };
350 
351                 {
352                     if let Err(err) = agent_internal.add_candidate(&candidate).await {
353                         if let Err(close_err) = candidate.close().await {
354                             log::warn!(
355                                 "[{}]: Failed to close candidate: {}",
356                                 agent_internal.get_name(),
357                                 close_err
358                             );
359                         }
360                         log::warn!(
361                             "[{}]: Failed to append to localCandidates and run onCandidateHdlr: {}",
362                             agent_internal.get_name(),
363                             err
364                         );
365                     }
366                 }
367             }
368         }
369     }
370 
gather_candidates_local_udp_mux( params: GatherCandidatesLocalUDPMuxParams, ) -> Result<()>371     async fn gather_candidates_local_udp_mux(
372         params: GatherCandidatesLocalUDPMuxParams,
373     ) -> Result<()> {
374         let GatherCandidatesLocalUDPMuxParams {
375             network_types,
376             interface_filter,
377             ip_filter,
378             ext_ip_mapper,
379             net,
380             agent_internal,
381             udp_mux,
382         } = params;
383 
384         // Filter out non UDP network types
385         let relevant_network_types: Vec<_> =
386             network_types.into_iter().filter(|n| n.is_udp()).collect();
387 
388         let udp_mux = Arc::clone(&udp_mux);
389 
390         // There's actually only one, but `local_interfaces` requires a slice.
391         let local_ips =
392             local_interfaces(&net, &interface_filter, &ip_filter, &relevant_network_types).await;
393 
394         let candidate_ip = ext_ip_mapper
395             .as_ref() // Arc
396             .as_ref() // Option
397             .and_then(|mapper| {
398                 if mapper.candidate_type != CandidateType::Host {
399                     return None;
400                 }
401 
402                 local_ips
403                     .iter()
404                     .find_map(|ip| match mapper.find_external_ip(&ip.to_string()) {
405                         Ok(ip) => Some(ip),
406                         Err(err) => {
407                             log::warn!(
408                             "1:1 NAT mapping is enabled but not external IP is found for {}: {}",
409                             ip,
410                             err
411                         );
412                             None
413                         }
414                     })
415             })
416             .or_else(|| local_ips.iter().copied().next());
417 
418         let candidate_ip = match candidate_ip {
419             None => return Err(Error::ErrCandidateIpNotFound),
420             Some(ip) => ip,
421         };
422 
423         let ufrag = {
424             let ufrag_pwd = agent_internal.ufrag_pwd.lock().await;
425 
426             ufrag_pwd.local_ufrag.clone()
427         };
428 
429         let conn = udp_mux.get_conn(&ufrag).await?;
430         let port = conn.local_addr()?.port();
431 
432         let host_config = CandidateHostConfig {
433             base_config: CandidateBaseConfig {
434                 network: UDP.to_owned(),
435                 address: candidate_ip.to_string(),
436                 port,
437                 conn: Some(conn),
438                 component: COMPONENT_RTP,
439                 ..Default::default()
440             },
441             tcp_type: TcpType::Unspecified,
442         };
443 
444         let candidate: Arc<dyn Candidate + Send + Sync> =
445             Arc::new(host_config.new_candidate_host()?);
446 
447         agent_internal.add_candidate(&candidate).await?;
448 
449         Ok(())
450     }
451 
gather_candidates_srflx_mapped(params: GatherCandidatesSrflxMappedParasm)452     async fn gather_candidates_srflx_mapped(params: GatherCandidatesSrflxMappedParasm) {
453         let GatherCandidatesSrflxMappedParasm {
454             network_types,
455             port_max,
456             port_min,
457             ext_ip_mapper,
458             net,
459             agent_internal,
460         } = params;
461 
462         let wg = WaitGroup::new();
463 
464         for network_type in network_types {
465             if network_type.is_tcp() {
466                 continue;
467             }
468 
469             let network = network_type.to_string();
470             let net2 = Arc::clone(&net);
471             let agent_internal2 = Arc::clone(&agent_internal);
472             let ext_ip_mapper2 = Arc::clone(&ext_ip_mapper);
473 
474             let w = wg.worker();
475             tokio::spawn(async move {
476                 let _d = w;
477 
478                 let conn: Arc<dyn Conn + Send + Sync> = match listen_udp_in_port_range(
479                     &net2,
480                     port_max,
481                     port_min,
482                     if network_type.is_ipv4() {
483                         SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0)
484                     } else {
485                         SocketAddr::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into(), 0)
486                     },
487                 )
488                 .await
489                 {
490                     Ok(conn) => conn,
491                     Err(err) => {
492                         log::warn!(
493                             "[{}]: Failed to listen {}: {}",
494                             agent_internal2.get_name(),
495                             network,
496                             err
497                         );
498                         return Ok(());
499                     }
500                 };
501 
502                 let laddr = conn.local_addr()?;
503                 let mapped_ip = {
504                     if let Some(ext_ip_mapper3) = &*ext_ip_mapper2 {
505                         match ext_ip_mapper3.find_external_ip(&laddr.ip().to_string()) {
506                             Ok(ip) => ip,
507                             Err(err) => {
508                                 log::warn!(
509                                     "[{}]: 1:1 NAT mapping is enabled but no external IP is found for {}: {}",
510                                     agent_internal2.get_name(),
511                                     laddr,
512                                     err
513                                 );
514                                 return Ok(());
515                             }
516                         }
517                     } else {
518                         log::error!(
519                             "[{}]: ext_ip_mapper is None in gather_candidates_srflx_mapped",
520                             agent_internal2.get_name(),
521                         );
522                         return Ok(());
523                     }
524                 };
525 
526                 let srflx_config = CandidateServerReflexiveConfig {
527                     base_config: CandidateBaseConfig {
528                         network: network.clone(),
529                         address: mapped_ip.to_string(),
530                         port: laddr.port(),
531                         component: COMPONENT_RTP,
532                         conn: Some(conn),
533                         ..CandidateBaseConfig::default()
534                     },
535                     rel_addr: laddr.ip().to_string(),
536                     rel_port: laddr.port(),
537                 };
538 
539                 let candidate: Arc<dyn Candidate + Send + Sync> =
540                     match srflx_config.new_candidate_server_reflexive() {
541                         Ok(candidate) => Arc::new(candidate),
542                         Err(err) => {
543                             log::warn!(
544                                 "[{}]: Failed to create server reflexive candidate: {} {} {}: {}",
545                                 agent_internal2.get_name(),
546                                 network,
547                                 mapped_ip,
548                                 laddr.port(),
549                                 err
550                             );
551                             return Ok(());
552                         }
553                     };
554 
555                 {
556                     if let Err(err) = agent_internal2.add_candidate(&candidate).await {
557                         if let Err(close_err) = candidate.close().await {
558                             log::warn!(
559                                 "[{}]: Failed to close candidate: {}",
560                                 agent_internal2.get_name(),
561                                 close_err
562                             );
563                         }
564                         log::warn!(
565                             "[{}]: Failed to append to localCandidates and run onCandidateHdlr: {}",
566                             agent_internal2.get_name(),
567                             err
568                         );
569                     }
570                 }
571 
572                 Result::<()>::Ok(())
573             });
574         }
575 
576         wg.wait().await;
577     }
578 
gather_candidates_srflx(params: GatherCandidatesSrflxParams)579     async fn gather_candidates_srflx(params: GatherCandidatesSrflxParams) {
580         let GatherCandidatesSrflxParams {
581             urls,
582             network_types,
583             port_max,
584             port_min,
585             net,
586             agent_internal,
587         } = params;
588 
589         let wg = WaitGroup::new();
590         for network_type in network_types {
591             if network_type.is_tcp() {
592                 continue;
593             }
594 
595             for url in &urls {
596                 let network = network_type.to_string();
597                 let is_ipv4 = network_type.is_ipv4();
598                 let url = url.clone();
599                 let net2 = Arc::clone(&net);
600                 let agent_internal2 = Arc::clone(&agent_internal);
601 
602                 let w = wg.worker();
603                 tokio::spawn(async move {
604                     let _d = w;
605 
606                     let host_port = format!("{}:{}", url.host, url.port);
607                     let server_addr = match net2.resolve_addr(is_ipv4, &host_port).await {
608                         Ok(addr) => addr,
609                         Err(err) => {
610                             log::warn!(
611                                 "[{}]: failed to resolve stun host: {}: {}",
612                                 agent_internal2.get_name(),
613                                 host_port,
614                                 err
615                             );
616                             return Ok(());
617                         }
618                     };
619 
620                     let conn: Arc<dyn Conn + Send + Sync> = match listen_udp_in_port_range(
621                         &net2,
622                         port_max,
623                         port_min,
624                         if is_ipv4 {
625                             SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0)
626                         } else {
627                             SocketAddr::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into(), 0)
628                         },
629                     )
630                     .await
631                     {
632                         Ok(conn) => conn,
633                         Err(err) => {
634                             log::warn!(
635                                 "[{}]: Failed to listen for {}: {}",
636                                 agent_internal2.get_name(),
637                                 server_addr,
638                                 err
639                             );
640                             return Ok(());
641                         }
642                     };
643 
644                     let xoraddr =
645                         match get_xormapped_addr(&conn, server_addr, STUN_GATHER_TIMEOUT).await {
646                             Ok(xoraddr) => xoraddr,
647                             Err(err) => {
648                                 log::warn!(
649                                     "[{}]: could not get server reflexive address {} {}: {}",
650                                     agent_internal2.get_name(),
651                                     network,
652                                     url,
653                                     err
654                                 );
655                                 return Ok(());
656                             }
657                         };
658 
659                     let (ip, port) = (xoraddr.ip, xoraddr.port);
660 
661                     let laddr = conn.local_addr()?;
662                     let srflx_config = CandidateServerReflexiveConfig {
663                         base_config: CandidateBaseConfig {
664                             network: network.clone(),
665                             address: ip.to_string(),
666                             port,
667                             component: COMPONENT_RTP,
668                             conn: Some(conn),
669                             ..CandidateBaseConfig::default()
670                         },
671                         rel_addr: laddr.ip().to_string(),
672                         rel_port: laddr.port(),
673                     };
674 
675                     let candidate: Arc<dyn Candidate + Send + Sync> =
676                         match srflx_config.new_candidate_server_reflexive() {
677                             Ok(candidate) => Arc::new(candidate),
678                             Err(err) => {
679                                 log::warn!(
680                                 "[{}]: Failed to create server reflexive candidate: {} {} {}: {:?}",
681                                 agent_internal2.get_name(),
682                                 network,
683                                 ip,
684                                 port,
685                                 err
686                             );
687                                 return Ok(());
688                             }
689                         };
690 
691                     {
692                         if let Err(err) = agent_internal2.add_candidate(&candidate).await {
693                             if let Err(close_err) = candidate.close().await {
694                                 log::warn!(
695                                     "[{}]: Failed to close candidate: {}",
696                                     agent_internal2.get_name(),
697                                     close_err
698                                 );
699                             }
700                             log::warn!(
701                                 "[{}]: Failed to append to localCandidates and run onCandidateHdlr: {}",
702                                 agent_internal2.get_name(),
703                                 err
704                             );
705                         }
706                     }
707 
708                     Result::<()>::Ok(())
709                 });
710             }
711         }
712 
713         wg.wait().await;
714     }
715 
gather_candidates_relay( urls: Vec<Url>, net: Arc<Net>, agent_internal: Arc<AgentInternal>, )716     pub(crate) async fn gather_candidates_relay(
717         urls: Vec<Url>,
718         net: Arc<Net>,
719         agent_internal: Arc<AgentInternal>,
720     ) {
721         let wg = WaitGroup::new();
722 
723         for url in urls {
724             if url.scheme != SchemeType::Turn && url.scheme != SchemeType::Turns {
725                 continue;
726             }
727             if url.username.is_empty() {
728                 log::error!(
729                     "[{}]:Failed to gather relay candidates: {:?}",
730                     agent_internal.get_name(),
731                     Error::ErrUsernameEmpty
732                 );
733                 return;
734             }
735             if url.password.is_empty() {
736                 log::error!(
737                     "[{}]: Failed to gather relay candidates: {:?}",
738                     agent_internal.get_name(),
739                     Error::ErrPasswordEmpty
740                 );
741                 return;
742             }
743 
744             let network = NetworkType::Udp4.to_string();
745             let net2 = Arc::clone(&net);
746             let agent_internal2 = Arc::clone(&agent_internal);
747 
748             let w = wg.worker();
749             tokio::spawn(async move {
750                 let _d = w;
751 
752                 let turn_server_addr = format!("{}:{}", url.host, url.port);
753 
754                 let (loc_conn, rel_addr, rel_port) =
755                     if url.proto == ProtoType::Udp && url.scheme == SchemeType::Turn {
756                         let loc_conn = match net2.bind(SocketAddr::from_str("0.0.0.0:0")?).await {
757                             Ok(c) => c,
758                             Err(err) => {
759                                 log::warn!(
760                                     "[{}]: Failed to listen due to error: {}",
761                                     agent_internal2.get_name(),
762                                     err
763                                 );
764                                 return Ok(());
765                             }
766                         };
767 
768                         let local_addr = loc_conn.local_addr()?;
769                         let rel_addr = local_addr.ip().to_string();
770                         let rel_port = local_addr.port();
771                         (loc_conn, rel_addr, rel_port)
772                     /*TODO: case url.proto == ProtoType::UDP && url.scheme == SchemeType::TURNS{
773                     case a.proxyDialer != nil && url.Proto == ProtoTypeTCP && (url.Scheme == SchemeTypeTURN || url.Scheme == SchemeTypeTURNS):
774                     case url.Proto == ProtoTypeTCP && url.Scheme == SchemeTypeTURN:
775                     case url.Proto == ProtoTypeTCP && url.Scheme == SchemeTypeTURNS:*/
776                     } else {
777                         log::warn!(
778                             "[{}]: Unable to handle URL in gather_candidates_relay {}",
779                             agent_internal2.get_name(),
780                             url
781                         );
782                         return Ok(());
783                     };
784 
785                 let cfg = turn::client::ClientConfig {
786                     stun_serv_addr: String::new(),
787                     turn_serv_addr: turn_server_addr.clone(),
788                     username: url.username,
789                     password: url.password,
790                     realm: String::new(),
791                     software: String::new(),
792                     rto_in_ms: 0,
793                     conn: loc_conn,
794                     vnet: Some(Arc::clone(&net2)),
795                 };
796                 let client = match turn::client::Client::new(cfg).await {
797                     Ok(client) => Arc::new(client),
798                     Err(err) => {
799                         log::warn!(
800                             "[{}]: Failed to build new turn.Client {} {}\n",
801                             agent_internal2.get_name(),
802                             turn_server_addr,
803                             err
804                         );
805                         return Ok(());
806                     }
807                 };
808                 if let Err(err) = client.listen().await {
809                     let _ = client.close().await;
810                     log::warn!(
811                         "[{}]: Failed to listen on turn.Client {} {}",
812                         agent_internal2.get_name(),
813                         turn_server_addr,
814                         err
815                     );
816                     return Ok(());
817                 }
818 
819                 let relay_conn = match client.allocate().await {
820                     Ok(conn) => conn,
821                     Err(err) => {
822                         let _ = client.close().await;
823                         log::warn!(
824                             "[{}]: Failed to allocate on turn.Client {} {}",
825                             agent_internal2.get_name(),
826                             turn_server_addr,
827                             err
828                         );
829                         return Ok(());
830                     }
831                 };
832 
833                 let raddr = relay_conn.local_addr()?;
834                 let relay_config = CandidateRelayConfig {
835                     base_config: CandidateBaseConfig {
836                         network: network.clone(),
837                         address: raddr.ip().to_string(),
838                         port: raddr.port(),
839                         component: COMPONENT_RTP,
840                         conn: Some(Arc::new(relay_conn)),
841                         ..CandidateBaseConfig::default()
842                     },
843                     rel_addr,
844                     rel_port,
845                     relay_client: Some(Arc::clone(&client)),
846                 };
847 
848                 let candidate: Arc<dyn Candidate + Send + Sync> =
849                     match relay_config.new_candidate_relay() {
850                         Ok(candidate) => Arc::new(candidate),
851                         Err(err) => {
852                             let _ = client.close().await;
853                             log::warn!(
854                                 "[{}]: Failed to create relay candidate: {} {}: {}",
855                                 agent_internal2.get_name(),
856                                 network,
857                                 raddr,
858                                 err
859                             );
860                             return Ok(());
861                         }
862                     };
863 
864                 {
865                     if let Err(err) = agent_internal2.add_candidate(&candidate).await {
866                         if let Err(close_err) = candidate.close().await {
867                             log::warn!(
868                                 "[{}]: Failed to close candidate: {}",
869                                 agent_internal2.get_name(),
870                                 close_err
871                             );
872                         }
873                         log::warn!(
874                             "[{}]: Failed to append to localCandidates and run onCandidateHdlr: {}",
875                             agent_internal2.get_name(),
876                             err
877                         );
878                     }
879                 }
880 
881                 Result::<()>::Ok(())
882             });
883         }
884 
885         wg.wait().await;
886     }
887 }
888