xref: /webrtc/ice/examples/ping_pong.rs (revision 5d8fe953)
1 use tokio::net::UdpSocket;
2 use webrtc_ice as ice;
3 
4 use ice::agent::agent_config::AgentConfig;
5 use ice::agent::Agent;
6 use ice::candidate::{candidate_base::*, *};
7 use ice::state::*;
8 use ice::Error;
9 use ice::{network_type::*, udp_network::UDPNetwork};
10 
11 use clap::{App, AppSettings, Arg};
12 use hyper::service::{make_service_fn, service_fn};
13 use hyper::{Body, Client, Method, Request, Response, Server, StatusCode};
14 use rand::{thread_rng, Rng};
15 use std::io;
16 use std::sync::Arc;
17 use std::time::Duration;
18 use tokio::sync::{mpsc, watch, Mutex};
19 use util::Conn;
20 
21 #[macro_use]
22 extern crate lazy_static;
23 
24 type SenderType = Arc<Mutex<mpsc::Sender<String>>>;
25 type ReceiverType = Arc<Mutex<mpsc::Receiver<String>>>;
26 
27 lazy_static! {
28     // ErrUnknownType indicates an error with Unknown info.
29     static ref REMOTE_AUTH_CHANNEL: (SenderType, ReceiverType ) = {
30         let (tx, rx) = mpsc::channel::<String>(3);
31         (Arc::new(Mutex::new(tx)), Arc::new(Mutex::new(rx)))
32     };
33 
34     static ref REMOTE_CAND_CHANNEL: (SenderType, ReceiverType) = {
35         let (tx, rx) = mpsc::channel::<String>(10);
36         (Arc::new(Mutex::new(tx)), Arc::new(Mutex::new(rx)))
37     };
38 }
39 
40 // HTTP Listener to get ICE Credentials/Candidate from remote Peer
remote_handler(req: Request<Body>) -> Result<Response<Body>, hyper::Error>41 async fn remote_handler(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
42     //println!("received {:?}", req);
43     match (req.method(), req.uri().path()) {
44         (&Method::POST, "/remoteAuth") => {
45             let full_body =
46                 match std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?) {
47                     Ok(s) => s.to_owned(),
48                     Err(err) => panic!("{}", err),
49                 };
50             let tx = REMOTE_AUTH_CHANNEL.0.lock().await;
51             //println!("body: {:?}", full_body);
52             let _ = tx.send(full_body).await;
53 
54             let mut response = Response::new(Body::empty());
55             *response.status_mut() = StatusCode::OK;
56             Ok(response)
57         }
58 
59         (&Method::POST, "/remoteCandidate") => {
60             let full_body =
61                 match std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?) {
62                     Ok(s) => s.to_owned(),
63                     Err(err) => panic!("{}", err),
64                 };
65             let tx = REMOTE_CAND_CHANNEL.0.lock().await;
66             //println!("body: {:?}", full_body);
67             let _ = tx.send(full_body).await;
68 
69             let mut response = Response::new(Body::empty());
70             *response.status_mut() = StatusCode::OK;
71             Ok(response)
72         }
73 
74         // Return the 404 Not Found for other routes.
75         _ => {
76             let mut not_found = Response::default();
77             *not_found.status_mut() = StatusCode::NOT_FOUND;
78             Ok(not_found)
79         }
80     }
81 }
82 
83 // Controlled Agent:
84 //      cargo run --color=always --package webrtc-ice --example ping_pong
85 // Controlling Agent:
86 //      cargo run --color=always --package webrtc-ice --example ping_pong -- --controlling
87 
88 #[tokio::main]
main() -> Result<(), Error>89 async fn main() -> Result<(), Error> {
90     env_logger::init();
91     // .format(|buf, record| {
92     //     writeln!(
93     //         buf,
94     //         "{}:{} [{}] {} - {}",
95     //         record.file().unwrap_or("unknown"),
96     //         record.line().unwrap_or(0),
97     //         record.level(),
98     //         chrono::Local::now().format("%H:%M:%S.%6f"),
99     //         record.args()
100     //     )
101     // })
102     // .filter(None, log::LevelFilter::Trace)
103     // .init();
104 
105     let mut app = App::new("ICE Demo")
106         .version("0.1.0")
107         .author("Rain Liu <[email protected]>")
108         .about("An example of ICE")
109         .setting(AppSettings::DeriveDisplayOrder)
110         .setting(AppSettings::SubcommandsNegateReqs)
111         .arg(
112             Arg::with_name("use-mux")
113                 .takes_value(false)
114                 .long("use-mux")
115                 .short('m')
116                 .help("Use a muxed UDP connection over a single listening port"),
117         )
118         .arg(
119             Arg::with_name("FULLHELP")
120                 .help("Prints more detailed help information")
121                 .long("fullhelp"),
122         )
123         .arg(
124             Arg::with_name("controlling")
125                 .takes_value(false)
126                 .long("controlling")
127                 .help("is ICE Agent controlling"),
128         );
129 
130     let matches = app.clone().get_matches();
131 
132     if matches.is_present("FULLHELP") {
133         app.print_long_help().unwrap();
134         std::process::exit(0);
135     }
136 
137     let is_controlling = matches.is_present("controlling");
138     let use_mux = matches.is_present("use-mux");
139 
140     let (local_http_port, remote_http_port) = if is_controlling {
141         (9000, 9001)
142     } else {
143         (9001, 9000)
144     };
145 
146     let (weak_conn, weak_agent) = {
147         let (done_tx, done_rx) = watch::channel(());
148 
149         println!("Listening on http://localhost:{local_http_port}");
150         let mut done_http_server = done_rx.clone();
151         tokio::spawn(async move {
152             let addr = ([0, 0, 0, 0], local_http_port).into();
153             let service =
154                 make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(remote_handler)) });
155             let server = Server::bind(&addr).serve(service);
156             tokio::select! {
157                 _ = done_http_server.changed() => {
158                     println!("receive cancel http server!");
159                 }
160                 result = server => {
161                     // Run this server for... forever!
162                     if let Err(e) = result {
163                         eprintln!("server error: {e}");
164                     }
165                     println!("exit http server!");
166                 }
167             };
168         });
169 
170         if is_controlling {
171             println!("Local Agent is controlling");
172         } else {
173             println!("Local Agent is controlled");
174         };
175         println!("Press 'Enter' when both processes have started");
176         let mut input = String::new();
177         let _ = io::stdin().read_line(&mut input)?;
178 
179         let udp_network = if use_mux {
180             use ice::udp_mux::*;
181             let port = if is_controlling { 4000 } else { 4001 };
182 
183             let udp_socket = UdpSocket::bind(("0.0.0.0", port)).await?;
184             let udp_mux = UDPMuxDefault::new(UDPMuxParams::new(udp_socket));
185 
186             UDPNetwork::Muxed(udp_mux)
187         } else {
188             UDPNetwork::Ephemeral(Default::default())
189         };
190 
191         let ice_agent = Arc::new(
192             Agent::new(AgentConfig {
193                 network_types: vec![NetworkType::Udp4],
194                 udp_network,
195                 ..Default::default()
196             })
197             .await?,
198         );
199 
200         let client = Arc::new(Client::new());
201 
202         // When we have gathered a new ICE Candidate send it to the remote peer
203         let client2 = Arc::clone(&client);
204         ice_agent.on_candidate(Box::new(
205             move |c: Option<Arc<dyn Candidate + Send + Sync>>| {
206                 let client3 = Arc::clone(&client2);
207                 Box::pin(async move {
208                     if let Some(c) = c {
209                         println!("posting remoteCandidate with {}", c.marshal());
210 
211                         let req = match Request::builder()
212                             .method(Method::POST)
213                             .uri(format!(
214                                 "http://localhost:{remote_http_port}/remoteCandidate"
215                             ))
216                             .body(Body::from(c.marshal()))
217                         {
218                             Ok(req) => req,
219                             Err(err) => {
220                                 println!("{err}");
221                                 return;
222                             }
223                         };
224                         let resp = match client3.request(req).await {
225                             Ok(resp) => resp,
226                             Err(err) => {
227                                 println!("{err}");
228                                 return;
229                             }
230                         };
231                         println!("Response from remoteCandidate: {}", resp.status());
232                     }
233                 })
234             },
235         ));
236 
237         let (ice_done_tx, mut ice_done_rx) = mpsc::channel::<()>(1);
238         // When ICE Connection state has change print to stdout
239         ice_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
240             println!("ICE Connection State has changed: {c}");
241             if c == ConnectionState::Failed {
242                 let _ = ice_done_tx.try_send(());
243             }
244             Box::pin(async move {})
245         }));
246 
247         // Get the local auth details and send to remote peer
248         let (local_ufrag, local_pwd) = ice_agent.get_local_user_credentials().await;
249 
250         println!("posting remoteAuth with {local_ufrag}:{local_pwd}");
251         let req = match Request::builder()
252             .method(Method::POST)
253             .uri(format!("http://localhost:{remote_http_port}/remoteAuth"))
254             .body(Body::from(format!("{local_ufrag}:{local_pwd}")))
255         {
256             Ok(req) => req,
257             Err(err) => return Err(Error::Other(format!("{err}"))),
258         };
259         let resp = match client.request(req).await {
260             Ok(resp) => resp,
261             Err(err) => return Err(Error::Other(format!("{err}"))),
262         };
263         println!("Response from remoteAuth: {}", resp.status());
264 
265         let (remote_ufrag, remote_pwd) = {
266             let mut rx = REMOTE_AUTH_CHANNEL.1.lock().await;
267             if let Some(s) = rx.recv().await {
268                 println!("received: {s}");
269                 let fields: Vec<String> = s.split(':').map(|s| s.to_string()).collect();
270                 (fields[0].clone(), fields[1].clone())
271             } else {
272                 panic!("rx.recv() empty");
273             }
274         };
275         println!("remote_ufrag: {remote_ufrag}, remote_pwd: {remote_pwd}");
276 
277         let ice_agent2 = Arc::clone(&ice_agent);
278         let mut done_cand = done_rx.clone();
279         tokio::spawn(async move {
280             let mut rx = REMOTE_CAND_CHANNEL.1.lock().await;
281             loop {
282                 tokio::select! {
283                      _ = done_cand.changed() => {
284                         println!("receive cancel remote cand!");
285                         break;
286                     }
287                     result = rx.recv() => {
288                         if let Some(s) = result {
289                             if let Ok(c) = unmarshal_candidate(&s) {
290                                 println!("add_remote_candidate: {c}");
291                                 let c: Arc<dyn Candidate + Send + Sync> = Arc::new(c);
292                                 let _ = ice_agent2.add_remote_candidate(&c);
293                             }else{
294                                 println!("unmarshal_candidate error!");
295                                 break;
296                             }
297                         }else{
298                             println!("REMOTE_CAND_CHANNEL done!");
299                             break;
300                         }
301                     }
302                 };
303             }
304         });
305 
306         ice_agent.gather_candidates()?;
307         println!("Connecting...");
308 
309         let (_cancel_tx, cancel_rx) = mpsc::channel(1);
310         // Start the ICE Agent. One side must be controlled, and the other must be controlling
311         let conn: Arc<dyn Conn + Send + Sync> = if is_controlling {
312             ice_agent.dial(cancel_rx, remote_ufrag, remote_pwd).await?
313         } else {
314             ice_agent
315                 .accept(cancel_rx, remote_ufrag, remote_pwd)
316                 .await?
317         };
318 
319         let weak_conn = Arc::downgrade(&conn);
320 
321         // Send messages in a loop to the remote peer
322         let conn_tx = Arc::clone(&conn);
323         let mut done_send = done_rx.clone();
324         tokio::spawn(async move {
325             const RANDOM_STRING: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
326             loop {
327                 tokio::time::sleep(Duration::from_secs(3)).await;
328 
329                 let val: String = (0..15)
330                     .map(|_| {
331                         let idx = thread_rng().gen_range(0..RANDOM_STRING.len());
332                         RANDOM_STRING[idx] as char
333                     })
334                     .collect();
335 
336                 tokio::select! {
337                      _ = done_send.changed() => {
338                         println!("receive cancel ice send!");
339                         break;
340                     }
341                     result = conn_tx.send(val.as_bytes()) => {
342                         if let Err(err) = result {
343                             eprintln!("conn_tx send error: {err}");
344                             break;
345                         }else{
346                             println!("Sent: '{val}'");
347                         }
348                     }
349                 };
350             }
351         });
352 
353         let mut done_recv = done_rx.clone();
354         tokio::spawn(async move {
355             // Receive messages in a loop from the remote peer
356             let mut buf = vec![0u8; 1500];
357             loop {
358                 tokio::select! {
359                     _ = done_recv.changed() => {
360                         println!("receive cancel ice recv!");
361                         break;
362                     }
363                     result = conn.recv(&mut buf) => {
364                         match result {
365                             Ok(n) => {
366                                 println!("Received: '{}'", std::str::from_utf8(&buf[..n]).unwrap());
367                             }
368                             Err(err) => {
369                                 eprintln!("conn_tx send error: {err}");
370                                 break;
371                             }
372                         };
373                     }
374                 };
375             }
376         });
377 
378         println!("Press ctrl-c to stop");
379         /*let d = if is_controlling {
380             Duration::from_secs(500)
381         } else {
382             Duration::from_secs(5)
383         };
384         let timeout = tokio::time::sleep(d);
385         tokio::pin!(timeout);*/
386 
387         tokio::select! {
388             /*_ = timeout.as_mut() => {
389                 println!("received timeout signal!");
390                 let _ = done_tx.send(());
391             }*/
392             _ = ice_done_rx.recv() => {
393                 println!("ice_done_rx");
394                 let _ = done_tx.send(());
395             }
396             _ = tokio::signal::ctrl_c() => {
397                 println!();
398                 let _ = done_tx.send(());
399             }
400         };
401 
402         let _ = ice_agent.close().await;
403 
404         (weak_conn, Arc::downgrade(&ice_agent))
405     };
406 
407     let mut int = tokio::time::interval(Duration::from_secs(1));
408     loop {
409         int.tick().await;
410         println!(
411             "weak_conn: weak count = {}, strong count = {}, weak_agent: weak count = {}, strong count = {}",
412             weak_conn.weak_count(),
413             weak_conn.strong_count(),
414             weak_agent.weak_count(),
415             weak_agent.strong_count(),
416         );
417         if weak_conn.strong_count() == 0 && weak_agent.strong_count() == 0 {
418             break;
419         }
420     }
421 
422     Ok(())
423 }
424