1 use anyhow::Result;
2 use clap::{AppSettings, Arg, Command};
3 use std::io::Write;
4 use std::sync::Arc;
5 use tokio::time::Duration;
6 use webrtc::api::interceptor_registry::register_default_interceptors;
7 use webrtc::api::media_engine::MediaEngine;
8 use webrtc::api::APIBuilder;
9 use webrtc::ice_transport::ice_server::RTCIceServer;
10 use webrtc::interceptor::registry::Registry;
11 use webrtc::peer_connection::configuration::RTCConfiguration;
12 use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
13 use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
14 use webrtc::rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication;
15 use webrtc::rtp_transceiver::rtp_codec::RTPCodecType;
16 use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
17 use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
18 use webrtc::Error;
19
20 #[tokio::main]
main() -> Result<()>21 async fn main() -> Result<()> {
22 let mut app = Command::new("broadcast")
23 .version("0.1.0")
24 .author("Rain Liu <[email protected]>")
25 .about("An example of broadcast.")
26 .setting(AppSettings::DeriveDisplayOrder)
27 .subcommand_negates_reqs(true)
28 .arg(
29 Arg::new("FULLHELP")
30 .help("Prints more detailed help information")
31 .long("fullhelp"),
32 )
33 .arg(
34 Arg::new("debug")
35 .long("debug")
36 .short('d')
37 .help("Prints debug log information"),
38 )
39 .arg(
40 Arg::new("port")
41 .takes_value(true)
42 .default_value("8080")
43 .long("port")
44 .help("http server port."),
45 );
46
47 let matches = app.clone().get_matches();
48
49 if matches.is_present("FULLHELP") {
50 app.print_long_help().unwrap();
51 std::process::exit(0);
52 }
53
54 let debug = matches.is_present("debug");
55 if debug {
56 env_logger::Builder::new()
57 .format(|buf, record| {
58 writeln!(
59 buf,
60 "{}:{} [{}] {} - {}",
61 record.file().unwrap_or("unknown"),
62 record.line().unwrap_or(0),
63 record.level(),
64 chrono::Local::now().format("%H:%M:%S.%6f"),
65 record.args()
66 )
67 })
68 .filter(None, log::LevelFilter::Trace)
69 .init();
70 }
71
72 let port = matches.value_of("port").unwrap().parse::<u16>()?;
73 let mut sdp_chan_rx = signal::http_sdp_server(port).await;
74
75 // Wait for the offer
76 println!("wait for the offer from http_sdp_server\n");
77 let line = sdp_chan_rx.recv().await.unwrap();
78 let desc_data = signal::decode(line.as_str())?;
79 let offer = serde_json::from_str::<RTCSessionDescription>(&desc_data)?;
80 //println!("Receive offer from http_sdp_server:\n{:?}", offer);
81
82 // Everything below is the WebRTC-rs API! Thanks for using it ❤️.
83
84 // Create a MediaEngine object to configure the supported codec
85 let mut m = MediaEngine::default();
86
87 m.register_default_codecs()?;
88
89 // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
90 // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
91 // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
92 // for each PeerConnection.
93 let mut registry = Registry::new();
94
95 // Use the default set of Interceptors
96 registry = register_default_interceptors(registry, &mut m)?;
97
98 // Create the API object with the MediaEngine
99 let api = APIBuilder::new()
100 .with_media_engine(m)
101 .with_interceptor_registry(registry)
102 .build();
103
104 // Prepare the configuration
105 let config = RTCConfiguration {
106 ice_servers: vec![RTCIceServer {
107 urls: vec!["stun:stun.l.google.com:19302".to_owned()],
108 ..Default::default()
109 }],
110 ..Default::default()
111 };
112
113 // Create a new RTCPeerConnection
114 let peer_connection = Arc::new(api.new_peer_connection(config).await?);
115
116 // Allow us to receive 1 video track
117 peer_connection
118 .add_transceiver_from_kind(RTPCodecType::Video, None)
119 .await?;
120
121 let (local_track_chan_tx, mut local_track_chan_rx) =
122 tokio::sync::mpsc::channel::<Arc<TrackLocalStaticRTP>>(1);
123
124 let local_track_chan_tx = Arc::new(local_track_chan_tx);
125 // Set a handler for when a new remote track starts, this handler copies inbound RTP packets,
126 // replaces the SSRC and sends them back
127 let pc = Arc::downgrade(&peer_connection);
128 peer_connection.on_track(Box::new(move |track, _, _| {
129 // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
130 // This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
131 let media_ssrc = track.ssrc();
132 let pc2 = pc.clone();
133 tokio::spawn(async move {
134 let mut result = Result::<usize>::Ok(0);
135 while result.is_ok() {
136 let timeout = tokio::time::sleep(Duration::from_secs(3));
137 tokio::pin!(timeout);
138
139 tokio::select! {
140 _ = timeout.as_mut() =>{
141 if let Some(pc) = pc2.upgrade(){
142 result = pc.write_rtcp(&[Box::new(PictureLossIndication{
143 sender_ssrc: 0,
144 media_ssrc,
145 })]).await.map_err(Into::into);
146 }else{
147 break;
148 }
149 }
150 };
151 }
152 });
153
154 let local_track_chan_tx2 = Arc::clone(&local_track_chan_tx);
155 tokio::spawn(async move {
156 // Create Track that we send video back to browser on
157 let local_track = Arc::new(TrackLocalStaticRTP::new(
158 track.codec().capability,
159 "video".to_owned(),
160 "webrtc-rs".to_owned(),
161 ));
162 let _ = local_track_chan_tx2.send(Arc::clone(&local_track)).await;
163
164 // Read RTP packets being sent to webrtc-rs
165 while let Ok((rtp, _)) = track.read_rtp().await {
166 if let Err(err) = local_track.write_rtp(&rtp).await {
167 if Error::ErrClosedPipe != err {
168 print!("output track write_rtp got error: {err} and break");
169 break;
170 } else {
171 print!("output track write_rtp got error: {err}");
172 }
173 }
174 }
175 });
176
177 Box::pin(async {})
178 }));
179
180 // Set the handler for Peer connection state
181 // This will notify you when the peer has connected/disconnected
182 peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
183 println!("Peer Connection State has changed: {s}");
184 Box::pin(async {})
185 }));
186
187 // Set the remote SessionDescription
188 peer_connection.set_remote_description(offer).await?;
189
190 // Create an answer
191 let answer = peer_connection.create_answer(None).await?;
192
193 // Create channel that is blocked until ICE Gathering is complete
194 let mut gather_complete = peer_connection.gathering_complete_promise().await;
195
196 // Sets the LocalDescription, and starts our UDP listeners
197 peer_connection.set_local_description(answer).await?;
198
199 // Block until ICE Gathering is complete, disabling trickle ICE
200 // we do this because we only can exchange one signaling message
201 // in a production application you should exchange ICE Candidates via OnICECandidate
202 let _ = gather_complete.recv().await;
203
204 // Output the answer in base64 so we can paste it in browser
205 if let Some(local_desc) = peer_connection.local_description().await {
206 let json_str = serde_json::to_string(&local_desc)?;
207 let b64 = signal::encode(&json_str);
208 println!("{b64}");
209 } else {
210 println!("generate local_description failed!");
211 }
212
213 if let Some(local_track) = local_track_chan_rx.recv().await {
214 loop {
215 println!("\nCurl an base64 SDP to start sendonly peer connection");
216
217 let line = sdp_chan_rx.recv().await.unwrap();
218 let desc_data = signal::decode(line.as_str())?;
219 let recv_only_offer = serde_json::from_str::<RTCSessionDescription>(&desc_data)?;
220
221 // Create a MediaEngine object to configure the supported codec
222 let mut m = MediaEngine::default();
223
224 m.register_default_codecs()?;
225
226 // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
227 // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
228 // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
229 // for each PeerConnection.
230 let mut registry = Registry::new();
231
232 // Use the default set of Interceptors
233 registry = register_default_interceptors(registry, &mut m)?;
234
235 // Create the API object with the MediaEngine
236 let api = APIBuilder::new()
237 .with_media_engine(m)
238 .with_interceptor_registry(registry)
239 .build();
240
241 // Prepare the configuration
242 let config = RTCConfiguration {
243 ice_servers: vec![RTCIceServer {
244 urls: vec!["stun:stun.l.google.com:19302".to_owned()],
245 ..Default::default()
246 }],
247 ..Default::default()
248 };
249
250 // Create a new RTCPeerConnection
251 let peer_connection = Arc::new(api.new_peer_connection(config).await?);
252
253 let rtp_sender = peer_connection
254 .add_track(Arc::clone(&local_track) as Arc<dyn TrackLocal + Send + Sync>)
255 .await?;
256
257 // Read incoming RTCP packets
258 // Before these packets are returned they are processed by interceptors. For things
259 // like NACK this needs to be called.
260 tokio::spawn(async move {
261 let mut rtcp_buf = vec![0u8; 1500];
262 while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
263 Result::<()>::Ok(())
264 });
265
266 // Set the handler for Peer connection state
267 // This will notify you when the peer has connected/disconnected
268 peer_connection.on_peer_connection_state_change(Box::new(
269 move |s: RTCPeerConnectionState| {
270 println!("Peer Connection State has changed: {s}");
271 Box::pin(async {})
272 },
273 ));
274
275 // Set the remote SessionDescription
276 peer_connection
277 .set_remote_description(recv_only_offer)
278 .await?;
279
280 // Create an answer
281 let answer = peer_connection.create_answer(None).await?;
282
283 // Create channel that is blocked until ICE Gathering is complete
284 let mut gather_complete = peer_connection.gathering_complete_promise().await;
285
286 // Sets the LocalDescription, and starts our UDP listeners
287 peer_connection.set_local_description(answer).await?;
288
289 // Block until ICE Gathering is complete, disabling trickle ICE
290 // we do this because we only can exchange one signaling message
291 // in a production application you should exchange ICE Candidates via OnICECandidate
292 let _ = gather_complete.recv().await;
293
294 if let Some(local_desc) = peer_connection.local_description().await {
295 let json_str = serde_json::to_string(&local_desc)?;
296 let b64 = signal::encode(&json_str);
297 println!("{b64}");
298 } else {
299 println!("generate local_description failed!");
300 }
301 }
302 }
303
304 Ok(())
305 }
306