1 #[cfg(test)]
2 mod allocation_manager_test;
3 
4 use super::*;
5 use crate::error::*;
6 use crate::relay::*;
7 
8 use futures::future;
9 use std::collections::HashMap;
10 use stun::textattrs::Username;
11 use tokio::sync::mpsc;
12 use util::Conn;
13 
14 // ManagerConfig a bag of config params for Manager.
15 pub struct ManagerConfig {
16     pub relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
17     pub alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
18 }
19 
20 // Manager is used to hold active allocations
21 pub struct Manager {
22     allocations: AllocationMap,
23     reservations: Arc<Mutex<HashMap<String, u16>>>,
24     relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
25     alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
26 }
27 
28 impl Manager {
29     // creates a new instance of Manager.
new(config: ManagerConfig) -> Self30     pub fn new(config: ManagerConfig) -> Self {
31         Manager {
32             allocations: Arc::new(Mutex::new(HashMap::new())),
33             reservations: Arc::new(Mutex::new(HashMap::new())),
34             relay_addr_generator: config.relay_addr_generator,
35             alloc_close_notify: config.alloc_close_notify,
36         }
37     }
38 
39     // Close closes the manager and closes all allocations it manages
close(&self) -> Result<()>40     pub async fn close(&self) -> Result<()> {
41         let allocations = self.allocations.lock().await;
42         for a in allocations.values() {
43             a.close().await?;
44         }
45         Ok(())
46     }
47 
48     // Returns the information about the all [`Allocation`]s associated with
49     // the specified [`FiveTuple`]s.
get_allocations_info( &self, five_tuples: Option<Vec<FiveTuple>>, ) -> HashMap<FiveTuple, AllocationInfo>50     pub async fn get_allocations_info(
51         &self,
52         five_tuples: Option<Vec<FiveTuple>>,
53     ) -> HashMap<FiveTuple, AllocationInfo> {
54         let mut infos = HashMap::new();
55 
56         let guarded = self.allocations.lock().await;
57 
58         guarded.iter().for_each(|(five_tuple, alloc)| {
59             if five_tuples.is_none() || five_tuples.as_ref().unwrap().contains(five_tuple) {
60                 infos.insert(
61                     *five_tuple,
62                     AllocationInfo::new(
63                         *five_tuple,
64                         alloc.username.text.clone(),
65                         #[cfg(feature = "metrics")]
66                         alloc.relayed_bytes.load(Ordering::Acquire),
67                     ),
68                 );
69             }
70         });
71 
72         infos
73     }
74 
75     // get_allocation fetches the allocation matching the passed FiveTuple
get_allocation(&self, five_tuple: &FiveTuple) -> Option<Arc<Allocation>>76     pub async fn get_allocation(&self, five_tuple: &FiveTuple) -> Option<Arc<Allocation>> {
77         let allocations = self.allocations.lock().await;
78         allocations.get(five_tuple).map(Arc::clone)
79     }
80 
81     // create_allocation creates a new allocation and starts relaying
create_allocation( &self, five_tuple: FiveTuple, turn_socket: Arc<dyn Conn + Send + Sync>, requested_port: u16, lifetime: Duration, username: Username, ) -> Result<Arc<Allocation>>82     pub async fn create_allocation(
83         &self,
84         five_tuple: FiveTuple,
85         turn_socket: Arc<dyn Conn + Send + Sync>,
86         requested_port: u16,
87         lifetime: Duration,
88         username: Username,
89     ) -> Result<Arc<Allocation>> {
90         if lifetime == Duration::from_secs(0) {
91             return Err(Error::ErrLifetimeZero);
92         }
93 
94         if self.get_allocation(&five_tuple).await.is_some() {
95             return Err(Error::ErrDupeFiveTuple);
96         }
97 
98         let (relay_socket, relay_addr) = self
99             .relay_addr_generator
100             .allocate_conn(true, requested_port)
101             .await?;
102         let mut a = Allocation::new(
103             turn_socket,
104             relay_socket,
105             relay_addr,
106             five_tuple,
107             username,
108             self.alloc_close_notify.clone(),
109         );
110         a.allocations = Some(Arc::clone(&self.allocations));
111 
112         log::debug!("listening on relay addr: {:?}", a.relay_addr);
113         a.start(lifetime).await;
114         a.packet_handler().await;
115 
116         let a = Arc::new(a);
117         {
118             let mut allocations = self.allocations.lock().await;
119             allocations.insert(five_tuple, Arc::clone(&a));
120         }
121 
122         Ok(a)
123     }
124 
125     // delete_allocation removes an allocation
delete_allocation(&self, five_tuple: &FiveTuple)126     pub async fn delete_allocation(&self, five_tuple: &FiveTuple) {
127         let allocation = self.allocations.lock().await.remove(five_tuple);
128 
129         if let Some(a) = allocation {
130             if let Err(err) = a.close().await {
131                 log::error!("Failed to close allocation: {}", err);
132             }
133         }
134     }
135 
136     /// Deletes the [`Allocation`]s according to the specified `username`.
delete_allocations_by_username(&self, name: &str)137     pub async fn delete_allocations_by_username(&self, name: &str) {
138         let to_delete = {
139             let mut allocations = self.allocations.lock().await;
140 
141             let mut to_delete = Vec::new();
142 
143             // TODO(logist322): Use `.drain_filter()` once stabilized.
144             allocations.retain(|_, allocation| {
145                 let match_name = allocation.username.text == name;
146 
147                 if match_name {
148                     to_delete.push(Arc::clone(allocation));
149                 }
150 
151                 !match_name
152             });
153 
154             to_delete
155         };
156 
157         future::join_all(to_delete.iter().map(|a| async move {
158             if let Err(err) = a.close().await {
159                 log::error!("Failed to close allocation: {}", err);
160             }
161         }))
162         .await;
163     }
164 
165     // create_reservation stores the reservation for the token+port
create_reservation(&self, reservation_token: String, port: u16)166     pub async fn create_reservation(&self, reservation_token: String, port: u16) {
167         let reservations = Arc::clone(&self.reservations);
168         let reservation_token2 = reservation_token.clone();
169 
170         tokio::spawn(async move {
171             let sleep = tokio::time::sleep(Duration::from_secs(30));
172             tokio::pin!(sleep);
173             tokio::select! {
174                 _ = &mut sleep => {
175                     let mut reservations = reservations.lock().await;
176                     reservations.remove(&reservation_token2);
177                 },
178             }
179         });
180 
181         let mut reservations = self.reservations.lock().await;
182         reservations.insert(reservation_token, port);
183     }
184 
185     // get_reservation returns the port for a given reservation if it exists
get_reservation(&self, reservation_token: &str) -> Option<u16>186     pub async fn get_reservation(&self, reservation_token: &str) -> Option<u16> {
187         let reservations = self.reservations.lock().await;
188         reservations.get(reservation_token).copied()
189     }
190 
191     // get_random_even_port returns a random un-allocated udp4 port
get_random_even_port(&self) -> Result<u16>192     pub async fn get_random_even_port(&self) -> Result<u16> {
193         let (_, addr) = self.relay_addr_generator.allocate_conn(true, 0).await?;
194         Ok(addr.port())
195     }
196 }
197