1 #[cfg(test)] 2 mod agent_gather_test; 3 #[cfg(test)] 4 mod agent_test; 5 #[cfg(test)] 6 mod agent_transport_test; 7 #[cfg(test)] 8 pub(crate) mod agent_vnet_test; 9 10 pub mod agent_config; 11 pub mod agent_gather; 12 pub(crate) mod agent_internal; 13 pub mod agent_selector; 14 pub mod agent_stats; 15 pub mod agent_transport; 16 17 use crate::candidate::*; 18 use crate::error::*; 19 use crate::external_ip_mapper::*; 20 use crate::mdns::*; 21 use crate::network_type::*; 22 use crate::state::*; 23 use crate::udp_mux::UDPMux; 24 use crate::udp_network::UDPNetwork; 25 use crate::url::*; 26 use agent_config::*; 27 use agent_internal::*; 28 use agent_stats::*; 29 30 use mdns::conn::*; 31 use std::collections::HashMap; 32 use std::net::{Ipv4Addr, SocketAddr}; 33 use stun::{agent::*, attributes::*, fingerprint::*, integrity::*, message::*, xoraddr::*}; 34 use util::{vnet::net::*, Buffer}; 35 36 use crate::agent::agent_gather::GatherCandidatesInternalParams; 37 use crate::rand::*; 38 use crate::tcp_type::TcpType; 39 use std::future::Future; 40 use std::pin::Pin; 41 use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering}; 42 use std::sync::Arc; 43 use std::time::SystemTime; 44 use tokio::sync::{broadcast, mpsc, Mutex}; 45 use tokio::time::{Duration, Instant}; 46 47 #[derive(Debug, Clone)] 48 pub(crate) struct BindingRequest { 49 pub(crate) timestamp: Instant, 50 pub(crate) transaction_id: TransactionId, 51 pub(crate) destination: SocketAddr, 52 pub(crate) is_use_candidate: bool, 53 } 54 55 impl Default for BindingRequest { default() -> Self56 fn default() -> Self { 57 Self { 58 timestamp: Instant::now(), 59 transaction_id: TransactionId::default(), 60 destination: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0), 61 is_use_candidate: false, 62 } 63 } 64 } 65 66 pub type OnConnectionStateChangeHdlrFn = Box< 67 dyn (FnMut(ConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) 68 + Send 69 + Sync, 70 >; 71 pub type OnSelectedCandidatePairChangeHdlrFn = Box< 72 dyn (FnMut( 73 &Arc<dyn Candidate + Send + Sync>, 74 &Arc<dyn Candidate + Send + Sync>, 75 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) 76 + Send 77 + Sync, 78 >; 79 pub type OnCandidateHdlrFn = Box< 80 dyn (FnMut( 81 Option<Arc<dyn Candidate + Send + Sync>>, 82 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) 83 + Send 84 + Sync, 85 >; 86 pub type GatherCandidateCancelFn = Box<dyn Fn() + Send + Sync>; 87 88 struct ChanReceivers { 89 chan_state_rx: mpsc::Receiver<ConnectionState>, 90 chan_candidate_rx: mpsc::Receiver<Option<Arc<dyn Candidate + Send + Sync>>>, 91 chan_candidate_pair_rx: mpsc::Receiver<()>, 92 } 93 94 /// Represents the ICE agent. 95 pub struct Agent { 96 pub(crate) internal: Arc<AgentInternal>, 97 98 pub(crate) udp_network: UDPNetwork, 99 pub(crate) interface_filter: Arc<Option<InterfaceFilterFn>>, 100 pub(crate) ip_filter: Arc<Option<IpFilterFn>>, 101 pub(crate) mdns_mode: MulticastDnsMode, 102 pub(crate) mdns_name: String, 103 pub(crate) mdns_conn: Option<Arc<DnsConn>>, 104 pub(crate) net: Arc<Net>, 105 106 // 1:1 D-NAT IP address mapping 107 pub(crate) ext_ip_mapper: Arc<Option<ExternalIpMapper>>, 108 pub(crate) gathering_state: Arc<AtomicU8>, //GatheringState, 109 pub(crate) candidate_types: Vec<CandidateType>, 110 pub(crate) urls: Vec<Url>, 111 pub(crate) network_types: Vec<NetworkType>, 112 113 pub(crate) gather_candidate_cancel: Option<GatherCandidateCancelFn>, 114 } 115 116 impl Agent { 117 /// Creates a new Agent. new(config: AgentConfig) -> Result<Self>118 pub async fn new(config: AgentConfig) -> Result<Self> { 119 let mut mdns_name = config.multicast_dns_host_name.clone(); 120 if mdns_name.is_empty() { 121 mdns_name = generate_multicast_dns_name(); 122 } 123 124 if !mdns_name.ends_with(".local") || mdns_name.split('.').count() != 2 { 125 return Err(Error::ErrInvalidMulticastDnshostName); 126 } 127 128 let mdns_mode = config.multicast_dns_mode; 129 130 let mdns_conn = 131 match create_multicast_dns(mdns_mode, &mdns_name, &config.multicast_dns_dest_addr) { 132 Ok(c) => c, 133 Err(err) => { 134 // Opportunistic mDNS: If we can't open the connection, that's ok: we 135 // can continue without it. 136 log::warn!("Failed to initialize mDNS {}: {}", mdns_name, err); 137 None 138 } 139 }; 140 141 let (mut ai, chan_receivers) = AgentInternal::new(&config); 142 let (chan_state_rx, chan_candidate_rx, chan_candidate_pair_rx) = ( 143 chan_receivers.chan_state_rx, 144 chan_receivers.chan_candidate_rx, 145 chan_receivers.chan_candidate_pair_rx, 146 ); 147 148 config.init_with_defaults(&mut ai); 149 150 let candidate_types = if config.candidate_types.is_empty() { 151 default_candidate_types() 152 } else { 153 config.candidate_types.clone() 154 }; 155 156 if ai.lite.load(Ordering::SeqCst) 157 && (candidate_types.len() != 1 || candidate_types[0] != CandidateType::Host) 158 { 159 Self::close_multicast_conn(&mdns_conn).await; 160 return Err(Error::ErrLiteUsingNonHostCandidates); 161 } 162 163 if !config.urls.is_empty() 164 && !contains_candidate_type(CandidateType::ServerReflexive, &candidate_types) 165 && !contains_candidate_type(CandidateType::Relay, &candidate_types) 166 { 167 Self::close_multicast_conn(&mdns_conn).await; 168 return Err(Error::ErrUselessUrlsProvided); 169 } 170 171 let ext_ip_mapper = match config.init_ext_ip_mapping(mdns_mode, &candidate_types) { 172 Ok(ext_ip_mapper) => ext_ip_mapper, 173 Err(err) => { 174 Self::close_multicast_conn(&mdns_conn).await; 175 return Err(err); 176 } 177 }; 178 179 let net = if let Some(net) = config.net { 180 if net.is_virtual() { 181 log::warn!("vnet is enabled"); 182 if mdns_mode != MulticastDnsMode::Disabled { 183 log::warn!("vnet does not support mDNS yet"); 184 } 185 } 186 187 net 188 } else { 189 Arc::new(Net::new(None)) 190 }; 191 192 let agent = Self { 193 udp_network: config.udp_network, 194 internal: Arc::new(ai), 195 interface_filter: Arc::clone(&config.interface_filter), 196 ip_filter: Arc::clone(&config.ip_filter), 197 mdns_mode, 198 mdns_name, 199 mdns_conn, 200 net, 201 ext_ip_mapper: Arc::new(ext_ip_mapper), 202 gathering_state: Arc::new(AtomicU8::new(0)), //GatheringState::New, 203 candidate_types, 204 urls: config.urls.clone(), 205 network_types: config.network_types.clone(), 206 207 gather_candidate_cancel: None, //TODO: add cancel 208 }; 209 210 agent.internal.start_on_connection_state_change_routine( 211 chan_state_rx, 212 chan_candidate_rx, 213 chan_candidate_pair_rx, 214 ); 215 216 // Restart is also used to initialize the agent for the first time 217 if let Err(err) = agent.restart(config.local_ufrag, config.local_pwd).await { 218 Self::close_multicast_conn(&agent.mdns_conn).await; 219 let _ = agent.close().await; 220 return Err(err); 221 } 222 223 Ok(agent) 224 } 225 get_bytes_received(&self) -> usize226 pub fn get_bytes_received(&self) -> usize { 227 self.internal.agent_conn.bytes_received() 228 } 229 get_bytes_sent(&self) -> usize230 pub fn get_bytes_sent(&self) -> usize { 231 self.internal.agent_conn.bytes_sent() 232 } 233 234 /// Sets a handler that is fired when the connection state changes. on_connection_state_change(&self, f: OnConnectionStateChangeHdlrFn)235 pub fn on_connection_state_change(&self, f: OnConnectionStateChangeHdlrFn) { 236 self.internal 237 .on_connection_state_change_hdlr 238 .store(Some(Arc::new(Mutex::new(f)))) 239 } 240 241 /// Sets a handler that is fired when the final candidate pair is selected. on_selected_candidate_pair_change(&self, f: OnSelectedCandidatePairChangeHdlrFn)242 pub fn on_selected_candidate_pair_change(&self, f: OnSelectedCandidatePairChangeHdlrFn) { 243 self.internal 244 .on_selected_candidate_pair_change_hdlr 245 .store(Some(Arc::new(Mutex::new(f)))) 246 } 247 248 /// Sets a handler that is fired when new candidates gathered. When the gathering process 249 /// complete the last candidate is nil. on_candidate(&self, f: OnCandidateHdlrFn)250 pub fn on_candidate(&self, f: OnCandidateHdlrFn) { 251 self.internal 252 .on_candidate_hdlr 253 .store(Some(Arc::new(Mutex::new(f)))); 254 } 255 256 /// Adds a new remote candidate. add_remote_candidate(&self, c: &Arc<dyn Candidate + Send + Sync>) -> Result<()>257 pub fn add_remote_candidate(&self, c: &Arc<dyn Candidate + Send + Sync>) -> Result<()> { 258 // cannot check for network yet because it might not be applied 259 // when mDNS hostame is used. 260 if c.tcp_type() == TcpType::Active { 261 // TCP Candidates with tcptype active will probe server passive ones, so 262 // no need to do anything with them. 263 log::info!("Ignoring remote candidate with tcpType active: {}", c); 264 return Ok(()); 265 } 266 267 // If we have a mDNS Candidate lets fully resolve it before adding it locally 268 if c.candidate_type() == CandidateType::Host && c.address().ends_with(".local") { 269 if self.mdns_mode == MulticastDnsMode::Disabled { 270 log::warn!( 271 "remote mDNS candidate added, but mDNS is disabled: ({})", 272 c.address() 273 ); 274 return Ok(()); 275 } 276 277 if c.candidate_type() != CandidateType::Host { 278 return Err(Error::ErrAddressParseFailed); 279 } 280 281 let ai = Arc::clone(&self.internal); 282 let host_candidate = Arc::clone(c); 283 let mdns_conn = self.mdns_conn.clone(); 284 tokio::spawn(async move { 285 if let Some(mdns_conn) = mdns_conn { 286 if let Ok(candidate) = 287 Self::resolve_and_add_multicast_candidate(mdns_conn, host_candidate).await 288 { 289 ai.add_remote_candidate(&candidate).await; 290 } 291 } 292 }); 293 } else { 294 let ai = Arc::clone(&self.internal); 295 let candidate = Arc::clone(c); 296 tokio::spawn(async move { 297 ai.add_remote_candidate(&candidate).await; 298 }); 299 } 300 301 Ok(()) 302 } 303 304 /// Returns the local candidates. get_local_candidates(&self) -> Result<Vec<Arc<dyn Candidate + Send + Sync>>>305 pub async fn get_local_candidates(&self) -> Result<Vec<Arc<dyn Candidate + Send + Sync>>> { 306 let mut res = vec![]; 307 308 { 309 let local_candidates = self.internal.local_candidates.lock().await; 310 for candidates in local_candidates.values() { 311 for candidate in candidates { 312 res.push(Arc::clone(candidate)); 313 } 314 } 315 } 316 317 Ok(res) 318 } 319 320 /// Returns the local user credentials. get_local_user_credentials(&self) -> (String, String)321 pub async fn get_local_user_credentials(&self) -> (String, String) { 322 let ufrag_pwd = self.internal.ufrag_pwd.lock().await; 323 (ufrag_pwd.local_ufrag.clone(), ufrag_pwd.local_pwd.clone()) 324 } 325 326 /// Returns the remote user credentials. get_remote_user_credentials(&self) -> (String, String)327 pub async fn get_remote_user_credentials(&self) -> (String, String) { 328 let ufrag_pwd = self.internal.ufrag_pwd.lock().await; 329 (ufrag_pwd.remote_ufrag.clone(), ufrag_pwd.remote_pwd.clone()) 330 } 331 332 /// Cleans up the Agent. close(&self) -> Result<()>333 pub async fn close(&self) -> Result<()> { 334 if let Some(gather_candidate_cancel) = &self.gather_candidate_cancel { 335 gather_candidate_cancel(); 336 } 337 338 if let UDPNetwork::Muxed(ref udp_mux) = self.udp_network { 339 let (ufrag, _) = self.get_local_user_credentials().await; 340 udp_mux.remove_conn_by_ufrag(&ufrag).await; 341 } 342 343 //FIXME: deadlock here 344 self.internal.close().await 345 } 346 347 /// Returns the selected pair or nil if there is none get_selected_candidate_pair(&self) -> Option<Arc<CandidatePair>>348 pub fn get_selected_candidate_pair(&self) -> Option<Arc<CandidatePair>> { 349 self.internal.agent_conn.get_selected_pair() 350 } 351 352 /// Sets the credentials of the remote agent. set_remote_credentials( &self, remote_ufrag: String, remote_pwd: String, ) -> Result<()>353 pub async fn set_remote_credentials( 354 &self, 355 remote_ufrag: String, 356 remote_pwd: String, 357 ) -> Result<()> { 358 self.internal 359 .set_remote_credentials(remote_ufrag, remote_pwd) 360 .await 361 } 362 363 /// Restarts the ICE Agent with the provided ufrag/pwd 364 /// If no ufrag/pwd is provided the Agent will generate one itself. 365 /// 366 /// Restart must only be called when `GatheringState` is `GatheringStateComplete` 367 /// a user must then call `GatherCandidates` explicitly to start generating new ones. restart(&self, mut ufrag: String, mut pwd: String) -> Result<()>368 pub async fn restart(&self, mut ufrag: String, mut pwd: String) -> Result<()> { 369 if ufrag.is_empty() { 370 ufrag = generate_ufrag(); 371 } 372 if pwd.is_empty() { 373 pwd = generate_pwd(); 374 } 375 376 if ufrag.len() * 8 < 24 { 377 return Err(Error::ErrLocalUfragInsufficientBits); 378 } 379 if pwd.len() * 8 < 128 { 380 return Err(Error::ErrLocalPwdInsufficientBits); 381 } 382 383 if GatheringState::from(self.gathering_state.load(Ordering::SeqCst)) 384 == GatheringState::Gathering 385 { 386 return Err(Error::ErrRestartWhenGathering); 387 } 388 self.gathering_state 389 .store(GatheringState::New as u8, Ordering::SeqCst); 390 391 { 392 let done_tx = self.internal.done_tx.lock().await; 393 if done_tx.is_none() { 394 return Err(Error::ErrClosed); 395 } 396 } 397 398 // Clear all agent needed to take back to fresh state 399 { 400 let mut ufrag_pwd = self.internal.ufrag_pwd.lock().await; 401 ufrag_pwd.local_ufrag = ufrag; 402 ufrag_pwd.local_pwd = pwd; 403 ufrag_pwd.remote_ufrag = String::new(); 404 ufrag_pwd.remote_pwd = String::new(); 405 } 406 { 407 let mut pending_binding_requests = self.internal.pending_binding_requests.lock().await; 408 *pending_binding_requests = vec![]; 409 } 410 411 { 412 let mut checklist = self.internal.agent_conn.checklist.lock().await; 413 *checklist = vec![]; 414 } 415 416 self.internal.set_selected_pair(None).await; 417 self.internal.delete_all_candidates().await; 418 self.internal.start().await; 419 420 // Restart is used by NewAgent. Accept/Connect should be used to move to checking 421 // for new Agents 422 if self.internal.connection_state.load(Ordering::SeqCst) != ConnectionState::New as u8 { 423 self.internal 424 .update_connection_state(ConnectionState::Checking) 425 .await; 426 } 427 428 Ok(()) 429 } 430 431 /// Initiates the trickle based gathering process. gather_candidates(&self) -> Result<()>432 pub fn gather_candidates(&self) -> Result<()> { 433 if self.gathering_state.load(Ordering::SeqCst) != GatheringState::New as u8 { 434 return Err(Error::ErrMultipleGatherAttempted); 435 } 436 437 if self.internal.on_candidate_hdlr.load().is_none() { 438 return Err(Error::ErrNoOnCandidateHandler); 439 } 440 441 if let Some(gather_candidate_cancel) = &self.gather_candidate_cancel { 442 gather_candidate_cancel(); // Cancel previous gathering routine 443 } 444 445 //TODO: a.gatherCandidateCancel = cancel 446 447 let params = GatherCandidatesInternalParams { 448 udp_network: self.udp_network.clone(), 449 candidate_types: self.candidate_types.clone(), 450 urls: self.urls.clone(), 451 network_types: self.network_types.clone(), 452 mdns_mode: self.mdns_mode, 453 mdns_name: self.mdns_name.clone(), 454 net: Arc::clone(&self.net), 455 interface_filter: self.interface_filter.clone(), 456 ip_filter: self.ip_filter.clone(), 457 ext_ip_mapper: Arc::clone(&self.ext_ip_mapper), 458 agent_internal: Arc::clone(&self.internal), 459 gathering_state: Arc::clone(&self.gathering_state), 460 chan_candidate_tx: Arc::clone(&self.internal.chan_candidate_tx), 461 }; 462 tokio::spawn(async move { 463 Self::gather_candidates_internal(params).await; 464 }); 465 466 Ok(()) 467 } 468 469 /// Returns a list of candidate pair stats. get_candidate_pairs_stats(&self) -> Vec<CandidatePairStats>470 pub async fn get_candidate_pairs_stats(&self) -> Vec<CandidatePairStats> { 471 self.internal.get_candidate_pairs_stats().await 472 } 473 474 /// Returns a list of local candidates stats. get_local_candidates_stats(&self) -> Vec<CandidateStats>475 pub async fn get_local_candidates_stats(&self) -> Vec<CandidateStats> { 476 self.internal.get_local_candidates_stats().await 477 } 478 479 /// Returns a list of remote candidates stats. get_remote_candidates_stats(&self) -> Vec<CandidateStats>480 pub async fn get_remote_candidates_stats(&self) -> Vec<CandidateStats> { 481 self.internal.get_remote_candidates_stats().await 482 } 483 resolve_and_add_multicast_candidate( mdns_conn: Arc<DnsConn>, c: Arc<dyn Candidate + Send + Sync>, ) -> Result<Arc<dyn Candidate + Send + Sync>>484 async fn resolve_and_add_multicast_candidate( 485 mdns_conn: Arc<DnsConn>, 486 c: Arc<dyn Candidate + Send + Sync>, 487 ) -> Result<Arc<dyn Candidate + Send + Sync>> { 488 //TODO: hook up _close_query_signal_tx to Agent or Candidate's Close signal? 489 let (_close_query_signal_tx, close_query_signal_rx) = mpsc::channel(1); 490 let src = match mdns_conn.query(&c.address(), close_query_signal_rx).await { 491 Ok((_, src)) => src, 492 Err(err) => { 493 log::warn!("Failed to discover mDNS candidate {}: {}", c.address(), err); 494 return Err(err.into()); 495 } 496 }; 497 498 c.set_ip(&src.ip())?; 499 500 Ok(c) 501 } 502 close_multicast_conn(mdns_conn: &Option<Arc<DnsConn>>)503 async fn close_multicast_conn(mdns_conn: &Option<Arc<DnsConn>>) { 504 if let Some(conn) = mdns_conn { 505 if let Err(err) = conn.close().await { 506 log::warn!("failed to close mDNS Conn: {}", err); 507 } 508 } 509 } 510 } 511