1 use super::*; 2 3 use crate::{ 4 auth::{generate_auth_key, AuthHandler}, 5 client::{Client, ClientConfig}, 6 error::Result, 7 proto::lifetime::DEFAULT_LIFETIME, 8 relay::{relay_none::*, relay_static::RelayAddressGeneratorStatic}, 9 server::{ 10 config::{ConnConfig, ServerConfig}, 11 Server, 12 }, 13 }; 14 15 use std::{ 16 net::{IpAddr, Ipv4Addr}, 17 str::FromStr, 18 }; 19 use stun::{attributes::ATTR_USERNAME, textattrs::TextAttribute}; 20 use tokio::net::UdpSocket; 21 use util::vnet::net::*; 22 23 fn new_test_manager() -> Manager { 24 let config = ManagerConfig { 25 relay_addr_generator: Box::new(RelayAddressGeneratorNone { 26 address: "0.0.0.0".to_owned(), 27 net: Arc::new(Net::new(None)), 28 }), 29 }; 30 Manager::new(config) 31 } 32 33 fn random_five_tuple() -> FiveTuple { 34 /* #nosec */ 35 FiveTuple { 36 src_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()), 37 dst_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()), 38 ..Default::default() 39 } 40 } 41 42 #[tokio::test] 43 async fn test_packet_handler() -> Result<()> { 44 //env_logger::init(); 45 46 // turn server initialization 47 let turn_socket = UdpSocket::bind("127.0.0.1:0").await?; 48 49 // client listener initialization 50 let client_listener = UdpSocket::bind("127.0.0.1:0").await?; 51 let src_addr = client_listener.local_addr()?; 52 let (data_ch_tx, mut data_ch_rx) = mpsc::channel(1); 53 // client listener read data 54 tokio::spawn(async move { 55 let mut buffer = vec![0u8; RTP_MTU]; 56 loop { 57 let n = match client_listener.recv_from(&mut buffer).await { 58 Ok((n, _)) => n, 59 Err(_) => break, 60 }; 61 62 let _ = data_ch_tx.send(buffer[..n].to_vec()).await; 63 } 64 }); 65 66 let m = new_test_manager(); 67 let a = m 68 .create_allocation( 69 FiveTuple { 70 src_addr, 71 dst_addr: turn_socket.local_addr()?, 72 ..Default::default() 73 }, 74 Arc::new(turn_socket), 75 0, 76 DEFAULT_LIFETIME, 77 TextAttribute::new(ATTR_USERNAME, "user".into()), 78 ) 79 .await?; 80 81 let peer_listener1 = UdpSocket::bind("127.0.0.1:0").await?; 82 let peer_listener2 = UdpSocket::bind("127.0.0.1:0").await?; 83 84 let channel_bind = ChannelBind::new( 85 ChannelNumber(MIN_CHANNEL_NUMBER), 86 peer_listener2.local_addr()?, 87 ); 88 89 let port = { 90 // add permission with peer1 address 91 a.add_permission(Permission::new(peer_listener1.local_addr()?)) 92 .await; 93 // add channel with min channel number and peer2 address 94 a.add_channel_bind(channel_bind.clone(), DEFAULT_LIFETIME) 95 .await?; 96 97 a.relay_socket.local_addr()?.port() 98 }; 99 100 let relay_addr_with_host_str = format!("127.0.0.1:{}", port); 101 let relay_addr_with_host = SocketAddr::from_str(&relay_addr_with_host_str)?; 102 103 // test for permission and data message 104 let target_text = "permission"; 105 let _ = peer_listener1 106 .send_to(target_text.as_bytes(), relay_addr_with_host) 107 .await?; 108 let data = data_ch_rx 109 .recv() 110 .await 111 .ok_or(Error::Other("data ch closed".to_owned()))?; 112 113 // resolve stun data message 114 assert!(is_message(&data), "should be stun message"); 115 116 let mut msg = Message::new(); 117 msg.raw = data; 118 msg.decode()?; 119 120 let mut msg_data = Data::default(); 121 msg_data.get_from(&msg)?; 122 assert_eq!( 123 target_text.as_bytes(), 124 &msg_data.0, 125 "get message doesn't equal the target text" 126 ); 127 128 // test for channel bind and channel data 129 let target_text2 = "channel bind"; 130 let _ = peer_listener2 131 .send_to(target_text2.as_bytes(), relay_addr_with_host) 132 .await?; 133 let data = data_ch_rx 134 .recv() 135 .await 136 .ok_or(Error::Other("data ch closed".to_owned()))?; 137 138 // resolve channel data 139 assert!( 140 ChannelData::is_channel_data(&data), 141 "should be channel data" 142 ); 143 144 let mut channel_data = ChannelData { 145 raw: data, 146 ..Default::default() 147 }; 148 channel_data.decode()?; 149 assert_eq!( 150 channel_bind.number, channel_data.number, 151 "get channel data's number is invalid" 152 ); 153 assert_eq!( 154 target_text2.as_bytes(), 155 &channel_data.data, 156 "get data doesn't equal the target text." 157 ); 158 159 // listeners close 160 m.close().await?; 161 162 Ok(()) 163 } 164 165 #[tokio::test] 166 async fn test_create_allocation_duplicate_five_tuple() -> Result<()> { 167 //env_logger::init(); 168 169 // turn server initialization 170 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 171 172 let m = new_test_manager(); 173 174 let five_tuple = random_five_tuple(); 175 176 let _ = m 177 .create_allocation( 178 five_tuple, 179 Arc::clone(&turn_socket), 180 0, 181 DEFAULT_LIFETIME, 182 TextAttribute::new(ATTR_USERNAME, "user".into()), 183 ) 184 .await?; 185 186 let result = m 187 .create_allocation( 188 five_tuple, 189 Arc::clone(&turn_socket), 190 0, 191 DEFAULT_LIFETIME, 192 TextAttribute::new(ATTR_USERNAME, "user".into()), 193 ) 194 .await; 195 assert!(result.is_err(), "expected error, but got ok"); 196 197 Ok(()) 198 } 199 200 #[tokio::test] 201 async fn test_delete_allocation() -> Result<()> { 202 //env_logger::init(); 203 204 // turn server initialization 205 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 206 207 let m = new_test_manager(); 208 209 let five_tuple = random_five_tuple(); 210 211 let _ = m 212 .create_allocation( 213 five_tuple, 214 Arc::clone(&turn_socket), 215 0, 216 DEFAULT_LIFETIME, 217 TextAttribute::new(ATTR_USERNAME, "user".into()), 218 ) 219 .await?; 220 221 assert!( 222 m.get_allocation(&five_tuple).await.is_some(), 223 "Failed to get allocation right after creation" 224 ); 225 226 m.delete_allocation(&five_tuple).await; 227 228 assert!( 229 m.get_allocation(&five_tuple).await.is_none(), 230 "Get allocation with {} should be nil after delete", 231 five_tuple 232 ); 233 234 Ok(()) 235 } 236 237 #[tokio::test] 238 async fn test_allocation_timeout() -> Result<()> { 239 //env_logger::init(); 240 241 // turn server initialization 242 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 243 244 let m = new_test_manager(); 245 246 let mut allocations = vec![]; 247 let lifetime = Duration::from_millis(100); 248 249 for _ in 0..5 { 250 let five_tuple = random_five_tuple(); 251 252 let a = m 253 .create_allocation( 254 five_tuple, 255 Arc::clone(&turn_socket), 256 0, 257 lifetime, 258 TextAttribute::new(ATTR_USERNAME, "user".into()), 259 ) 260 .await?; 261 262 allocations.push(a); 263 } 264 265 let mut count = 0; 266 267 'outer: loop { 268 count += 1; 269 270 if count >= 10 { 271 assert!(false, "Allocations didn't timeout"); 272 } 273 274 tokio::time::sleep(lifetime + Duration::from_millis(100)).await; 275 276 let any_outstanding = false; 277 278 for a in &allocations { 279 if a.close().await.is_ok() { 280 continue 'outer; 281 } 282 } 283 284 if !any_outstanding { 285 return Ok(()); 286 } 287 } 288 } 289 290 #[tokio::test] 291 async fn test_manager_close() -> Result<()> { 292 // env_logger::init(); 293 294 // turn server initialization 295 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 296 297 let m = new_test_manager(); 298 299 let mut allocations = vec![]; 300 301 let a1 = m 302 .create_allocation( 303 random_five_tuple(), 304 Arc::clone(&turn_socket), 305 0, 306 Duration::from_millis(100), 307 TextAttribute::new(ATTR_USERNAME, "user".into()), 308 ) 309 .await?; 310 allocations.push(a1); 311 312 let a2 = m 313 .create_allocation( 314 random_five_tuple(), 315 Arc::clone(&turn_socket), 316 0, 317 Duration::from_millis(200), 318 TextAttribute::new(ATTR_USERNAME, "user".into()), 319 ) 320 .await?; 321 allocations.push(a2); 322 323 tokio::time::sleep(Duration::from_millis(150)).await; 324 325 log::trace!("Mgr is going to be closed..."); 326 327 m.close().await?; 328 329 for a in allocations { 330 assert!( 331 a.close().await.is_err(), 332 "Allocation should be closed if lifetime timeout" 333 ); 334 } 335 336 Ok(()) 337 } 338 339 #[tokio::test] 340 async fn test_delete_allocation_by_username() -> Result<()> { 341 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 342 343 let m = new_test_manager(); 344 345 let five_tuple1 = random_five_tuple(); 346 let five_tuple2 = random_five_tuple(); 347 let five_tuple3 = random_five_tuple(); 348 349 let _ = m 350 .create_allocation( 351 five_tuple1, 352 Arc::clone(&turn_socket), 353 0, 354 DEFAULT_LIFETIME, 355 TextAttribute::new(ATTR_USERNAME, "user".into()), 356 ) 357 .await?; 358 let _ = m 359 .create_allocation( 360 five_tuple2, 361 Arc::clone(&turn_socket), 362 0, 363 DEFAULT_LIFETIME, 364 TextAttribute::new(ATTR_USERNAME, "user".into()), 365 ) 366 .await?; 367 let _ = m 368 .create_allocation( 369 five_tuple3, 370 Arc::clone(&turn_socket), 371 0, 372 DEFAULT_LIFETIME, 373 TextAttribute::new(ATTR_USERNAME, "user2".into()), 374 ) 375 .await?; 376 377 assert_eq!(m.allocations.lock().await.len(), 3); 378 379 m.delete_allocations_by_username("user").await; 380 381 assert_eq!(m.allocations.lock().await.len(), 1); 382 383 assert!( 384 m.get_allocation(&five_tuple1).await.is_none() 385 && m.get_allocation(&five_tuple2).await.is_none() 386 && m.get_allocation(&five_tuple3).await.is_some() 387 ); 388 389 Ok(()) 390 } 391 392 struct TestAuthHandler; 393 impl AuthHandler for TestAuthHandler { 394 fn auth_handle(&self, username: &str, realm: &str, _src_addr: SocketAddr) -> Result<Vec<u8>> { 395 Ok(generate_auth_key(username, realm, "pass")) 396 } 397 } 398 399 async fn create_server() -> Result<(Server, u16)> { 400 let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 401 let server_port = conn.local_addr()?.port(); 402 403 let server = Server::new(ServerConfig { 404 conn_configs: vec![ConnConfig { 405 conn, 406 relay_addr_generator: Box::new(RelayAddressGeneratorStatic { 407 relay_address: IpAddr::from_str("127.0.0.1")?, 408 address: "0.0.0.0".to_owned(), 409 net: Arc::new(Net::new(None)), 410 }), 411 }], 412 realm: "webrtc.rs".to_owned(), 413 auth_handler: Arc::new(TestAuthHandler {}), 414 channel_bind_timeout: Duration::from_secs(0), 415 }) 416 .await?; 417 418 Ok((server, server_port)) 419 } 420 421 async fn create_client(username: String, server_port: u16) -> Result<Client> { 422 let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 423 424 Client::new(ClientConfig { 425 stun_serv_addr: format!("127.0.0.1:{}", server_port), 426 turn_serv_addr: format!("127.0.0.1:{}", server_port), 427 username, 428 password: "pass".to_owned(), 429 realm: String::new(), 430 software: String::new(), 431 rto_in_ms: 0, 432 conn, 433 vnet: None, 434 }) 435 .await 436 } 437 438 #[cfg(feature = "metrics")] 439 #[tokio::test] 440 async fn test_get_allocations_info() -> Result<()> { 441 let (server, server_port) = create_server().await?; 442 443 let client1 = create_client("user1".to_owned(), server_port).await?; 444 client1.listen().await?; 445 446 let client2 = create_client("user2".to_owned(), server_port).await?; 447 client2.listen().await?; 448 449 let client3 = create_client("user3".to_owned(), server_port).await?; 450 client3.listen().await?; 451 452 assert!(server.get_allocations_info(None).await?.is_empty()); 453 454 let user1 = client1.allocate().await?; 455 let user2 = client2.allocate().await?; 456 let user3 = client3.allocate().await?; 457 458 assert_eq!(server.get_allocations_info(None).await?.len(), 3); 459 460 let addr1 = client1 461 .send_binding_request_to(format!("127.0.0.1:{}", server_port).as_str()) 462 .await?; 463 let addr2 = client2 464 .send_binding_request_to(format!("127.0.0.1:{}", server_port).as_str()) 465 .await?; 466 let addr3 = client3 467 .send_binding_request_to(format!("127.0.0.1:{}", server_port).as_str()) 468 .await?; 469 470 user1.send_to(b"1", addr1).await?; 471 user2.send_to(b"12", addr2).await?; 472 user3.send_to(b"123", addr3).await?; 473 474 tokio::time::sleep(Duration::from_millis(100)).await; 475 476 server 477 .get_allocations_info(None) 478 .await? 479 .iter() 480 .for_each(|(_, ai)| match ai.username.as_str() { 481 "user1" => assert_eq!(ai.relayed_bytes, 1), 482 "user2" => assert_eq!(ai.relayed_bytes, 2), 483 "user3" => assert_eq!(ai.relayed_bytes, 3), 484 _ => unreachable!(), 485 }); 486 487 Ok(()) 488 } 489 490 #[cfg(feature = "metrics")] 491 #[tokio::test] 492 async fn test_get_allocations_info_bytes_count() -> Result<()> { 493 let (server, server_port) = create_server().await?; 494 495 let client = create_client("foo".to_owned(), server_port).await?; 496 497 client.listen().await?; 498 499 assert!(server.get_allocations_info(None).await?.is_empty()); 500 501 let conn = client.allocate().await?; 502 let addr = client 503 .send_binding_request_to(format!("127.0.0.1:{}", server_port).as_str()) 504 .await?; 505 506 assert!(!server.get_allocations_info(None).await?.is_empty()); 507 508 assert_eq!( 509 server 510 .get_allocations_info(None) 511 .await? 512 .values() 513 .last() 514 .unwrap() 515 .relayed_bytes, 516 0 517 ); 518 519 for _ in 0..10 { 520 conn.send_to(b"Hello", addr).await?; 521 522 tokio::time::sleep(Duration::from_millis(100)).await; 523 } 524 525 tokio::time::sleep(Duration::from_millis(1000)).await; 526 527 assert_eq!( 528 server 529 .get_allocations_info(None) 530 .await? 531 .values() 532 .last() 533 .unwrap() 534 .relayed_bytes, 535 50 536 ); 537 538 for _ in 0..10 { 539 conn.send_to(b"Hello", addr).await?; 540 541 tokio::time::sleep(Duration::from_millis(100)).await; 542 } 543 544 tokio::time::sleep(Duration::from_millis(1000)).await; 545 546 assert_eq!( 547 server 548 .get_allocations_info(None) 549 .await? 550 .values() 551 .last() 552 .unwrap() 553 .relayed_bytes, 554 100 555 ); 556 557 client.close().await?; 558 server.close().await?; 559 560 Ok(()) 561 } 562