1 use tokio::net::UdpSocket;
2 use webrtc_ice as ice;
3
4 use ice::agent::agent_config::AgentConfig;
5 use ice::agent::Agent;
6 use ice::candidate::{candidate_base::*, *};
7 use ice::state::*;
8 use ice::Error;
9 use ice::{network_type::*, udp_network::UDPNetwork};
10
11 use clap::{App, AppSettings, Arg};
12 use hyper::service::{make_service_fn, service_fn};
13 use hyper::{Body, Client, Method, Request, Response, Server, StatusCode};
14 use rand::{thread_rng, Rng};
15 use std::io;
16 use std::sync::Arc;
17 use std::time::Duration;
18 use tokio::sync::{mpsc, watch, Mutex};
19 use util::Conn;
20
21 #[macro_use]
22 extern crate lazy_static;
23
24 type SenderType = Arc<Mutex<mpsc::Sender<String>>>;
25 type ReceiverType = Arc<Mutex<mpsc::Receiver<String>>>;
26
27 lazy_static! {
28 // ErrUnknownType indicates an error with Unknown info.
29 static ref REMOTE_AUTH_CHANNEL: (SenderType, ReceiverType ) = {
30 let (tx, rx) = mpsc::channel::<String>(3);
31 (Arc::new(Mutex::new(tx)), Arc::new(Mutex::new(rx)))
32 };
33
34 static ref REMOTE_CAND_CHANNEL: (SenderType, ReceiverType) = {
35 let (tx, rx) = mpsc::channel::<String>(10);
36 (Arc::new(Mutex::new(tx)), Arc::new(Mutex::new(rx)))
37 };
38 }
39
40 // HTTP Listener to get ICE Credentials/Candidate from remote Peer
remote_handler(req: Request<Body>) -> Result<Response<Body>, hyper::Error>41 async fn remote_handler(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
42 //println!("received {:?}", req);
43 match (req.method(), req.uri().path()) {
44 (&Method::POST, "/remoteAuth") => {
45 let full_body =
46 match std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?) {
47 Ok(s) => s.to_owned(),
48 Err(err) => panic!("{}", err),
49 };
50 let tx = REMOTE_AUTH_CHANNEL.0.lock().await;
51 //println!("body: {:?}", full_body);
52 let _ = tx.send(full_body).await;
53
54 let mut response = Response::new(Body::empty());
55 *response.status_mut() = StatusCode::OK;
56 Ok(response)
57 }
58
59 (&Method::POST, "/remoteCandidate") => {
60 let full_body =
61 match std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?) {
62 Ok(s) => s.to_owned(),
63 Err(err) => panic!("{}", err),
64 };
65 let tx = REMOTE_CAND_CHANNEL.0.lock().await;
66 //println!("body: {:?}", full_body);
67 let _ = tx.send(full_body).await;
68
69 let mut response = Response::new(Body::empty());
70 *response.status_mut() = StatusCode::OK;
71 Ok(response)
72 }
73
74 // Return the 404 Not Found for other routes.
75 _ => {
76 let mut not_found = Response::default();
77 *not_found.status_mut() = StatusCode::NOT_FOUND;
78 Ok(not_found)
79 }
80 }
81 }
82
83 // Controlled Agent:
84 // cargo run --color=always --package webrtc-ice --example ping_pong
85 // Controlling Agent:
86 // cargo run --color=always --package webrtc-ice --example ping_pong -- --controlling
87
88 #[tokio::main]
main() -> Result<(), Error>89 async fn main() -> Result<(), Error> {
90 env_logger::init();
91 // .format(|buf, record| {
92 // writeln!(
93 // buf,
94 // "{}:{} [{}] {} - {}",
95 // record.file().unwrap_or("unknown"),
96 // record.line().unwrap_or(0),
97 // record.level(),
98 // chrono::Local::now().format("%H:%M:%S.%6f"),
99 // record.args()
100 // )
101 // })
102 // .filter(None, log::LevelFilter::Trace)
103 // .init();
104
105 let mut app = App::new("ICE Demo")
106 .version("0.1.0")
107 .author("Rain Liu <[email protected]>")
108 .about("An example of ICE")
109 .setting(AppSettings::DeriveDisplayOrder)
110 .setting(AppSettings::SubcommandsNegateReqs)
111 .arg(
112 Arg::with_name("use-mux")
113 .takes_value(false)
114 .long("use-mux")
115 .short('m')
116 .help("Use a muxed UDP connection over a single listening port"),
117 )
118 .arg(
119 Arg::with_name("FULLHELP")
120 .help("Prints more detailed help information")
121 .long("fullhelp"),
122 )
123 .arg(
124 Arg::with_name("controlling")
125 .takes_value(false)
126 .long("controlling")
127 .help("is ICE Agent controlling"),
128 );
129
130 let matches = app.clone().get_matches();
131
132 if matches.is_present("FULLHELP") {
133 app.print_long_help().unwrap();
134 std::process::exit(0);
135 }
136
137 let is_controlling = matches.is_present("controlling");
138 let use_mux = matches.is_present("use-mux");
139
140 let (local_http_port, remote_http_port) = if is_controlling {
141 (9000, 9001)
142 } else {
143 (9001, 9000)
144 };
145
146 let (weak_conn, weak_agent) = {
147 let (done_tx, done_rx) = watch::channel(());
148
149 println!("Listening on http://localhost:{local_http_port}");
150 let mut done_http_server = done_rx.clone();
151 tokio::spawn(async move {
152 let addr = ([0, 0, 0, 0], local_http_port).into();
153 let service =
154 make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(remote_handler)) });
155 let server = Server::bind(&addr).serve(service);
156 tokio::select! {
157 _ = done_http_server.changed() => {
158 println!("receive cancel http server!");
159 }
160 result = server => {
161 // Run this server for... forever!
162 if let Err(e) = result {
163 eprintln!("server error: {e}");
164 }
165 println!("exit http server!");
166 }
167 };
168 });
169
170 if is_controlling {
171 println!("Local Agent is controlling");
172 } else {
173 println!("Local Agent is controlled");
174 };
175 println!("Press 'Enter' when both processes have started");
176 let mut input = String::new();
177 let _ = io::stdin().read_line(&mut input)?;
178
179 let udp_network = if use_mux {
180 use ice::udp_mux::*;
181 let port = if is_controlling { 4000 } else { 4001 };
182
183 let udp_socket = UdpSocket::bind(("0.0.0.0", port)).await?;
184 let udp_mux = UDPMuxDefault::new(UDPMuxParams::new(udp_socket));
185
186 UDPNetwork::Muxed(udp_mux)
187 } else {
188 UDPNetwork::Ephemeral(Default::default())
189 };
190
191 let ice_agent = Arc::new(
192 Agent::new(AgentConfig {
193 network_types: vec![NetworkType::Udp4],
194 udp_network,
195 ..Default::default()
196 })
197 .await?,
198 );
199
200 let client = Arc::new(Client::new());
201
202 // When we have gathered a new ICE Candidate send it to the remote peer
203 let client2 = Arc::clone(&client);
204 ice_agent.on_candidate(Box::new(
205 move |c: Option<Arc<dyn Candidate + Send + Sync>>| {
206 let client3 = Arc::clone(&client2);
207 Box::pin(async move {
208 if let Some(c) = c {
209 println!("posting remoteCandidate with {}", c.marshal());
210
211 let req = match Request::builder()
212 .method(Method::POST)
213 .uri(format!(
214 "http://localhost:{remote_http_port}/remoteCandidate"
215 ))
216 .body(Body::from(c.marshal()))
217 {
218 Ok(req) => req,
219 Err(err) => {
220 println!("{err}");
221 return;
222 }
223 };
224 let resp = match client3.request(req).await {
225 Ok(resp) => resp,
226 Err(err) => {
227 println!("{err}");
228 return;
229 }
230 };
231 println!("Response from remoteCandidate: {}", resp.status());
232 }
233 })
234 },
235 ));
236
237 let (ice_done_tx, mut ice_done_rx) = mpsc::channel::<()>(1);
238 // When ICE Connection state has change print to stdout
239 ice_agent.on_connection_state_change(Box::new(move |c: ConnectionState| {
240 println!("ICE Connection State has changed: {c}");
241 if c == ConnectionState::Failed {
242 let _ = ice_done_tx.try_send(());
243 }
244 Box::pin(async move {})
245 }));
246
247 // Get the local auth details and send to remote peer
248 let (local_ufrag, local_pwd) = ice_agent.get_local_user_credentials().await;
249
250 println!("posting remoteAuth with {local_ufrag}:{local_pwd}");
251 let req = match Request::builder()
252 .method(Method::POST)
253 .uri(format!("http://localhost:{remote_http_port}/remoteAuth"))
254 .body(Body::from(format!("{local_ufrag}:{local_pwd}")))
255 {
256 Ok(req) => req,
257 Err(err) => return Err(Error::Other(format!("{err}"))),
258 };
259 let resp = match client.request(req).await {
260 Ok(resp) => resp,
261 Err(err) => return Err(Error::Other(format!("{err}"))),
262 };
263 println!("Response from remoteAuth: {}", resp.status());
264
265 let (remote_ufrag, remote_pwd) = {
266 let mut rx = REMOTE_AUTH_CHANNEL.1.lock().await;
267 if let Some(s) = rx.recv().await {
268 println!("received: {s}");
269 let fields: Vec<String> = s.split(':').map(|s| s.to_string()).collect();
270 (fields[0].clone(), fields[1].clone())
271 } else {
272 panic!("rx.recv() empty");
273 }
274 };
275 println!("remote_ufrag: {remote_ufrag}, remote_pwd: {remote_pwd}");
276
277 let ice_agent2 = Arc::clone(&ice_agent);
278 let mut done_cand = done_rx.clone();
279 tokio::spawn(async move {
280 let mut rx = REMOTE_CAND_CHANNEL.1.lock().await;
281 loop {
282 tokio::select! {
283 _ = done_cand.changed() => {
284 println!("receive cancel remote cand!");
285 break;
286 }
287 result = rx.recv() => {
288 if let Some(s) = result {
289 if let Ok(c) = unmarshal_candidate(&s) {
290 println!("add_remote_candidate: {c}");
291 let c: Arc<dyn Candidate + Send + Sync> = Arc::new(c);
292 let _ = ice_agent2.add_remote_candidate(&c);
293 }else{
294 println!("unmarshal_candidate error!");
295 break;
296 }
297 }else{
298 println!("REMOTE_CAND_CHANNEL done!");
299 break;
300 }
301 }
302 };
303 }
304 });
305
306 ice_agent.gather_candidates()?;
307 println!("Connecting...");
308
309 let (_cancel_tx, cancel_rx) = mpsc::channel(1);
310 // Start the ICE Agent. One side must be controlled, and the other must be controlling
311 let conn: Arc<dyn Conn + Send + Sync> = if is_controlling {
312 ice_agent.dial(cancel_rx, remote_ufrag, remote_pwd).await?
313 } else {
314 ice_agent
315 .accept(cancel_rx, remote_ufrag, remote_pwd)
316 .await?
317 };
318
319 let weak_conn = Arc::downgrade(&conn);
320
321 // Send messages in a loop to the remote peer
322 let conn_tx = Arc::clone(&conn);
323 let mut done_send = done_rx.clone();
324 tokio::spawn(async move {
325 const RANDOM_STRING: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
326 loop {
327 tokio::time::sleep(Duration::from_secs(3)).await;
328
329 let val: String = (0..15)
330 .map(|_| {
331 let idx = thread_rng().gen_range(0..RANDOM_STRING.len());
332 RANDOM_STRING[idx] as char
333 })
334 .collect();
335
336 tokio::select! {
337 _ = done_send.changed() => {
338 println!("receive cancel ice send!");
339 break;
340 }
341 result = conn_tx.send(val.as_bytes()) => {
342 if let Err(err) = result {
343 eprintln!("conn_tx send error: {err}");
344 break;
345 }else{
346 println!("Sent: '{val}'");
347 }
348 }
349 };
350 }
351 });
352
353 let mut done_recv = done_rx.clone();
354 tokio::spawn(async move {
355 // Receive messages in a loop from the remote peer
356 let mut buf = vec![0u8; 1500];
357 loop {
358 tokio::select! {
359 _ = done_recv.changed() => {
360 println!("receive cancel ice recv!");
361 break;
362 }
363 result = conn.recv(&mut buf) => {
364 match result {
365 Ok(n) => {
366 println!("Received: '{}'", std::str::from_utf8(&buf[..n]).unwrap());
367 }
368 Err(err) => {
369 eprintln!("conn_tx send error: {err}");
370 break;
371 }
372 };
373 }
374 };
375 }
376 });
377
378 println!("Press ctrl-c to stop");
379 /*let d = if is_controlling {
380 Duration::from_secs(500)
381 } else {
382 Duration::from_secs(5)
383 };
384 let timeout = tokio::time::sleep(d);
385 tokio::pin!(timeout);*/
386
387 tokio::select! {
388 /*_ = timeout.as_mut() => {
389 println!("received timeout signal!");
390 let _ = done_tx.send(());
391 }*/
392 _ = ice_done_rx.recv() => {
393 println!("ice_done_rx");
394 let _ = done_tx.send(());
395 }
396 _ = tokio::signal::ctrl_c() => {
397 println!();
398 let _ = done_tx.send(());
399 }
400 };
401
402 let _ = ice_agent.close().await;
403
404 (weak_conn, Arc::downgrade(&ice_agent))
405 };
406
407 let mut int = tokio::time::interval(Duration::from_secs(1));
408 loop {
409 int.tick().await;
410 println!(
411 "weak_conn: weak count = {}, strong count = {}, weak_agent: weak count = {}, strong count = {}",
412 weak_conn.weak_count(),
413 weak_conn.strong_count(),
414 weak_agent.weak_count(),
415 weak_agent.strong_count(),
416 );
417 if weak_conn.strong_count() == 0 && weak_agent.strong_count() == 0 {
418 break;
419 }
420 }
421
422 Ok(())
423 }
424