xref: /webrtc/ice/src/agent/agent_selector.rs (revision 630c46fe)
1 use crate::agent::agent_internal::*;
2 use crate::candidate::*;
3 use crate::control::*;
4 use crate::priority::*;
5 use crate::use_candidate::*;
6 
7 use stun::{agent::*, attributes::*, fingerprint::*, integrity::*, message::*, textattrs::*};
8 
9 use async_trait::async_trait;
10 use std::net::SocketAddr;
11 use std::sync::atomic::Ordering;
12 use std::sync::Arc;
13 use tokio::time::{Duration, Instant};
14 
/// Selector behaviour used while the agent is in the controlling role.
///
/// The inherent `AgentInternal` methods of the same names forward to this
/// trait whenever `is_controlling` is set.
#[async_trait]
trait ControllingSelector {
    /// Resets the selector state at the beginning of a run.
    async fn start(&self);
    /// Performs one round of connectivity work: keepalive, nomination or
    /// pinging, depending on the current selection state.
    async fn contact_candidates(&self);
    /// Builds and sends a STUN binding request from `local` to `remote`.
    async fn ping_candidate(
        &self,
        local: &Arc<dyn Candidate + Send + Sync>,
        remote: &Arc<dyn Candidate + Send + Sync>,
    );
    /// Processes the success response `m` for a binding request sent earlier.
    ///
    /// `remote_addr` is the address the response came from; it is checked
    /// against the destination recorded for the matching transaction.
    async fn handle_success_response(
        &self,
        m: &Message,
        local: &Arc<dyn Candidate + Send + Sync>,
        remote: &Arc<dyn Candidate + Send + Sync>,
        remote_addr: SocketAddr,
    );
    /// Reacts to the inbound binding request `m` received on the
    /// (`local`, `remote`) candidate pair.
    async fn handle_binding_request(
        &self,
        m: &Message,
        local: &Arc<dyn Candidate + Send + Sync>,
        remote: &Arc<dyn Candidate + Send + Sync>,
    );
}
38 
/// Selector behaviour used while the agent is in the controlled role.
///
/// The inherent `AgentInternal` methods of the same names forward to this
/// trait whenever `is_controlling` is not set.
#[async_trait]
trait ControlledSelector {
    /// Hook invoked at the beginning of a run.
    async fn start(&self);
    /// Performs one round of connectivity work: validation, keepalive or
    /// pinging, depending on the current selection state.
    async fn contact_candidates(&self);
    /// Builds and sends a STUN binding request from `local` to `remote`.
    async fn ping_candidate(
        &self,
        local: &Arc<dyn Candidate + Send + Sync>,
        remote: &Arc<dyn Candidate + Send + Sync>,
    );
    /// Processes the success response `m` for a binding request sent earlier.
    ///
    /// `remote_addr` is the address the response came from; it is checked
    /// against the destination recorded for the matching transaction.
    async fn handle_success_response(
        &self,
        m: &Message,
        local: &Arc<dyn Candidate + Send + Sync>,
        remote: &Arc<dyn Candidate + Send + Sync>,
        remote_addr: SocketAddr,
    );
    /// Reacts to the inbound binding request `m` received on the
    /// (`local`, `remote`) candidate pair.
    async fn handle_binding_request(
        &self,
        m: &Message,
        local: &Arc<dyn Candidate + Send + Sync>,
        remote: &Arc<dyn Candidate + Send + Sync>,
    );
}
62 
63 impl AgentInternal {
is_nominatable(&self, c: &Arc<dyn Candidate + Send + Sync>) -> bool64     fn is_nominatable(&self, c: &Arc<dyn Candidate + Send + Sync>) -> bool {
65         let start_time = *self.start_time.lock();
66         match c.candidate_type() {
67             CandidateType::Host => {
68                 Instant::now()
69                     .checked_duration_since(start_time)
70                     .unwrap_or_else(|| Duration::from_secs(0))
71                     .as_nanos()
72                     > self.host_acceptance_min_wait.as_nanos()
73             }
74             CandidateType::ServerReflexive => {
75                 Instant::now()
76                     .checked_duration_since(start_time)
77                     .unwrap_or_else(|| Duration::from_secs(0))
78                     .as_nanos()
79                     > self.srflx_acceptance_min_wait.as_nanos()
80             }
81             CandidateType::PeerReflexive => {
82                 Instant::now()
83                     .checked_duration_since(start_time)
84                     .unwrap_or_else(|| Duration::from_secs(0))
85                     .as_nanos()
86                     > self.prflx_acceptance_min_wait.as_nanos()
87             }
88             CandidateType::Relay => {
89                 Instant::now()
90                     .checked_duration_since(start_time)
91                     .unwrap_or_else(|| Duration::from_secs(0))
92                     .as_nanos()
93                     > self.relay_acceptance_min_wait.as_nanos()
94             }
95             CandidateType::Unspecified => {
96                 log::error!(
97                     "is_nominatable invalid candidate type {}",
98                     c.candidate_type()
99                 );
100                 false
101             }
102         }
103     }
104 
nominate_pair(&self)105     async fn nominate_pair(&self) {
106         let result = {
107             let nominated_pair = self.nominated_pair.lock().await;
108             if let Some(pair) = &*nominated_pair {
109                 // The controlling agent MUST include the USE-CANDIDATE attribute in
110                 // order to nominate a candidate pair (Section 8.1.1).  The controlled
111                 // agent MUST NOT include the USE-CANDIDATE attribute in a Binding
112                 // request.
113 
114                 let (msg, result) = {
115                     let ufrag_pwd = self.ufrag_pwd.lock().await;
116                     let username =
117                         ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str();
118                     let mut msg = Message::new();
119                     let result = msg.build(&[
120                         Box::new(BINDING_REQUEST),
121                         Box::new(TransactionId::new()),
122                         Box::new(Username::new(ATTR_USERNAME, username)),
123                         Box::<UseCandidateAttr>::default(),
124                         Box::new(AttrControlling(self.tie_breaker.load(Ordering::SeqCst))),
125                         Box::new(PriorityAttr(pair.local.priority())),
126                         Box::new(MessageIntegrity::new_short_term_integrity(
127                             ufrag_pwd.remote_pwd.clone(),
128                         )),
129                         Box::new(FINGERPRINT),
130                     ]);
131                     (msg, result)
132                 };
133 
134                 if let Err(err) = result {
135                     log::error!("{}", err);
136                     None
137                 } else {
138                     log::trace!(
139                         "ping STUN (nominate candidate pair from {} to {}",
140                         pair.local,
141                         pair.remote
142                     );
143                     let local = pair.local.clone();
144                     let remote = pair.remote.clone();
145                     Some((msg, local, remote))
146                 }
147             } else {
148                 None
149             }
150         };
151 
152         if let Some((msg, local, remote)) = result {
153             self.send_binding_request(&msg, &local, &remote).await;
154         }
155     }
156 
start(&self)157     pub(crate) async fn start(&self) {
158         if self.is_controlling.load(Ordering::SeqCst) {
159             ControllingSelector::start(self).await;
160         } else {
161             ControlledSelector::start(self).await;
162         }
163     }
164 
contact_candidates(&self)165     pub(crate) async fn contact_candidates(&self) {
166         if self.is_controlling.load(Ordering::SeqCst) {
167             ControllingSelector::contact_candidates(self).await;
168         } else {
169             ControlledSelector::contact_candidates(self).await;
170         }
171     }
172 
ping_candidate( &self, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )173     pub(crate) async fn ping_candidate(
174         &self,
175         local: &Arc<dyn Candidate + Send + Sync>,
176         remote: &Arc<dyn Candidate + Send + Sync>,
177     ) {
178         if self.is_controlling.load(Ordering::SeqCst) {
179             ControllingSelector::ping_candidate(self, local, remote).await;
180         } else {
181             ControlledSelector::ping_candidate(self, local, remote).await;
182         }
183     }
184 
handle_success_response( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, remote_addr: SocketAddr, )185     pub(crate) async fn handle_success_response(
186         &self,
187         m: &Message,
188         local: &Arc<dyn Candidate + Send + Sync>,
189         remote: &Arc<dyn Candidate + Send + Sync>,
190         remote_addr: SocketAddr,
191     ) {
192         if self.is_controlling.load(Ordering::SeqCst) {
193             ControllingSelector::handle_success_response(self, m, local, remote, remote_addr).await;
194         } else {
195             ControlledSelector::handle_success_response(self, m, local, remote, remote_addr).await;
196         }
197     }
198 
handle_binding_request( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )199     pub(crate) async fn handle_binding_request(
200         &self,
201         m: &Message,
202         local: &Arc<dyn Candidate + Send + Sync>,
203         remote: &Arc<dyn Candidate + Send + Sync>,
204     ) {
205         if self.is_controlling.load(Ordering::SeqCst) {
206             ControllingSelector::handle_binding_request(self, m, local, remote).await;
207         } else {
208             ControlledSelector::handle_binding_request(self, m, local, remote).await;
209         }
210     }
211 }
212 
213 #[async_trait]
214 impl ControllingSelector for AgentInternal {
start(&self)215     async fn start(&self) {
216         {
217             let mut nominated_pair = self.nominated_pair.lock().await;
218             *nominated_pair = None;
219         }
220         *self.start_time.lock() = Instant::now();
221     }
222 
contact_candidates(&self)223     async fn contact_candidates(&self) {
224         // A lite selector should not contact candidates
225         if self.lite.load(Ordering::SeqCst) {
226             // This only happens if both peers are lite. See RFC 8445 S6.1.1 and S6.2
227             log::trace!("now falling back to full agent");
228         }
229 
230         let nominated_pair_is_some = {
231             let nominated_pair = self.nominated_pair.lock().await;
232             nominated_pair.is_some()
233         };
234 
235         if self.agent_conn.get_selected_pair().is_some() {
236             if self.validate_selected_pair().await {
237                 log::trace!("[{}]: checking keepalive", self.get_name());
238                 self.check_keepalive().await;
239             }
240         } else if nominated_pair_is_some {
241             self.nominate_pair().await;
242         } else {
243             let has_nominated_pair =
244                 if let Some(p) = self.agent_conn.get_best_valid_candidate_pair().await {
245                     self.is_nominatable(&p.local) && self.is_nominatable(&p.remote)
246                 } else {
247                     false
248                 };
249 
250             if has_nominated_pair {
251                 if let Some(p) = self.agent_conn.get_best_valid_candidate_pair().await {
252                     log::trace!(
253                         "Nominatable pair found, nominating ({}, {})",
254                         p.local.to_string(),
255                         p.remote.to_string()
256                     );
257                     p.nominated.store(true, Ordering::SeqCst);
258                     {
259                         let mut nominated_pair = self.nominated_pair.lock().await;
260                         *nominated_pair = Some(p);
261                     }
262                 }
263 
264                 self.nominate_pair().await;
265             } else {
266                 self.ping_all_candidates().await;
267             }
268         }
269     }
270 
ping_candidate( &self, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )271     async fn ping_candidate(
272         &self,
273         local: &Arc<dyn Candidate + Send + Sync>,
274         remote: &Arc<dyn Candidate + Send + Sync>,
275     ) {
276         let (msg, result) = {
277             let ufrag_pwd = self.ufrag_pwd.lock().await;
278             let username = ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str();
279             let mut msg = Message::new();
280             let result = msg.build(&[
281                 Box::new(BINDING_REQUEST),
282                 Box::new(TransactionId::new()),
283                 Box::new(Username::new(ATTR_USERNAME, username)),
284                 Box::new(AttrControlling(self.tie_breaker.load(Ordering::SeqCst))),
285                 Box::new(PriorityAttr(local.priority())),
286                 Box::new(MessageIntegrity::new_short_term_integrity(
287                     ufrag_pwd.remote_pwd.clone(),
288                 )),
289                 Box::new(FINGERPRINT),
290             ]);
291             (msg, result)
292         };
293 
294         if let Err(err) = result {
295             log::error!("{}", err);
296         } else {
297             self.send_binding_request(&msg, local, remote).await;
298         }
299     }
300 
handle_success_response( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, remote_addr: SocketAddr, )301     async fn handle_success_response(
302         &self,
303         m: &Message,
304         local: &Arc<dyn Candidate + Send + Sync>,
305         remote: &Arc<dyn Candidate + Send + Sync>,
306         remote_addr: SocketAddr,
307     ) {
308         if let Some(pending_request) = self.handle_inbound_binding_success(m.transaction_id).await {
309             let transaction_addr = pending_request.destination;
310 
311             // Assert that NAT is not symmetric
312             // https://tools.ietf.org/html/rfc8445#section-7.2.5.2.1
313             if transaction_addr != remote_addr {
314                 log::debug!("discard message: transaction source and destination does not match expected({}), actual({})", transaction_addr, remote);
315                 return;
316             }
317 
318             log::trace!(
319                 "inbound STUN (SuccessResponse) from {} to {}",
320                 remote,
321                 local
322             );
323             let selected_pair_is_none = self.agent_conn.get_selected_pair().is_none();
324 
325             if let Some(p) = self.find_pair(local, remote).await {
326                 p.state
327                     .store(CandidatePairState::Succeeded as u8, Ordering::SeqCst);
328                 log::trace!(
329                     "Found valid candidate pair: {}, p.state: {}, isUseCandidate: {}, {}",
330                     p,
331                     p.state.load(Ordering::SeqCst),
332                     pending_request.is_use_candidate,
333                     selected_pair_is_none
334                 );
335                 if pending_request.is_use_candidate && selected_pair_is_none {
336                     self.set_selected_pair(Some(Arc::clone(&p))).await;
337                 }
338             } else {
339                 // This shouldn't happen
340                 log::error!("Success response from invalid candidate pair");
341             }
342         } else {
343             log::warn!(
344                 "discard message from ({}), unknown TransactionID 0x{:?}",
345                 remote,
346                 m.transaction_id
347             );
348         }
349     }
350 
handle_binding_request( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )351     async fn handle_binding_request(
352         &self,
353         m: &Message,
354         local: &Arc<dyn Candidate + Send + Sync>,
355         remote: &Arc<dyn Candidate + Send + Sync>,
356     ) {
357         self.send_binding_success(m, local, remote).await;
358         log::trace!("controllingSelector: sendBindingSuccess");
359 
360         if let Some(p) = self.find_pair(local, remote).await {
361             let nominated_pair_is_none = {
362                 let nominated_pair = self.nominated_pair.lock().await;
363                 nominated_pair.is_none()
364             };
365 
366             log::trace!(
367                 "controllingSelector: after findPair {}, p.state: {}, {}",
368                 p,
369                 p.state.load(Ordering::SeqCst),
370                 nominated_pair_is_none,
371                 //self.agent_conn.get_selected_pair().await.is_none() //, {}
372             );
373             if p.state.load(Ordering::SeqCst) == CandidatePairState::Succeeded as u8
374                 && nominated_pair_is_none
375                 && self.agent_conn.get_selected_pair().is_none()
376             {
377                 if let Some(best_pair) = self.agent_conn.get_best_available_candidate_pair().await {
378                     log::trace!(
379                         "controllingSelector: getBestAvailableCandidatePair {}",
380                         best_pair
381                     );
382                     if best_pair == p
383                         && self.is_nominatable(&p.local)
384                         && self.is_nominatable(&p.remote)
385                     {
386                         log::trace!("The candidate ({}, {}) is the best candidate available, marking it as nominated",
387                             p.local, p.remote);
388                         {
389                             let mut nominated_pair = self.nominated_pair.lock().await;
390                             *nominated_pair = Some(p);
391                         }
392                         self.nominate_pair().await;
393                     }
394                 } else {
395                     log::trace!("No best pair available");
396                 }
397             }
398         } else {
399             log::trace!("controllingSelector: addPair");
400             self.add_pair(local.clone(), remote.clone()).await;
401         }
402     }
403 }
404 
405 #[async_trait]
406 impl ControlledSelector for AgentInternal {
    /// Intentionally a no-op: this implementation has nothing to initialise
    /// for the controlled role.
    async fn start(&self) {}
408 
contact_candidates(&self)409     async fn contact_candidates(&self) {
410         // A lite selector should not contact candidates
411         if self.lite.load(Ordering::SeqCst) {
412             self.validate_selected_pair().await;
413         } else if self.agent_conn.get_selected_pair().is_some() {
414             if self.validate_selected_pair().await {
415                 log::trace!("[{}]: checking keepalive", self.get_name());
416                 self.check_keepalive().await;
417             }
418         } else {
419             self.ping_all_candidates().await;
420         }
421     }
422 
ping_candidate( &self, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )423     async fn ping_candidate(
424         &self,
425         local: &Arc<dyn Candidate + Send + Sync>,
426         remote: &Arc<dyn Candidate + Send + Sync>,
427     ) {
428         let (msg, result) = {
429             let ufrag_pwd = self.ufrag_pwd.lock().await;
430             let username = ufrag_pwd.remote_ufrag.clone() + ":" + ufrag_pwd.local_ufrag.as_str();
431             let mut msg = Message::new();
432             let result = msg.build(&[
433                 Box::new(BINDING_REQUEST),
434                 Box::new(TransactionId::new()),
435                 Box::new(Username::new(ATTR_USERNAME, username)),
436                 Box::new(AttrControlled(self.tie_breaker.load(Ordering::SeqCst))),
437                 Box::new(PriorityAttr(local.priority())),
438                 Box::new(MessageIntegrity::new_short_term_integrity(
439                     ufrag_pwd.remote_pwd.clone(),
440                 )),
441                 Box::new(FINGERPRINT),
442             ]);
443             (msg, result)
444         };
445 
446         if let Err(err) = result {
447             log::error!("{}", err);
448         } else {
449             self.send_binding_request(&msg, local, remote).await;
450         }
451     }
452 
handle_success_response( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, remote_addr: SocketAddr, )453     async fn handle_success_response(
454         &self,
455         m: &Message,
456         local: &Arc<dyn Candidate + Send + Sync>,
457         remote: &Arc<dyn Candidate + Send + Sync>,
458         remote_addr: SocketAddr,
459     ) {
460         // https://tools.ietf.org/html/rfc8445#section-7.3.1.5
461         // If the controlled agent does not accept the request from the
462         // controlling agent, the controlled agent MUST reject the nomination
463         // request with an appropriate error code response (e.g., 400)
464         // [RFC5389].
465 
466         if let Some(pending_request) = self.handle_inbound_binding_success(m.transaction_id).await {
467             let transaction_addr = pending_request.destination;
468 
469             // Assert that NAT is not symmetric
470             // https://tools.ietf.org/html/rfc8445#section-7.2.5.2.1
471             if transaction_addr != remote_addr {
472                 log::debug!("discard message: transaction source and destination does not match expected({}), actual({})", transaction_addr, remote);
473                 return;
474             }
475 
476             log::trace!(
477                 "inbound STUN (SuccessResponse) from {} to {}",
478                 remote,
479                 local
480             );
481 
482             if let Some(p) = self.find_pair(local, remote).await {
483                 p.state
484                     .store(CandidatePairState::Succeeded as u8, Ordering::SeqCst);
485                 log::trace!("Found valid candidate pair: {}", p);
486             } else {
487                 // This shouldn't happen
488                 log::error!("Success response from invalid candidate pair");
489             }
490         } else {
491             log::warn!(
492                 "discard message from ({}), unknown TransactionID 0x{:?}",
493                 remote,
494                 m.transaction_id
495             );
496         }
497     }
498 
handle_binding_request( &self, m: &Message, local: &Arc<dyn Candidate + Send + Sync>, remote: &Arc<dyn Candidate + Send + Sync>, )499     async fn handle_binding_request(
500         &self,
501         m: &Message,
502         local: &Arc<dyn Candidate + Send + Sync>,
503         remote: &Arc<dyn Candidate + Send + Sync>,
504     ) {
505         if self.find_pair(local, remote).await.is_none() {
506             self.add_pair(local.clone(), remote.clone()).await;
507         }
508 
509         if let Some(p) = self.find_pair(local, remote).await {
510             let use_candidate = m.contains(ATTR_USE_CANDIDATE);
511             if use_candidate {
512                 // https://tools.ietf.org/html/rfc8445#section-7.3.1.5
513 
514                 if p.state.load(Ordering::SeqCst) == CandidatePairState::Succeeded as u8 {
515                     // If the state of this pair is Succeeded, it means that the check
516                     // previously sent by this pair produced a successful response and
517                     // generated a valid pair (Section 7.2.5.3.2).  The agent sets the
518                     // nominated flag value of the valid pair to true.
519                     if self.agent_conn.get_selected_pair().is_none() {
520                         self.set_selected_pair(Some(Arc::clone(&p))).await;
521                     }
522                     self.send_binding_success(m, local, remote).await;
523                 } else {
524                     // If the received Binding request triggered a new check to be
525                     // enqueued in the triggered-check queue (Section 7.3.1.4), once the
526                     // check is sent and if it generates a successful response, and
527                     // generates a valid pair, the agent sets the nominated flag of the
528                     // pair to true.  If the request fails (Section 7.2.5.2), the agent
529                     // MUST remove the candidate pair from the valid list, set the
530                     // candidate pair state to Failed, and set the checklist state to
531                     // Failed.
532                     self.ping_candidate(local, remote).await;
533                 }
534             } else {
535                 self.send_binding_success(m, local, remote).await;
536                 self.ping_candidate(local, remote).await;
537             }
538         }
539     }
540 }
541