xref: /webrtc/ice/src/agent/agent_vnet_test.rs (revision da547c92)
1 use super::*;
2 
3 use crate::candidate::candidate_base::unmarshal_candidate;
4 use async_trait::async_trait;
5 use std::net::{IpAddr, Ipv4Addr};
6 use std::result::Result;
7 use std::str::FromStr;
8 use std::sync::atomic::AtomicU64;
9 use util::vnet::chunk::Chunk;
10 use util::{vnet::router::Nic, vnet::*, Conn};
11 use waitgroup::WaitGroup;
12 
13 pub(crate) struct MockConn;
14 
15 #[async_trait]
16 impl Conn for MockConn {
connect(&self, _addr: SocketAddr) -> Result<(), util::Error>17     async fn connect(&self, _addr: SocketAddr) -> Result<(), util::Error> {
18         Ok(())
19     }
recv(&self, _buf: &mut [u8]) -> Result<usize, util::Error>20     async fn recv(&self, _buf: &mut [u8]) -> Result<usize, util::Error> {
21         Ok(0)
22     }
recv_from(&self, _buf: &mut [u8]) -> Result<(usize, SocketAddr), util::Error>23     async fn recv_from(&self, _buf: &mut [u8]) -> Result<(usize, SocketAddr), util::Error> {
24         Ok((0, SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0)))
25     }
send(&self, _buf: &[u8]) -> Result<usize, util::Error>26     async fn send(&self, _buf: &[u8]) -> Result<usize, util::Error> {
27         Ok(0)
28     }
send_to(&self, _buf: &[u8], _target: SocketAddr) -> Result<usize, util::Error>29     async fn send_to(&self, _buf: &[u8], _target: SocketAddr) -> Result<usize, util::Error> {
30         Ok(0)
31     }
local_addr(&self) -> Result<SocketAddr, util::Error>32     fn local_addr(&self) -> Result<SocketAddr, util::Error> {
33         Ok(SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0))
34     }
remote_addr(&self) -> Option<SocketAddr>35     fn remote_addr(&self) -> Option<SocketAddr> {
36         None
37     }
close(&self) -> Result<(), util::Error>38     async fn close(&self) -> Result<(), util::Error> {
39         Ok(())
40     }
41 }
42 
43 pub(crate) struct VNet {
44     pub(crate) wan: Arc<Mutex<router::Router>>,
45     pub(crate) net0: Arc<net::Net>,
46     pub(crate) net1: Arc<net::Net>,
47     pub(crate) server: turn::server::Server,
48 }
49 
50 impl VNet {
close(&self) -> Result<(), Error>51     pub(crate) async fn close(&self) -> Result<(), Error> {
52         self.server.close().await?;
53         let mut w = self.wan.lock().await;
54         w.stop().await?;
55         Ok(())
56     }
57 }
58 
59 pub(crate) const VNET_GLOBAL_IPA: &str = "27.1.1.1";
60 pub(crate) const VNET_LOCAL_IPA: &str = "192.168.0.1";
61 pub(crate) const VNET_LOCAL_SUBNET_MASK_A: &str = "24";
62 pub(crate) const VNET_GLOBAL_IPB: &str = "28.1.1.1";
63 pub(crate) const VNET_LOCAL_IPB: &str = "10.2.0.1";
64 pub(crate) const VNET_LOCAL_SUBNET_MASK_B: &str = "24";
65 pub(crate) const VNET_STUN_SERVER_IP: &str = "1.2.3.4";
66 pub(crate) const VNET_STUN_SERVER_PORT: u16 = 3478;
67 
build_simple_vnet( _nat_type0: nat::NatType, _nat_type1: nat::NatType, ) -> Result<VNet, Error>68 pub(crate) async fn build_simple_vnet(
69     _nat_type0: nat::NatType,
70     _nat_type1: nat::NatType,
71 ) -> Result<VNet, Error> {
72     // WAN
73     let wan = Arc::new(Mutex::new(router::Router::new(router::RouterConfig {
74         cidr: "0.0.0.0/0".to_owned(),
75         ..Default::default()
76     })?));
77 
78     let wnet = Arc::new(net::Net::new(Some(net::NetConfig {
79         static_ip: VNET_STUN_SERVER_IP.to_owned(), // will be assigned to eth0
80         ..Default::default()
81     })));
82 
83     connect_net2router(&wnet, &wan).await?;
84 
85     // LAN
86     let lan = Arc::new(Mutex::new(router::Router::new(router::RouterConfig {
87         cidr: format!("{VNET_LOCAL_IPA}/{VNET_LOCAL_SUBNET_MASK_A}"),
88         ..Default::default()
89     })?));
90 
91     let net0 = Arc::new(net::Net::new(Some(net::NetConfig {
92         static_ips: vec!["192.168.0.1".to_owned()],
93         ..Default::default()
94     })));
95     let net1 = Arc::new(net::Net::new(Some(net::NetConfig {
96         static_ips: vec!["192.168.0.2".to_owned()],
97         ..Default::default()
98     })));
99 
100     connect_net2router(&net0, &lan).await?;
101     connect_net2router(&net1, &lan).await?;
102     connect_router2router(&lan, &wan).await?;
103 
104     // start routers...
105     start_router(&wan).await?;
106 
107     let server = add_vnet_stun(wnet).await?;
108 
109     Ok(VNet {
110         wan,
111         net0,
112         net1,
113         server,
114     })
115 }
116 
build_vnet( nat_type0: nat::NatType, nat_type1: nat::NatType, ) -> Result<VNet, Error>117 pub(crate) async fn build_vnet(
118     nat_type0: nat::NatType,
119     nat_type1: nat::NatType,
120 ) -> Result<VNet, Error> {
121     // WAN
122     let wan = Arc::new(Mutex::new(router::Router::new(router::RouterConfig {
123         cidr: "0.0.0.0/0".to_owned(),
124         ..Default::default()
125     })?));
126 
127     let wnet = Arc::new(net::Net::new(Some(net::NetConfig {
128         static_ip: VNET_STUN_SERVER_IP.to_owned(), // will be assigned to eth0
129         ..Default::default()
130     })));
131 
132     connect_net2router(&wnet, &wan).await?;
133 
134     // LAN 0
135     let lan0 = Arc::new(Mutex::new(router::Router::new(router::RouterConfig {
136         static_ips: if nat_type0.mode == nat::NatMode::Nat1To1 {
137             vec![format!("{VNET_GLOBAL_IPA}/{VNET_LOCAL_IPA}")]
138         } else {
139             vec![VNET_GLOBAL_IPA.to_owned()]
140         },
141         cidr: format!("{VNET_LOCAL_IPA}/{VNET_LOCAL_SUBNET_MASK_A}"),
142         nat_type: Some(nat_type0),
143         ..Default::default()
144     })?));
145 
146     let net0 = Arc::new(net::Net::new(Some(net::NetConfig {
147         static_ips: vec![VNET_LOCAL_IPA.to_owned()],
148         ..Default::default()
149     })));
150 
151     connect_net2router(&net0, &lan0).await?;
152     connect_router2router(&lan0, &wan).await?;
153 
154     // LAN 1
155     let lan1 = Arc::new(Mutex::new(router::Router::new(router::RouterConfig {
156         static_ips: if nat_type1.mode == nat::NatMode::Nat1To1 {
157             vec![format!("{VNET_GLOBAL_IPB}/{VNET_LOCAL_IPB}")]
158         } else {
159             vec![VNET_GLOBAL_IPB.to_owned()]
160         },
161         cidr: format!("{VNET_LOCAL_IPB}/{VNET_LOCAL_SUBNET_MASK_B}"),
162         nat_type: Some(nat_type1),
163         ..Default::default()
164     })?));
165 
166     let net1 = Arc::new(net::Net::new(Some(net::NetConfig {
167         static_ips: vec![VNET_LOCAL_IPB.to_owned()],
168         ..Default::default()
169     })));
170 
171     connect_net2router(&net1, &lan1).await?;
172     connect_router2router(&lan1, &wan).await?;
173 
174     // start routers...
175     start_router(&wan).await?;
176 
177     let server = add_vnet_stun(wnet).await?;
178 
179     Ok(VNet {
180         wan,
181         net0,
182         net1,
183         server,
184     })
185 }
186 
187 pub(crate) struct TestAuthHandler {
188     pub(crate) cred_map: HashMap<String, Vec<u8>>,
189 }
190 
191 impl TestAuthHandler {
new() -> Self192     pub(crate) fn new() -> Self {
193         let mut cred_map = HashMap::new();
194         cred_map.insert(
195             "user".to_owned(),
196             turn::auth::generate_auth_key("user", "webrtc.rs", "pass"),
197         );
198 
199         TestAuthHandler { cred_map }
200     }
201 }
202 
203 impl turn::auth::AuthHandler for TestAuthHandler {
auth_handle( &self, username: &str, _realm: &str, _src_addr: SocketAddr, ) -> Result<Vec<u8>, turn::Error>204     fn auth_handle(
205         &self,
206         username: &str,
207         _realm: &str,
208         _src_addr: SocketAddr,
209     ) -> Result<Vec<u8>, turn::Error> {
210         if let Some(pw) = self.cred_map.get(username) {
211             Ok(pw.to_vec())
212         } else {
213             Err(turn::Error::Other("fake error".to_owned()))
214         }
215     }
216 }
217 
add_vnet_stun(wan_net: Arc<net::Net>) -> Result<turn::server::Server, Error>218 pub(crate) async fn add_vnet_stun(wan_net: Arc<net::Net>) -> Result<turn::server::Server, Error> {
219     // Run TURN(STUN) server
220     let conn = wan_net
221         .bind(SocketAddr::from_str(&format!(
222             "{VNET_STUN_SERVER_IP}:{VNET_STUN_SERVER_PORT}"
223         ))?)
224         .await?;
225 
226     let server = turn::server::Server::new(turn::server::config::ServerConfig {
227         conn_configs: vec![turn::server::config::ConnConfig {
228             conn,
229             relay_addr_generator: Box::new(
230                 turn::relay::relay_static::RelayAddressGeneratorStatic {
231                     relay_address: IpAddr::from_str(VNET_STUN_SERVER_IP)?,
232                     address: "0.0.0.0".to_owned(),
233                     net: wan_net,
234                 },
235             ),
236         }],
237         realm: "webrtc.rs".to_owned(),
238         auth_handler: Arc::new(TestAuthHandler::new()),
239         channel_bind_timeout: Duration::from_secs(0),
240         alloc_close_notify: None,
241     })
242     .await?;
243 
244     Ok(server)
245 }
246 
connect_with_vnet( a_agent: &Arc<Agent>, b_agent: &Arc<Agent>, ) -> Result<(Arc<impl Conn>, Arc<impl Conn>), Error>247 pub(crate) async fn connect_with_vnet(
248     a_agent: &Arc<Agent>,
249     b_agent: &Arc<Agent>,
250 ) -> Result<(Arc<impl Conn>, Arc<impl Conn>), Error> {
251     // Manual signaling
252     let (a_ufrag, a_pwd) = a_agent.get_local_user_credentials().await;
253     let (b_ufrag, b_pwd) = b_agent.get_local_user_credentials().await;
254 
255     gather_and_exchange_candidates(a_agent, b_agent).await?;
256 
257     let (accepted_tx, mut accepted_rx) = mpsc::channel(1);
258     let (_a_cancel_tx, a_cancel_rx) = mpsc::channel(1);
259 
260     let agent_a = Arc::clone(a_agent);
261     tokio::spawn(async move {
262         let a_conn = agent_a.accept(a_cancel_rx, b_ufrag, b_pwd).await?;
263 
264         let _ = accepted_tx.send(a_conn).await;
265 
266         Result::<(), Error>::Ok(())
267     });
268 
269     let (_b_cancel_tx, b_cancel_rx) = mpsc::channel(1);
270     let b_conn = b_agent.dial(b_cancel_rx, a_ufrag, a_pwd).await?;
271 
272     // Ensure accepted
273     if let Some(a_conn) = accepted_rx.recv().await {
274         Ok((a_conn, b_conn))
275     } else {
276         Err(Error::Other("no a_conn".to_owned()))
277     }
278 }
279 
280 #[derive(Default)]
281 pub(crate) struct AgentTestConfig {
282     pub(crate) urls: Vec<Url>,
283     pub(crate) nat_1to1_ip_candidate_type: CandidateType,
284 }
285 
pipe_with_vnet( v: &VNet, a0test_config: AgentTestConfig, a1test_config: AgentTestConfig, ) -> Result<(Arc<impl Conn>, Arc<impl Conn>), Error>286 pub(crate) async fn pipe_with_vnet(
287     v: &VNet,
288     a0test_config: AgentTestConfig,
289     a1test_config: AgentTestConfig,
290 ) -> Result<(Arc<impl Conn>, Arc<impl Conn>), Error> {
291     let (a_notifier, mut a_connected) = on_connected();
292     let (b_notifier, mut b_connected) = on_connected();
293 
294     let nat_1to1_ips = if a0test_config.nat_1to1_ip_candidate_type != CandidateType::Unspecified {
295         vec![VNET_GLOBAL_IPA.to_owned()]
296     } else {
297         vec![]
298     };
299 
300     let cfg0 = AgentConfig {
301         urls: a0test_config.urls,
302         network_types: supported_network_types(),
303         multicast_dns_mode: MulticastDnsMode::Disabled,
304         nat_1to1_ips,
305         nat_1to1_ip_candidate_type: a0test_config.nat_1to1_ip_candidate_type,
306         net: Some(Arc::clone(&v.net0)),
307         ..Default::default()
308     };
309 
310     let a_agent = Arc::new(Agent::new(cfg0).await?);
311     a_agent.on_connection_state_change(a_notifier);
312 
313     let nat_1to1_ips = if a1test_config.nat_1to1_ip_candidate_type != CandidateType::Unspecified {
314         vec![VNET_GLOBAL_IPB.to_owned()]
315     } else {
316         vec![]
317     };
318     let cfg1 = AgentConfig {
319         urls: a1test_config.urls,
320         network_types: supported_network_types(),
321         multicast_dns_mode: MulticastDnsMode::Disabled,
322         nat_1to1_ips,
323         nat_1to1_ip_candidate_type: a1test_config.nat_1to1_ip_candidate_type,
324         net: Some(Arc::clone(&v.net1)),
325         ..Default::default()
326     };
327 
328     let b_agent = Arc::new(Agent::new(cfg1).await?);
329     b_agent.on_connection_state_change(b_notifier);
330 
331     let (a_conn, b_conn) = connect_with_vnet(&a_agent, &b_agent).await?;
332 
333     // Ensure pair selected
334     // Note: this assumes ConnectionStateConnected is thrown after selecting the final pair
335     let _ = a_connected.recv().await;
336     let _ = b_connected.recv().await;
337 
338     Ok((a_conn, b_conn))
339 }
340 
on_connected() -> (OnConnectionStateChangeHdlrFn, mpsc::Receiver<()>)341 pub(crate) fn on_connected() -> (OnConnectionStateChangeHdlrFn, mpsc::Receiver<()>) {
342     let (done_tx, done_rx) = mpsc::channel::<()>(1);
343     let done_tx = Arc::new(Mutex::new(Some(done_tx)));
344     let hdlr_fn: OnConnectionStateChangeHdlrFn = Box::new(move |state: ConnectionState| {
345         let done_tx_clone = Arc::clone(&done_tx);
346         Box::pin(async move {
347             if state == ConnectionState::Connected {
348                 let mut tx = done_tx_clone.lock().await;
349                 tx.take();
350             }
351         })
352     });
353     (hdlr_fn, done_rx)
354 }
355 
gather_and_exchange_candidates( a_agent: &Arc<Agent>, b_agent: &Arc<Agent>, ) -> Result<(), Error>356 pub(crate) async fn gather_and_exchange_candidates(
357     a_agent: &Arc<Agent>,
358     b_agent: &Arc<Agent>,
359 ) -> Result<(), Error> {
360     let wg = WaitGroup::new();
361 
362     let w1 = Arc::new(Mutex::new(Some(wg.worker())));
363     a_agent.on_candidate(Box::new(
364         move |candidate: Option<Arc<dyn Candidate + Send + Sync>>| {
365             let w3 = Arc::clone(&w1);
366             Box::pin(async move {
367                 if candidate.is_none() {
368                     let mut w = w3.lock().await;
369                     w.take();
370                 }
371             })
372         },
373     ));
374     a_agent.gather_candidates()?;
375 
376     let w2 = Arc::new(Mutex::new(Some(wg.worker())));
377     b_agent.on_candidate(Box::new(
378         move |candidate: Option<Arc<dyn Candidate + Send + Sync>>| {
379             let w3 = Arc::clone(&w2);
380             Box::pin(async move {
381                 if candidate.is_none() {
382                     let mut w = w3.lock().await;
383                     w.take();
384                 }
385             })
386         },
387     ));
388     b_agent.gather_candidates()?;
389 
390     wg.wait().await;
391 
392     let candidates = a_agent.get_local_candidates().await?;
393     for c in candidates {
394         let c2: Arc<dyn Candidate + Send + Sync> =
395             Arc::new(unmarshal_candidate(c.marshal().as_str())?);
396         b_agent.add_remote_candidate(&c2)?;
397     }
398 
399     let candidates = b_agent.get_local_candidates().await?;
400     for c in candidates {
401         let c2: Arc<dyn Candidate + Send + Sync> =
402             Arc::new(unmarshal_candidate(c.marshal().as_str())?);
403         a_agent.add_remote_candidate(&c2)?;
404     }
405 
406     Ok(())
407 }
408 
start_router(router: &Arc<Mutex<router::Router>>) -> Result<(), Error>409 pub(crate) async fn start_router(router: &Arc<Mutex<router::Router>>) -> Result<(), Error> {
410     let mut w = router.lock().await;
411     Ok(w.start().await?)
412 }
413 
connect_net2router( net: &Arc<net::Net>, router: &Arc<Mutex<router::Router>>, ) -> Result<(), Error>414 pub(crate) async fn connect_net2router(
415     net: &Arc<net::Net>,
416     router: &Arc<Mutex<router::Router>>,
417 ) -> Result<(), Error> {
418     let nic = net.get_nic()?;
419 
420     {
421         let mut w = router.lock().await;
422         w.add_net(Arc::clone(&nic)).await?;
423     }
424     {
425         let n = nic.lock().await;
426         n.set_router(Arc::clone(router)).await?;
427     }
428 
429     Ok(())
430 }
431 
connect_router2router( child: &Arc<Mutex<router::Router>>, parent: &Arc<Mutex<router::Router>>, ) -> Result<(), Error>432 pub(crate) async fn connect_router2router(
433     child: &Arc<Mutex<router::Router>>,
434     parent: &Arc<Mutex<router::Router>>,
435 ) -> Result<(), Error> {
436     {
437         let mut w = parent.lock().await;
438         w.add_router(Arc::clone(child)).await?;
439     }
440 
441     {
442         let l = child.lock().await;
443         l.set_router(Arc::clone(parent)).await?;
444     }
445 
446     Ok(())
447 }
448 
449 #[tokio::test]
test_connectivity_simple_vnet_full_cone_nats_on_both_ends() -> Result<(), Error>450 async fn test_connectivity_simple_vnet_full_cone_nats_on_both_ends() -> Result<(), Error> {
451     /*env_logger::Builder::new()
452     .format(|buf, record| {
453         writeln!(
454             buf,
455             "{}:{} [{}] {} - {}",
456             record.file().unwrap_or("unknown"),
457             record.line().unwrap_or(0),
458             record.level(),
459             chrono::Local::now().format("%H:%M:%S.%6f"),
460             record.args()
461         )
462     })
463     .filter(None, log::LevelFilter::Trace)
464     .init();*/
465 
466     let stun_server_url = Url {
467         scheme: SchemeType::Stun,
468         host: VNET_STUN_SERVER_IP.to_owned(),
469         port: VNET_STUN_SERVER_PORT,
470         proto: ProtoType::Udp,
471         ..Default::default()
472     };
473 
474     // buildVNet with a Full-cone NATs both LANs
475     let nat_type = nat::NatType {
476         mapping_behavior: nat::EndpointDependencyType::EndpointIndependent,
477         filtering_behavior: nat::EndpointDependencyType::EndpointIndependent,
478         ..Default::default()
479     };
480 
481     let v = build_simple_vnet(nat_type, nat_type).await?;
482 
483     log::debug!("Connecting...");
484     let a0test_config = AgentTestConfig {
485         urls: vec![stun_server_url.clone()],
486         ..Default::default()
487     };
488     let a1test_config = AgentTestConfig {
489         urls: vec![stun_server_url.clone()],
490         ..Default::default()
491     };
492     let (_ca, _cb) = pipe_with_vnet(&v, a0test_config, a1test_config).await?;
493 
494     tokio::time::sleep(Duration::from_secs(1)).await;
495 
496     log::debug!("Closing...");
497     v.close().await?;
498 
499     Ok(())
500 }
501 
502 #[tokio::test]
test_connectivity_vnet_full_cone_nats_on_both_ends() -> Result<(), Error>503 async fn test_connectivity_vnet_full_cone_nats_on_both_ends() -> Result<(), Error> {
504     /*env_logger::Builder::new()
505     .format(|buf, record| {
506         writeln!(
507             buf,
508             "{}:{} [{}] {} - {}",
509             record.file().unwrap_or("unknown"),
510             record.line().unwrap_or(0),
511             record.level(),
512             chrono::Local::now().format("%H:%M:%S.%6f"),
513             record.args()
514         )
515     })
516     .filter(None, log::LevelFilter::Trace)
517     .init();*/
518 
519     let stun_server_url = Url {
520         scheme: SchemeType::Stun,
521         host: VNET_STUN_SERVER_IP.to_owned(),
522         port: VNET_STUN_SERVER_PORT,
523         proto: ProtoType::Udp,
524         ..Default::default()
525     };
526 
527     let _turn_server_url = Url {
528         scheme: SchemeType::Turn,
529         host: VNET_STUN_SERVER_IP.to_owned(),
530         port: VNET_STUN_SERVER_PORT,
531         username: "user".to_owned(),
532         password: "pass".to_owned(),
533         proto: ProtoType::Udp,
534     };
535 
536     // buildVNet with a Full-cone NATs both LANs
537     let nat_type = nat::NatType {
538         mapping_behavior: nat::EndpointDependencyType::EndpointIndependent,
539         filtering_behavior: nat::EndpointDependencyType::EndpointIndependent,
540         ..Default::default()
541     };
542 
543     let v = build_vnet(nat_type, nat_type).await?;
544 
545     log::debug!("Connecting...");
546     let a0test_config = AgentTestConfig {
547         urls: vec![stun_server_url.clone()],
548         ..Default::default()
549     };
550     let a1test_config = AgentTestConfig {
551         urls: vec![stun_server_url.clone()],
552         ..Default::default()
553     };
554     let (_ca, _cb) = pipe_with_vnet(&v, a0test_config, a1test_config).await?;
555 
556     tokio::time::sleep(Duration::from_secs(1)).await;
557 
558     log::debug!("Closing...");
559     v.close().await?;
560 
561     Ok(())
562 }
563 
564 #[tokio::test]
test_connectivity_vnet_symmetric_nats_on_both_ends() -> Result<(), Error>565 async fn test_connectivity_vnet_symmetric_nats_on_both_ends() -> Result<(), Error> {
566     /*env_logger::Builder::new()
567     .format(|buf, record| {
568         writeln!(
569             buf,
570             "{}:{} [{}] {} - {}",
571             record.file().unwrap_or("unknown"),
572             record.line().unwrap_or(0),
573             record.level(),
574             chrono::Local::now().format("%H:%M:%S.%6f"),
575             record.args()
576         )
577     })
578     .filter(None, log::LevelFilter::Trace)
579     .init();*/
580 
581     let stun_server_url = Url {
582         scheme: SchemeType::Stun,
583         host: VNET_STUN_SERVER_IP.to_owned(),
584         port: VNET_STUN_SERVER_PORT,
585         proto: ProtoType::Udp,
586         ..Default::default()
587     };
588 
589     let turn_server_url = Url {
590         scheme: SchemeType::Turn,
591         host: VNET_STUN_SERVER_IP.to_owned(),
592         port: VNET_STUN_SERVER_PORT,
593         username: "user".to_owned(),
594         password: "pass".to_owned(),
595         proto: ProtoType::Udp,
596     };
597 
598     // buildVNet with a Symmetric NATs for both LANs
599     let nat_type = nat::NatType {
600         mapping_behavior: nat::EndpointDependencyType::EndpointAddrPortDependent,
601         filtering_behavior: nat::EndpointDependencyType::EndpointAddrPortDependent,
602         ..Default::default()
603     };
604 
605     let v = build_vnet(nat_type, nat_type).await?;
606 
607     log::debug!("Connecting...");
608     let a0test_config = AgentTestConfig {
609         urls: vec![stun_server_url.clone(), turn_server_url.clone()],
610         ..Default::default()
611     };
612     let a1test_config = AgentTestConfig {
613         urls: vec![stun_server_url.clone()],
614         ..Default::default()
615     };
616     let (_ca, _cb) = pipe_with_vnet(&v, a0test_config, a1test_config).await?;
617 
618     tokio::time::sleep(Duration::from_secs(1)).await;
619 
620     log::debug!("Closing...");
621     v.close().await?;
622 
623     Ok(())
624 }
625 
626 #[tokio::test]
test_connectivity_vnet_1to1_nat_with_host_candidate_vs_symmetric_nats() -> Result<(), Error>627 async fn test_connectivity_vnet_1to1_nat_with_host_candidate_vs_symmetric_nats() -> Result<(), Error>
628 {
629     /*env_logger::Builder::new()
630     .format(|buf, record| {
631         writeln!(
632             buf,
633             "{}:{} [{}] {} - {}",
634             record.file().unwrap_or("unknown"),
635             record.line().unwrap_or(0),
636             record.level(),
637             chrono::Local::now().format("%H:%M:%S.%6f"),
638             record.args()
639         )
640     })
641     .filter(None, log::LevelFilter::Trace)
642     .init();*/
643 
644     // Agent0 is behind 1:1 NAT
645     let nat_type0 = nat::NatType {
646         mode: nat::NatMode::Nat1To1,
647         ..Default::default()
648     };
649     // Agent1 is behind a symmetric NAT
650     let nat_type1 = nat::NatType {
651         mapping_behavior: nat::EndpointDependencyType::EndpointAddrPortDependent,
652         filtering_behavior: nat::EndpointDependencyType::EndpointAddrPortDependent,
653         ..Default::default()
654     };
655     log::debug!("natType0: {:?}", nat_type0);
656     log::debug!("natType1: {:?}", nat_type1);
657 
658     let v = build_vnet(nat_type0, nat_type1).await?;
659 
660     log::debug!("Connecting...");
661     let a0test_config = AgentTestConfig {
662         urls: vec![],
663         nat_1to1_ip_candidate_type: CandidateType::Host, // Use 1:1 NAT IP as a host candidate
664     };
665     let a1test_config = AgentTestConfig {
666         urls: vec![],
667         ..Default::default()
668     };
669     let (_ca, _cb) = pipe_with_vnet(&v, a0test_config, a1test_config).await?;
670 
671     tokio::time::sleep(Duration::from_secs(1)).await;
672 
673     log::debug!("Closing...");
674     v.close().await?;
675 
676     Ok(())
677 }
678 
679 #[tokio::test]
test_connectivity_vnet_1to1_nat_with_srflx_candidate_vs_symmetric_nats( ) -> Result<(), Error>680 async fn test_connectivity_vnet_1to1_nat_with_srflx_candidate_vs_symmetric_nats(
681 ) -> Result<(), Error> {
682     /*env_logger::Builder::new()
683     .format(|buf, record| {
684         writeln!(
685             buf,
686             "{}:{} [{}] {} - {}",
687             record.file().unwrap_or("unknown"),
688             record.line().unwrap_or(0),
689             record.level(),
690             chrono::Local::now().format("%H:%M:%S.%6f"),
691             record.args()
692         )
693     })
694     .filter(None, log::LevelFilter::Trace)
695     .init();*/
696 
697     // Agent0 is behind 1:1 NAT
698     let nat_type0 = nat::NatType {
699         mode: nat::NatMode::Nat1To1,
700         ..Default::default()
701     };
702     // Agent1 is behind a symmetric NAT
703     let nat_type1 = nat::NatType {
704         mapping_behavior: nat::EndpointDependencyType::EndpointAddrPortDependent,
705         filtering_behavior: nat::EndpointDependencyType::EndpointAddrPortDependent,
706         ..Default::default()
707     };
708     log::debug!("natType0: {:?}", nat_type0);
709     log::debug!("natType1: {:?}", nat_type1);
710 
711     let v = build_vnet(nat_type0, nat_type1).await?;
712 
713     log::debug!("Connecting...");
714     let a0test_config = AgentTestConfig {
715         urls: vec![],
716         nat_1to1_ip_candidate_type: CandidateType::ServerReflexive, // Use 1:1 NAT IP as a srflx candidate
717     };
718     let a1test_config = AgentTestConfig {
719         urls: vec![],
720         ..Default::default()
721     };
722     let (_ca, _cb) = pipe_with_vnet(&v, a0test_config, a1test_config).await?;
723 
724     tokio::time::sleep(Duration::from_secs(1)).await;
725 
726     log::debug!("Closing...");
727     v.close().await?;
728 
729     Ok(())
730 }
731 
block_until_state_seen( expected_state: ConnectionState, state_queue: &mut mpsc::Receiver<ConnectionState>, )732 async fn block_until_state_seen(
733     expected_state: ConnectionState,
734     state_queue: &mut mpsc::Receiver<ConnectionState>,
735 ) {
736     while let Some(s) = state_queue.recv().await {
737         if s == expected_state {
738             return;
739         }
740     }
741 }
742 
743 // test_disconnected_to_connected asserts that an agent can go to disconnected, and then return to connected successfully
744 #[tokio::test]
test_disconnected_to_connected() -> Result<(), Error>745 async fn test_disconnected_to_connected() -> Result<(), Error> {
746     /*env_logger::Builder::new()
747     .format(|buf, record| {
748         writeln!(
749             buf,
750             "{}:{} [{}] {} - {}",
751             record.file().unwrap_or("unknown"),
752             record.line().unwrap_or(0),
753             record.level(),
754             chrono::Local::now().format("%H:%M:%S.%6f"),
755             record.args()
756         )
757     })
758     .filter(None, log::LevelFilter::Trace)
759     .init();*/
760 
761     // Create a network with two interfaces
762     let wan = router::Router::new(router::RouterConfig {
763         cidr: "0.0.0.0/0".to_owned(),
764         ..Default::default()
765     })?;
766 
767     let drop_all_data = Arc::new(AtomicU64::new(0));
768     let drop_all_data2 = Arc::clone(&drop_all_data);
769     wan.add_chunk_filter(Box::new(move |_c: &(dyn Chunk + Send + Sync)| -> bool {
770         drop_all_data2.load(Ordering::SeqCst) != 1
771     }))
772     .await;
773     let wan = Arc::new(Mutex::new(wan));
774 
775     let net0 = Arc::new(net::Net::new(Some(net::NetConfig {
776         static_ips: vec!["192.168.0.1".to_owned()],
777         ..Default::default()
778     })));
779     let net1 = Arc::new(net::Net::new(Some(net::NetConfig {
780         static_ips: vec!["192.168.0.2".to_owned()],
781         ..Default::default()
782     })));
783 
784     connect_net2router(&net0, &wan).await?;
785     connect_net2router(&net1, &wan).await?;
786     start_router(&wan).await?;
787 
788     let disconnected_timeout = Duration::from_secs(1);
789     let keepalive_interval = Duration::from_millis(20);
790 
791     // Create two agents and connect them
792     let controlling_agent = Arc::new(
793         Agent::new(AgentConfig {
794             network_types: supported_network_types(),
795             multicast_dns_mode: MulticastDnsMode::Disabled,
796             net: Some(Arc::clone(&net0)),
797             disconnected_timeout: Some(disconnected_timeout),
798             keepalive_interval: Some(keepalive_interval),
799             check_interval: keepalive_interval,
800             ..Default::default()
801         })
802         .await?,
803     );
804 
805     let controlled_agent = Arc::new(
806         Agent::new(AgentConfig {
807             network_types: supported_network_types(),
808             multicast_dns_mode: MulticastDnsMode::Disabled,
809             net: Some(Arc::clone(&net1)),
810             disconnected_timeout: Some(disconnected_timeout),
811             keepalive_interval: Some(keepalive_interval),
812             check_interval: keepalive_interval,
813             ..Default::default()
814         })
815         .await?,
816     );
817 
818     let (controlling_state_changes_tx, mut controlling_state_changes_rx) =
819         mpsc::channel::<ConnectionState>(100);
820     let controlling_state_changes_tx = Arc::new(controlling_state_changes_tx);
821     controlling_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
822         let controlling_state_changes_tx_clone = Arc::clone(&controlling_state_changes_tx);
823         Box::pin(async move {
824             let _ = controlling_state_changes_tx_clone.try_send(c);
825         })
826     }));
827 
828     let (controlled_state_changes_tx, mut controlled_state_changes_rx) =
829         mpsc::channel::<ConnectionState>(100);
830     let controlled_state_changes_tx = Arc::new(controlled_state_changes_tx);
831     controlled_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
832         let controlled_state_changes_tx_clone = Arc::clone(&controlled_state_changes_tx);
833         Box::pin(async move {
834             let _ = controlled_state_changes_tx_clone.try_send(c);
835         })
836     }));
837 
838     connect_with_vnet(&controlling_agent, &controlled_agent).await?;
839 
840     // Assert we have gone to connected
841     block_until_state_seen(
842         ConnectionState::Connected,
843         &mut controlling_state_changes_rx,
844     )
845     .await;
846     block_until_state_seen(ConnectionState::Connected, &mut controlled_state_changes_rx).await;
847 
848     // Drop all packets, and block until we have gone to disconnected
849     drop_all_data.store(1, Ordering::SeqCst);
850     block_until_state_seen(
851         ConnectionState::Disconnected,
852         &mut controlling_state_changes_rx,
853     )
854     .await;
855     block_until_state_seen(
856         ConnectionState::Disconnected,
857         &mut controlled_state_changes_rx,
858     )
859     .await;
860 
861     // Allow all packets through again, block until we have gone to connected
862     drop_all_data.store(0, Ordering::SeqCst);
863     block_until_state_seen(
864         ConnectionState::Connected,
865         &mut controlling_state_changes_rx,
866     )
867     .await;
868     block_until_state_seen(ConnectionState::Connected, &mut controlled_state_changes_rx).await;
869 
870     {
871         let mut w = wan.lock().await;
872         w.stop().await?;
873     }
874 
875     controlling_agent.close().await?;
876     controlled_agent.close().await?;
877 
878     Ok(())
879 }
880 
881 //use std::io::Write;
882 
883 // Agent.Write should use the best valid pair if a selected pair is not yet available
884 #[tokio::test]
test_write_use_valid_pair() -> Result<(), Error>885 async fn test_write_use_valid_pair() -> Result<(), Error> {
886     /*env_logger::Builder::new()
887     .format(|buf, record| {
888         writeln!(
889             buf,
890             "{}:{} [{}] {} - {}",
891             record.file().unwrap_or("unknown"),
892             record.line().unwrap_or(0),
893             record.level(),
894             chrono::Local::now().format("%H:%M:%S.%6f"),
895             record.args()
896         )
897     })
898     .filter(None, log::LevelFilter::Trace)
899     .init();*/
900 
901     // Create a network with two interfaces
902     let wan = router::Router::new(router::RouterConfig {
903         cidr: "0.0.0.0/0".to_owned(),
904         ..Default::default()
905     })?;
906 
907     wan.add_chunk_filter(Box::new(move |c: &(dyn Chunk + Send + Sync)| -> bool {
908         let raw = c.user_data();
909         if stun::message::is_message(&raw) {
910             let mut m = stun::message::Message {
911                 raw,
912                 ..Default::default()
913             };
914             let result = m.decode();
915             if result.is_err() | m.contains(stun::attributes::ATTR_USE_CANDIDATE) {
916                 return false;
917             }
918         }
919 
920         true
921     }))
922     .await;
923     let wan = Arc::new(Mutex::new(wan));
924 
925     let net0 = Arc::new(net::Net::new(Some(net::NetConfig {
926         static_ips: vec!["192.168.0.1".to_owned()],
927         ..Default::default()
928     })));
929     let net1 = Arc::new(net::Net::new(Some(net::NetConfig {
930         static_ips: vec!["192.168.0.2".to_owned()],
931         ..Default::default()
932     })));
933 
934     connect_net2router(&net0, &wan).await?;
935     connect_net2router(&net1, &wan).await?;
936     start_router(&wan).await?;
937 
938     // Create two agents and connect them
939     let controlling_agent = Arc::new(
940         Agent::new(AgentConfig {
941             network_types: supported_network_types(),
942             multicast_dns_mode: MulticastDnsMode::Disabled,
943             net: Some(Arc::clone(&net0)),
944             ..Default::default()
945         })
946         .await?,
947     );
948 
949     let controlled_agent = Arc::new(
950         Agent::new(AgentConfig {
951             network_types: supported_network_types(),
952             multicast_dns_mode: MulticastDnsMode::Disabled,
953             net: Some(Arc::clone(&net1)),
954             ..Default::default()
955         })
956         .await?,
957     );
958 
959     gather_and_exchange_candidates(&controlling_agent, &controlled_agent).await?;
960 
961     let (controlling_ufrag, controlling_pwd) = controlling_agent.get_local_user_credentials().await;
962     let (controlled_ufrag, controlled_pwd) = controlled_agent.get_local_user_credentials().await;
963 
964     let controlling_agent_tx = Arc::clone(&controlling_agent);
965     tokio::spawn(async move {
966         let test_message = "Test Message";
967         let controlling_agent_conn = {
968             controlling_agent_tx
969                 .internal
970                 .start_connectivity_checks(true, controlled_ufrag, controlled_pwd)
971                 .await?;
972             Arc::clone(&controlling_agent_tx.internal.agent_conn) as Arc<dyn Conn + Send + Sync>
973         };
974 
975         log::debug!("controlling_agent start_connectivity_checks done...");
976         loop {
977             let result = controlling_agent_conn.send(test_message.as_bytes()).await;
978             if result.is_err() {
979                 break;
980             }
981 
982             tokio::time::sleep(Duration::from_millis(20)).await;
983         }
984 
985         Result::<(), Error>::Ok(())
986     });
987 
988     let controlled_agent_conn = {
989         controlled_agent
990             .internal
991             .start_connectivity_checks(false, controlling_ufrag, controlling_pwd)
992             .await?;
993         Arc::clone(&controlled_agent.internal.agent_conn) as Arc<dyn Conn + Send + Sync>
994     };
995 
996     log::debug!("controlled_agent start_connectivity_checks done...");
997 
998     let test_message = "Test Message";
999     let mut read_buf = vec![0u8; test_message.as_bytes().len()];
1000     controlled_agent_conn.recv(&mut read_buf).await?;
1001 
1002     assert_eq!(read_buf, test_message.as_bytes(), "should match");
1003 
1004     {
1005         let mut w = wan.lock().await;
1006         w.stop().await?;
1007     }
1008 
1009     controlling_agent.close().await?;
1010     controlled_agent.close().await?;
1011 
1012     Ok(())
1013 }
1014