1 use super::*; 2 use crate::error::Result; 3 use crate::relay::relay_none::*; 4 5 use crate::proto::lifetime::DEFAULT_LIFETIME; 6 use std::net::Ipv4Addr; 7 use std::str::FromStr; 8 use tokio::net::UdpSocket; 9 use util::vnet::net::*; 10 11 fn new_test_manager() -> Manager { 12 let config = ManagerConfig { 13 relay_addr_generator: Box::new(RelayAddressGeneratorNone { 14 address: "0.0.0.0".to_owned(), 15 net: Arc::new(Net::new(None)), 16 }), 17 }; 18 Manager::new(config) 19 } 20 21 fn random_five_tuple() -> FiveTuple { 22 /* #nosec */ 23 FiveTuple { 24 src_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()), 25 dst_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()), 26 ..Default::default() 27 } 28 } 29 30 #[tokio::test] 31 async fn test_packet_handler() -> Result<()> { 32 //env_logger::init(); 33 34 // turn server initialization 35 let turn_socket = UdpSocket::bind("127.0.0.1:0").await?; 36 37 // client listener initialization 38 let client_listener = UdpSocket::bind("127.0.0.1:0").await?; 39 let src_addr = client_listener.local_addr()?; 40 let (data_ch_tx, mut data_ch_rx) = mpsc::channel(1); 41 // client listener read data 42 tokio::spawn(async move { 43 let mut buffer = vec![0u8; RTP_MTU]; 44 loop { 45 let n = match client_listener.recv_from(&mut buffer).await { 46 Ok((n, _)) => n, 47 Err(_) => break, 48 }; 49 50 let _ = data_ch_tx.send(buffer[..n].to_vec()).await; 51 } 52 }); 53 54 let m = new_test_manager(); 55 let a = m 56 .create_allocation( 57 FiveTuple { 58 src_addr, 59 dst_addr: turn_socket.local_addr()?, 60 ..Default::default() 61 }, 62 Arc::new(turn_socket), 63 0, 64 DEFAULT_LIFETIME, 65 ) 66 .await?; 67 68 let peer_listener1 = UdpSocket::bind("127.0.0.1:0").await?; 69 let peer_listener2 = UdpSocket::bind("127.0.0.1:0").await?; 70 71 let channel_bind = ChannelBind::new( 72 ChannelNumber(MIN_CHANNEL_NUMBER), 73 peer_listener2.local_addr()?, 74 ); 75 76 let port = { 77 let a = a.lock().await; 78 79 // add permission with peer1 address 80 a.add_permission(Permission::new(peer_listener1.local_addr()?)) 81 .await; 82 // add channel with min channel number and peer2 address 83 a.add_channel_bind(channel_bind.clone(), DEFAULT_LIFETIME) 84 .await?; 85 86 a.relay_socket.local_addr().await?.port() 87 }; 88 89 let relay_addr_with_host_str = format!("127.0.0.1:{}", port); 90 let relay_addr_with_host = SocketAddr::from_str(&relay_addr_with_host_str)?; 91 92 // test for permission and data message 93 let target_text = "permission"; 94 let _ = peer_listener1 95 .send_to(target_text.as_bytes(), relay_addr_with_host) 96 .await?; 97 let data = data_ch_rx 98 .recv() 99 .await 100 .ok_or(Error::Other("data ch closed".to_owned()))?; 101 102 // resolve stun data message 103 assert!(is_message(&data), "should be stun message"); 104 105 let mut msg = Message::new(); 106 msg.raw = data; 107 msg.decode()?; 108 109 let mut msg_data = Data::default(); 110 msg_data.get_from(&msg)?; 111 assert_eq!( 112 target_text.as_bytes(), 113 &msg_data.0, 114 "get message doesn't equal the target text" 115 ); 116 117 // test for channel bind and channel data 118 let target_text2 = "channel bind"; 119 let _ = peer_listener2 120 .send_to(target_text2.as_bytes(), relay_addr_with_host) 121 .await?; 122 let data = data_ch_rx 123 .recv() 124 .await 125 .ok_or(Error::Other("data ch closed".to_owned()))?; 126 127 // resolve channel data 128 assert!( 129 ChannelData::is_channel_data(&data), 130 "should be channel data" 131 ); 132 133 let mut channel_data = ChannelData { 134 raw: data, 135 ..Default::default() 136 }; 137 channel_data.decode()?; 138 assert_eq!( 139 channel_bind.number, channel_data.number, 140 "get channel data's number is invalid" 141 ); 142 assert_eq!( 143 target_text2.as_bytes(), 144 &channel_data.data, 145 "get data doesn't equal the target text." 146 ); 147 148 // listeners close 149 m.close().await?; 150 151 Ok(()) 152 } 153 154 #[tokio::test] 155 async fn test_create_allocation_duplicate_five_tuple() -> Result<()> { 156 //env_logger::init(); 157 158 // turn server initialization 159 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 160 161 let m = new_test_manager(); 162 163 let five_tuple = random_five_tuple(); 164 165 let _ = m 166 .create_allocation( 167 five_tuple.clone(), 168 Arc::clone(&turn_socket), 169 0, 170 DEFAULT_LIFETIME, 171 ) 172 .await?; 173 174 let result = m 175 .create_allocation(five_tuple, Arc::clone(&turn_socket), 0, DEFAULT_LIFETIME) 176 .await; 177 assert!(result.is_err(), "expected error, but got ok"); 178 179 Ok(()) 180 } 181 182 #[tokio::test] 183 async fn test_delete_allocation() -> Result<()> { 184 //env_logger::init(); 185 186 // turn server initialization 187 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 188 189 let m = new_test_manager(); 190 191 let five_tuple = random_five_tuple(); 192 193 let _ = m 194 .create_allocation( 195 five_tuple.clone(), 196 Arc::clone(&turn_socket), 197 0, 198 DEFAULT_LIFETIME, 199 ) 200 .await?; 201 202 assert!( 203 m.get_allocation(&five_tuple).await.is_some(), 204 "Failed to get allocation right after creation" 205 ); 206 207 m.delete_allocation(&five_tuple).await; 208 209 assert!( 210 m.get_allocation(&five_tuple).await.is_none(), 211 "Get allocation with {} should be nil after delete", 212 five_tuple 213 ); 214 215 Ok(()) 216 } 217 218 #[tokio::test] 219 async fn test_allocation_timeout() -> Result<()> { 220 //env_logger::init(); 221 222 // turn server initialization 223 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 224 225 let m = new_test_manager(); 226 227 let mut allocations = vec![]; 228 let lifetime = Duration::from_millis(100); 229 230 for _ in 0..5 { 231 let five_tuple = random_five_tuple(); 232 233 let a = m 234 .create_allocation(five_tuple, Arc::clone(&turn_socket), 0, lifetime) 235 .await?; 236 237 allocations.push(a); 238 } 239 240 let mut count = 0; 241 242 'outer: loop { 243 count += 1; 244 245 if count >= 10 { 246 assert!(false, "Allocations didn't timeout"); 247 } 248 249 tokio::time::sleep(lifetime + Duration::from_millis(100)).await; 250 251 let any_outstanding = false; 252 253 for allocation in &allocations { 254 let mut a = allocation.lock().await; 255 if a.close().await.is_ok() { 256 continue 'outer; 257 } 258 } 259 260 if !any_outstanding { 261 return Ok(()); 262 } 263 } 264 } 265 266 #[tokio::test] 267 async fn test_manager_close() -> Result<()> { 268 // env_logger::init(); 269 270 // turn server initialization 271 let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?); 272 273 let m = new_test_manager(); 274 275 let mut allocations = vec![]; 276 277 let a1 = m 278 .create_allocation( 279 random_five_tuple(), 280 Arc::clone(&turn_socket), 281 0, 282 Duration::from_millis(100), 283 ) 284 .await?; 285 allocations.push(a1); 286 287 let a2 = m 288 .create_allocation( 289 random_five_tuple(), 290 Arc::clone(&turn_socket), 291 0, 292 Duration::from_millis(200), 293 ) 294 .await?; 295 allocations.push(a2); 296 297 tokio::time::sleep(Duration::from_millis(150)).await; 298 299 log::trace!("Mgr is going to be closed..."); 300 301 m.close().await?; 302 303 for allocation in allocations { 304 let mut a = allocation.lock().await; 305 assert!( 306 a.close().await.is_err(), 307 "Allocation should be closed if lifetime timeout" 308 ); 309 } 310 311 Ok(()) 312 } 313