1 use anyhow::Result;
2 use clap::{AppSettings, Arg, Command};
3 use hyper::service::{make_service_fn, service_fn};
4 use hyper::{Body, Client, Method, Request, Response, Server, StatusCode};
5 use std::io::Write;
6 use std::net::SocketAddr;
7 use std::str::FromStr;
8 use std::sync::Arc;
9 use tokio::sync::Mutex;
10 use tokio::time::Duration;
11 use webrtc::api::interceptor_registry::register_default_interceptors;
12 use webrtc::api::media_engine::MediaEngine;
13 use webrtc::api::APIBuilder;
14 use webrtc::data_channel::data_channel_message::DataChannelMessage;
15 use webrtc::ice_transport::ice_candidate::{RTCIceCandidate, RTCIceCandidateInit};
16 use webrtc::ice_transport::ice_server::RTCIceServer;
17 use webrtc::interceptor::registry::Registry;
18 use webrtc::peer_connection::configuration::RTCConfiguration;
19 use webrtc::peer_connection::math_rand_alpha;
20 use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
21 use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
22 use webrtc::peer_connection::RTCPeerConnection;
23
24 #[macro_use]
25 extern crate lazy_static;
26
27 lazy_static! {
28 static ref PEER_CONNECTION_MUTEX: Arc<Mutex<Option<Arc<RTCPeerConnection>>>> =
29 Arc::new(Mutex::new(None));
30 static ref PENDING_CANDIDATES: Arc<Mutex<Vec<RTCIceCandidate>>> = Arc::new(Mutex::new(vec![]));
31 static ref ADDRESS: Arc<Mutex<String>> = Arc::new(Mutex::new(String::new()));
32 }
33
signal_candidate(addr: &str, c: &RTCIceCandidate) -> Result<()>34 async fn signal_candidate(addr: &str, c: &RTCIceCandidate) -> Result<()> {
35 /*println!(
36 "signal_candidate Post candidate to {}",
37 format!("http://{}/candidate", addr)
38 );*/
39 let payload = c.to_json()?.candidate;
40 let req = match Request::builder()
41 .method(Method::POST)
42 .uri(format!("http://{addr}/candidate"))
43 .header("content-type", "application/json; charset=utf-8")
44 .body(Body::from(payload))
45 {
46 Ok(req) => req,
47 Err(err) => {
48 println!("{err}");
49 return Err(err.into());
50 }
51 };
52
53 let _resp = match Client::new().request(req).await {
54 Ok(resp) => resp,
55 Err(err) => {
56 println!("{err}");
57 return Err(err.into());
58 }
59 };
60 //println!("signal_candidate Response: {}", resp.status());
61
62 Ok(())
63 }
64
65 // HTTP Listener to get ICE Credentials/Candidate from remote Peer
remote_handler(req: Request<Body>) -> Result<Response<Body>, hyper::Error>66 async fn remote_handler(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
67 let pc = {
68 let pcm = PEER_CONNECTION_MUTEX.lock().await;
69 pcm.clone().unwrap()
70 };
71 let addr = {
72 let addr = ADDRESS.lock().await;
73 addr.clone()
74 };
75
76 match (req.method(), req.uri().path()) {
77 // A HTTP handler that allows the other WebRTC-rs or Pion instance to send us ICE candidates
78 // This allows us to add ICE candidates faster, we don't have to wait for STUN or TURN
79 // candidates which may be slower
80 (&Method::POST, "/candidate") => {
81 //println!("remote_handler receive from /candidate");
82 let candidate =
83 match std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?) {
84 Ok(s) => s.to_owned(),
85 Err(err) => panic!("{}", err),
86 };
87
88 if let Err(err) = pc
89 .add_ice_candidate(RTCIceCandidateInit {
90 candidate,
91 ..Default::default()
92 })
93 .await
94 {
95 panic!("{}", err);
96 }
97
98 let mut response = Response::new(Body::empty());
99 *response.status_mut() = StatusCode::OK;
100 Ok(response)
101 }
102
103 // A HTTP handler that processes a SessionDescription given to us from the other WebRTC-rs or Pion process
104 (&Method::POST, "/sdp") => {
105 //println!("remote_handler receive from /sdp");
106 let sdp_str = match std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?)
107 {
108 Ok(s) => s.to_owned(),
109 Err(err) => panic!("{}", err),
110 };
111 let sdp = match serde_json::from_str::<RTCSessionDescription>(&sdp_str) {
112 Ok(s) => s,
113 Err(err) => panic!("{}", err),
114 };
115
116 if let Err(err) = pc.set_remote_description(sdp).await {
117 panic!("{}", err);
118 }
119
120 {
121 let cs = PENDING_CANDIDATES.lock().await;
122 for c in &*cs {
123 if let Err(err) = signal_candidate(&addr, c).await {
124 panic!("{}", err);
125 }
126 }
127 }
128
129 let mut response = Response::new(Body::empty());
130 *response.status_mut() = StatusCode::OK;
131 Ok(response)
132 }
133 // Return the 404 Not Found for other routes.
134 _ => {
135 let mut not_found = Response::default();
136 *not_found.status_mut() = StatusCode::NOT_FOUND;
137 Ok(not_found)
138 }
139 }
140 }
141
142 #[tokio::main]
main() -> Result<()>143 async fn main() -> Result<()> {
144 let mut app = Command::new("Offer")
145 .version("0.1.0")
146 .author("Rain Liu <[email protected]>")
147 .about("An example of WebRTC-rs Offer.")
148 .setting(AppSettings::DeriveDisplayOrder)
149 .subcommand_negates_reqs(true)
150 .arg(
151 Arg::new("FULLHELP")
152 .help("Prints more detailed help information")
153 .long("fullhelp"),
154 )
155 .arg(
156 Arg::new("debug")
157 .long("debug")
158 .short('d')
159 .help("Prints debug log information"),
160 )
161 .arg(
162 Arg::new("offer-address")
163 .takes_value(true)
164 .default_value("0.0.0.0:50000")
165 .long("offer-address")
166 .help("Address that the Offer HTTP server is hosted on."),
167 )
168 .arg(
169 Arg::new("answer-address")
170 .takes_value(true)
171 .default_value("localhost:60000")
172 .long("answer-address")
173 .help("Address that the Answer HTTP server is hosted on."),
174 );
175
176 let matches = app.clone().get_matches();
177
178 if matches.is_present("FULLHELP") {
179 app.print_long_help().unwrap();
180 std::process::exit(0);
181 }
182
183 let debug = matches.is_present("debug");
184 if debug {
185 env_logger::Builder::new()
186 .format(|buf, record| {
187 writeln!(
188 buf,
189 "{}:{} [{}] {} - {}",
190 record.file().unwrap_or("unknown"),
191 record.line().unwrap_or(0),
192 record.level(),
193 chrono::Local::now().format("%H:%M:%S.%6f"),
194 record.args()
195 )
196 })
197 .filter(None, log::LevelFilter::Trace)
198 .init();
199 }
200
201 let offer_addr = matches.value_of("offer-address").unwrap().to_owned();
202 let answer_addr = matches.value_of("answer-address").unwrap().to_owned();
203
204 {
205 let mut oa = ADDRESS.lock().await;
206 *oa = answer_addr.clone();
207 }
208
209 // Prepare the configuration
210 let config = RTCConfiguration {
211 ice_servers: vec![RTCIceServer {
212 urls: vec!["stun:stun.l.google.com:19302".to_owned()],
213 ..Default::default()
214 }],
215 ..Default::default()
216 };
217
218 // Create a MediaEngine object to configure the supported codec
219 let mut m = MediaEngine::default();
220 m.register_default_codecs()?;
221
222 let mut registry = Registry::new();
223
224 // Use the default set of Interceptors
225 registry = register_default_interceptors(registry, &mut m)?;
226
227 // Create the API object with the MediaEngine
228 let api = APIBuilder::new()
229 .with_media_engine(m)
230 .with_interceptor_registry(registry)
231 .build();
232
233 // Create a new RTCPeerConnection
234 let peer_connection = Arc::new(api.new_peer_connection(config).await?);
235
236 // When an ICE candidate is available send to the other Pion instance
237 // the other Pion instance will add this candidate by calling AddICECandidate
238 let pc = Arc::downgrade(&peer_connection);
239 let pending_candidates2 = Arc::clone(&PENDING_CANDIDATES);
240 let addr2 = answer_addr.clone();
241 peer_connection.on_ice_candidate(Box::new(move |c: Option<RTCIceCandidate>| {
242 //println!("on_ice_candidate {:?}", c);
243
244 let pc2 = pc.clone();
245 let pending_candidates3 = Arc::clone(&pending_candidates2);
246 let addr3 = addr2.clone();
247 Box::pin(async move {
248 if let Some(c) = c {
249 if let Some(pc) = pc2.upgrade() {
250 let desc = pc.remote_description().await;
251 if desc.is_none() {
252 let mut cs = pending_candidates3.lock().await;
253 cs.push(c);
254 } else if let Err(err) = signal_candidate(&addr3, &c).await {
255 panic!("{}", err);
256 }
257 }
258 }
259 })
260 }));
261
262 println!("Listening on http://{offer_addr}");
263 {
264 let mut pcm = PEER_CONNECTION_MUTEX.lock().await;
265 *pcm = Some(Arc::clone(&peer_connection));
266 }
267
268 tokio::spawn(async move {
269 let addr = SocketAddr::from_str(&offer_addr).unwrap();
270 let service =
271 make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(remote_handler)) });
272 let server = Server::bind(&addr).serve(service);
273 // Run this server for... forever!
274 if let Err(e) = server.await {
275 eprintln!("server error: {e}");
276 }
277 });
278
279 // Create a datachannel with label 'data'
280 let data_channel = peer_connection.create_data_channel("data", None).await?;
281
282 let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
283
284 // Set the handler for Peer connection state
285 // This will notify you when the peer has connected/disconnected
286 peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
287 println!("Peer Connection State has changed: {s}");
288
289 if s == RTCPeerConnectionState::Failed {
290 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
291 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
292 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
293 println!("Peer Connection has gone to failed exiting");
294 let _ = done_tx.try_send(());
295 }
296
297 Box::pin(async {})
298 }));
299
300 // Register channel opening handling
301 let d1 = Arc::clone(&data_channel);
302 data_channel.on_open(Box::new(move || {
303 println!("Data channel '{}'-'{}' open. Random messages will now be sent to any connected DataChannels every 5 seconds", d1.label(), d1.id());
304
305 let d2 = Arc::clone(&d1);
306 Box::pin(async move {
307 let mut result = Result::<usize>::Ok(0);
308 while result.is_ok() {
309 let timeout = tokio::time::sleep(Duration::from_secs(5));
310 tokio::pin!(timeout);
311
312 tokio::select! {
313 _ = timeout.as_mut() =>{
314 let message = math_rand_alpha(15);
315 println!("Sending '{message}'");
316 result = d2.send_text(message).await.map_err(Into::into);
317 }
318 };
319 }
320 })
321 }));
322
323 // Register text message handling
324 let d_label = data_channel.label().to_owned();
325 data_channel.on_message(Box::new(move |msg: DataChannelMessage| {
326 let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
327 println!("Message from DataChannel '{d_label}': '{msg_str}'");
328 Box::pin(async {})
329 }));
330
331 // Create an offer to send to the other process
332 let offer = peer_connection.create_offer(None).await?;
333
334 // Send our offer to the HTTP server listening in the other process
335 let payload = match serde_json::to_string(&offer) {
336 Ok(p) => p,
337 Err(err) => panic!("{}", err),
338 };
339
340 // Sets the LocalDescription, and starts our UDP listeners
341 // Note: this will start the gathering of ICE candidates
342 peer_connection.set_local_description(offer).await?;
343
344 //println!("Post: {}", format!("http://{}/sdp", answer_addr));
345 let req = match Request::builder()
346 .method(Method::POST)
347 .uri(format!("http://{answer_addr}/sdp"))
348 .header("content-type", "application/json; charset=utf-8")
349 .body(Body::from(payload))
350 {
351 Ok(req) => req,
352 Err(err) => panic!("{}", err),
353 };
354
355 let _resp = match Client::new().request(req).await {
356 Ok(resp) => resp,
357 Err(err) => {
358 println!("{err}");
359 return Err(err.into());
360 }
361 };
362 //println!("Response: {}", resp.status());
363
364 println!("Press ctrl-c to stop");
365 tokio::select! {
366 _ = done_rx.recv() => {
367 println!("received done signal!");
368 }
369 _ = tokio::signal::ctrl_c() => {
370 println!();
371 }
372 };
373
374 peer_connection.close().await?;
375
376 Ok(())
377 }
378