1 use super::*;
2 use crate::candidate::candidate_host::CandidateHostConfig;
3 use crate::candidate::candidate_peer_reflexive::CandidatePeerReflexiveConfig;
4 use crate::candidate::candidate_relay::CandidateRelayConfig;
5 use crate::candidate::candidate_server_reflexive::CandidateServerReflexiveConfig;
6 use crate::error::*;
7 use crate::util::*;
8
9 use async_trait::async_trait;
10 use crc::{Crc, CRC_32_ISCSI};
11 use std::{
12 fmt,
13 ops::Add,
14 sync::{
15 atomic::{AtomicU16, AtomicU64, AtomicU8, Ordering},
16 Arc,
17 },
18 time::{Duration, SystemTime, UNIX_EPOCH},
19 };
20 use tokio::sync::{broadcast, Mutex};
21 use util::sync::Mutex as SyncMutex;
22
23 #[derive(Default)]
24 pub struct CandidateBaseConfig {
25 pub candidate_id: String,
26 pub network: String,
27 pub address: String,
28 pub port: u16,
29 pub component: u16,
30 pub priority: u32,
31 pub foundation: String,
32 pub conn: Option<Arc<dyn util::Conn + Send + Sync>>,
33 pub initialized_ch: Option<broadcast::Receiver<()>>,
34 }
35
36 pub struct CandidateBase {
37 pub(crate) id: String,
38 pub(crate) network_type: AtomicU8,
39 pub(crate) candidate_type: CandidateType,
40
41 pub(crate) component: AtomicU16,
42 pub(crate) address: String,
43 pub(crate) port: u16,
44 pub(crate) related_address: Option<CandidateRelatedAddress>,
45 pub(crate) tcp_type: TcpType,
46
47 pub(crate) resolved_addr: SyncMutex<SocketAddr>,
48
49 pub(crate) last_sent: AtomicU64,
50 pub(crate) last_received: AtomicU64,
51
52 pub(crate) conn: Option<Arc<dyn util::Conn + Send + Sync>>,
53 pub(crate) closed_ch: Arc<Mutex<Option<broadcast::Sender<()>>>>,
54
55 pub(crate) foundation_override: String,
56 pub(crate) priority_override: u32,
57
58 //CandidateHost
59 pub(crate) network: String,
60 //CandidateRelay
61 pub(crate) relay_client: Option<Arc<turn::client::Client>>,
62 }
63
64 impl Default for CandidateBase {
default() -> Self65 fn default() -> Self {
66 Self {
67 id: String::new(),
68 network_type: AtomicU8::new(0),
69 candidate_type: CandidateType::default(),
70
71 component: AtomicU16::new(0),
72 address: String::new(),
73 port: 0,
74 related_address: None,
75 tcp_type: TcpType::default(),
76
77 resolved_addr: SyncMutex::new(SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0)),
78
79 last_sent: AtomicU64::new(0),
80 last_received: AtomicU64::new(0),
81
82 conn: None,
83 closed_ch: Arc::new(Mutex::new(None)),
84
85 foundation_override: String::new(),
86 priority_override: 0,
87 network: String::new(),
88 relay_client: None,
89 }
90 }
91 }
92
93 // String makes the candidateBase printable
94 impl fmt::Display for CandidateBase {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 if let Some(related_address) = self.related_address() {
97 write!(
98 f,
99 "{} {} {}:{}{}",
100 self.network_type(),
101 self.candidate_type(),
102 self.address(),
103 self.port(),
104 related_address,
105 )
106 } else {
107 write!(
108 f,
109 "{} {} {}:{}",
110 self.network_type(),
111 self.candidate_type(),
112 self.address(),
113 self.port(),
114 )
115 }
116 }
117 }
118
119 #[async_trait]
120 impl Candidate for CandidateBase {
foundation(&self) -> String121 fn foundation(&self) -> String {
122 if !self.foundation_override.is_empty() {
123 return self.foundation_override.clone();
124 }
125
126 let mut buf = vec![];
127 buf.extend_from_slice(self.candidate_type().to_string().as_bytes());
128 buf.extend_from_slice(self.address.as_bytes());
129 buf.extend_from_slice(self.network_type().to_string().as_bytes());
130
131 let checksum = Crc::<u32>::new(&CRC_32_ISCSI).checksum(&buf);
132
133 format!("{checksum}")
134 }
135
136 /// Returns Candidate ID.
id(&self) -> String137 fn id(&self) -> String {
138 self.id.clone()
139 }
140
141 /// Returns candidate component.
component(&self) -> u16142 fn component(&self) -> u16 {
143 self.component.load(Ordering::SeqCst)
144 }
145
set_component(&self, component: u16)146 fn set_component(&self, component: u16) {
147 self.component.store(component, Ordering::SeqCst);
148 }
149
150 /// Returns a time indicating the last time this candidate was received.
last_received(&self) -> SystemTime151 fn last_received(&self) -> SystemTime {
152 UNIX_EPOCH.add(Duration::from_nanos(
153 self.last_received.load(Ordering::SeqCst),
154 ))
155 }
156
157 /// Returns a time indicating the last time this candidate was sent.
last_sent(&self) -> SystemTime158 fn last_sent(&self) -> SystemTime {
159 UNIX_EPOCH.add(Duration::from_nanos(self.last_sent.load(Ordering::SeqCst)))
160 }
161
162 /// Returns candidate NetworkType.
network_type(&self) -> NetworkType163 fn network_type(&self) -> NetworkType {
164 NetworkType::from(self.network_type.load(Ordering::SeqCst))
165 }
166
167 /// Returns Candidate Address.
address(&self) -> String168 fn address(&self) -> String {
169 self.address.clone()
170 }
171
172 /// Returns Candidate Port.
port(&self) -> u16173 fn port(&self) -> u16 {
174 self.port
175 }
176
177 /// Computes the priority for this ICE Candidate.
priority(&self) -> u32178 fn priority(&self) -> u32 {
179 if self.priority_override != 0 {
180 return self.priority_override;
181 }
182
183 // The local preference MUST be an integer from 0 (lowest preference) to
184 // 65535 (highest preference) inclusive. When there is only a single IP
185 // address, this value SHOULD be set to 65535. If there are multiple
186 // candidates for a particular component for a particular data stream
187 // that have the same type, the local preference MUST be unique for each
188 // one.
189 (1 << 24) * u32::from(self.candidate_type().preference())
190 + (1 << 8) * u32::from(self.local_preference())
191 + (256 - u32::from(self.component()))
192 }
193
194 /// Returns `Option<CandidateRelatedAddress>`.
related_address(&self) -> Option<CandidateRelatedAddress>195 fn related_address(&self) -> Option<CandidateRelatedAddress> {
196 self.related_address.as_ref().cloned()
197 }
198
199 /// Returns candidate type.
candidate_type(&self) -> CandidateType200 fn candidate_type(&self) -> CandidateType {
201 self.candidate_type
202 }
203
tcp_type(&self) -> TcpType204 fn tcp_type(&self) -> TcpType {
205 self.tcp_type
206 }
207
208 /// Returns the string representation of the ICECandidate.
marshal(&self) -> String209 fn marshal(&self) -> String {
210 let mut val = format!(
211 "{} {} {} {} {} {} typ {}",
212 self.foundation(),
213 self.component(),
214 self.network_type().network_short(),
215 self.priority(),
216 self.address(),
217 self.port(),
218 self.candidate_type()
219 );
220
221 if self.tcp_type != TcpType::Unspecified {
222 val += format!(" tcptype {}", self.tcp_type()).as_str();
223 }
224
225 if let Some(related_address) = self.related_address() {
226 val += format!(
227 " raddr {} rport {}",
228 related_address.address, related_address.port,
229 )
230 .as_str();
231 }
232
233 val
234 }
235
addr(&self) -> SocketAddr236 fn addr(&self) -> SocketAddr {
237 *self.resolved_addr.lock()
238 }
239
240 /// Stops the recvLoop.
close(&self) -> Result<()>241 async fn close(&self) -> Result<()> {
242 {
243 let mut closed_ch = self.closed_ch.lock().await;
244 if closed_ch.is_none() {
245 return Err(Error::ErrClosed);
246 }
247 closed_ch.take();
248 }
249
250 if let Some(relay_client) = &self.relay_client {
251 let _ = relay_client.close().await;
252 }
253
254 if let Some(conn) = &self.conn {
255 let _ = conn.close().await;
256 }
257
258 Ok(())
259 }
260
seen(&self, outbound: bool)261 fn seen(&self, outbound: bool) {
262 let d = SystemTime::now()
263 .duration_since(UNIX_EPOCH)
264 .unwrap_or_else(|_| Duration::from_secs(0));
265
266 if outbound {
267 self.set_last_sent(d);
268 } else {
269 self.set_last_received(d);
270 }
271 }
272
write_to(&self, raw: &[u8], dst: &(dyn Candidate + Send + Sync)) -> Result<usize>273 async fn write_to(&self, raw: &[u8], dst: &(dyn Candidate + Send + Sync)) -> Result<usize> {
274 let n = if let Some(conn) = &self.conn {
275 let addr = dst.addr();
276 conn.send_to(raw, addr).await?
277 } else {
278 0
279 };
280 self.seen(true);
281 Ok(n)
282 }
283
284 /// Used to compare two candidateBases.
equal(&self, other: &dyn Candidate) -> bool285 fn equal(&self, other: &dyn Candidate) -> bool {
286 self.network_type() == other.network_type()
287 && self.candidate_type() == other.candidate_type()
288 && self.address() == other.address()
289 && self.port() == other.port()
290 && self.tcp_type() == other.tcp_type()
291 && self.related_address() == other.related_address()
292 }
293
set_ip(&self, ip: &IpAddr) -> Result<()>294 fn set_ip(&self, ip: &IpAddr) -> Result<()> {
295 let network_type = determine_network_type(&self.network, ip)?;
296
297 self.network_type
298 .store(network_type as u8, Ordering::SeqCst);
299
300 let addr = create_addr(network_type, *ip, self.port);
301 *self.resolved_addr.lock() = addr;
302
303 Ok(())
304 }
305
get_conn(&self) -> Option<&Arc<dyn util::Conn + Send + Sync>>306 fn get_conn(&self) -> Option<&Arc<dyn util::Conn + Send + Sync>> {
307 self.conn.as_ref()
308 }
309
get_closed_ch(&self) -> Arc<Mutex<Option<broadcast::Sender<()>>>>310 fn get_closed_ch(&self) -> Arc<Mutex<Option<broadcast::Sender<()>>>> {
311 self.closed_ch.clone()
312 }
313 }
314
315 impl CandidateBase {
set_last_received(&self, d: Duration)316 pub fn set_last_received(&self, d: Duration) {
317 #[allow(clippy::cast_possible_truncation)]
318 self.last_received
319 .store(d.as_nanos() as u64, Ordering::SeqCst);
320 }
321
set_last_sent(&self, d: Duration)322 pub fn set_last_sent(&self, d: Duration) {
323 #[allow(clippy::cast_possible_truncation)]
324 self.last_sent.store(d.as_nanos() as u64, Ordering::SeqCst);
325 }
326
327 /// Returns the local preference for this candidate.
local_preference(&self) -> u16328 pub fn local_preference(&self) -> u16 {
329 if self.network_type().is_tcp() {
330 // RFC 6544, section 4.2
331 //
332 // In Section 4.1.2.1 of [RFC5245], a recommended formula for UDP ICE
333 // candidate prioritization is defined. For TCP candidates, the same
334 // formula and candidate type preferences SHOULD be used, and the
335 // RECOMMENDED type preferences for the new candidate types defined in
336 // this document (see Section 5) are 105 for NAT-assisted candidates and
337 // 75 for UDP-tunneled candidates.
338 //
339 // (...)
340 //
341 // With TCP candidates, the local preference part of the recommended
342 // priority formula is updated to also include the directionality
343 // (active, passive, or simultaneous-open) of the TCP connection. The
344 // RECOMMENDED local preference is then defined as:
345 //
346 // local preference = (2^13) * direction-pref + other-pref
347 //
348 // The direction-pref MUST be between 0 and 7 (both inclusive), with 7
349 // being the most preferred. The other-pref MUST be between 0 and 8191
350 // (both inclusive), with 8191 being the most preferred. It is
351 // RECOMMENDED that the host, UDP-tunneled, and relayed TCP candidates
352 // have the direction-pref assigned as follows: 6 for active, 4 for
353 // passive, and 2 for S-O. For the NAT-assisted and server reflexive
354 // candidates, the RECOMMENDED values are: 6 for S-O, 4 for active, and
355 // 2 for passive.
356 //
357 // (...)
358 //
359 // If any two candidates have the same type-preference and direction-
360 // pref, they MUST have a unique other-pref. With this specification,
361 // this usually only happens with multi-homed hosts, in which case
362 // other-pref is the preference for the particular IP address from which
363 // the candidate was obtained. When there is only a single IP address,
364 // this value SHOULD be set to the maximum allowed value (8191).
365 let other_pref: u16 = 8191;
366
367 let direction_pref: u16 = match self.candidate_type() {
368 CandidateType::Host | CandidateType::Relay => match self.tcp_type() {
369 TcpType::Active => 6,
370 TcpType::Passive => 4,
371 TcpType::SimultaneousOpen => 2,
372 TcpType::Unspecified => 0,
373 },
374 CandidateType::PeerReflexive | CandidateType::ServerReflexive => {
375 match self.tcp_type() {
376 TcpType::SimultaneousOpen => 6,
377 TcpType::Active => 4,
378 TcpType::Passive => 2,
379 TcpType::Unspecified => 0,
380 }
381 }
382 CandidateType::Unspecified => 0,
383 };
384
385 (1 << 13) * direction_pref + other_pref
386 } else {
387 DEFAULT_LOCAL_PREFERENCE
388 }
389 }
390 }
391
392 /// Creates a Candidate from its string representation.
unmarshal_candidate(raw: &str) -> Result<impl Candidate>393 pub fn unmarshal_candidate(raw: &str) -> Result<impl Candidate> {
394 let split: Vec<&str> = raw.split_whitespace().collect();
395 if split.len() < 8 {
396 return Err(Error::Other(format!(
397 "{:?} ({})",
398 Error::ErrAttributeTooShortIceCandidate,
399 split.len()
400 )));
401 }
402
403 // Foundation
404 let foundation = split[0].to_owned();
405
406 // Component
407 let component: u16 = split[1].parse()?;
408
409 // Network
410 let network = split[2].to_owned();
411
412 // Priority
413 let priority: u32 = split[3].parse()?;
414
415 // Address
416 let address = split[4].to_owned();
417
418 // Port
419 let port: u16 = split[5].parse()?;
420
421 let typ = split[7];
422
423 let mut rel_addr = String::new();
424 let mut rel_port = 0;
425 let mut tcp_type = TcpType::Unspecified;
426
427 if split.len() > 8 {
428 let split2 = &split[8..];
429
430 if split2[0] == "raddr" {
431 if split2.len() < 4 {
432 return Err(Error::Other(format!(
433 "{:?}: incorrect length",
434 Error::ErrParseRelatedAddr
435 )));
436 }
437
438 // RelatedAddress
439 rel_addr = split2[1].to_owned();
440
441 // RelatedPort
442 rel_port = split2[3].parse()?;
443 } else if split2[0] == "tcptype" {
444 if split2.len() < 2 {
445 return Err(Error::Other(format!(
446 "{:?}: incorrect length",
447 Error::ErrParseType
448 )));
449 }
450
451 tcp_type = TcpType::from(split2[1]);
452 }
453 }
454
455 match typ {
456 "host" => {
457 let config = CandidateHostConfig {
458 base_config: CandidateBaseConfig {
459 network,
460 address,
461 port,
462 component,
463 priority,
464 foundation,
465 ..CandidateBaseConfig::default()
466 },
467 tcp_type,
468 };
469 config.new_candidate_host()
470 }
471 "srflx" => {
472 let config = CandidateServerReflexiveConfig {
473 base_config: CandidateBaseConfig {
474 network,
475 address,
476 port,
477 component,
478 priority,
479 foundation,
480 ..CandidateBaseConfig::default()
481 },
482 rel_addr,
483 rel_port,
484 };
485 config.new_candidate_server_reflexive()
486 }
487 "prflx" => {
488 let config = CandidatePeerReflexiveConfig {
489 base_config: CandidateBaseConfig {
490 network,
491 address,
492 port,
493 component,
494 priority,
495 foundation,
496 ..CandidateBaseConfig::default()
497 },
498 rel_addr,
499 rel_port,
500 };
501
502 config.new_candidate_peer_reflexive()
503 }
504 "relay" => {
505 let config = CandidateRelayConfig {
506 base_config: CandidateBaseConfig {
507 network,
508 address,
509 port,
510 component,
511 priority,
512 foundation,
513 ..CandidateBaseConfig::default()
514 },
515 rel_addr,
516 rel_port,
517 ..CandidateRelayConfig::default()
518 };
519 config.new_candidate_relay()
520 }
521 _ => Err(Error::Other(format!(
522 "{:?} ({})",
523 Error::ErrUnknownCandidateType,
524 typ
525 ))),
526 }
527 }
528