xref: /webrtc/turn/src/allocation/mod.rs (revision 9ea7b2ac)
1 #[cfg(test)]
2 mod allocation_test;
3 
4 pub mod allocation_manager;
5 pub mod channel_bind;
6 pub mod five_tuple;
7 pub mod permission;
8 
9 use crate::error::*;
10 use crate::proto::{chandata::*, channum::*, data::*, peeraddr::*, *};
11 use channel_bind::*;
12 use five_tuple::*;
13 use permission::*;
14 use stun::{agent::*, message::*, textattrs::Username};
15 use util::sync::Mutex as SyncMutex;
16 
17 use util::Conn;
18 
19 use std::sync::atomic::AtomicUsize;
20 use std::{
21     collections::HashMap,
22     marker::{Send, Sync},
23     net::SocketAddr,
24     sync::{atomic::AtomicBool, atomic::Ordering, Arc},
25 };
26 use tokio::{
27     sync::{
28         mpsc,
29         oneshot::{self, Sender},
30         Mutex,
31     },
32     time::{Duration, Instant},
33 };
34 
35 const RTP_MTU: usize = 1500;
36 
37 pub type AllocationMap = Arc<Mutex<HashMap<FiveTuple, Arc<Allocation>>>>;
38 
39 /// Information about an [`Allocation`].
40 #[derive(Debug, Clone)]
41 pub struct AllocationInfo {
42     /// [`FiveTuple`] of this [`Allocation`].
43     pub five_tuple: FiveTuple,
44 
45     /// Username of this [`Allocation`].
46     pub username: String,
47 
48     /// Relayed bytes with this [`Allocation`].
49     #[cfg(feature = "metrics")]
50     pub relayed_bytes: usize,
51 }
52 
53 impl AllocationInfo {
54     // Creates a new `AllocationInfo`
new( five_tuple: FiveTuple, username: String, #[cfg(feature = "metrics")] relayed_bytes: usize, ) -> Self55     pub fn new(
56         five_tuple: FiveTuple,
57         username: String,
58         #[cfg(feature = "metrics")] relayed_bytes: usize,
59     ) -> Self {
60         Self {
61             five_tuple,
62             username,
63             #[cfg(feature = "metrics")]
64             relayed_bytes,
65         }
66     }
67 }
68 
69 // Allocation is tied to a FiveTuple and relays traffic
70 // use create_allocation and get_allocation to operate
71 pub struct Allocation {
72     protocol: Protocol,
73     turn_socket: Arc<dyn Conn + Send + Sync>,
74     pub(crate) relay_addr: SocketAddr,
75     pub(crate) relay_socket: Arc<dyn Conn + Send + Sync>,
76     five_tuple: FiveTuple,
77     username: Username,
78     permissions: Arc<Mutex<HashMap<String, Permission>>>,
79     channel_bindings: Arc<Mutex<HashMap<ChannelNumber, ChannelBind>>>,
80     pub(crate) allocations: Option<AllocationMap>,
81     reset_tx: SyncMutex<Option<mpsc::Sender<Duration>>>,
82     timer_expired: Arc<AtomicBool>,
83     closed: AtomicBool, // Option<mpsc::Receiver<()>>,
84     pub(crate) relayed_bytes: AtomicUsize,
85     drop_tx: Option<Sender<u32>>,
86     alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
87 }
88 
/// Returns the key under which a permission for `addr` is stored.
///
/// Only the IP address is used; the port is deliberately left out, so every
/// port on the same IP maps to the same fingerprint.
fn addr2ipfingerprint(addr: &SocketAddr) -> String {
    let ip = addr.ip();
    ip.to_string()
}
92 
93 impl Allocation {
94     // creates a new instance of NewAllocation.
new( turn_socket: Arc<dyn Conn + Send + Sync>, relay_socket: Arc<dyn Conn + Send + Sync>, relay_addr: SocketAddr, five_tuple: FiveTuple, username: Username, alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>, ) -> Self95     pub fn new(
96         turn_socket: Arc<dyn Conn + Send + Sync>,
97         relay_socket: Arc<dyn Conn + Send + Sync>,
98         relay_addr: SocketAddr,
99         five_tuple: FiveTuple,
100         username: Username,
101         alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
102     ) -> Self {
103         Allocation {
104             protocol: PROTO_UDP,
105             turn_socket,
106             relay_addr,
107             relay_socket,
108             five_tuple,
109             username,
110             permissions: Arc::new(Mutex::new(HashMap::new())),
111             channel_bindings: Arc::new(Mutex::new(HashMap::new())),
112             allocations: None,
113             reset_tx: SyncMutex::new(None),
114             timer_expired: Arc::new(AtomicBool::new(false)),
115             closed: AtomicBool::new(false),
116             relayed_bytes: Default::default(),
117             drop_tx: None,
118             alloc_close_notify,
119         }
120     }
121 
122     // has_permission gets the Permission from the allocation
has_permission(&self, addr: &SocketAddr) -> bool123     pub async fn has_permission(&self, addr: &SocketAddr) -> bool {
124         let permissions = self.permissions.lock().await;
125         permissions.get(&addr2ipfingerprint(addr)).is_some()
126     }
127 
128     // add_permission adds a new permission to the allocation
add_permission(&self, mut p: Permission)129     pub async fn add_permission(&self, mut p: Permission) {
130         let fingerprint = addr2ipfingerprint(&p.addr);
131 
132         {
133             let permissions = self.permissions.lock().await;
134             if let Some(existed_permission) = permissions.get(&fingerprint) {
135                 existed_permission.refresh(PERMISSION_TIMEOUT).await;
136                 return;
137             }
138         }
139 
140         p.permissions = Some(Arc::clone(&self.permissions));
141         p.start(PERMISSION_TIMEOUT).await;
142 
143         {
144             let mut permissions = self.permissions.lock().await;
145             permissions.insert(fingerprint, p);
146         }
147     }
148 
149     // remove_permission removes the net.Addr's fingerprint from the allocation's permissions
remove_permission(&self, addr: &SocketAddr) -> bool150     pub async fn remove_permission(&self, addr: &SocketAddr) -> bool {
151         let mut permissions = self.permissions.lock().await;
152         permissions.remove(&addr2ipfingerprint(addr)).is_some()
153     }
154 
155     // add_channel_bind adds a new ChannelBind to the allocation, it also updates the
156     // permissions needed for this ChannelBind
add_channel_bind(&self, mut c: ChannelBind, lifetime: Duration) -> Result<()>157     pub async fn add_channel_bind(&self, mut c: ChannelBind, lifetime: Duration) -> Result<()> {
158         {
159             if let Some(addr) = self.get_channel_addr(&c.number).await {
160                 if addr != c.peer {
161                     return Err(Error::ErrSameChannelDifferentPeer);
162                 }
163             }
164 
165             if let Some(number) = self.get_channel_number(&c.peer).await {
166                 if number != c.number {
167                     return Err(Error::ErrSameChannelDifferentPeer);
168                 }
169             }
170         }
171 
172         {
173             let channel_bindings = self.channel_bindings.lock().await;
174             if let Some(cb) = channel_bindings.get(&c.number) {
175                 cb.refresh(lifetime).await;
176 
177                 // Channel binds also refresh permissions.
178                 self.add_permission(Permission::new(cb.peer)).await;
179 
180                 return Ok(());
181             }
182         }
183 
184         let peer = c.peer;
185 
186         // Add or refresh this channel.
187         c.channel_bindings = Some(Arc::clone(&self.channel_bindings));
188         c.start(lifetime).await;
189 
190         {
191             let mut channel_bindings = self.channel_bindings.lock().await;
192             channel_bindings.insert(c.number, c);
193         }
194 
195         // Channel binds also refresh permissions.
196         self.add_permission(Permission::new(peer)).await;
197 
198         Ok(())
199     }
200 
201     // remove_channel_bind removes the ChannelBind from this allocation by id
remove_channel_bind(&self, number: ChannelNumber) -> bool202     pub async fn remove_channel_bind(&self, number: ChannelNumber) -> bool {
203         let mut channel_bindings = self.channel_bindings.lock().await;
204         channel_bindings.remove(&number).is_some()
205     }
206 
207     // get_channel_addr gets the ChannelBind's addr
get_channel_addr(&self, number: &ChannelNumber) -> Option<SocketAddr>208     pub async fn get_channel_addr(&self, number: &ChannelNumber) -> Option<SocketAddr> {
209         let channel_bindings = self.channel_bindings.lock().await;
210         channel_bindings.get(number).map(|cb| cb.peer)
211     }
212 
213     // GetChannelByAddr gets the ChannelBind's number from this allocation by net.Addr
get_channel_number(&self, addr: &SocketAddr) -> Option<ChannelNumber>214     pub async fn get_channel_number(&self, addr: &SocketAddr) -> Option<ChannelNumber> {
215         let channel_bindings = self.channel_bindings.lock().await;
216         for cb in channel_bindings.values() {
217             if cb.peer == *addr {
218                 return Some(cb.number);
219             }
220         }
221         None
222     }
223 
224     // Close closes the allocation
close(&self) -> Result<()>225     pub async fn close(&self) -> Result<()> {
226         if self.closed.load(Ordering::Acquire) {
227             return Err(Error::ErrClosed);
228         }
229 
230         self.closed.store(true, Ordering::Release);
231         self.stop();
232 
233         {
234             let mut permissions = self.permissions.lock().await;
235             for p in permissions.values_mut() {
236                 p.stop();
237             }
238         }
239 
240         {
241             let mut channel_bindings = self.channel_bindings.lock().await;
242             for c in channel_bindings.values_mut() {
243                 c.stop();
244             }
245         }
246 
247         log::trace!("allocation with {} closed!", self.five_tuple);
248 
249         let _ = self.turn_socket.close().await;
250         let _ = self.relay_socket.close().await;
251 
252         if let Some(notify_tx) = &self.alloc_close_notify {
253             let _ = notify_tx
254                 .send(AllocationInfo {
255                     five_tuple: self.five_tuple,
256                     username: self.username.text.clone(),
257                     #[cfg(feature = "metrics")]
258                     relayed_bytes: self.relayed_bytes.load(Ordering::Acquire),
259                 })
260                 .await;
261         }
262 
263         Ok(())
264     }
265 
start(&self, lifetime: Duration)266     pub async fn start(&self, lifetime: Duration) {
267         let (reset_tx, mut reset_rx) = mpsc::channel(1);
268         self.reset_tx.lock().replace(reset_tx);
269 
270         let allocations = self.allocations.clone();
271         let five_tuple = self.five_tuple;
272         let timer_expired = Arc::clone(&self.timer_expired);
273 
274         tokio::spawn(async move {
275             let timer = tokio::time::sleep(lifetime);
276             tokio::pin!(timer);
277             let mut done = false;
278 
279             while !done {
280                 tokio::select! {
281                     _ = &mut timer => {
282                         if let Some(allocs) = &allocations{
283                             let mut alls = allocs.lock().await;
284                             if let Some(a) = alls.remove(&five_tuple) {
285                                 let _ = a.close().await;
286                             }
287                         }
288                         done = true;
289                     },
290                     result = reset_rx.recv() => {
291                         if let Some(d) = result {
292                             timer.as_mut().reset(Instant::now() + d);
293                         } else {
294                             done = true;
295                         }
296                     },
297                 }
298             }
299 
300             timer_expired.store(true, Ordering::SeqCst);
301         });
302     }
303 
stop(&self) -> bool304     fn stop(&self) -> bool {
305         let reset_tx = self.reset_tx.lock().take();
306         reset_tx.is_none() || self.timer_expired.load(Ordering::SeqCst)
307     }
308 
309     // Refresh updates the allocations lifetime
refresh(&self, lifetime: Duration)310     pub async fn refresh(&self, lifetime: Duration) {
311         let reset_tx = self.reset_tx.lock().clone();
312         if let Some(tx) = reset_tx {
313             let _ = tx.send(lifetime).await;
314         }
315     }
316 
317     //  https://tools.ietf.org/html/rfc5766#section-10.3
318     //  When the server receives a UDP datagram at a currently allocated
319     //  relayed transport address, the server looks up the allocation
320     //  associated with the relayed transport address.  The server then
321     //  checks to see whether the set of permissions for the allocation allow
322     //  the relaying of the UDP datagram as described in Section 8.
323     //
324     //  If relaying is permitted, then the server checks if there is a
325     //  channel bound to the peer that sent the UDP datagram (see
326     //  Section 11).  If a channel is bound, then processing proceeds as
327     //  described in Section 11.7.
328     //
329     //  If relaying is permitted but no channel is bound to the peer, then
330     //  the server forms and sends a Data indication.  The Data indication
331     //  MUST contain both an XOR-PEER-ADDRESS and a DATA attribute.  The DATA
332     //  attribute is set to the value of the 'data octets' field from the
333     //  datagram, and the XOR-PEER-ADDRESS attribute is set to the source
334     //  transport address of the received UDP datagram.  The Data indication
335     //  is then sent on the 5-tuple associated with the allocation.
packet_handler(&mut self)336     async fn packet_handler(&mut self) {
337         let five_tuple = self.five_tuple;
338         let relay_addr = self.relay_addr;
339         let relay_socket = Arc::clone(&self.relay_socket);
340         let turn_socket = Arc::clone(&self.turn_socket);
341         let allocations = self.allocations.clone();
342         let channel_bindings = Arc::clone(&self.channel_bindings);
343         let permissions = Arc::clone(&self.permissions);
344         let (drop_tx, drop_rx) = oneshot::channel::<u32>();
345         self.drop_tx = Some(drop_tx);
346 
347         tokio::spawn(async move {
348             let mut buffer = vec![0u8; RTP_MTU];
349 
350             tokio::pin!(drop_rx);
351 
352             loop {
353                 let (n, src_addr) = tokio::select! {
354                     result = relay_socket.recv_from(&mut buffer) => {
355                         match result {
356                             Ok((n, src_addr)) => (n, src_addr),
357                             Err(_) => {
358                                 if let Some(allocs) = &allocations {
359                                     let mut alls = allocs.lock().await;
360                                     alls.remove(&five_tuple);
361                                 }
362                                 break;
363                             }
364                         }
365                     }
366                     _ = drop_rx.as_mut() => {
367                         log::trace!("allocation has stopped, stop packet_handler. five_tuple: {:?}", five_tuple);
368                         break;
369                     }
370                 };
371 
372                 log::debug!(
373                     "relay socket {:?} received {} bytes from {}",
374                     relay_socket.local_addr(),
375                     n,
376                     src_addr
377                 );
378 
379                 let cb_number = {
380                     let mut cb_number = None;
381                     let cbs = channel_bindings.lock().await;
382                     for cb in cbs.values() {
383                         if cb.peer == src_addr {
384                             cb_number = Some(cb.number);
385                             break;
386                         }
387                     }
388                     cb_number
389                 };
390 
391                 if let Some(number) = cb_number {
392                     let mut channel_data = ChannelData {
393                         data: buffer[..n].to_vec(),
394                         number,
395                         raw: vec![],
396                     };
397                     channel_data.encode();
398 
399                     if let Err(err) = turn_socket
400                         .send_to(&channel_data.raw, five_tuple.src_addr)
401                         .await
402                     {
403                         log::error!(
404                             "Failed to send ChannelData from allocation {} {}",
405                             src_addr,
406                             err
407                         );
408                     }
409                 } else {
410                     let exist = {
411                         let ps = permissions.lock().await;
412                         ps.get(&addr2ipfingerprint(&src_addr)).is_some()
413                     };
414 
415                     if exist {
416                         let msg = {
417                             let peer_address_attr = PeerAddress {
418                                 ip: src_addr.ip(),
419                                 port: src_addr.port(),
420                             };
421                             let data_attr = Data(buffer[..n].to_vec());
422 
423                             let mut msg = Message::new();
424                             if let Err(err) = msg.build(&[
425                                 Box::new(TransactionId::new()),
426                                 Box::new(MessageType::new(METHOD_DATA, CLASS_INDICATION)),
427                                 Box::new(peer_address_attr),
428                                 Box::new(data_attr),
429                             ]) {
430                                 log::error!(
431                                     "Failed to send DataIndication from allocation {} {}",
432                                     src_addr,
433                                     err
434                                 );
435                                 None
436                             } else {
437                                 Some(msg)
438                             }
439                         };
440 
441                         if let Some(msg) = msg {
442                             log::debug!(
443                                 "relaying message from {} to client at {}",
444                                 src_addr,
445                                 five_tuple.src_addr
446                             );
447                             if let Err(err) =
448                                 turn_socket.send_to(&msg.raw, five_tuple.src_addr).await
449                             {
450                                 log::error!(
451                                     "Failed to send DataIndication from allocation {} {}",
452                                     src_addr,
453                                     err
454                                 );
455                             }
456                         }
457                     } else {
458                         log::info!(
459                             "No Permission or Channel exists for {} on allocation {}",
460                             src_addr,
461                             relay_addr
462                         );
463                     }
464                 }
465             }
466         });
467     }
468 }
469