1 use anyhow::Result;
2 use clap::{AppSettings, Arg, Command};
3 use std::fs::File;
4 use std::io::BufReader;
5 use std::io::Write;
6 use std::path::Path;
7 use std::sync::Arc;
8 use tokio::sync::Notify;
9 use tokio::time::Duration;
10 use webrtc::api::interceptor_registry::register_default_interceptors;
11 use webrtc::api::media_engine::{MediaEngine, MIME_TYPE_VP8};
12 use webrtc::api::APIBuilder;
13 use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState;
14 use webrtc::ice_transport::ice_server::RTCIceServer;
15 use webrtc::interceptor::registry::Registry;
16 use webrtc::media::io::ivf_reader::IVFReader;
17 use webrtc::media::Sample;
18 use webrtc::peer_connection::configuration::RTCConfiguration;
19 use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
20 use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
21 use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
22 use webrtc::track::track_local::track_local_static_sample::TrackLocalStaticSample;
23 use webrtc::track::track_local::TrackLocal;
24 use webrtc::Error;
25
26 const CIPHER_KEY: u8 = 0xAA;
27
28 #[tokio::main]
main() -> Result<()>29 async fn main() -> Result<()> {
30 let mut app = Command::new("insertable-streams")
31 .version("0.1.0")
32 .author("Rain Liu <[email protected]>")
33 .about("An example of insertable-streams.")
34 .setting(AppSettings::DeriveDisplayOrder)
35 .subcommand_negates_reqs(true)
36 .arg(
37 Arg::new("FULLHELP")
38 .help("Prints more detailed help information")
39 .long("fullhelp"),
40 )
41 .arg(
42 Arg::new("debug")
43 .long("debug")
44 .short('d')
45 .help("Prints debug log information"),
46 )
47 .arg(
48 Arg::new("video")
49 .required_unless_present("FULLHELP")
50 .takes_value(true)
51 .short('v')
52 .long("video")
53 .help("Video file to be streaming."),
54 );
55
56 let matches = app.clone().get_matches();
57
58 if matches.is_present("FULLHELP") {
59 app.print_long_help().unwrap();
60 std::process::exit(0);
61 }
62
63 let debug = matches.is_present("debug");
64 if debug {
65 env_logger::Builder::new()
66 .format(|buf, record| {
67 writeln!(
68 buf,
69 "{}:{} [{}] {} - {}",
70 record.file().unwrap_or("unknown"),
71 record.line().unwrap_or(0),
72 record.level(),
73 chrono::Local::now().format("%H:%M:%S.%6f"),
74 record.args()
75 )
76 })
77 .filter(None, log::LevelFilter::Trace)
78 .init();
79 }
80
81 let video_file = matches.value_of("video").unwrap();
82 if !Path::new(video_file).exists() {
83 return Err(Error::new(format!("video file: '{video_file}' not exist")).into());
84 }
85
86 // Everything below is the WebRTC-rs API! Thanks for using it ❤️.
87
88 // Create a MediaEngine object to configure the supported codec
89 let mut m = MediaEngine::default();
90
91 m.register_default_codecs()?;
92
93 // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
94 // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
95 // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
96 // for each PeerConnection.
97 let mut registry = Registry::new();
98
99 // Use the default set of Interceptors
100 registry = register_default_interceptors(registry, &mut m)?;
101
102 // Create the API object with the MediaEngine
103 let api = APIBuilder::new()
104 .with_media_engine(m)
105 .with_interceptor_registry(registry)
106 .build();
107
108 // Prepare the configuration
109 let config = RTCConfiguration {
110 ice_servers: vec![RTCIceServer {
111 urls: vec!["stun:stun.l.google.com:19302".to_owned()],
112 ..Default::default()
113 }],
114 ..Default::default()
115 };
116
117 // Create a new RTCPeerConnection
118 let peer_connection = Arc::new(api.new_peer_connection(config).await?);
119
120 let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
121 let video_done_tx = done_tx.clone();
122
123 // Create a video track
124 let video_track = Arc::new(TrackLocalStaticSample::new(
125 RTCRtpCodecCapability {
126 mime_type: MIME_TYPE_VP8.to_owned(),
127 ..Default::default()
128 },
129 "video".to_owned(),
130 "webrtc-rs".to_owned(),
131 ));
132
133 // Add this newly created track to the PeerConnection
134 let rtp_sender = peer_connection
135 .add_track(Arc::clone(&video_track) as Arc<dyn TrackLocal + Send + Sync>)
136 .await?;
137
138 // Read incoming RTCP packets
139 // Before these packets are returned they are processed by interceptors. For things
140 // like NACK this needs to be called.
141 tokio::spawn(async move {
142 let mut rtcp_buf = vec![0u8; 1500];
143 while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
144 Result::<()>::Ok(())
145 });
146
147 let notify_tx = Arc::new(Notify::new());
148 let notify_video = notify_tx.clone();
149
150 let video_file_name = video_file.to_owned();
151 tokio::spawn(async move {
152 // Open a IVF file and start reading using our IVFReader
153 let file = File::open(video_file_name)?;
154 let reader = BufReader::new(file);
155 let (mut ivf, header) = IVFReader::new(reader)?;
156
157 // Wait for connection established
158 notify_video.notified().await;
159
160 println!("play video from disk file output.ivf");
161
162 // Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as.
163 // This isn't required since the video is timestamped, but we will such much higher loss if we send all at once.
164 let sleep_time = Duration::from_millis(
165 ((1000 * header.timebase_numerator) / header.timebase_denominator) as u64,
166 );
167 loop {
168 let mut frame = match ivf.parse_next_frame() {
169 Ok((frame, _)) => frame,
170 Err(err) => {
171 println!("All video frames parsed and sent: {err}");
172 break;
173 }
174 };
175
176 // Encrypt video using XOR Cipher
177 for b in &mut frame[..] {
178 *b ^= CIPHER_KEY;
179 }
180
181 tokio::time::sleep(sleep_time).await;
182
183 video_track
184 .write_sample(&Sample {
185 data: frame.freeze(),
186 duration: Duration::from_secs(1),
187 ..Default::default()
188 })
189 .await?;
190 }
191
192 let _ = video_done_tx.try_send(());
193
194 Result::<()>::Ok(())
195 });
196
197 // Set the handler for ICE connection state
198 // This will notify you when the peer has connected/disconnected
199 peer_connection.on_ice_connection_state_change(Box::new(
200 move |connection_state: RTCIceConnectionState| {
201 println!("Connection State has changed {connection_state}");
202 if connection_state == RTCIceConnectionState::Connected {
203 notify_tx.notify_waiters();
204 }
205 Box::pin(async {})
206 },
207 ));
208
209 // Set the handler for Peer connection state
210 // This will notify you when the peer has connected/disconnected
211 peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
212 println!("Peer Connection State has changed: {s}");
213
214 if s == RTCPeerConnectionState::Failed {
215 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
216 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
217 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
218 println!("Peer Connection has gone to failed exiting");
219 let _ = done_tx.try_send(());
220 }
221
222 Box::pin(async {})
223 }));
224
225 // Wait for the offer to be pasted
226 let line = signal::must_read_stdin()?;
227 let desc_data = signal::decode(line.as_str())?;
228 let offer = serde_json::from_str::<RTCSessionDescription>(&desc_data)?;
229
230 // Set the remote SessionDescription
231 peer_connection.set_remote_description(offer).await?;
232
233 // Create an answer
234 let answer = peer_connection.create_answer(None).await?;
235
236 // Create channel that is blocked until ICE Gathering is complete
237 let mut gather_complete = peer_connection.gathering_complete_promise().await;
238
239 // Sets the LocalDescription, and starts our UDP listeners
240 peer_connection.set_local_description(answer).await?;
241
242 // Block until ICE Gathering is complete, disabling trickle ICE
243 // we do this because we only can exchange one signaling message
244 // in a production application you should exchange ICE Candidates via OnICECandidate
245 let _ = gather_complete.recv().await;
246
247 // Output the answer in base64 so we can paste it in browser
248 if let Some(local_desc) = peer_connection.local_description().await {
249 let json_str = serde_json::to_string(&local_desc)?;
250 let b64 = signal::encode(&json_str);
251 println!("{b64}");
252 } else {
253 println!("generate local_description failed!");
254 }
255
256 println!("Press ctrl-c to stop");
257 tokio::select! {
258 _ = done_rx.recv() => {
259 println!("received done signal!");
260 }
261 _ = tokio::signal::ctrl_c() => {
262 println!();
263 }
264 };
265
266 peer_connection.close().await?;
267
268 Ok(())
269 }
270