1 use anyhow::Result;
2 use clap::{AppSettings, Arg, Command};
3 use std::fs::File;
4 use std::io::BufReader;
5 use std::io::Write;
6 use std::path::Path;
7 use std::sync::Arc;
8 use tokio::sync::Notify;
9 use tokio::time::Duration;
10 use webrtc::api::interceptor_registry::register_default_interceptors;
11 use webrtc::api::media_engine::{MediaEngine, MIME_TYPE_VP8};
12 use webrtc::api::APIBuilder;
13 use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState;
14 use webrtc::ice_transport::ice_server::RTCIceServer;
15 use webrtc::interceptor::registry::Registry;
16 use webrtc::media::io::ivf_reader::IVFReader;
17 use webrtc::media::Sample;
18 use webrtc::peer_connection::configuration::RTCConfiguration;
19 use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
20 use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
21 use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
22 use webrtc::track::track_local::track_local_static_sample::TrackLocalStaticSample;
23 use webrtc::track::track_local::TrackLocal;
24 use webrtc::Error;
25 
/// Single-byte key that `main` XORs into every byte of each video frame
/// before the frame is written to the outgoing track.
const CIPHER_KEY: u8 = 0xAA;
27 
28 #[tokio::main]
29 async fn main() -> Result<()> {
30     let mut app = Command::new("insertable-streams")
31         .version("0.1.0")
32         .author("Rain Liu <[email protected]>")
33         .about("An example of insertable-streams.")
34         .setting(AppSettings::DeriveDisplayOrder)
35         .subcommand_negates_reqs(true)
36         .arg(
37             Arg::new("FULLHELP")
38                 .help("Prints more detailed help information")
39                 .long("fullhelp"),
40         )
41         .arg(
42             Arg::new("debug")
43                 .long("debug")
44                 .short('d')
45                 .help("Prints debug log information"),
46         )
47         .arg(
48             Arg::new("video")
49                 .required_unless_present("FULLHELP")
50                 .takes_value(true)
51                 .short('v')
52                 .long("video")
53                 .help("Video file to be streaming."),
54         );
55 
56     let matches = app.clone().get_matches();
57 
58     if matches.is_present("FULLHELP") {
59         app.print_long_help().unwrap();
60         std::process::exit(0);
61     }
62 
63     let debug = matches.is_present("debug");
64     if debug {
65         env_logger::Builder::new()
66             .format(|buf, record| {
67                 writeln!(
68                     buf,
69                     "{}:{} [{}] {} - {}",
70                     record.file().unwrap_or("unknown"),
71                     record.line().unwrap_or(0),
72                     record.level(),
73                     chrono::Local::now().format("%H:%M:%S.%6f"),
74                     record.args()
75                 )
76             })
77             .filter(None, log::LevelFilter::Trace)
78             .init();
79     }
80 
81     let video_file = matches.value_of("video").unwrap();
82     if !Path::new(video_file).exists() {
83         return Err(Error::new(format!("video file: '{}' not exist", video_file)).into());
84     }
85 
86     // Everything below is the WebRTC-rs API! Thanks for using it ❤️.
87 
88     // Create a MediaEngine object to configure the supported codec
89     let mut m = MediaEngine::default();
90 
91     m.register_default_codecs()?;
92 
93     // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
94     // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
95     // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
96     // for each PeerConnection.
97     let mut registry = Registry::new();
98 
99     // Use the default set of Interceptors
100     registry = register_default_interceptors(registry, &mut m)?;
101 
102     // Create the API object with the MediaEngine
103     let api = APIBuilder::new()
104         .with_media_engine(m)
105         .with_interceptor_registry(registry)
106         .build();
107 
108     // Prepare the configuration
109     let config = RTCConfiguration {
110         ice_servers: vec![RTCIceServer {
111             urls: vec!["stun:stun.l.google.com:19302".to_owned()],
112             ..Default::default()
113         }],
114         ..Default::default()
115     };
116 
117     // Create a new RTCPeerConnection
118     let peer_connection = Arc::new(api.new_peer_connection(config).await?);
119 
120     let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
121     let video_done_tx = done_tx.clone();
122 
123     // Create a video track
124     let video_track = Arc::new(TrackLocalStaticSample::new(
125         RTCRtpCodecCapability {
126             mime_type: MIME_TYPE_VP8.to_owned(),
127             ..Default::default()
128         },
129         "video".to_owned(),
130         "webrtc-rs".to_owned(),
131     ));
132 
133     // Add this newly created track to the PeerConnection
134     let rtp_sender = peer_connection
135         .add_track(Arc::clone(&video_track) as Arc<dyn TrackLocal + Send + Sync>)
136         .await?;
137 
138     // Read incoming RTCP packets
139     // Before these packets are returned they are processed by interceptors. For things
140     // like NACK this needs to be called.
141     tokio::spawn(async move {
142         let mut rtcp_buf = vec![0u8; 1500];
143         while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
144         Result::<()>::Ok(())
145     });
146 
147     let notify_tx = Arc::new(Notify::new());
148     let notify_video = notify_tx.clone();
149 
150     let video_file_name = video_file.to_owned();
151     tokio::spawn(async move {
152         // Open a IVF file and start reading using our IVFReader
153         let file = File::open(video_file_name)?;
154         let reader = BufReader::new(file);
155         let (mut ivf, header) = IVFReader::new(reader)?;
156 
157         // Wait for connection established
158         notify_video.notified().await;
159 
160         println!("play video from disk file output.ivf");
161 
162         // Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as.
163         // This isn't required since the video is timestamped, but we will such much higher loss if we send all at once.
164         let sleep_time = Duration::from_millis(
165             ((1000 * header.timebase_numerator) / header.timebase_denominator) as u64,
166         );
167         loop {
168             let mut frame = match ivf.parse_next_frame() {
169                 Ok((frame, _)) => frame,
170                 Err(err) => {
171                     println!("All video frames parsed and sent: {}", err);
172                     break;
173                 }
174             };
175 
176             // Encrypt video using XOR Cipher
177             for b in &mut frame[..] {
178                 *b ^= CIPHER_KEY;
179             }
180 
181             tokio::time::sleep(sleep_time).await;
182 
183             video_track
184                 .write_sample(&Sample {
185                     data: frame.freeze(),
186                     duration: Duration::from_secs(1),
187                     ..Default::default()
188                 })
189                 .await?;
190         }
191 
192         let _ = video_done_tx.try_send(());
193 
194         Result::<()>::Ok(())
195     });
196 
197     // Set the handler for ICE connection state
198     // This will notify you when the peer has connected/disconnected
199     peer_connection.on_ice_connection_state_change(Box::new(
200         move |connection_state: RTCIceConnectionState| {
201             println!("Connection State has changed {}", connection_state);
202             if connection_state == RTCIceConnectionState::Connected {
203                 notify_tx.notify_waiters();
204             }
205             Box::pin(async {})
206         },
207     ));
208 
209     // Set the handler for Peer connection state
210     // This will notify you when the peer has connected/disconnected
211     peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
212         println!("Peer Connection State has changed: {}", s);
213 
214         if s == RTCPeerConnectionState::Failed {
215             // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
216             // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
217             // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
218             println!("Peer Connection has gone to failed exiting");
219             let _ = done_tx.try_send(());
220         }
221 
222         Box::pin(async {})
223     }));
224 
225     // Wait for the offer to be pasted
226     let line = signal::must_read_stdin()?;
227     let desc_data = signal::decode(line.as_str())?;
228     let offer = serde_json::from_str::<RTCSessionDescription>(&desc_data)?;
229 
230     // Set the remote SessionDescription
231     peer_connection.set_remote_description(offer).await?;
232 
233     // Create an answer
234     let answer = peer_connection.create_answer(None).await?;
235 
236     // Create channel that is blocked until ICE Gathering is complete
237     let mut gather_complete = peer_connection.gathering_complete_promise().await;
238 
239     // Sets the LocalDescription, and starts our UDP listeners
240     peer_connection.set_local_description(answer).await?;
241 
242     // Block until ICE Gathering is complete, disabling trickle ICE
243     // we do this because we only can exchange one signaling message
244     // in a production application you should exchange ICE Candidates via OnICECandidate
245     let _ = gather_complete.recv().await;
246 
247     // Output the answer in base64 so we can paste it in browser
248     if let Some(local_desc) = peer_connection.local_description().await {
249         let json_str = serde_json::to_string(&local_desc)?;
250         let b64 = signal::encode(&json_str);
251         println!("{}", b64);
252     } else {
253         println!("generate local_description failed!");
254     }
255 
256     println!("Press ctrl-c to stop");
257     tokio::select! {
258         _ = done_rx.recv() => {
259             println!("received done signal!");
260         }
261         _ = tokio::signal::ctrl_c() => {
262             println!();
263         }
264     };
265 
266     peer_connection.close().await?;
267 
268     Ok(())
269 }
270