1 use anyhow::Result;
2 use clap::{AppSettings, Arg, Command};
3 use std::io::Write;
4 use std::sync::Arc;
5 use tokio::time::Duration;
6 use webrtc::api::interceptor_registry::register_default_interceptors;
7 use webrtc::api::media_engine::MediaEngine;
8 use webrtc::api::APIBuilder;
9 use webrtc::ice_transport::ice_server::RTCIceServer;
10 use webrtc::interceptor::registry::Registry;
11 use webrtc::peer_connection::configuration::RTCConfiguration;
12 use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
13 use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
14 use webrtc::rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication;
15 use webrtc::rtp_transceiver::rtp_codec::RTPCodecType;
16 use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
17 use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
18 use webrtc::Error;
19 
20 #[tokio::main]
main() -> Result<()>21 async fn main() -> Result<()> {
22     let mut app = Command::new("broadcast")
23         .version("0.1.0")
24         .author("Rain Liu <[email protected]>")
25         .about("An example of broadcast.")
26         .setting(AppSettings::DeriveDisplayOrder)
27         .subcommand_negates_reqs(true)
28         .arg(
29             Arg::new("FULLHELP")
30                 .help("Prints more detailed help information")
31                 .long("fullhelp"),
32         )
33         .arg(
34             Arg::new("debug")
35                 .long("debug")
36                 .short('d')
37                 .help("Prints debug log information"),
38         )
39         .arg(
40             Arg::new("port")
41                 .takes_value(true)
42                 .default_value("8080")
43                 .long("port")
44                 .help("http server port."),
45         );
46 
47     let matches = app.clone().get_matches();
48 
49     if matches.is_present("FULLHELP") {
50         app.print_long_help().unwrap();
51         std::process::exit(0);
52     }
53 
54     let debug = matches.is_present("debug");
55     if debug {
56         env_logger::Builder::new()
57             .format(|buf, record| {
58                 writeln!(
59                     buf,
60                     "{}:{} [{}] {} - {}",
61                     record.file().unwrap_or("unknown"),
62                     record.line().unwrap_or(0),
63                     record.level(),
64                     chrono::Local::now().format("%H:%M:%S.%6f"),
65                     record.args()
66                 )
67             })
68             .filter(None, log::LevelFilter::Trace)
69             .init();
70     }
71 
72     let port = matches.value_of("port").unwrap().parse::<u16>()?;
73     let mut sdp_chan_rx = signal::http_sdp_server(port).await;
74 
75     // Wait for the offer
76     println!("wait for the offer from http_sdp_server\n");
77     let line = sdp_chan_rx.recv().await.unwrap();
78     let desc_data = signal::decode(line.as_str())?;
79     let offer = serde_json::from_str::<RTCSessionDescription>(&desc_data)?;
80     //println!("Receive offer from http_sdp_server:\n{:?}", offer);
81 
82     // Everything below is the WebRTC-rs API! Thanks for using it ❤️.
83 
84     // Create a MediaEngine object to configure the supported codec
85     let mut m = MediaEngine::default();
86 
87     m.register_default_codecs()?;
88 
89     // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
90     // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
91     // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
92     // for each PeerConnection.
93     let mut registry = Registry::new();
94 
95     // Use the default set of Interceptors
96     registry = register_default_interceptors(registry, &mut m)?;
97 
98     // Create the API object with the MediaEngine
99     let api = APIBuilder::new()
100         .with_media_engine(m)
101         .with_interceptor_registry(registry)
102         .build();
103 
104     // Prepare the configuration
105     let config = RTCConfiguration {
106         ice_servers: vec![RTCIceServer {
107             urls: vec!["stun:stun.l.google.com:19302".to_owned()],
108             ..Default::default()
109         }],
110         ..Default::default()
111     };
112 
113     // Create a new RTCPeerConnection
114     let peer_connection = Arc::new(api.new_peer_connection(config).await?);
115 
116     // Allow us to receive 1 video track
117     peer_connection
118         .add_transceiver_from_kind(RTPCodecType::Video, None)
119         .await?;
120 
121     let (local_track_chan_tx, mut local_track_chan_rx) =
122         tokio::sync::mpsc::channel::<Arc<TrackLocalStaticRTP>>(1);
123 
124     let local_track_chan_tx = Arc::new(local_track_chan_tx);
125     // Set a handler for when a new remote track starts, this handler copies inbound RTP packets,
126     // replaces the SSRC and sends them back
127     let pc = Arc::downgrade(&peer_connection);
128     peer_connection.on_track(Box::new(move |track, _, _| {
129         // Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
130         // This is a temporary fix until we implement incoming RTCP events, then we would push a PLI only when a viewer requests it
131         let media_ssrc = track.ssrc();
132         let pc2 = pc.clone();
133         tokio::spawn(async move {
134             let mut result = Result::<usize>::Ok(0);
135             while result.is_ok() {
136                 let timeout = tokio::time::sleep(Duration::from_secs(3));
137                 tokio::pin!(timeout);
138 
139                 tokio::select! {
140                     _ = timeout.as_mut() =>{
141                         if let Some(pc) = pc2.upgrade(){
142                             result = pc.write_rtcp(&[Box::new(PictureLossIndication{
143                                 sender_ssrc: 0,
144                                 media_ssrc,
145                             })]).await.map_err(Into::into);
146                         }else{
147                             break;
148                         }
149                     }
150                 };
151             }
152         });
153 
154         let local_track_chan_tx2 = Arc::clone(&local_track_chan_tx);
155         tokio::spawn(async move {
156             // Create Track that we send video back to browser on
157             let local_track = Arc::new(TrackLocalStaticRTP::new(
158                 track.codec().capability,
159                 "video".to_owned(),
160                 "webrtc-rs".to_owned(),
161             ));
162             let _ = local_track_chan_tx2.send(Arc::clone(&local_track)).await;
163 
164             // Read RTP packets being sent to webrtc-rs
165             while let Ok((rtp, _)) = track.read_rtp().await {
166                 if let Err(err) = local_track.write_rtp(&rtp).await {
167                     if Error::ErrClosedPipe != err {
168                         print!("output track write_rtp got error: {err} and break");
169                         break;
170                     } else {
171                         print!("output track write_rtp got error: {err}");
172                     }
173                 }
174             }
175         });
176 
177         Box::pin(async {})
178     }));
179 
180     // Set the handler for Peer connection state
181     // This will notify you when the peer has connected/disconnected
182     peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
183         println!("Peer Connection State has changed: {s}");
184         Box::pin(async {})
185     }));
186 
187     // Set the remote SessionDescription
188     peer_connection.set_remote_description(offer).await?;
189 
190     // Create an answer
191     let answer = peer_connection.create_answer(None).await?;
192 
193     // Create channel that is blocked until ICE Gathering is complete
194     let mut gather_complete = peer_connection.gathering_complete_promise().await;
195 
196     // Sets the LocalDescription, and starts our UDP listeners
197     peer_connection.set_local_description(answer).await?;
198 
199     // Block until ICE Gathering is complete, disabling trickle ICE
200     // we do this because we only can exchange one signaling message
201     // in a production application you should exchange ICE Candidates via OnICECandidate
202     let _ = gather_complete.recv().await;
203 
204     // Output the answer in base64 so we can paste it in browser
205     if let Some(local_desc) = peer_connection.local_description().await {
206         let json_str = serde_json::to_string(&local_desc)?;
207         let b64 = signal::encode(&json_str);
208         println!("{b64}");
209     } else {
210         println!("generate local_description failed!");
211     }
212 
213     if let Some(local_track) = local_track_chan_rx.recv().await {
214         loop {
215             println!("\nCurl an base64 SDP to start sendonly peer connection");
216 
217             let line = sdp_chan_rx.recv().await.unwrap();
218             let desc_data = signal::decode(line.as_str())?;
219             let recv_only_offer = serde_json::from_str::<RTCSessionDescription>(&desc_data)?;
220 
221             // Create a MediaEngine object to configure the supported codec
222             let mut m = MediaEngine::default();
223 
224             m.register_default_codecs()?;
225 
226             // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
227             // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
228             // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
229             // for each PeerConnection.
230             let mut registry = Registry::new();
231 
232             // Use the default set of Interceptors
233             registry = register_default_interceptors(registry, &mut m)?;
234 
235             // Create the API object with the MediaEngine
236             let api = APIBuilder::new()
237                 .with_media_engine(m)
238                 .with_interceptor_registry(registry)
239                 .build();
240 
241             // Prepare the configuration
242             let config = RTCConfiguration {
243                 ice_servers: vec![RTCIceServer {
244                     urls: vec!["stun:stun.l.google.com:19302".to_owned()],
245                     ..Default::default()
246                 }],
247                 ..Default::default()
248             };
249 
250             // Create a new RTCPeerConnection
251             let peer_connection = Arc::new(api.new_peer_connection(config).await?);
252 
253             let rtp_sender = peer_connection
254                 .add_track(Arc::clone(&local_track) as Arc<dyn TrackLocal + Send + Sync>)
255                 .await?;
256 
257             // Read incoming RTCP packets
258             // Before these packets are returned they are processed by interceptors. For things
259             // like NACK this needs to be called.
260             tokio::spawn(async move {
261                 let mut rtcp_buf = vec![0u8; 1500];
262                 while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
263                 Result::<()>::Ok(())
264             });
265 
266             // Set the handler for Peer connection state
267             // This will notify you when the peer has connected/disconnected
268             peer_connection.on_peer_connection_state_change(Box::new(
269                 move |s: RTCPeerConnectionState| {
270                     println!("Peer Connection State has changed: {s}");
271                     Box::pin(async {})
272                 },
273             ));
274 
275             // Set the remote SessionDescription
276             peer_connection
277                 .set_remote_description(recv_only_offer)
278                 .await?;
279 
280             // Create an answer
281             let answer = peer_connection.create_answer(None).await?;
282 
283             // Create channel that is blocked until ICE Gathering is complete
284             let mut gather_complete = peer_connection.gathering_complete_promise().await;
285 
286             // Sets the LocalDescription, and starts our UDP listeners
287             peer_connection.set_local_description(answer).await?;
288 
289             // Block until ICE Gathering is complete, disabling trickle ICE
290             // we do this because we only can exchange one signaling message
291             // in a production application you should exchange ICE Candidates via OnICECandidate
292             let _ = gather_complete.recv().await;
293 
294             if let Some(local_desc) = peer_connection.local_description().await {
295                 let json_str = serde_json::to_string(&local_desc)?;
296                 let b64 = signal::encode(&json_str);
297                 println!("{b64}");
298             } else {
299                 println!("generate local_description failed!");
300             }
301         }
302     }
303 
304     Ok(())
305 }
306