1 use super::*; 2 3 use crate::{ 4 auth::{generate_auth_key, AuthHandler}, 5 client::{Client, ClientConfig}, 6 error::Result, 7 proto::lifetime::DEFAULT_LIFETIME, 8 relay::{relay_none::*, relay_static::RelayAddressGeneratorStatic}, 9 server::{ 10 config::{ConnConfig, ServerConfig}, 11 Server, 12 }, 13 }; 14 15 use std::{ 16 net::{IpAddr, Ipv4Addr}, 17 str::FromStr, 18 }; 19 use stun::{attributes::ATTR_USERNAME, textattrs::TextAttribute}; 20 use tokio::net::UdpSocket; 21 use util::vnet::net::*; 22 23 fn new_test_manager() -> Manager { 24 let config = ManagerConfig { 25 relay_addr_generator: Box::new(RelayAddressGeneratorNone { 26 address: "0.0.0.0".to_owned(), 27 net: Arc::new(Net::new(None)), 28 }), 29 }; 30 Manager::new(config) 31 } 32 33 fn random_five_tuple() -> FiveTuple { 34 /* #nosec */ 35 FiveTuple { 36 src_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()), 37 dst_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()), 38 ..Default::default() 39 } 40 } 41 42 #[tokio::test] 43 async fn test_packet_handler() -> Result<()> { 44 //env_logger::init(); 45 46 // turn server initialization 47 let turn_socket = UdpSocket::bind("127.0.0.1:0").await?; 48 49 // client listener initialization 50 let client_listener = UdpSocket::bind("127.0.0.1:0").await?; 51 let src_addr = client_listener.local_addr()?; 52 let (data_ch_tx, mut data_ch_rx) = mpsc::channel(1); 53 // client listener read data 54 tokio::spawn(async move { 55 let mut buffer = vec![0u8; RTP_MTU]; 56 loop { 57 let n = match client_listener.recv_from(&mut buffer).await { 58 Ok((n, _)) => n, 59 Err(_) => break, 60 }; 61 62 let _ = data_ch_tx.send(buffer[..n].to_vec()).await; 63 } 64 }); 65 66 let m = new_test_manager(); 67 let a = m 68 .create_allocation( 69 FiveTuple { 70 src_addr, 71 dst_addr: turn_socket.local_addr()?, 72 ..Default::default() 73 }, 74 Arc::new(turn_socket), 75 0, 76 DEFAULT_LIFETIME, 77 TextAttribute::new(ATTR_USERNAME, "user".into()), 78 ) 79 .await?; 80 81 let peer_listener1 = UdpSocket::bind("127.0.0.1:0").await?; 82 let peer_listener2 = UdpSocket::bind("127.0.0.1:0").await?; 83 84 let channel_bind = ChannelBind::new( 85 ChannelNumber(MIN_CHANNEL_NUMBER), 86 peer_listener2.local_addr()?, 87 ); 88 89 let port = { 90 // add permission with peer1 address 91 a.add_permission(Permission::new(peer_listener1.local_addr()?)) 92 .await; 93 // add channel with min channel number and peer2 address 94 a.add_channel_bind(channel_bind.clone(), DEFAULT_LIFETIME) 95 .await?; 96 97 a.relay_socket.local_addr()?.port() 98 }; 99 100 let relay_addr_with_host_str = format!("127.0.0.1:{port}"); 101 let relay_addr_with_host = SocketAddr::from_str(&relay_addr_with_host_str)?; 102 103 // test for permission and data message 104 let target_text = "permission"; 105 let _ = peer_listener1 106 .send_to(target_text.as_bytes(), relay_addr_with_host) 107 .await?; 108 let data = data_ch_rx 109 .recv() 110 .await 111 .ok_or(Error::Other("data ch closed".to_owned()))?; 112 113 // resolve stun data message 114 assert!(is_message(&data), "should be stun message"); 115 116 let mut msg = Message::new(); 117 msg.raw = data; 118 msg.decode()?; 119 120 let mut msg_data = Data::default(); 121 msg_data.get_from(&msg)?; 122 assert_eq!( 123 target_text.as_bytes(), 124 &msg_data.0, 125 "get message doesn't equal the target text" 126 ); 127 128 // test for channel bind and channel data 129 let target_text2 = "channel bind"; 130 let _ = peer_listener2 131 .send_to(target_text2.as_bytes(), relay_addr_with_host) 132 .await?; 133 let data = data_ch_rx 134 .recv() 135 .await 136 .ok_or(Error::Other("data ch closed".to_owned()))?; 137 138 // resolve channel data 139 assert!( 140 ChannelData::is_channel_data(&data), 141 "should be channel data" 142 ); 143 144 let mut channel_data = ChannelData { 145 raw: data, 146 ..Default::default() 147 }; 148 channel_data.decode()?; 149 assert_eq!( 150 channel_bind.number, channel_data.number, 151 "get channel data's number is invalid" 152 ); 153 assert_eq!( 154 target_text2.as_bytes(), 155 &channel_data.data, 156 "get data doesn't equal the target text." 157 ); 158 159 // listeners close 160 m.close().await?; 161 162 Ok(()) 163 } 164 165 #[tokio::test] 166 async fn test_create_allocation_duplicate_five_tuple() -> Result<()> { 167 //env_logger::init(); 168 169 // turn server initialization 170 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 171 172 let m = new_test_manager(); 173 174 let five_tuple = random_five_tuple(); 175 176 let _ = m 177 .create_allocation( 178 five_tuple, 179 Arc::clone(&turn_socket), 180 0, 181 DEFAULT_LIFETIME, 182 TextAttribute::new(ATTR_USERNAME, "user".into()), 183 ) 184 .await?; 185 186 let result = m 187 .create_allocation( 188 five_tuple, 189 Arc::clone(&turn_socket), 190 0, 191 DEFAULT_LIFETIME, 192 TextAttribute::new(ATTR_USERNAME, "user".into()), 193 ) 194 .await; 195 assert!(result.is_err(), "expected error, but got ok"); 196 197 Ok(()) 198 } 199 200 #[tokio::test] 201 async fn test_delete_allocation() -> Result<()> { 202 //env_logger::init(); 203 204 // turn server initialization 205 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 206 207 let m = new_test_manager(); 208 209 let five_tuple = random_five_tuple(); 210 211 let _ = m 212 .create_allocation( 213 five_tuple, 214 Arc::clone(&turn_socket), 215 0, 216 DEFAULT_LIFETIME, 217 TextAttribute::new(ATTR_USERNAME, "user".into()), 218 ) 219 .await?; 220 221 assert!( 222 m.get_allocation(&five_tuple).await.is_some(), 223 "Failed to get allocation right after creation" 224 ); 225 226 m.delete_allocation(&five_tuple).await; 227 228 assert!( 229 m.get_allocation(&five_tuple).await.is_none(), 230 "Get allocation with {five_tuple} should be nil after delete" 231 ); 232 233 Ok(()) 234 } 235 236 #[tokio::test] 237 async fn test_allocation_timeout() -> Result<()> { 238 //env_logger::init(); 239 240 // turn server initialization 241 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 242 243 let m = new_test_manager(); 244 245 let mut allocations = vec![]; 246 let lifetime = Duration::from_millis(100); 247 248 for _ in 0..5 { 249 let five_tuple = random_five_tuple(); 250 251 let a = m 252 .create_allocation( 253 five_tuple, 254 Arc::clone(&turn_socket), 255 0, 256 lifetime, 257 TextAttribute::new(ATTR_USERNAME, "user".into()), 258 ) 259 .await?; 260 261 allocations.push(a); 262 } 263 264 let mut count = 0; 265 266 'outer: loop { 267 count += 1; 268 269 if count >= 10 { 270 panic!("Allocations didn't timeout"); 271 } 272 273 tokio::time::sleep(lifetime + Duration::from_millis(100)).await; 274 275 let any_outstanding = false; 276 277 for a in &allocations { 278 if a.close().await.is_ok() { 279 continue 'outer; 280 } 281 } 282 283 if !any_outstanding { 284 return Ok(()); 285 } 286 } 287 } 288 289 #[tokio::test] 290 async fn test_manager_close() -> Result<()> { 291 // env_logger::init(); 292 293 // turn server initialization 294 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 295 296 let m = new_test_manager(); 297 298 let mut allocations = vec![]; 299 300 let a1 = m 301 .create_allocation( 302 random_five_tuple(), 303 Arc::clone(&turn_socket), 304 0, 305 Duration::from_millis(100), 306 TextAttribute::new(ATTR_USERNAME, "user".into()), 307 ) 308 .await?; 309 allocations.push(a1); 310 311 let a2 = m 312 .create_allocation( 313 random_five_tuple(), 314 Arc::clone(&turn_socket), 315 0, 316 Duration::from_millis(200), 317 TextAttribute::new(ATTR_USERNAME, "user".into()), 318 ) 319 .await?; 320 allocations.push(a2); 321 322 tokio::time::sleep(Duration::from_millis(150)).await; 323 324 log::trace!("Mgr is going to be closed..."); 325 326 m.close().await?; 327 328 for a in allocations { 329 assert!( 330 a.close().await.is_err(), 331 "Allocation should be closed if lifetime timeout" 332 ); 333 } 334 335 Ok(()) 336 } 337 338 #[tokio::test] 339 async fn test_delete_allocation_by_username() -> Result<()> { 340 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 341 342 let m = new_test_manager(); 343 344 let five_tuple1 = random_five_tuple(); 345 let five_tuple2 = random_five_tuple(); 346 let five_tuple3 = random_five_tuple(); 347 348 let _ = m 349 .create_allocation( 350 five_tuple1, 351 Arc::clone(&turn_socket), 352 0, 353 DEFAULT_LIFETIME, 354 TextAttribute::new(ATTR_USERNAME, "user".into()), 355 ) 356 .await?; 357 let _ = m 358 .create_allocation( 359 five_tuple2, 360 Arc::clone(&turn_socket), 361 0, 362 DEFAULT_LIFETIME, 363 TextAttribute::new(ATTR_USERNAME, "user".into()), 364 ) 365 .await?; 366 let _ = m 367 .create_allocation( 368 five_tuple3, 369 Arc::clone(&turn_socket), 370 0, 371 DEFAULT_LIFETIME, 372 TextAttribute::new(ATTR_USERNAME, "user2".into()), 373 ) 374 .await?; 375 376 assert_eq!(m.allocations.lock().await.len(), 3); 377 378 m.delete_allocations_by_username("user").await; 379 380 assert_eq!(m.allocations.lock().await.len(), 1); 381 382 assert!( 383 m.get_allocation(&five_tuple1).await.is_none() 384 && m.get_allocation(&five_tuple2).await.is_none() 385 && m.get_allocation(&five_tuple3).await.is_some() 386 ); 387 388 Ok(()) 389 } 390 391 struct TestAuthHandler; 392 impl AuthHandler for TestAuthHandler { 393 fn auth_handle(&self, username: &str, realm: &str, _src_addr: SocketAddr) -> Result<Vec<u8>> { 394 Ok(generate_auth_key(username, realm, "pass")) 395 } 396 } 397 398 async fn create_server() -> Result<(Server, u16)> { 399 let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 400 let server_port = conn.local_addr()?.port(); 401 402 let server = Server::new(ServerConfig { 403 conn_configs: vec![ConnConfig { 404 conn, 405 relay_addr_generator: Box::new(RelayAddressGeneratorStatic { 406 relay_address: IpAddr::from_str("127.0.0.1")?, 407 address: "0.0.0.0".to_owned(), 408 net: Arc::new(Net::new(None)), 409 }), 410 }], 411 realm: "webrtc.rs".to_owned(), 412 auth_handler: Arc::new(TestAuthHandler {}), 413 channel_bind_timeout: Duration::from_secs(0), 414 }) 415 .await?; 416 417 Ok((server, server_port)) 418 } 419 420 async fn create_client(username: String, server_port: u16) -> Result<Client> { 421 let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 422 423 Client::new(ClientConfig { 424 stun_serv_addr: format!("127.0.0.1:{server_port}"), 425 turn_serv_addr: format!("127.0.0.1:{server_port}"), 426 username, 427 password: "pass".to_owned(), 428 realm: String::new(), 429 software: String::new(), 430 rto_in_ms: 0, 431 conn, 432 vnet: None, 433 }) 434 .await 435 } 436 437 #[cfg(feature = "metrics")] 438 #[tokio::test] 439 async fn test_get_allocations_info() -> Result<()> { 440 let (server, server_port) = create_server().await?; 441 442 let client1 = create_client("user1".to_owned(), server_port).await?; 443 client1.listen().await?; 444 445 let client2 = create_client("user2".to_owned(), server_port).await?; 446 client2.listen().await?; 447 448 let client3 = create_client("user3".to_owned(), server_port).await?; 449 client3.listen().await?; 450 451 assert!(server.get_allocations_info(None).await?.is_empty()); 452 453 let user1 = client1.allocate().await?; 454 let user2 = client2.allocate().await?; 455 let user3 = client3.allocate().await?; 456 457 assert_eq!(server.get_allocations_info(None).await?.len(), 3); 458 459 let addr1 = client1 460 .send_binding_request_to(format!("127.0.0.1:{server_port}").as_str()) 461 .await?; 462 let addr2 = client2 463 .send_binding_request_to(format!("127.0.0.1:{server_port}").as_str()) 464 .await?; 465 let addr3 = client3 466 .send_binding_request_to(format!("127.0.0.1:{server_port}").as_str()) 467 .await?; 468 469 user1.send_to(b"1", addr1).await?; 470 user2.send_to(b"12", addr2).await?; 471 user3.send_to(b"123", addr3).await?; 472 473 tokio::time::sleep(Duration::from_millis(100)).await; 474 475 server 476 .get_allocations_info(None) 477 .await? 478 .iter() 479 .for_each(|(_, ai)| match ai.username.as_str() { 480 "user1" => assert_eq!(ai.relayed_bytes, 1), 481 "user2" => assert_eq!(ai.relayed_bytes, 2), 482 "user3" => assert_eq!(ai.relayed_bytes, 3), 483 _ => unreachable!(), 484 }); 485 486 Ok(()) 487 } 488 489 #[cfg(feature = "metrics")] 490 #[tokio::test] 491 async fn test_get_allocations_info_bytes_count() -> Result<()> { 492 let (server, server_port) = create_server().await?; 493 494 let client = create_client("foo".to_owned(), server_port).await?; 495 496 client.listen().await?; 497 498 assert!(server.get_allocations_info(None).await?.is_empty()); 499 500 let conn = client.allocate().await?; 501 let addr = client 502 .send_binding_request_to(format!("127.0.0.1:{server_port}").as_str()) 503 .await?; 504 505 assert!(!server.get_allocations_info(None).await?.is_empty()); 506 507 assert_eq!( 508 server 509 .get_allocations_info(None) 510 .await? 511 .values() 512 .last() 513 .unwrap() 514 .relayed_bytes, 515 0 516 ); 517 518 for _ in 0..10 { 519 conn.send_to(b"Hello", addr).await?; 520 521 tokio::time::sleep(Duration::from_millis(100)).await; 522 } 523 524 tokio::time::sleep(Duration::from_millis(1000)).await; 525 526 assert_eq!( 527 server 528 .get_allocations_info(None) 529 .await? 530 .values() 531 .last() 532 .unwrap() 533 .relayed_bytes, 534 50 535 ); 536 537 for _ in 0..10 { 538 conn.send_to(b"Hello", addr).await?; 539 540 tokio::time::sleep(Duration::from_millis(100)).await; 541 } 542 543 tokio::time::sleep(Duration::from_millis(1000)).await; 544 545 assert_eq!( 546 server 547 .get_allocations_info(None) 548 .await? 549 .values() 550 .last() 551 .unwrap() 552 .relayed_bytes, 553 100 554 ); 555 556 client.close().await?; 557 server.close().await?; 558 559 Ok(()) 560 } 561