1 #[cfg(test)] 2 mod allocation_manager_test; 3 4 use super::*; 5 use crate::error::*; 6 use crate::relay::*; 7 8 use futures::future; 9 use std::collections::HashMap; 10 use stun::textattrs::Username; 11 use util::Conn; 12 13 // ManagerConfig a bag of config params for Manager. 14 pub struct ManagerConfig { 15 pub relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>, 16 } 17 18 // Manager is used to hold active allocations 19 pub struct Manager { 20 allocations: AllocationMap, 21 reservations: Arc<Mutex<HashMap<String, u16>>>, 22 relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>, 23 } 24 25 impl Manager { 26 // creates a new instance of Manager. 27 pub fn new(config: ManagerConfig) -> Self { 28 Manager { 29 allocations: Arc::new(Mutex::new(HashMap::new())), 30 reservations: Arc::new(Mutex::new(HashMap::new())), 31 relay_addr_generator: config.relay_addr_generator, 32 } 33 } 34 35 // Close closes the manager and closes all allocations it manages 36 pub async fn close(&self) -> Result<()> { 37 let allocations = self.allocations.lock().await; 38 for a in allocations.values() { 39 a.close().await?; 40 } 41 Ok(()) 42 } 43 44 // Returns the information about the all [`Allocation`]s associated with 45 // the specified [`FiveTuple`]s. 46 pub async fn get_allocations_info( 47 &self, 48 five_tuples: Option<Vec<FiveTuple>>, 49 ) -> HashMap<FiveTuple, AllocationInfo> { 50 let mut infos = HashMap::new(); 51 52 let guarded = self.allocations.lock().await; 53 54 guarded.iter().for_each(|(five_tuple, alloc)| { 55 if five_tuples.is_none() || five_tuples.as_ref().unwrap().contains(five_tuple) { 56 infos.insert( 57 *five_tuple, 58 AllocationInfo::new( 59 *five_tuple, 60 alloc.username.text.clone(), 61 #[cfg(feature = "metrics")] 62 alloc.relayed_bytes.load(Ordering::Acquire), 63 ), 64 ); 65 } 66 }); 67 68 infos 69 } 70 71 // get_allocation fetches the allocation matching the passed FiveTuple 72 pub async fn get_allocation(&self, five_tuple: &FiveTuple) -> Option<Arc<Allocation>> { 73 let allocations = self.allocations.lock().await; 74 allocations.get(five_tuple).map(Arc::clone) 75 } 76 77 // create_allocation creates a new allocation and starts relaying 78 pub async fn create_allocation( 79 &self, 80 five_tuple: FiveTuple, 81 turn_socket: Arc<dyn Conn + Send + Sync>, 82 requested_port: u16, 83 lifetime: Duration, 84 username: Username, 85 ) -> Result<Arc<Allocation>> { 86 if lifetime == Duration::from_secs(0) { 87 return Err(Error::ErrLifetimeZero); 88 } 89 90 if self.get_allocation(&five_tuple).await.is_some() { 91 return Err(Error::ErrDupeFiveTuple); 92 } 93 94 let (relay_socket, relay_addr) = self 95 .relay_addr_generator 96 .allocate_conn(true, requested_port) 97 .await?; 98 let mut a = Allocation::new(turn_socket, relay_socket, relay_addr, five_tuple, username); 99 a.allocations = Some(Arc::clone(&self.allocations)); 100 101 log::debug!("listening on relay addr: {:?}", a.relay_addr); 102 a.start(lifetime).await; 103 a.packet_handler().await; 104 105 let a = Arc::new(a); 106 { 107 let mut allocations = self.allocations.lock().await; 108 allocations.insert(five_tuple, Arc::clone(&a)); 109 } 110 111 Ok(a) 112 } 113 114 // delete_allocation removes an allocation 115 pub async fn delete_allocation(&self, five_tuple: &FiveTuple) { 116 let allocation = self.allocations.lock().await.remove(five_tuple); 117 118 if let Some(a) = allocation { 119 if let Err(err) = a.close().await { 120 log::error!("Failed to close allocation: {}", err); 121 } 122 } 123 } 124 125 /// Deletes the [`Allocation`]s according to the specified `username`. 126 pub async fn delete_allocations_by_username(&self, name: &str) { 127 let to_delete = { 128 let mut allocations = self.allocations.lock().await; 129 130 let mut to_delete = Vec::new(); 131 132 // TODO(logist322): Use `.drain_filter()` once stabilized. 133 allocations.retain(|_, allocation| { 134 let match_name = allocation.username.text == name; 135 136 if match_name { 137 to_delete.push(Arc::clone(allocation)); 138 } 139 140 !match_name 141 }); 142 143 to_delete 144 }; 145 146 future::join_all(to_delete.iter().map(|a| async move { 147 if let Err(err) = a.close().await { 148 log::error!("Failed to close allocation: {}", err); 149 } 150 })) 151 .await; 152 } 153 154 // create_reservation stores the reservation for the token+port 155 pub async fn create_reservation(&self, reservation_token: String, port: u16) { 156 let reservations = Arc::clone(&self.reservations); 157 let reservation_token2 = reservation_token.clone(); 158 159 tokio::spawn(async move { 160 let sleep = tokio::time::sleep(Duration::from_secs(30)); 161 tokio::pin!(sleep); 162 tokio::select! { 163 _ = &mut sleep => { 164 let mut reservations = reservations.lock().await; 165 reservations.remove(&reservation_token2); 166 }, 167 } 168 }); 169 170 let mut reservations = self.reservations.lock().await; 171 reservations.insert(reservation_token, port); 172 } 173 174 // get_reservation returns the port for a given reservation if it exists 175 pub async fn get_reservation(&self, reservation_token: &str) -> Option<u16> { 176 let reservations = self.reservations.lock().await; 177 reservations.get(reservation_token).copied() 178 } 179 180 // get_random_even_port returns a random un-allocated udp4 port 181 pub async fn get_random_even_port(&self) -> Result<u16> { 182 let (_, addr) = self.relay_addr_generator.allocate_conn(true, 0).await?; 183 Ok(addr.port()) 184 } 185 } 186