xref: /webrtc/ice/src/agent/mod.rs (revision 2e07f543)
1 #[cfg(test)]
2 mod agent_gather_test;
3 #[cfg(test)]
4 mod agent_test;
5 #[cfg(test)]
6 mod agent_transport_test;
7 #[cfg(test)]
8 pub(crate) mod agent_vnet_test;
9 
10 pub mod agent_config;
11 pub mod agent_gather;
12 pub(crate) mod agent_internal;
13 pub mod agent_selector;
14 pub mod agent_stats;
15 pub mod agent_transport;
16 
17 use crate::candidate::*;
18 use crate::error::*;
19 use crate::external_ip_mapper::*;
20 use crate::mdns::*;
21 use crate::network_type::*;
22 use crate::state::*;
23 use crate::udp_mux::UDPMux;
24 use crate::udp_network::UDPNetwork;
25 use crate::url::*;
26 use agent_config::*;
27 use agent_internal::*;
28 use agent_stats::*;
29 
30 use mdns::conn::*;
31 use std::collections::HashMap;
32 use std::net::{Ipv4Addr, SocketAddr};
33 use stun::{agent::*, attributes::*, fingerprint::*, integrity::*, message::*, xoraddr::*};
34 use util::{vnet::net::*, Buffer};
35 
36 use crate::agent::agent_gather::GatherCandidatesInternalParams;
37 use crate::rand::*;
38 use crate::tcp_type::TcpType;
39 use std::future::Future;
40 use std::pin::Pin;
41 use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
42 use std::sync::Arc;
43 use std::time::SystemTime;
44 use tokio::sync::{broadcast, mpsc, Mutex};
45 use tokio::time::{Duration, Instant};
46 
47 #[derive(Debug, Clone)]
48 pub(crate) struct BindingRequest {
49     pub(crate) timestamp: Instant,
50     pub(crate) transaction_id: TransactionId,
51     pub(crate) destination: SocketAddr,
52     pub(crate) is_use_candidate: bool,
53 }
54 
55 impl Default for BindingRequest {
default() -> Self56     fn default() -> Self {
57         Self {
58             timestamp: Instant::now(),
59             transaction_id: TransactionId::default(),
60             destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
61             is_use_candidate: false,
62         }
63     }
64 }
65 
66 pub type OnConnectionStateChangeHdlrFn = Box<
67     dyn (FnMut(ConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
68         + Send
69         + Sync,
70 >;
71 pub type OnSelectedCandidatePairChangeHdlrFn = Box<
72     dyn (FnMut(
73             &Arc<dyn Candidate + Send + Sync>,
74             &Arc<dyn Candidate + Send + Sync>,
75         ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
76         + Send
77         + Sync,
78 >;
79 pub type OnCandidateHdlrFn = Box<
80     dyn (FnMut(
81             Option<Arc<dyn Candidate + Send + Sync>>,
82         ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
83         + Send
84         + Sync,
85 >;
/// Boxed, thread-safe closure taking no arguments; the agent calls it to
/// cancel a gathering routine.
pub type GatherCandidateCancelFn = Box<dyn Fn() + Send + Sync>;
87 
88 struct ChanReceivers {
89     chan_state_rx: mpsc::Receiver<ConnectionState>,
90     chan_candidate_rx: mpsc::Receiver<Option<Arc<dyn Candidate + Send + Sync>>>,
91     chan_candidate_pair_rx: mpsc::Receiver<()>,
92 }
93 
94 /// Represents the ICE agent.
95 pub struct Agent {
96     pub(crate) internal: Arc<AgentInternal>,
97 
98     pub(crate) udp_network: UDPNetwork,
99     pub(crate) interface_filter: Arc<Option<InterfaceFilterFn>>,
100     pub(crate) ip_filter: Arc<Option<IpFilterFn>>,
101     pub(crate) mdns_mode: MulticastDnsMode,
102     pub(crate) mdns_name: String,
103     pub(crate) mdns_conn: Option<Arc<DnsConn>>,
104     pub(crate) net: Arc<Net>,
105 
106     // 1:1 D-NAT IP address mapping
107     pub(crate) ext_ip_mapper: Arc<Option<ExternalIpMapper>>,
108     pub(crate) gathering_state: Arc<AtomicU8>, //GatheringState,
109     pub(crate) candidate_types: Vec<CandidateType>,
110     pub(crate) urls: Vec<Url>,
111     pub(crate) network_types: Vec<NetworkType>,
112 
113     pub(crate) gather_candidate_cancel: Option<GatherCandidateCancelFn>,
114 }
115 
116 impl Agent {
117     /// Creates a new Agent.
new(config: AgentConfig) -> Result<Self>118     pub async fn new(config: AgentConfig) -> Result<Self> {
119         let mut mdns_name = config.multicast_dns_host_name.clone();
120         if mdns_name.is_empty() {
121             mdns_name = generate_multicast_dns_name();
122         }
123 
124         if !mdns_name.ends_with(".local") || mdns_name.split('.').count() != 2 {
125             return Err(Error::ErrInvalidMulticastDnshostName);
126         }
127 
128         let mdns_mode = config.multicast_dns_mode;
129 
130         let mdns_conn =
131             match create_multicast_dns(mdns_mode, &mdns_name, &config.multicast_dns_dest_addr) {
132                 Ok(c) => c,
133                 Err(err) => {
134                     // Opportunistic mDNS: If we can't open the connection, that's ok: we
135                     // can continue without it.
136                     log::warn!("Failed to initialize mDNS {}: {}", mdns_name, err);
137                     None
138                 }
139             };
140 
141         let (mut ai, chan_receivers) = AgentInternal::new(&config);
142         let (chan_state_rx, chan_candidate_rx, chan_candidate_pair_rx) = (
143             chan_receivers.chan_state_rx,
144             chan_receivers.chan_candidate_rx,
145             chan_receivers.chan_candidate_pair_rx,
146         );
147 
148         config.init_with_defaults(&mut ai);
149 
150         let candidate_types = if config.candidate_types.is_empty() {
151             default_candidate_types()
152         } else {
153             config.candidate_types.clone()
154         };
155 
156         if ai.lite.load(Ordering::SeqCst)
157             && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host)
158         {
159             Self::close_multicast_conn(&mdns_conn).await;
160             return Err(Error::ErrLiteUsingNonHostCandidates);
161         }
162 
163         if !config.urls.is_empty()
164             && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types)
165             && !contains_candidate_type(CandidateType::Relay, &candidate_types)
166         {
167             Self::close_multicast_conn(&mdns_conn).await;
168             return Err(Error::ErrUselessUrlsProvided);
169         }
170 
171         let ext_ip_mapper = match config.init_ext_ip_mapping(mdns_mode, &candidate_types) {
172             Ok(ext_ip_mapper) => ext_ip_mapper,
173             Err(err) => {
174                 Self::close_multicast_conn(&mdns_conn).await;
175                 return Err(err);
176             }
177         };
178 
179         let net = if let Some(net) = config.net {
180             if net.is_virtual() {
181                 log::warn!("vnet is enabled");
182                 if mdns_mode != MulticastDnsMode::Disabled {
183                     log::warn!("vnet does not support mDNS yet");
184                 }
185             }
186 
187             net
188         } else {
189             Arc::new(Net::new(None))
190         };
191 
192         let agent = Self {
193             udp_network: config.udp_network,
194             internal: Arc::new(ai),
195             interface_filter: Arc::clone(&config.interface_filter),
196             ip_filter: Arc::clone(&config.ip_filter),
197             mdns_mode,
198             mdns_name,
199             mdns_conn,
200             net,
201             ext_ip_mapper: Arc::new(ext_ip_mapper),
202             gathering_state: Arc::new(AtomicU8::new(0)), //GatheringState::New,
203             candidate_types,
204             urls: config.urls.clone(),
205             network_types: config.network_types.clone(),
206 
207             gather_candidate_cancel: None, //TODO: add cancel
208         };
209 
210         agent.internal.start_on_connection_state_change_routine(
211             chan_state_rx,
212             chan_candidate_rx,
213             chan_candidate_pair_rx,
214         );
215 
216         // Restart is also used to initialize the agent for the first time
217         if let Err(err) = agent.restart(config.local_ufrag, config.local_pwd).await {
218             Self::close_multicast_conn(&agent.mdns_conn).await;
219             let _ = agent.close().await;
220             return Err(err);
221         }
222 
223         Ok(agent)
224     }
225 
get_bytes_received(&self) -> usize226     pub fn get_bytes_received(&self) -> usize {
227         self.internal.agent_conn.bytes_received()
228     }
229 
get_bytes_sent(&self) -> usize230     pub fn get_bytes_sent(&self) -> usize {
231         self.internal.agent_conn.bytes_sent()
232     }
233 
234     /// Sets a handler that is fired when the connection state changes.
on_connection_state_change(&self, f: OnConnectionStateChangeHdlrFn)235     pub fn on_connection_state_change(&self, f: OnConnectionStateChangeHdlrFn) {
236         self.internal
237             .on_connection_state_change_hdlr
238             .store(Some(Arc::new(Mutex::new(f))))
239     }
240 
241     /// Sets a handler that is fired when the final candidate pair is selected.
on_selected_candidate_pair_change(&self, f: OnSelectedCandidatePairChangeHdlrFn)242     pub fn on_selected_candidate_pair_change(&self, f: OnSelectedCandidatePairChangeHdlrFn) {
243         self.internal
244             .on_selected_candidate_pair_change_hdlr
245             .store(Some(Arc::new(Mutex::new(f))))
246     }
247 
248     /// Sets a handler that is fired when new candidates gathered. When the gathering process
249     /// complete the last candidate is nil.
on_candidate(&self, f: OnCandidateHdlrFn)250     pub fn on_candidate(&self, f: OnCandidateHdlrFn) {
251         self.internal
252             .on_candidate_hdlr
253             .store(Some(Arc::new(Mutex::new(f))));
254     }
255 
256     /// Adds a new remote candidate.
add_remote_candidate(&self, c: &Arc<dyn Candidate + Send + Sync>) -> Result<()>257     pub fn add_remote_candidate(&self, c: &Arc<dyn Candidate + Send + Sync>) -> Result<()> {
258         // cannot check for network yet because it might not be applied
259         // when mDNS hostame is used.
260         if c.tcp_type() == TcpType::Active {
261             // TCP Candidates with tcptype active will probe server passive ones, so
262             // no need to do anything with them.
263             log::info!("Ignoring remote candidate with tcpType active: {}", c);
264             return Ok(());
265         }
266 
267         // If we have a mDNS Candidate lets fully resolve it before adding it locally
268         if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") {
269             if self.mdns_mode == MulticastDnsMode::Disabled {
270                 log::warn!(
271                     "remote mDNS candidate added, but mDNS is disabled: ({})",
272                     c.address()
273                 );
274                 return Ok(());
275             }
276 
277             if c.candidate_type() != CandidateType::Host {
278                 return Err(Error::ErrAddressParseFailed);
279             }
280 
281             let ai = Arc::clone(&self.internal);
282             let host_candidate = Arc::clone(c);
283             let mdns_conn = self.mdns_conn.clone();
284             tokio::spawn(async move {
285                 if let Some(mdns_conn) = mdns_conn {
286                     if let Ok(candidate) =
287                         Self::resolve_and_add_multicast_candidate(mdns_conn, host_candidate).await
288                     {
289                         ai.add_remote_candidate(&candidate).await;
290                     }
291                 }
292             });
293         } else {
294             let ai = Arc::clone(&self.internal);
295             let candidate = Arc::clone(c);
296             tokio::spawn(async move {
297                 ai.add_remote_candidate(&candidate).await;
298             });
299         }
300 
301         Ok(())
302     }
303 
304     /// Returns the local candidates.
get_local_candidates(&self) -> Result<Vec<Arc<dyn Candidate + Send + Sync>>>305     pub async fn get_local_candidates(&self) -> Result<Vec<Arc<dyn Candidate + Send + Sync>>> {
306         let mut res = vec![];
307 
308         {
309             let local_candidates = self.internal.local_candidates.lock().await;
310             for candidates in local_candidates.values() {
311                 for candidate in candidates {
312                     res.push(Arc::clone(candidate));
313                 }
314             }
315         }
316 
317         Ok(res)
318     }
319 
320     /// Returns the local user credentials.
get_local_user_credentials(&self) -> (String, String)321     pub async fn get_local_user_credentials(&self) -> (String, String) {
322         let ufrag_pwd = self.internal.ufrag_pwd.lock().await;
323         (ufrag_pwd.local_ufrag.clone(), ufrag_pwd.local_pwd.clone())
324     }
325 
326     /// Returns the remote user credentials.
get_remote_user_credentials(&self) -> (String, String)327     pub async fn get_remote_user_credentials(&self) -> (String, String) {
328         let ufrag_pwd = self.internal.ufrag_pwd.lock().await;
329         (ufrag_pwd.remote_ufrag.clone(), ufrag_pwd.remote_pwd.clone())
330     }
331 
332     /// Cleans up the Agent.
close(&self) -> Result<()>333     pub async fn close(&self) -> Result<()> {
334         if let Some(gather_candidate_cancel) = &self.gather_candidate_cancel {
335             gather_candidate_cancel();
336         }
337 
338         if let UDPNetwork::Muxed(ref udp_mux) = self.udp_network {
339             let (ufrag, _) = self.get_local_user_credentials().await;
340             udp_mux.remove_conn_by_ufrag(&ufrag).await;
341         }
342 
343         //FIXME: deadlock here
344         self.internal.close().await
345     }
346 
347     /// Returns the selected pair or nil if there is none
get_selected_candidate_pair(&self) -> Option<Arc<CandidatePair>>348     pub fn get_selected_candidate_pair(&self) -> Option<Arc<CandidatePair>> {
349         self.internal.agent_conn.get_selected_pair()
350     }
351 
352     /// Sets the credentials of the remote agent.
set_remote_credentials( &self, remote_ufrag: String, remote_pwd: String, ) -> Result<()>353     pub async fn set_remote_credentials(
354         &self,
355         remote_ufrag: String,
356         remote_pwd: String,
357     ) -> Result<()> {
358         self.internal
359             .set_remote_credentials(remote_ufrag, remote_pwd)
360             .await
361     }
362 
363     /// Restarts the ICE Agent with the provided ufrag/pwd
364     /// If no ufrag/pwd is provided the Agent will generate one itself.
365     ///
366     /// Restart must only be called when `GatheringState` is `GatheringStateComplete`
367     /// a user must then call `GatherCandidates` explicitly to start generating new ones.
restart(&self, mut ufrag: String, mut pwd: String) -> Result<()>368     pub async fn restart(&self, mut ufrag: String, mut pwd: String) -> Result<()> {
369         if ufrag.is_empty() {
370             ufrag = generate_ufrag();
371         }
372         if pwd.is_empty() {
373             pwd = generate_pwd();
374         }
375 
376         if ufrag.len() * 8 < 24 {
377             return Err(Error::ErrLocalUfragInsufficientBits);
378         }
379         if pwd.len() * 8 < 128 {
380             return Err(Error::ErrLocalPwdInsufficientBits);
381         }
382 
383         if GatheringState::from(self.gathering_state.load(Ordering::SeqCst))
384             == GatheringState::Gathering
385         {
386             return Err(Error::ErrRestartWhenGathering);
387         }
388         self.gathering_state
389             .store(GatheringState::New as u8, Ordering::SeqCst);
390 
391         {
392             let done_tx = self.internal.done_tx.lock().await;
393             if done_tx.is_none() {
394                 return Err(Error::ErrClosed);
395             }
396         }
397 
398         // Clear all agent needed to take back to fresh state
399         {
400             let mut ufrag_pwd = self.internal.ufrag_pwd.lock().await;
401             ufrag_pwd.local_ufrag = ufrag;
402             ufrag_pwd.local_pwd = pwd;
403             ufrag_pwd.remote_ufrag = String::new();
404             ufrag_pwd.remote_pwd = String::new();
405         }
406         {
407             let mut pending_binding_requests = self.internal.pending_binding_requests.lock().await;
408             *pending_binding_requests = vec![];
409         }
410 
411         {
412             let mut checklist = self.internal.agent_conn.checklist.lock().await;
413             *checklist = vec![];
414         }
415 
416         self.internal.set_selected_pair(None).await;
417         self.internal.delete_all_candidates().await;
418         self.internal.start().await;
419 
420         // Restart is used by NewAgent. Accept/Connect should be used to move to checking
421         // for new Agents
422         if self.internal.connection_state.load(Ordering::SeqCst) != ConnectionState::New as u8 {
423             self.internal
424                 .update_connection_state(ConnectionState::Checking)
425                 .await;
426         }
427 
428         Ok(())
429     }
430 
431     /// Initiates the trickle based gathering process.
gather_candidates(&self) -> Result<()>432     pub fn gather_candidates(&self) -> Result<()> {
433         if self.gathering_state.load(Ordering::SeqCst) != GatheringState::New as u8 {
434             return Err(Error::ErrMultipleGatherAttempted);
435         }
436 
437         if self.internal.on_candidate_hdlr.load().is_none() {
438             return Err(Error::ErrNoOnCandidateHandler);
439         }
440 
441         if let Some(gather_candidate_cancel) = &self.gather_candidate_cancel {
442             gather_candidate_cancel(); // Cancel previous gathering routine
443         }
444 
445         //TODO: a.gatherCandidateCancel = cancel
446 
447         let params = GatherCandidatesInternalParams {
448             udp_network: self.udp_network.clone(),
449             candidate_types: self.candidate_types.clone(),
450             urls: self.urls.clone(),
451             network_types: self.network_types.clone(),
452             mdns_mode: self.mdns_mode,
453             mdns_name: self.mdns_name.clone(),
454             net: Arc::clone(&self.net),
455             interface_filter: self.interface_filter.clone(),
456             ip_filter: self.ip_filter.clone(),
457             ext_ip_mapper: Arc::clone(&self.ext_ip_mapper),
458             agent_internal: Arc::clone(&self.internal),
459             gathering_state: Arc::clone(&self.gathering_state),
460             chan_candidate_tx: Arc::clone(&self.internal.chan_candidate_tx),
461         };
462         tokio::spawn(async move {
463             Self::gather_candidates_internal(params).await;
464         });
465 
466         Ok(())
467     }
468 
469     /// Returns a list of candidate pair stats.
get_candidate_pairs_stats(&self) -> Vec<CandidatePairStats>470     pub async fn get_candidate_pairs_stats(&self) -> Vec<CandidatePairStats> {
471         self.internal.get_candidate_pairs_stats().await
472     }
473 
474     /// Returns a list of local candidates stats.
get_local_candidates_stats(&self) -> Vec<CandidateStats>475     pub async fn get_local_candidates_stats(&self) -> Vec<CandidateStats> {
476         self.internal.get_local_candidates_stats().await
477     }
478 
479     /// Returns a list of remote candidates stats.
get_remote_candidates_stats(&self) -> Vec<CandidateStats>480     pub async fn get_remote_candidates_stats(&self) -> Vec<CandidateStats> {
481         self.internal.get_remote_candidates_stats().await
482     }
483 
resolve_and_add_multicast_candidate( mdns_conn: Arc<DnsConn>, c: Arc<dyn Candidate + Send + Sync>, ) -> Result<Arc<dyn Candidate + Send + Sync>>484     async fn resolve_and_add_multicast_candidate(
485         mdns_conn: Arc<DnsConn>,
486         c: Arc<dyn Candidate + Send + Sync>,
487     ) -> Result<Arc<dyn Candidate + Send + Sync>> {
488         //TODO: hook up _close_query_signal_tx to Agent or Candidate's Close signal?
489         let (_close_query_signal_tx, close_query_signal_rx) = mpsc::channel(1);
490         let src = match mdns_conn.query(&c.address(), close_query_signal_rx).await {
491             Ok((_, src)) => src,
492             Err(err) => {
493                 log::warn!("Failed to discover mDNS candidate {}: {}", c.address(), err);
494                 return Err(err.into());
495             }
496         };
497 
498         c.set_ip(&src.ip())?;
499 
500         Ok(c)
501     }
502 
close_multicast_conn(mdns_conn: &Option<Arc<DnsConn>>)503     async fn close_multicast_conn(mdns_conn: &Option<Arc<DnsConn>>) {
504         if let Some(conn) = mdns_conn {
505             if let Err(err) = conn.close().await {
506                 log::warn!("failed to close mDNS Conn: {}", err);
507             }
508         }
509     }
510 }
511