xref: /webrtc/util/src/vnet/router.rs (revision 5d8fe953)
1 #[cfg(test)]
2 mod router_test;
3 
4 use crate::error::*;
5 use crate::vnet::chunk::*;
6 use crate::vnet::chunk_queue::*;
7 use crate::vnet::interface::*;
8 use crate::vnet::nat::*;
9 use crate::vnet::net::*;
10 use crate::vnet::resolver::*;
11 
12 use async_trait::async_trait;
13 use ipnet::*;
14 use std::collections::HashMap;
15 use std::future::Future;
16 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
17 use std::ops::{Add, Sub};
18 use std::pin::Pin;
19 use std::str::FromStr;
20 use std::sync::atomic::{AtomicU64, Ordering};
21 use std::sync::Arc;
22 use std::time::SystemTime;
23 use tokio::sync::{mpsc, Mutex};
24 use tokio::time::Duration;
25 
26 const DEFAULT_ROUTER_QUEUE_SIZE: usize = 0; // unlimited
27 
28 lazy_static! {
29     pub static ref ROUTER_ID_CTR: AtomicU64 = AtomicU64::new(0);
30 }
31 
32 // Generate a unique router name
assign_router_name() -> String33 fn assign_router_name() -> String {
34     let n = ROUTER_ID_CTR.fetch_add(1, Ordering::SeqCst);
35     format!("router{n}")
36 }
37 
38 // RouterConfig ...
39 #[derive(Default)]
40 pub struct RouterConfig {
41     // name of router. If not specified, a unique name will be assigned.
42     pub name: String,
43     // cidr notation, like "192.0.2.0/24"
44     pub cidr: String,
45     // static_ips is an array of static IP addresses to be assigned for this router.
46     // If no static IP address is given, the router will automatically assign
47     // an IP address.
48     // This will be ignored if this router is the root.
49     pub static_ips: Vec<String>,
50     // static_ip is deprecated. Use static_ips.
51     pub static_ip: String,
52     // Internal queue size
53     pub queue_size: usize,
54     // Effective only when this router has a parent router
55     pub nat_type: Option<NatType>,
56     // Minimum Delay
57     pub min_delay: Duration,
58     // Max Jitter
59     pub max_jitter: Duration,
60 }
61 
62 // NIC is a network interface controller that interfaces Router
63 #[async_trait]
64 pub trait Nic {
get_interface(&self, ifc_name: &str) -> Option<Interface>65     async fn get_interface(&self, ifc_name: &str) -> Option<Interface>;
add_addrs_to_interface(&mut self, ifc_name: &str, addrs: &[IpNet]) -> Result<()>66     async fn add_addrs_to_interface(&mut self, ifc_name: &str, addrs: &[IpNet]) -> Result<()>;
on_inbound_chunk(&self, c: Box<dyn Chunk + Send + Sync>)67     async fn on_inbound_chunk(&self, c: Box<dyn Chunk + Send + Sync>);
get_static_ips(&self) -> Vec<IpAddr>68     async fn get_static_ips(&self) -> Vec<IpAddr>;
set_router(&self, r: Arc<Mutex<Router>>) -> Result<()>69     async fn set_router(&self, r: Arc<Mutex<Router>>) -> Result<()>;
70 }
71 
72 // ChunkFilter is a handler users can add to filter chunks.
73 // If the filter returns false, the packet will be dropped.
74 pub type ChunkFilterFn = Box<dyn (Fn(&(dyn Chunk + Send + Sync)) -> bool) + Send + Sync>;
75 
76 #[derive(Default)]
77 pub struct RouterInternal {
78     pub(crate) nat_type: Option<NatType>,          // read-only
79     pub(crate) ipv4net: IpNet,                     // read-only
80     pub(crate) parent: Option<Arc<Mutex<Router>>>, // read-only
81     pub(crate) nat: NetworkAddressTranslator,      // read-only
82     pub(crate) nics: HashMap<String, Arc<Mutex<dyn Nic + Send + Sync>>>, // read-only
83     pub(crate) chunk_filters: Vec<ChunkFilterFn>,  // requires mutex [x]
84     pub(crate) last_id: u8, // requires mutex [x], used to assign the last digit of IPv4 address
85 }
86 
87 // Router ...
88 #[derive(Default)]
89 pub struct Router {
90     name: String,                              // read-only
91     ipv4net: IpNet,                            // read-only
92     min_delay: Duration,                       // requires mutex [x]
93     max_jitter: Duration,                      // requires mutex [x]
94     queue: Arc<ChunkQueue>,                    // read-only
95     interfaces: Vec<Interface>,                // read-only
96     static_ips: Vec<IpAddr>,                   // read-only
97     static_local_ips: HashMap<String, IpAddr>, // read-only,
98     children: Vec<Arc<Mutex<Router>>>,         // read-only
99     done: Option<mpsc::Sender<()>>,            // requires mutex [x]
100     pub(crate) resolver: Arc<Mutex<Resolver>>, // read-only
101     push_ch: Option<mpsc::Sender<()>>,         // writer requires mutex
102     router_internal: Arc<Mutex<RouterInternal>>,
103 }
104 
105 #[async_trait]
106 impl Nic for Router {
get_interface(&self, ifc_name: &str) -> Option<Interface>107     async fn get_interface(&self, ifc_name: &str) -> Option<Interface> {
108         for ifc in &self.interfaces {
109             if ifc.name == ifc_name {
110                 return Some(ifc.clone());
111             }
112         }
113         None
114     }
115 
add_addrs_to_interface(&mut self, ifc_name: &str, addrs: &[IpNet]) -> Result<()>116     async fn add_addrs_to_interface(&mut self, ifc_name: &str, addrs: &[IpNet]) -> Result<()> {
117         for ifc in &mut self.interfaces {
118             if ifc.name == ifc_name {
119                 for addr in addrs {
120                     ifc.add_addr(*addr);
121                 }
122                 return Ok(());
123             }
124         }
125 
126         Err(Error::ErrNotFound)
127     }
128 
on_inbound_chunk(&self, c: Box<dyn Chunk + Send + Sync>)129     async fn on_inbound_chunk(&self, c: Box<dyn Chunk + Send + Sync>) {
130         let from_parent: Box<dyn Chunk + Send + Sync> = {
131             let router_internal = self.router_internal.lock().await;
132             match router_internal.nat.translate_inbound(&*c).await {
133                 Ok(from) => {
134                     if let Some(from) = from {
135                         from
136                     } else {
137                         return;
138                     }
139                 }
140                 Err(err) => {
141                     log::warn!("[{}] {}", self.name, err);
142                     return;
143                 }
144             }
145         };
146 
147         self.push(from_parent).await;
148     }
149 
get_static_ips(&self) -> Vec<IpAddr>150     async fn get_static_ips(&self) -> Vec<IpAddr> {
151         self.static_ips.clone()
152     }
153 
154     // caller must hold the mutex
set_router(&self, parent: Arc<Mutex<Router>>) -> Result<()>155     async fn set_router(&self, parent: Arc<Mutex<Router>>) -> Result<()> {
156         {
157             let mut router_internal = self.router_internal.lock().await;
158             router_internal.parent = Some(Arc::clone(&parent));
159         }
160 
161         let parent_resolver = {
162             let p = parent.lock().await;
163             Arc::clone(&p.resolver)
164         };
165         {
166             let mut resolver = self.resolver.lock().await;
167             resolver.set_parent(parent_resolver);
168         }
169 
170         let mut mapped_ips = vec![];
171         let mut local_ips = vec![];
172 
173         // when this method is called, one or more IP address has already been assigned by
174         // the parent router.
175         if let Some(ifc) = self.get_interface("eth0").await {
176             for ifc_addr in ifc.addrs() {
177                 let ip = ifc_addr.addr();
178                 mapped_ips.push(ip);
179 
180                 if let Some(loc_ip) = self.static_local_ips.get(&ip.to_string()) {
181                     local_ips.push(*loc_ip);
182                 }
183             }
184         } else {
185             return Err(Error::ErrNoIpaddrEth0);
186         }
187 
188         // Set up NAT here
189         {
190             let mut router_internal = self.router_internal.lock().await;
191             if router_internal.nat_type.is_none() {
192                 router_internal.nat_type = Some(NatType {
193                     mapping_behavior: EndpointDependencyType::EndpointIndependent,
194                     filtering_behavior: EndpointDependencyType::EndpointAddrPortDependent,
195                     hair_pining: false,
196                     port_preservation: false,
197                     mapping_life_time: Duration::from_secs(30),
198                     ..Default::default()
199                 });
200             }
201 
202             router_internal.nat = NetworkAddressTranslator::new(NatConfig {
203                 name: self.name.clone(),
204                 nat_type: router_internal.nat_type.unwrap(),
205                 mapped_ips,
206                 local_ips,
207             })?;
208         }
209 
210         Ok(())
211     }
212 }
213 
214 impl Router {
new(config: RouterConfig) -> Result<Self>215     pub fn new(config: RouterConfig) -> Result<Self> {
216         let ipv4net: IpNet = config.cidr.parse()?;
217 
218         let queue_size = if config.queue_size > 0 {
219             config.queue_size
220         } else {
221             DEFAULT_ROUTER_QUEUE_SIZE
222         };
223 
224         // set up network interface, lo0
225         let mut lo0 = Interface::new(LO0_STR.to_owned(), vec![]);
226         if let Ok(ipnet) = Interface::convert(
227             SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0),
228             Some(SocketAddr::new(Ipv4Addr::new(255, 0, 0, 0).into(), 0)),
229         ) {
230             lo0.add_addr(ipnet);
231         }
232 
233         // set up network interface, eth0
234         let eth0 = Interface::new("eth0".to_owned(), vec![]);
235 
236         // local host name resolver
237         let resolver = Arc::new(Mutex::new(Resolver::new()));
238 
239         let name = if config.name.is_empty() {
240             assign_router_name()
241         } else {
242             config.name.clone()
243         };
244 
245         let mut static_ips = vec![];
246         let mut static_local_ips = HashMap::new();
247         for ip_str in &config.static_ips {
248             let ip_pair: Vec<&str> = ip_str.split('/').collect();
249             if let Ok(ip) = IpAddr::from_str(ip_pair[0]) {
250                 if ip_pair.len() > 1 {
251                     let loc_ip = IpAddr::from_str(ip_pair[1])?;
252                     if !ipv4net.contains(&loc_ip) {
253                         return Err(Error::ErrLocalIpBeyondStaticIpsSubset);
254                     }
255                     static_local_ips.insert(ip.to_string(), loc_ip);
256                 }
257                 static_ips.push(ip);
258             }
259         }
260         if !config.static_ip.is_empty() {
261             log::warn!("static_ip is deprecated. Use static_ips instead");
262             if let Ok(ip) = IpAddr::from_str(&config.static_ip) {
263                 static_ips.push(ip);
264             }
265         }
266 
267         let n_static_local = static_local_ips.len();
268         if n_static_local > 0 && n_static_local != static_ips.len() {
269             return Err(Error::ErrLocalIpNoStaticsIpsAssociated);
270         }
271 
272         let router_internal = RouterInternal {
273             nat_type: config.nat_type,
274             ipv4net,
275             nics: HashMap::new(),
276             ..Default::default()
277         };
278 
279         Ok(Router {
280             name,
281             ipv4net,
282             interfaces: vec![lo0, eth0],
283             static_ips,
284             static_local_ips,
285             resolver,
286             router_internal: Arc::new(Mutex::new(router_internal)),
287             queue: Arc::new(ChunkQueue::new(queue_size)),
288             min_delay: config.min_delay,
289             max_jitter: config.max_jitter,
290             ..Default::default()
291         })
292     }
293 
294     // caller must hold the mutex
get_interfaces(&self) -> &[Interface]295     pub(crate) fn get_interfaces(&self) -> &[Interface] {
296         &self.interfaces
297     }
298 
299     // Start ...
start(&mut self) -> Pin<Box<dyn Future<Output = Result<()>>>>300     pub fn start(&mut self) -> Pin<Box<dyn Future<Output = Result<()>>>> {
301         if self.done.is_some() {
302             return Box::pin(async move { Err(Error::ErrRouterAlreadyStarted) });
303         }
304 
305         let (done_tx, mut done_rx) = mpsc::channel(1);
306         let (push_ch_tx, mut push_ch_rx) = mpsc::channel(1);
307         self.done = Some(done_tx);
308         self.push_ch = Some(push_ch_tx);
309 
310         let router_internal = Arc::clone(&self.router_internal);
311         let queue = Arc::clone(&self.queue);
312         let max_jitter = self.max_jitter;
313         let min_delay = self.min_delay;
314         let name = self.name.clone();
315         let ipv4net = self.ipv4net;
316 
317         tokio::spawn(async move {
318             while let Ok(d) = Router::process_chunks(
319                 &name,
320                 ipv4net,
321                 max_jitter,
322                 min_delay,
323                 &queue,
324                 &router_internal,
325             )
326             .await
327             {
328                 if d == Duration::from_secs(0) {
329                     tokio::select! {
330                      _ = push_ch_rx.recv() =>{},
331                      _ = done_rx.recv() => break,
332                     }
333                 } else {
334                     let t = tokio::time::sleep(d);
335                     tokio::pin!(t);
336 
337                     tokio::select! {
338                     _ = t.as_mut() => {},
339                     _ = done_rx.recv() => break,
340                     }
341                 }
342             }
343         });
344 
345         let children = self.children.clone();
346         Box::pin(async move { Router::start_childen(children).await })
347     }
348 
349     // Stop ...
stop(&mut self) -> Pin<Box<dyn Future<Output = Result<()>>>>350     pub fn stop(&mut self) -> Pin<Box<dyn Future<Output = Result<()>>>> {
351         if self.done.is_none() {
352             return Box::pin(async move { Err(Error::ErrRouterAlreadyStopped) });
353         }
354         self.push_ch.take();
355         self.done.take();
356 
357         let children = self.children.clone();
358         Box::pin(async move { Router::stop_childen(children).await })
359     }
360 
start_childen(children: Vec<Arc<Mutex<Router>>>) -> Result<()>361     async fn start_childen(children: Vec<Arc<Mutex<Router>>>) -> Result<()> {
362         for child in children {
363             let mut c = child.lock().await;
364             c.start().await?;
365         }
366 
367         Ok(())
368     }
369 
stop_childen(children: Vec<Arc<Mutex<Router>>>) -> Result<()>370     async fn stop_childen(children: Vec<Arc<Mutex<Router>>>) -> Result<()> {
371         for child in children {
372             let mut c = child.lock().await;
373             c.stop().await?;
374         }
375 
376         Ok(())
377     }
378 
379     // AddRouter adds a chile Router.
380     // after parent.add_router(child), also call child.set_router(parent) to set child's parent router
add_router(&mut self, child: Arc<Mutex<Router>>) -> Result<()>381     pub async fn add_router(&mut self, child: Arc<Mutex<Router>>) -> Result<()> {
382         // Router is a NIC. Add it as a NIC so that packets are routed to this child
383         // router.
384         let nic = Arc::clone(&child) as Arc<Mutex<dyn Nic + Send + Sync>>;
385         self.children.push(child);
386         self.add_net(nic).await
387     }
388 
389     // AddNet ...
390     // after router.add_net(nic), also call nic.set_router(router) to set nic's router
add_net(&mut self, nic: Arc<Mutex<dyn Nic + Send + Sync>>) -> Result<()>391     pub async fn add_net(&mut self, nic: Arc<Mutex<dyn Nic + Send + Sync>>) -> Result<()> {
392         let mut router_internal = self.router_internal.lock().await;
393         router_internal.add_nic(nic).await
394     }
395 
396     // AddHost adds a mapping of hostname and an IP address to the local resolver.
add_host(&mut self, host_name: String, ip_addr: String) -> Result<()>397     pub async fn add_host(&mut self, host_name: String, ip_addr: String) -> Result<()> {
398         let mut resolver = self.resolver.lock().await;
399         resolver.add_host(host_name, ip_addr)
400     }
401 
402     // AddChunkFilter adds a filter for chunks traversing this router.
403     // You may add more than one filter. The filters are called in the order of this method call.
404     // If a chunk is dropped by a filter, subsequent filter will not receive the chunk.
add_chunk_filter(&self, filter: ChunkFilterFn)405     pub async fn add_chunk_filter(&self, filter: ChunkFilterFn) {
406         let mut router_internal = self.router_internal.lock().await;
407         router_internal.chunk_filters.push(filter);
408     }
409 
push(&self, mut c: Box<dyn Chunk + Send + Sync>)410     pub(crate) async fn push(&self, mut c: Box<dyn Chunk + Send + Sync>) {
411         log::debug!("[{}] route {}", self.name, c);
412         if self.done.is_some() {
413             c.set_timestamp();
414 
415             if self.queue.push(c).await {
416                 if let Some(push_ch) = &self.push_ch {
417                     let _ = push_ch.try_send(());
418                 }
419             } else {
420                 log::warn!("[{}] queue was full. dropped a chunk", self.name);
421             }
422         } else {
423             log::warn!("router is done");
424         }
425     }
426 
process_chunks( name: &str, ipv4net: IpNet, max_jitter: Duration, min_delay: Duration, queue: &Arc<ChunkQueue>, router_internal: &Arc<Mutex<RouterInternal>>, ) -> Result<Duration>427     async fn process_chunks(
428         name: &str,
429         ipv4net: IpNet,
430         max_jitter: Duration,
431         min_delay: Duration,
432         queue: &Arc<ChunkQueue>,
433         router_internal: &Arc<Mutex<RouterInternal>>,
434     ) -> Result<Duration> {
435         // Introduce jitter by delaying the processing of chunks.
436         let mj = max_jitter.as_nanos() as u64;
437         if mj > 0 {
438             let jitter = Duration::from_nanos(rand::random::<u64>() % mj);
439             tokio::time::sleep(jitter).await;
440         }
441 
442         //      cut_off
443         //         v min delay
444         //         |<--->|
445         //  +------------:--
446         //  |OOOOOOXXXXX :   --> time
447         //  +------------:--
448         //  |<--->|     now
449         //    due
450 
451         let entered_at = SystemTime::now();
452         let cut_off = entered_at.sub(min_delay);
453 
454         // the next sleep duration
455         let mut d;
456 
457         loop {
458             d = Duration::from_secs(0);
459 
460             if let Some(c) = queue.peek().await {
461                 // check timestamp to find if the chunk is due
462                 if c.get_timestamp().duration_since(cut_off).is_ok() {
463                     // There is one or more chunk in the queue but none of them are due.
464                     // Calculate the next sleep duration here.
465                     let next_expire = c.get_timestamp().add(min_delay);
466                     if let Ok(diff) = next_expire.duration_since(entered_at) {
467                         d = diff;
468                         break;
469                     }
470                 }
471             } else {
472                 break; // no more chunk in the queue
473             }
474 
475             if let Some(c) = queue.pop().await {
476                 let ri = router_internal.lock().await;
477                 let mut blocked = false;
478                 for filter in &ri.chunk_filters {
479                     if !filter(&*c) {
480                         blocked = true;
481                         break;
482                     }
483                 }
484                 if blocked {
485                     continue; // discard
486                 }
487 
488                 let dst_ip = c.get_destination_ip();
489 
490                 // check if the destination is in our subnet
491                 if ipv4net.contains(&dst_ip) {
492                     // search for the destination NIC
493                     if let Some(nic) = ri.nics.get(&dst_ip.to_string()) {
494                         // found the NIC, forward the chunk to the NIC.
495                         // call to NIC must unlock mutex
496                         let ni = nic.lock().await;
497                         ni.on_inbound_chunk(c).await;
498                     } else {
499                         // NIC not found. drop it.
500                         log::debug!("[{}] {} unreachable", name, c);
501                     }
502                 } else {
503                     // the destination is outside of this subnet
504                     // is this WAN?
505                     if let Some(parent) = &ri.parent {
506                         // Pass it to the parent via NAT
507                         if let Some(to_parent) = ri.nat.translate_outbound(&*c).await? {
508                             // call to parent router mutex unlock mutex
509                             let p = parent.lock().await;
510                             p.push(to_parent).await;
511                         }
512                     } else {
513                         // this WAN. No route for this chunk
514                         log::debug!("[{}] no route found for {}", name, c);
515                     }
516                 }
517             } else {
518                 break; // no more chunk in the queue
519             }
520         }
521 
522         Ok(d)
523     }
524 }
525 
526 impl RouterInternal {
527     // caller must hold the mutex
add_nic(&mut self, nic: Arc<Mutex<dyn Nic + Send + Sync>>) -> Result<()>528     pub(crate) async fn add_nic(&mut self, nic: Arc<Mutex<dyn Nic + Send + Sync>>) -> Result<()> {
529         let mut ips = {
530             let ni = nic.lock().await;
531             ni.get_static_ips().await
532         };
533 
534         if ips.is_empty() {
535             // assign an IP address
536             let ip = self.assign_ip_address()?;
537             log::debug!("assign_ip_address: {}", ip);
538             ips.push(ip);
539         }
540 
541         let mut ipnets = vec![];
542         for ip in &ips {
543             if !self.ipv4net.contains(ip) {
544                 return Err(Error::ErrStaticIpIsBeyondSubnet);
545             }
546             self.nics.insert(ip.to_string(), Arc::clone(&nic));
547             ipnets.push(IpNet::from_str(&format!(
548                 "{}/{}",
549                 ip,
550                 self.ipv4net.prefix_len()
551             ))?);
552         }
553 
554         {
555             let mut ni = nic.lock().await;
556             let _ = ni.add_addrs_to_interface("eth0", &ipnets).await;
557         }
558 
559         Ok(())
560     }
561 
562     // caller should hold the mutex
assign_ip_address(&mut self) -> Result<IpAddr>563     fn assign_ip_address(&mut self) -> Result<IpAddr> {
564         // See: https://stackoverflow.com/questions/14915188/ip-address-ending-with-zero
565 
566         if self.last_id == 0xfe {
567             return Err(Error::ErrAddressSpaceExhausted);
568         }
569 
570         self.last_id += 1;
571         match self.ipv4net.addr() {
572             IpAddr::V4(ipv4) => {
573                 let mut ip = ipv4.octets();
574                 ip[3] = self.last_id;
575                 Ok(IpAddr::V4(Ipv4Addr::from(ip)))
576             }
577             IpAddr::V6(ipv6) => {
578                 let mut ip = ipv6.octets();
579                 ip[15] += self.last_id;
580                 Ok(IpAddr::V6(Ipv6Addr::from(ip)))
581             }
582         }
583     }
584 }
585