1 use super::*;
2
3 use crate::{
4 auth::{generate_auth_key, AuthHandler},
5 client::{Client, ClientConfig},
6 error::Result,
7 proto::lifetime::DEFAULT_LIFETIME,
8 relay::{relay_none::*, relay_static::RelayAddressGeneratorStatic},
9 server::{
10 config::{ConnConfig, ServerConfig},
11 Server,
12 },
13 };
14
15 use std::{
16 net::{IpAddr, Ipv4Addr},
17 str::FromStr,
18 };
19 use stun::{attributes::ATTR_USERNAME, textattrs::TextAttribute};
20 use tokio::net::UdpSocket;
21 use tokio::sync::mpsc::Sender;
22 use util::vnet::net::*;
23
new_test_manager() -> Manager24 fn new_test_manager() -> Manager {
25 let config = ManagerConfig {
26 relay_addr_generator: Box::new(RelayAddressGeneratorNone {
27 address: "0.0.0.0".to_owned(),
28 net: Arc::new(Net::new(None)),
29 }),
30 alloc_close_notify: None,
31 };
32 Manager::new(config)
33 }
34
random_five_tuple() -> FiveTuple35 fn random_five_tuple() -> FiveTuple {
36 /* #nosec */
37 FiveTuple {
38 src_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()),
39 dst_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()),
40 ..Default::default()
41 }
42 }
43
44 #[tokio::test]
test_packet_handler() -> Result<()>45 async fn test_packet_handler() -> Result<()> {
46 //env_logger::init();
47
48 // turn server initialization
49 let turn_socket = UdpSocket::bind("127.0.0.1:0").await?;
50
51 // client listener initialization
52 let client_listener = UdpSocket::bind("127.0.0.1:0").await?;
53 let src_addr = client_listener.local_addr()?;
54 let (data_ch_tx, mut data_ch_rx) = mpsc::channel(1);
55 // client listener read data
56 tokio::spawn(async move {
57 let mut buffer = vec![0u8; RTP_MTU];
58 loop {
59 let n = match client_listener.recv_from(&mut buffer).await {
60 Ok((n, _)) => n,
61 Err(_) => break,
62 };
63
64 let _ = data_ch_tx.send(buffer[..n].to_vec()).await;
65 }
66 });
67
68 let m = new_test_manager();
69 let a = m
70 .create_allocation(
71 FiveTuple {
72 src_addr,
73 dst_addr: turn_socket.local_addr()?,
74 ..Default::default()
75 },
76 Arc::new(turn_socket),
77 0,
78 DEFAULT_LIFETIME,
79 TextAttribute::new(ATTR_USERNAME, "user".into()),
80 )
81 .await?;
82
83 let peer_listener1 = UdpSocket::bind("127.0.0.1:0").await?;
84 let peer_listener2 = UdpSocket::bind("127.0.0.1:0").await?;
85
86 let channel_bind = ChannelBind::new(
87 ChannelNumber(MIN_CHANNEL_NUMBER),
88 peer_listener2.local_addr()?,
89 );
90
91 let port = {
92 // add permission with peer1 address
93 a.add_permission(Permission::new(peer_listener1.local_addr()?))
94 .await;
95 // add channel with min channel number and peer2 address
96 a.add_channel_bind(channel_bind.clone(), DEFAULT_LIFETIME)
97 .await?;
98
99 a.relay_socket.local_addr()?.port()
100 };
101
102 let relay_addr_with_host_str = format!("127.0.0.1:{port}");
103 let relay_addr_with_host = SocketAddr::from_str(&relay_addr_with_host_str)?;
104
105 // test for permission and data message
106 let target_text = "permission";
107 let _ = peer_listener1
108 .send_to(target_text.as_bytes(), relay_addr_with_host)
109 .await?;
110 let data = data_ch_rx
111 .recv()
112 .await
113 .ok_or(Error::Other("data ch closed".to_owned()))?;
114
115 // resolve stun data message
116 assert!(is_message(&data), "should be stun message");
117
118 let mut msg = Message::new();
119 msg.raw = data;
120 msg.decode()?;
121
122 let mut msg_data = Data::default();
123 msg_data.get_from(&msg)?;
124 assert_eq!(
125 target_text.as_bytes(),
126 &msg_data.0,
127 "get message doesn't equal the target text"
128 );
129
130 // test for channel bind and channel data
131 let target_text2 = "channel bind";
132 let _ = peer_listener2
133 .send_to(target_text2.as_bytes(), relay_addr_with_host)
134 .await?;
135 let data = data_ch_rx
136 .recv()
137 .await
138 .ok_or(Error::Other("data ch closed".to_owned()))?;
139
140 // resolve channel data
141 assert!(
142 ChannelData::is_channel_data(&data),
143 "should be channel data"
144 );
145
146 let mut channel_data = ChannelData {
147 raw: data,
148 ..Default::default()
149 };
150 channel_data.decode()?;
151 assert_eq!(
152 channel_bind.number, channel_data.number,
153 "get channel data's number is invalid"
154 );
155 assert_eq!(
156 target_text2.as_bytes(),
157 &channel_data.data,
158 "get data doesn't equal the target text."
159 );
160
161 // listeners close
162 m.close().await?;
163
164 Ok(())
165 }
166
167 #[tokio::test]
test_create_allocation_duplicate_five_tuple() -> Result<()>168 async fn test_create_allocation_duplicate_five_tuple() -> Result<()> {
169 //env_logger::init();
170
171 // turn server initialization
172 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
173
174 let m = new_test_manager();
175
176 let five_tuple = random_five_tuple();
177
178 let _ = m
179 .create_allocation(
180 five_tuple,
181 Arc::clone(&turn_socket),
182 0,
183 DEFAULT_LIFETIME,
184 TextAttribute::new(ATTR_USERNAME, "user".into()),
185 )
186 .await?;
187
188 let result = m
189 .create_allocation(
190 five_tuple,
191 Arc::clone(&turn_socket),
192 0,
193 DEFAULT_LIFETIME,
194 TextAttribute::new(ATTR_USERNAME, "user".into()),
195 )
196 .await;
197 assert!(result.is_err(), "expected error, but got ok");
198
199 Ok(())
200 }
201
202 #[tokio::test]
test_delete_allocation() -> Result<()>203 async fn test_delete_allocation() -> Result<()> {
204 //env_logger::init();
205
206 // turn server initialization
207 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
208
209 let m = new_test_manager();
210
211 let five_tuple = random_five_tuple();
212
213 let _ = m
214 .create_allocation(
215 five_tuple,
216 Arc::clone(&turn_socket),
217 0,
218 DEFAULT_LIFETIME,
219 TextAttribute::new(ATTR_USERNAME, "user".into()),
220 )
221 .await?;
222
223 assert!(
224 m.get_allocation(&five_tuple).await.is_some(),
225 "Failed to get allocation right after creation"
226 );
227
228 m.delete_allocation(&five_tuple).await;
229
230 assert!(
231 m.get_allocation(&five_tuple).await.is_none(),
232 "Get allocation with {five_tuple} should be nil after delete"
233 );
234
235 Ok(())
236 }
237
238 #[tokio::test]
test_allocation_timeout() -> Result<()>239 async fn test_allocation_timeout() -> Result<()> {
240 //env_logger::init();
241
242 // turn server initialization
243 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
244
245 let m = new_test_manager();
246
247 let mut allocations = vec![];
248 let lifetime = Duration::from_millis(100);
249
250 for _ in 0..5 {
251 let five_tuple = random_five_tuple();
252
253 let a = m
254 .create_allocation(
255 five_tuple,
256 Arc::clone(&turn_socket),
257 0,
258 lifetime,
259 TextAttribute::new(ATTR_USERNAME, "user".into()),
260 )
261 .await?;
262
263 allocations.push(a);
264 }
265
266 let mut count = 0;
267
268 'outer: loop {
269 count += 1;
270
271 if count >= 10 {
272 panic!("Allocations didn't timeout");
273 }
274
275 tokio::time::sleep(lifetime + Duration::from_millis(100)).await;
276
277 let any_outstanding = false;
278
279 for a in &allocations {
280 if a.close().await.is_ok() {
281 continue 'outer;
282 }
283 }
284
285 if !any_outstanding {
286 return Ok(());
287 }
288 }
289 }
290
291 #[tokio::test]
test_manager_close() -> Result<()>292 async fn test_manager_close() -> Result<()> {
293 // env_logger::init();
294
295 // turn server initialization
296 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
297
298 let m = new_test_manager();
299
300 let mut allocations = vec![];
301
302 let a1 = m
303 .create_allocation(
304 random_five_tuple(),
305 Arc::clone(&turn_socket),
306 0,
307 Duration::from_millis(100),
308 TextAttribute::new(ATTR_USERNAME, "user".into()),
309 )
310 .await?;
311 allocations.push(a1);
312
313 let a2 = m
314 .create_allocation(
315 random_five_tuple(),
316 Arc::clone(&turn_socket),
317 0,
318 Duration::from_millis(200),
319 TextAttribute::new(ATTR_USERNAME, "user".into()),
320 )
321 .await?;
322 allocations.push(a2);
323
324 tokio::time::sleep(Duration::from_millis(150)).await;
325
326 log::trace!("Mgr is going to be closed...");
327
328 m.close().await?;
329
330 for a in allocations {
331 assert!(
332 a.close().await.is_err(),
333 "Allocation should be closed if lifetime timeout"
334 );
335 }
336
337 Ok(())
338 }
339
340 #[tokio::test]
test_delete_allocation_by_username() -> Result<()>341 async fn test_delete_allocation_by_username() -> Result<()> {
342 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
343
344 let m = new_test_manager();
345
346 let five_tuple1 = random_five_tuple();
347 let five_tuple2 = random_five_tuple();
348 let five_tuple3 = random_five_tuple();
349
350 let _ = m
351 .create_allocation(
352 five_tuple1,
353 Arc::clone(&turn_socket),
354 0,
355 DEFAULT_LIFETIME,
356 TextAttribute::new(ATTR_USERNAME, "user".into()),
357 )
358 .await?;
359 let _ = m
360 .create_allocation(
361 five_tuple2,
362 Arc::clone(&turn_socket),
363 0,
364 DEFAULT_LIFETIME,
365 TextAttribute::new(ATTR_USERNAME, "user".into()),
366 )
367 .await?;
368 let _ = m
369 .create_allocation(
370 five_tuple3,
371 Arc::clone(&turn_socket),
372 0,
373 DEFAULT_LIFETIME,
374 TextAttribute::new(ATTR_USERNAME, "user2".into()),
375 )
376 .await?;
377
378 assert_eq!(m.allocations.lock().await.len(), 3);
379
380 m.delete_allocations_by_username("user").await;
381
382 assert_eq!(m.allocations.lock().await.len(), 1);
383
384 assert!(
385 m.get_allocation(&five_tuple1).await.is_none()
386 && m.get_allocation(&five_tuple2).await.is_none()
387 && m.get_allocation(&five_tuple3).await.is_some()
388 );
389
390 Ok(())
391 }
392
393 struct TestAuthHandler;
394 impl AuthHandler for TestAuthHandler {
auth_handle(&self, username: &str, realm: &str, _src_addr: SocketAddr) -> Result<Vec<u8>>395 fn auth_handle(&self, username: &str, realm: &str, _src_addr: SocketAddr) -> Result<Vec<u8>> {
396 Ok(generate_auth_key(username, realm, "pass"))
397 }
398 }
399
create_server( alloc_close_notify: Option<Sender<AllocationInfo>>, ) -> Result<(Server, u16)>400 async fn create_server(
401 alloc_close_notify: Option<Sender<AllocationInfo>>,
402 ) -> Result<(Server, u16)> {
403 let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
404 let server_port = conn.local_addr()?.port();
405
406 let server = Server::new(ServerConfig {
407 conn_configs: vec![ConnConfig {
408 conn,
409 relay_addr_generator: Box::new(RelayAddressGeneratorStatic {
410 relay_address: IpAddr::from_str("127.0.0.1")?,
411 address: "0.0.0.0".to_owned(),
412 net: Arc::new(Net::new(None)),
413 }),
414 }],
415 realm: "webrtc.rs".to_owned(),
416 auth_handler: Arc::new(TestAuthHandler {}),
417 channel_bind_timeout: Duration::from_secs(0),
418 alloc_close_notify,
419 })
420 .await?;
421
422 Ok((server, server_port))
423 }
424
create_client(username: String, server_port: u16) -> Result<Client>425 async fn create_client(username: String, server_port: u16) -> Result<Client> {
426 let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
427
428 Client::new(ClientConfig {
429 stun_serv_addr: format!("127.0.0.1:{server_port}"),
430 turn_serv_addr: format!("127.0.0.1:{server_port}"),
431 username,
432 password: "pass".to_owned(),
433 realm: String::new(),
434 software: String::new(),
435 rto_in_ms: 0,
436 conn,
437 vnet: None,
438 })
439 .await
440 }
441
442 #[cfg(feature = "metrics")]
443 #[tokio::test]
test_get_allocations_info() -> Result<()>444 async fn test_get_allocations_info() -> Result<()> {
445 let (server, server_port) = create_server(None).await?;
446
447 let client1 = create_client("user1".to_owned(), server_port).await?;
448 client1.listen().await?;
449
450 let client2 = create_client("user2".to_owned(), server_port).await?;
451 client2.listen().await?;
452
453 let client3 = create_client("user3".to_owned(), server_port).await?;
454 client3.listen().await?;
455
456 assert!(server.get_allocations_info(None).await?.is_empty());
457
458 let user1 = client1.allocate().await?;
459 let user2 = client2.allocate().await?;
460 let user3 = client3.allocate().await?;
461
462 assert_eq!(server.get_allocations_info(None).await?.len(), 3);
463
464 let addr1 = client1
465 .send_binding_request_to(format!("127.0.0.1:{server_port}").as_str())
466 .await?;
467 let addr2 = client2
468 .send_binding_request_to(format!("127.0.0.1:{server_port}").as_str())
469 .await?;
470 let addr3 = client3
471 .send_binding_request_to(format!("127.0.0.1:{server_port}").as_str())
472 .await?;
473
474 user1.send_to(b"1", addr1).await?;
475 user2.send_to(b"12", addr2).await?;
476 user3.send_to(b"123", addr3).await?;
477
478 tokio::time::sleep(Duration::from_millis(100)).await;
479
480 server
481 .get_allocations_info(None)
482 .await?
483 .iter()
484 .for_each(|(_, ai)| match ai.username.as_str() {
485 "user1" => assert_eq!(ai.relayed_bytes, 1),
486 "user2" => assert_eq!(ai.relayed_bytes, 2),
487 "user3" => assert_eq!(ai.relayed_bytes, 3),
488 _ => unreachable!(),
489 });
490
491 Ok(())
492 }
493
494 #[cfg(feature = "metrics")]
495 #[tokio::test]
test_get_allocations_info_bytes_count() -> Result<()>496 async fn test_get_allocations_info_bytes_count() -> Result<()> {
497 let (server, server_port) = create_server(None).await?;
498
499 let client = create_client("foo".to_owned(), server_port).await?;
500
501 client.listen().await?;
502
503 assert!(server.get_allocations_info(None).await?.is_empty());
504
505 let conn = client.allocate().await?;
506 let addr = client
507 .send_binding_request_to(format!("127.0.0.1:{server_port}").as_str())
508 .await?;
509
510 assert!(!server.get_allocations_info(None).await?.is_empty());
511
512 assert_eq!(
513 server
514 .get_allocations_info(None)
515 .await?
516 .values()
517 .last()
518 .unwrap()
519 .relayed_bytes,
520 0
521 );
522
523 for _ in 0..10 {
524 conn.send_to(b"Hello", addr).await?;
525
526 tokio::time::sleep(Duration::from_millis(100)).await;
527 }
528
529 tokio::time::sleep(Duration::from_millis(1000)).await;
530
531 assert_eq!(
532 server
533 .get_allocations_info(None)
534 .await?
535 .values()
536 .last()
537 .unwrap()
538 .relayed_bytes,
539 50
540 );
541
542 for _ in 0..10 {
543 conn.send_to(b"Hello", addr).await?;
544
545 tokio::time::sleep(Duration::from_millis(100)).await;
546 }
547
548 tokio::time::sleep(Duration::from_millis(1000)).await;
549
550 assert_eq!(
551 server
552 .get_allocations_info(None)
553 .await?
554 .values()
555 .last()
556 .unwrap()
557 .relayed_bytes,
558 100
559 );
560
561 client.close().await?;
562 server.close().await?;
563
564 Ok(())
565 }
566
567 #[cfg(feature = "metrics")]
568 #[tokio::test]
test_alloc_close_notify() -> Result<()>569 async fn test_alloc_close_notify() -> Result<()> {
570 let (tx, mut rx) = mpsc::channel::<AllocationInfo>(1);
571
572 tokio::spawn(async move {
573 if let Some(alloc) = rx.recv().await {
574 assert_eq!(alloc.relayed_bytes, 50);
575 }
576 });
577
578 let (server, server_port) = create_server(Some(tx)).await?;
579
580 let client = create_client("foo".to_owned(), server_port).await?;
581
582 client.listen().await?;
583
584 assert!(server.get_allocations_info(None).await?.is_empty());
585
586 let conn = client.allocate().await?;
587 let addr = client
588 .send_binding_request_to(format!("127.0.0.1:{server_port}").as_str())
589 .await?;
590
591 assert!(!server.get_allocations_info(None).await?.is_empty());
592
593 for _ in 0..10 {
594 conn.send_to(b"Hello", addr).await?;
595
596 tokio::time::sleep(Duration::from_millis(100)).await;
597 }
598
599 tokio::time::sleep(Duration::from_millis(1000)).await;
600
601 client.close().await?;
602 server.close().await?;
603
604 tokio::time::sleep(Duration::from_millis(1000)).await;
605
606 Ok(())
607 }
608