1 use crate::agent::agent_internal::*; 2 use crate::candidate::*; 3 use crate::control::*; 4 use crate::priority::*; 5 use crate::use_candidate::*; 6 7 use stun::{agent::*, attributes::*, fingerprint::*, integrity::*, message::*, textattrs::*}; 8 9 use async_trait::async_trait; 10 use std::net::SocketAddr; 11 use std::sync::atomic::Ordering; 12 use std::sync::Arc; 13 use tokio::time::{Duration, Instant}; 14 15 #[async_trait] 16 trait ControllingSelector { start(&self)17 async fn start(&self); contact_candidates(&self)18 async fn contact_candidates(&self); ping_candidate( &self, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )19 async fn ping_candidate( 20 &self, 21 local: &Arc<dyn Candidate + Send + Sync>, 22 remote: &Arc<dyn Candidate + Send + Sync>, 23 ); handle_success_response( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, remote_addr: SocketAddr, )24 async fn handle_success_response( 25 &self, 26 m: &Message, 27 local: &Arc<dyn Candidate + Send + Sync>, 28 remote: &Arc<dyn Candidate + Send + Sync>, 29 remote_addr: SocketAddr, 30 ); handle_binding_request( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )31 async fn handle_binding_request( 32 &self, 33 m: &Message, 34 local: &Arc<dyn Candidate + Send + Sync>, 35 remote: &Arc<dyn Candidate + Send + Sync>, 36 ); 37 } 38 39 #[async_trait] 40 trait ControlledSelector { start(&self)41 async fn start(&self); contact_candidates(&self)42 async fn contact_candidates(&self); ping_candidate( &self, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )43 async fn ping_candidate( 44 &self, 45 local: &Arc<dyn Candidate + Send + Sync>, 46 remote: &Arc<dyn Candidate + Send + Sync>, 47 ); handle_success_response( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, remote_addr: SocketAddr, )48 async fn handle_success_response( 49 &self, 50 m: &Message, 51 local: &Arc<dyn Candidate + Send + Sync>, 52 remote: &Arc<dyn Candidate + Send + Sync>, 53 remote_addr: SocketAddr, 54 ); handle_binding_request( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )55 async fn handle_binding_request( 56 &self, 57 m: &Message, 58 local: &Arc<dyn Candidate + Send + Sync>, 59 remote: &Arc<dyn Candidate + Send + Sync>, 60 ); 61 } 62 63 impl AgentInternal { is_nominatable(&self, c: &Arc<dyn Candidate + Send + Sync>) -> bool64 fn is_nominatable(&self, c: &Arc<dyn Candidate + Send + Sync>) -> bool { 65 let start_time = *self.start_time.lock(); 66 match c.candidate_type() { 67 CandidateType::Host => { 68 Instant::now() 69 .checked_duration_since(start_time) 70 .unwrap_or_else(|| Duration::from_secs(0)) 71 .as_nanos() 72 > self.host_acceptance_min_wait.as_nanos() 73 } 74 CandidateType::ServerReflexive => { 75 Instant::now() 76 .checked_duration_since(start_time) 77 .unwrap_or_else(|| Duration::from_secs(0)) 78 .as_nanos() 79 > self.srflx_acceptance_min_wait.as_nanos() 80 } 81 CandidateType::PeerReflexive => { 82 Instant::now() 83 .checked_duration_since(start_time) 84 .unwrap_or_else(|| Duration::from_secs(0)) 85 .as_nanos() 86 > self.prflx_acceptance_min_wait.as_nanos() 87 } 88 CandidateType::Relay => { 89 Instant::now() 90 .checked_duration_since(start_time) 91 .unwrap_or_else(|| Duration::from_secs(0)) 92 .as_nanos() 93 > self.relay_acceptance_min_wait.as_nanos() 94 } 95 CandidateType::Unspecified => { 96 log::error!( 97 "is_nominatable invalid candidate type {}", 98 c.candidate_type() 99 ); 100 false 101 } 102 } 103 } 104 nominate_pair(&self)105 async fn nominate_pair(&self) { 106 let result = { 107 let nominated_pair = self.nominated_pair.lock().await; 108 if let Some(pair) = &*nominated_pair { 109 // The controlling agent MUST include the USE-CANDIDATE attribute in 110 // order to nominate a candidate pair (Section 8.1.1). The controlled 111 // agent MUST NOT include the USE-CANDIDATE attribute in a Binding 112 // request. 113 114 let (msg, result) = { 115 let ufrag_pwd = self.ufrag_pwd.lock().await; 116 let username = 117 ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str(); 118 let mut msg = Message::new(); 119 let result = msg.build(&[ 120 Box::new(BINDING_REQUEST), 121 Box::new(TransactionId::new()), 122 Box::new(Username::new(ATTR_USERNAME, username)), 123 Box::<UseCandidateAttr>::default(), 124 Box::new(AttrControlling(self.tie_breaker.load(Ordering::SeqCst))), 125 Box::new(PriorityAttr(pair.local.priority())), 126 Box::new(MessageIntegrity::new_short_term_integrity( 127 ufrag_pwd.remote_pwd.clone(), 128 )), 129 Box::new(FINGERPRINT), 130 ]); 131 (msg, result) 132 }; 133 134 if let Err(err) = result { 135 log::error!("{}", err); 136 None 137 } else { 138 log::trace!( 139 "ping STUN (nominate candidate pair from {} to {}", 140 pair.local, 141 pair.remote 142 ); 143 let local = pair.local.clone(); 144 let remote = pair.remote.clone(); 145 Some((msg, local, remote)) 146 } 147 } else { 148 None 149 } 150 }; 151 152 if let Some((msg, local, remote)) = result { 153 self.send_binding_request(&msg, &local, &remote).await; 154 } 155 } 156 start(&self)157 pub(crate) async fn start(&self) { 158 if self.is_controlling.load(Ordering::SeqCst) { 159 ControllingSelector::start(self).await; 160 } else { 161 ControlledSelector::start(self).await; 162 } 163 } 164 contact_candidates(&self)165 pub(crate) async fn contact_candidates(&self) { 166 if self.is_controlling.load(Ordering::SeqCst) { 167 ControllingSelector::contact_candidates(self).await; 168 } else { 169 ControlledSelector::contact_candidates(self).await; 170 } 171 } 172 ping_candidate( &self, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )173 pub(crate) async fn ping_candidate( 174 &self, 175 local: &Arc<dyn Candidate + Send + Sync>, 176 remote: &Arc<dyn Candidate + Send + Sync>, 177 ) { 178 if self.is_controlling.load(Ordering::SeqCst) { 179 ControllingSelector::ping_candidate(self, local, remote).await; 180 } else { 181 ControlledSelector::ping_candidate(self, local, remote).await; 182 } 183 } 184 handle_success_response( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, remote_addr: SocketAddr, )185 pub(crate) async fn handle_success_response( 186 &self, 187 m: &Message, 188 local: &Arc<dyn Candidate + Send + Sync>, 189 remote: &Arc<dyn Candidate + Send + Sync>, 190 remote_addr: SocketAddr, 191 ) { 192 if self.is_controlling.load(Ordering::SeqCst) { 193 ControllingSelector::handle_success_response(self, m, local, remote, remote_addr).await; 194 } else { 195 ControlledSelector::handle_success_response(self, m, local, remote, remote_addr).await; 196 } 197 } 198 handle_binding_request( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )199 pub(crate) async fn handle_binding_request( 200 &self, 201 m: &Message, 202 local: &Arc<dyn Candidate + Send + Sync>, 203 remote: &Arc<dyn Candidate + Send + Sync>, 204 ) { 205 if self.is_controlling.load(Ordering::SeqCst) { 206 ControllingSelector::handle_binding_request(self, m, local, remote).await; 207 } else { 208 ControlledSelector::handle_binding_request(self, m, local, remote).await; 209 } 210 } 211 } 212 213 #[async_trait] 214 impl ControllingSelector for AgentInternal { start(&self)215 async fn start(&self) { 216 { 217 let mut nominated_pair = self.nominated_pair.lock().await; 218 *nominated_pair = None; 219 } 220 *self.start_time.lock() = Instant::now(); 221 } 222 contact_candidates(&self)223 async fn contact_candidates(&self) { 224 // A lite selector should not contact candidates 225 if self.lite.load(Ordering::SeqCst) { 226 // This only happens if both peers are lite. See RFC 8445 S6.1.1 and S6.2 227 log::trace!("now falling back to full agent"); 228 } 229 230 let nominated_pair_is_some = { 231 let nominated_pair = self.nominated_pair.lock().await; 232 nominated_pair.is_some() 233 }; 234 235 if self.agent_conn.get_selected_pair().is_some() { 236 if self.validate_selected_pair().await { 237 log::trace!("[{}]: checking keepalive", self.get_name()); 238 self.check_keepalive().await; 239 } 240 } else if nominated_pair_is_some { 241 self.nominate_pair().await; 242 } else { 243 let has_nominated_pair = 244 if let Some(p) = self.agent_conn.get_best_valid_candidate_pair().await { 245 self.is_nominatable(&p.local) && self.is_nominatable(&p.remote) 246 } else { 247 false 248 }; 249 250 if has_nominated_pair { 251 if let Some(p) = self.agent_conn.get_best_valid_candidate_pair().await { 252 log::trace!( 253 "Nominatable pair found, nominating ({}, {})", 254 p.local.to_string(), 255 p.remote.to_string() 256 ); 257 p.nominated.store(true, Ordering::SeqCst); 258 { 259 let mut nominated_pair = self.nominated_pair.lock().await; 260 *nominated_pair = Some(p); 261 } 262 } 263 264 self.nominate_pair().await; 265 } else { 266 self.ping_all_candidates().await; 267 } 268 } 269 } 270 ping_candidate( &self, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )271 async fn ping_candidate( 272 &self, 273 local: &Arc<dyn Candidate + Send + Sync>, 274 remote: &Arc<dyn Candidate + Send + Sync>, 275 ) { 276 let (msg, result) = { 277 let ufrag_pwd = self.ufrag_pwd.lock().await; 278 let username = ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str(); 279 let mut msg = Message::new(); 280 let result = msg.build(&[ 281 Box::new(BINDING_REQUEST), 282 Box::new(TransactionId::new()), 283 Box::new(Username::new(ATTR_USERNAME, username)), 284 Box::new(AttrControlling(self.tie_breaker.load(Ordering::SeqCst))), 285 Box::new(PriorityAttr(local.priority())), 286 Box::new(MessageIntegrity::new_short_term_integrity( 287 ufrag_pwd.remote_pwd.clone(), 288 )), 289 Box::new(FINGERPRINT), 290 ]); 291 (msg, result) 292 }; 293 294 if let Err(err) = result { 295 log::error!("{}", err); 296 } else { 297 self.send_binding_request(&msg, local, remote).await; 298 } 299 } 300 handle_success_response( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, remote_addr: SocketAddr, )301 async fn handle_success_response( 302 &self, 303 m: &Message, 304 local: &Arc<dyn Candidate + Send + Sync>, 305 remote: &Arc<dyn Candidate + Send + Sync>, 306 remote_addr: SocketAddr, 307 ) { 308 if let Some(pending_request) = self.handle_inbound_binding_success(m.transaction_id).await { 309 let transaction_addr = pending_request.destination; 310 311 // Assert that NAT is not symmetric 312 // https://tools.ietf.org/html/rfc8445#section-7.2.5.2.1 313 if transaction_addr != remote_addr { 314 log::debug!("discard message: transaction source and destination does not match expected({}), actual({})", transaction_addr, remote); 315 return; 316 } 317 318 log::trace!( 319 "inbound STUN (SuccessResponse) from {} to {}", 320 remote, 321 local 322 ); 323 let selected_pair_is_none = self.agent_conn.get_selected_pair().is_none(); 324 325 if let Some(p) = self.find_pair(local, remote).await { 326 p.state 327 .store(CandidatePairState::Succeeded as u8, Ordering::SeqCst); 328 log::trace!( 329 "Found valid candidate pair: {}, p.state: {}, isUseCandidate: {}, {}", 330 p, 331 p.state.load(Ordering::SeqCst), 332 pending_request.is_use_candidate, 333 selected_pair_is_none 334 ); 335 if pending_request.is_use_candidate && selected_pair_is_none { 336 self.set_selected_pair(Some(Arc::clone(&p))).await; 337 } 338 } else { 339 // This shouldn't happen 340 log::error!("Success response from invalid candidate pair"); 341 } 342 } else { 343 log::warn!( 344 "discard message from ({}), unknown TransactionID 0x{:?}", 345 remote, 346 m.transaction_id 347 ); 348 } 349 } 350 handle_binding_request( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )351 async fn handle_binding_request( 352 &self, 353 m: &Message, 354 local: &Arc<dyn Candidate + Send + Sync>, 355 remote: &Arc<dyn Candidate + Send + Sync>, 356 ) { 357 self.send_binding_success(m, local, remote).await; 358 log::trace!("controllingSelector: sendBindingSuccess"); 359 360 if let Some(p) = self.find_pair(local, remote).await { 361 let nominated_pair_is_none = { 362 let nominated_pair = self.nominated_pair.lock().await; 363 nominated_pair.is_none() 364 }; 365 366 log::trace!( 367 "controllingSelector: after findPair {}, p.state: {}, {}", 368 p, 369 p.state.load(Ordering::SeqCst), 370 nominated_pair_is_none, 371 //self.agent_conn.get_selected_pair().await.is_none() //, {} 372 ); 373 if p.state.load(Ordering::SeqCst) == CandidatePairState::Succeeded as u8 374 && nominated_pair_is_none 375 && self.agent_conn.get_selected_pair().is_none() 376 { 377 if let Some(best_pair) = self.agent_conn.get_best_available_candidate_pair().await { 378 log::trace!( 379 "controllingSelector: getBestAvailableCandidatePair {}", 380 best_pair 381 ); 382 if best_pair == p 383 && self.is_nominatable(&p.local) 384 && self.is_nominatable(&p.remote) 385 { 386 log::trace!("The candidate ({}, {}) is the best candidate available, marking it as nominated", 387 p.local, p.remote); 388 { 389 let mut nominated_pair = self.nominated_pair.lock().await; 390 *nominated_pair = Some(p); 391 } 392 self.nominate_pair().await; 393 } 394 } else { 395 log::trace!("No best pair available"); 396 } 397 } 398 } else { 399 log::trace!("controllingSelector: addPair"); 400 self.add_pair(local.clone(), remote.clone()).await; 401 } 402 } 403 } 404 405 #[async_trait] 406 impl ControlledSelector for AgentInternal { start(&self)407 async fn start(&self) {} 408 contact_candidates(&self)409 async fn contact_candidates(&self) { 410 // A lite selector should not contact candidates 411 if self.lite.load(Ordering::SeqCst) { 412 self.validate_selected_pair().await; 413 } else if self.agent_conn.get_selected_pair().is_some() { 414 if self.validate_selected_pair().await { 415 log::trace!("[{}]: checking keepalive", self.get_name()); 416 self.check_keepalive().await; 417 } 418 } else { 419 self.ping_all_candidates().await; 420 } 421 } 422 ping_candidate( &self, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )423 async fn ping_candidate( 424 &self, 425 local: &Arc<dyn Candidate + Send + Sync>, 426 remote: &Arc<dyn Candidate + Send + Sync>, 427 ) { 428 let (msg, result) = { 429 let ufrag_pwd = self.ufrag_pwd.lock().await; 430 let username = ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str(); 431 let mut msg = Message::new(); 432 let result = msg.build(&[ 433 Box::new(BINDING_REQUEST), 434 Box::new(TransactionId::new()), 435 Box::new(Username::new(ATTR_USERNAME, username)), 436 Box::new(AttrControlled(self.tie_breaker.load(Ordering::SeqCst))), 437 Box::new(PriorityAttr(local.priority())), 438 Box::new(MessageIntegrity::new_short_term_integrity( 439 ufrag_pwd.remote_pwd.clone(), 440 )), 441 Box::new(FINGERPRINT), 442 ]); 443 (msg, result) 444 }; 445 446 if let Err(err) = result { 447 log::error!("{}", err); 448 } else { 449 self.send_binding_request(&msg, local, remote).await; 450 } 451 } 452 handle_success_response( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, remote_addr: SocketAddr, )453 async fn handle_success_response( 454 &self, 455 m: &Message, 456 local: &Arc<dyn Candidate + Send + Sync>, 457 remote: &Arc<dyn Candidate + Send + Sync>, 458 remote_addr: SocketAddr, 459 ) { 460 // https://tools.ietf.org/html/rfc8445#section-7.3.1.5 461 // If the controlled agent does not accept the request from the 462 // controlling agent, the controlled agent MUST reject the nomination 463 // request with an appropriate error code response (e.g., 400) 464 // [RFC5389]. 465 466 if let Some(pending_request) = self.handle_inbound_binding_success(m.transaction_id).await { 467 let transaction_addr = pending_request.destination; 468 469 // Assert that NAT is not symmetric 470 // https://tools.ietf.org/html/rfc8445#section-7.2.5.2.1 471 if transaction_addr != remote_addr { 472 log::debug!("discard message: transaction source and destination does not match expected({}), actual({})", transaction_addr, remote); 473 return; 474 } 475 476 log::trace!( 477 "inbound STUN (SuccessResponse) from {} to {}", 478 remote, 479 local 480 ); 481 482 if let Some(p) = self.find_pair(local, remote).await { 483 p.state 484 .store(CandidatePairState::Succeeded as u8, Ordering::SeqCst); 485 log::trace!("Found valid candidate pair: {}", p); 486 } else { 487 // This shouldn't happen 488 log::error!("Success response from invalid candidate pair"); 489 } 490 } else { 491 log::warn!( 492 "discard message from ({}), unknown TransactionID 0x{:?}", 493 remote, 494 m.transaction_id 495 ); 496 } 497 } 498 handle_binding_request( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )499 async fn handle_binding_request( 500 &self, 501 m: &Message, 502 local: &Arc<dyn Candidate + Send + Sync>, 503 remote: &Arc<dyn Candidate + Send + Sync>, 504 ) { 505 if self.find_pair(local, remote).await.is_none() { 506 self.add_pair(local.clone(), remote.clone()).await; 507 } 508 509 if let Some(p) = self.find_pair(local, remote).await { 510 let use_candidate = m.contains(ATTR_USE_CANDIDATE); 511 if use_candidate { 512 // https://tools.ietf.org/html/rfc8445#section-7.3.1.5 513 514 if p.state.load(Ordering::SeqCst) == CandidatePairState::Succeeded as u8 { 515 // If the state of this pair is Succeeded, it means that the check 516 // previously sent by this pair produced a successful response and 517 // generated a valid pair (Section 7.2.5.3.2). The agent sets the 518 // nominated flag value of the valid pair to true. 519 if self.agent_conn.get_selected_pair().is_none() { 520 self.set_selected_pair(Some(Arc::clone(&p))).await; 521 } 522 self.send_binding_success(m, local, remote).await; 523 } else { 524 // If the received Binding request triggered a new check to be 525 // enqueued in the triggered-check queue (Section 7.3.1.4), once the 526 // check is sent and if it generates a successful response, and 527 // generates a valid pair, the agent sets the nominated flag of the 528 // pair to true. If the request fails (Section 7.2.5.2), the agent 529 // MUST remove the candidate pair from the valid list, set the 530 // candidate pair state to Failed, and set the checklist state to 531 // Failed. 532 self.ping_candidate(local, remote).await; 533 } 534 } else { 535 self.send_binding_success(m, local, remote).await; 536 self.ping_candidate(local, remote).await; 537 } 538 } 539 } 540 } 541