xref: /webrtc/turn/src/allocation/permission.rs (revision ffe74184)
1 use super::*;
2 
3 use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
4 use tokio::sync::Mutex;
5 use tokio::time::{Duration, Instant};
6 
/// Five-minute permission lifetime.
///
/// NOTE(review): presumably the value callers pass to `Permission::start` and
/// `Permission::refresh`; RFC 5766 section 8 sets the permission lifetime to
/// 300 seconds — confirm against the call sites.
pub(crate) const PERMISSION_TIMEOUT: Duration = Duration::from_secs(5 * 60);
8 
/// A TURN permission for a single peer address.
///
/// TURN permissions mimic the address-restricted filtering mechanism of NATs
/// that comply with RFC 4787.
/// <https://tools.ietf.org/html/rfc5766#section-2.3>
pub struct Permission {
    /// Peer address this permission applies to.
    pub(crate) addr: SocketAddr,
    /// Shared permission map, keyed by `addr2ipfingerprint(&addr)`. When set,
    /// the lifetime timer removes this permission's entry from it on expiry.
    pub(crate) permissions: Option<Arc<Mutex<HashMap<String, Permission>>>>,
    /// Sender used to push a new lifetime to the timer task. `None` while no
    /// timer has been started, or after `stop` has been called.
    reset_tx: Option<mpsc::Sender<Duration>>,
    /// Set to `true` by the timer task when it exits.
    timer_expired: Arc<AtomicBool>,
}
18 
19 impl Permission {
20     // NewPermission create a new Permission
new(addr: SocketAddr) -> Self21     pub fn new(addr: SocketAddr) -> Self {
22         Permission {
23             addr,
24             permissions: None,
25             reset_tx: None,
26             timer_expired: Arc::new(AtomicBool::new(false)),
27         }
28     }
29 
start(&mut self, lifetime: Duration)30     pub(crate) async fn start(&mut self, lifetime: Duration) {
31         let (reset_tx, mut reset_rx) = mpsc::channel(1);
32         self.reset_tx = Some(reset_tx);
33 
34         let permissions = self.permissions.clone();
35         let addr = self.addr;
36         let timer_expired = Arc::clone(&self.timer_expired);
37 
38         tokio::spawn(async move {
39             let timer = tokio::time::sleep(lifetime);
40             tokio::pin!(timer);
41             let mut done = false;
42 
43             while !done {
44                 tokio::select! {
45                     _ = &mut timer => {
46                         if let Some(perms) = &permissions{
47                             let mut p = perms.lock().await;
48                             p.remove(&addr2ipfingerprint(&addr));
49                         }
50                         done = true;
51                     },
52                     result = reset_rx.recv() => {
53                         if let Some(d) = result {
54                             timer.as_mut().reset(Instant::now() + d);
55                         } else {
56                             done = true;
57                         }
58                     },
59                 }
60             }
61 
62             timer_expired.store(true, Ordering::SeqCst);
63         });
64     }
65 
stop(&mut self) -> bool66     pub(crate) fn stop(&mut self) -> bool {
67         let expired = self.reset_tx.is_none() || self.timer_expired.load(Ordering::SeqCst);
68         self.reset_tx.take();
69         expired
70     }
71 
refresh(&self, lifetime: Duration)72     pub(crate) async fn refresh(&self, lifetime: Duration) {
73         if let Some(tx) = &self.reset_tx {
74             let _ = tx.send(lifetime).await;
75         }
76     }
77 }
78