1 use anyhow::Result;
2 use clap::{AppSettings, Arg, Command};
3 use hyper::service::{make_service_fn, service_fn};
4 use hyper::{Body, Client, Method, Request, Response, Server, StatusCode};
5 use std::io::Write;
6 use std::net::SocketAddr;
7 use std::str::FromStr;
8 use std::sync::Arc;
9 use tokio::sync::Mutex;
10 use tokio::time::Duration;
11 use webrtc::api::interceptor_registry::register_default_interceptors;
12 use webrtc::api::media_engine::MediaEngine;
13 use webrtc::api::APIBuilder;
14 use webrtc::data_channel::data_channel_message::DataChannelMessage;
15 use webrtc::ice_transport::ice_candidate::{RTCIceCandidate, RTCIceCandidateInit};
16 use webrtc::ice_transport::ice_server::RTCIceServer;
17 use webrtc::interceptor::registry::Registry;
18 use webrtc::peer_connection::configuration::RTCConfiguration;
19 use webrtc::peer_connection::math_rand_alpha;
20 use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
21 use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
22 use webrtc::peer_connection::RTCPeerConnection;
23 
24 #[macro_use]
25 extern crate lazy_static;
26 
27 lazy_static! {
28     static ref PEER_CONNECTION_MUTEX: Arc<Mutex<Option<Arc<RTCPeerConnection>>>> =
29         Arc::new(Mutex::new(None));
30     static ref PENDING_CANDIDATES: Arc<Mutex<Vec<RTCIceCandidate>>> = Arc::new(Mutex::new(vec![]));
31     static ref ADDRESS: Arc<Mutex<String>> = Arc::new(Mutex::new(String::new()));
32 }
33 
signal_candidate(addr: &str, c: &RTCIceCandidate) -> Result<()>34 async fn signal_candidate(addr: &str, c: &RTCIceCandidate) -> Result<()> {
35     /*println!(
36         "signal_candidate Post candidate to {}",
37         format!("http://{}/candidate", addr)
38     );*/
39     let payload = c.to_json()?.candidate;
40     let req = match Request::builder()
41         .method(Method::POST)
42         .uri(format!("http://{addr}/candidate"))
43         .header("content-type", "application/json; charset=utf-8")
44         .body(Body::from(payload))
45     {
46         Ok(req) => req,
47         Err(err) => {
48             println!("{err}");
49             return Err(err.into());
50         }
51     };
52 
53     let _resp = match Client::new().request(req).await {
54         Ok(resp) => resp,
55         Err(err) => {
56             println!("{err}");
57             return Err(err.into());
58         }
59     };
60     //println!("signal_candidate Response: {}", resp.status());
61 
62     Ok(())
63 }
64 
65 // HTTP Listener to get ICE Credentials/Candidate from remote Peer
remote_handler(req: Request<Body>) -> Result<Response<Body>, hyper::Error>66 async fn remote_handler(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
67     let pc = {
68         let pcm = PEER_CONNECTION_MUTEX.lock().await;
69         pcm.clone().unwrap()
70     };
71     let addr = {
72         let addr = ADDRESS.lock().await;
73         addr.clone()
74     };
75 
76     match (req.method(), req.uri().path()) {
77         // A HTTP handler that allows the other WebRTC-rs or Pion instance to send us ICE candidates
78         // This allows us to add ICE candidates faster, we don't have to wait for STUN or TURN
79         // candidates which may be slower
80         (&Method::POST, "/candidate") => {
81             //println!("remote_handler receive from /candidate");
82             let candidate =
83                 match std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?) {
84                     Ok(s) => s.to_owned(),
85                     Err(err) => panic!("{}", err),
86                 };
87 
88             if let Err(err) = pc
89                 .add_ice_candidate(RTCIceCandidateInit {
90                     candidate,
91                     ..Default::default()
92                 })
93                 .await
94             {
95                 panic!("{}", err);
96             }
97 
98             let mut response = Response::new(Body::empty());
99             *response.status_mut() = StatusCode::OK;
100             Ok(response)
101         }
102 
103         // A HTTP handler that processes a SessionDescription given to us from the other WebRTC-rs or Pion process
104         (&Method::POST, "/sdp") => {
105             //println!("remote_handler receive from /sdp");
106             let sdp_str = match std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?)
107             {
108                 Ok(s) => s.to_owned(),
109                 Err(err) => panic!("{}", err),
110             };
111             let sdp = match serde_json::from_str::<RTCSessionDescription>(&sdp_str) {
112                 Ok(s) => s,
113                 Err(err) => panic!("{}", err),
114             };
115 
116             if let Err(err) = pc.set_remote_description(sdp).await {
117                 panic!("{}", err);
118             }
119 
120             {
121                 let cs = PENDING_CANDIDATES.lock().await;
122                 for c in &*cs {
123                     if let Err(err) = signal_candidate(&addr, c).await {
124                         panic!("{}", err);
125                     }
126                 }
127             }
128 
129             let mut response = Response::new(Body::empty());
130             *response.status_mut() = StatusCode::OK;
131             Ok(response)
132         }
133         // Return the 404 Not Found for other routes.
134         _ => {
135             let mut not_found = Response::default();
136             *not_found.status_mut() = StatusCode::NOT_FOUND;
137             Ok(not_found)
138         }
139     }
140 }
141 
142 #[tokio::main]
main() -> Result<()>143 async fn main() -> Result<()> {
144     let mut app = Command::new("Offer")
145         .version("0.1.0")
146         .author("Rain Liu <[email protected]>")
147         .about("An example of WebRTC-rs Offer.")
148         .setting(AppSettings::DeriveDisplayOrder)
149         .subcommand_negates_reqs(true)
150         .arg(
151             Arg::new("FULLHELP")
152                 .help("Prints more detailed help information")
153                 .long("fullhelp"),
154         )
155         .arg(
156             Arg::new("debug")
157                 .long("debug")
158                 .short('d')
159                 .help("Prints debug log information"),
160         )
161         .arg(
162             Arg::new("offer-address")
163                 .takes_value(true)
164                 .default_value("0.0.0.0:50000")
165                 .long("offer-address")
166                 .help("Address that the Offer HTTP server is hosted on."),
167         )
168         .arg(
169             Arg::new("answer-address")
170                 .takes_value(true)
171                 .default_value("localhost:60000")
172                 .long("answer-address")
173                 .help("Address that the Answer HTTP server is hosted on."),
174         );
175 
176     let matches = app.clone().get_matches();
177 
178     if matches.is_present("FULLHELP") {
179         app.print_long_help().unwrap();
180         std::process::exit(0);
181     }
182 
183     let debug = matches.is_present("debug");
184     if debug {
185         env_logger::Builder::new()
186             .format(|buf, record| {
187                 writeln!(
188                     buf,
189                     "{}:{} [{}] {} - {}",
190                     record.file().unwrap_or("unknown"),
191                     record.line().unwrap_or(0),
192                     record.level(),
193                     chrono::Local::now().format("%H:%M:%S.%6f"),
194                     record.args()
195                 )
196             })
197             .filter(None, log::LevelFilter::Trace)
198             .init();
199     }
200 
201     let offer_addr = matches.value_of("offer-address").unwrap().to_owned();
202     let answer_addr = matches.value_of("answer-address").unwrap().to_owned();
203 
204     {
205         let mut oa = ADDRESS.lock().await;
206         *oa = answer_addr.clone();
207     }
208 
209     // Prepare the configuration
210     let config = RTCConfiguration {
211         ice_servers: vec![RTCIceServer {
212             urls: vec!["stun:stun.l.google.com:19302".to_owned()],
213             ..Default::default()
214         }],
215         ..Default::default()
216     };
217 
218     // Create a MediaEngine object to configure the supported codec
219     let mut m = MediaEngine::default();
220     m.register_default_codecs()?;
221 
222     let mut registry = Registry::new();
223 
224     // Use the default set of Interceptors
225     registry = register_default_interceptors(registry, &mut m)?;
226 
227     // Create the API object with the MediaEngine
228     let api = APIBuilder::new()
229         .with_media_engine(m)
230         .with_interceptor_registry(registry)
231         .build();
232 
233     // Create a new RTCPeerConnection
234     let peer_connection = Arc::new(api.new_peer_connection(config).await?);
235 
236     // When an ICE candidate is available send to the other Pion instance
237     // the other Pion instance will add this candidate by calling AddICECandidate
238     let pc = Arc::downgrade(&peer_connection);
239     let pending_candidates2 = Arc::clone(&PENDING_CANDIDATES);
240     let addr2 = answer_addr.clone();
241     peer_connection.on_ice_candidate(Box::new(move |c: Option<RTCIceCandidate>| {
242         //println!("on_ice_candidate {:?}", c);
243 
244         let pc2 = pc.clone();
245         let pending_candidates3 = Arc::clone(&pending_candidates2);
246         let addr3 = addr2.clone();
247         Box::pin(async move {
248             if let Some(c) = c {
249                 if let Some(pc) = pc2.upgrade() {
250                     let desc = pc.remote_description().await;
251                     if desc.is_none() {
252                         let mut cs = pending_candidates3.lock().await;
253                         cs.push(c);
254                     } else if let Err(err) = signal_candidate(&addr3, &c).await {
255                         panic!("{}", err);
256                     }
257                 }
258             }
259         })
260     }));
261 
262     println!("Listening on http://{offer_addr}");
263     {
264         let mut pcm = PEER_CONNECTION_MUTEX.lock().await;
265         *pcm = Some(Arc::clone(&peer_connection));
266     }
267 
268     tokio::spawn(async move {
269         let addr = SocketAddr::from_str(&offer_addr).unwrap();
270         let service =
271             make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(remote_handler)) });
272         let server = Server::bind(&addr).serve(service);
273         // Run this server for... forever!
274         if let Err(e) = server.await {
275             eprintln!("server error: {e}");
276         }
277     });
278 
279     // Create a datachannel with label 'data'
280     let data_channel = peer_connection.create_data_channel("data", None).await?;
281 
282     let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
283 
284     // Set the handler for Peer connection state
285     // This will notify you when the peer has connected/disconnected
286     peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
287         println!("Peer Connection State has changed: {s}");
288 
289         if s == RTCPeerConnectionState::Failed {
290             // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
291             // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
292             // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
293             println!("Peer Connection has gone to failed exiting");
294             let _ = done_tx.try_send(());
295         }
296 
297         Box::pin(async {})
298     }));
299 
300     // Register channel opening handling
301     let d1 = Arc::clone(&data_channel);
302     data_channel.on_open(Box::new(move || {
303         println!("Data channel '{}'-'{}' open. Random messages will now be sent to any connected DataChannels every 5 seconds", d1.label(), d1.id());
304 
305         let d2 = Arc::clone(&d1);
306         Box::pin(async move {
307             let mut result = Result::<usize>::Ok(0);
308             while result.is_ok() {
309                 let timeout = tokio::time::sleep(Duration::from_secs(5));
310                 tokio::pin!(timeout);
311 
312                 tokio::select! {
313                     _ = timeout.as_mut() =>{
314                         let message = math_rand_alpha(15);
315                         println!("Sending '{message}'");
316                         result = d2.send_text(message).await.map_err(Into::into);
317                     }
318                 };
319             }
320         })
321     }));
322 
323     // Register text message handling
324     let d_label = data_channel.label().to_owned();
325     data_channel.on_message(Box::new(move |msg: DataChannelMessage| {
326         let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
327         println!("Message from DataChannel '{d_label}': '{msg_str}'");
328         Box::pin(async {})
329     }));
330 
331     // Create an offer to send to the other process
332     let offer = peer_connection.create_offer(None).await?;
333 
334     // Send our offer to the HTTP server listening in the other process
335     let payload = match serde_json::to_string(&offer) {
336         Ok(p) => p,
337         Err(err) => panic!("{}", err),
338     };
339 
340     // Sets the LocalDescription, and starts our UDP listeners
341     // Note: this will start the gathering of ICE candidates
342     peer_connection.set_local_description(offer).await?;
343 
344     //println!("Post: {}", format!("http://{}/sdp", answer_addr));
345     let req = match Request::builder()
346         .method(Method::POST)
347         .uri(format!("http://{answer_addr}/sdp"))
348         .header("content-type", "application/json; charset=utf-8")
349         .body(Body::from(payload))
350     {
351         Ok(req) => req,
352         Err(err) => panic!("{}", err),
353     };
354 
355     let _resp = match Client::new().request(req).await {
356         Ok(resp) => resp,
357         Err(err) => {
358             println!("{err}");
359             return Err(err.into());
360         }
361     };
362     //println!("Response: {}", resp.status());
363 
364     println!("Press ctrl-c to stop");
365     tokio::select! {
366         _ = done_rx.recv() => {
367             println!("received done signal!");
368         }
369         _ = tokio::signal::ctrl_c() => {
370             println!();
371         }
372     };
373 
374     peer_connection.close().await?;
375 
376     Ok(())
377 }
378