1 #[cfg(test)]
2 mod allocation_manager_test;
3 
4 use super::*;
5 use crate::error::*;
6 use crate::relay::*;
7 
8 use futures::future;
9 use std::collections::HashMap;
10 use stun::textattrs::Username;
11 use util::Conn;
12 
/// Bag of configuration parameters consumed by `Manager::new`.
pub struct ManagerConfig {
    /// Generator the `Manager` takes ownership of and calls (`allocate_conn`)
    /// whenever it needs a relay connection and address.
    pub relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
}
17 
/// Holds the active allocations together with the pending port reservations.
pub struct Manager {
    // Active allocations; `create_allocation` inserts them keyed by their five-tuple.
    allocations: AllocationMap,
    // Reservation token -> reserved port, filled by `create_reservation`.
    reservations: Arc<Mutex<HashMap<String, u16>>>,
    // Source of relay connections/addresses, taken from `ManagerConfig`.
    relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
}
24 
25 impl Manager {
26     // creates a new instance of Manager.
27     pub fn new(config: ManagerConfig) -> Self {
28         Manager {
29             allocations: Arc::new(Mutex::new(HashMap::new())),
30             reservations: Arc::new(Mutex::new(HashMap::new())),
31             relay_addr_generator: config.relay_addr_generator,
32         }
33     }
34 
35     // Close closes the manager and closes all allocations it manages
36     pub async fn close(&self) -> Result<()> {
37         let allocations = self.allocations.lock().await;
38         for a in allocations.values() {
39             a.close().await?;
40         }
41         Ok(())
42     }
43 
44     // Returns the information about the all [`Allocation`]s associated with
45     // the specified [`FiveTuple`]s.
46     pub async fn get_allocations_info(
47         &self,
48         five_tuples: Option<Vec<FiveTuple>>,
49     ) -> HashMap<FiveTuple, AllocationInfo> {
50         let mut infos = HashMap::new();
51 
52         let guarded = self.allocations.lock().await;
53 
54         guarded.iter().for_each(|(five_tuple, alloc)| {
55             if five_tuples.is_none() || five_tuples.as_ref().unwrap().contains(five_tuple) {
56                 infos.insert(
57                     *five_tuple,
58                     AllocationInfo::new(
59                         *five_tuple,
60                         alloc.username.text.clone(),
61                         #[cfg(feature = "metrics")]
62                         alloc.relayed_bytes.load(Ordering::Acquire),
63                     ),
64                 );
65             }
66         });
67 
68         infos
69     }
70 
71     // get_allocation fetches the allocation matching the passed FiveTuple
72     pub async fn get_allocation(&self, five_tuple: &FiveTuple) -> Option<Arc<Allocation>> {
73         let allocations = self.allocations.lock().await;
74         allocations.get(five_tuple).map(Arc::clone)
75     }
76 
77     // create_allocation creates a new allocation and starts relaying
78     pub async fn create_allocation(
79         &self,
80         five_tuple: FiveTuple,
81         turn_socket: Arc<dyn Conn + Send + Sync>,
82         requested_port: u16,
83         lifetime: Duration,
84         username: Username,
85     ) -> Result<Arc<Allocation>> {
86         if lifetime == Duration::from_secs(0) {
87             return Err(Error::ErrLifetimeZero);
88         }
89 
90         if self.get_allocation(&five_tuple).await.is_some() {
91             return Err(Error::ErrDupeFiveTuple);
92         }
93 
94         let (relay_socket, relay_addr) = self
95             .relay_addr_generator
96             .allocate_conn(true, requested_port)
97             .await?;
98         let mut a = Allocation::new(turn_socket, relay_socket, relay_addr, five_tuple, username);
99         a.allocations = Some(Arc::clone(&self.allocations));
100 
101         log::debug!("listening on relay addr: {:?}", a.relay_addr);
102         a.start(lifetime).await;
103         a.packet_handler().await;
104 
105         let a = Arc::new(a);
106         {
107             let mut allocations = self.allocations.lock().await;
108             allocations.insert(five_tuple, Arc::clone(&a));
109         }
110 
111         Ok(a)
112     }
113 
114     // delete_allocation removes an allocation
115     pub async fn delete_allocation(&self, five_tuple: &FiveTuple) {
116         let allocation = self.allocations.lock().await.remove(five_tuple);
117 
118         if let Some(a) = allocation {
119             if let Err(err) = a.close().await {
120                 log::error!("Failed to close allocation: {}", err);
121             }
122         }
123     }
124 
125     /// Deletes the [`Allocation`]s according to the specified `username`.
126     pub async fn delete_allocations_by_username(&self, name: &str) {
127         let to_delete = {
128             let mut allocations = self.allocations.lock().await;
129 
130             let mut to_delete = Vec::new();
131 
132             // TODO(logist322): Use `.drain_filter()` once stabilized.
133             allocations.retain(|_, allocation| {
134                 let match_name = allocation.username.text == name;
135 
136                 if match_name {
137                     to_delete.push(Arc::clone(allocation));
138                 }
139 
140                 !match_name
141             });
142 
143             to_delete
144         };
145 
146         future::join_all(to_delete.iter().map(|a| async move {
147             if let Err(err) = a.close().await {
148                 log::error!("Failed to close allocation: {}", err);
149             }
150         }))
151         .await;
152     }
153 
154     // create_reservation stores the reservation for the token+port
155     pub async fn create_reservation(&self, reservation_token: String, port: u16) {
156         let reservations = Arc::clone(&self.reservations);
157         let reservation_token2 = reservation_token.clone();
158 
159         tokio::spawn(async move {
160             let sleep = tokio::time::sleep(Duration::from_secs(30));
161             tokio::pin!(sleep);
162             tokio::select! {
163                 _ = &mut sleep => {
164                     let mut reservations = reservations.lock().await;
165                     reservations.remove(&reservation_token2);
166                 },
167             }
168         });
169 
170         let mut reservations = self.reservations.lock().await;
171         reservations.insert(reservation_token, port);
172     }
173 
174     // get_reservation returns the port for a given reservation if it exists
175     pub async fn get_reservation(&self, reservation_token: &str) -> Option<u16> {
176         let reservations = self.reservations.lock().await;
177         reservations.get(reservation_token).copied()
178     }
179 
180     // get_random_even_port returns a random un-allocated udp4 port
181     pub async fn get_random_even_port(&self) -> Result<u16> {
182         let (_, addr) = self.relay_addr_generator.allocate_conn(true, 0).await?;
183         Ok(addr.port())
184     }
185 }
186