1 #[cfg(test)] 2 mod allocation_manager_test; 3 4 use super::*; 5 use crate::error::*; 6 use crate::relay::*; 7 8 use futures::future; 9 use std::collections::HashMap; 10 use stun::textattrs::Username; 11 use tokio::sync::mpsc; 12 use util::Conn; 13 14 // ManagerConfig a bag of config params for Manager. 15 pub struct ManagerConfig { 16 pub relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>, 17 pub alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>, 18 } 19 20 // Manager is used to hold active allocations 21 pub struct Manager { 22 allocations: AllocationMap, 23 reservations: Arc<Mutex<HashMap<String, u16>>>, 24 relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>, 25 alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>, 26 } 27 28 impl Manager { 29 // creates a new instance of Manager. new(config: ManagerConfig) -> Self30 pub fn new(config: ManagerConfig) -> Self { 31 Manager { 32 allocations: Arc::new(Mutex::new(HashMap::new())), 33 reservations: Arc::new(Mutex::new(HashMap::new())), 34 relay_addr_generator: config.relay_addr_generator, 35 alloc_close_notify: config.alloc_close_notify, 36 } 37 } 38 39 // Close closes the manager and closes all allocations it manages close(&self) -> Result<()>40 pub async fn close(&self) -> Result<()> { 41 let allocations = self.allocations.lock().await; 42 for a in allocations.values() { 43 a.close().await?; 44 } 45 Ok(()) 46 } 47 48 // Returns the information about the all [`Allocation`]s associated with 49 // the specified [`FiveTuple`]s. get_allocations_info( &self, five_tuples: Option<Vec<FiveTuple>>, ) -> HashMap<FiveTuple, AllocationInfo>50 pub async fn get_allocations_info( 51 &self, 52 five_tuples: Option<Vec<FiveTuple>>, 53 ) -> HashMap<FiveTuple, AllocationInfo> { 54 let mut infos = HashMap::new(); 55 56 let guarded = self.allocations.lock().await; 57 58 guarded.iter().for_each(|(five_tuple, alloc)| { 59 if five_tuples.is_none() || five_tuples.as_ref().unwrap().contains(five_tuple) { 60 infos.insert( 61 *five_tuple, 62 AllocationInfo::new( 63 *five_tuple, 64 alloc.username.text.clone(), 65 #[cfg(feature = "metrics")] 66 alloc.relayed_bytes.load(Ordering::Acquire), 67 ), 68 ); 69 } 70 }); 71 72 infos 73 } 74 75 // get_allocation fetches the allocation matching the passed FiveTuple get_allocation(&self, five_tuple: &FiveTuple) -> Option<Arc<Allocation>>76 pub async fn get_allocation(&self, five_tuple: &FiveTuple) -> Option<Arc<Allocation>> { 77 let allocations = self.allocations.lock().await; 78 allocations.get(five_tuple).map(Arc::clone) 79 } 80 81 // create_allocation creates a new allocation and starts relaying create_allocation( &self, five_tuple: FiveTuple, turn_socket: Arc<dyn Conn + Send + Sync>, requested_port: u16, lifetime: Duration, username: Username, ) -> Result<Arc<Allocation>>82 pub async fn create_allocation( 83 &self, 84 five_tuple: FiveTuple, 85 turn_socket: Arc<dyn Conn + Send + Sync>, 86 requested_port: u16, 87 lifetime: Duration, 88 username: Username, 89 ) -> Result<Arc<Allocation>> { 90 if lifetime == Duration::from_secs(0) { 91 return Err(Error::ErrLifetimeZero); 92 } 93 94 if self.get_allocation(&five_tuple).await.is_some() { 95 return Err(Error::ErrDupeFiveTuple); 96 } 97 98 let (relay_socket, relay_addr) = self 99 .relay_addr_generator 100 .allocate_conn(true, requested_port) 101 .await?; 102 let mut a = Allocation::new( 103 turn_socket, 104 relay_socket, 105 relay_addr, 106 five_tuple, 107 username, 108 self.alloc_close_notify.clone(), 109 ); 110 a.allocations = Some(Arc::clone(&self.allocations)); 111 112 log::debug!("listening on relay addr: {:?}", a.relay_addr); 113 a.start(lifetime).await; 114 a.packet_handler().await; 115 116 let a = Arc::new(a); 117 { 118 let mut allocations = self.allocations.lock().await; 119 allocations.insert(five_tuple, Arc::clone(&a)); 120 } 121 122 Ok(a) 123 } 124 125 // delete_allocation removes an allocation delete_allocation(&self, five_tuple: &FiveTuple)126 pub async fn delete_allocation(&self, five_tuple: &FiveTuple) { 127 let allocation = self.allocations.lock().await.remove(five_tuple); 128 129 if let Some(a) = allocation { 130 if let Err(err) = a.close().await { 131 log::error!("Failed to close allocation: {}", err); 132 } 133 } 134 } 135 136 /// Deletes the [`Allocation`]s according to the specified `username`. delete_allocations_by_username(&self, name: &str)137 pub async fn delete_allocations_by_username(&self, name: &str) { 138 let to_delete = { 139 let mut allocations = self.allocations.lock().await; 140 141 let mut to_delete = Vec::new(); 142 143 // TODO(logist322): Use `.drain_filter()` once stabilized. 144 allocations.retain(|_, allocation| { 145 let match_name = allocation.username.text == name; 146 147 if match_name { 148 to_delete.push(Arc::clone(allocation)); 149 } 150 151 !match_name 152 }); 153 154 to_delete 155 }; 156 157 future::join_all(to_delete.iter().map(|a| async move { 158 if let Err(err) = a.close().await { 159 log::error!("Failed to close allocation: {}", err); 160 } 161 })) 162 .await; 163 } 164 165 // create_reservation stores the reservation for the token+port create_reservation(&self, reservation_token: String, port: u16)166 pub async fn create_reservation(&self, reservation_token: String, port: u16) { 167 let reservations = Arc::clone(&self.reservations); 168 let reservation_token2 = reservation_token.clone(); 169 170 tokio::spawn(async move { 171 let sleep = tokio::time::sleep(Duration::from_secs(30)); 172 tokio::pin!(sleep); 173 tokio::select! { 174 _ = &mut sleep => { 175 let mut reservations = reservations.lock().await; 176 reservations.remove(&reservation_token2); 177 }, 178 } 179 }); 180 181 let mut reservations = self.reservations.lock().await; 182 reservations.insert(reservation_token, port); 183 } 184 185 // get_reservation returns the port for a given reservation if it exists get_reservation(&self, reservation_token: &str) -> Option<u16>186 pub async fn get_reservation(&self, reservation_token: &str) -> Option<u16> { 187 let reservations = self.reservations.lock().await; 188 reservations.get(reservation_token).copied() 189 } 190 191 // get_random_even_port returns a random un-allocated udp4 port get_random_even_port(&self) -> Result<u16>192 pub async fn get_random_even_port(&self) -> Result<u16> { 193 let (_, addr) = self.relay_addr_generator.allocate_conn(true, 0).await?; 194 Ok(addr.port()) 195 } 196 } 197