1 use super::*;
2 
3 use crate::{
4     auth::{generate_auth_key, AuthHandler},
5     client::{Client, ClientConfig},
6     error::Result,
7     proto::lifetime::DEFAULT_LIFETIME,
8     relay::{relay_none::*, relay_static::RelayAddressGeneratorStatic},
9     server::{
10         config::{ConnConfig, ServerConfig},
11         Server,
12     },
13 };
14 
15 use std::{
16     net::{IpAddr, Ipv4Addr},
17     str::FromStr,
18 };
19 use stun::{attributes::ATTR_USERNAME, textattrs::TextAttribute};
20 use tokio::net::UdpSocket;
21 use util::vnet::net::*;
22 
23 fn new_test_manager() -> Manager {
24     let config = ManagerConfig {
25         relay_addr_generator: Box::new(RelayAddressGeneratorNone {
26             address: "0.0.0.0".to_owned(),
27             net: Arc::new(Net::new(None)),
28         }),
29     };
30     Manager::new(config)
31 }
32 
33 fn random_five_tuple() -> FiveTuple {
34     /* #nosec */
35     FiveTuple {
36         src_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()),
37         dst_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()),
38         ..Default::default()
39     }
40 }
41 
42 #[tokio::test]
43 async fn test_packet_handler() -> Result<()> {
44     //env_logger::init();
45 
46     // turn server initialization
47     let turn_socket = UdpSocket::bind("127.0.0.1:0").await?;
48 
49     // client listener initialization
50     let client_listener = UdpSocket::bind("127.0.0.1:0").await?;
51     let src_addr = client_listener.local_addr()?;
52     let (data_ch_tx, mut data_ch_rx) = mpsc::channel(1);
53     // client listener read data
54     tokio::spawn(async move {
55         let mut buffer = vec![0u8; RTP_MTU];
56         loop {
57             let n = match client_listener.recv_from(&mut buffer).await {
58                 Ok((n, _)) => n,
59                 Err(_) => break,
60             };
61 
62             let _ = data_ch_tx.send(buffer[..n].to_vec()).await;
63         }
64     });
65 
66     let m = new_test_manager();
67     let a = m
68         .create_allocation(
69             FiveTuple {
70                 src_addr,
71                 dst_addr: turn_socket.local_addr()?,
72                 ..Default::default()
73             },
74             Arc::new(turn_socket),
75             0,
76             DEFAULT_LIFETIME,
77             TextAttribute::new(ATTR_USERNAME, "user".into()),
78         )
79         .await?;
80 
81     let peer_listener1 = UdpSocket::bind("127.0.0.1:0").await?;
82     let peer_listener2 = UdpSocket::bind("127.0.0.1:0").await?;
83 
84     let channel_bind = ChannelBind::new(
85         ChannelNumber(MIN_CHANNEL_NUMBER),
86         peer_listener2.local_addr()?,
87     );
88 
89     let port = {
90         // add permission with peer1 address
91         a.add_permission(Permission::new(peer_listener1.local_addr()?))
92             .await;
93         // add channel with min channel number and peer2 address
94         a.add_channel_bind(channel_bind.clone(), DEFAULT_LIFETIME)
95             .await?;
96 
97         a.relay_socket.local_addr()?.port()
98     };
99 
100     let relay_addr_with_host_str = format!("127.0.0.1:{}", port);
101     let relay_addr_with_host = SocketAddr::from_str(&relay_addr_with_host_str)?;
102 
103     // test for permission and data message
104     let target_text = "permission";
105     let _ = peer_listener1
106         .send_to(target_text.as_bytes(), relay_addr_with_host)
107         .await?;
108     let data = data_ch_rx
109         .recv()
110         .await
111         .ok_or(Error::Other("data ch closed".to_owned()))?;
112 
113     // resolve stun data message
114     assert!(is_message(&data), "should be stun message");
115 
116     let mut msg = Message::new();
117     msg.raw = data;
118     msg.decode()?;
119 
120     let mut msg_data = Data::default();
121     msg_data.get_from(&msg)?;
122     assert_eq!(
123         target_text.as_bytes(),
124         &msg_data.0,
125         "get message doesn't equal the target text"
126     );
127 
128     // test for channel bind and channel data
129     let target_text2 = "channel bind";
130     let _ = peer_listener2
131         .send_to(target_text2.as_bytes(), relay_addr_with_host)
132         .await?;
133     let data = data_ch_rx
134         .recv()
135         .await
136         .ok_or(Error::Other("data ch closed".to_owned()))?;
137 
138     // resolve channel data
139     assert!(
140         ChannelData::is_channel_data(&data),
141         "should be channel data"
142     );
143 
144     let mut channel_data = ChannelData {
145         raw: data,
146         ..Default::default()
147     };
148     channel_data.decode()?;
149     assert_eq!(
150         channel_bind.number, channel_data.number,
151         "get channel data's number is invalid"
152     );
153     assert_eq!(
154         target_text2.as_bytes(),
155         &channel_data.data,
156         "get data doesn't equal the target text."
157     );
158 
159     // listeners close
160     m.close().await?;
161 
162     Ok(())
163 }
164 
165 #[tokio::test]
166 async fn test_create_allocation_duplicate_five_tuple() -> Result<()> {
167     //env_logger::init();
168 
169     // turn server initialization
170     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
171 
172     let m = new_test_manager();
173 
174     let five_tuple = random_five_tuple();
175 
176     let _ = m
177         .create_allocation(
178             five_tuple,
179             Arc::clone(&turn_socket),
180             0,
181             DEFAULT_LIFETIME,
182             TextAttribute::new(ATTR_USERNAME, "user".into()),
183         )
184         .await?;
185 
186     let result = m
187         .create_allocation(
188             five_tuple,
189             Arc::clone(&turn_socket),
190             0,
191             DEFAULT_LIFETIME,
192             TextAttribute::new(ATTR_USERNAME, "user".into()),
193         )
194         .await;
195     assert!(result.is_err(), "expected error, but got ok");
196 
197     Ok(())
198 }
199 
200 #[tokio::test]
201 async fn test_delete_allocation() -> Result<()> {
202     //env_logger::init();
203 
204     // turn server initialization
205     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
206 
207     let m = new_test_manager();
208 
209     let five_tuple = random_five_tuple();
210 
211     let _ = m
212         .create_allocation(
213             five_tuple,
214             Arc::clone(&turn_socket),
215             0,
216             DEFAULT_LIFETIME,
217             TextAttribute::new(ATTR_USERNAME, "user".into()),
218         )
219         .await?;
220 
221     assert!(
222         m.get_allocation(&five_tuple).await.is_some(),
223         "Failed to get allocation right after creation"
224     );
225 
226     m.delete_allocation(&five_tuple).await;
227 
228     assert!(
229         m.get_allocation(&five_tuple).await.is_none(),
230         "Get allocation with {} should be nil after delete",
231         five_tuple
232     );
233 
234     Ok(())
235 }
236 
237 #[tokio::test]
238 async fn test_allocation_timeout() -> Result<()> {
239     //env_logger::init();
240 
241     // turn server initialization
242     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
243 
244     let m = new_test_manager();
245 
246     let mut allocations = vec![];
247     let lifetime = Duration::from_millis(100);
248 
249     for _ in 0..5 {
250         let five_tuple = random_five_tuple();
251 
252         let a = m
253             .create_allocation(
254                 five_tuple,
255                 Arc::clone(&turn_socket),
256                 0,
257                 lifetime,
258                 TextAttribute::new(ATTR_USERNAME, "user".into()),
259             )
260             .await?;
261 
262         allocations.push(a);
263     }
264 
265     let mut count = 0;
266 
267     'outer: loop {
268         count += 1;
269 
270         if count >= 10 {
271             assert!(false, "Allocations didn't timeout");
272         }
273 
274         tokio::time::sleep(lifetime + Duration::from_millis(100)).await;
275 
276         let any_outstanding = false;
277 
278         for a in &allocations {
279             if a.close().await.is_ok() {
280                 continue 'outer;
281             }
282         }
283 
284         if !any_outstanding {
285             return Ok(());
286         }
287     }
288 }
289 
290 #[tokio::test]
291 async fn test_manager_close() -> Result<()> {
292     // env_logger::init();
293 
294     // turn server initialization
295     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
296 
297     let m = new_test_manager();
298 
299     let mut allocations = vec![];
300 
301     let a1 = m
302         .create_allocation(
303             random_five_tuple(),
304             Arc::clone(&turn_socket),
305             0,
306             Duration::from_millis(100),
307             TextAttribute::new(ATTR_USERNAME, "user".into()),
308         )
309         .await?;
310     allocations.push(a1);
311 
312     let a2 = m
313         .create_allocation(
314             random_five_tuple(),
315             Arc::clone(&turn_socket),
316             0,
317             Duration::from_millis(200),
318             TextAttribute::new(ATTR_USERNAME, "user".into()),
319         )
320         .await?;
321     allocations.push(a2);
322 
323     tokio::time::sleep(Duration::from_millis(150)).await;
324 
325     log::trace!("Mgr is going to be closed...");
326 
327     m.close().await?;
328 
329     for a in allocations {
330         assert!(
331             a.close().await.is_err(),
332             "Allocation should be closed if lifetime timeout"
333         );
334     }
335 
336     Ok(())
337 }
338 
339 #[tokio::test]
340 async fn test_delete_allocation_by_username() -> Result<()> {
341     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
342 
343     let m = new_test_manager();
344 
345     let five_tuple1 = random_five_tuple();
346     let five_tuple2 = random_five_tuple();
347     let five_tuple3 = random_five_tuple();
348 
349     let _ = m
350         .create_allocation(
351             five_tuple1,
352             Arc::clone(&turn_socket),
353             0,
354             DEFAULT_LIFETIME,
355             TextAttribute::new(ATTR_USERNAME, "user".into()),
356         )
357         .await?;
358     let _ = m
359         .create_allocation(
360             five_tuple2,
361             Arc::clone(&turn_socket),
362             0,
363             DEFAULT_LIFETIME,
364             TextAttribute::new(ATTR_USERNAME, "user".into()),
365         )
366         .await?;
367     let _ = m
368         .create_allocation(
369             five_tuple3,
370             Arc::clone(&turn_socket),
371             0,
372             DEFAULT_LIFETIME,
373             TextAttribute::new(ATTR_USERNAME, "user2".into()),
374         )
375         .await?;
376 
377     assert_eq!(m.allocations.lock().await.len(), 3);
378 
379     m.delete_allocations_by_username("user").await;
380 
381     assert_eq!(m.allocations.lock().await.len(), 1);
382 
383     assert!(
384         m.get_allocation(&five_tuple1).await.is_none()
385             && m.get_allocation(&five_tuple2).await.is_none()
386             && m.get_allocation(&five_tuple3).await.is_some()
387     );
388 
389     Ok(())
390 }
391 
392 struct TestAuthHandler;
393 impl AuthHandler for TestAuthHandler {
394     fn auth_handle(&self, username: &str, realm: &str, _src_addr: SocketAddr) -> Result<Vec<u8>> {
395         Ok(generate_auth_key(username, realm, "pass"))
396     }
397 }
398 
399 async fn create_server() -> Result<(Server, u16)> {
400     let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
401     let server_port = conn.local_addr()?.port();
402 
403     let server = Server::new(ServerConfig {
404         conn_configs: vec![ConnConfig {
405             conn,
406             relay_addr_generator: Box::new(RelayAddressGeneratorStatic {
407                 relay_address: IpAddr::from_str("127.0.0.1")?,
408                 address: "0.0.0.0".to_owned(),
409                 net: Arc::new(Net::new(None)),
410             }),
411         }],
412         realm: "webrtc.rs".to_owned(),
413         auth_handler: Arc::new(TestAuthHandler {}),
414         channel_bind_timeout: Duration::from_secs(0),
415     })
416     .await?;
417 
418     Ok((server, server_port))
419 }
420 
421 async fn create_client(username: String, server_port: u16) -> Result<Client> {
422     let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
423 
424     Client::new(ClientConfig {
425         stun_serv_addr: format!("127.0.0.1:{}", server_port),
426         turn_serv_addr: format!("127.0.0.1:{}", server_port),
427         username,
428         password: "pass".to_owned(),
429         realm: String::new(),
430         software: String::new(),
431         rto_in_ms: 0,
432         conn,
433         vnet: None,
434     })
435     .await
436 }
437 
438 #[cfg(feature = "metrics")]
439 #[tokio::test]
440 async fn test_get_allocations_info() -> Result<()> {
441     let (server, server_port) = create_server().await?;
442 
443     let client1 = create_client("user1".to_owned(), server_port).await?;
444     client1.listen().await?;
445 
446     let client2 = create_client("user2".to_owned(), server_port).await?;
447     client2.listen().await?;
448 
449     let client3 = create_client("user3".to_owned(), server_port).await?;
450     client3.listen().await?;
451 
452     assert!(server.get_allocations_info(None).await?.is_empty());
453 
454     let user1 = client1.allocate().await?;
455     let user2 = client2.allocate().await?;
456     let user3 = client3.allocate().await?;
457 
458     assert_eq!(server.get_allocations_info(None).await?.len(), 3);
459 
460     let addr1 = client1
461         .send_binding_request_to(format!("127.0.0.1:{}", server_port).as_str())
462         .await?;
463     let addr2 = client2
464         .send_binding_request_to(format!("127.0.0.1:{}", server_port).as_str())
465         .await?;
466     let addr3 = client3
467         .send_binding_request_to(format!("127.0.0.1:{}", server_port).as_str())
468         .await?;
469 
470     user1.send_to(b"1", addr1).await?;
471     user2.send_to(b"12", addr2).await?;
472     user3.send_to(b"123", addr3).await?;
473 
474     tokio::time::sleep(Duration::from_millis(100)).await;
475 
476     server
477         .get_allocations_info(None)
478         .await?
479         .iter()
480         .for_each(|(_, ai)| match ai.username.as_str() {
481             "user1" => assert_eq!(ai.relayed_bytes, 1),
482             "user2" => assert_eq!(ai.relayed_bytes, 2),
483             "user3" => assert_eq!(ai.relayed_bytes, 3),
484             _ => unreachable!(),
485         });
486 
487     Ok(())
488 }
489 
490 #[cfg(feature = "metrics")]
491 #[tokio::test]
492 async fn test_get_allocations_info_bytes_count() -> Result<()> {
493     let (server, server_port) = create_server().await?;
494 
495     let client = create_client("foo".to_owned(), server_port).await?;
496 
497     client.listen().await?;
498 
499     assert!(server.get_allocations_info(None).await?.is_empty());
500 
501     let conn = client.allocate().await?;
502     let addr = client
503         .send_binding_request_to(format!("127.0.0.1:{}", server_port).as_str())
504         .await?;
505 
506     assert!(!server.get_allocations_info(None).await?.is_empty());
507 
508     assert_eq!(
509         server
510             .get_allocations_info(None)
511             .await?
512             .values()
513             .last()
514             .unwrap()
515             .relayed_bytes,
516         0
517     );
518 
519     for _ in 0..10 {
520         conn.send_to(b"Hello", addr).await?;
521 
522         tokio::time::sleep(Duration::from_millis(100)).await;
523     }
524 
525     tokio::time::sleep(Duration::from_millis(1000)).await;
526 
527     assert_eq!(
528         server
529             .get_allocations_info(None)
530             .await?
531             .values()
532             .last()
533             .unwrap()
534             .relayed_bytes,
535         50
536     );
537 
538     for _ in 0..10 {
539         conn.send_to(b"Hello", addr).await?;
540 
541         tokio::time::sleep(Duration::from_millis(100)).await;
542     }
543 
544     tokio::time::sleep(Duration::from_millis(1000)).await;
545 
546     assert_eq!(
547         server
548             .get_allocations_info(None)
549             .await?
550             .values()
551             .last()
552             .unwrap()
553             .relayed_bytes,
554         100
555     );
556 
557     client.close().await?;
558     server.close().await?;
559 
560     Ok(())
561 }
562