1 #[cfg(test)]
2 mod router_test;
3
4 use crate::error::*;
5 use crate::vnet::chunk::*;
6 use crate::vnet::chunk_queue::*;
7 use crate::vnet::interface::*;
8 use crate::vnet::nat::*;
9 use crate::vnet::net::*;
10 use crate::vnet::resolver::*;
11
12 use async_trait::async_trait;
13 use ipnet::*;
14 use std::collections::HashMap;
15 use std::future::Future;
16 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
17 use std::ops::{Add, Sub};
18 use std::pin::Pin;
19 use std::str::FromStr;
20 use std::sync::atomic::{AtomicU64, Ordering};
21 use std::sync::Arc;
22 use std::time::SystemTime;
23 use tokio::sync::{mpsc, Mutex};
24 use tokio::time::Duration;
25
26 const DEFAULT_ROUTER_QUEUE_SIZE: usize = 0; // unlimited
27
28 lazy_static! {
29 pub static ref ROUTER_ID_CTR: AtomicU64 = AtomicU64::new(0);
30 }
31
32 // Generate a unique router name
assign_router_name() -> String33 fn assign_router_name() -> String {
34 let n = ROUTER_ID_CTR.fetch_add(1, Ordering::SeqCst);
35 format!("router{n}")
36 }
37
38 // RouterConfig ...
39 #[derive(Default)]
40 pub struct RouterConfig {
41 // name of router. If not specified, a unique name will be assigned.
42 pub name: String,
43 // cidr notation, like "192.0.2.0/24"
44 pub cidr: String,
45 // static_ips is an array of static IP addresses to be assigned for this router.
46 // If no static IP address is given, the router will automatically assign
47 // an IP address.
48 // This will be ignored if this router is the root.
49 pub static_ips: Vec<String>,
50 // static_ip is deprecated. Use static_ips.
51 pub static_ip: String,
52 // Internal queue size
53 pub queue_size: usize,
54 // Effective only when this router has a parent router
55 pub nat_type: Option<NatType>,
56 // Minimum Delay
57 pub min_delay: Duration,
58 // Max Jitter
59 pub max_jitter: Duration,
60 }
61
62 // NIC is a network interface controller that interfaces Router
63 #[async_trait]
64 pub trait Nic {
get_interface(&self, ifc_name: &str) -> Option<Interface>65 async fn get_interface(&self, ifc_name: &str) -> Option<Interface>;
add_addrs_to_interface(&mut self, ifc_name: &str, addrs: &[IpNet]) -> Result<()>66 async fn add_addrs_to_interface(&mut self, ifc_name: &str, addrs: &[IpNet]) -> Result<()>;
on_inbound_chunk(&self, c: Box<dyn Chunk + Send + Sync>)67 async fn on_inbound_chunk(&self, c: Box<dyn Chunk + Send + Sync>);
get_static_ips(&self) -> Vec<IpAddr>68 async fn get_static_ips(&self) -> Vec<IpAddr>;
set_router(&self, r: Arc<Mutex<Router>>) -> Result<()>69 async fn set_router(&self, r: Arc<Mutex<Router>>) -> Result<()>;
70 }
71
72 // ChunkFilter is a handler users can add to filter chunks.
73 // If the filter returns false, the packet will be dropped.
74 pub type ChunkFilterFn = Box<dyn (Fn(&(dyn Chunk + Send + Sync)) -> bool) + Send + Sync>;
75
76 #[derive(Default)]
77 pub struct RouterInternal {
78 pub(crate) nat_type: Option<NatType>, // read-only
79 pub(crate) ipv4net: IpNet, // read-only
80 pub(crate) parent: Option<Arc<Mutex<Router>>>, // read-only
81 pub(crate) nat: NetworkAddressTranslator, // read-only
82 pub(crate) nics: HashMap<String, Arc<Mutex<dyn Nic + Send + Sync>>>, // read-only
83 pub(crate) chunk_filters: Vec<ChunkFilterFn>, // requires mutex [x]
84 pub(crate) last_id: u8, // requires mutex [x], used to assign the last digit of IPv4 address
85 }
86
87 // Router ...
88 #[derive(Default)]
89 pub struct Router {
90 name: String, // read-only
91 ipv4net: IpNet, // read-only
92 min_delay: Duration, // requires mutex [x]
93 max_jitter: Duration, // requires mutex [x]
94 queue: Arc<ChunkQueue>, // read-only
95 interfaces: Vec<Interface>, // read-only
96 static_ips: Vec<IpAddr>, // read-only
97 static_local_ips: HashMap<String, IpAddr>, // read-only,
98 children: Vec<Arc<Mutex<Router>>>, // read-only
99 done: Option<mpsc::Sender<()>>, // requires mutex [x]
100 pub(crate) resolver: Arc<Mutex<Resolver>>, // read-only
101 push_ch: Option<mpsc::Sender<()>>, // writer requires mutex
102 router_internal: Arc<Mutex<RouterInternal>>,
103 }
104
105 #[async_trait]
106 impl Nic for Router {
get_interface(&self, ifc_name: &str) -> Option<Interface>107 async fn get_interface(&self, ifc_name: &str) -> Option<Interface> {
108 for ifc in &self.interfaces {
109 if ifc.name == ifc_name {
110 return Some(ifc.clone());
111 }
112 }
113 None
114 }
115
add_addrs_to_interface(&mut self, ifc_name: &str, addrs: &[IpNet]) -> Result<()>116 async fn add_addrs_to_interface(&mut self, ifc_name: &str, addrs: &[IpNet]) -> Result<()> {
117 for ifc in &mut self.interfaces {
118 if ifc.name == ifc_name {
119 for addr in addrs {
120 ifc.add_addr(*addr);
121 }
122 return Ok(());
123 }
124 }
125
126 Err(Error::ErrNotFound)
127 }
128
on_inbound_chunk(&self, c: Box<dyn Chunk + Send + Sync>)129 async fn on_inbound_chunk(&self, c: Box<dyn Chunk + Send + Sync>) {
130 let from_parent: Box<dyn Chunk + Send + Sync> = {
131 let router_internal = self.router_internal.lock().await;
132 match router_internal.nat.translate_inbound(&*c).await {
133 Ok(from) => {
134 if let Some(from) = from {
135 from
136 } else {
137 return;
138 }
139 }
140 Err(err) => {
141 log::warn!("[{}] {}", self.name, err);
142 return;
143 }
144 }
145 };
146
147 self.push(from_parent).await;
148 }
149
get_static_ips(&self) -> Vec<IpAddr>150 async fn get_static_ips(&self) -> Vec<IpAddr> {
151 self.static_ips.clone()
152 }
153
154 // caller must hold the mutex
set_router(&self, parent: Arc<Mutex<Router>>) -> Result<()>155 async fn set_router(&self, parent: Arc<Mutex<Router>>) -> Result<()> {
156 {
157 let mut router_internal = self.router_internal.lock().await;
158 router_internal.parent = Some(Arc::clone(&parent));
159 }
160
161 let parent_resolver = {
162 let p = parent.lock().await;
163 Arc::clone(&p.resolver)
164 };
165 {
166 let mut resolver = self.resolver.lock().await;
167 resolver.set_parent(parent_resolver);
168 }
169
170 let mut mapped_ips = vec![];
171 let mut local_ips = vec![];
172
173 // when this method is called, one or more IP address has already been assigned by
174 // the parent router.
175 if let Some(ifc) = self.get_interface("eth0").await {
176 for ifc_addr in ifc.addrs() {
177 let ip = ifc_addr.addr();
178 mapped_ips.push(ip);
179
180 if let Some(loc_ip) = self.static_local_ips.get(&ip.to_string()) {
181 local_ips.push(*loc_ip);
182 }
183 }
184 } else {
185 return Err(Error::ErrNoIpaddrEth0);
186 }
187
188 // Set up NAT here
189 {
190 let mut router_internal = self.router_internal.lock().await;
191 if router_internal.nat_type.is_none() {
192 router_internal.nat_type = Some(NatType {
193 mapping_behavior: EndpointDependencyType::EndpointIndependent,
194 filtering_behavior: EndpointDependencyType::EndpointAddrPortDependent,
195 hair_pining: false,
196 port_preservation: false,
197 mapping_life_time: Duration::from_secs(30),
198 ..Default::default()
199 });
200 }
201
202 router_internal.nat = NetworkAddressTranslator::new(NatConfig {
203 name: self.name.clone(),
204 nat_type: router_internal.nat_type.unwrap(),
205 mapped_ips,
206 local_ips,
207 })?;
208 }
209
210 Ok(())
211 }
212 }
213
214 impl Router {
new(config: RouterConfig) -> Result<Self>215 pub fn new(config: RouterConfig) -> Result<Self> {
216 let ipv4net: IpNet = config.cidr.parse()?;
217
218 let queue_size = if config.queue_size > 0 {
219 config.queue_size
220 } else {
221 DEFAULT_ROUTER_QUEUE_SIZE
222 };
223
224 // set up network interface, lo0
225 let mut lo0 = Interface::new(LO0_STR.to_owned(), vec![]);
226 if let Ok(ipnet) = Interface::convert(
227 SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0),
228 Some(SocketAddr::new(Ipv4Addr::new(255, 0, 0, 0).into(), 0)),
229 ) {
230 lo0.add_addr(ipnet);
231 }
232
233 // set up network interface, eth0
234 let eth0 = Interface::new("eth0".to_owned(), vec![]);
235
236 // local host name resolver
237 let resolver = Arc::new(Mutex::new(Resolver::new()));
238
239 let name = if config.name.is_empty() {
240 assign_router_name()
241 } else {
242 config.name.clone()
243 };
244
245 let mut static_ips = vec![];
246 let mut static_local_ips = HashMap::new();
247 for ip_str in &config.static_ips {
248 let ip_pair: Vec<&str> = ip_str.split('/').collect();
249 if let Ok(ip) = IpAddr::from_str(ip_pair[0]) {
250 if ip_pair.len() > 1 {
251 let loc_ip = IpAddr::from_str(ip_pair[1])?;
252 if !ipv4net.contains(&loc_ip) {
253 return Err(Error::ErrLocalIpBeyondStaticIpsSubset);
254 }
255 static_local_ips.insert(ip.to_string(), loc_ip);
256 }
257 static_ips.push(ip);
258 }
259 }
260 if !config.static_ip.is_empty() {
261 log::warn!("static_ip is deprecated. Use static_ips instead");
262 if let Ok(ip) = IpAddr::from_str(&config.static_ip) {
263 static_ips.push(ip);
264 }
265 }
266
267 let n_static_local = static_local_ips.len();
268 if n_static_local > 0 && n_static_local != static_ips.len() {
269 return Err(Error::ErrLocalIpNoStaticsIpsAssociated);
270 }
271
272 let router_internal = RouterInternal {
273 nat_type: config.nat_type,
274 ipv4net,
275 nics: HashMap::new(),
276 ..Default::default()
277 };
278
279 Ok(Router {
280 name,
281 ipv4net,
282 interfaces: vec![lo0, eth0],
283 static_ips,
284 static_local_ips,
285 resolver,
286 router_internal: Arc::new(Mutex::new(router_internal)),
287 queue: Arc::new(ChunkQueue::new(queue_size)),
288 min_delay: config.min_delay,
289 max_jitter: config.max_jitter,
290 ..Default::default()
291 })
292 }
293
294 // caller must hold the mutex
get_interfaces(&self) -> &[Interface]295 pub(crate) fn get_interfaces(&self) -> &[Interface] {
296 &self.interfaces
297 }
298
299 // Start ...
start(&mut self) -> Pin<Box<dyn Future<Output = Result<()>>>>300 pub fn start(&mut self) -> Pin<Box<dyn Future<Output = Result<()>>>> {
301 if self.done.is_some() {
302 return Box::pin(async move { Err(Error::ErrRouterAlreadyStarted) });
303 }
304
305 let (done_tx, mut done_rx) = mpsc::channel(1);
306 let (push_ch_tx, mut push_ch_rx) = mpsc::channel(1);
307 self.done = Some(done_tx);
308 self.push_ch = Some(push_ch_tx);
309
310 let router_internal = Arc::clone(&self.router_internal);
311 let queue = Arc::clone(&self.queue);
312 let max_jitter = self.max_jitter;
313 let min_delay = self.min_delay;
314 let name = self.name.clone();
315 let ipv4net = self.ipv4net;
316
317 tokio::spawn(async move {
318 while let Ok(d) = Router::process_chunks(
319 &name,
320 ipv4net,
321 max_jitter,
322 min_delay,
323 &queue,
324 &router_internal,
325 )
326 .await
327 {
328 if d == Duration::from_secs(0) {
329 tokio::select! {
330 _ = push_ch_rx.recv() =>{},
331 _ = done_rx.recv() => break,
332 }
333 } else {
334 let t = tokio::time::sleep(d);
335 tokio::pin!(t);
336
337 tokio::select! {
338 _ = t.as_mut() => {},
339 _ = done_rx.recv() => break,
340 }
341 }
342 }
343 });
344
345 let children = self.children.clone();
346 Box::pin(async move { Router::start_childen(children).await })
347 }
348
349 // Stop ...
stop(&mut self) -> Pin<Box<dyn Future<Output = Result<()>>>>350 pub fn stop(&mut self) -> Pin<Box<dyn Future<Output = Result<()>>>> {
351 if self.done.is_none() {
352 return Box::pin(async move { Err(Error::ErrRouterAlreadyStopped) });
353 }
354 self.push_ch.take();
355 self.done.take();
356
357 let children = self.children.clone();
358 Box::pin(async move { Router::stop_childen(children).await })
359 }
360
start_childen(children: Vec<Arc<Mutex<Router>>>) -> Result<()>361 async fn start_childen(children: Vec<Arc<Mutex<Router>>>) -> Result<()> {
362 for child in children {
363 let mut c = child.lock().await;
364 c.start().await?;
365 }
366
367 Ok(())
368 }
369
stop_childen(children: Vec<Arc<Mutex<Router>>>) -> Result<()>370 async fn stop_childen(children: Vec<Arc<Mutex<Router>>>) -> Result<()> {
371 for child in children {
372 let mut c = child.lock().await;
373 c.stop().await?;
374 }
375
376 Ok(())
377 }
378
379 // AddRouter adds a chile Router.
380 // after parent.add_router(child), also call child.set_router(parent) to set child's parent router
add_router(&mut self, child: Arc<Mutex<Router>>) -> Result<()>381 pub async fn add_router(&mut self, child: Arc<Mutex<Router>>) -> Result<()> {
382 // Router is a NIC. Add it as a NIC so that packets are routed to this child
383 // router.
384 let nic = Arc::clone(&child) as Arc<Mutex<dyn Nic + Send + Sync>>;
385 self.children.push(child);
386 self.add_net(nic).await
387 }
388
389 // AddNet ...
390 // after router.add_net(nic), also call nic.set_router(router) to set nic's router
add_net(&mut self, nic: Arc<Mutex<dyn Nic + Send + Sync>>) -> Result<()>391 pub async fn add_net(&mut self, nic: Arc<Mutex<dyn Nic + Send + Sync>>) -> Result<()> {
392 let mut router_internal = self.router_internal.lock().await;
393 router_internal.add_nic(nic).await
394 }
395
396 // AddHost adds a mapping of hostname and an IP address to the local resolver.
add_host(&mut self, host_name: String, ip_addr: String) -> Result<()>397 pub async fn add_host(&mut self, host_name: String, ip_addr: String) -> Result<()> {
398 let mut resolver = self.resolver.lock().await;
399 resolver.add_host(host_name, ip_addr)
400 }
401
402 // AddChunkFilter adds a filter for chunks traversing this router.
403 // You may add more than one filter. The filters are called in the order of this method call.
404 // If a chunk is dropped by a filter, subsequent filter will not receive the chunk.
add_chunk_filter(&self, filter: ChunkFilterFn)405 pub async fn add_chunk_filter(&self, filter: ChunkFilterFn) {
406 let mut router_internal = self.router_internal.lock().await;
407 router_internal.chunk_filters.push(filter);
408 }
409
push(&self, mut c: Box<dyn Chunk + Send + Sync>)410 pub(crate) async fn push(&self, mut c: Box<dyn Chunk + Send + Sync>) {
411 log::debug!("[{}] route {}", self.name, c);
412 if self.done.is_some() {
413 c.set_timestamp();
414
415 if self.queue.push(c).await {
416 if let Some(push_ch) = &self.push_ch {
417 let _ = push_ch.try_send(());
418 }
419 } else {
420 log::warn!("[{}] queue was full. dropped a chunk", self.name);
421 }
422 } else {
423 log::warn!("router is done");
424 }
425 }
426
process_chunks( name: &str, ipv4net: IpNet, max_jitter: Duration, min_delay: Duration, queue: &Arc<ChunkQueue>, router_internal: &Arc<Mutex<RouterInternal>>, ) -> Result<Duration>427 async fn process_chunks(
428 name: &str,
429 ipv4net: IpNet,
430 max_jitter: Duration,
431 min_delay: Duration,
432 queue: &Arc<ChunkQueue>,
433 router_internal: &Arc<Mutex<RouterInternal>>,
434 ) -> Result<Duration> {
435 // Introduce jitter by delaying the processing of chunks.
436 let mj = max_jitter.as_nanos() as u64;
437 if mj > 0 {
438 let jitter = Duration::from_nanos(rand::random::<u64>() % mj);
439 tokio::time::sleep(jitter).await;
440 }
441
442 // cut_off
443 // v min delay
444 // |<--->|
445 // +------------:--
446 // |OOOOOOXXXXX : --> time
447 // +------------:--
448 // |<--->| now
449 // due
450
451 let entered_at = SystemTime::now();
452 let cut_off = entered_at.sub(min_delay);
453
454 // the next sleep duration
455 let mut d;
456
457 loop {
458 d = Duration::from_secs(0);
459
460 if let Some(c) = queue.peek().await {
461 // check timestamp to find if the chunk is due
462 if c.get_timestamp().duration_since(cut_off).is_ok() {
463 // There is one or more chunk in the queue but none of them are due.
464 // Calculate the next sleep duration here.
465 let next_expire = c.get_timestamp().add(min_delay);
466 if let Ok(diff) = next_expire.duration_since(entered_at) {
467 d = diff;
468 break;
469 }
470 }
471 } else {
472 break; // no more chunk in the queue
473 }
474
475 if let Some(c) = queue.pop().await {
476 let ri = router_internal.lock().await;
477 let mut blocked = false;
478 for filter in &ri.chunk_filters {
479 if !filter(&*c) {
480 blocked = true;
481 break;
482 }
483 }
484 if blocked {
485 continue; // discard
486 }
487
488 let dst_ip = c.get_destination_ip();
489
490 // check if the destination is in our subnet
491 if ipv4net.contains(&dst_ip) {
492 // search for the destination NIC
493 if let Some(nic) = ri.nics.get(&dst_ip.to_string()) {
494 // found the NIC, forward the chunk to the NIC.
495 // call to NIC must unlock mutex
496 let ni = nic.lock().await;
497 ni.on_inbound_chunk(c).await;
498 } else {
499 // NIC not found. drop it.
500 log::debug!("[{}] {} unreachable", name, c);
501 }
502 } else {
503 // the destination is outside of this subnet
504 // is this WAN?
505 if let Some(parent) = &ri.parent {
506 // Pass it to the parent via NAT
507 if let Some(to_parent) = ri.nat.translate_outbound(&*c).await? {
508 // call to parent router mutex unlock mutex
509 let p = parent.lock().await;
510 p.push(to_parent).await;
511 }
512 } else {
513 // this WAN. No route for this chunk
514 log::debug!("[{}] no route found for {}", name, c);
515 }
516 }
517 } else {
518 break; // no more chunk in the queue
519 }
520 }
521
522 Ok(d)
523 }
524 }
525
526 impl RouterInternal {
527 // caller must hold the mutex
add_nic(&mut self, nic: Arc<Mutex<dyn Nic + Send + Sync>>) -> Result<()>528 pub(crate) async fn add_nic(&mut self, nic: Arc<Mutex<dyn Nic + Send + Sync>>) -> Result<()> {
529 let mut ips = {
530 let ni = nic.lock().await;
531 ni.get_static_ips().await
532 };
533
534 if ips.is_empty() {
535 // assign an IP address
536 let ip = self.assign_ip_address()?;
537 log::debug!("assign_ip_address: {}", ip);
538 ips.push(ip);
539 }
540
541 let mut ipnets = vec![];
542 for ip in &ips {
543 if !self.ipv4net.contains(ip) {
544 return Err(Error::ErrStaticIpIsBeyondSubnet);
545 }
546 self.nics.insert(ip.to_string(), Arc::clone(&nic));
547 ipnets.push(IpNet::from_str(&format!(
548 "{}/{}",
549 ip,
550 self.ipv4net.prefix_len()
551 ))?);
552 }
553
554 {
555 let mut ni = nic.lock().await;
556 let _ = ni.add_addrs_to_interface("eth0", &ipnets).await;
557 }
558
559 Ok(())
560 }
561
562 // caller should hold the mutex
assign_ip_address(&mut self) -> Result<IpAddr>563 fn assign_ip_address(&mut self) -> Result<IpAddr> {
564 // See: https://stackoverflow.com/questions/14915188/ip-address-ending-with-zero
565
566 if self.last_id == 0xfe {
567 return Err(Error::ErrAddressSpaceExhausted);
568 }
569
570 self.last_id += 1;
571 match self.ipv4net.addr() {
572 IpAddr::V4(ipv4) => {
573 let mut ip = ipv4.octets();
574 ip[3] = self.last_id;
575 Ok(IpAddr::V4(Ipv4Addr::from(ip)))
576 }
577 IpAddr::V6(ipv6) => {
578 let mut ip = ipv6.octets();
579 ip[15] += self.last_id;
580 Ok(IpAddr::V6(Ipv6Addr::from(ip)))
581 }
582 }
583 }
584 }
585