1 #[cfg(test)]
2 mod allocation_test;
3
4 pub mod allocation_manager;
5 pub mod channel_bind;
6 pub mod five_tuple;
7 pub mod permission;
8
9 use crate::error::*;
10 use crate::proto::{chandata::*, channum::*, data::*, peeraddr::*, *};
11 use channel_bind::*;
12 use five_tuple::*;
13 use permission::*;
14 use stun::{agent::*, message::*, textattrs::Username};
15 use util::sync::Mutex as SyncMutex;
16
17 use util::Conn;
18
19 use std::sync::atomic::AtomicUsize;
20 use std::{
21 collections::HashMap,
22 marker::{Send, Sync},
23 net::SocketAddr,
24 sync::{atomic::AtomicBool, atomic::Ordering, Arc},
25 };
26 use tokio::{
27 sync::{
28 mpsc,
29 oneshot::{self, Sender},
30 Mutex,
31 },
32 time::{Duration, Instant},
33 };
34
/// Size in bytes of the receive buffer that `packet_handler` allocates for
/// datagrams read from the relay socket.
const RTP_MTU: usize = 1500;
36
/// Shared map from a [`FiveTuple`] to its [`Allocation`], guarded by an async
/// (tokio) mutex.
pub type AllocationMap = Arc<Mutex<HashMap<FiveTuple, Arc<Allocation>>>>;
38
/// Information about an [`Allocation`].
///
/// A value of this type is sent on the allocation's close-notification
/// channel by [`Allocation::close`].
#[derive(Debug, Clone)]
pub struct AllocationInfo {
    /// [`FiveTuple`] of this [`Allocation`].
    pub five_tuple: FiveTuple,

    /// Username of this [`Allocation`].
    pub username: String,

    /// Relayed bytes with this [`Allocation`].
    ///
    /// Only present when the `metrics` feature is enabled.
    #[cfg(feature = "metrics")]
    pub relayed_bytes: usize,
}
52
53 impl AllocationInfo {
54 // Creates a new `AllocationInfo`
new( five_tuple: FiveTuple, username: String, #[cfg(feature = "metrics")] relayed_bytes: usize, ) -> Self55 pub fn new(
56 five_tuple: FiveTuple,
57 username: String,
58 #[cfg(feature = "metrics")] relayed_bytes: usize,
59 ) -> Self {
60 Self {
61 five_tuple,
62 username,
63 #[cfg(feature = "metrics")]
64 relayed_bytes,
65 }
66 }
67 }
68
/// Allocation is tied to a [`FiveTuple`] and relays traffic.
///
/// Use `create_allocation` and `get_allocation` to operate.
pub struct Allocation {
    /// Transport protocol; [`Allocation::new`] always sets this to `PROTO_UDP`.
    protocol: Protocol,
    /// Socket on which relayed data is sent back to `five_tuple.src_addr`.
    turn_socket: Arc<dyn Conn + Send + Sync>,
    /// Address reported in log messages as this allocation's relay address
    /// (presumably the address `relay_socket` is bound to; confirm with the caller).
    pub(crate) relay_addr: SocketAddr,
    /// Socket from which `packet_handler` receives peer datagrams.
    pub(crate) relay_socket: Arc<dyn Conn + Send + Sync>,
    /// Key under which this allocation is removed from the [`AllocationMap`].
    five_tuple: FiveTuple,
    /// Username whose text is reported in the [`AllocationInfo`] sent on close.
    username: Username,
    /// Installed permissions, keyed by the peer IP string produced by
    /// `addr2ipfingerprint` (the port is not part of the key).
    permissions: Arc<Mutex<HashMap<String, Permission>>>,
    /// Channel bindings, keyed by channel number.
    channel_bindings: Arc<Mutex<HashMap<ChannelNumber, ChannelBind>>>,
    /// Map this allocation is removed from when its lifetime timer fires or
    /// the relay socket read fails; `None` as constructed by `new`.
    pub(crate) allocations: Option<AllocationMap>,
    /// Sender used to reset the lifetime timer; `None` before `start` and
    /// after `stop`.
    reset_tx: SyncMutex<Option<mpsc::Sender<Duration>>>,
    /// Set once the lifetime task spawned by `start` has left its loop.
    timer_expired: Arc<AtomicBool>,
    /// Set by `close`; a later `close` call returns `ErrClosed`.
    closed: AtomicBool,
    /// Byte counter reported as `AllocationInfo::relayed_bytes` when the
    /// `metrics` feature is enabled.
    pub(crate) relayed_bytes: AtomicUsize,
    /// Sender half of the oneshot channel whose receiver stops the
    /// `packet_handler` task.
    drop_tx: Option<Sender<u32>>,
    /// Optional channel that receives an [`AllocationInfo`] at the end of `close`.
    alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
}
88
/// Returns the textual form of the IP part of `addr`.
///
/// The port is deliberately left out, so two addresses that differ only in
/// port produce the same fingerprint.
fn addr2ipfingerprint(addr: &SocketAddr) -> String {
    let ip = addr.ip();
    ip.to_string()
}
92
93 impl Allocation {
94 // creates a new instance of NewAllocation.
new( turn_socket: Arc<dyn Conn + Send + Sync>, relay_socket: Arc<dyn Conn + Send + Sync>, relay_addr: SocketAddr, five_tuple: FiveTuple, username: Username, alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>, ) -> Self95 pub fn new(
96 turn_socket: Arc<dyn Conn + Send + Sync>,
97 relay_socket: Arc<dyn Conn + Send + Sync>,
98 relay_addr: SocketAddr,
99 five_tuple: FiveTuple,
100 username: Username,
101 alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
102 ) -> Self {
103 Allocation {
104 protocol: PROTO_UDP,
105 turn_socket,
106 relay_addr,
107 relay_socket,
108 five_tuple,
109 username,
110 permissions: Arc::new(Mutex::new(HashMap::new())),
111 channel_bindings: Arc::new(Mutex::new(HashMap::new())),
112 allocations: None,
113 reset_tx: SyncMutex::new(None),
114 timer_expired: Arc::new(AtomicBool::new(false)),
115 closed: AtomicBool::new(false),
116 relayed_bytes: Default::default(),
117 drop_tx: None,
118 alloc_close_notify,
119 }
120 }
121
122 // has_permission gets the Permission from the allocation
has_permission(&self, addr: &SocketAddr) -> bool123 pub async fn has_permission(&self, addr: &SocketAddr) -> bool {
124 let permissions = self.permissions.lock().await;
125 permissions.get(&addr2ipfingerprint(addr)).is_some()
126 }
127
128 // add_permission adds a new permission to the allocation
add_permission(&self, mut p: Permission)129 pub async fn add_permission(&self, mut p: Permission) {
130 let fingerprint = addr2ipfingerprint(&p.addr);
131
132 {
133 let permissions = self.permissions.lock().await;
134 if let Some(existed_permission) = permissions.get(&fingerprint) {
135 existed_permission.refresh(PERMISSION_TIMEOUT).await;
136 return;
137 }
138 }
139
140 p.permissions = Some(Arc::clone(&self.permissions));
141 p.start(PERMISSION_TIMEOUT).await;
142
143 {
144 let mut permissions = self.permissions.lock().await;
145 permissions.insert(fingerprint, p);
146 }
147 }
148
149 // remove_permission removes the net.Addr's fingerprint from the allocation's permissions
remove_permission(&self, addr: &SocketAddr) -> bool150 pub async fn remove_permission(&self, addr: &SocketAddr) -> bool {
151 let mut permissions = self.permissions.lock().await;
152 permissions.remove(&addr2ipfingerprint(addr)).is_some()
153 }
154
155 // add_channel_bind adds a new ChannelBind to the allocation, it also updates the
156 // permissions needed for this ChannelBind
add_channel_bind(&self, mut c: ChannelBind, lifetime: Duration) -> Result<()>157 pub async fn add_channel_bind(&self, mut c: ChannelBind, lifetime: Duration) -> Result<()> {
158 {
159 if let Some(addr) = self.get_channel_addr(&c.number).await {
160 if addr != c.peer {
161 return Err(Error::ErrSameChannelDifferentPeer);
162 }
163 }
164
165 if let Some(number) = self.get_channel_number(&c.peer).await {
166 if number != c.number {
167 return Err(Error::ErrSameChannelDifferentPeer);
168 }
169 }
170 }
171
172 {
173 let channel_bindings = self.channel_bindings.lock().await;
174 if let Some(cb) = channel_bindings.get(&c.number) {
175 cb.refresh(lifetime).await;
176
177 // Channel binds also refresh permissions.
178 self.add_permission(Permission::new(cb.peer)).await;
179
180 return Ok(());
181 }
182 }
183
184 let peer = c.peer;
185
186 // Add or refresh this channel.
187 c.channel_bindings = Some(Arc::clone(&self.channel_bindings));
188 c.start(lifetime).await;
189
190 {
191 let mut channel_bindings = self.channel_bindings.lock().await;
192 channel_bindings.insert(c.number, c);
193 }
194
195 // Channel binds also refresh permissions.
196 self.add_permission(Permission::new(peer)).await;
197
198 Ok(())
199 }
200
201 // remove_channel_bind removes the ChannelBind from this allocation by id
remove_channel_bind(&self, number: ChannelNumber) -> bool202 pub async fn remove_channel_bind(&self, number: ChannelNumber) -> bool {
203 let mut channel_bindings = self.channel_bindings.lock().await;
204 channel_bindings.remove(&number).is_some()
205 }
206
207 // get_channel_addr gets the ChannelBind's addr
get_channel_addr(&self, number: &ChannelNumber) -> Option<SocketAddr>208 pub async fn get_channel_addr(&self, number: &ChannelNumber) -> Option<SocketAddr> {
209 let channel_bindings = self.channel_bindings.lock().await;
210 channel_bindings.get(number).map(|cb| cb.peer)
211 }
212
213 // GetChannelByAddr gets the ChannelBind's number from this allocation by net.Addr
get_channel_number(&self, addr: &SocketAddr) -> Option<ChannelNumber>214 pub async fn get_channel_number(&self, addr: &SocketAddr) -> Option<ChannelNumber> {
215 let channel_bindings = self.channel_bindings.lock().await;
216 for cb in channel_bindings.values() {
217 if cb.peer == *addr {
218 return Some(cb.number);
219 }
220 }
221 None
222 }
223
224 // Close closes the allocation
close(&self) -> Result<()>225 pub async fn close(&self) -> Result<()> {
226 if self.closed.load(Ordering::Acquire) {
227 return Err(Error::ErrClosed);
228 }
229
230 self.closed.store(true, Ordering::Release);
231 self.stop();
232
233 {
234 let mut permissions = self.permissions.lock().await;
235 for p in permissions.values_mut() {
236 p.stop();
237 }
238 }
239
240 {
241 let mut channel_bindings = self.channel_bindings.lock().await;
242 for c in channel_bindings.values_mut() {
243 c.stop();
244 }
245 }
246
247 log::trace!("allocation with {} closed!", self.five_tuple);
248
249 let _ = self.turn_socket.close().await;
250 let _ = self.relay_socket.close().await;
251
252 if let Some(notify_tx) = &self.alloc_close_notify {
253 let _ = notify_tx
254 .send(AllocationInfo {
255 five_tuple: self.five_tuple,
256 username: self.username.text.clone(),
257 #[cfg(feature = "metrics")]
258 relayed_bytes: self.relayed_bytes.load(Ordering::Acquire),
259 })
260 .await;
261 }
262
263 Ok(())
264 }
265
start(&self, lifetime: Duration)266 pub async fn start(&self, lifetime: Duration) {
267 let (reset_tx, mut reset_rx) = mpsc::channel(1);
268 self.reset_tx.lock().replace(reset_tx);
269
270 let allocations = self.allocations.clone();
271 let five_tuple = self.five_tuple;
272 let timer_expired = Arc::clone(&self.timer_expired);
273
274 tokio::spawn(async move {
275 let timer = tokio::time::sleep(lifetime);
276 tokio::pin!(timer);
277 let mut done = false;
278
279 while !done {
280 tokio::select! {
281 _ = &mut timer => {
282 if let Some(allocs) = &allocations{
283 let mut alls = allocs.lock().await;
284 if let Some(a) = alls.remove(&five_tuple) {
285 let _ = a.close().await;
286 }
287 }
288 done = true;
289 },
290 result = reset_rx.recv() => {
291 if let Some(d) = result {
292 timer.as_mut().reset(Instant::now() + d);
293 } else {
294 done = true;
295 }
296 },
297 }
298 }
299
300 timer_expired.store(true, Ordering::SeqCst);
301 });
302 }
303
stop(&self) -> bool304 fn stop(&self) -> bool {
305 let reset_tx = self.reset_tx.lock().take();
306 reset_tx.is_none() || self.timer_expired.load(Ordering::SeqCst)
307 }
308
309 // Refresh updates the allocations lifetime
refresh(&self, lifetime: Duration)310 pub async fn refresh(&self, lifetime: Duration) {
311 let reset_tx = self.reset_tx.lock().clone();
312 if let Some(tx) = reset_tx {
313 let _ = tx.send(lifetime).await;
314 }
315 }
316
317 // https://tools.ietf.org/html/rfc5766#section-10.3
318 // When the server receives a UDP datagram at a currently allocated
319 // relayed transport address, the server looks up the allocation
320 // associated with the relayed transport address. The server then
321 // checks to see whether the set of permissions for the allocation allow
322 // the relaying of the UDP datagram as described in Section 8.
323 //
324 // If relaying is permitted, then the server checks if there is a
325 // channel bound to the peer that sent the UDP datagram (see
326 // Section 11). If a channel is bound, then processing proceeds as
327 // described in Section 11.7.
328 //
329 // If relaying is permitted but no channel is bound to the peer, then
330 // the server forms and sends a Data indication. The Data indication
331 // MUST contain both an XOR-PEER-ADDRESS and a DATA attribute. The DATA
332 // attribute is set to the value of the 'data octets' field from the
333 // datagram, and the XOR-PEER-ADDRESS attribute is set to the source
334 // transport address of the received UDP datagram. The Data indication
335 // is then sent on the 5-tuple associated with the allocation.
packet_handler(&mut self)336 async fn packet_handler(&mut self) {
337 let five_tuple = self.five_tuple;
338 let relay_addr = self.relay_addr;
339 let relay_socket = Arc::clone(&self.relay_socket);
340 let turn_socket = Arc::clone(&self.turn_socket);
341 let allocations = self.allocations.clone();
342 let channel_bindings = Arc::clone(&self.channel_bindings);
343 let permissions = Arc::clone(&self.permissions);
344 let (drop_tx, drop_rx) = oneshot::channel::<u32>();
345 self.drop_tx = Some(drop_tx);
346
347 tokio::spawn(async move {
348 let mut buffer = vec![0u8; RTP_MTU];
349
350 tokio::pin!(drop_rx);
351
352 loop {
353 let (n, src_addr) = tokio::select! {
354 result = relay_socket.recv_from(&mut buffer) => {
355 match result {
356 Ok((n, src_addr)) => (n, src_addr),
357 Err(_) => {
358 if let Some(allocs) = &allocations {
359 let mut alls = allocs.lock().await;
360 alls.remove(&five_tuple);
361 }
362 break;
363 }
364 }
365 }
366 _ = drop_rx.as_mut() => {
367 log::trace!("allocation has stopped, stop packet_handler. five_tuple: {:?}", five_tuple);
368 break;
369 }
370 };
371
372 log::debug!(
373 "relay socket {:?} received {} bytes from {}",
374 relay_socket.local_addr(),
375 n,
376 src_addr
377 );
378
379 let cb_number = {
380 let mut cb_number = None;
381 let cbs = channel_bindings.lock().await;
382 for cb in cbs.values() {
383 if cb.peer == src_addr {
384 cb_number = Some(cb.number);
385 break;
386 }
387 }
388 cb_number
389 };
390
391 if let Some(number) = cb_number {
392 let mut channel_data = ChannelData {
393 data: buffer[..n].to_vec(),
394 number,
395 raw: vec![],
396 };
397 channel_data.encode();
398
399 if let Err(err) = turn_socket
400 .send_to(&channel_data.raw, five_tuple.src_addr)
401 .await
402 {
403 log::error!(
404 "Failed to send ChannelData from allocation {} {}",
405 src_addr,
406 err
407 );
408 }
409 } else {
410 let exist = {
411 let ps = permissions.lock().await;
412 ps.get(&addr2ipfingerprint(&src_addr)).is_some()
413 };
414
415 if exist {
416 let msg = {
417 let peer_address_attr = PeerAddress {
418 ip: src_addr.ip(),
419 port: src_addr.port(),
420 };
421 let data_attr = Data(buffer[..n].to_vec());
422
423 let mut msg = Message::new();
424 if let Err(err) = msg.build(&[
425 Box::new(TransactionId::new()),
426 Box::new(MessageType::new(METHOD_DATA, CLASS_INDICATION)),
427 Box::new(peer_address_attr),
428 Box::new(data_attr),
429 ]) {
430 log::error!(
431 "Failed to send DataIndication from allocation {} {}",
432 src_addr,
433 err
434 );
435 None
436 } else {
437 Some(msg)
438 }
439 };
440
441 if let Some(msg) = msg {
442 log::debug!(
443 "relaying message from {} to client at {}",
444 src_addr,
445 five_tuple.src_addr
446 );
447 if let Err(err) =
448 turn_socket.send_to(&msg.raw, five_tuple.src_addr).await
449 {
450 log::error!(
451 "Failed to send DataIndication from allocation {} {}",
452 src_addr,
453 err
454 );
455 }
456 }
457 } else {
458 log::info!(
459 "No Permission or Channel exists for {} on allocation {}",
460 src_addr,
461 relay_addr
462 );
463 }
464 }
465 }
466 });
467 }
468 }
469