1 use super::agent_vnet_test::*;
2 use super::*;
3 use crate::candidate::candidate_base::*;
4 use crate::candidate::candidate_host::*;
5 use crate::candidate::candidate_peer_reflexive::*;
6 use crate::candidate::candidate_relay::*;
7 use crate::candidate::candidate_server_reflexive::*;
8 use crate::control::AttrControlling;
9 use crate::priority::PriorityAttr;
10 use crate::use_candidate::UseCandidateAttr;
11
12 use crate::agent::agent_transport_test::pipe;
13 use async_trait::async_trait;
14 use std::net::Ipv4Addr;
15 use std::ops::Sub;
16 use std::str::FromStr;
17 use stun::message::*;
18 use stun::textattrs::Username;
19 use util::{vnet::*, Conn};
20 use waitgroup::{WaitGroup, Worker};
21
22 #[tokio::test]
test_pair_search() -> Result<()>23 async fn test_pair_search() -> Result<()> {
24 let config = AgentConfig::default();
25 let a = Agent::new(config).await?;
26
27 {
28 {
29 let checklist = a.internal.agent_conn.checklist.lock().await;
30 assert!(
31 checklist.is_empty(),
32 "TestPairSearch is only a valid test if a.validPairs is empty on construction"
33 );
34 }
35
36 let cp = a
37 .internal
38 .agent_conn
39 .get_best_available_candidate_pair()
40 .await;
41 assert!(cp.is_none(), "No Candidate pairs should exist");
42 }
43
44 a.close().await?;
45
46 Ok(())
47 }
48
49 #[tokio::test]
test_pair_priority() -> Result<()>50 async fn test_pair_priority() -> Result<()> {
51 let a = Agent::new(AgentConfig::default()).await?;
52
53 let host_config = CandidateHostConfig {
54 base_config: CandidateBaseConfig {
55 network: "udp".to_owned(),
56 address: "192.168.1.1".to_owned(),
57 port: 19216,
58 component: 1,
59 ..Default::default()
60 },
61 ..Default::default()
62 };
63 let host_local: Arc<dyn Candidate + Send + Sync> = Arc::new(host_config.new_candidate_host()?);
64
65 let relay_config = CandidateRelayConfig {
66 base_config: CandidateBaseConfig {
67 network: "udp".to_owned(),
68 address: "1.2.3.4".to_owned(),
69 port: 12340,
70 component: 1,
71 ..Default::default()
72 },
73 rel_addr: "4.3.2.1".to_owned(),
74 rel_port: 43210,
75 ..Default::default()
76 };
77
78 let relay_remote = relay_config.new_candidate_relay()?;
79
80 let srflx_config = CandidateServerReflexiveConfig {
81 base_config: CandidateBaseConfig {
82 network: "udp".to_owned(),
83 address: "10.10.10.2".to_owned(),
84 port: 19218,
85 component: 1,
86 ..Default::default()
87 },
88 rel_addr: "4.3.2.1".to_owned(),
89 rel_port: 43212,
90 };
91
92 let srflx_remote = srflx_config.new_candidate_server_reflexive()?;
93
94 let prflx_config = CandidatePeerReflexiveConfig {
95 base_config: CandidateBaseConfig {
96 network: "udp".to_owned(),
97 address: "10.10.10.2".to_owned(),
98 port: 19217,
99 component: 1,
100 ..Default::default()
101 },
102 rel_addr: "4.3.2.1".to_owned(),
103 rel_port: 43211,
104 };
105
106 let prflx_remote = prflx_config.new_candidate_peer_reflexive()?;
107
108 let host_config = CandidateHostConfig {
109 base_config: CandidateBaseConfig {
110 network: "udp".to_owned(),
111 address: "1.2.3.5".to_owned(),
112 port: 12350,
113 component: 1,
114 ..Default::default()
115 },
116 ..Default::default()
117 };
118 let host_remote = host_config.new_candidate_host()?;
119
120 let remotes: Vec<Arc<dyn Candidate + Send + Sync>> = vec![
121 Arc::new(relay_remote),
122 Arc::new(srflx_remote),
123 Arc::new(prflx_remote),
124 Arc::new(host_remote),
125 ];
126
127 {
128 for remote in remotes {
129 if a.internal.find_pair(&host_local, &remote).await.is_none() {
130 a.internal
131 .add_pair(host_local.clone(), remote.clone())
132 .await;
133 }
134
135 if let Some(p) = a.internal.find_pair(&host_local, &remote).await {
136 p.state
137 .store(CandidatePairState::Succeeded as u8, Ordering::SeqCst);
138 }
139
140 if let Some(best_pair) = a
141 .internal
142 .agent_conn
143 .get_best_available_candidate_pair()
144 .await
145 {
146 assert_eq!(
147 best_pair.to_string(),
148 CandidatePair {
149 remote: remote.clone(),
150 local: host_local.clone(),
151 ..Default::default()
152 }
153 .to_string(),
154 "Unexpected bestPair {best_pair} (expected remote: {remote})",
155 );
156 } else {
157 panic!("expected Some, but got None");
158 }
159 }
160 }
161
162 a.close().await?;
163 Ok(())
164 }
165
166 #[tokio::test]
test_agent_get_stats() -> Result<()>167 async fn test_agent_get_stats() -> Result<()> {
168 let (conn_a, conn_b, agent_a, agent_b) = pipe(None, None).await?;
169 assert_eq!(agent_a.get_bytes_received(), 0);
170 assert_eq!(agent_a.get_bytes_sent(), 0);
171 assert_eq!(agent_b.get_bytes_received(), 0);
172 assert_eq!(agent_b.get_bytes_sent(), 0);
173
174 let _na = conn_a.send(&[0u8; 10]).await?;
175 let mut buf = vec![0u8; 10];
176 let _nb = conn_b.recv(&mut buf).await?;
177
178 assert_eq!(agent_a.get_bytes_received(), 0);
179 assert_eq!(agent_a.get_bytes_sent(), 10);
180
181 assert_eq!(agent_b.get_bytes_received(), 10);
182 assert_eq!(agent_b.get_bytes_sent(), 0);
183
184 Ok(())
185 }
186
187 #[tokio::test]
test_on_selected_candidate_pair_change() -> Result<()>188 async fn test_on_selected_candidate_pair_change() -> Result<()> {
189 let a = Agent::new(AgentConfig::default()).await?;
190 let (callback_called_tx, mut callback_called_rx) = mpsc::channel::<()>(1);
191 let callback_called_tx = Arc::new(Mutex::new(Some(callback_called_tx)));
192 let cb: OnSelectedCandidatePairChangeHdlrFn = Box::new(move |_, _| {
193 let callback_called_tx_clone = Arc::clone(&callback_called_tx);
194 Box::pin(async move {
195 let mut tx = callback_called_tx_clone.lock().await;
196 tx.take();
197 })
198 });
199 a.on_selected_candidate_pair_change(cb);
200
201 let host_config = CandidateHostConfig {
202 base_config: CandidateBaseConfig {
203 network: "udp".to_owned(),
204 address: "192.168.1.1".to_owned(),
205 port: 19216,
206 component: 1,
207 ..Default::default()
208 },
209 ..Default::default()
210 };
211 let host_local = host_config.new_candidate_host()?;
212
213 let relay_config = CandidateRelayConfig {
214 base_config: CandidateBaseConfig {
215 network: "udp".to_owned(),
216 address: "1.2.3.4".to_owned(),
217 port: 12340,
218 component: 1,
219 ..Default::default()
220 },
221 rel_addr: "4.3.2.1".to_owned(),
222 rel_port: 43210,
223 ..Default::default()
224 };
225 let relay_remote = relay_config.new_candidate_relay()?;
226
227 // select the pair
228 let p = Arc::new(CandidatePair::new(
229 Arc::new(host_local),
230 Arc::new(relay_remote),
231 false,
232 ));
233 a.internal.set_selected_pair(Some(p)).await;
234
235 // ensure that the callback fired on setting the pair
236 let _ = callback_called_rx.recv().await;
237
238 a.close().await?;
239 Ok(())
240 }
241
242 #[tokio::test]
test_handle_peer_reflexive_udp_pflx_candidate() -> Result<()>243 async fn test_handle_peer_reflexive_udp_pflx_candidate() -> Result<()> {
244 let a = Agent::new(AgentConfig::default()).await?;
245
246 let host_config = CandidateHostConfig {
247 base_config: CandidateBaseConfig {
248 network: "udp".to_owned(),
249 address: "192.168.0.2".to_owned(),
250 port: 777,
251 component: 1,
252 conn: Some(Arc::new(MockConn {})),
253 ..Default::default()
254 },
255 ..Default::default()
256 };
257
258 let local: Arc<dyn Candidate + Send + Sync> = Arc::new(host_config.new_candidate_host()?);
259 let remote = SocketAddr::from_str("172.17.0.3:999")?;
260
261 let (username, local_pwd, tie_breaker) = {
262 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
263 (
264 ufrag_pwd.local_ufrag.to_owned() + ":" + ufrag_pwd.remote_ufrag.as_str(),
265 ufrag_pwd.local_pwd.clone(),
266 a.internal.tie_breaker.load(Ordering::SeqCst),
267 )
268 };
269
270 let mut msg = Message::new();
271 msg.build(&[
272 Box::new(BINDING_REQUEST),
273 Box::new(TransactionId::new()),
274 Box::new(Username::new(ATTR_USERNAME, username)),
275 Box::new(UseCandidateAttr::new()),
276 Box::new(AttrControlling(tie_breaker)),
277 Box::new(PriorityAttr(local.priority())),
278 Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
279 Box::new(FINGERPRINT),
280 ])?;
281
282 {
283 a.internal.handle_inbound(&mut msg, &local, remote).await;
284
285 let remote_candidates = a.internal.remote_candidates.lock().await;
286 // length of remote candidate list must be one now
287 assert_eq!(
288 remote_candidates.len(),
289 1,
290 "failed to add a network type to the remote candidate list"
291 );
292
293 // length of remote candidate list for a network type must be 1
294 if let Some(cands) = remote_candidates.get(&local.network_type()) {
295 assert_eq!(
296 cands.len(),
297 1,
298 "failed to add prflx candidate to remote candidate list"
299 );
300
301 let c = &cands[0];
302
303 assert_eq!(
304 c.candidate_type(),
305 CandidateType::PeerReflexive,
306 "candidate type must be prflx"
307 );
308
309 assert_eq!(c.address(), "172.17.0.3", "IP address mismatch");
310
311 assert_eq!(c.port(), 999, "Port number mismatch");
312 } else {
313 panic!(
314 "expected non-empty remote candidate for network type {}",
315 local.network_type()
316 );
317 }
318 }
319
320 a.close().await?;
321 Ok(())
322 }
323
324 #[tokio::test]
test_handle_peer_reflexive_unknown_remote() -> Result<()>325 async fn test_handle_peer_reflexive_unknown_remote() -> Result<()> {
326 let a = Agent::new(AgentConfig::default()).await?;
327
328 let mut tid = TransactionId::default();
329 tid.0[..3].copy_from_slice("ABC".as_bytes());
330
331 let remote_pwd = {
332 {
333 let mut pending_binding_requests = a.internal.pending_binding_requests.lock().await;
334 *pending_binding_requests = vec![BindingRequest {
335 timestamp: Instant::now(),
336 transaction_id: tid,
337 destination: SocketAddr::from_str("0.0.0.0:0")?,
338 is_use_candidate: false,
339 }];
340 }
341 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
342 ufrag_pwd.remote_pwd.clone()
343 };
344
345 let host_config = CandidateHostConfig {
346 base_config: CandidateBaseConfig {
347 network: "udp".to_owned(),
348 address: "192.168.0.2".to_owned(),
349 port: 777,
350 component: 1,
351 conn: Some(Arc::new(MockConn {})),
352 ..Default::default()
353 },
354 ..Default::default()
355 };
356
357 let local: Arc<dyn Candidate + Send + Sync> = Arc::new(host_config.new_candidate_host()?);
358 let remote = SocketAddr::from_str("172.17.0.3:999")?;
359
360 let mut msg = Message::new();
361 msg.build(&[
362 Box::new(BINDING_SUCCESS),
363 Box::new(tid),
364 Box::new(MessageIntegrity::new_short_term_integrity(remote_pwd)),
365 Box::new(FINGERPRINT),
366 ])?;
367
368 {
369 a.internal.handle_inbound(&mut msg, &local, remote).await;
370
371 let remote_candidates = a.internal.remote_candidates.lock().await;
372 assert_eq!(
373 remote_candidates.len(),
374 0,
375 "unknown remote was able to create a candidate"
376 );
377 }
378
379 a.close().await?;
380 Ok(())
381 }
382
383 //use std::io::Write;
384
385 // Assert that Agent on startup sends message, and doesn't wait for connectivityTicker to fire
386 #[tokio::test]
test_connectivity_on_startup() -> Result<()>387 async fn test_connectivity_on_startup() -> Result<()> {
388 /*env_logger::Builder::new()
389 .format(|buf, record| {
390 writeln!(
391 buf,
392 "{}:{} [{}] {} - {}",
393 record.file().unwrap_or("unknown"),
394 record.line().unwrap_or(0),
395 record.level(),
396 chrono::Local::now().format("%H:%M:%S.%6f"),
397 record.args()
398 )
399 })
400 .filter(None, log::LevelFilter::Trace)
401 .init();*/
402
403 // Create a network with two interfaces
404 let wan = Arc::new(Mutex::new(router::Router::new(router::RouterConfig {
405 cidr: "0.0.0.0/0".to_owned(),
406 ..Default::default()
407 })?));
408
409 let net0 = Arc::new(net::Net::new(Some(net::NetConfig {
410 static_ips: vec!["192.168.0.1".to_owned()],
411 ..Default::default()
412 })));
413 let net1 = Arc::new(net::Net::new(Some(net::NetConfig {
414 static_ips: vec!["192.168.0.2".to_owned()],
415 ..Default::default()
416 })));
417
418 connect_net2router(&net0, &wan).await?;
419 connect_net2router(&net1, &wan).await?;
420 start_router(&wan).await?;
421
422 let (a_notifier, mut a_connected) = on_connected();
423 let (b_notifier, mut b_connected) = on_connected();
424
425 let keepalive_interval = Some(Duration::from_secs(3600)); //time.Hour
426 let check_interval = Duration::from_secs(3600); //time.Hour
427 let cfg0 = AgentConfig {
428 network_types: supported_network_types(),
429 multicast_dns_mode: MulticastDnsMode::Disabled,
430 net: Some(net0),
431
432 keepalive_interval,
433 check_interval,
434 ..Default::default()
435 };
436
437 let a_agent = Arc::new(Agent::new(cfg0).await?);
438 a_agent.on_connection_state_change(a_notifier);
439
440 let cfg1 = AgentConfig {
441 network_types: supported_network_types(),
442 multicast_dns_mode: MulticastDnsMode::Disabled,
443 net: Some(net1),
444
445 keepalive_interval,
446 check_interval,
447 ..Default::default()
448 };
449
450 let b_agent = Arc::new(Agent::new(cfg1).await?);
451 b_agent.on_connection_state_change(b_notifier);
452
453 // Manual signaling
454 let (a_ufrag, a_pwd) = a_agent.get_local_user_credentials().await;
455 let (b_ufrag, b_pwd) = b_agent.get_local_user_credentials().await;
456
457 gather_and_exchange_candidates(&a_agent, &b_agent).await?;
458
459 let (accepted_tx, mut accepted_rx) = mpsc::channel::<()>(1);
460 let (accepting_tx, mut accepting_rx) = mpsc::channel::<()>(1);
461 let (_a_cancel_tx, a_cancel_rx) = mpsc::channel(1);
462 let (_b_cancel_tx, b_cancel_rx) = mpsc::channel(1);
463
464 let accepting_tx = Arc::new(Mutex::new(Some(accepting_tx)));
465 a_agent.on_connection_state_change(Box::new(move |s: ConnectionState| {
466 let accepted_tx_clone = Arc::clone(&accepting_tx);
467 Box::pin(async move {
468 if s == ConnectionState::Checking {
469 let mut tx = accepted_tx_clone.lock().await;
470 tx.take();
471 }
472 })
473 }));
474
475 tokio::spawn(async move {
476 let result = a_agent.accept(a_cancel_rx, b_ufrag, b_pwd).await;
477 assert!(result.is_ok(), "agent accept expected OK");
478 drop(accepted_tx);
479 });
480
481 let _ = accepting_rx.recv().await;
482
483 let _ = b_agent.dial(b_cancel_rx, a_ufrag, a_pwd).await?;
484
485 // Ensure accepted
486 let _ = accepted_rx.recv().await;
487
488 // Ensure pair selected
489 // Note: this assumes ConnectionStateConnected is thrown after selecting the final pair
490 let _ = a_connected.recv().await;
491 let _ = b_connected.recv().await;
492
493 {
494 let mut w = wan.lock().await;
495 w.stop().await?;
496 }
497
498 Ok(())
499 }
500
501 #[tokio::test]
test_connectivity_lite() -> Result<()>502 async fn test_connectivity_lite() -> Result<()> {
503 /*env_logger::Builder::new()
504 .format(|buf, record| {
505 writeln!(
506 buf,
507 "{}:{} [{}] {} - {}",
508 record.file().unwrap_or("unknown"),
509 record.line().unwrap_or(0),
510 record.level(),
511 chrono::Local::now().format("%H:%M:%S.%6f"),
512 record.args()
513 )
514 })
515 .filter(None, log::LevelFilter::Trace)
516 .init();*/
517
518 let stun_server_url = Url {
519 scheme: SchemeType::Stun,
520 host: "1.2.3.4".to_owned(),
521 port: 3478,
522 proto: ProtoType::Udp,
523 ..Default::default()
524 };
525
526 let nat_type = nat::NatType {
527 mapping_behavior: nat::EndpointDependencyType::EndpointIndependent,
528 filtering_behavior: nat::EndpointDependencyType::EndpointIndependent,
529 ..Default::default()
530 };
531
532 let v = build_vnet(nat_type, nat_type).await?;
533
534 let (a_notifier, mut a_connected) = on_connected();
535 let (b_notifier, mut b_connected) = on_connected();
536
537 let cfg0 = AgentConfig {
538 urls: vec![stun_server_url],
539 network_types: supported_network_types(),
540 multicast_dns_mode: MulticastDnsMode::Disabled,
541 net: Some(Arc::clone(&v.net0)),
542 ..Default::default()
543 };
544
545 let a_agent = Arc::new(Agent::new(cfg0).await?);
546 a_agent.on_connection_state_change(a_notifier);
547
548 let cfg1 = AgentConfig {
549 urls: vec![],
550 lite: true,
551 candidate_types: vec![CandidateType::Host],
552 network_types: supported_network_types(),
553 multicast_dns_mode: MulticastDnsMode::Disabled,
554 net: Some(Arc::clone(&v.net1)),
555 ..Default::default()
556 };
557
558 let b_agent = Arc::new(Agent::new(cfg1).await?);
559 b_agent.on_connection_state_change(b_notifier);
560
561 let _ = connect_with_vnet(&a_agent, &b_agent).await?;
562
563 // Ensure pair selected
564 // Note: this assumes ConnectionStateConnected is thrown after selecting the final pair
565 let _ = a_connected.recv().await;
566 let _ = b_connected.recv().await;
567
568 v.close().await?;
569
570 Ok(())
571 }
572
573 struct MockPacketConn;
574
575 #[async_trait]
576 impl Conn for MockPacketConn {
connect(&self, _addr: SocketAddr) -> std::result::Result<(), util::Error>577 async fn connect(&self, _addr: SocketAddr) -> std::result::Result<(), util::Error> {
578 Ok(())
579 }
580
recv(&self, _buf: &mut [u8]) -> std::result::Result<usize, util::Error>581 async fn recv(&self, _buf: &mut [u8]) -> std::result::Result<usize, util::Error> {
582 Ok(0)
583 }
584
recv_from( &self, _buf: &mut [u8], ) -> std::result::Result<(usize, SocketAddr), util::Error>585 async fn recv_from(
586 &self,
587 _buf: &mut [u8],
588 ) -> std::result::Result<(usize, SocketAddr), util::Error> {
589 Ok((0, SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0)))
590 }
591
send(&self, _buf: &[u8]) -> std::result::Result<usize, util::Error>592 async fn send(&self, _buf: &[u8]) -> std::result::Result<usize, util::Error> {
593 Ok(0)
594 }
595
send_to( &self, _buf: &[u8], _target: SocketAddr, ) -> std::result::Result<usize, util::Error>596 async fn send_to(
597 &self,
598 _buf: &[u8],
599 _target: SocketAddr,
600 ) -> std::result::Result<usize, util::Error> {
601 Ok(0)
602 }
603
local_addr(&self) -> std::result::Result<SocketAddr, util::Error>604 fn local_addr(&self) -> std::result::Result<SocketAddr, util::Error> {
605 Ok(SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0))
606 }
607
remote_addr(&self) -> Option<SocketAddr>608 fn remote_addr(&self) -> Option<SocketAddr> {
609 None
610 }
611
close(&self) -> std::result::Result<(), util::Error>612 async fn close(&self) -> std::result::Result<(), util::Error> {
613 Ok(())
614 }
615 }
616
build_msg(c: MessageClass, username: String, key: String) -> Result<Message>617 fn build_msg(c: MessageClass, username: String, key: String) -> Result<Message> {
618 let mut msg = Message::new();
619 msg.build(&[
620 Box::new(MessageType::new(METHOD_BINDING, c)),
621 Box::new(TransactionId::new()),
622 Box::new(Username::new(ATTR_USERNAME, username)),
623 Box::new(MessageIntegrity::new_short_term_integrity(key)),
624 Box::new(FINGERPRINT),
625 ])?;
626 Ok(msg)
627 }
628
629 #[tokio::test]
test_inbound_validity() -> Result<()>630 async fn test_inbound_validity() -> Result<()> {
631 /*env_logger::Builder::new()
632 .format(|buf, record| {
633 writeln!(
634 buf,
635 "{}:{} [{}] {} - {}",
636 record.file().unwrap_or("unknown"),
637 record.line().unwrap_or(0),
638 record.level(),
639 chrono::Local::now().format("%H:%M:%S.%6f"),
640 record.args()
641 )
642 })
643 .filter(None, log::LevelFilter::Trace)
644 .init();*/
645
646 let remote = SocketAddr::from_str("172.17.0.3:999")?;
647 let local: Arc<dyn Candidate + Send + Sync> = Arc::new(
648 CandidateHostConfig {
649 base_config: CandidateBaseConfig {
650 network: "udp".to_owned(),
651 address: "192.168.0.2".to_owned(),
652 port: 777,
653 component: 1,
654 conn: Some(Arc::new(MockPacketConn {})),
655 ..Default::default()
656 },
657 ..Default::default()
658 }
659 .new_candidate_host()?,
660 );
661
662 //"Invalid Binding requests should be discarded"
663 {
664 let a = Agent::new(AgentConfig::default()).await?;
665
666 {
667 let local_pwd = {
668 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
669 ufrag_pwd.local_pwd.clone()
670 };
671 a.internal
672 .handle_inbound(
673 &mut build_msg(CLASS_REQUEST, "invalid".to_owned(), local_pwd)?,
674 &local,
675 remote,
676 )
677 .await;
678 {
679 let remote_candidates = a.internal.remote_candidates.lock().await;
680 assert_ne!(
681 remote_candidates.len(),
682 1,
683 "Binding with invalid Username was able to create prflx candidate"
684 );
685 }
686
687 let username = {
688 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
689 format!("{}:{}", ufrag_pwd.local_ufrag, ufrag_pwd.remote_ufrag)
690 };
691 a.internal
692 .handle_inbound(
693 &mut build_msg(CLASS_REQUEST, username, "Invalid".to_owned())?,
694 &local,
695 remote,
696 )
697 .await;
698 {
699 let remote_candidates = a.internal.remote_candidates.lock().await;
700 assert_ne!(
701 remote_candidates.len(),
702 1,
703 "Binding with invalid MessageIntegrity was able to create prflx candidate"
704 );
705 }
706 }
707
708 a.close().await?;
709 }
710
711 //"Invalid Binding success responses should be discarded"
712 {
713 let a = Agent::new(AgentConfig::default()).await?;
714
715 {
716 let username = {
717 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
718 format!("{}:{}", ufrag_pwd.local_ufrag, ufrag_pwd.remote_ufrag)
719 };
720 a.internal
721 .handle_inbound(
722 &mut build_msg(CLASS_SUCCESS_RESPONSE, username, "Invalid".to_owned())?,
723 &local,
724 remote,
725 )
726 .await;
727 {
728 let remote_candidates = a.internal.remote_candidates.lock().await;
729 assert_ne!(
730 remote_candidates.len(),
731 1,
732 "Binding with invalid Username was able to create prflx candidate"
733 );
734 }
735 }
736
737 a.close().await?;
738 }
739
740 //"Discard non-binding messages"
741 {
742 let a = Agent::new(AgentConfig::default()).await?;
743
744 {
745 let username = {
746 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
747 format!("{}:{}", ufrag_pwd.local_ufrag, ufrag_pwd.remote_ufrag)
748 };
749 a.internal
750 .handle_inbound(
751 &mut build_msg(CLASS_ERROR_RESPONSE, username, "Invalid".to_owned())?,
752 &local,
753 remote,
754 )
755 .await;
756 let remote_candidates = a.internal.remote_candidates.lock().await;
757 assert_ne!(
758 remote_candidates.len(),
759 1,
760 "non-binding message was able to create prflxRemote"
761 );
762 }
763
764 a.close().await?;
765 }
766
767 //"Valid bind request"
768 {
769 let a = Agent::new(AgentConfig::default()).await?;
770
771 {
772 let (username, local_pwd) = {
773 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
774 (
775 format!("{}:{}", ufrag_pwd.local_ufrag, ufrag_pwd.remote_ufrag),
776 ufrag_pwd.local_pwd.clone(),
777 )
778 };
779 a.internal
780 .handle_inbound(
781 &mut build_msg(CLASS_REQUEST, username, local_pwd)?,
782 &local,
783 remote,
784 )
785 .await;
786 let remote_candidates = a.internal.remote_candidates.lock().await;
787 assert_eq!(
788 remote_candidates.len(),
789 1,
790 "Binding with valid values was unable to create prflx candidate"
791 );
792 }
793
794 a.close().await?;
795 }
796
797 //"Valid bind without fingerprint"
798 {
799 let a = Agent::new(AgentConfig::default()).await?;
800
801 {
802 let (username, local_pwd) = {
803 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
804 (
805 format!("{}:{}", ufrag_pwd.local_ufrag, ufrag_pwd.remote_ufrag),
806 ufrag_pwd.local_pwd.clone(),
807 )
808 };
809
810 let mut msg = Message::new();
811 msg.build(&[
812 Box::new(BINDING_REQUEST),
813 Box::new(TransactionId::new()),
814 Box::new(Username::new(ATTR_USERNAME, username)),
815 Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
816 ])?;
817
818 a.internal.handle_inbound(&mut msg, &local, remote).await;
819 let remote_candidates = a.internal.remote_candidates.lock().await;
820 assert_eq!(
821 remote_candidates.len(),
822 1,
823 "Binding with valid values (but no fingerprint) was unable to create prflx candidate"
824 );
825 }
826
827 a.close().await?;
828 }
829
830 //"Success with invalid TransactionID"
831 {
832 let a = Agent::new(AgentConfig::default()).await?;
833
834 {
835 let remote = SocketAddr::from_str("172.17.0.3:999")?;
836
837 let mut t_id = TransactionId::default();
838 t_id.0[..3].copy_from_slice(b"ABC");
839
840 let remote_pwd = {
841 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
842 ufrag_pwd.remote_pwd.clone()
843 };
844
845 let mut msg = Message::new();
846 msg.build(&[
847 Box::new(BINDING_SUCCESS),
848 Box::new(t_id),
849 Box::new(MessageIntegrity::new_short_term_integrity(remote_pwd)),
850 Box::new(FINGERPRINT),
851 ])?;
852
853 a.internal.handle_inbound(&mut msg, &local, remote).await;
854
855 {
856 let remote_candidates = a.internal.remote_candidates.lock().await;
857 assert_eq!(
858 remote_candidates.len(),
859 0,
860 "unknown remote was able to create a candidate"
861 );
862 }
863 }
864
865 a.close().await?;
866 }
867
868 Ok(())
869 }
870
871 #[tokio::test]
test_invalid_agent_starts() -> Result<()>872 async fn test_invalid_agent_starts() -> Result<()> {
873 let a = Agent::new(AgentConfig::default()).await?;
874
875 let (_cancel_tx1, cancel_rx1) = mpsc::channel(1);
876 let result = a.dial(cancel_rx1, "".to_owned(), "bar".to_owned()).await;
877 assert!(result.is_err());
878 if let Err(err) = result {
879 assert_eq!(Error::ErrRemoteUfragEmpty, err);
880 }
881
882 let (_cancel_tx2, cancel_rx2) = mpsc::channel(1);
883 let result = a.dial(cancel_rx2, "foo".to_owned(), "".to_owned()).await;
884 assert!(result.is_err());
885 if let Err(err) = result {
886 assert_eq!(Error::ErrRemotePwdEmpty, err);
887 }
888
889 let (cancel_tx3, cancel_rx3) = mpsc::channel(1);
890 tokio::spawn(async move {
891 tokio::time::sleep(Duration::from_millis(100)).await;
892 drop(cancel_tx3);
893 });
894
895 let result = a.dial(cancel_rx3, "foo".to_owned(), "bar".to_owned()).await;
896 assert!(result.is_err());
897 if let Err(err) = result {
898 assert_eq!(Error::ErrCanceledByCaller, err);
899 }
900
901 let (_cancel_tx4, cancel_rx4) = mpsc::channel(1);
902 let result = a.dial(cancel_rx4, "foo".to_owned(), "bar".to_owned()).await;
903 assert!(result.is_err());
904 if let Err(err) = result {
905 assert_eq!(Error::ErrMultipleStart, err);
906 }
907
908 a.close().await?;
909
910 Ok(())
911 }
912
913 //use std::io::Write;
914
915 // Assert that Agent emits Connecting/Connected/Disconnected/Failed/Closed messages
916 #[tokio::test]
test_connection_state_callback() -> Result<()>917 async fn test_connection_state_callback() -> Result<()> {
918 /*env_logger::Builder::new()
919 .format(|buf, record| {
920 writeln!(
921 buf,
922 "{}:{} [{}] {} - {}",
923 record.file().unwrap_or("unknown"),
924 record.line().unwrap_or(0),
925 record.level(),
926 chrono::Local::now().format("%H:%M:%S.%6f"),
927 record.args()
928 )
929 })
930 .filter(None, log::LevelFilter::Trace)
931 .init();*/
932
933 let disconnected_duration = Duration::from_secs(1);
934 let failed_duration = Duration::from_secs(1);
935 let keepalive_interval = Duration::from_secs(0);
936
937 let cfg0 = AgentConfig {
938 urls: vec![],
939 network_types: supported_network_types(),
940 disconnected_timeout: Some(disconnected_duration),
941 failed_timeout: Some(failed_duration),
942 keepalive_interval: Some(keepalive_interval),
943 ..Default::default()
944 };
945
946 let cfg1 = AgentConfig {
947 urls: vec![],
948 network_types: supported_network_types(),
949 disconnected_timeout: Some(disconnected_duration),
950 failed_timeout: Some(failed_duration),
951 keepalive_interval: Some(keepalive_interval),
952 ..Default::default()
953 };
954
955 let a_agent = Arc::new(Agent::new(cfg0).await?);
956 let b_agent = Arc::new(Agent::new(cfg1).await?);
957
958 let (is_checking_tx, mut is_checking_rx) = mpsc::channel::<()>(1);
959 let (is_connected_tx, mut is_connected_rx) = mpsc::channel::<()>(1);
960 let (is_disconnected_tx, mut is_disconnected_rx) = mpsc::channel::<()>(1);
961 let (is_failed_tx, mut is_failed_rx) = mpsc::channel::<()>(1);
962 let (is_closed_tx, mut is_closed_rx) = mpsc::channel::<()>(1);
963
964 let is_checking_tx = Arc::new(Mutex::new(Some(is_checking_tx)));
965 let is_connected_tx = Arc::new(Mutex::new(Some(is_connected_tx)));
966 let is_disconnected_tx = Arc::new(Mutex::new(Some(is_disconnected_tx)));
967 let is_failed_tx = Arc::new(Mutex::new(Some(is_failed_tx)));
968 let is_closed_tx = Arc::new(Mutex::new(Some(is_closed_tx)));
969
970 a_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
971 let is_checking_tx_clone = Arc::clone(&is_checking_tx);
972 let is_connected_tx_clone = Arc::clone(&is_connected_tx);
973 let is_disconnected_tx_clone = Arc::clone(&is_disconnected_tx);
974 let is_failed_tx_clone = Arc::clone(&is_failed_tx);
975 let is_closed_tx_clone = Arc::clone(&is_closed_tx);
976 Box::pin(async move {
977 match c {
978 ConnectionState::Checking => {
979 log::debug!("drop is_checking_tx");
980 let mut tx = is_checking_tx_clone.lock().await;
981 tx.take();
982 }
983 ConnectionState::Connected => {
984 log::debug!("drop is_connected_tx");
985 let mut tx = is_connected_tx_clone.lock().await;
986 tx.take();
987 }
988 ConnectionState::Disconnected => {
989 log::debug!("drop is_disconnected_tx");
990 let mut tx = is_disconnected_tx_clone.lock().await;
991 tx.take();
992 }
993 ConnectionState::Failed => {
994 log::debug!("drop is_failed_tx");
995 let mut tx = is_failed_tx_clone.lock().await;
996 tx.take();
997 }
998 ConnectionState::Closed => {
999 log::debug!("drop is_closed_tx");
1000 let mut tx = is_closed_tx_clone.lock().await;
1001 tx.take();
1002 }
1003 _ => {}
1004 };
1005 })
1006 }));
1007
1008 connect_with_vnet(&a_agent, &b_agent).await?;
1009
1010 log::debug!("wait is_checking_tx");
1011 let _ = is_checking_rx.recv().await;
1012 log::debug!("wait is_connected_rx");
1013 let _ = is_connected_rx.recv().await;
1014 log::debug!("wait is_disconnected_rx");
1015 let _ = is_disconnected_rx.recv().await;
1016 log::debug!("wait is_failed_rx");
1017 let _ = is_failed_rx.recv().await;
1018
1019 a_agent.close().await?;
1020 b_agent.close().await?;
1021
1022 log::debug!("wait is_closed_rx");
1023 let _ = is_closed_rx.recv().await;
1024
1025 Ok(())
1026 }
1027
1028 #[tokio::test]
test_invalid_gather() -> Result<()>1029 async fn test_invalid_gather() -> Result<()> {
1030 //"Gather with no OnCandidate should error"
1031 let a = Agent::new(AgentConfig::default()).await?;
1032
1033 if let Err(err) = a.gather_candidates() {
1034 assert_eq!(
1035 Error::ErrNoOnCandidateHandler,
1036 err,
1037 "trickle GatherCandidates succeeded without OnCandidate"
1038 );
1039 }
1040
1041 a.close().await?;
1042
1043 Ok(())
1044 }
1045
1046 #[tokio::test]
test_candidate_pair_stats() -> Result<()>1047 async fn test_candidate_pair_stats() -> Result<()> {
1048 let a = Agent::new(AgentConfig::default()).await?;
1049
1050 let host_local: Arc<dyn Candidate + Send + Sync> = Arc::new(
1051 CandidateHostConfig {
1052 base_config: CandidateBaseConfig {
1053 network: "udp".to_owned(),
1054 address: "192.168.1.1".to_owned(),
1055 port: 19216,
1056 component: 1,
1057 ..Default::default()
1058 },
1059 ..Default::default()
1060 }
1061 .new_candidate_host()?,
1062 );
1063
1064 let relay_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1065 CandidateRelayConfig {
1066 base_config: CandidateBaseConfig {
1067 network: "udp".to_owned(),
1068 address: "1.2.3.4".to_owned(),
1069 port: 2340,
1070 component: 1,
1071 ..Default::default()
1072 },
1073 rel_addr: "4.3.2.1".to_owned(),
1074 rel_port: 43210,
1075 ..Default::default()
1076 }
1077 .new_candidate_relay()?,
1078 );
1079
1080 let srflx_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1081 CandidateServerReflexiveConfig {
1082 base_config: CandidateBaseConfig {
1083 network: "udp".to_owned(),
1084 address: "10.10.10.2".to_owned(),
1085 port: 19218,
1086 component: 1,
1087 ..Default::default()
1088 },
1089 rel_addr: "4.3.2.1".to_owned(),
1090 rel_port: 43212,
1091 }
1092 .new_candidate_server_reflexive()?,
1093 );
1094
1095 let prflx_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1096 CandidatePeerReflexiveConfig {
1097 base_config: CandidateBaseConfig {
1098 network: "udp".to_owned(),
1099 address: "10.10.10.2".to_owned(),
1100 port: 19217,
1101 component: 1,
1102 ..Default::default()
1103 },
1104 rel_addr: "4.3.2.1".to_owned(),
1105 rel_port: 43211,
1106 }
1107 .new_candidate_peer_reflexive()?,
1108 );
1109
1110 let host_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1111 CandidateHostConfig {
1112 base_config: CandidateBaseConfig {
1113 network: "udp".to_owned(),
1114 address: "1.2.3.5".to_owned(),
1115 port: 12350,
1116 component: 1,
1117 ..Default::default()
1118 },
1119 ..Default::default()
1120 }
1121 .new_candidate_host()?,
1122 );
1123
1124 for remote in &[
1125 Arc::clone(&relay_remote),
1126 Arc::clone(&srflx_remote),
1127 Arc::clone(&prflx_remote),
1128 Arc::clone(&host_remote),
1129 ] {
1130 let p = a.internal.find_pair(&host_local, remote).await;
1131
1132 if p.is_none() {
1133 a.internal
1134 .add_pair(Arc::clone(&host_local), Arc::clone(remote))
1135 .await;
1136 }
1137 }
1138
1139 {
1140 if let Some(p) = a.internal.find_pair(&host_local, &prflx_remote).await {
1141 p.state
1142 .store(CandidatePairState::Failed as u8, Ordering::SeqCst);
1143 }
1144 }
1145
1146 let stats = a.get_candidate_pairs_stats().await;
1147 assert_eq!(stats.len(), 4, "expected 4 candidate pairs stats");
1148
1149 let (mut relay_pair_stat, mut srflx_pair_stat, mut prflx_pair_stat, mut host_pair_stat) = (
1150 CandidatePairStats::default(),
1151 CandidatePairStats::default(),
1152 CandidatePairStats::default(),
1153 CandidatePairStats::default(),
1154 );
1155
1156 for cps in stats {
1157 assert_eq!(
1158 cps.local_candidate_id,
1159 host_local.id(),
1160 "invalid local candidate id"
1161 );
1162
1163 if cps.remote_candidate_id == relay_remote.id() {
1164 relay_pair_stat = cps;
1165 } else if cps.remote_candidate_id == srflx_remote.id() {
1166 srflx_pair_stat = cps;
1167 } else if cps.remote_candidate_id == prflx_remote.id() {
1168 prflx_pair_stat = cps;
1169 } else if cps.remote_candidate_id == host_remote.id() {
1170 host_pair_stat = cps;
1171 } else {
1172 panic!("invalid remote candidate ID");
1173 }
1174 }
1175
1176 assert_eq!(
1177 relay_pair_stat.remote_candidate_id,
1178 relay_remote.id(),
1179 "missing host-relay pair stat"
1180 );
1181 assert_eq!(
1182 srflx_pair_stat.remote_candidate_id,
1183 srflx_remote.id(),
1184 "missing host-srflx pair stat"
1185 );
1186 assert_eq!(
1187 prflx_pair_stat.remote_candidate_id,
1188 prflx_remote.id(),
1189 "missing host-prflx pair stat"
1190 );
1191 assert_eq!(
1192 host_pair_stat.remote_candidate_id,
1193 host_remote.id(),
1194 "missing host-host pair stat"
1195 );
1196 assert_eq!(
1197 prflx_pair_stat.state,
1198 CandidatePairState::Failed,
1199 "expected host-prfflx pair to have state failed, it has state {} instead",
1200 prflx_pair_stat.state
1201 );
1202
1203 a.close().await?;
1204
1205 Ok(())
1206 }
1207
1208 #[tokio::test]
test_local_candidate_stats() -> Result<()>1209 async fn test_local_candidate_stats() -> Result<()> {
1210 let a = Agent::new(AgentConfig::default()).await?;
1211
1212 let host_local: Arc<dyn Candidate + Send + Sync> = Arc::new(
1213 CandidateHostConfig {
1214 base_config: CandidateBaseConfig {
1215 network: "udp".to_owned(),
1216 address: "192.168.1.1".to_owned(),
1217 port: 19216,
1218 component: 1,
1219 ..Default::default()
1220 },
1221 ..Default::default()
1222 }
1223 .new_candidate_host()?,
1224 );
1225
1226 let srflx_local: Arc<dyn Candidate + Send + Sync> = Arc::new(
1227 CandidateServerReflexiveConfig {
1228 base_config: CandidateBaseConfig {
1229 network: "udp".to_owned(),
1230 address: "192.168.1.1".to_owned(),
1231 port: 19217,
1232 component: 1,
1233 ..Default::default()
1234 },
1235 rel_addr: "4.3.2.1".to_owned(),
1236 rel_port: 43212,
1237 }
1238 .new_candidate_server_reflexive()?,
1239 );
1240
1241 {
1242 let mut local_candidates = a.internal.local_candidates.lock().await;
1243 local_candidates.insert(
1244 NetworkType::Udp4,
1245 vec![Arc::clone(&host_local), Arc::clone(&srflx_local)],
1246 );
1247 }
1248
1249 let local_stats = a.get_local_candidates_stats().await;
1250 assert_eq!(
1251 local_stats.len(),
1252 2,
1253 "expected 2 local candidates stats, got {} instead",
1254 local_stats.len()
1255 );
1256
1257 let (mut host_local_stat, mut srflx_local_stat) =
1258 (CandidateStats::default(), CandidateStats::default());
1259 for stats in local_stats {
1260 let candidate = if stats.id == host_local.id() {
1261 host_local_stat = stats.clone();
1262 Arc::clone(&host_local)
1263 } else if stats.id == srflx_local.id() {
1264 srflx_local_stat = stats.clone();
1265 Arc::clone(&srflx_local)
1266 } else {
1267 panic!("invalid local candidate ID");
1268 };
1269
1270 assert_eq!(
1271 stats.candidate_type,
1272 candidate.candidate_type(),
1273 "invalid stats CandidateType"
1274 );
1275 assert_eq!(
1276 stats.priority,
1277 candidate.priority(),
1278 "invalid stats CandidateType"
1279 );
1280 assert_eq!(stats.ip, candidate.address(), "invalid stats IP");
1281 }
1282
1283 assert_eq!(
1284 host_local_stat.id,
1285 host_local.id(),
1286 "missing host local stat"
1287 );
1288 assert_eq!(
1289 srflx_local_stat.id,
1290 srflx_local.id(),
1291 "missing srflx local stat"
1292 );
1293
1294 a.close().await?;
1295
1296 Ok(())
1297 }
1298
1299 #[tokio::test]
test_remote_candidate_stats() -> Result<()>1300 async fn test_remote_candidate_stats() -> Result<()> {
1301 let a = Agent::new(AgentConfig::default()).await?;
1302
1303 let relay_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1304 CandidateRelayConfig {
1305 base_config: CandidateBaseConfig {
1306 network: "udp".to_owned(),
1307 address: "1.2.3.4".to_owned(),
1308 port: 12340,
1309 component: 1,
1310 ..Default::default()
1311 },
1312 rel_addr: "4.3.2.1".to_owned(),
1313 rel_port: 43210,
1314 ..Default::default()
1315 }
1316 .new_candidate_relay()?,
1317 );
1318
1319 let srflx_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1320 CandidateServerReflexiveConfig {
1321 base_config: CandidateBaseConfig {
1322 network: "udp".to_owned(),
1323 address: "10.10.10.2".to_owned(),
1324 port: 19218,
1325 component: 1,
1326 ..Default::default()
1327 },
1328 rel_addr: "4.3.2.1".to_owned(),
1329 rel_port: 43212,
1330 }
1331 .new_candidate_server_reflexive()?,
1332 );
1333
1334 let prflx_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1335 CandidatePeerReflexiveConfig {
1336 base_config: CandidateBaseConfig {
1337 network: "udp".to_owned(),
1338 address: "10.10.10.2".to_owned(),
1339 port: 19217,
1340 component: 1,
1341 ..Default::default()
1342 },
1343 rel_addr: "4.3.2.1".to_owned(),
1344 rel_port: 43211,
1345 }
1346 .new_candidate_peer_reflexive()?,
1347 );
1348
1349 let host_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1350 CandidateHostConfig {
1351 base_config: CandidateBaseConfig {
1352 network: "udp".to_owned(),
1353 address: "1.2.3.5".to_owned(),
1354 port: 12350,
1355 component: 1,
1356 ..Default::default()
1357 },
1358 ..Default::default()
1359 }
1360 .new_candidate_host()?,
1361 );
1362
1363 {
1364 let mut remote_candidates = a.internal.remote_candidates.lock().await;
1365 remote_candidates.insert(
1366 NetworkType::Udp4,
1367 vec![
1368 Arc::clone(&relay_remote),
1369 Arc::clone(&srflx_remote),
1370 Arc::clone(&prflx_remote),
1371 Arc::clone(&host_remote),
1372 ],
1373 );
1374 }
1375
1376 let remote_stats = a.get_remote_candidates_stats().await;
1377 assert_eq!(
1378 remote_stats.len(),
1379 4,
1380 "expected 4 remote candidates stats, got {} instead",
1381 remote_stats.len()
1382 );
1383
1384 let (mut relay_remote_stat, mut srflx_remote_stat, mut prflx_remote_stat, mut host_remote_stat) = (
1385 CandidateStats::default(),
1386 CandidateStats::default(),
1387 CandidateStats::default(),
1388 CandidateStats::default(),
1389 );
1390 for stats in remote_stats {
1391 let candidate = if stats.id == relay_remote.id() {
1392 relay_remote_stat = stats.clone();
1393 Arc::clone(&relay_remote)
1394 } else if stats.id == srflx_remote.id() {
1395 srflx_remote_stat = stats.clone();
1396 Arc::clone(&srflx_remote)
1397 } else if stats.id == prflx_remote.id() {
1398 prflx_remote_stat = stats.clone();
1399 Arc::clone(&prflx_remote)
1400 } else if stats.id == host_remote.id() {
1401 host_remote_stat = stats.clone();
1402 Arc::clone(&host_remote)
1403 } else {
1404 panic!("invalid remote candidate ID");
1405 };
1406
1407 assert_eq!(
1408 stats.candidate_type,
1409 candidate.candidate_type(),
1410 "invalid stats CandidateType"
1411 );
1412 assert_eq!(
1413 stats.priority,
1414 candidate.priority(),
1415 "invalid stats CandidateType"
1416 );
1417 assert_eq!(stats.ip, candidate.address(), "invalid stats IP");
1418 }
1419
1420 assert_eq!(
1421 relay_remote_stat.id,
1422 relay_remote.id(),
1423 "missing relay remote stat"
1424 );
1425 assert_eq!(
1426 srflx_remote_stat.id,
1427 srflx_remote.id(),
1428 "missing srflx remote stat"
1429 );
1430 assert_eq!(
1431 prflx_remote_stat.id,
1432 prflx_remote.id(),
1433 "missing prflx remote stat"
1434 );
1435 assert_eq!(
1436 host_remote_stat.id,
1437 host_remote.id(),
1438 "missing host remote stat"
1439 );
1440
1441 a.close().await?;
1442
1443 Ok(())
1444 }
1445
1446 #[tokio::test]
test_init_ext_ip_mapping() -> Result<()>1447 async fn test_init_ext_ip_mapping() -> Result<()> {
1448 // a.extIPMapper should be nil by default
1449 let a = Agent::new(AgentConfig::default()).await?;
1450 assert!(
1451 a.ext_ip_mapper.is_none(),
1452 "a.extIPMapper should be none by default"
1453 );
1454 a.close().await?;
1455
1456 // a.extIPMapper should be nil when NAT1To1IPs is a non-nil empty array
1457 let a = Agent::new(AgentConfig {
1458 nat_1to1_ips: vec![],
1459 nat_1to1_ip_candidate_type: CandidateType::Host,
1460 ..Default::default()
1461 })
1462 .await?;
1463 assert!(
1464 a.ext_ip_mapper.is_none(),
1465 "a.extIPMapper should be none by default"
1466 );
1467 a.close().await?;
1468
1469 // NewAgent should return an error when 1:1 NAT for host candidate is enabled
1470 // but the candidate type does not appear in the CandidateTypes.
1471 if let Err(err) = Agent::new(AgentConfig {
1472 nat_1to1_ips: vec!["1.2.3.4".to_owned()],
1473 nat_1to1_ip_candidate_type: CandidateType::Host,
1474 candidate_types: vec![CandidateType::Relay],
1475 ..Default::default()
1476 })
1477 .await
1478 {
1479 assert_eq!(
1480 Error::ErrIneffectiveNat1to1IpMappingHost,
1481 err,
1482 "Unexpected error: {err}"
1483 );
1484 } else {
1485 panic!("expected error, but got ok");
1486 }
1487
1488 // NewAgent should return an error when 1:1 NAT for srflx candidate is enabled
1489 // but the candidate type does not appear in the CandidateTypes.
1490 if let Err(err) = Agent::new(AgentConfig {
1491 nat_1to1_ips: vec!["1.2.3.4".to_owned()],
1492 nat_1to1_ip_candidate_type: CandidateType::ServerReflexive,
1493 candidate_types: vec![CandidateType::Relay],
1494 ..Default::default()
1495 })
1496 .await
1497 {
1498 assert_eq!(
1499 Error::ErrIneffectiveNat1to1IpMappingSrflx,
1500 err,
1501 "Unexpected error: {err}"
1502 );
1503 } else {
1504 panic!("expected error, but got ok");
1505 }
1506
1507 // NewAgent should return an error when 1:1 NAT for host candidate is enabled
1508 // along with mDNS with MulticastDNSModeQueryAndGather
1509 if let Err(err) = Agent::new(AgentConfig {
1510 nat_1to1_ips: vec!["1.2.3.4".to_owned()],
1511 nat_1to1_ip_candidate_type: CandidateType::Host,
1512 multicast_dns_mode: MulticastDnsMode::QueryAndGather,
1513 ..Default::default()
1514 })
1515 .await
1516 {
1517 assert_eq!(
1518 Error::ErrMulticastDnsWithNat1to1IpMapping,
1519 err,
1520 "Unexpected error: {err}"
1521 );
1522 } else {
1523 panic!("expected error, but got ok");
1524 }
1525
1526 // NewAgent should return if newExternalIPMapper() returns an error.
1527 if let Err(err) = Agent::new(AgentConfig {
1528 nat_1to1_ips: vec!["bad.2.3.4".to_owned()], // bad IP
1529 nat_1to1_ip_candidate_type: CandidateType::Host,
1530 ..Default::default()
1531 })
1532 .await
1533 {
1534 assert_eq!(
1535 Error::ErrInvalidNat1to1IpMapping,
1536 err,
1537 "Unexpected error: {err}"
1538 );
1539 } else {
1540 panic!("expected error, but got ok");
1541 }
1542
1543 Ok(())
1544 }
1545
1546 #[tokio::test]
test_binding_request_timeout() -> Result<()>1547 async fn test_binding_request_timeout() -> Result<()> {
1548 const EXPECTED_REMOVAL_COUNT: usize = 2;
1549
1550 let a = Agent::new(AgentConfig::default()).await?;
1551
1552 let now = Instant::now();
1553 {
1554 {
1555 let mut pending_binding_requests = a.internal.pending_binding_requests.lock().await;
1556 pending_binding_requests.push(BindingRequest {
1557 timestamp: now, // valid
1558 ..Default::default()
1559 });
1560 pending_binding_requests.push(BindingRequest {
1561 timestamp: now.sub(Duration::from_millis(3900)), // valid
1562 ..Default::default()
1563 });
1564 pending_binding_requests.push(BindingRequest {
1565 timestamp: now.sub(Duration::from_millis(4100)), // invalid
1566 ..Default::default()
1567 });
1568 pending_binding_requests.push(BindingRequest {
1569 timestamp: now.sub(Duration::from_secs(75)), // invalid
1570 ..Default::default()
1571 });
1572 }
1573
1574 a.internal.invalidate_pending_binding_requests(now).await;
1575 {
1576 let pending_binding_requests = a.internal.pending_binding_requests.lock().await;
1577 assert_eq!(pending_binding_requests.len(), EXPECTED_REMOVAL_COUNT, "Binding invalidation due to timeout did not remove the correct number of binding requests")
1578 }
1579 }
1580
1581 a.close().await?;
1582
1583 Ok(())
1584 }
1585
1586 // test_agent_credentials checks if local username fragments and passwords (if set) meet RFC standard
1587 // and ensure it's backwards compatible with previous versions of the pion/ice
1588 #[tokio::test]
test_agent_credentials() -> Result<()>1589 async fn test_agent_credentials() -> Result<()> {
1590 // Agent should not require any of the usernames and password to be set
1591 // If set, they should follow the default 16/128 bits random number generator strategy
1592
1593 let a = Agent::new(AgentConfig::default()).await?;
1594 {
1595 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
1596 assert!(ufrag_pwd.local_ufrag.as_bytes().len() * 8 >= 24);
1597 assert!(ufrag_pwd.local_pwd.as_bytes().len() * 8 >= 128);
1598 }
1599 a.close().await?;
1600
1601 // Should honor RFC standards
1602 // Local values MUST be unguessable, with at least 128 bits of
1603 // random number generator output used to generate the password, and
1604 // at least 24 bits of output to generate the username fragment.
1605
1606 if let Err(err) = Agent::new(AgentConfig {
1607 local_ufrag: "xx".to_owned(),
1608 ..Default::default()
1609 })
1610 .await
1611 {
1612 assert_eq!(Error::ErrLocalUfragInsufficientBits, err);
1613 } else {
1614 panic!("expected error, but got ok");
1615 }
1616
1617 if let Err(err) = Agent::new(AgentConfig {
1618 local_pwd: "xxxxxx".to_owned(),
1619 ..Default::default()
1620 })
1621 .await
1622 {
1623 assert_eq!(Error::ErrLocalPwdInsufficientBits, err);
1624 } else {
1625 panic!("expected error, but got ok");
1626 }
1627
1628 Ok(())
1629 }
1630
1631 // Assert that Agent on Failure deletes all existing candidates
1632 // User can then do an ICE Restart to bring agent back
1633 #[tokio::test]
test_connection_state_failed_delete_all_candidates() -> Result<()>1634 async fn test_connection_state_failed_delete_all_candidates() -> Result<()> {
1635 let one_second = Duration::from_secs(1);
1636 let keepalive_interval = Duration::from_secs(0);
1637
1638 let cfg0 = AgentConfig {
1639 network_types: supported_network_types(),
1640 disconnected_timeout: Some(one_second),
1641 failed_timeout: Some(one_second),
1642 keepalive_interval: Some(keepalive_interval),
1643 ..Default::default()
1644 };
1645 let cfg1 = AgentConfig {
1646 network_types: supported_network_types(),
1647 disconnected_timeout: Some(one_second),
1648 failed_timeout: Some(one_second),
1649 keepalive_interval: Some(keepalive_interval),
1650 ..Default::default()
1651 };
1652
1653 let a_agent = Arc::new(Agent::new(cfg0).await?);
1654 let b_agent = Arc::new(Agent::new(cfg1).await?);
1655
1656 let (is_failed_tx, mut is_failed_rx) = mpsc::channel::<()>(1);
1657 let is_failed_tx = Arc::new(Mutex::new(Some(is_failed_tx)));
1658 a_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
1659 let is_failed_tx_clone = Arc::clone(&is_failed_tx);
1660 Box::pin(async move {
1661 if c == ConnectionState::Failed {
1662 let mut tx = is_failed_tx_clone.lock().await;
1663 tx.take();
1664 }
1665 })
1666 }));
1667
1668 connect_with_vnet(&a_agent, &b_agent).await?;
1669 let _ = is_failed_rx.recv().await;
1670
1671 {
1672 {
1673 let remote_candidates = a_agent.internal.remote_candidates.lock().await;
1674 assert_eq!(remote_candidates.len(), 0);
1675 }
1676 {
1677 let local_candidates = a_agent.internal.local_candidates.lock().await;
1678 assert_eq!(local_candidates.len(), 0);
1679 }
1680 }
1681
1682 a_agent.close().await?;
1683 b_agent.close().await?;
1684
1685 Ok(())
1686 }
1687
1688 // Assert that the ICE Agent can go directly from Connecting -> Failed on both sides
1689 #[tokio::test]
test_connection_state_connecting_to_failed() -> Result<()>1690 async fn test_connection_state_connecting_to_failed() -> Result<()> {
1691 let one_second = Duration::from_secs(1);
1692 let keepalive_interval = Duration::from_secs(0);
1693
1694 let cfg0 = AgentConfig {
1695 disconnected_timeout: Some(one_second),
1696 failed_timeout: Some(one_second),
1697 keepalive_interval: Some(keepalive_interval),
1698 ..Default::default()
1699 };
1700 let cfg1 = AgentConfig {
1701 disconnected_timeout: Some(one_second),
1702 failed_timeout: Some(one_second),
1703 keepalive_interval: Some(keepalive_interval),
1704 ..Default::default()
1705 };
1706
1707 let a_agent = Arc::new(Agent::new(cfg0).await?);
1708 let b_agent = Arc::new(Agent::new(cfg1).await?);
1709
1710 let is_failed = WaitGroup::new();
1711 let is_checking = WaitGroup::new();
1712
1713 let connection_state_check = move |wf: Worker, wc: Worker| {
1714 let wf = Arc::new(Mutex::new(Some(wf)));
1715 let wc = Arc::new(Mutex::new(Some(wc)));
1716 let hdlr_fn: OnConnectionStateChangeHdlrFn = Box::new(move |c: ConnectionState| {
1717 let wf_clone = Arc::clone(&wf);
1718 let wc_clone = Arc::clone(&wc);
1719 Box::pin(async move {
1720 if c == ConnectionState::Failed {
1721 let mut f = wf_clone.lock().await;
1722 f.take();
1723 } else if c == ConnectionState::Checking {
1724 let mut c = wc_clone.lock().await;
1725 c.take();
1726 } else if c == ConnectionState::Connected || c == ConnectionState::Completed {
1727 panic!("Unexpected ConnectionState: {c}");
1728 }
1729 })
1730 });
1731 hdlr_fn
1732 };
1733
1734 let (wf1, wc1) = (is_failed.worker(), is_checking.worker());
1735 a_agent.on_connection_state_change(connection_state_check(wf1, wc1));
1736
1737 let (wf2, wc2) = (is_failed.worker(), is_checking.worker());
1738 b_agent.on_connection_state_change(connection_state_check(wf2, wc2));
1739
1740 let agent_a = Arc::clone(&a_agent);
1741 tokio::spawn(async move {
1742 let (_cancel_tx, cancel_rx) = mpsc::channel(1);
1743 let result = agent_a
1744 .accept(cancel_rx, "InvalidFrag".to_owned(), "InvalidPwd".to_owned())
1745 .await;
1746 assert!(result.is_err());
1747 });
1748
1749 let agent_b = Arc::clone(&b_agent);
1750 tokio::spawn(async move {
1751 let (_cancel_tx, cancel_rx) = mpsc::channel(1);
1752 let result = agent_b
1753 .dial(cancel_rx, "InvalidFrag".to_owned(), "InvalidPwd".to_owned())
1754 .await;
1755 assert!(result.is_err());
1756 });
1757
1758 is_checking.wait().await;
1759 is_failed.wait().await;
1760
1761 a_agent.close().await?;
1762 b_agent.close().await?;
1763
1764 Ok(())
1765 }
1766
1767 #[tokio::test]
test_agent_restart_during_gather() -> Result<()>1768 async fn test_agent_restart_during_gather() -> Result<()> {
1769 //"Restart During Gather"
1770
1771 let agent = Agent::new(AgentConfig::default()).await?;
1772
1773 agent
1774 .gathering_state
1775 .store(GatheringState::Gathering as u8, Ordering::SeqCst);
1776
1777 if let Err(err) = agent.restart("".to_owned(), "".to_owned()).await {
1778 assert_eq!(Error::ErrRestartWhenGathering, err);
1779 } else {
1780 panic!("expected error, but got ok");
1781 }
1782
1783 agent.close().await?;
1784
1785 Ok(())
1786 }
1787
1788 #[tokio::test]
test_agent_restart_when_closed() -> Result<()>1789 async fn test_agent_restart_when_closed() -> Result<()> {
1790 //"Restart When Closed"
1791
1792 let agent = Agent::new(AgentConfig::default()).await?;
1793 agent.close().await?;
1794
1795 if let Err(err) = agent.restart("".to_owned(), "".to_owned()).await {
1796 assert_eq!(Error::ErrClosed, err);
1797 } else {
1798 panic!("expected error, but got ok");
1799 }
1800
1801 Ok(())
1802 }
1803
1804 #[tokio::test]
test_agent_restart_one_side() -> Result<()>1805 async fn test_agent_restart_one_side() -> Result<()> {
1806 let one_second = Duration::from_secs(1);
1807
1808 //"Restart One Side"
1809 let (_, _, agent_a, agent_b) = pipe(
1810 Some(AgentConfig {
1811 disconnected_timeout: Some(one_second),
1812 failed_timeout: Some(one_second),
1813 ..Default::default()
1814 }),
1815 Some(AgentConfig {
1816 disconnected_timeout: Some(one_second),
1817 failed_timeout: Some(one_second),
1818 ..Default::default()
1819 }),
1820 )
1821 .await?;
1822
1823 let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(1);
1824 let cancel_tx = Arc::new(Mutex::new(Some(cancel_tx)));
1825 agent_b.on_connection_state_change(Box::new(move |c: ConnectionState| {
1826 let cancel_tx_clone = Arc::clone(&cancel_tx);
1827 Box::pin(async move {
1828 if c == ConnectionState::Failed || c == ConnectionState::Disconnected {
1829 let mut tx = cancel_tx_clone.lock().await;
1830 tx.take();
1831 }
1832 })
1833 }));
1834
1835 agent_a.restart("".to_owned(), "".to_owned()).await?;
1836
1837 let _ = cancel_rx.recv().await;
1838
1839 agent_a.close().await?;
1840 agent_b.close().await?;
1841
1842 Ok(())
1843 }
1844
1845 #[tokio::test]
test_agent_restart_both_side() -> Result<()>1846 async fn test_agent_restart_both_side() -> Result<()> {
1847 let one_second = Duration::from_secs(1);
1848 //"Restart Both Sides"
1849
1850 // Get all addresses of candidates concatenated
1851 let generate_candidate_address_strings =
1852 |res: Result<Vec<Arc<dyn Candidate + Send + Sync>>>| -> String {
1853 assert!(res.is_ok());
1854
1855 let mut out = String::new();
1856 if let Ok(candidates) = res {
1857 for c in candidates {
1858 out += c.address().as_str();
1859 out += ":";
1860 out += c.port().to_string().as_str();
1861 }
1862 }
1863 out
1864 };
1865
1866 // Store the original candidates, confirm that after we reconnect we have new pairs
1867 let (_, _, agent_a, agent_b) = pipe(
1868 Some(AgentConfig {
1869 disconnected_timeout: Some(one_second),
1870 failed_timeout: Some(one_second),
1871 ..Default::default()
1872 }),
1873 Some(AgentConfig {
1874 disconnected_timeout: Some(one_second),
1875 failed_timeout: Some(one_second),
1876 ..Default::default()
1877 }),
1878 )
1879 .await?;
1880
1881 let conn_afirst_candidates =
1882 generate_candidate_address_strings(agent_a.get_local_candidates().await);
1883 let conn_bfirst_candidates =
1884 generate_candidate_address_strings(agent_b.get_local_candidates().await);
1885
1886 let (a_notifier, mut a_connected) = on_connected();
1887 agent_a.on_connection_state_change(a_notifier);
1888
1889 let (b_notifier, mut b_connected) = on_connected();
1890 agent_b.on_connection_state_change(b_notifier);
1891
1892 // Restart and Re-Signal
1893 agent_a.restart("".to_owned(), "".to_owned()).await?;
1894 agent_b.restart("".to_owned(), "".to_owned()).await?;
1895
1896 // Exchange Candidates and Credentials
1897 let (ufrag, pwd) = agent_b.get_local_user_credentials().await;
1898 agent_a.set_remote_credentials(ufrag, pwd).await?;
1899
1900 let (ufrag, pwd) = agent_a.get_local_user_credentials().await;
1901 agent_b.set_remote_credentials(ufrag, pwd).await?;
1902
1903 gather_and_exchange_candidates(&agent_a, &agent_b).await?;
1904
1905 // Wait until both have gone back to connected
1906 let _ = a_connected.recv().await;
1907 let _ = b_connected.recv().await;
1908
1909 // Assert that we have new candiates each time
1910 assert_ne!(
1911 conn_afirst_candidates,
1912 generate_candidate_address_strings(agent_a.get_local_candidates().await)
1913 );
1914 assert_ne!(
1915 conn_bfirst_candidates,
1916 generate_candidate_address_strings(agent_b.get_local_candidates().await)
1917 );
1918
1919 agent_a.close().await?;
1920 agent_b.close().await?;
1921
1922 Ok(())
1923 }
1924
1925 #[tokio::test]
test_get_remote_credentials() -> Result<()>1926 async fn test_get_remote_credentials() -> Result<()> {
1927 let a = Agent::new(AgentConfig::default()).await?;
1928
1929 let (remote_ufrag, remote_pwd) = {
1930 let mut ufrag_pwd = a.internal.ufrag_pwd.lock().await;
1931 ufrag_pwd.remote_ufrag = "remoteUfrag".to_owned();
1932 ufrag_pwd.remote_pwd = "remotePwd".to_owned();
1933 (
1934 ufrag_pwd.remote_ufrag.to_owned(),
1935 ufrag_pwd.remote_pwd.to_owned(),
1936 )
1937 };
1938
1939 let (actual_ufrag, actual_pwd) = a.get_remote_user_credentials().await;
1940
1941 assert_eq!(actual_ufrag, remote_ufrag);
1942 assert_eq!(actual_pwd, remote_pwd);
1943
1944 a.close().await?;
1945
1946 Ok(())
1947 }
1948
1949 #[tokio::test]
test_close_in_connection_state_callback() -> Result<()>1950 async fn test_close_in_connection_state_callback() -> Result<()> {
1951 let disconnected_duration = Duration::from_secs(1);
1952 let failed_duration = Duration::from_secs(1);
1953 let keepalive_interval = Duration::from_secs(0);
1954
1955 let cfg0 = AgentConfig {
1956 urls: vec![],
1957 network_types: supported_network_types(),
1958 disconnected_timeout: Some(disconnected_duration),
1959 failed_timeout: Some(failed_duration),
1960 keepalive_interval: Some(keepalive_interval),
1961 check_interval: Duration::from_millis(500),
1962 ..Default::default()
1963 };
1964
1965 let cfg1 = AgentConfig {
1966 urls: vec![],
1967 network_types: supported_network_types(),
1968 disconnected_timeout: Some(disconnected_duration),
1969 failed_timeout: Some(failed_duration),
1970 keepalive_interval: Some(keepalive_interval),
1971 check_interval: Duration::from_millis(500),
1972 ..Default::default()
1973 };
1974
1975 let a_agent = Arc::new(Agent::new(cfg0).await?);
1976 let b_agent = Arc::new(Agent::new(cfg1).await?);
1977
1978 let (is_closed_tx, mut is_closed_rx) = mpsc::channel::<()>(1);
1979 let (is_connected_tx, mut is_connected_rx) = mpsc::channel::<()>(1);
1980 let is_closed_tx = Arc::new(Mutex::new(Some(is_closed_tx)));
1981 let is_connected_tx = Arc::new(Mutex::new(Some(is_connected_tx)));
1982 a_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
1983 let is_closed_tx_clone = Arc::clone(&is_closed_tx);
1984 let is_connected_tx_clone = Arc::clone(&is_connected_tx);
1985 Box::pin(async move {
1986 if c == ConnectionState::Connected {
1987 let mut tx = is_connected_tx_clone.lock().await;
1988 tx.take();
1989 } else if c == ConnectionState::Closed {
1990 let mut tx = is_closed_tx_clone.lock().await;
1991 tx.take();
1992 }
1993 })
1994 }));
1995
1996 connect_with_vnet(&a_agent, &b_agent).await?;
1997
1998 let _ = is_connected_rx.recv().await;
1999 a_agent.close().await?;
2000
2001 let _ = is_closed_rx.recv().await;
2002 b_agent.close().await?;
2003
2004 Ok(())
2005 }
2006
2007 #[tokio::test]
test_run_task_in_connection_state_callback() -> Result<()>2008 async fn test_run_task_in_connection_state_callback() -> Result<()> {
2009 let one_second = Duration::from_secs(1);
2010 let keepalive_interval = Duration::from_secs(0);
2011
2012 let cfg0 = AgentConfig {
2013 urls: vec![],
2014 network_types: supported_network_types(),
2015 disconnected_timeout: Some(one_second),
2016 failed_timeout: Some(one_second),
2017 keepalive_interval: Some(keepalive_interval),
2018 check_interval: Duration::from_millis(50),
2019 ..Default::default()
2020 };
2021
2022 let cfg1 = AgentConfig {
2023 urls: vec![],
2024 network_types: supported_network_types(),
2025 disconnected_timeout: Some(one_second),
2026 failed_timeout: Some(one_second),
2027 keepalive_interval: Some(keepalive_interval),
2028 check_interval: Duration::from_millis(50),
2029 ..Default::default()
2030 };
2031
2032 let a_agent = Arc::new(Agent::new(cfg0).await?);
2033 let b_agent = Arc::new(Agent::new(cfg1).await?);
2034
2035 let (is_complete_tx, mut is_complete_rx) = mpsc::channel::<()>(1);
2036 let is_complete_tx = Arc::new(Mutex::new(Some(is_complete_tx)));
2037 a_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
2038 let is_complete_tx_clone = Arc::clone(&is_complete_tx);
2039 Box::pin(async move {
2040 if c == ConnectionState::Connected {
2041 let mut tx = is_complete_tx_clone.lock().await;
2042 tx.take();
2043 }
2044 })
2045 }));
2046
2047 connect_with_vnet(&a_agent, &b_agent).await?;
2048
2049 let _ = is_complete_rx.recv().await;
2050 let _ = a_agent.get_local_user_credentials().await;
2051 a_agent.restart("".to_owned(), "".to_owned()).await?;
2052
2053 a_agent.close().await?;
2054 b_agent.close().await?;
2055
2056 Ok(())
2057 }
2058
2059 #[tokio::test]
test_run_task_in_selected_candidate_pair_change_callback() -> Result<()>2060 async fn test_run_task_in_selected_candidate_pair_change_callback() -> Result<()> {
2061 let one_second = Duration::from_secs(1);
2062 let keepalive_interval = Duration::from_secs(0);
2063
2064 let cfg0 = AgentConfig {
2065 urls: vec![],
2066 network_types: supported_network_types(),
2067 disconnected_timeout: Some(one_second),
2068 failed_timeout: Some(one_second),
2069 keepalive_interval: Some(keepalive_interval),
2070 check_interval: Duration::from_millis(50),
2071 ..Default::default()
2072 };
2073
2074 let cfg1 = AgentConfig {
2075 urls: vec![],
2076 network_types: supported_network_types(),
2077 disconnected_timeout: Some(one_second),
2078 failed_timeout: Some(one_second),
2079 keepalive_interval: Some(keepalive_interval),
2080 check_interval: Duration::from_millis(50),
2081 ..Default::default()
2082 };
2083
2084 let a_agent = Arc::new(Agent::new(cfg0).await?);
2085 let b_agent = Arc::new(Agent::new(cfg1).await?);
2086
2087 let (is_tested_tx, mut is_tested_rx) = mpsc::channel::<()>(1);
2088 let is_tested_tx = Arc::new(Mutex::new(Some(is_tested_tx)));
2089 a_agent.on_selected_candidate_pair_change(Box::new(
2090 move |_: &Arc<dyn Candidate + Send + Sync>, _: &Arc<dyn Candidate + Send + Sync>| {
2091 let is_tested_tx_clone = Arc::clone(&is_tested_tx);
2092 Box::pin(async move {
2093 let mut tx = is_tested_tx_clone.lock().await;
2094 tx.take();
2095 })
2096 },
2097 ));
2098
2099 let (is_complete_tx, mut is_complete_rx) = mpsc::channel::<()>(1);
2100 let is_complete_tx = Arc::new(Mutex::new(Some(is_complete_tx)));
2101 a_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
2102 let is_complete_tx_clone = Arc::clone(&is_complete_tx);
2103 Box::pin(async move {
2104 if c == ConnectionState::Connected {
2105 let mut tx = is_complete_tx_clone.lock().await;
2106 tx.take();
2107 }
2108 })
2109 }));
2110
2111 connect_with_vnet(&a_agent, &b_agent).await?;
2112
2113 let _ = is_complete_rx.recv().await;
2114 let _ = is_tested_rx.recv().await;
2115
2116 let _ = a_agent.get_local_user_credentials().await;
2117
2118 a_agent.close().await?;
2119 b_agent.close().await?;
2120
2121 Ok(())
2122 }
2123
2124 // Assert that a Lite agent goes to disconnected and failed
2125 #[tokio::test]
test_lite_lifecycle() -> Result<()>2126 async fn test_lite_lifecycle() -> Result<()> {
2127 let (a_notifier, mut a_connected_rx) = on_connected();
2128
2129 let a_agent = Arc::new(
2130 Agent::new(AgentConfig {
2131 network_types: supported_network_types(),
2132 multicast_dns_mode: MulticastDnsMode::Disabled,
2133 ..Default::default()
2134 })
2135 .await?,
2136 );
2137
2138 a_agent.on_connection_state_change(a_notifier);
2139
2140 let disconnected_duration = Duration::from_secs(1);
2141 let failed_duration = Duration::from_secs(1);
2142 let keepalive_interval = Duration::from_secs(0);
2143
2144 let b_agent = Arc::new(
2145 Agent::new(AgentConfig {
2146 lite: true,
2147 candidate_types: vec![CandidateType::Host],
2148 network_types: supported_network_types(),
2149 multicast_dns_mode: MulticastDnsMode::Disabled,
2150 disconnected_timeout: Some(disconnected_duration),
2151 failed_timeout: Some(failed_duration),
2152 keepalive_interval: Some(keepalive_interval),
2153 check_interval: Duration::from_millis(500),
2154 ..Default::default()
2155 })
2156 .await?,
2157 );
2158
2159 let (b_connected_tx, mut b_connected_rx) = mpsc::channel::<()>(1);
2160 let (b_disconnected_tx, mut b_disconnected_rx) = mpsc::channel::<()>(1);
2161 let (b_failed_tx, mut b_failed_rx) = mpsc::channel::<()>(1);
2162 let b_connected_tx = Arc::new(Mutex::new(Some(b_connected_tx)));
2163 let b_disconnected_tx = Arc::new(Mutex::new(Some(b_disconnected_tx)));
2164 let b_failed_tx = Arc::new(Mutex::new(Some(b_failed_tx)));
2165
2166 b_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
2167 let b_connected_tx_clone = Arc::clone(&b_connected_tx);
2168 let b_disconnected_tx_clone = Arc::clone(&b_disconnected_tx);
2169 let b_failed_tx_clone = Arc::clone(&b_failed_tx);
2170
2171 Box::pin(async move {
2172 if c == ConnectionState::Connected {
2173 let mut tx = b_connected_tx_clone.lock().await;
2174 tx.take();
2175 } else if c == ConnectionState::Disconnected {
2176 let mut tx = b_disconnected_tx_clone.lock().await;
2177 tx.take();
2178 } else if c == ConnectionState::Failed {
2179 let mut tx = b_failed_tx_clone.lock().await;
2180 tx.take();
2181 }
2182 })
2183 }));
2184
2185 connect_with_vnet(&b_agent, &a_agent).await?;
2186
2187 let _ = a_connected_rx.recv().await;
2188 let _ = b_connected_rx.recv().await;
2189 a_agent.close().await?;
2190
2191 let _ = b_disconnected_rx.recv().await;
2192 let _ = b_failed_rx.recv().await;
2193
2194 b_agent.close().await?;
2195
2196 Ok(())
2197 }
2198