1 use super::*;
2 
3 use crate::{
4     auth::{generate_auth_key, AuthHandler},
5     client::{Client, ClientConfig},
6     error::Result,
7     proto::lifetime::DEFAULT_LIFETIME,
8     relay::{relay_none::*, relay_static::RelayAddressGeneratorStatic},
9     server::{
10         config::{ConnConfig, ServerConfig},
11         Server,
12     },
13 };
14 
15 use std::{
16     net::{IpAddr, Ipv4Addr},
17     str::FromStr,
18 };
19 use stun::{attributes::ATTR_USERNAME, textattrs::TextAttribute};
20 use tokio::net::UdpSocket;
21 use tokio::sync::mpsc::Sender;
22 use util::vnet::net::*;
23 
new_test_manager() -> Manager24 fn new_test_manager() -> Manager {
25     let config = ManagerConfig {
26         relay_addr_generator: Box::new(RelayAddressGeneratorNone {
27             address: "0.0.0.0".to_owned(),
28             net: Arc::new(Net::new(None)),
29         }),
30         alloc_close_notify: None,
31     };
32     Manager::new(config)
33 }
34 
random_five_tuple() -> FiveTuple35 fn random_five_tuple() -> FiveTuple {
36     /* #nosec */
37     FiveTuple {
38         src_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()),
39         dst_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()),
40         ..Default::default()
41     }
42 }
43 
44 #[tokio::test]
test_packet_handler() -> Result<()>45 async fn test_packet_handler() -> Result<()> {
46     //env_logger::init();
47 
48     // turn server initialization
49     let turn_socket = UdpSocket::bind("127.0.0.1:0").await?;
50 
51     // client listener initialization
52     let client_listener = UdpSocket::bind("127.0.0.1:0").await?;
53     let src_addr = client_listener.local_addr()?;
54     let (data_ch_tx, mut data_ch_rx) = mpsc::channel(1);
55     // client listener read data
56     tokio::spawn(async move {
57         let mut buffer = vec![0u8; RTP_MTU];
58         loop {
59             let n = match client_listener.recv_from(&mut buffer).await {
60                 Ok((n, _)) => n,
61                 Err(_) => break,
62             };
63 
64             let _ = data_ch_tx.send(buffer[..n].to_vec()).await;
65         }
66     });
67 
68     let m = new_test_manager();
69     let a = m
70         .create_allocation(
71             FiveTuple {
72                 src_addr,
73                 dst_addr: turn_socket.local_addr()?,
74                 ..Default::default()
75             },
76             Arc::new(turn_socket),
77             0,
78             DEFAULT_LIFETIME,
79             TextAttribute::new(ATTR_USERNAME, "user".into()),
80         )
81         .await?;
82 
83     let peer_listener1 = UdpSocket::bind("127.0.0.1:0").await?;
84     let peer_listener2 = UdpSocket::bind("127.0.0.1:0").await?;
85 
86     let channel_bind = ChannelBind::new(
87         ChannelNumber(MIN_CHANNEL_NUMBER),
88         peer_listener2.local_addr()?,
89     );
90 
91     let port = {
92         // add permission with peer1 address
93         a.add_permission(Permission::new(peer_listener1.local_addr()?))
94             .await;
95         // add channel with min channel number and peer2 address
96         a.add_channel_bind(channel_bind.clone(), DEFAULT_LIFETIME)
97             .await?;
98 
99         a.relay_socket.local_addr()?.port()
100     };
101 
102     let relay_addr_with_host_str = format!("127.0.0.1:{port}");
103     let relay_addr_with_host = SocketAddr::from_str(&relay_addr_with_host_str)?;
104 
105     // test for permission and data message
106     let target_text = "permission";
107     let _ = peer_listener1
108         .send_to(target_text.as_bytes(), relay_addr_with_host)
109         .await?;
110     let data = data_ch_rx
111         .recv()
112         .await
113         .ok_or(Error::Other("data ch closed".to_owned()))?;
114 
115     // resolve stun data message
116     assert!(is_message(&data), "should be stun message");
117 
118     let mut msg = Message::new();
119     msg.raw = data;
120     msg.decode()?;
121 
122     let mut msg_data = Data::default();
123     msg_data.get_from(&msg)?;
124     assert_eq!(
125         target_text.as_bytes(),
126         &msg_data.0,
127         "get message doesn't equal the target text"
128     );
129 
130     // test for channel bind and channel data
131     let target_text2 = "channel bind";
132     let _ = peer_listener2
133         .send_to(target_text2.as_bytes(), relay_addr_with_host)
134         .await?;
135     let data = data_ch_rx
136         .recv()
137         .await
138         .ok_or(Error::Other("data ch closed".to_owned()))?;
139 
140     // resolve channel data
141     assert!(
142         ChannelData::is_channel_data(&data),
143         "should be channel data"
144     );
145 
146     let mut channel_data = ChannelData {
147         raw: data,
148         ..Default::default()
149     };
150     channel_data.decode()?;
151     assert_eq!(
152         channel_bind.number, channel_data.number,
153         "get channel data's number is invalid"
154     );
155     assert_eq!(
156         target_text2.as_bytes(),
157         &channel_data.data,
158         "get data doesn't equal the target text."
159     );
160 
161     // listeners close
162     m.close().await?;
163 
164     Ok(())
165 }
166 
167 #[tokio::test]
test_create_allocation_duplicate_five_tuple() -> Result<()>168 async fn test_create_allocation_duplicate_five_tuple() -> Result<()> {
169     //env_logger::init();
170 
171     // turn server initialization
172     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
173 
174     let m = new_test_manager();
175 
176     let five_tuple = random_five_tuple();
177 
178     let _ = m
179         .create_allocation(
180             five_tuple,
181             Arc::clone(&turn_socket),
182             0,
183             DEFAULT_LIFETIME,
184             TextAttribute::new(ATTR_USERNAME, "user".into()),
185         )
186         .await?;
187 
188     let result = m
189         .create_allocation(
190             five_tuple,
191             Arc::clone(&turn_socket),
192             0,
193             DEFAULT_LIFETIME,
194             TextAttribute::new(ATTR_USERNAME, "user".into()),
195         )
196         .await;
197     assert!(result.is_err(), "expected error, but got ok");
198 
199     Ok(())
200 }
201 
202 #[tokio::test]
test_delete_allocation() -> Result<()>203 async fn test_delete_allocation() -> Result<()> {
204     //env_logger::init();
205 
206     // turn server initialization
207     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
208 
209     let m = new_test_manager();
210 
211     let five_tuple = random_five_tuple();
212 
213     let _ = m
214         .create_allocation(
215             five_tuple,
216             Arc::clone(&turn_socket),
217             0,
218             DEFAULT_LIFETIME,
219             TextAttribute::new(ATTR_USERNAME, "user".into()),
220         )
221         .await?;
222 
223     assert!(
224         m.get_allocation(&five_tuple).await.is_some(),
225         "Failed to get allocation right after creation"
226     );
227 
228     m.delete_allocation(&five_tuple).await;
229 
230     assert!(
231         m.get_allocation(&five_tuple).await.is_none(),
232         "Get allocation with {five_tuple} should be nil after delete"
233     );
234 
235     Ok(())
236 }
237 
238 #[tokio::test]
test_allocation_timeout() -> Result<()>239 async fn test_allocation_timeout() -> Result<()> {
240     //env_logger::init();
241 
242     // turn server initialization
243     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
244 
245     let m = new_test_manager();
246 
247     let mut allocations = vec![];
248     let lifetime = Duration::from_millis(100);
249 
250     for _ in 0..5 {
251         let five_tuple = random_five_tuple();
252 
253         let a = m
254             .create_allocation(
255                 five_tuple,
256                 Arc::clone(&turn_socket),
257                 0,
258                 lifetime,
259                 TextAttribute::new(ATTR_USERNAME, "user".into()),
260             )
261             .await?;
262 
263         allocations.push(a);
264     }
265 
266     let mut count = 0;
267 
268     'outer: loop {
269         count += 1;
270 
271         if count >= 10 {
272             panic!("Allocations didn't timeout");
273         }
274 
275         tokio::time::sleep(lifetime + Duration::from_millis(100)).await;
276 
277         let any_outstanding = false;
278 
279         for a in &allocations {
280             if a.close().await.is_ok() {
281                 continue 'outer;
282             }
283         }
284 
285         if !any_outstanding {
286             return Ok(());
287         }
288     }
289 }
290 
291 #[tokio::test]
test_manager_close() -> Result<()>292 async fn test_manager_close() -> Result<()> {
293     // env_logger::init();
294 
295     // turn server initialization
296     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
297 
298     let m = new_test_manager();
299 
300     let mut allocations = vec![];
301 
302     let a1 = m
303         .create_allocation(
304             random_five_tuple(),
305             Arc::clone(&turn_socket),
306             0,
307             Duration::from_millis(100),
308             TextAttribute::new(ATTR_USERNAME, "user".into()),
309         )
310         .await?;
311     allocations.push(a1);
312 
313     let a2 = m
314         .create_allocation(
315             random_five_tuple(),
316             Arc::clone(&turn_socket),
317             0,
318             Duration::from_millis(200),
319             TextAttribute::new(ATTR_USERNAME, "user".into()),
320         )
321         .await?;
322     allocations.push(a2);
323 
324     tokio::time::sleep(Duration::from_millis(150)).await;
325 
326     log::trace!("Mgr is going to be closed...");
327 
328     m.close().await?;
329 
330     for a in allocations {
331         assert!(
332             a.close().await.is_err(),
333             "Allocation should be closed if lifetime timeout"
334         );
335     }
336 
337     Ok(())
338 }
339 
340 #[tokio::test]
test_delete_allocation_by_username() -> Result<()>341 async fn test_delete_allocation_by_username() -> Result<()> {
342     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
343 
344     let m = new_test_manager();
345 
346     let five_tuple1 = random_five_tuple();
347     let five_tuple2 = random_five_tuple();
348     let five_tuple3 = random_five_tuple();
349 
350     let _ = m
351         .create_allocation(
352             five_tuple1,
353             Arc::clone(&turn_socket),
354             0,
355             DEFAULT_LIFETIME,
356             TextAttribute::new(ATTR_USERNAME, "user".into()),
357         )
358         .await?;
359     let _ = m
360         .create_allocation(
361             five_tuple2,
362             Arc::clone(&turn_socket),
363             0,
364             DEFAULT_LIFETIME,
365             TextAttribute::new(ATTR_USERNAME, "user".into()),
366         )
367         .await?;
368     let _ = m
369         .create_allocation(
370             five_tuple3,
371             Arc::clone(&turn_socket),
372             0,
373             DEFAULT_LIFETIME,
374             TextAttribute::new(ATTR_USERNAME, "user2".into()),
375         )
376         .await?;
377 
378     assert_eq!(m.allocations.lock().await.len(), 3);
379 
380     m.delete_allocations_by_username("user").await;
381 
382     assert_eq!(m.allocations.lock().await.len(), 1);
383 
384     assert!(
385         m.get_allocation(&five_tuple1).await.is_none()
386             && m.get_allocation(&five_tuple2).await.is_none()
387             && m.get_allocation(&five_tuple3).await.is_some()
388     );
389 
390     Ok(())
391 }
392 
393 struct TestAuthHandler;
394 impl AuthHandler for TestAuthHandler {
auth_handle(&self, username: &str, realm: &str, _src_addr: SocketAddr) -> Result<Vec<u8>>395     fn auth_handle(&self, username: &str, realm: &str, _src_addr: SocketAddr) -> Result<Vec<u8>> {
396         Ok(generate_auth_key(username, realm, "pass"))
397     }
398 }
399 
create_server( alloc_close_notify: Option<Sender<AllocationInfo>>, ) -> Result<(Server, u16)>400 async fn create_server(
401     alloc_close_notify: Option<Sender<AllocationInfo>>,
402 ) -> Result<(Server, u16)> {
403     let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
404     let server_port = conn.local_addr()?.port();
405 
406     let server = Server::new(ServerConfig {
407         conn_configs: vec![ConnConfig {
408             conn,
409             relay_addr_generator: Box::new(RelayAddressGeneratorStatic {
410                 relay_address: IpAddr::from_str("127.0.0.1")?,
411                 address: "0.0.0.0".to_owned(),
412                 net: Arc::new(Net::new(None)),
413             }),
414         }],
415         realm: "webrtc.rs".to_owned(),
416         auth_handler: Arc::new(TestAuthHandler {}),
417         channel_bind_timeout: Duration::from_secs(0),
418         alloc_close_notify,
419     })
420     .await?;
421 
422     Ok((server, server_port))
423 }
424 
create_client(username: String, server_port: u16) -> Result<Client>425 async fn create_client(username: String, server_port: u16) -> Result<Client> {
426     let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
427 
428     Client::new(ClientConfig {
429         stun_serv_addr: format!("127.0.0.1:{server_port}"),
430         turn_serv_addr: format!("127.0.0.1:{server_port}"),
431         username,
432         password: "pass".to_owned(),
433         realm: String::new(),
434         software: String::new(),
435         rto_in_ms: 0,
436         conn,
437         vnet: None,
438     })
439     .await
440 }
441 
/// Three clients allocate on one server and relay 1, 2 and 3 bytes
/// respectively; the server's allocation info must then report the matching
/// `relayed_bytes` per username.
#[cfg(feature = "metrics")]
#[tokio::test]
async fn test_get_allocations_info() -> Result<()> {
    let (server, server_port) = create_server(None).await?;
    let server_addr = format!("127.0.0.1:{server_port}");

    let client1 = create_client("user1".to_owned(), server_port).await?;
    client1.listen().await?;

    let client2 = create_client("user2".to_owned(), server_port).await?;
    client2.listen().await?;

    let client3 = create_client("user3".to_owned(), server_port).await?;
    client3.listen().await?;

    // Nothing has been allocated yet.
    assert!(server.get_allocations_info(None).await?.is_empty());

    let user1 = client1.allocate().await?;
    let user2 = client2.allocate().await?;
    let user3 = client3.allocate().await?;

    assert_eq!(server.get_allocations_info(None).await?.len(), 3);

    let addr1 = client1
        .send_binding_request_to(server_addr.as_str())
        .await?;
    let addr2 = client2
        .send_binding_request_to(server_addr.as_str())
        .await?;
    let addr3 = client3
        .send_binding_request_to(server_addr.as_str())
        .await?;

    // Payload length encodes the user: 1, 2 and 3 bytes.
    user1.send_to(b"1", addr1).await?;
    user2.send_to(b"12", addr2).await?;
    user3.send_to(b"123", addr3).await?;

    tokio::time::sleep(Duration::from_millis(100)).await;

    let infos = server.get_allocations_info(None).await?;
    for ai in infos.values() {
        let expected = match ai.username.as_str() {
            "user1" => 1,
            "user2" => 2,
            "user3" => 3,
            _ => unreachable!(),
        };
        assert_eq!(ai.relayed_bytes, expected);
    }

    Ok(())
}
493 
/// A single client relays "Hello" (5 bytes) ten times per round; after each
/// of two rounds the allocation's `relayed_bytes` must have grown by 50
/// (0 -> 50 -> 100).
#[cfg(feature = "metrics")]
#[tokio::test]
async fn test_get_allocations_info_bytes_count() -> Result<()> {
    let (server, server_port) = create_server(None).await?;

    let client = create_client("foo".to_owned(), server_port).await?;

    client.listen().await?;

    assert!(server.get_allocations_info(None).await?.is_empty());

    let conn = client.allocate().await?;
    let addr = client
        .send_binding_request_to(format!("127.0.0.1:{server_port}").as_str())
        .await?;

    assert!(!server.get_allocations_info(None).await?.is_empty());

    // Nothing has been relayed before the first send.
    let infos = server.get_allocations_info(None).await?;
    assert_eq!(infos.values().last().unwrap().relayed_bytes, 0);

    // Expected running total after each round of ten 5-byte sends.
    for expected_total in [50, 100] {
        for _ in 0..10 {
            conn.send_to(b"Hello", addr).await?;

            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        tokio::time::sleep(Duration::from_millis(1000)).await;

        let infos = server.get_allocations_info(None).await?;
        assert_eq!(
            infos.values().last().unwrap().relayed_bytes,
            expected_total
        );
    }

    client.close().await?;
    server.close().await?;

    Ok(())
}
566 
/// Relays "Hello" (5 bytes) ten times through one allocation, closes client
/// and server, and checks that an `AllocationInfo` received on the
/// `alloc_close_notify` channel reports 50 relayed bytes.
///
/// The check runs in a spawned task. Its `JoinHandle` is kept and joined at
/// the end so that a failed assertion inside the task fails the test
/// instead of being dropped with the detached task.
#[cfg(feature = "metrics")]
#[tokio::test]
async fn test_alloc_close_notify() -> Result<()> {
    let (tx, mut rx) = mpsc::channel::<AllocationInfo>(1);

    let notify_check = tokio::spawn(async move {
        if let Some(alloc) = rx.recv().await {
            assert_eq!(alloc.relayed_bytes, 50);
        }
    });

    let (server, server_port) = create_server(Some(tx)).await?;

    let client = create_client("foo".to_owned(), server_port).await?;

    client.listen().await?;

    assert!(server.get_allocations_info(None).await?.is_empty());

    let conn = client.allocate().await?;
    let addr = client
        .send_binding_request_to(format!("127.0.0.1:{server_port}").as_str())
        .await?;

    assert!(!server.get_allocations_info(None).await?.is_empty());

    for _ in 0..10 {
        conn.send_to(b"Hello", addr).await?;

        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    tokio::time::sleep(Duration::from_millis(1000)).await;

    client.close().await?;
    server.close().await?;

    // Give the checker task up to one second to finish. A panic inside it
    // (failed assertion) surfaces here as a join error and fails the test.
    // If the task is still waiting after the timeout the test passes, as it
    // did before, because no notification was observed to contradict it.
    let joined = tokio::time::timeout(Duration::from_millis(1000), notify_check).await;
    if let Ok(task_result) = joined {
        task_result.expect("alloc close notification check failed");
    }

    Ok(())
}
608