1 #[cfg(test)] 2 mod channel_bind_test; 3 4 use super::*; 5 use crate::proto::channum::*; 6 7 use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc}; 8 use tokio::sync::Mutex; 9 use tokio::time::{Duration, Instant}; 10 11 // ChannelBind represents a TURN Channel 12 // https://tools.ietf.org/html/rfc5766#section-2.5 13 #[derive(Clone)] 14 pub struct ChannelBind { 15 pub(crate) peer: SocketAddr, 16 pub(crate) number: ChannelNumber, 17 pub(crate) channel_bindings: Option<Arc<Mutex<HashMap<ChannelNumber, ChannelBind>>>>, 18 reset_tx: Option<mpsc::Sender<Duration>>, 19 timer_expired: Arc<AtomicBool>, 20 } 21 22 impl ChannelBind { 23 // NewChannelBind creates a new ChannelBind new(number: ChannelNumber, peer: SocketAddr) -> Self24 pub fn new(number: ChannelNumber, peer: SocketAddr) -> Self { 25 ChannelBind { 26 number, 27 peer, 28 channel_bindings: None, 29 reset_tx: None, 30 timer_expired: Arc::new(AtomicBool::new(false)), 31 } 32 } 33 start(&mut self, lifetime: Duration)34 pub(crate) async fn start(&mut self, lifetime: Duration) { 35 let (reset_tx, mut reset_rx) = mpsc::channel(1); 36 self.reset_tx = Some(reset_tx); 37 38 let channel_bindings = self.channel_bindings.clone(); 39 let number = self.number; 40 let timer_expired = Arc::clone(&self.timer_expired); 41 42 tokio::spawn(async move { 43 let timer = tokio::time::sleep(lifetime); 44 tokio::pin!(timer); 45 let mut done = false; 46 47 while !done { 48 tokio::select! { 49 _ = &mut timer => { 50 if let Some(cbs) = &channel_bindings{ 51 let mut cb = cbs.lock().await; 52 if cb.remove(&number).is_none() { 53 log::error!("Failed to remove ChannelBind for {}", number); 54 } 55 } 56 done = true; 57 }, 58 result = reset_rx.recv() => { 59 if let Some(d) = result { 60 timer.as_mut().reset(Instant::now() + d); 61 } else { 62 done = true; 63 } 64 }, 65 } 66 } 67 68 timer_expired.store(true, Ordering::SeqCst); 69 }); 70 } 71 stop(&mut self) -> bool72 pub(crate) fn stop(&mut self) -> bool { 73 let expired = self.reset_tx.is_none() || self.timer_expired.load(Ordering::SeqCst); 74 self.reset_tx.take(); 75 expired 76 } 77 refresh(&self, lifetime: Duration)78 pub(crate) async fn refresh(&self, lifetime: Duration) { 79 if let Some(tx) = &self.reset_tx { 80 let _ = tx.send(lifetime).await; 81 } 82 } 83 } 84