1 use super::*;
2
3 use crate::candidate::candidate_base::unmarshal_candidate;
4 use async_trait::async_trait;
5 use std::net::{IpAddr, Ipv4Addr};
6 use std::result::Result;
7 use std::str::FromStr;
8 use std::sync::atomic::AtomicU64;
9 use util::vnet::chunk::Chunk;
10 use util::{vnet::router::Nic, vnet::*, Conn};
11 use waitgroup::WaitGroup;
12
13 pub(crate) struct MockConn;
14
15 #[async_trait]
16 impl Conn for MockConn {
connect(&self, _addr: SocketAddr) -> Result<(), util::Error>17 async fn connect(&self, _addr: SocketAddr) -> Result<(), util::Error> {
18 Ok(())
19 }
recv(&self, _buf: &mut [u8]) -> Result<usize, util::Error>20 async fn recv(&self, _buf: &mut [u8]) -> Result<usize, util::Error> {
21 Ok(0)
22 }
recv_from(&self, _buf: &mut [u8]) -> Result<(usize, SocketAddr), util::Error>23 async fn recv_from(&self, _buf: &mut [u8]) -> Result<(usize, SocketAddr), util::Error> {
24 Ok((0, SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0)))
25 }
send(&self, _buf: &[u8]) -> Result<usize, util::Error>26 async fn send(&self, _buf: &[u8]) -> Result<usize, util::Error> {
27 Ok(0)
28 }
send_to(&self, _buf: &[u8], _target: SocketAddr) -> Result<usize, util::Error>29 async fn send_to(&self, _buf: &[u8], _target: SocketAddr) -> Result<usize, util::Error> {
30 Ok(0)
31 }
local_addr(&self) -> Result<SocketAddr, util::Error>32 fn local_addr(&self) -> Result<SocketAddr, util::Error> {
33 Ok(SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0))
34 }
remote_addr(&self) -> Option<SocketAddr>35 fn remote_addr(&self) -> Option<SocketAddr> {
36 None
37 }
close(&self) -> Result<(), util::Error>38 async fn close(&self) -> Result<(), util::Error> {
39 Ok(())
40 }
41 }
42
43 pub(crate) struct VNet {
44 pub(crate) wan: Arc<Mutex<router::Router>>,
45 pub(crate) net0: Arc<net::Net>,
46 pub(crate) net1: Arc<net::Net>,
47 pub(crate) server: turn::server::Server,
48 }
49
50 impl VNet {
close(&self) -> Result<(), Error>51 pub(crate) async fn close(&self) -> Result<(), Error> {
52 self.server.close().await?;
53 let mut w = self.wan.lock().await;
54 w.stop().await?;
55 Ok(())
56 }
57 }
58
/// Static IP given to the LAN 0 router in `build_vnet` (mapped to `VNET_LOCAL_IPA` in 1:1 NAT mode).
pub(crate) const VNET_GLOBAL_IPA: &str = "27.1.1.1";
/// Static IP of the `net0` host in `build_vnet`; also the base address of the LAN CIDR.
pub(crate) const VNET_LOCAL_IPA: &str = "192.168.0.1";
/// Prefix length appended to `VNET_LOCAL_IPA` to form the LAN CIDR.
pub(crate) const VNET_LOCAL_SUBNET_MASK_A: &str = "24";
/// Static IP given to the LAN 1 router in `build_vnet` (mapped to `VNET_LOCAL_IPB` in 1:1 NAT mode).
pub(crate) const VNET_GLOBAL_IPB: &str = "28.1.1.1";
/// Static IP of the `net1` host in `build_vnet`; also the base address of the LAN 1 CIDR.
pub(crate) const VNET_LOCAL_IPB: &str = "10.2.0.1";
/// Prefix length appended to `VNET_LOCAL_IPB` to form the LAN 1 CIDR.
pub(crate) const VNET_LOCAL_SUBNET_MASK_B: &str = "24";
/// Static IP of the WAN host on which `add_vnet_stun` runs the TURN/STUN server.
pub(crate) const VNET_STUN_SERVER_IP: &str = "1.2.3.4";
/// UDP port the TURN/STUN server is bound to.
pub(crate) const VNET_STUN_SERVER_PORT: u16 = 3478;
67
build_simple_vnet( _nat_type0: nat::NatType, _nat_type1: nat::NatType, ) -> Result<VNet, Error>68 pub(crate) async fn build_simple_vnet(
69 _nat_type0: nat::NatType,
70 _nat_type1: nat::NatType,
71 ) -> Result<VNet, Error> {
72 // WAN
73 let wan = Arc::new(Mutex::new(router::Router::new(router::RouterConfig {
74 cidr: "0.0.0.0/0".to_owned(),
75 ..Default::default()
76 })?));
77
78 let wnet = Arc::new(net::Net::new(Some(net::NetConfig {
79 static_ip: VNET_STUN_SERVER_IP.to_owned(), // will be assigned to eth0
80 ..Default::default()
81 })));
82
83 connect_net2router(&wnet, &wan).await?;
84
85 // LAN
86 let lan = Arc::new(Mutex::new(router::Router::new(router::RouterConfig {
87 cidr: format!("{VNET_LOCAL_IPA}/{VNET_LOCAL_SUBNET_MASK_A}"),
88 ..Default::default()
89 })?));
90
91 let net0 = Arc::new(net::Net::new(Some(net::NetConfig {
92 static_ips: vec!["192.168.0.1".to_owned()],
93 ..Default::default()
94 })));
95 let net1 = Arc::new(net::Net::new(Some(net::NetConfig {
96 static_ips: vec!["192.168.0.2".to_owned()],
97 ..Default::default()
98 })));
99
100 connect_net2router(&net0, &lan).await?;
101 connect_net2router(&net1, &lan).await?;
102 connect_router2router(&lan, &wan).await?;
103
104 // start routers...
105 start_router(&wan).await?;
106
107 let server = add_vnet_stun(wnet).await?;
108
109 Ok(VNet {
110 wan,
111 net0,
112 net1,
113 server,
114 })
115 }
116
build_vnet( nat_type0: nat::NatType, nat_type1: nat::NatType, ) -> Result<VNet, Error>117 pub(crate) async fn build_vnet(
118 nat_type0: nat::NatType,
119 nat_type1: nat::NatType,
120 ) -> Result<VNet, Error> {
121 // WAN
122 let wan = Arc::new(Mutex::new(router::Router::new(router::RouterConfig {
123 cidr: "0.0.0.0/0".to_owned(),
124 ..Default::default()
125 })?));
126
127 let wnet = Arc::new(net::Net::new(Some(net::NetConfig {
128 static_ip: VNET_STUN_SERVER_IP.to_owned(), // will be assigned to eth0
129 ..Default::default()
130 })));
131
132 connect_net2router(&wnet, &wan).await?;
133
134 // LAN 0
135 let lan0 = Arc::new(Mutex::new(router::Router::new(router::RouterConfig {
136 static_ips: if nat_type0.mode == nat::NatMode::Nat1To1 {
137 vec![format!("{VNET_GLOBAL_IPA}/{VNET_LOCAL_IPA}")]
138 } else {
139 vec![VNET_GLOBAL_IPA.to_owned()]
140 },
141 cidr: format!("{VNET_LOCAL_IPA}/{VNET_LOCAL_SUBNET_MASK_A}"),
142 nat_type: Some(nat_type0),
143 ..Default::default()
144 })?));
145
146 let net0 = Arc::new(net::Net::new(Some(net::NetConfig {
147 static_ips: vec![VNET_LOCAL_IPA.to_owned()],
148 ..Default::default()
149 })));
150
151 connect_net2router(&net0, &lan0).await?;
152 connect_router2router(&lan0, &wan).await?;
153
154 // LAN 1
155 let lan1 = Arc::new(Mutex::new(router::Router::new(router::RouterConfig {
156 static_ips: if nat_type1.mode == nat::NatMode::Nat1To1 {
157 vec![format!("{VNET_GLOBAL_IPB}/{VNET_LOCAL_IPB}")]
158 } else {
159 vec![VNET_GLOBAL_IPB.to_owned()]
160 },
161 cidr: format!("{VNET_LOCAL_IPB}/{VNET_LOCAL_SUBNET_MASK_B}"),
162 nat_type: Some(nat_type1),
163 ..Default::default()
164 })?));
165
166 let net1 = Arc::new(net::Net::new(Some(net::NetConfig {
167 static_ips: vec![VNET_LOCAL_IPB.to_owned()],
168 ..Default::default()
169 })));
170
171 connect_net2router(&net1, &lan1).await?;
172 connect_router2router(&lan1, &wan).await?;
173
174 // start routers...
175 start_router(&wan).await?;
176
177 let server = add_vnet_stun(wnet).await?;
178
179 Ok(VNet {
180 wan,
181 net0,
182 net1,
183 server,
184 })
185 }
186
187 pub(crate) struct TestAuthHandler {
188 pub(crate) cred_map: HashMap<String, Vec<u8>>,
189 }
190
191 impl TestAuthHandler {
new() -> Self192 pub(crate) fn new() -> Self {
193 let mut cred_map = HashMap::new();
194 cred_map.insert(
195 "user".to_owned(),
196 turn::auth::generate_auth_key("user", "webrtc.rs", "pass"),
197 );
198
199 TestAuthHandler { cred_map }
200 }
201 }
202
203 impl turn::auth::AuthHandler for TestAuthHandler {
auth_handle( &self, username: &str, _realm: &str, _src_addr: SocketAddr, ) -> Result<Vec<u8>, turn::Error>204 fn auth_handle(
205 &self,
206 username: &str,
207 _realm: &str,
208 _src_addr: SocketAddr,
209 ) -> Result<Vec<u8>, turn::Error> {
210 if let Some(pw) = self.cred_map.get(username) {
211 Ok(pw.to_vec())
212 } else {
213 Err(turn::Error::Other("fake error".to_owned()))
214 }
215 }
216 }
217
add_vnet_stun(wan_net: Arc<net::Net>) -> Result<turn::server::Server, Error>218 pub(crate) async fn add_vnet_stun(wan_net: Arc<net::Net>) -> Result<turn::server::Server, Error> {
219 // Run TURN(STUN) server
220 let conn = wan_net
221 .bind(SocketAddr::from_str(&format!(
222 "{VNET_STUN_SERVER_IP}:{VNET_STUN_SERVER_PORT}"
223 ))?)
224 .await?;
225
226 let server = turn::server::Server::new(turn::server::config::ServerConfig {
227 conn_configs: vec![turn::server::config::ConnConfig {
228 conn,
229 relay_addr_generator: Box::new(
230 turn::relay::relay_static::RelayAddressGeneratorStatic {
231 relay_address: IpAddr::from_str(VNET_STUN_SERVER_IP)?,
232 address: "0.0.0.0".to_owned(),
233 net: wan_net,
234 },
235 ),
236 }],
237 realm: "webrtc.rs".to_owned(),
238 auth_handler: Arc::new(TestAuthHandler::new()),
239 channel_bind_timeout: Duration::from_secs(0),
240 alloc_close_notify: None,
241 })
242 .await?;
243
244 Ok(server)
245 }
246
connect_with_vnet( a_agent: &Arc<Agent>, b_agent: &Arc<Agent>, ) -> Result<(Arc<impl Conn>, Arc<impl Conn>), Error>247 pub(crate) async fn connect_with_vnet(
248 a_agent: &Arc<Agent>,
249 b_agent: &Arc<Agent>,
250 ) -> Result<(Arc<impl Conn>, Arc<impl Conn>), Error> {
251 // Manual signaling
252 let (a_ufrag, a_pwd) = a_agent.get_local_user_credentials().await;
253 let (b_ufrag, b_pwd) = b_agent.get_local_user_credentials().await;
254
255 gather_and_exchange_candidates(a_agent, b_agent).await?;
256
257 let (accepted_tx, mut accepted_rx) = mpsc::channel(1);
258 let (_a_cancel_tx, a_cancel_rx) = mpsc::channel(1);
259
260 let agent_a = Arc::clone(a_agent);
261 tokio::spawn(async move {
262 let a_conn = agent_a.accept(a_cancel_rx, b_ufrag, b_pwd).await?;
263
264 let _ = accepted_tx.send(a_conn).await;
265
266 Result::<(), Error>::Ok(())
267 });
268
269 let (_b_cancel_tx, b_cancel_rx) = mpsc::channel(1);
270 let b_conn = b_agent.dial(b_cancel_rx, a_ufrag, a_pwd).await?;
271
272 // Ensure accepted
273 if let Some(a_conn) = accepted_rx.recv().await {
274 Ok((a_conn, b_conn))
275 } else {
276 Err(Error::Other("no a_conn".to_owned()))
277 }
278 }
279
/// Per-agent settings accepted by `pipe_with_vnet`.
#[derive(Default)]
pub(crate) struct AgentTestConfig {
    /// Server URLs forwarded to the agent's `AgentConfig::urls`.
    pub(crate) urls: Vec<Url>,
    /// Candidate type forwarded to `AgentConfig::nat_1to1_ip_candidate_type`. When it is
    /// anything other than `CandidateType::Unspecified`, `pipe_with_vnet` also sets the
    /// agent's `nat_1to1_ips` to the matching global IP.
    pub(crate) nat_1to1_ip_candidate_type: CandidateType,
}
285
pipe_with_vnet( v: &VNet, a0test_config: AgentTestConfig, a1test_config: AgentTestConfig, ) -> Result<(Arc<impl Conn>, Arc<impl Conn>), Error>286 pub(crate) async fn pipe_with_vnet(
287 v: &VNet,
288 a0test_config: AgentTestConfig,
289 a1test_config: AgentTestConfig,
290 ) -> Result<(Arc<impl Conn>, Arc<impl Conn>), Error> {
291 let (a_notifier, mut a_connected) = on_connected();
292 let (b_notifier, mut b_connected) = on_connected();
293
294 let nat_1to1_ips = if a0test_config.nat_1to1_ip_candidate_type != CandidateType::Unspecified {
295 vec![VNET_GLOBAL_IPA.to_owned()]
296 } else {
297 vec![]
298 };
299
300 let cfg0 = AgentConfig {
301 urls: a0test_config.urls,
302 network_types: supported_network_types(),
303 multicast_dns_mode: MulticastDnsMode::Disabled,
304 nat_1to1_ips,
305 nat_1to1_ip_candidate_type: a0test_config.nat_1to1_ip_candidate_type,
306 net: Some(Arc::clone(&v.net0)),
307 ..Default::default()
308 };
309
310 let a_agent = Arc::new(Agent::new(cfg0).await?);
311 a_agent.on_connection_state_change(a_notifier);
312
313 let nat_1to1_ips = if a1test_config.nat_1to1_ip_candidate_type != CandidateType::Unspecified {
314 vec![VNET_GLOBAL_IPB.to_owned()]
315 } else {
316 vec![]
317 };
318 let cfg1 = AgentConfig {
319 urls: a1test_config.urls,
320 network_types: supported_network_types(),
321 multicast_dns_mode: MulticastDnsMode::Disabled,
322 nat_1to1_ips,
323 nat_1to1_ip_candidate_type: a1test_config.nat_1to1_ip_candidate_type,
324 net: Some(Arc::clone(&v.net1)),
325 ..Default::default()
326 };
327
328 let b_agent = Arc::new(Agent::new(cfg1).await?);
329 b_agent.on_connection_state_change(b_notifier);
330
331 let (a_conn, b_conn) = connect_with_vnet(&a_agent, &b_agent).await?;
332
333 // Ensure pair selected
334 // Note: this assumes ConnectionStateConnected is thrown after selecting the final pair
335 let _ = a_connected.recv().await;
336 let _ = b_connected.recv().await;
337
338 Ok((a_conn, b_conn))
339 }
340
on_connected() -> (OnConnectionStateChangeHdlrFn, mpsc::Receiver<()>)341 pub(crate) fn on_connected() -> (OnConnectionStateChangeHdlrFn, mpsc::Receiver<()>) {
342 let (done_tx, done_rx) = mpsc::channel::<()>(1);
343 let done_tx = Arc::new(Mutex::new(Some(done_tx)));
344 let hdlr_fn: OnConnectionStateChangeHdlrFn = Box::new(move |state: ConnectionState| {
345 let done_tx_clone = Arc::clone(&done_tx);
346 Box::pin(async move {
347 if state == ConnectionState::Connected {
348 let mut tx = done_tx_clone.lock().await;
349 tx.take();
350 }
351 })
352 });
353 (hdlr_fn, done_rx)
354 }
355
gather_and_exchange_candidates( a_agent: &Arc<Agent>, b_agent: &Arc<Agent>, ) -> Result<(), Error>356 pub(crate) async fn gather_and_exchange_candidates(
357 a_agent: &Arc<Agent>,
358 b_agent: &Arc<Agent>,
359 ) -> Result<(), Error> {
360 let wg = WaitGroup::new();
361
362 let w1 = Arc::new(Mutex::new(Some(wg.worker())));
363 a_agent.on_candidate(Box::new(
364 move |candidate: Option<Arc<dyn Candidate + Send + Sync>>| {
365 let w3 = Arc::clone(&w1);
366 Box::pin(async move {
367 if candidate.is_none() {
368 let mut w = w3.lock().await;
369 w.take();
370 }
371 })
372 },
373 ));
374 a_agent.gather_candidates()?;
375
376 let w2 = Arc::new(Mutex::new(Some(wg.worker())));
377 b_agent.on_candidate(Box::new(
378 move |candidate: Option<Arc<dyn Candidate + Send + Sync>>| {
379 let w3 = Arc::clone(&w2);
380 Box::pin(async move {
381 if candidate.is_none() {
382 let mut w = w3.lock().await;
383 w.take();
384 }
385 })
386 },
387 ));
388 b_agent.gather_candidates()?;
389
390 wg.wait().await;
391
392 let candidates = a_agent.get_local_candidates().await?;
393 for c in candidates {
394 let c2: Arc<dyn Candidate + Send + Sync> =
395 Arc::new(unmarshal_candidate(c.marshal().as_str())?);
396 b_agent.add_remote_candidate(&c2)?;
397 }
398
399 let candidates = b_agent.get_local_candidates().await?;
400 for c in candidates {
401 let c2: Arc<dyn Candidate + Send + Sync> =
402 Arc::new(unmarshal_candidate(c.marshal().as_str())?);
403 a_agent.add_remote_candidate(&c2)?;
404 }
405
406 Ok(())
407 }
408
start_router(router: &Arc<Mutex<router::Router>>) -> Result<(), Error>409 pub(crate) async fn start_router(router: &Arc<Mutex<router::Router>>) -> Result<(), Error> {
410 let mut w = router.lock().await;
411 Ok(w.start().await?)
412 }
413
connect_net2router( net: &Arc<net::Net>, router: &Arc<Mutex<router::Router>>, ) -> Result<(), Error>414 pub(crate) async fn connect_net2router(
415 net: &Arc<net::Net>,
416 router: &Arc<Mutex<router::Router>>,
417 ) -> Result<(), Error> {
418 let nic = net.get_nic()?;
419
420 {
421 let mut w = router.lock().await;
422 w.add_net(Arc::clone(&nic)).await?;
423 }
424 {
425 let n = nic.lock().await;
426 n.set_router(Arc::clone(router)).await?;
427 }
428
429 Ok(())
430 }
431
connect_router2router( child: &Arc<Mutex<router::Router>>, parent: &Arc<Mutex<router::Router>>, ) -> Result<(), Error>432 pub(crate) async fn connect_router2router(
433 child: &Arc<Mutex<router::Router>>,
434 parent: &Arc<Mutex<router::Router>>,
435 ) -> Result<(), Error> {
436 {
437 let mut w = parent.lock().await;
438 w.add_router(Arc::clone(child)).await?;
439 }
440
441 {
442 let l = child.lock().await;
443 l.set_router(Arc::clone(parent)).await?;
444 }
445
446 Ok(())
447 }
448
449 #[tokio::test]
test_connectivity_simple_vnet_full_cone_nats_on_both_ends() -> Result<(), Error>450 async fn test_connectivity_simple_vnet_full_cone_nats_on_both_ends() -> Result<(), Error> {
451 /*env_logger::Builder::new()
452 .format(|buf, record| {
453 writeln!(
454 buf,
455 "{}:{} [{}] {} - {}",
456 record.file().unwrap_or("unknown"),
457 record.line().unwrap_or(0),
458 record.level(),
459 chrono::Local::now().format("%H:%M:%S.%6f"),
460 record.args()
461 )
462 })
463 .filter(None, log::LevelFilter::Trace)
464 .init();*/
465
466 let stun_server_url = Url {
467 scheme: SchemeType::Stun,
468 host: VNET_STUN_SERVER_IP.to_owned(),
469 port: VNET_STUN_SERVER_PORT,
470 proto: ProtoType::Udp,
471 ..Default::default()
472 };
473
474 // buildVNet with a Full-cone NATs both LANs
475 let nat_type = nat::NatType {
476 mapping_behavior: nat::EndpointDependencyType::EndpointIndependent,
477 filtering_behavior: nat::EndpointDependencyType::EndpointIndependent,
478 ..Default::default()
479 };
480
481 let v = build_simple_vnet(nat_type, nat_type).await?;
482
483 log::debug!("Connecting...");
484 let a0test_config = AgentTestConfig {
485 urls: vec![stun_server_url.clone()],
486 ..Default::default()
487 };
488 let a1test_config = AgentTestConfig {
489 urls: vec![stun_server_url.clone()],
490 ..Default::default()
491 };
492 let (_ca, _cb) = pipe_with_vnet(&v, a0test_config, a1test_config).await?;
493
494 tokio::time::sleep(Duration::from_secs(1)).await;
495
496 log::debug!("Closing...");
497 v.close().await?;
498
499 Ok(())
500 }
501
502 #[tokio::test]
test_connectivity_vnet_full_cone_nats_on_both_ends() -> Result<(), Error>503 async fn test_connectivity_vnet_full_cone_nats_on_both_ends() -> Result<(), Error> {
504 /*env_logger::Builder::new()
505 .format(|buf, record| {
506 writeln!(
507 buf,
508 "{}:{} [{}] {} - {}",
509 record.file().unwrap_or("unknown"),
510 record.line().unwrap_or(0),
511 record.level(),
512 chrono::Local::now().format("%H:%M:%S.%6f"),
513 record.args()
514 )
515 })
516 .filter(None, log::LevelFilter::Trace)
517 .init();*/
518
519 let stun_server_url = Url {
520 scheme: SchemeType::Stun,
521 host: VNET_STUN_SERVER_IP.to_owned(),
522 port: VNET_STUN_SERVER_PORT,
523 proto: ProtoType::Udp,
524 ..Default::default()
525 };
526
527 let _turn_server_url = Url {
528 scheme: SchemeType::Turn,
529 host: VNET_STUN_SERVER_IP.to_owned(),
530 port: VNET_STUN_SERVER_PORT,
531 username: "user".to_owned(),
532 password: "pass".to_owned(),
533 proto: ProtoType::Udp,
534 };
535
536 // buildVNet with a Full-cone NATs both LANs
537 let nat_type = nat::NatType {
538 mapping_behavior: nat::EndpointDependencyType::EndpointIndependent,
539 filtering_behavior: nat::EndpointDependencyType::EndpointIndependent,
540 ..Default::default()
541 };
542
543 let v = build_vnet(nat_type, nat_type).await?;
544
545 log::debug!("Connecting...");
546 let a0test_config = AgentTestConfig {
547 urls: vec![stun_server_url.clone()],
548 ..Default::default()
549 };
550 let a1test_config = AgentTestConfig {
551 urls: vec![stun_server_url.clone()],
552 ..Default::default()
553 };
554 let (_ca, _cb) = pipe_with_vnet(&v, a0test_config, a1test_config).await?;
555
556 tokio::time::sleep(Duration::from_secs(1)).await;
557
558 log::debug!("Closing...");
559 v.close().await?;
560
561 Ok(())
562 }
563
564 #[tokio::test]
test_connectivity_vnet_symmetric_nats_on_both_ends() -> Result<(), Error>565 async fn test_connectivity_vnet_symmetric_nats_on_both_ends() -> Result<(), Error> {
566 /*env_logger::Builder::new()
567 .format(|buf, record| {
568 writeln!(
569 buf,
570 "{}:{} [{}] {} - {}",
571 record.file().unwrap_or("unknown"),
572 record.line().unwrap_or(0),
573 record.level(),
574 chrono::Local::now().format("%H:%M:%S.%6f"),
575 record.args()
576 )
577 })
578 .filter(None, log::LevelFilter::Trace)
579 .init();*/
580
581 let stun_server_url = Url {
582 scheme: SchemeType::Stun,
583 host: VNET_STUN_SERVER_IP.to_owned(),
584 port: VNET_STUN_SERVER_PORT,
585 proto: ProtoType::Udp,
586 ..Default::default()
587 };
588
589 let turn_server_url = Url {
590 scheme: SchemeType::Turn,
591 host: VNET_STUN_SERVER_IP.to_owned(),
592 port: VNET_STUN_SERVER_PORT,
593 username: "user".to_owned(),
594 password: "pass".to_owned(),
595 proto: ProtoType::Udp,
596 };
597
598 // buildVNet with a Symmetric NATs for both LANs
599 let nat_type = nat::NatType {
600 mapping_behavior: nat::EndpointDependencyType::EndpointAddrPortDependent,
601 filtering_behavior: nat::EndpointDependencyType::EndpointAddrPortDependent,
602 ..Default::default()
603 };
604
605 let v = build_vnet(nat_type, nat_type).await?;
606
607 log::debug!("Connecting...");
608 let a0test_config = AgentTestConfig {
609 urls: vec![stun_server_url.clone(), turn_server_url.clone()],
610 ..Default::default()
611 };
612 let a1test_config = AgentTestConfig {
613 urls: vec![stun_server_url.clone()],
614 ..Default::default()
615 };
616 let (_ca, _cb) = pipe_with_vnet(&v, a0test_config, a1test_config).await?;
617
618 tokio::time::sleep(Duration::from_secs(1)).await;
619
620 log::debug!("Closing...");
621 v.close().await?;
622
623 Ok(())
624 }
625
626 #[tokio::test]
test_connectivity_vnet_1to1_nat_with_host_candidate_vs_symmetric_nats() -> Result<(), Error>627 async fn test_connectivity_vnet_1to1_nat_with_host_candidate_vs_symmetric_nats() -> Result<(), Error>
628 {
629 /*env_logger::Builder::new()
630 .format(|buf, record| {
631 writeln!(
632 buf,
633 "{}:{} [{}] {} - {}",
634 record.file().unwrap_or("unknown"),
635 record.line().unwrap_or(0),
636 record.level(),
637 chrono::Local::now().format("%H:%M:%S.%6f"),
638 record.args()
639 )
640 })
641 .filter(None, log::LevelFilter::Trace)
642 .init();*/
643
644 // Agent0 is behind 1:1 NAT
645 let nat_type0 = nat::NatType {
646 mode: nat::NatMode::Nat1To1,
647 ..Default::default()
648 };
649 // Agent1 is behind a symmetric NAT
650 let nat_type1 = nat::NatType {
651 mapping_behavior: nat::EndpointDependencyType::EndpointAddrPortDependent,
652 filtering_behavior: nat::EndpointDependencyType::EndpointAddrPortDependent,
653 ..Default::default()
654 };
655 log::debug!("natType0: {:?}", nat_type0);
656 log::debug!("natType1: {:?}", nat_type1);
657
658 let v = build_vnet(nat_type0, nat_type1).await?;
659
660 log::debug!("Connecting...");
661 let a0test_config = AgentTestConfig {
662 urls: vec![],
663 nat_1to1_ip_candidate_type: CandidateType::Host, // Use 1:1 NAT IP as a host candidate
664 };
665 let a1test_config = AgentTestConfig {
666 urls: vec![],
667 ..Default::default()
668 };
669 let (_ca, _cb) = pipe_with_vnet(&v, a0test_config, a1test_config).await?;
670
671 tokio::time::sleep(Duration::from_secs(1)).await;
672
673 log::debug!("Closing...");
674 v.close().await?;
675
676 Ok(())
677 }
678
679 #[tokio::test]
test_connectivity_vnet_1to1_nat_with_srflx_candidate_vs_symmetric_nats( ) -> Result<(), Error>680 async fn test_connectivity_vnet_1to1_nat_with_srflx_candidate_vs_symmetric_nats(
681 ) -> Result<(), Error> {
682 /*env_logger::Builder::new()
683 .format(|buf, record| {
684 writeln!(
685 buf,
686 "{}:{} [{}] {} - {}",
687 record.file().unwrap_or("unknown"),
688 record.line().unwrap_or(0),
689 record.level(),
690 chrono::Local::now().format("%H:%M:%S.%6f"),
691 record.args()
692 )
693 })
694 .filter(None, log::LevelFilter::Trace)
695 .init();*/
696
697 // Agent0 is behind 1:1 NAT
698 let nat_type0 = nat::NatType {
699 mode: nat::NatMode::Nat1To1,
700 ..Default::default()
701 };
702 // Agent1 is behind a symmetric NAT
703 let nat_type1 = nat::NatType {
704 mapping_behavior: nat::EndpointDependencyType::EndpointAddrPortDependent,
705 filtering_behavior: nat::EndpointDependencyType::EndpointAddrPortDependent,
706 ..Default::default()
707 };
708 log::debug!("natType0: {:?}", nat_type0);
709 log::debug!("natType1: {:?}", nat_type1);
710
711 let v = build_vnet(nat_type0, nat_type1).await?;
712
713 log::debug!("Connecting...");
714 let a0test_config = AgentTestConfig {
715 urls: vec![],
716 nat_1to1_ip_candidate_type: CandidateType::ServerReflexive, // Use 1:1 NAT IP as a srflx candidate
717 };
718 let a1test_config = AgentTestConfig {
719 urls: vec![],
720 ..Default::default()
721 };
722 let (_ca, _cb) = pipe_with_vnet(&v, a0test_config, a1test_config).await?;
723
724 tokio::time::sleep(Duration::from_secs(1)).await;
725
726 log::debug!("Closing...");
727 v.close().await?;
728
729 Ok(())
730 }
731
block_until_state_seen( expected_state: ConnectionState, state_queue: &mut mpsc::Receiver<ConnectionState>, )732 async fn block_until_state_seen(
733 expected_state: ConnectionState,
734 state_queue: &mut mpsc::Receiver<ConnectionState>,
735 ) {
736 while let Some(s) = state_queue.recv().await {
737 if s == expected_state {
738 return;
739 }
740 }
741 }
742
743 // test_disconnected_to_connected asserts that an agent can go to disconnected, and then return to connected successfully
744 #[tokio::test]
test_disconnected_to_connected() -> Result<(), Error>745 async fn test_disconnected_to_connected() -> Result<(), Error> {
746 /*env_logger::Builder::new()
747 .format(|buf, record| {
748 writeln!(
749 buf,
750 "{}:{} [{}] {} - {}",
751 record.file().unwrap_or("unknown"),
752 record.line().unwrap_or(0),
753 record.level(),
754 chrono::Local::now().format("%H:%M:%S.%6f"),
755 record.args()
756 )
757 })
758 .filter(None, log::LevelFilter::Trace)
759 .init();*/
760
761 // Create a network with two interfaces
762 let wan = router::Router::new(router::RouterConfig {
763 cidr: "0.0.0.0/0".to_owned(),
764 ..Default::default()
765 })?;
766
767 let drop_all_data = Arc::new(AtomicU64::new(0));
768 let drop_all_data2 = Arc::clone(&drop_all_data);
769 wan.add_chunk_filter(Box::new(move |_c: &(dyn Chunk + Send + Sync)| -> bool {
770 drop_all_data2.load(Ordering::SeqCst) != 1
771 }))
772 .await;
773 let wan = Arc::new(Mutex::new(wan));
774
775 let net0 = Arc::new(net::Net::new(Some(net::NetConfig {
776 static_ips: vec!["192.168.0.1".to_owned()],
777 ..Default::default()
778 })));
779 let net1 = Arc::new(net::Net::new(Some(net::NetConfig {
780 static_ips: vec!["192.168.0.2".to_owned()],
781 ..Default::default()
782 })));
783
784 connect_net2router(&net0, &wan).await?;
785 connect_net2router(&net1, &wan).await?;
786 start_router(&wan).await?;
787
788 let disconnected_timeout = Duration::from_secs(1);
789 let keepalive_interval = Duration::from_millis(20);
790
791 // Create two agents and connect them
792 let controlling_agent = Arc::new(
793 Agent::new(AgentConfig {
794 network_types: supported_network_types(),
795 multicast_dns_mode: MulticastDnsMode::Disabled,
796 net: Some(Arc::clone(&net0)),
797 disconnected_timeout: Some(disconnected_timeout),
798 keepalive_interval: Some(keepalive_interval),
799 check_interval: keepalive_interval,
800 ..Default::default()
801 })
802 .await?,
803 );
804
805 let controlled_agent = Arc::new(
806 Agent::new(AgentConfig {
807 network_types: supported_network_types(),
808 multicast_dns_mode: MulticastDnsMode::Disabled,
809 net: Some(Arc::clone(&net1)),
810 disconnected_timeout: Some(disconnected_timeout),
811 keepalive_interval: Some(keepalive_interval),
812 check_interval: keepalive_interval,
813 ..Default::default()
814 })
815 .await?,
816 );
817
818 let (controlling_state_changes_tx, mut controlling_state_changes_rx) =
819 mpsc::channel::<ConnectionState>(100);
820 let controlling_state_changes_tx = Arc::new(controlling_state_changes_tx);
821 controlling_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
822 let controlling_state_changes_tx_clone = Arc::clone(&controlling_state_changes_tx);
823 Box::pin(async move {
824 let _ = controlling_state_changes_tx_clone.try_send(c);
825 })
826 }));
827
828 let (controlled_state_changes_tx, mut controlled_state_changes_rx) =
829 mpsc::channel::<ConnectionState>(100);
830 let controlled_state_changes_tx = Arc::new(controlled_state_changes_tx);
831 controlled_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
832 let controlled_state_changes_tx_clone = Arc::clone(&controlled_state_changes_tx);
833 Box::pin(async move {
834 let _ = controlled_state_changes_tx_clone.try_send(c);
835 })
836 }));
837
838 connect_with_vnet(&controlling_agent, &controlled_agent).await?;
839
840 // Assert we have gone to connected
841 block_until_state_seen(
842 ConnectionState::Connected,
843 &mut controlling_state_changes_rx,
844 )
845 .await;
846 block_until_state_seen(ConnectionState::Connected, &mut controlled_state_changes_rx).await;
847
848 // Drop all packets, and block until we have gone to disconnected
849 drop_all_data.store(1, Ordering::SeqCst);
850 block_until_state_seen(
851 ConnectionState::Disconnected,
852 &mut controlling_state_changes_rx,
853 )
854 .await;
855 block_until_state_seen(
856 ConnectionState::Disconnected,
857 &mut controlled_state_changes_rx,
858 )
859 .await;
860
861 // Allow all packets through again, block until we have gone to connected
862 drop_all_data.store(0, Ordering::SeqCst);
863 block_until_state_seen(
864 ConnectionState::Connected,
865 &mut controlling_state_changes_rx,
866 )
867 .await;
868 block_until_state_seen(ConnectionState::Connected, &mut controlled_state_changes_rx).await;
869
870 {
871 let mut w = wan.lock().await;
872 w.stop().await?;
873 }
874
875 controlling_agent.close().await?;
876 controlled_agent.close().await?;
877
878 Ok(())
879 }
880
881 //use std::io::Write;
882
883 // Agent.Write should use the best valid pair if a selected pair is not yet available
884 #[tokio::test]
test_write_use_valid_pair() -> Result<(), Error>885 async fn test_write_use_valid_pair() -> Result<(), Error> {
886 /*env_logger::Builder::new()
887 .format(|buf, record| {
888 writeln!(
889 buf,
890 "{}:{} [{}] {} - {}",
891 record.file().unwrap_or("unknown"),
892 record.line().unwrap_or(0),
893 record.level(),
894 chrono::Local::now().format("%H:%M:%S.%6f"),
895 record.args()
896 )
897 })
898 .filter(None, log::LevelFilter::Trace)
899 .init();*/
900
901 // Create a network with two interfaces
902 let wan = router::Router::new(router::RouterConfig {
903 cidr: "0.0.0.0/0".to_owned(),
904 ..Default::default()
905 })?;
906
907 wan.add_chunk_filter(Box::new(move |c: &(dyn Chunk + Send + Sync)| -> bool {
908 let raw = c.user_data();
909 if stun::message::is_message(&raw) {
910 let mut m = stun::message::Message {
911 raw,
912 ..Default::default()
913 };
914 let result = m.decode();
915 if result.is_err() | m.contains(stun::attributes::ATTR_USE_CANDIDATE) {
916 return false;
917 }
918 }
919
920 true
921 }))
922 .await;
923 let wan = Arc::new(Mutex::new(wan));
924
925 let net0 = Arc::new(net::Net::new(Some(net::NetConfig {
926 static_ips: vec!["192.168.0.1".to_owned()],
927 ..Default::default()
928 })));
929 let net1 = Arc::new(net::Net::new(Some(net::NetConfig {
930 static_ips: vec!["192.168.0.2".to_owned()],
931 ..Default::default()
932 })));
933
934 connect_net2router(&net0, &wan).await?;
935 connect_net2router(&net1, &wan).await?;
936 start_router(&wan).await?;
937
938 // Create two agents and connect them
939 let controlling_agent = Arc::new(
940 Agent::new(AgentConfig {
941 network_types: supported_network_types(),
942 multicast_dns_mode: MulticastDnsMode::Disabled,
943 net: Some(Arc::clone(&net0)),
944 ..Default::default()
945 })
946 .await?,
947 );
948
949 let controlled_agent = Arc::new(
950 Agent::new(AgentConfig {
951 network_types: supported_network_types(),
952 multicast_dns_mode: MulticastDnsMode::Disabled,
953 net: Some(Arc::clone(&net1)),
954 ..Default::default()
955 })
956 .await?,
957 );
958
959 gather_and_exchange_candidates(&controlling_agent, &controlled_agent).await?;
960
961 let (controlling_ufrag, controlling_pwd) = controlling_agent.get_local_user_credentials().await;
962 let (controlled_ufrag, controlled_pwd) = controlled_agent.get_local_user_credentials().await;
963
964 let controlling_agent_tx = Arc::clone(&controlling_agent);
965 tokio::spawn(async move {
966 let test_message = "Test Message";
967 let controlling_agent_conn = {
968 controlling_agent_tx
969 .internal
970 .start_connectivity_checks(true, controlled_ufrag, controlled_pwd)
971 .await?;
972 Arc::clone(&controlling_agent_tx.internal.agent_conn) as Arc<dyn Conn + Send + Sync>
973 };
974
975 log::debug!("controlling_agent start_connectivity_checks done...");
976 loop {
977 let result = controlling_agent_conn.send(test_message.as_bytes()).await;
978 if result.is_err() {
979 break;
980 }
981
982 tokio::time::sleep(Duration::from_millis(20)).await;
983 }
984
985 Result::<(), Error>::Ok(())
986 });
987
988 let controlled_agent_conn = {
989 controlled_agent
990 .internal
991 .start_connectivity_checks(false, controlling_ufrag, controlling_pwd)
992 .await?;
993 Arc::clone(&controlled_agent.internal.agent_conn) as Arc<dyn Conn + Send + Sync>
994 };
995
996 log::debug!("controlled_agent start_connectivity_checks done...");
997
998 let test_message = "Test Message";
999 let mut read_buf = vec![0u8; test_message.as_bytes().len()];
1000 controlled_agent_conn.recv(&mut read_buf).await?;
1001
1002 assert_eq!(read_buf, test_message.as_bytes(), "should match");
1003
1004 {
1005 let mut w = wan.lock().await;
1006 w.stop().await?;
1007 }
1008
1009 controlling_agent.close().await?;
1010 controlled_agent.close().await?;
1011
1012 Ok(())
1013 }
1014