1 use super::*;
2 
3 use crate::{
4     auth::{generate_auth_key, AuthHandler},
5     client::{Client, ClientConfig},
6     error::Result,
7     proto::lifetime::DEFAULT_LIFETIME,
8     relay::{relay_none::*, relay_static::RelayAddressGeneratorStatic},
9     server::{
10         config::{ConnConfig, ServerConfig},
11         Server,
12     },
13 };
14 
15 use std::{
16     net::{IpAddr, Ipv4Addr},
17     str::FromStr,
18 };
19 use stun::{attributes::ATTR_USERNAME, textattrs::TextAttribute};
20 use tokio::net::UdpSocket;
21 use util::vnet::net::*;
22 
23 fn new_test_manager() -> Manager {
24     let config = ManagerConfig {
25         relay_addr_generator: Box::new(RelayAddressGeneratorNone {
26             address: "0.0.0.0".to_owned(),
27             net: Arc::new(Net::new(None)),
28         }),
29     };
30     Manager::new(config)
31 }
32 
33 fn random_five_tuple() -> FiveTuple {
34     /* #nosec */
35     FiveTuple {
36         src_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()),
37         dst_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()),
38         ..Default::default()
39     }
40 }
41 
42 #[tokio::test]
43 async fn test_packet_handler() -> Result<()> {
44     //env_logger::init();
45 
46     // turn server initialization
47     let turn_socket = UdpSocket::bind("127.0.0.1:0").await?;
48 
49     // client listener initialization
50     let client_listener = UdpSocket::bind("127.0.0.1:0").await?;
51     let src_addr = client_listener.local_addr()?;
52     let (data_ch_tx, mut data_ch_rx) = mpsc::channel(1);
53     // client listener read data
54     tokio::spawn(async move {
55         let mut buffer = vec![0u8; RTP_MTU];
56         loop {
57             let n = match client_listener.recv_from(&mut buffer).await {
58                 Ok((n, _)) => n,
59                 Err(_) => break,
60             };
61 
62             let _ = data_ch_tx.send(buffer[..n].to_vec()).await;
63         }
64     });
65 
66     let m = new_test_manager();
67     let a = m
68         .create_allocation(
69             FiveTuple {
70                 src_addr,
71                 dst_addr: turn_socket.local_addr()?,
72                 ..Default::default()
73             },
74             Arc::new(turn_socket),
75             0,
76             DEFAULT_LIFETIME,
77             TextAttribute::new(ATTR_USERNAME, "user".into()),
78         )
79         .await?;
80 
81     let peer_listener1 = UdpSocket::bind("127.0.0.1:0").await?;
82     let peer_listener2 = UdpSocket::bind("127.0.0.1:0").await?;
83 
84     let channel_bind = ChannelBind::new(
85         ChannelNumber(MIN_CHANNEL_NUMBER),
86         peer_listener2.local_addr()?,
87     );
88 
89     let port = {
90         // add permission with peer1 address
91         a.add_permission(Permission::new(peer_listener1.local_addr()?))
92             .await;
93         // add channel with min channel number and peer2 address
94         a.add_channel_bind(channel_bind.clone(), DEFAULT_LIFETIME)
95             .await?;
96 
97         a.relay_socket.local_addr()?.port()
98     };
99 
100     let relay_addr_with_host_str = format!("127.0.0.1:{port}");
101     let relay_addr_with_host = SocketAddr::from_str(&relay_addr_with_host_str)?;
102 
103     // test for permission and data message
104     let target_text = "permission";
105     let _ = peer_listener1
106         .send_to(target_text.as_bytes(), relay_addr_with_host)
107         .await?;
108     let data = data_ch_rx
109         .recv()
110         .await
111         .ok_or(Error::Other("data ch closed".to_owned()))?;
112 
113     // resolve stun data message
114     assert!(is_message(&data), "should be stun message");
115 
116     let mut msg = Message::new();
117     msg.raw = data;
118     msg.decode()?;
119 
120     let mut msg_data = Data::default();
121     msg_data.get_from(&msg)?;
122     assert_eq!(
123         target_text.as_bytes(),
124         &msg_data.0,
125         "get message doesn't equal the target text"
126     );
127 
128     // test for channel bind and channel data
129     let target_text2 = "channel bind";
130     let _ = peer_listener2
131         .send_to(target_text2.as_bytes(), relay_addr_with_host)
132         .await?;
133     let data = data_ch_rx
134         .recv()
135         .await
136         .ok_or(Error::Other("data ch closed".to_owned()))?;
137 
138     // resolve channel data
139     assert!(
140         ChannelData::is_channel_data(&data),
141         "should be channel data"
142     );
143 
144     let mut channel_data = ChannelData {
145         raw: data,
146         ..Default::default()
147     };
148     channel_data.decode()?;
149     assert_eq!(
150         channel_bind.number, channel_data.number,
151         "get channel data's number is invalid"
152     );
153     assert_eq!(
154         target_text2.as_bytes(),
155         &channel_data.data,
156         "get data doesn't equal the target text."
157     );
158 
159     // listeners close
160     m.close().await?;
161 
162     Ok(())
163 }
164 
165 #[tokio::test]
166 async fn test_create_allocation_duplicate_five_tuple() -> Result<()> {
167     //env_logger::init();
168 
169     // turn server initialization
170     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
171 
172     let m = new_test_manager();
173 
174     let five_tuple = random_five_tuple();
175 
176     let _ = m
177         .create_allocation(
178             five_tuple,
179             Arc::clone(&turn_socket),
180             0,
181             DEFAULT_LIFETIME,
182             TextAttribute::new(ATTR_USERNAME, "user".into()),
183         )
184         .await?;
185 
186     let result = m
187         .create_allocation(
188             five_tuple,
189             Arc::clone(&turn_socket),
190             0,
191             DEFAULT_LIFETIME,
192             TextAttribute::new(ATTR_USERNAME, "user".into()),
193         )
194         .await;
195     assert!(result.is_err(), "expected error, but got ok");
196 
197     Ok(())
198 }
199 
200 #[tokio::test]
201 async fn test_delete_allocation() -> Result<()> {
202     //env_logger::init();
203 
204     // turn server initialization
205     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
206 
207     let m = new_test_manager();
208 
209     let five_tuple = random_five_tuple();
210 
211     let _ = m
212         .create_allocation(
213             five_tuple,
214             Arc::clone(&turn_socket),
215             0,
216             DEFAULT_LIFETIME,
217             TextAttribute::new(ATTR_USERNAME, "user".into()),
218         )
219         .await?;
220 
221     assert!(
222         m.get_allocation(&five_tuple).await.is_some(),
223         "Failed to get allocation right after creation"
224     );
225 
226     m.delete_allocation(&five_tuple).await;
227 
228     assert!(
229         m.get_allocation(&five_tuple).await.is_none(),
230         "Get allocation with {five_tuple} should be nil after delete"
231     );
232 
233     Ok(())
234 }
235 
236 #[tokio::test]
237 async fn test_allocation_timeout() -> Result<()> {
238     //env_logger::init();
239 
240     // turn server initialization
241     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
242 
243     let m = new_test_manager();
244 
245     let mut allocations = vec![];
246     let lifetime = Duration::from_millis(100);
247 
248     for _ in 0..5 {
249         let five_tuple = random_five_tuple();
250 
251         let a = m
252             .create_allocation(
253                 five_tuple,
254                 Arc::clone(&turn_socket),
255                 0,
256                 lifetime,
257                 TextAttribute::new(ATTR_USERNAME, "user".into()),
258             )
259             .await?;
260 
261         allocations.push(a);
262     }
263 
264     let mut count = 0;
265 
266     'outer: loop {
267         count += 1;
268 
269         if count >= 10 {
270             panic!("Allocations didn't timeout");
271         }
272 
273         tokio::time::sleep(lifetime + Duration::from_millis(100)).await;
274 
275         let any_outstanding = false;
276 
277         for a in &allocations {
278             if a.close().await.is_ok() {
279                 continue 'outer;
280             }
281         }
282 
283         if !any_outstanding {
284             return Ok(());
285         }
286     }
287 }
288 
289 #[tokio::test]
290 async fn test_manager_close() -> Result<()> {
291     // env_logger::init();
292 
293     // turn server initialization
294     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
295 
296     let m = new_test_manager();
297 
298     let mut allocations = vec![];
299 
300     let a1 = m
301         .create_allocation(
302             random_five_tuple(),
303             Arc::clone(&turn_socket),
304             0,
305             Duration::from_millis(100),
306             TextAttribute::new(ATTR_USERNAME, "user".into()),
307         )
308         .await?;
309     allocations.push(a1);
310 
311     let a2 = m
312         .create_allocation(
313             random_five_tuple(),
314             Arc::clone(&turn_socket),
315             0,
316             Duration::from_millis(200),
317             TextAttribute::new(ATTR_USERNAME, "user".into()),
318         )
319         .await?;
320     allocations.push(a2);
321 
322     tokio::time::sleep(Duration::from_millis(150)).await;
323 
324     log::trace!("Mgr is going to be closed...");
325 
326     m.close().await?;
327 
328     for a in allocations {
329         assert!(
330             a.close().await.is_err(),
331             "Allocation should be closed if lifetime timeout"
332         );
333     }
334 
335     Ok(())
336 }
337 
338 #[tokio::test]
339 async fn test_delete_allocation_by_username() -> Result<()> {
340     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
341 
342     let m = new_test_manager();
343 
344     let five_tuple1 = random_five_tuple();
345     let five_tuple2 = random_five_tuple();
346     let five_tuple3 = random_five_tuple();
347 
348     let _ = m
349         .create_allocation(
350             five_tuple1,
351             Arc::clone(&turn_socket),
352             0,
353             DEFAULT_LIFETIME,
354             TextAttribute::new(ATTR_USERNAME, "user".into()),
355         )
356         .await?;
357     let _ = m
358         .create_allocation(
359             five_tuple2,
360             Arc::clone(&turn_socket),
361             0,
362             DEFAULT_LIFETIME,
363             TextAttribute::new(ATTR_USERNAME, "user".into()),
364         )
365         .await?;
366     let _ = m
367         .create_allocation(
368             five_tuple3,
369             Arc::clone(&turn_socket),
370             0,
371             DEFAULT_LIFETIME,
372             TextAttribute::new(ATTR_USERNAME, "user2".into()),
373         )
374         .await?;
375 
376     assert_eq!(m.allocations.lock().await.len(), 3);
377 
378     m.delete_allocations_by_username("user").await;
379 
380     assert_eq!(m.allocations.lock().await.len(), 1);
381 
382     assert!(
383         m.get_allocation(&five_tuple1).await.is_none()
384             && m.get_allocation(&five_tuple2).await.is_none()
385             && m.get_allocation(&five_tuple3).await.is_some()
386     );
387 
388     Ok(())
389 }
390 
391 struct TestAuthHandler;
392 impl AuthHandler for TestAuthHandler {
393     fn auth_handle(&self, username: &str, realm: &str, _src_addr: SocketAddr) -> Result<Vec<u8>> {
394         Ok(generate_auth_key(username, realm, "pass"))
395     }
396 }
397 
398 async fn create_server() -> Result<(Server, u16)> {
399     let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
400     let server_port = conn.local_addr()?.port();
401 
402     let server = Server::new(ServerConfig {
403         conn_configs: vec![ConnConfig {
404             conn,
405             relay_addr_generator: Box::new(RelayAddressGeneratorStatic {
406                 relay_address: IpAddr::from_str("127.0.0.1")?,
407                 address: "0.0.0.0".to_owned(),
408                 net: Arc::new(Net::new(None)),
409             }),
410         }],
411         realm: "webrtc.rs".to_owned(),
412         auth_handler: Arc::new(TestAuthHandler {}),
413         channel_bind_timeout: Duration::from_secs(0),
414     })
415     .await?;
416 
417     Ok((server, server_port))
418 }
419 
420 async fn create_client(username: String, server_port: u16) -> Result<Client> {
421     let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
422 
423     Client::new(ClientConfig {
424         stun_serv_addr: format!("127.0.0.1:{server_port}"),
425         turn_serv_addr: format!("127.0.0.1:{server_port}"),
426         username,
427         password: "pass".to_owned(),
428         realm: String::new(),
429         software: String::new(),
430         rto_in_ms: 0,
431         conn,
432         vnet: None,
433     })
434     .await
435 }
436 
/// The server reports one allocation info entry per client, and each entry's
/// `relayed_bytes` matches the number of bytes that client sent through its
/// relay connection.
///
/// Three clients allocate and then send 1, 2 and 3 bytes respectively to the
/// address returned by their own binding request.
#[cfg(feature = "metrics")]
#[tokio::test]
async fn test_get_allocations_info() -> Result<()> {
    let (server, server_port) = create_server().await?;

    let client1 = create_client("user1".to_owned(), server_port).await?;
    client1.listen().await?;

    let client2 = create_client("user2".to_owned(), server_port).await?;
    client2.listen().await?;

    let client3 = create_client("user3".to_owned(), server_port).await?;
    client3.listen().await?;

    assert!(server.get_allocations_info(None).await?.is_empty());

    let user1 = client1.allocate().await?;
    let user2 = client2.allocate().await?;
    let user3 = client3.allocate().await?;

    assert_eq!(server.get_allocations_info(None).await?.len(), 3);

    let addr1 = client1
        .send_binding_request_to(format!("127.0.0.1:{server_port}").as_str())
        .await?;
    let addr2 = client2
        .send_binding_request_to(format!("127.0.0.1:{server_port}").as_str())
        .await?;
    let addr3 = client3
        .send_binding_request_to(format!("127.0.0.1:{server_port}").as_str())
        .await?;

    user1.send_to(b"1", addr1).await?;
    user2.send_to(b"12", addr2).await?;
    user3.send_to(b"123", addr3).await?;

    // Give the server time to process the datagrams before reading counters.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let infos = server.get_allocations_info(None).await?;

    // Without this check the per-user assertions below would pass vacuously
    // if entries were missing from the snapshot.
    assert_eq!(infos.len(), 3);

    for ai in infos.values() {
        match ai.username.as_str() {
            "user1" => assert_eq!(ai.relayed_bytes, 1),
            "user2" => assert_eq!(ai.relayed_bytes, 2),
            "user3" => assert_eq!(ai.relayed_bytes, 3),
            _ => unreachable!(),
        }
    }

    // Release clients and server explicitly, as the sibling byte-count test does.
    client1.close().await?;
    client2.close().await?;
    client3.close().await?;
    server.close().await?;

    Ok(())
}
488 
/// `relayed_bytes` of an allocation starts at zero and accumulates across
/// successive sends.
///
/// A single client sends the 5-byte payload `b"Hello"` ten times per phase;
/// the counter is expected to read 50 after the first phase and 100 after
/// the second.
#[cfg(feature = "metrics")]
#[tokio::test]
async fn test_get_allocations_info_bytes_count() -> Result<()> {
    let (server, server_port) = create_server().await?;

    let client = create_client("foo".to_owned(), server_port).await?;

    client.listen().await?;

    assert!(server.get_allocations_info(None).await?.is_empty());

    let conn = client.allocate().await?;
    let addr = client
        .send_binding_request_to(format!("127.0.0.1:{server_port}").as_str())
        .await?;

    assert!(!server.get_allocations_info(None).await?.is_empty());

    // Nothing has been relayed yet.
    let initial = server.get_allocations_info(None).await?;
    assert_eq!(initial.values().last().unwrap().relayed_bytes, 0);

    // Each phase sends ten payloads, waits, then checks the running total.
    for expected_total in [50, 100] {
        for _ in 0..10 {
            conn.send_to(b"Hello", addr).await?;

            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        tokio::time::sleep(Duration::from_millis(1000)).await;

        let snapshot = server.get_allocations_info(None).await?;
        assert_eq!(
            snapshot.values().last().unwrap().relayed_bytes,
            expected_total
        );
    }

    client.close().await?;
    server.close().await?;

    Ok(())
}
561