xref: /webrtc/turn/src/allocation/channel_bind.rs (revision ffe74184)
1 #[cfg(test)]
2 mod channel_bind_test;
3 
4 use super::*;
5 use crate::proto::channum::*;
6 
7 use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
8 use tokio::sync::Mutex;
9 use tokio::time::{Duration, Instant};
10 
11 // ChannelBind represents a TURN Channel
12 // https://tools.ietf.org/html/rfc5766#section-2.5
13 #[derive(Clone)]
14 pub struct ChannelBind {
15     pub(crate) peer: SocketAddr,
16     pub(crate) number: ChannelNumber,
17     pub(crate) channel_bindings: Option<Arc<Mutex<HashMap<ChannelNumber, ChannelBind>>>>,
18     reset_tx: Option<mpsc::Sender<Duration>>,
19     timer_expired: Arc<AtomicBool>,
20 }
21 
22 impl ChannelBind {
23     // NewChannelBind creates a new ChannelBind
new(number: ChannelNumber, peer: SocketAddr) -> Self24     pub fn new(number: ChannelNumber, peer: SocketAddr) -> Self {
25         ChannelBind {
26             number,
27             peer,
28             channel_bindings: None,
29             reset_tx: None,
30             timer_expired: Arc::new(AtomicBool::new(false)),
31         }
32     }
33 
start(&mut self, lifetime: Duration)34     pub(crate) async fn start(&mut self, lifetime: Duration) {
35         let (reset_tx, mut reset_rx) = mpsc::channel(1);
36         self.reset_tx = Some(reset_tx);
37 
38         let channel_bindings = self.channel_bindings.clone();
39         let number = self.number;
40         let timer_expired = Arc::clone(&self.timer_expired);
41 
42         tokio::spawn(async move {
43             let timer = tokio::time::sleep(lifetime);
44             tokio::pin!(timer);
45             let mut done = false;
46 
47             while !done {
48                 tokio::select! {
49                     _ = &mut timer => {
50                         if let Some(cbs) = &channel_bindings{
51                             let mut cb = cbs.lock().await;
52                             if cb.remove(&number).is_none() {
53                                 log::error!("Failed to remove ChannelBind for {}", number);
54                             }
55                         }
56                         done = true;
57                     },
58                     result = reset_rx.recv() => {
59                         if let Some(d) = result {
60                             timer.as_mut().reset(Instant::now() + d);
61                         } else {
62                             done = true;
63                         }
64                     },
65                 }
66             }
67 
68             timer_expired.store(true, Ordering::SeqCst);
69         });
70     }
71 
stop(&mut self) -> bool72     pub(crate) fn stop(&mut self) -> bool {
73         let expired = self.reset_tx.is_none() || self.timer_expired.load(Ordering::SeqCst);
74         self.reset_tx.take();
75         expired
76     }
77 
refresh(&self, lifetime: Duration)78     pub(crate) async fn refresh(&self, lifetime: Duration) {
79         if let Some(tx) = &self.reset_tx {
80             let _ = tx.send(lifetime).await;
81         }
82     }
83 }
84