xref: /webrtc/ice/src/agent/agent_test.rs (revision 5d8fe953)
1 use super::agent_vnet_test::*;
2 use super::*;
3 use crate::candidate::candidate_base::*;
4 use crate::candidate::candidate_host::*;
5 use crate::candidate::candidate_peer_reflexive::*;
6 use crate::candidate::candidate_relay::*;
7 use crate::candidate::candidate_server_reflexive::*;
8 use crate::control::AttrControlling;
9 use crate::priority::PriorityAttr;
10 use crate::use_candidate::UseCandidateAttr;
11 
12 use crate::agent::agent_transport_test::pipe;
13 use async_trait::async_trait;
14 use std::net::Ipv4Addr;
15 use std::ops::Sub;
16 use std::str::FromStr;
17 use stun::message::*;
18 use stun::textattrs::Username;
19 use util::{vnet::*, Conn};
20 use waitgroup::{WaitGroup, Worker};
21 
22 #[tokio::test]
test_pair_search() -> Result<()>23 async fn test_pair_search() -> Result<()> {
24     let config = AgentConfig::default();
25     let a = Agent::new(config).await?;
26 
27     {
28         {
29             let checklist = a.internal.agent_conn.checklist.lock().await;
30             assert!(
31                 checklist.is_empty(),
32                 "TestPairSearch is only a valid test if a.validPairs is empty on construction"
33             );
34         }
35 
36         let cp = a
37             .internal
38             .agent_conn
39             .get_best_available_candidate_pair()
40             .await;
41         assert!(cp.is_none(), "No Candidate pairs should exist");
42     }
43 
44     a.close().await?;
45 
46     Ok(())
47 }
48 
49 #[tokio::test]
test_pair_priority() -> Result<()>50 async fn test_pair_priority() -> Result<()> {
51     let a = Agent::new(AgentConfig::default()).await?;
52 
53     let host_config = CandidateHostConfig {
54         base_config: CandidateBaseConfig {
55             network: "udp".to_owned(),
56             address: "192.168.1.1".to_owned(),
57             port: 19216,
58             component: 1,
59             ..Default::default()
60         },
61         ..Default::default()
62     };
63     let host_local: Arc<dyn Candidate + Send + Sync> = Arc::new(host_config.new_candidate_host()?);
64 
65     let relay_config = CandidateRelayConfig {
66         base_config: CandidateBaseConfig {
67             network: "udp".to_owned(),
68             address: "1.2.3.4".to_owned(),
69             port: 12340,
70             component: 1,
71             ..Default::default()
72         },
73         rel_addr: "4.3.2.1".to_owned(),
74         rel_port: 43210,
75         ..Default::default()
76     };
77 
78     let relay_remote = relay_config.new_candidate_relay()?;
79 
80     let srflx_config = CandidateServerReflexiveConfig {
81         base_config: CandidateBaseConfig {
82             network: "udp".to_owned(),
83             address: "10.10.10.2".to_owned(),
84             port: 19218,
85             component: 1,
86             ..Default::default()
87         },
88         rel_addr: "4.3.2.1".to_owned(),
89         rel_port: 43212,
90     };
91 
92     let srflx_remote = srflx_config.new_candidate_server_reflexive()?;
93 
94     let prflx_config = CandidatePeerReflexiveConfig {
95         base_config: CandidateBaseConfig {
96             network: "udp".to_owned(),
97             address: "10.10.10.2".to_owned(),
98             port: 19217,
99             component: 1,
100             ..Default::default()
101         },
102         rel_addr: "4.3.2.1".to_owned(),
103         rel_port: 43211,
104     };
105 
106     let prflx_remote = prflx_config.new_candidate_peer_reflexive()?;
107 
108     let host_config = CandidateHostConfig {
109         base_config: CandidateBaseConfig {
110             network: "udp".to_owned(),
111             address: "1.2.3.5".to_owned(),
112             port: 12350,
113             component: 1,
114             ..Default::default()
115         },
116         ..Default::default()
117     };
118     let host_remote = host_config.new_candidate_host()?;
119 
120     let remotes: Vec<Arc<dyn Candidate + Send + Sync>> = vec![
121         Arc::new(relay_remote),
122         Arc::new(srflx_remote),
123         Arc::new(prflx_remote),
124         Arc::new(host_remote),
125     ];
126 
127     {
128         for remote in remotes {
129             if a.internal.find_pair(&host_local, &remote).await.is_none() {
130                 a.internal
131                     .add_pair(host_local.clone(), remote.clone())
132                     .await;
133             }
134 
135             if let Some(p) = a.internal.find_pair(&host_local, &remote).await {
136                 p.state
137                     .store(CandidatePairState::Succeeded as u8, Ordering::SeqCst);
138             }
139 
140             if let Some(best_pair) = a
141                 .internal
142                 .agent_conn
143                 .get_best_available_candidate_pair()
144                 .await
145             {
146                 assert_eq!(
147                     best_pair.to_string(),
148                     CandidatePair {
149                         remote: remote.clone(),
150                         local: host_local.clone(),
151                         ..Default::default()
152                     }
153                     .to_string(),
154                     "Unexpected bestPair {best_pair} (expected remote: {remote})",
155                 );
156             } else {
157                 panic!("expected Some, but got None");
158             }
159         }
160     }
161 
162     a.close().await?;
163     Ok(())
164 }
165 
166 #[tokio::test]
test_agent_get_stats() -> Result<()>167 async fn test_agent_get_stats() -> Result<()> {
168     let (conn_a, conn_b, agent_a, agent_b) = pipe(None, None).await?;
169     assert_eq!(agent_a.get_bytes_received(), 0);
170     assert_eq!(agent_a.get_bytes_sent(), 0);
171     assert_eq!(agent_b.get_bytes_received(), 0);
172     assert_eq!(agent_b.get_bytes_sent(), 0);
173 
174     let _na = conn_a.send(&[0u8; 10]).await?;
175     let mut buf = vec![0u8; 10];
176     let _nb = conn_b.recv(&mut buf).await?;
177 
178     assert_eq!(agent_a.get_bytes_received(), 0);
179     assert_eq!(agent_a.get_bytes_sent(), 10);
180 
181     assert_eq!(agent_b.get_bytes_received(), 10);
182     assert_eq!(agent_b.get_bytes_sent(), 0);
183 
184     Ok(())
185 }
186 
187 #[tokio::test]
test_on_selected_candidate_pair_change() -> Result<()>188 async fn test_on_selected_candidate_pair_change() -> Result<()> {
189     let a = Agent::new(AgentConfig::default()).await?;
190     let (callback_called_tx, mut callback_called_rx) = mpsc::channel::<()>(1);
191     let callback_called_tx = Arc::new(Mutex::new(Some(callback_called_tx)));
192     let cb: OnSelectedCandidatePairChangeHdlrFn = Box::new(move |_, _| {
193         let callback_called_tx_clone = Arc::clone(&callback_called_tx);
194         Box::pin(async move {
195             let mut tx = callback_called_tx_clone.lock().await;
196             tx.take();
197         })
198     });
199     a.on_selected_candidate_pair_change(cb);
200 
201     let host_config = CandidateHostConfig {
202         base_config: CandidateBaseConfig {
203             network: "udp".to_owned(),
204             address: "192.168.1.1".to_owned(),
205             port: 19216,
206             component: 1,
207             ..Default::default()
208         },
209         ..Default::default()
210     };
211     let host_local = host_config.new_candidate_host()?;
212 
213     let relay_config = CandidateRelayConfig {
214         base_config: CandidateBaseConfig {
215             network: "udp".to_owned(),
216             address: "1.2.3.4".to_owned(),
217             port: 12340,
218             component: 1,
219             ..Default::default()
220         },
221         rel_addr: "4.3.2.1".to_owned(),
222         rel_port: 43210,
223         ..Default::default()
224     };
225     let relay_remote = relay_config.new_candidate_relay()?;
226 
227     // select the pair
228     let p = Arc::new(CandidatePair::new(
229         Arc::new(host_local),
230         Arc::new(relay_remote),
231         false,
232     ));
233     a.internal.set_selected_pair(Some(p)).await;
234 
235     // ensure that the callback fired on setting the pair
236     let _ = callback_called_rx.recv().await;
237 
238     a.close().await?;
239     Ok(())
240 }
241 
242 #[tokio::test]
test_handle_peer_reflexive_udp_pflx_candidate() -> Result<()>243 async fn test_handle_peer_reflexive_udp_pflx_candidate() -> Result<()> {
244     let a = Agent::new(AgentConfig::default()).await?;
245 
246     let host_config = CandidateHostConfig {
247         base_config: CandidateBaseConfig {
248             network: "udp".to_owned(),
249             address: "192.168.0.2".to_owned(),
250             port: 777,
251             component: 1,
252             conn: Some(Arc::new(MockConn {})),
253             ..Default::default()
254         },
255         ..Default::default()
256     };
257 
258     let local: Arc<dyn Candidate + Send + Sync> = Arc::new(host_config.new_candidate_host()?);
259     let remote = SocketAddr::from_str("172.17.0.3:999")?;
260 
261     let (username, local_pwd, tie_breaker) = {
262         let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
263         (
264             ufrag_pwd.local_ufrag.to_owned() + ":" + ufrag_pwd.remote_ufrag.as_str(),
265             ufrag_pwd.local_pwd.clone(),
266             a.internal.tie_breaker.load(Ordering::SeqCst),
267         )
268     };
269 
270     let mut msg = Message::new();
271     msg.build(&[
272         Box::new(BINDING_REQUEST),
273         Box::new(TransactionId::new()),
274         Box::new(Username::new(ATTR_USERNAME, username)),
275         Box::new(UseCandidateAttr::new()),
276         Box::new(AttrControlling(tie_breaker)),
277         Box::new(PriorityAttr(local.priority())),
278         Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
279         Box::new(FINGERPRINT),
280     ])?;
281 
282     {
283         a.internal.handle_inbound(&mut msg, &local, remote).await;
284 
285         let remote_candidates = a.internal.remote_candidates.lock().await;
286         // length of remote candidate list must be one now
287         assert_eq!(
288             remote_candidates.len(),
289             1,
290             "failed to add a network type to the remote candidate list"
291         );
292 
293         // length of remote candidate list for a network type must be 1
294         if let Some(cands) = remote_candidates.get(&local.network_type()) {
295             assert_eq!(
296                 cands.len(),
297                 1,
298                 "failed to add prflx candidate to remote candidate list"
299             );
300 
301             let c = &cands[0];
302 
303             assert_eq!(
304                 c.candidate_type(),
305                 CandidateType::PeerReflexive,
306                 "candidate type must be prflx"
307             );
308 
309             assert_eq!(c.address(), "172.17.0.3", "IP address mismatch");
310 
311             assert_eq!(c.port(), 999, "Port number mismatch");
312         } else {
313             panic!(
314                 "expected non-empty remote candidate for network type {}",
315                 local.network_type()
316             );
317         }
318     }
319 
320     a.close().await?;
321     Ok(())
322 }
323 
324 #[tokio::test]
test_handle_peer_reflexive_unknown_remote() -> Result<()>325 async fn test_handle_peer_reflexive_unknown_remote() -> Result<()> {
326     let a = Agent::new(AgentConfig::default()).await?;
327 
328     let mut tid = TransactionId::default();
329     tid.0[..3].copy_from_slice("ABC".as_bytes());
330 
331     let remote_pwd = {
332         {
333             let mut pending_binding_requests = a.internal.pending_binding_requests.lock().await;
334             *pending_binding_requests = vec![BindingRequest {
335                 timestamp: Instant::now(),
336                 transaction_id: tid,
337                 destination: SocketAddr::from_str("0.0.0.0:0")?,
338                 is_use_candidate: false,
339             }];
340         }
341         let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
342         ufrag_pwd.remote_pwd.clone()
343     };
344 
345     let host_config = CandidateHostConfig {
346         base_config: CandidateBaseConfig {
347             network: "udp".to_owned(),
348             address: "192.168.0.2".to_owned(),
349             port: 777,
350             component: 1,
351             conn: Some(Arc::new(MockConn {})),
352             ..Default::default()
353         },
354         ..Default::default()
355     };
356 
357     let local: Arc<dyn Candidate + Send + Sync> = Arc::new(host_config.new_candidate_host()?);
358     let remote = SocketAddr::from_str("172.17.0.3:999")?;
359 
360     let mut msg = Message::new();
361     msg.build(&[
362         Box::new(BINDING_SUCCESS),
363         Box::new(tid),
364         Box::new(MessageIntegrity::new_short_term_integrity(remote_pwd)),
365         Box::new(FINGERPRINT),
366     ])?;
367 
368     {
369         a.internal.handle_inbound(&mut msg, &local, remote).await;
370 
371         let remote_candidates = a.internal.remote_candidates.lock().await;
372         assert_eq!(
373             remote_candidates.len(),
374             0,
375             "unknown remote was able to create a candidate"
376         );
377     }
378 
379     a.close().await?;
380     Ok(())
381 }
382 
383 //use std::io::Write;
384 
385 // Assert that Agent on startup sends message, and doesn't wait for connectivityTicker to fire
386 #[tokio::test]
test_connectivity_on_startup() -> Result<()>387 async fn test_connectivity_on_startup() -> Result<()> {
388     /*env_logger::Builder::new()
389     .format(|buf, record| {
390         writeln!(
391             buf,
392             "{}:{} [{}] {} - {}",
393             record.file().unwrap_or("unknown"),
394             record.line().unwrap_or(0),
395             record.level(),
396             chrono::Local::now().format("%H:%M:%S.%6f"),
397             record.args()
398         )
399     })
400     .filter(None, log::LevelFilter::Trace)
401     .init();*/
402 
403     // Create a network with two interfaces
404     let wan = Arc::new(Mutex::new(router::Router::new(router::RouterConfig {
405         cidr: "0.0.0.0/0".to_owned(),
406         ..Default::default()
407     })?));
408 
409     let net0 = Arc::new(net::Net::new(Some(net::NetConfig {
410         static_ips: vec!["192.168.0.1".to_owned()],
411         ..Default::default()
412     })));
413     let net1 = Arc::new(net::Net::new(Some(net::NetConfig {
414         static_ips: vec!["192.168.0.2".to_owned()],
415         ..Default::default()
416     })));
417 
418     connect_net2router(&net0, &wan).await?;
419     connect_net2router(&net1, &wan).await?;
420     start_router(&wan).await?;
421 
422     let (a_notifier, mut a_connected) = on_connected();
423     let (b_notifier, mut b_connected) = on_connected();
424 
425     let keepalive_interval = Some(Duration::from_secs(3600)); //time.Hour
426     let check_interval = Duration::from_secs(3600); //time.Hour
427     let cfg0 = AgentConfig {
428         network_types: supported_network_types(),
429         multicast_dns_mode: MulticastDnsMode::Disabled,
430         net: Some(net0),
431 
432         keepalive_interval,
433         check_interval,
434         ..Default::default()
435     };
436 
437     let a_agent = Arc::new(Agent::new(cfg0).await?);
438     a_agent.on_connection_state_change(a_notifier);
439 
440     let cfg1 = AgentConfig {
441         network_types: supported_network_types(),
442         multicast_dns_mode: MulticastDnsMode::Disabled,
443         net: Some(net1),
444 
445         keepalive_interval,
446         check_interval,
447         ..Default::default()
448     };
449 
450     let b_agent = Arc::new(Agent::new(cfg1).await?);
451     b_agent.on_connection_state_change(b_notifier);
452 
453     // Manual signaling
454     let (a_ufrag, a_pwd) = a_agent.get_local_user_credentials().await;
455     let (b_ufrag, b_pwd) = b_agent.get_local_user_credentials().await;
456 
457     gather_and_exchange_candidates(&a_agent, &b_agent).await?;
458 
459     let (accepted_tx, mut accepted_rx) = mpsc::channel::<()>(1);
460     let (accepting_tx, mut accepting_rx) = mpsc::channel::<()>(1);
461     let (_a_cancel_tx, a_cancel_rx) = mpsc::channel(1);
462     let (_b_cancel_tx, b_cancel_rx) = mpsc::channel(1);
463 
464     let accepting_tx = Arc::new(Mutex::new(Some(accepting_tx)));
465     a_agent.on_connection_state_change(Box::new(move |s: ConnectionState| {
466         let accepted_tx_clone = Arc::clone(&accepting_tx);
467         Box::pin(async move {
468             if s == ConnectionState::Checking {
469                 let mut tx = accepted_tx_clone.lock().await;
470                 tx.take();
471             }
472         })
473     }));
474 
475     tokio::spawn(async move {
476         let result = a_agent.accept(a_cancel_rx, b_ufrag, b_pwd).await;
477         assert!(result.is_ok(), "agent accept expected OK");
478         drop(accepted_tx);
479     });
480 
481     let _ = accepting_rx.recv().await;
482 
483     let _ = b_agent.dial(b_cancel_rx, a_ufrag, a_pwd).await?;
484 
485     // Ensure accepted
486     let _ = accepted_rx.recv().await;
487 
488     // Ensure pair selected
489     // Note: this assumes ConnectionStateConnected is thrown after selecting the final pair
490     let _ = a_connected.recv().await;
491     let _ = b_connected.recv().await;
492 
493     {
494         let mut w = wan.lock().await;
495         w.stop().await?;
496     }
497 
498     Ok(())
499 }
500 
501 #[tokio::test]
test_connectivity_lite() -> Result<()>502 async fn test_connectivity_lite() -> Result<()> {
503     /*env_logger::Builder::new()
504     .format(|buf, record| {
505         writeln!(
506             buf,
507             "{}:{} [{}] {} - {}",
508             record.file().unwrap_or("unknown"),
509             record.line().unwrap_or(0),
510             record.level(),
511             chrono::Local::now().format("%H:%M:%S.%6f"),
512             record.args()
513         )
514     })
515     .filter(None, log::LevelFilter::Trace)
516     .init();*/
517 
518     let stun_server_url = Url {
519         scheme: SchemeType::Stun,
520         host: "1.2.3.4".to_owned(),
521         port: 3478,
522         proto: ProtoType::Udp,
523         ..Default::default()
524     };
525 
526     let nat_type = nat::NatType {
527         mapping_behavior: nat::EndpointDependencyType::EndpointIndependent,
528         filtering_behavior: nat::EndpointDependencyType::EndpointIndependent,
529         ..Default::default()
530     };
531 
532     let v = build_vnet(nat_type, nat_type).await?;
533 
534     let (a_notifier, mut a_connected) = on_connected();
535     let (b_notifier, mut b_connected) = on_connected();
536 
537     let cfg0 = AgentConfig {
538         urls: vec![stun_server_url],
539         network_types: supported_network_types(),
540         multicast_dns_mode: MulticastDnsMode::Disabled,
541         net: Some(Arc::clone(&v.net0)),
542         ..Default::default()
543     };
544 
545     let a_agent = Arc::new(Agent::new(cfg0).await?);
546     a_agent.on_connection_state_change(a_notifier);
547 
548     let cfg1 = AgentConfig {
549         urls: vec![],
550         lite: true,
551         candidate_types: vec![CandidateType::Host],
552         network_types: supported_network_types(),
553         multicast_dns_mode: MulticastDnsMode::Disabled,
554         net: Some(Arc::clone(&v.net1)),
555         ..Default::default()
556     };
557 
558     let b_agent = Arc::new(Agent::new(cfg1).await?);
559     b_agent.on_connection_state_change(b_notifier);
560 
561     let _ = connect_with_vnet(&a_agent, &b_agent).await?;
562 
563     // Ensure pair selected
564     // Note: this assumes ConnectionStateConnected is thrown after selecting the final pair
565     let _ = a_connected.recv().await;
566     let _ = b_connected.recv().await;
567 
568     v.close().await?;
569 
570     Ok(())
571 }
572 
573 struct MockPacketConn;
574 
575 #[async_trait]
576 impl Conn for MockPacketConn {
connect(&self, _addr: SocketAddr) -> std::result::Result<(), util::Error>577     async fn connect(&self, _addr: SocketAddr) -> std::result::Result<(), util::Error> {
578         Ok(())
579     }
580 
recv(&self, _buf: &mut [u8]) -> std::result::Result<usize, util::Error>581     async fn recv(&self, _buf: &mut [u8]) -> std::result::Result<usize, util::Error> {
582         Ok(0)
583     }
584 
recv_from( &self, _buf: &mut [u8], ) -> std::result::Result<(usize, SocketAddr), util::Error>585     async fn recv_from(
586         &self,
587         _buf: &mut [u8],
588     ) -> std::result::Result<(usize, SocketAddr), util::Error> {
589         Ok((0, SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0)))
590     }
591 
send(&self, _buf: &[u8]) -> std::result::Result<usize, util::Error>592     async fn send(&self, _buf: &[u8]) -> std::result::Result<usize, util::Error> {
593         Ok(0)
594     }
595 
send_to( &self, _buf: &[u8], _target: SocketAddr, ) -> std::result::Result<usize, util::Error>596     async fn send_to(
597         &self,
598         _buf: &[u8],
599         _target: SocketAddr,
600     ) -> std::result::Result<usize, util::Error> {
601         Ok(0)
602     }
603 
local_addr(&self) -> std::result::Result<SocketAddr, util::Error>604     fn local_addr(&self) -> std::result::Result<SocketAddr, util::Error> {
605         Ok(SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0))
606     }
607 
remote_addr(&self) -> Option<SocketAddr>608     fn remote_addr(&self) -> Option<SocketAddr> {
609         None
610     }
611 
close(&self) -> std::result::Result<(), util::Error>612     async fn close(&self) -> std::result::Result<(), util::Error> {
613         Ok(())
614     }
615 }
616 
build_msg(c: MessageClass, username: String, key: String) -> Result<Message>617 fn build_msg(c: MessageClass, username: String, key: String) -> Result<Message> {
618     let mut msg = Message::new();
619     msg.build(&[
620         Box::new(MessageType::new(METHOD_BINDING, c)),
621         Box::new(TransactionId::new()),
622         Box::new(Username::new(ATTR_USERNAME, username)),
623         Box::new(MessageIntegrity::new_short_term_integrity(key)),
624         Box::new(FINGERPRINT),
625     ])?;
626     Ok(msg)
627 }
628 
629 #[tokio::test]
test_inbound_validity() -> Result<()>630 async fn test_inbound_validity() -> Result<()> {
631     /*env_logger::Builder::new()
632     .format(|buf, record| {
633         writeln!(
634             buf,
635             "{}:{} [{}] {} - {}",
636             record.file().unwrap_or("unknown"),
637             record.line().unwrap_or(0),
638             record.level(),
639             chrono::Local::now().format("%H:%M:%S.%6f"),
640             record.args()
641         )
642     })
643     .filter(None, log::LevelFilter::Trace)
644     .init();*/
645 
646     let remote = SocketAddr::from_str("172.17.0.3:999")?;
647     let local: Arc<dyn Candidate + Send + Sync> = Arc::new(
648         CandidateHostConfig {
649             base_config: CandidateBaseConfig {
650                 network: "udp".to_owned(),
651                 address: "192.168.0.2".to_owned(),
652                 port: 777,
653                 component: 1,
654                 conn: Some(Arc::new(MockPacketConn {})),
655                 ..Default::default()
656             },
657             ..Default::default()
658         }
659         .new_candidate_host()?,
660     );
661 
662     //"Invalid Binding requests should be discarded"
663     {
664         let a = Agent::new(AgentConfig::default()).await?;
665 
666         {
667             let local_pwd = {
668                 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
669                 ufrag_pwd.local_pwd.clone()
670             };
671             a.internal
672                 .handle_inbound(
673                     &mut build_msg(CLASS_REQUEST, "invalid".to_owned(), local_pwd)?,
674                     &local,
675                     remote,
676                 )
677                 .await;
678             {
679                 let remote_candidates = a.internal.remote_candidates.lock().await;
680                 assert_ne!(
681                     remote_candidates.len(),
682                     1,
683                     "Binding with invalid Username was able to create prflx candidate"
684                 );
685             }
686 
687             let username = {
688                 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
689                 format!("{}:{}", ufrag_pwd.local_ufrag, ufrag_pwd.remote_ufrag)
690             };
691             a.internal
692                 .handle_inbound(
693                     &mut build_msg(CLASS_REQUEST, username, "Invalid".to_owned())?,
694                     &local,
695                     remote,
696                 )
697                 .await;
698             {
699                 let remote_candidates = a.internal.remote_candidates.lock().await;
700                 assert_ne!(
701                     remote_candidates.len(),
702                     1,
703                     "Binding with invalid MessageIntegrity was able to create prflx candidate"
704                 );
705             }
706         }
707 
708         a.close().await?;
709     }
710 
711     //"Invalid Binding success responses should be discarded"
712     {
713         let a = Agent::new(AgentConfig::default()).await?;
714 
715         {
716             let username = {
717                 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
718                 format!("{}:{}", ufrag_pwd.local_ufrag, ufrag_pwd.remote_ufrag)
719             };
720             a.internal
721                 .handle_inbound(
722                     &mut build_msg(CLASS_SUCCESS_RESPONSE, username, "Invalid".to_owned())?,
723                     &local,
724                     remote,
725                 )
726                 .await;
727             {
728                 let remote_candidates = a.internal.remote_candidates.lock().await;
729                 assert_ne!(
730                     remote_candidates.len(),
731                     1,
732                     "Binding with invalid Username was able to create prflx candidate"
733                 );
734             }
735         }
736 
737         a.close().await?;
738     }
739 
740     //"Discard non-binding messages"
741     {
742         let a = Agent::new(AgentConfig::default()).await?;
743 
744         {
745             let username = {
746                 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
747                 format!("{}:{}", ufrag_pwd.local_ufrag, ufrag_pwd.remote_ufrag)
748             };
749             a.internal
750                 .handle_inbound(
751                     &mut build_msg(CLASS_ERROR_RESPONSE, username, "Invalid".to_owned())?,
752                     &local,
753                     remote,
754                 )
755                 .await;
756             let remote_candidates = a.internal.remote_candidates.lock().await;
757             assert_ne!(
758                 remote_candidates.len(),
759                 1,
760                 "non-binding message was able to create prflxRemote"
761             );
762         }
763 
764         a.close().await?;
765     }
766 
767     //"Valid bind request"
768     {
769         let a = Agent::new(AgentConfig::default()).await?;
770 
771         {
772             let (username, local_pwd) = {
773                 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
774                 (
775                     format!("{}:{}", ufrag_pwd.local_ufrag, ufrag_pwd.remote_ufrag),
776                     ufrag_pwd.local_pwd.clone(),
777                 )
778             };
779             a.internal
780                 .handle_inbound(
781                     &mut build_msg(CLASS_REQUEST, username, local_pwd)?,
782                     &local,
783                     remote,
784                 )
785                 .await;
786             let remote_candidates = a.internal.remote_candidates.lock().await;
787             assert_eq!(
788                 remote_candidates.len(),
789                 1,
790                 "Binding with valid values was unable to create prflx candidate"
791             );
792         }
793 
794         a.close().await?;
795     }
796 
797     //"Valid bind without fingerprint"
798     {
799         let a = Agent::new(AgentConfig::default()).await?;
800 
801         {
802             let (username, local_pwd) = {
803                 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
804                 (
805                     format!("{}:{}", ufrag_pwd.local_ufrag, ufrag_pwd.remote_ufrag),
806                     ufrag_pwd.local_pwd.clone(),
807                 )
808             };
809 
810             let mut msg = Message::new();
811             msg.build(&[
812                 Box::new(BINDING_REQUEST),
813                 Box::new(TransactionId::new()),
814                 Box::new(Username::new(ATTR_USERNAME, username)),
815                 Box::new(MessageIntegrity::new_short_term_integrity(local_pwd)),
816             ])?;
817 
818             a.internal.handle_inbound(&mut msg, &local, remote).await;
819             let remote_candidates = a.internal.remote_candidates.lock().await;
820             assert_eq!(
821                 remote_candidates.len(),
822                 1,
823                 "Binding with valid values (but no fingerprint) was unable to create prflx candidate"
824             );
825         }
826 
827         a.close().await?;
828     }
829 
830     //"Success with invalid TransactionID"
831     {
832         let a = Agent::new(AgentConfig::default()).await?;
833 
834         {
835             let remote = SocketAddr::from_str("172.17.0.3:999")?;
836 
837             let mut t_id = TransactionId::default();
838             t_id.0[..3].copy_from_slice(b"ABC");
839 
840             let remote_pwd = {
841                 let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
842                 ufrag_pwd.remote_pwd.clone()
843             };
844 
845             let mut msg = Message::new();
846             msg.build(&[
847                 Box::new(BINDING_SUCCESS),
848                 Box::new(t_id),
849                 Box::new(MessageIntegrity::new_short_term_integrity(remote_pwd)),
850                 Box::new(FINGERPRINT),
851             ])?;
852 
853             a.internal.handle_inbound(&mut msg, &local, remote).await;
854 
855             {
856                 let remote_candidates = a.internal.remote_candidates.lock().await;
857                 assert_eq!(
858                     remote_candidates.len(),
859                     0,
860                     "unknown remote was able to create a candidate"
861                 );
862             }
863         }
864 
865         a.close().await?;
866     }
867 
868     Ok(())
869 }
870 
871 #[tokio::test]
test_invalid_agent_starts() -> Result<()>872 async fn test_invalid_agent_starts() -> Result<()> {
873     let a = Agent::new(AgentConfig::default()).await?;
874 
875     let (_cancel_tx1, cancel_rx1) = mpsc::channel(1);
876     let result = a.dial(cancel_rx1, "".to_owned(), "bar".to_owned()).await;
877     assert!(result.is_err());
878     if let Err(err) = result {
879         assert_eq!(Error::ErrRemoteUfragEmpty, err);
880     }
881 
882     let (_cancel_tx2, cancel_rx2) = mpsc::channel(1);
883     let result = a.dial(cancel_rx2, "foo".to_owned(), "".to_owned()).await;
884     assert!(result.is_err());
885     if let Err(err) = result {
886         assert_eq!(Error::ErrRemotePwdEmpty, err);
887     }
888 
889     let (cancel_tx3, cancel_rx3) = mpsc::channel(1);
890     tokio::spawn(async move {
891         tokio::time::sleep(Duration::from_millis(100)).await;
892         drop(cancel_tx3);
893     });
894 
895     let result = a.dial(cancel_rx3, "foo".to_owned(), "bar".to_owned()).await;
896     assert!(result.is_err());
897     if let Err(err) = result {
898         assert_eq!(Error::ErrCanceledByCaller, err);
899     }
900 
901     let (_cancel_tx4, cancel_rx4) = mpsc::channel(1);
902     let result = a.dial(cancel_rx4, "foo".to_owned(), "bar".to_owned()).await;
903     assert!(result.is_err());
904     if let Err(err) = result {
905         assert_eq!(Error::ErrMultipleStart, err);
906     }
907 
908     a.close().await?;
909 
910     Ok(())
911 }
912 
913 //use std::io::Write;
914 
915 // Assert that Agent emits Connecting/Connected/Disconnected/Failed/Closed messages
916 #[tokio::test]
test_connection_state_callback() -> Result<()>917 async fn test_connection_state_callback() -> Result<()> {
918     /*env_logger::Builder::new()
919     .format(|buf, record| {
920         writeln!(
921             buf,
922             "{}:{} [{}] {} - {}",
923             record.file().unwrap_or("unknown"),
924             record.line().unwrap_or(0),
925             record.level(),
926             chrono::Local::now().format("%H:%M:%S.%6f"),
927             record.args()
928         )
929     })
930     .filter(None, log::LevelFilter::Trace)
931     .init();*/
932 
933     let disconnected_duration = Duration::from_secs(1);
934     let failed_duration = Duration::from_secs(1);
935     let keepalive_interval = Duration::from_secs(0);
936 
937     let cfg0 = AgentConfig {
938         urls: vec![],
939         network_types: supported_network_types(),
940         disconnected_timeout: Some(disconnected_duration),
941         failed_timeout: Some(failed_duration),
942         keepalive_interval: Some(keepalive_interval),
943         ..Default::default()
944     };
945 
946     let cfg1 = AgentConfig {
947         urls: vec![],
948         network_types: supported_network_types(),
949         disconnected_timeout: Some(disconnected_duration),
950         failed_timeout: Some(failed_duration),
951         keepalive_interval: Some(keepalive_interval),
952         ..Default::default()
953     };
954 
955     let a_agent = Arc::new(Agent::new(cfg0).await?);
956     let b_agent = Arc::new(Agent::new(cfg1).await?);
957 
958     let (is_checking_tx, mut is_checking_rx) = mpsc::channel::<()>(1);
959     let (is_connected_tx, mut is_connected_rx) = mpsc::channel::<()>(1);
960     let (is_disconnected_tx, mut is_disconnected_rx) = mpsc::channel::<()>(1);
961     let (is_failed_tx, mut is_failed_rx) = mpsc::channel::<()>(1);
962     let (is_closed_tx, mut is_closed_rx) = mpsc::channel::<()>(1);
963 
964     let is_checking_tx = Arc::new(Mutex::new(Some(is_checking_tx)));
965     let is_connected_tx = Arc::new(Mutex::new(Some(is_connected_tx)));
966     let is_disconnected_tx = Arc::new(Mutex::new(Some(is_disconnected_tx)));
967     let is_failed_tx = Arc::new(Mutex::new(Some(is_failed_tx)));
968     let is_closed_tx = Arc::new(Mutex::new(Some(is_closed_tx)));
969 
970     a_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
971         let is_checking_tx_clone = Arc::clone(&is_checking_tx);
972         let is_connected_tx_clone = Arc::clone(&is_connected_tx);
973         let is_disconnected_tx_clone = Arc::clone(&is_disconnected_tx);
974         let is_failed_tx_clone = Arc::clone(&is_failed_tx);
975         let is_closed_tx_clone = Arc::clone(&is_closed_tx);
976         Box::pin(async move {
977             match c {
978                 ConnectionState::Checking => {
979                     log::debug!("drop is_checking_tx");
980                     let mut tx = is_checking_tx_clone.lock().await;
981                     tx.take();
982                 }
983                 ConnectionState::Connected => {
984                     log::debug!("drop is_connected_tx");
985                     let mut tx = is_connected_tx_clone.lock().await;
986                     tx.take();
987                 }
988                 ConnectionState::Disconnected => {
989                     log::debug!("drop is_disconnected_tx");
990                     let mut tx = is_disconnected_tx_clone.lock().await;
991                     tx.take();
992                 }
993                 ConnectionState::Failed => {
994                     log::debug!("drop is_failed_tx");
995                     let mut tx = is_failed_tx_clone.lock().await;
996                     tx.take();
997                 }
998                 ConnectionState::Closed => {
999                     log::debug!("drop is_closed_tx");
1000                     let mut tx = is_closed_tx_clone.lock().await;
1001                     tx.take();
1002                 }
1003                 _ => {}
1004             };
1005         })
1006     }));
1007 
1008     connect_with_vnet(&a_agent, &b_agent).await?;
1009 
1010     log::debug!("wait is_checking_tx");
1011     let _ = is_checking_rx.recv().await;
1012     log::debug!("wait is_connected_rx");
1013     let _ = is_connected_rx.recv().await;
1014     log::debug!("wait is_disconnected_rx");
1015     let _ = is_disconnected_rx.recv().await;
1016     log::debug!("wait is_failed_rx");
1017     let _ = is_failed_rx.recv().await;
1018 
1019     a_agent.close().await?;
1020     b_agent.close().await?;
1021 
1022     log::debug!("wait is_closed_rx");
1023     let _ = is_closed_rx.recv().await;
1024 
1025     Ok(())
1026 }
1027 
1028 #[tokio::test]
test_invalid_gather() -> Result<()>1029 async fn test_invalid_gather() -> Result<()> {
1030     //"Gather with no OnCandidate should error"
1031     let a = Agent::new(AgentConfig::default()).await?;
1032 
1033     if let Err(err) = a.gather_candidates() {
1034         assert_eq!(
1035             Error::ErrNoOnCandidateHandler,
1036             err,
1037             "trickle GatherCandidates succeeded without OnCandidate"
1038         );
1039     }
1040 
1041     a.close().await?;
1042 
1043     Ok(())
1044 }
1045 
1046 #[tokio::test]
test_candidate_pair_stats() -> Result<()>1047 async fn test_candidate_pair_stats() -> Result<()> {
1048     let a = Agent::new(AgentConfig::default()).await?;
1049 
1050     let host_local: Arc<dyn Candidate + Send + Sync> = Arc::new(
1051         CandidateHostConfig {
1052             base_config: CandidateBaseConfig {
1053                 network: "udp".to_owned(),
1054                 address: "192.168.1.1".to_owned(),
1055                 port: 19216,
1056                 component: 1,
1057                 ..Default::default()
1058             },
1059             ..Default::default()
1060         }
1061         .new_candidate_host()?,
1062     );
1063 
1064     let relay_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1065         CandidateRelayConfig {
1066             base_config: CandidateBaseConfig {
1067                 network: "udp".to_owned(),
1068                 address: "1.2.3.4".to_owned(),
1069                 port: 2340,
1070                 component: 1,
1071                 ..Default::default()
1072             },
1073             rel_addr: "4.3.2.1".to_owned(),
1074             rel_port: 43210,
1075             ..Default::default()
1076         }
1077         .new_candidate_relay()?,
1078     );
1079 
1080     let srflx_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1081         CandidateServerReflexiveConfig {
1082             base_config: CandidateBaseConfig {
1083                 network: "udp".to_owned(),
1084                 address: "10.10.10.2".to_owned(),
1085                 port: 19218,
1086                 component: 1,
1087                 ..Default::default()
1088             },
1089             rel_addr: "4.3.2.1".to_owned(),
1090             rel_port: 43212,
1091         }
1092         .new_candidate_server_reflexive()?,
1093     );
1094 
1095     let prflx_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1096         CandidatePeerReflexiveConfig {
1097             base_config: CandidateBaseConfig {
1098                 network: "udp".to_owned(),
1099                 address: "10.10.10.2".to_owned(),
1100                 port: 19217,
1101                 component: 1,
1102                 ..Default::default()
1103             },
1104             rel_addr: "4.3.2.1".to_owned(),
1105             rel_port: 43211,
1106         }
1107         .new_candidate_peer_reflexive()?,
1108     );
1109 
1110     let host_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1111         CandidateHostConfig {
1112             base_config: CandidateBaseConfig {
1113                 network: "udp".to_owned(),
1114                 address: "1.2.3.5".to_owned(),
1115                 port: 12350,
1116                 component: 1,
1117                 ..Default::default()
1118             },
1119             ..Default::default()
1120         }
1121         .new_candidate_host()?,
1122     );
1123 
1124     for remote in &[
1125         Arc::clone(&relay_remote),
1126         Arc::clone(&srflx_remote),
1127         Arc::clone(&prflx_remote),
1128         Arc::clone(&host_remote),
1129     ] {
1130         let p = a.internal.find_pair(&host_local, remote).await;
1131 
1132         if p.is_none() {
1133             a.internal
1134                 .add_pair(Arc::clone(&host_local), Arc::clone(remote))
1135                 .await;
1136         }
1137     }
1138 
1139     {
1140         if let Some(p) = a.internal.find_pair(&host_local, &prflx_remote).await {
1141             p.state
1142                 .store(CandidatePairState::Failed as u8, Ordering::SeqCst);
1143         }
1144     }
1145 
1146     let stats = a.get_candidate_pairs_stats().await;
1147     assert_eq!(stats.len(), 4, "expected 4 candidate pairs stats");
1148 
1149     let (mut relay_pair_stat, mut srflx_pair_stat, mut prflx_pair_stat, mut host_pair_stat) = (
1150         CandidatePairStats::default(),
1151         CandidatePairStats::default(),
1152         CandidatePairStats::default(),
1153         CandidatePairStats::default(),
1154     );
1155 
1156     for cps in stats {
1157         assert_eq!(
1158             cps.local_candidate_id,
1159             host_local.id(),
1160             "invalid local candidate id"
1161         );
1162 
1163         if cps.remote_candidate_id == relay_remote.id() {
1164             relay_pair_stat = cps;
1165         } else if cps.remote_candidate_id == srflx_remote.id() {
1166             srflx_pair_stat = cps;
1167         } else if cps.remote_candidate_id == prflx_remote.id() {
1168             prflx_pair_stat = cps;
1169         } else if cps.remote_candidate_id == host_remote.id() {
1170             host_pair_stat = cps;
1171         } else {
1172             panic!("invalid remote candidate ID");
1173         }
1174     }
1175 
1176     assert_eq!(
1177         relay_pair_stat.remote_candidate_id,
1178         relay_remote.id(),
1179         "missing host-relay pair stat"
1180     );
1181     assert_eq!(
1182         srflx_pair_stat.remote_candidate_id,
1183         srflx_remote.id(),
1184         "missing host-srflx pair stat"
1185     );
1186     assert_eq!(
1187         prflx_pair_stat.remote_candidate_id,
1188         prflx_remote.id(),
1189         "missing host-prflx pair stat"
1190     );
1191     assert_eq!(
1192         host_pair_stat.remote_candidate_id,
1193         host_remote.id(),
1194         "missing host-host pair stat"
1195     );
1196     assert_eq!(
1197         prflx_pair_stat.state,
1198         CandidatePairState::Failed,
1199         "expected host-prfflx pair to have state failed, it has state {} instead",
1200         prflx_pair_stat.state
1201     );
1202 
1203     a.close().await?;
1204 
1205     Ok(())
1206 }
1207 
1208 #[tokio::test]
test_local_candidate_stats() -> Result<()>1209 async fn test_local_candidate_stats() -> Result<()> {
1210     let a = Agent::new(AgentConfig::default()).await?;
1211 
1212     let host_local: Arc<dyn Candidate + Send + Sync> = Arc::new(
1213         CandidateHostConfig {
1214             base_config: CandidateBaseConfig {
1215                 network: "udp".to_owned(),
1216                 address: "192.168.1.1".to_owned(),
1217                 port: 19216,
1218                 component: 1,
1219                 ..Default::default()
1220             },
1221             ..Default::default()
1222         }
1223         .new_candidate_host()?,
1224     );
1225 
1226     let srflx_local: Arc<dyn Candidate + Send + Sync> = Arc::new(
1227         CandidateServerReflexiveConfig {
1228             base_config: CandidateBaseConfig {
1229                 network: "udp".to_owned(),
1230                 address: "192.168.1.1".to_owned(),
1231                 port: 19217,
1232                 component: 1,
1233                 ..Default::default()
1234             },
1235             rel_addr: "4.3.2.1".to_owned(),
1236             rel_port: 43212,
1237         }
1238         .new_candidate_server_reflexive()?,
1239     );
1240 
1241     {
1242         let mut local_candidates = a.internal.local_candidates.lock().await;
1243         local_candidates.insert(
1244             NetworkType::Udp4,
1245             vec![Arc::clone(&host_local), Arc::clone(&srflx_local)],
1246         );
1247     }
1248 
1249     let local_stats = a.get_local_candidates_stats().await;
1250     assert_eq!(
1251         local_stats.len(),
1252         2,
1253         "expected 2 local candidates stats, got {} instead",
1254         local_stats.len()
1255     );
1256 
1257     let (mut host_local_stat, mut srflx_local_stat) =
1258         (CandidateStats::default(), CandidateStats::default());
1259     for stats in local_stats {
1260         let candidate = if stats.id == host_local.id() {
1261             host_local_stat = stats.clone();
1262             Arc::clone(&host_local)
1263         } else if stats.id == srflx_local.id() {
1264             srflx_local_stat = stats.clone();
1265             Arc::clone(&srflx_local)
1266         } else {
1267             panic!("invalid local candidate ID");
1268         };
1269 
1270         assert_eq!(
1271             stats.candidate_type,
1272             candidate.candidate_type(),
1273             "invalid stats CandidateType"
1274         );
1275         assert_eq!(
1276             stats.priority,
1277             candidate.priority(),
1278             "invalid stats CandidateType"
1279         );
1280         assert_eq!(stats.ip, candidate.address(), "invalid stats IP");
1281     }
1282 
1283     assert_eq!(
1284         host_local_stat.id,
1285         host_local.id(),
1286         "missing host local stat"
1287     );
1288     assert_eq!(
1289         srflx_local_stat.id,
1290         srflx_local.id(),
1291         "missing srflx local stat"
1292     );
1293 
1294     a.close().await?;
1295 
1296     Ok(())
1297 }
1298 
1299 #[tokio::test]
test_remote_candidate_stats() -> Result<()>1300 async fn test_remote_candidate_stats() -> Result<()> {
1301     let a = Agent::new(AgentConfig::default()).await?;
1302 
1303     let relay_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1304         CandidateRelayConfig {
1305             base_config: CandidateBaseConfig {
1306                 network: "udp".to_owned(),
1307                 address: "1.2.3.4".to_owned(),
1308                 port: 12340,
1309                 component: 1,
1310                 ..Default::default()
1311             },
1312             rel_addr: "4.3.2.1".to_owned(),
1313             rel_port: 43210,
1314             ..Default::default()
1315         }
1316         .new_candidate_relay()?,
1317     );
1318 
1319     let srflx_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1320         CandidateServerReflexiveConfig {
1321             base_config: CandidateBaseConfig {
1322                 network: "udp".to_owned(),
1323                 address: "10.10.10.2".to_owned(),
1324                 port: 19218,
1325                 component: 1,
1326                 ..Default::default()
1327             },
1328             rel_addr: "4.3.2.1".to_owned(),
1329             rel_port: 43212,
1330         }
1331         .new_candidate_server_reflexive()?,
1332     );
1333 
1334     let prflx_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1335         CandidatePeerReflexiveConfig {
1336             base_config: CandidateBaseConfig {
1337                 network: "udp".to_owned(),
1338                 address: "10.10.10.2".to_owned(),
1339                 port: 19217,
1340                 component: 1,
1341                 ..Default::default()
1342             },
1343             rel_addr: "4.3.2.1".to_owned(),
1344             rel_port: 43211,
1345         }
1346         .new_candidate_peer_reflexive()?,
1347     );
1348 
1349     let host_remote: Arc<dyn Candidate + Send + Sync> = Arc::new(
1350         CandidateHostConfig {
1351             base_config: CandidateBaseConfig {
1352                 network: "udp".to_owned(),
1353                 address: "1.2.3.5".to_owned(),
1354                 port: 12350,
1355                 component: 1,
1356                 ..Default::default()
1357             },
1358             ..Default::default()
1359         }
1360         .new_candidate_host()?,
1361     );
1362 
1363     {
1364         let mut remote_candidates = a.internal.remote_candidates.lock().await;
1365         remote_candidates.insert(
1366             NetworkType::Udp4,
1367             vec![
1368                 Arc::clone(&relay_remote),
1369                 Arc::clone(&srflx_remote),
1370                 Arc::clone(&prflx_remote),
1371                 Arc::clone(&host_remote),
1372             ],
1373         );
1374     }
1375 
1376     let remote_stats = a.get_remote_candidates_stats().await;
1377     assert_eq!(
1378         remote_stats.len(),
1379         4,
1380         "expected 4 remote candidates stats, got {} instead",
1381         remote_stats.len()
1382     );
1383 
1384     let (mut relay_remote_stat, mut srflx_remote_stat, mut prflx_remote_stat, mut host_remote_stat) = (
1385         CandidateStats::default(),
1386         CandidateStats::default(),
1387         CandidateStats::default(),
1388         CandidateStats::default(),
1389     );
1390     for stats in remote_stats {
1391         let candidate = if stats.id == relay_remote.id() {
1392             relay_remote_stat = stats.clone();
1393             Arc::clone(&relay_remote)
1394         } else if stats.id == srflx_remote.id() {
1395             srflx_remote_stat = stats.clone();
1396             Arc::clone(&srflx_remote)
1397         } else if stats.id == prflx_remote.id() {
1398             prflx_remote_stat = stats.clone();
1399             Arc::clone(&prflx_remote)
1400         } else if stats.id == host_remote.id() {
1401             host_remote_stat = stats.clone();
1402             Arc::clone(&host_remote)
1403         } else {
1404             panic!("invalid remote candidate ID");
1405         };
1406 
1407         assert_eq!(
1408             stats.candidate_type,
1409             candidate.candidate_type(),
1410             "invalid stats CandidateType"
1411         );
1412         assert_eq!(
1413             stats.priority,
1414             candidate.priority(),
1415             "invalid stats CandidateType"
1416         );
1417         assert_eq!(stats.ip, candidate.address(), "invalid stats IP");
1418     }
1419 
1420     assert_eq!(
1421         relay_remote_stat.id,
1422         relay_remote.id(),
1423         "missing relay remote stat"
1424     );
1425     assert_eq!(
1426         srflx_remote_stat.id,
1427         srflx_remote.id(),
1428         "missing srflx remote stat"
1429     );
1430     assert_eq!(
1431         prflx_remote_stat.id,
1432         prflx_remote.id(),
1433         "missing prflx remote stat"
1434     );
1435     assert_eq!(
1436         host_remote_stat.id,
1437         host_remote.id(),
1438         "missing host remote stat"
1439     );
1440 
1441     a.close().await?;
1442 
1443     Ok(())
1444 }
1445 
1446 #[tokio::test]
test_init_ext_ip_mapping() -> Result<()>1447 async fn test_init_ext_ip_mapping() -> Result<()> {
1448     // a.extIPMapper should be nil by default
1449     let a = Agent::new(AgentConfig::default()).await?;
1450     assert!(
1451         a.ext_ip_mapper.is_none(),
1452         "a.extIPMapper should be none by default"
1453     );
1454     a.close().await?;
1455 
1456     // a.extIPMapper should be nil when NAT1To1IPs is a non-nil empty array
1457     let a = Agent::new(AgentConfig {
1458         nat_1to1_ips: vec![],
1459         nat_1to1_ip_candidate_type: CandidateType::Host,
1460         ..Default::default()
1461     })
1462     .await?;
1463     assert!(
1464         a.ext_ip_mapper.is_none(),
1465         "a.extIPMapper should be none by default"
1466     );
1467     a.close().await?;
1468 
1469     // NewAgent should return an error when 1:1 NAT for host candidate is enabled
1470     // but the candidate type does not appear in the CandidateTypes.
1471     if let Err(err) = Agent::new(AgentConfig {
1472         nat_1to1_ips: vec!["1.2.3.4".to_owned()],
1473         nat_1to1_ip_candidate_type: CandidateType::Host,
1474         candidate_types: vec![CandidateType::Relay],
1475         ..Default::default()
1476     })
1477     .await
1478     {
1479         assert_eq!(
1480             Error::ErrIneffectiveNat1to1IpMappingHost,
1481             err,
1482             "Unexpected error: {err}"
1483         );
1484     } else {
1485         panic!("expected error, but got ok");
1486     }
1487 
1488     // NewAgent should return an error when 1:1 NAT for srflx candidate is enabled
1489     // but the candidate type does not appear in the CandidateTypes.
1490     if let Err(err) = Agent::new(AgentConfig {
1491         nat_1to1_ips: vec!["1.2.3.4".to_owned()],
1492         nat_1to1_ip_candidate_type: CandidateType::ServerReflexive,
1493         candidate_types: vec![CandidateType::Relay],
1494         ..Default::default()
1495     })
1496     .await
1497     {
1498         assert_eq!(
1499             Error::ErrIneffectiveNat1to1IpMappingSrflx,
1500             err,
1501             "Unexpected error: {err}"
1502         );
1503     } else {
1504         panic!("expected error, but got ok");
1505     }
1506 
1507     // NewAgent should return an error when 1:1 NAT for host candidate is enabled
1508     // along with mDNS with MulticastDNSModeQueryAndGather
1509     if let Err(err) = Agent::new(AgentConfig {
1510         nat_1to1_ips: vec!["1.2.3.4".to_owned()],
1511         nat_1to1_ip_candidate_type: CandidateType::Host,
1512         multicast_dns_mode: MulticastDnsMode::QueryAndGather,
1513         ..Default::default()
1514     })
1515     .await
1516     {
1517         assert_eq!(
1518             Error::ErrMulticastDnsWithNat1to1IpMapping,
1519             err,
1520             "Unexpected error: {err}"
1521         );
1522     } else {
1523         panic!("expected error, but got ok");
1524     }
1525 
1526     // NewAgent should return if newExternalIPMapper() returns an error.
1527     if let Err(err) = Agent::new(AgentConfig {
1528         nat_1to1_ips: vec!["bad.2.3.4".to_owned()], // bad IP
1529         nat_1to1_ip_candidate_type: CandidateType::Host,
1530         ..Default::default()
1531     })
1532     .await
1533     {
1534         assert_eq!(
1535             Error::ErrInvalidNat1to1IpMapping,
1536             err,
1537             "Unexpected error: {err}"
1538         );
1539     } else {
1540         panic!("expected error, but got ok");
1541     }
1542 
1543     Ok(())
1544 }
1545 
1546 #[tokio::test]
test_binding_request_timeout() -> Result<()>1547 async fn test_binding_request_timeout() -> Result<()> {
1548     const EXPECTED_REMOVAL_COUNT: usize = 2;
1549 
1550     let a = Agent::new(AgentConfig::default()).await?;
1551 
1552     let now = Instant::now();
1553     {
1554         {
1555             let mut pending_binding_requests = a.internal.pending_binding_requests.lock().await;
1556             pending_binding_requests.push(BindingRequest {
1557                 timestamp: now, // valid
1558                 ..Default::default()
1559             });
1560             pending_binding_requests.push(BindingRequest {
1561                 timestamp: now.sub(Duration::from_millis(3900)), // valid
1562                 ..Default::default()
1563             });
1564             pending_binding_requests.push(BindingRequest {
1565                 timestamp: now.sub(Duration::from_millis(4100)), // invalid
1566                 ..Default::default()
1567             });
1568             pending_binding_requests.push(BindingRequest {
1569                 timestamp: now.sub(Duration::from_secs(75)), // invalid
1570                 ..Default::default()
1571             });
1572         }
1573 
1574         a.internal.invalidate_pending_binding_requests(now).await;
1575         {
1576             let pending_binding_requests = a.internal.pending_binding_requests.lock().await;
1577             assert_eq!(pending_binding_requests.len(), EXPECTED_REMOVAL_COUNT, "Binding invalidation due to timeout did not remove the correct number of binding requests")
1578         }
1579     }
1580 
1581     a.close().await?;
1582 
1583     Ok(())
1584 }
1585 
1586 // test_agent_credentials checks if local username fragments and passwords (if set) meet RFC standard
1587 // and ensure it's backwards compatible with previous versions of the pion/ice
1588 #[tokio::test]
test_agent_credentials() -> Result<()>1589 async fn test_agent_credentials() -> Result<()> {
1590     // Agent should not require any of the usernames and password to be set
1591     // If set, they should follow the default 16/128 bits random number generator strategy
1592 
1593     let a = Agent::new(AgentConfig::default()).await?;
1594     {
1595         let ufrag_pwd = a.internal.ufrag_pwd.lock().await;
1596         assert!(ufrag_pwd.local_ufrag.as_bytes().len() * 8 >= 24);
1597         assert!(ufrag_pwd.local_pwd.as_bytes().len() * 8 >= 128);
1598     }
1599     a.close().await?;
1600 
1601     // Should honor RFC standards
1602     // Local values MUST be unguessable, with at least 128 bits of
1603     // random number generator output used to generate the password, and
1604     // at least 24 bits of output to generate the username fragment.
1605 
1606     if let Err(err) = Agent::new(AgentConfig {
1607         local_ufrag: "xx".to_owned(),
1608         ..Default::default()
1609     })
1610     .await
1611     {
1612         assert_eq!(Error::ErrLocalUfragInsufficientBits, err);
1613     } else {
1614         panic!("expected error, but got ok");
1615     }
1616 
1617     if let Err(err) = Agent::new(AgentConfig {
1618         local_pwd: "xxxxxx".to_owned(),
1619         ..Default::default()
1620     })
1621     .await
1622     {
1623         assert_eq!(Error::ErrLocalPwdInsufficientBits, err);
1624     } else {
1625         panic!("expected error, but got ok");
1626     }
1627 
1628     Ok(())
1629 }
1630 
1631 // Assert that Agent on Failure deletes all existing candidates
1632 // User can then do an ICE Restart to bring agent back
1633 #[tokio::test]
test_connection_state_failed_delete_all_candidates() -> Result<()>1634 async fn test_connection_state_failed_delete_all_candidates() -> Result<()> {
1635     let one_second = Duration::from_secs(1);
1636     let keepalive_interval = Duration::from_secs(0);
1637 
1638     let cfg0 = AgentConfig {
1639         network_types: supported_network_types(),
1640         disconnected_timeout: Some(one_second),
1641         failed_timeout: Some(one_second),
1642         keepalive_interval: Some(keepalive_interval),
1643         ..Default::default()
1644     };
1645     let cfg1 = AgentConfig {
1646         network_types: supported_network_types(),
1647         disconnected_timeout: Some(one_second),
1648         failed_timeout: Some(one_second),
1649         keepalive_interval: Some(keepalive_interval),
1650         ..Default::default()
1651     };
1652 
1653     let a_agent = Arc::new(Agent::new(cfg0).await?);
1654     let b_agent = Arc::new(Agent::new(cfg1).await?);
1655 
1656     let (is_failed_tx, mut is_failed_rx) = mpsc::channel::<()>(1);
1657     let is_failed_tx = Arc::new(Mutex::new(Some(is_failed_tx)));
1658     a_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
1659         let is_failed_tx_clone = Arc::clone(&is_failed_tx);
1660         Box::pin(async move {
1661             if c == ConnectionState::Failed {
1662                 let mut tx = is_failed_tx_clone.lock().await;
1663                 tx.take();
1664             }
1665         })
1666     }));
1667 
1668     connect_with_vnet(&a_agent, &b_agent).await?;
1669     let _ = is_failed_rx.recv().await;
1670 
1671     {
1672         {
1673             let remote_candidates = a_agent.internal.remote_candidates.lock().await;
1674             assert_eq!(remote_candidates.len(), 0);
1675         }
1676         {
1677             let local_candidates = a_agent.internal.local_candidates.lock().await;
1678             assert_eq!(local_candidates.len(), 0);
1679         }
1680     }
1681 
1682     a_agent.close().await?;
1683     b_agent.close().await?;
1684 
1685     Ok(())
1686 }
1687 
1688 // Assert that the ICE Agent can go directly from Connecting -> Failed on both sides
1689 #[tokio::test]
test_connection_state_connecting_to_failed() -> Result<()>1690 async fn test_connection_state_connecting_to_failed() -> Result<()> {
1691     let one_second = Duration::from_secs(1);
1692     let keepalive_interval = Duration::from_secs(0);
1693 
1694     let cfg0 = AgentConfig {
1695         disconnected_timeout: Some(one_second),
1696         failed_timeout: Some(one_second),
1697         keepalive_interval: Some(keepalive_interval),
1698         ..Default::default()
1699     };
1700     let cfg1 = AgentConfig {
1701         disconnected_timeout: Some(one_second),
1702         failed_timeout: Some(one_second),
1703         keepalive_interval: Some(keepalive_interval),
1704         ..Default::default()
1705     };
1706 
1707     let a_agent = Arc::new(Agent::new(cfg0).await?);
1708     let b_agent = Arc::new(Agent::new(cfg1).await?);
1709 
1710     let is_failed = WaitGroup::new();
1711     let is_checking = WaitGroup::new();
1712 
1713     let connection_state_check = move |wf: Worker, wc: Worker| {
1714         let wf = Arc::new(Mutex::new(Some(wf)));
1715         let wc = Arc::new(Mutex::new(Some(wc)));
1716         let hdlr_fn: OnConnectionStateChangeHdlrFn = Box::new(move |c: ConnectionState| {
1717             let wf_clone = Arc::clone(&wf);
1718             let wc_clone = Arc::clone(&wc);
1719             Box::pin(async move {
1720                 if c == ConnectionState::Failed {
1721                     let mut f = wf_clone.lock().await;
1722                     f.take();
1723                 } else if c == ConnectionState::Checking {
1724                     let mut c = wc_clone.lock().await;
1725                     c.take();
1726                 } else if c == ConnectionState::Connected || c == ConnectionState::Completed {
1727                     panic!("Unexpected ConnectionState: {c}");
1728                 }
1729             })
1730         });
1731         hdlr_fn
1732     };
1733 
1734     let (wf1, wc1) = (is_failed.worker(), is_checking.worker());
1735     a_agent.on_connection_state_change(connection_state_check(wf1, wc1));
1736 
1737     let (wf2, wc2) = (is_failed.worker(), is_checking.worker());
1738     b_agent.on_connection_state_change(connection_state_check(wf2, wc2));
1739 
1740     let agent_a = Arc::clone(&a_agent);
1741     tokio::spawn(async move {
1742         let (_cancel_tx, cancel_rx) = mpsc::channel(1);
1743         let result = agent_a
1744             .accept(cancel_rx, "InvalidFrag".to_owned(), "InvalidPwd".to_owned())
1745             .await;
1746         assert!(result.is_err());
1747     });
1748 
1749     let agent_b = Arc::clone(&b_agent);
1750     tokio::spawn(async move {
1751         let (_cancel_tx, cancel_rx) = mpsc::channel(1);
1752         let result = agent_b
1753             .dial(cancel_rx, "InvalidFrag".to_owned(), "InvalidPwd".to_owned())
1754             .await;
1755         assert!(result.is_err());
1756     });
1757 
1758     is_checking.wait().await;
1759     is_failed.wait().await;
1760 
1761     a_agent.close().await?;
1762     b_agent.close().await?;
1763 
1764     Ok(())
1765 }
1766 
1767 #[tokio::test]
test_agent_restart_during_gather() -> Result<()>1768 async fn test_agent_restart_during_gather() -> Result<()> {
1769     //"Restart During Gather"
1770 
1771     let agent = Agent::new(AgentConfig::default()).await?;
1772 
1773     agent
1774         .gathering_state
1775         .store(GatheringState::Gathering as u8, Ordering::SeqCst);
1776 
1777     if let Err(err) = agent.restart("".to_owned(), "".to_owned()).await {
1778         assert_eq!(Error::ErrRestartWhenGathering, err);
1779     } else {
1780         panic!("expected error, but got ok");
1781     }
1782 
1783     agent.close().await?;
1784 
1785     Ok(())
1786 }
1787 
1788 #[tokio::test]
test_agent_restart_when_closed() -> Result<()>1789 async fn test_agent_restart_when_closed() -> Result<()> {
1790     //"Restart When Closed"
1791 
1792     let agent = Agent::new(AgentConfig::default()).await?;
1793     agent.close().await?;
1794 
1795     if let Err(err) = agent.restart("".to_owned(), "".to_owned()).await {
1796         assert_eq!(Error::ErrClosed, err);
1797     } else {
1798         panic!("expected error, but got ok");
1799     }
1800 
1801     Ok(())
1802 }
1803 
1804 #[tokio::test]
test_agent_restart_one_side() -> Result<()>1805 async fn test_agent_restart_one_side() -> Result<()> {
1806     let one_second = Duration::from_secs(1);
1807 
1808     //"Restart One Side"
1809     let (_, _, agent_a, agent_b) = pipe(
1810         Some(AgentConfig {
1811             disconnected_timeout: Some(one_second),
1812             failed_timeout: Some(one_second),
1813             ..Default::default()
1814         }),
1815         Some(AgentConfig {
1816             disconnected_timeout: Some(one_second),
1817             failed_timeout: Some(one_second),
1818             ..Default::default()
1819         }),
1820     )
1821     .await?;
1822 
1823     let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(1);
1824     let cancel_tx = Arc::new(Mutex::new(Some(cancel_tx)));
1825     agent_b.on_connection_state_change(Box::new(move |c: ConnectionState| {
1826         let cancel_tx_clone = Arc::clone(&cancel_tx);
1827         Box::pin(async move {
1828             if c == ConnectionState::Failed || c == ConnectionState::Disconnected {
1829                 let mut tx = cancel_tx_clone.lock().await;
1830                 tx.take();
1831             }
1832         })
1833     }));
1834 
1835     agent_a.restart("".to_owned(), "".to_owned()).await?;
1836 
1837     let _ = cancel_rx.recv().await;
1838 
1839     agent_a.close().await?;
1840     agent_b.close().await?;
1841 
1842     Ok(())
1843 }
1844 
1845 #[tokio::test]
test_agent_restart_both_side() -> Result<()>1846 async fn test_agent_restart_both_side() -> Result<()> {
1847     let one_second = Duration::from_secs(1);
1848     //"Restart Both Sides"
1849 
1850     // Get all addresses of candidates concatenated
1851     let generate_candidate_address_strings =
1852         |res: Result<Vec<Arc<dyn Candidate + Send + Sync>>>| -> String {
1853             assert!(res.is_ok());
1854 
1855             let mut out = String::new();
1856             if let Ok(candidates) = res {
1857                 for c in candidates {
1858                     out += c.address().as_str();
1859                     out += ":";
1860                     out += c.port().to_string().as_str();
1861                 }
1862             }
1863             out
1864         };
1865 
1866     // Store the original candidates, confirm that after we reconnect we have new pairs
1867     let (_, _, agent_a, agent_b) = pipe(
1868         Some(AgentConfig {
1869             disconnected_timeout: Some(one_second),
1870             failed_timeout: Some(one_second),
1871             ..Default::default()
1872         }),
1873         Some(AgentConfig {
1874             disconnected_timeout: Some(one_second),
1875             failed_timeout: Some(one_second),
1876             ..Default::default()
1877         }),
1878     )
1879     .await?;
1880 
1881     let conn_afirst_candidates =
1882         generate_candidate_address_strings(agent_a.get_local_candidates().await);
1883     let conn_bfirst_candidates =
1884         generate_candidate_address_strings(agent_b.get_local_candidates().await);
1885 
1886     let (a_notifier, mut a_connected) = on_connected();
1887     agent_a.on_connection_state_change(a_notifier);
1888 
1889     let (b_notifier, mut b_connected) = on_connected();
1890     agent_b.on_connection_state_change(b_notifier);
1891 
1892     // Restart and Re-Signal
1893     agent_a.restart("".to_owned(), "".to_owned()).await?;
1894     agent_b.restart("".to_owned(), "".to_owned()).await?;
1895 
1896     // Exchange Candidates and Credentials
1897     let (ufrag, pwd) = agent_b.get_local_user_credentials().await;
1898     agent_a.set_remote_credentials(ufrag, pwd).await?;
1899 
1900     let (ufrag, pwd) = agent_a.get_local_user_credentials().await;
1901     agent_b.set_remote_credentials(ufrag, pwd).await?;
1902 
1903     gather_and_exchange_candidates(&agent_a, &agent_b).await?;
1904 
1905     // Wait until both have gone back to connected
1906     let _ = a_connected.recv().await;
1907     let _ = b_connected.recv().await;
1908 
1909     // Assert that we have new candiates each time
1910     assert_ne!(
1911         conn_afirst_candidates,
1912         generate_candidate_address_strings(agent_a.get_local_candidates().await)
1913     );
1914     assert_ne!(
1915         conn_bfirst_candidates,
1916         generate_candidate_address_strings(agent_b.get_local_candidates().await)
1917     );
1918 
1919     agent_a.close().await?;
1920     agent_b.close().await?;
1921 
1922     Ok(())
1923 }
1924 
1925 #[tokio::test]
test_get_remote_credentials() -> Result<()>1926 async fn test_get_remote_credentials() -> Result<()> {
1927     let a = Agent::new(AgentConfig::default()).await?;
1928 
1929     let (remote_ufrag, remote_pwd) = {
1930         let mut ufrag_pwd = a.internal.ufrag_pwd.lock().await;
1931         ufrag_pwd.remote_ufrag = "remoteUfrag".to_owned();
1932         ufrag_pwd.remote_pwd = "remotePwd".to_owned();
1933         (
1934             ufrag_pwd.remote_ufrag.to_owned(),
1935             ufrag_pwd.remote_pwd.to_owned(),
1936         )
1937     };
1938 
1939     let (actual_ufrag, actual_pwd) = a.get_remote_user_credentials().await;
1940 
1941     assert_eq!(actual_ufrag, remote_ufrag);
1942     assert_eq!(actual_pwd, remote_pwd);
1943 
1944     a.close().await?;
1945 
1946     Ok(())
1947 }
1948 
1949 #[tokio::test]
test_close_in_connection_state_callback() -> Result<()>1950 async fn test_close_in_connection_state_callback() -> Result<()> {
1951     let disconnected_duration = Duration::from_secs(1);
1952     let failed_duration = Duration::from_secs(1);
1953     let keepalive_interval = Duration::from_secs(0);
1954 
1955     let cfg0 = AgentConfig {
1956         urls: vec![],
1957         network_types: supported_network_types(),
1958         disconnected_timeout: Some(disconnected_duration),
1959         failed_timeout: Some(failed_duration),
1960         keepalive_interval: Some(keepalive_interval),
1961         check_interval: Duration::from_millis(500),
1962         ..Default::default()
1963     };
1964 
1965     let cfg1 = AgentConfig {
1966         urls: vec![],
1967         network_types: supported_network_types(),
1968         disconnected_timeout: Some(disconnected_duration),
1969         failed_timeout: Some(failed_duration),
1970         keepalive_interval: Some(keepalive_interval),
1971         check_interval: Duration::from_millis(500),
1972         ..Default::default()
1973     };
1974 
1975     let a_agent = Arc::new(Agent::new(cfg0).await?);
1976     let b_agent = Arc::new(Agent::new(cfg1).await?);
1977 
1978     let (is_closed_tx, mut is_closed_rx) = mpsc::channel::<()>(1);
1979     let (is_connected_tx, mut is_connected_rx) = mpsc::channel::<()>(1);
1980     let is_closed_tx = Arc::new(Mutex::new(Some(is_closed_tx)));
1981     let is_connected_tx = Arc::new(Mutex::new(Some(is_connected_tx)));
1982     a_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
1983         let is_closed_tx_clone = Arc::clone(&is_closed_tx);
1984         let is_connected_tx_clone = Arc::clone(&is_connected_tx);
1985         Box::pin(async move {
1986             if c == ConnectionState::Connected {
1987                 let mut tx = is_connected_tx_clone.lock().await;
1988                 tx.take();
1989             } else if c == ConnectionState::Closed {
1990                 let mut tx = is_closed_tx_clone.lock().await;
1991                 tx.take();
1992             }
1993         })
1994     }));
1995 
1996     connect_with_vnet(&a_agent, &b_agent).await?;
1997 
1998     let _ = is_connected_rx.recv().await;
1999     a_agent.close().await?;
2000 
2001     let _ = is_closed_rx.recv().await;
2002     b_agent.close().await?;
2003 
2004     Ok(())
2005 }
2006 
2007 #[tokio::test]
test_run_task_in_connection_state_callback() -> Result<()>2008 async fn test_run_task_in_connection_state_callback() -> Result<()> {
2009     let one_second = Duration::from_secs(1);
2010     let keepalive_interval = Duration::from_secs(0);
2011 
2012     let cfg0 = AgentConfig {
2013         urls: vec![],
2014         network_types: supported_network_types(),
2015         disconnected_timeout: Some(one_second),
2016         failed_timeout: Some(one_second),
2017         keepalive_interval: Some(keepalive_interval),
2018         check_interval: Duration::from_millis(50),
2019         ..Default::default()
2020     };
2021 
2022     let cfg1 = AgentConfig {
2023         urls: vec![],
2024         network_types: supported_network_types(),
2025         disconnected_timeout: Some(one_second),
2026         failed_timeout: Some(one_second),
2027         keepalive_interval: Some(keepalive_interval),
2028         check_interval: Duration::from_millis(50),
2029         ..Default::default()
2030     };
2031 
2032     let a_agent = Arc::new(Agent::new(cfg0).await?);
2033     let b_agent = Arc::new(Agent::new(cfg1).await?);
2034 
2035     let (is_complete_tx, mut is_complete_rx) = mpsc::channel::<()>(1);
2036     let is_complete_tx = Arc::new(Mutex::new(Some(is_complete_tx)));
2037     a_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
2038         let is_complete_tx_clone = Arc::clone(&is_complete_tx);
2039         Box::pin(async move {
2040             if c == ConnectionState::Connected {
2041                 let mut tx = is_complete_tx_clone.lock().await;
2042                 tx.take();
2043             }
2044         })
2045     }));
2046 
2047     connect_with_vnet(&a_agent, &b_agent).await?;
2048 
2049     let _ = is_complete_rx.recv().await;
2050     let _ = a_agent.get_local_user_credentials().await;
2051     a_agent.restart("".to_owned(), "".to_owned()).await?;
2052 
2053     a_agent.close().await?;
2054     b_agent.close().await?;
2055 
2056     Ok(())
2057 }
2058 
2059 #[tokio::test]
test_run_task_in_selected_candidate_pair_change_callback() -> Result<()>2060 async fn test_run_task_in_selected_candidate_pair_change_callback() -> Result<()> {
2061     let one_second = Duration::from_secs(1);
2062     let keepalive_interval = Duration::from_secs(0);
2063 
2064     let cfg0 = AgentConfig {
2065         urls: vec![],
2066         network_types: supported_network_types(),
2067         disconnected_timeout: Some(one_second),
2068         failed_timeout: Some(one_second),
2069         keepalive_interval: Some(keepalive_interval),
2070         check_interval: Duration::from_millis(50),
2071         ..Default::default()
2072     };
2073 
2074     let cfg1 = AgentConfig {
2075         urls: vec![],
2076         network_types: supported_network_types(),
2077         disconnected_timeout: Some(one_second),
2078         failed_timeout: Some(one_second),
2079         keepalive_interval: Some(keepalive_interval),
2080         check_interval: Duration::from_millis(50),
2081         ..Default::default()
2082     };
2083 
2084     let a_agent = Arc::new(Agent::new(cfg0).await?);
2085     let b_agent = Arc::new(Agent::new(cfg1).await?);
2086 
2087     let (is_tested_tx, mut is_tested_rx) = mpsc::channel::<()>(1);
2088     let is_tested_tx = Arc::new(Mutex::new(Some(is_tested_tx)));
2089     a_agent.on_selected_candidate_pair_change(Box::new(
2090         move |_: &Arc<dyn Candidate + Send + Sync>, _: &Arc<dyn Candidate + Send + Sync>| {
2091             let is_tested_tx_clone = Arc::clone(&is_tested_tx);
2092             Box::pin(async move {
2093                 let mut tx = is_tested_tx_clone.lock().await;
2094                 tx.take();
2095             })
2096         },
2097     ));
2098 
2099     let (is_complete_tx, mut is_complete_rx) = mpsc::channel::<()>(1);
2100     let is_complete_tx = Arc::new(Mutex::new(Some(is_complete_tx)));
2101     a_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
2102         let is_complete_tx_clone = Arc::clone(&is_complete_tx);
2103         Box::pin(async move {
2104             if c == ConnectionState::Connected {
2105                 let mut tx = is_complete_tx_clone.lock().await;
2106                 tx.take();
2107             }
2108         })
2109     }));
2110 
2111     connect_with_vnet(&a_agent, &b_agent).await?;
2112 
2113     let _ = is_complete_rx.recv().await;
2114     let _ = is_tested_rx.recv().await;
2115 
2116     let _ = a_agent.get_local_user_credentials().await;
2117 
2118     a_agent.close().await?;
2119     b_agent.close().await?;
2120 
2121     Ok(())
2122 }
2123 
2124 // Assert that a Lite agent goes to disconnected and failed
2125 #[tokio::test]
test_lite_lifecycle() -> Result<()>2126 async fn test_lite_lifecycle() -> Result<()> {
2127     let (a_notifier, mut a_connected_rx) = on_connected();
2128 
2129     let a_agent = Arc::new(
2130         Agent::new(AgentConfig {
2131             network_types: supported_network_types(),
2132             multicast_dns_mode: MulticastDnsMode::Disabled,
2133             ..Default::default()
2134         })
2135         .await?,
2136     );
2137 
2138     a_agent.on_connection_state_change(a_notifier);
2139 
2140     let disconnected_duration = Duration::from_secs(1);
2141     let failed_duration = Duration::from_secs(1);
2142     let keepalive_interval = Duration::from_secs(0);
2143 
2144     let b_agent = Arc::new(
2145         Agent::new(AgentConfig {
2146             lite: true,
2147             candidate_types: vec![CandidateType::Host],
2148             network_types: supported_network_types(),
2149             multicast_dns_mode: MulticastDnsMode::Disabled,
2150             disconnected_timeout: Some(disconnected_duration),
2151             failed_timeout: Some(failed_duration),
2152             keepalive_interval: Some(keepalive_interval),
2153             check_interval: Duration::from_millis(500),
2154             ..Default::default()
2155         })
2156         .await?,
2157     );
2158 
2159     let (b_connected_tx, mut b_connected_rx) = mpsc::channel::<()>(1);
2160     let (b_disconnected_tx, mut b_disconnected_rx) = mpsc::channel::<()>(1);
2161     let (b_failed_tx, mut b_failed_rx) = mpsc::channel::<()>(1);
2162     let b_connected_tx = Arc::new(Mutex::new(Some(b_connected_tx)));
2163     let b_disconnected_tx = Arc::new(Mutex::new(Some(b_disconnected_tx)));
2164     let b_failed_tx = Arc::new(Mutex::new(Some(b_failed_tx)));
2165 
2166     b_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
2167         let b_connected_tx_clone = Arc::clone(&b_connected_tx);
2168         let b_disconnected_tx_clone = Arc::clone(&b_disconnected_tx);
2169         let b_failed_tx_clone = Arc::clone(&b_failed_tx);
2170 
2171         Box::pin(async move {
2172             if c == ConnectionState::Connected {
2173                 let mut tx = b_connected_tx_clone.lock().await;
2174                 tx.take();
2175             } else if c == ConnectionState::Disconnected {
2176                 let mut tx = b_disconnected_tx_clone.lock().await;
2177                 tx.take();
2178             } else if c == ConnectionState::Failed {
2179                 let mut tx = b_failed_tx_clone.lock().await;
2180                 tx.take();
2181             }
2182         })
2183     }));
2184 
2185     connect_with_vnet(&b_agent, &a_agent).await?;
2186 
2187     let _ = a_connected_rx.recv().await;
2188     let _ = b_connected_rx.recv().await;
2189     a_agent.close().await?;
2190 
2191     let _ = b_disconnected_rx.recv().await;
2192     let _ = b_failed_rx.recv().await;
2193 
2194     b_agent.close().await?;
2195 
2196     Ok(())
2197 }
2198