1 use super::*;
2 use crate::error::Result;
3 use crate::relay::relay_none::*;
4 
5 use crate::proto::lifetime::DEFAULT_LIFETIME;
6 use std::net::Ipv4Addr;
7 use std::str::FromStr;
8 use tokio::net::UdpSocket;
9 use util::vnet::net::*;
10 
11 fn new_test_manager() -> Manager {
12     let config = ManagerConfig {
13         relay_addr_generator: Box::new(RelayAddressGeneratorNone {
14             address: "0.0.0.0".to_owned(),
15             net: Arc::new(Net::new(None)),
16         }),
17     };
18     Manager::new(config)
19 }
20 
21 fn random_five_tuple() -> FiveTuple {
22     /* #nosec */
23     FiveTuple {
24         src_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()),
25         dst_addr: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), rand::random()),
26         ..Default::default()
27     }
28 }
29 
30 #[tokio::test]
31 async fn test_packet_handler() -> Result<()> {
32     //env_logger::init();
33 
34     // turn server initialization
35     let turn_socket = UdpSocket::bind("127.0.0.1:0").await?;
36 
37     // client listener initialization
38     let client_listener = UdpSocket::bind("127.0.0.1:0").await?;
39     let src_addr = client_listener.local_addr()?;
40     let (data_ch_tx, mut data_ch_rx) = mpsc::channel(1);
41     // client listener read data
42     tokio::spawn(async move {
43         let mut buffer = vec![0u8; RTP_MTU];
44         loop {
45             let n = match client_listener.recv_from(&mut buffer).await {
46                 Ok((n, _)) => n,
47                 Err(_) => break,
48             };
49 
50             let _ = data_ch_tx.send(buffer[..n].to_vec()).await;
51         }
52     });
53 
54     let m = new_test_manager();
55     let a = m
56         .create_allocation(
57             FiveTuple {
58                 src_addr,
59                 dst_addr: turn_socket.local_addr()?,
60                 ..Default::default()
61             },
62             Arc::new(turn_socket),
63             0,
64             DEFAULT_LIFETIME,
65         )
66         .await?;
67 
68     let peer_listener1 = UdpSocket::bind("127.0.0.1:0").await?;
69     let peer_listener2 = UdpSocket::bind("127.0.0.1:0").await?;
70 
71     let channel_bind = ChannelBind::new(
72         ChannelNumber(MIN_CHANNEL_NUMBER),
73         peer_listener2.local_addr()?,
74     );
75 
76     let port = {
77         let a = a.lock().await;
78 
79         // add permission with peer1 address
80         a.add_permission(Permission::new(peer_listener1.local_addr()?))
81             .await;
82         // add channel with min channel number and peer2 address
83         a.add_channel_bind(channel_bind.clone(), DEFAULT_LIFETIME)
84             .await?;
85 
86         a.relay_socket.local_addr().await?.port()
87     };
88 
89     let relay_addr_with_host_str = format!("127.0.0.1:{}", port);
90     let relay_addr_with_host = SocketAddr::from_str(&relay_addr_with_host_str)?;
91 
92     // test for permission and data message
93     let target_text = "permission";
94     let _ = peer_listener1
95         .send_to(target_text.as_bytes(), relay_addr_with_host)
96         .await?;
97     let data = data_ch_rx
98         .recv()
99         .await
100         .ok_or(Error::Other("data ch closed".to_owned()))?;
101 
102     // resolve stun data message
103     assert!(is_message(&data), "should be stun message");
104 
105     let mut msg = Message::new();
106     msg.raw = data;
107     msg.decode()?;
108 
109     let mut msg_data = Data::default();
110     msg_data.get_from(&msg)?;
111     assert_eq!(
112         target_text.as_bytes(),
113         &msg_data.0,
114         "get message doesn't equal the target text"
115     );
116 
117     // test for channel bind and channel data
118     let target_text2 = "channel bind";
119     let _ = peer_listener2
120         .send_to(target_text2.as_bytes(), relay_addr_with_host)
121         .await?;
122     let data = data_ch_rx
123         .recv()
124         .await
125         .ok_or(Error::Other("data ch closed".to_owned()))?;
126 
127     // resolve channel data
128     assert!(
129         ChannelData::is_channel_data(&data),
130         "should be channel data"
131     );
132 
133     let mut channel_data = ChannelData {
134         raw: data,
135         ..Default::default()
136     };
137     channel_data.decode()?;
138     assert_eq!(
139         channel_bind.number, channel_data.number,
140         "get channel data's number is invalid"
141     );
142     assert_eq!(
143         target_text2.as_bytes(),
144         &channel_data.data,
145         "get data doesn't equal the target text."
146     );
147 
148     // listeners close
149     m.close().await?;
150 
151     Ok(())
152 }
153 
154 #[tokio::test]
155 async fn test_create_allocation_duplicate_five_tuple() -> Result<()> {
156     //env_logger::init();
157 
158     // turn server initialization
159     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
160 
161     let m = new_test_manager();
162 
163     let five_tuple = random_five_tuple();
164 
165     let _ = m
166         .create_allocation(
167             five_tuple.clone(),
168             Arc::clone(&turn_socket),
169             0,
170             DEFAULT_LIFETIME,
171         )
172         .await?;
173 
174     let result = m
175         .create_allocation(five_tuple, Arc::clone(&turn_socket), 0, DEFAULT_LIFETIME)
176         .await;
177     assert!(result.is_err(), "expected error, but got ok");
178 
179     Ok(())
180 }
181 
182 #[tokio::test]
183 async fn test_delete_allocation() -> Result<()> {
184     //env_logger::init();
185 
186     // turn server initialization
187     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
188 
189     let m = new_test_manager();
190 
191     let five_tuple = random_five_tuple();
192 
193     let _ = m
194         .create_allocation(
195             five_tuple.clone(),
196             Arc::clone(&turn_socket),
197             0,
198             DEFAULT_LIFETIME,
199         )
200         .await?;
201 
202     assert!(
203         m.get_allocation(&five_tuple).await.is_some(),
204         "Failed to get allocation right after creation"
205     );
206 
207     m.delete_allocation(&five_tuple).await;
208 
209     assert!(
210         m.get_allocation(&five_tuple).await.is_none(),
211         "Get allocation with {} should be nil after delete",
212         five_tuple
213     );
214 
215     Ok(())
216 }
217 
218 #[tokio::test]
219 async fn test_allocation_timeout() -> Result<()> {
220     //env_logger::init();
221 
222     // turn server initialization
223     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
224 
225     let m = new_test_manager();
226 
227     let mut allocations = vec![];
228     let lifetime = Duration::from_millis(100);
229 
230     for _ in 0..5 {
231         let five_tuple = random_five_tuple();
232 
233         let a = m
234             .create_allocation(five_tuple, Arc::clone(&turn_socket), 0, lifetime)
235             .await?;
236 
237         allocations.push(a);
238     }
239 
240     let mut count = 0;
241 
242     'outer: loop {
243         count += 1;
244 
245         if count >= 10 {
246             assert!(false, "Allocations didn't timeout");
247         }
248 
249         tokio::time::sleep(lifetime + Duration::from_millis(100)).await;
250 
251         let any_outstanding = false;
252 
253         for allocation in &allocations {
254             let mut a = allocation.lock().await;
255             if a.close().await.is_ok() {
256                 continue 'outer;
257             }
258         }
259 
260         if !any_outstanding {
261             return Ok(());
262         }
263     }
264 }
265 
266 #[tokio::test]
267 async fn test_manager_close() -> Result<()> {
268     // env_logger::init();
269 
270     // turn server initialization
271     let turn_socket: Arc<dyn Conn + Send + Sync> = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
272 
273     let m = new_test_manager();
274 
275     let mut allocations = vec![];
276 
277     let a1 = m
278         .create_allocation(
279             random_five_tuple(),
280             Arc::clone(&turn_socket),
281             0,
282             Duration::from_millis(100),
283         )
284         .await?;
285     allocations.push(a1);
286 
287     let a2 = m
288         .create_allocation(
289             random_five_tuple(),
290             Arc::clone(&turn_socket),
291             0,
292             Duration::from_millis(200),
293         )
294         .await?;
295     allocations.push(a2);
296 
297     tokio::time::sleep(Duration::from_millis(150)).await;
298 
299     log::trace!("Mgr is going to be closed...");
300 
301     m.close().await?;
302 
303     for allocation in allocations {
304         let mut a = allocation.lock().await;
305         assert!(
306             a.close().await.is_err(),
307             "Allocation should be closed if lifetime timeout"
308         );
309     }
310 
311     Ok(())
312 }
313