1 use anyhow::Result; 2 use clap::{AppSettings, Arg, Command}; 3 use std::fs::File; 4 use std::io::BufReader; 5 use std::io::Write; 6 use std::path::Path; 7 use std::sync::Arc; 8 use tokio::sync::Notify; 9 use tokio::time::Duration; 10 use webrtc::api::interceptor_registry::register_default_interceptors; 11 use webrtc::api::media_engine::{MediaEngine, MIME_TYPE_VP8}; 12 use webrtc::api::APIBuilder; 13 use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState; 14 use webrtc::ice_transport::ice_server::RTCIceServer; 15 use webrtc::interceptor::registry::Registry; 16 use webrtc::media::io::ivf_reader::IVFReader; 17 use webrtc::media::Sample; 18 use webrtc::peer_connection::configuration::RTCConfiguration; 19 use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState; 20 use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; 21 use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability; 22 use webrtc::track::track_local::track_local_static_sample::TrackLocalStaticSample; 23 use webrtc::track::track_local::TrackLocal; 24 use webrtc::Error; 25 26 const CIPHER_KEY: u8 = 0xAA; 27 28 #[tokio::main] 29 async fn main() -> Result<()> { 30 let mut app = Command::new("insertable-streams") 31 .version("0.1.0") 32 .author("Rain Liu <[email protected]>") 33 .about("An example of insertable-streams.") 34 .setting(AppSettings::DeriveDisplayOrder) 35 .subcommand_negates_reqs(true) 36 .arg( 37 Arg::new("FULLHELP") 38 .help("Prints more detailed help information") 39 .long("fullhelp"), 40 ) 41 .arg( 42 Arg::new("debug") 43 .long("debug") 44 .short('d') 45 .help("Prints debug log information"), 46 ) 47 .arg( 48 Arg::new("video") 49 .required_unless_present("FULLHELP") 50 .takes_value(true) 51 .short('v') 52 .long("video") 53 .help("Video file to be streaming."), 54 ); 55 56 let matches = app.clone().get_matches(); 57 58 if matches.is_present("FULLHELP") { 59 app.print_long_help().unwrap(); 60 std::process::exit(0); 61 } 62 63 let debug = matches.is_present("debug"); 64 if debug { 65 env_logger::Builder::new() 66 .format(|buf, record| { 67 writeln!( 68 buf, 69 "{}:{} [{}] {} - {}", 70 record.file().unwrap_or("unknown"), 71 record.line().unwrap_or(0), 72 record.level(), 73 chrono::Local::now().format("%H:%M:%S.%6f"), 74 record.args() 75 ) 76 }) 77 .filter(None, log::LevelFilter::Trace) 78 .init(); 79 } 80 81 let video_file = matches.value_of("video").unwrap(); 82 if !Path::new(video_file).exists() { 83 return Err(Error::new(format!("video file: '{}' not exist", video_file)).into()); 84 } 85 86 // Everything below is the WebRTC-rs API! Thanks for using it ❤️. 87 88 // Create a MediaEngine object to configure the supported codec 89 let mut m = MediaEngine::default(); 90 91 m.register_default_codecs()?; 92 93 // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline. 94 // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection` 95 // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry 96 // for each PeerConnection. 97 let mut registry = Registry::new(); 98 99 // Use the default set of Interceptors 100 registry = register_default_interceptors(registry, &mut m)?; 101 102 // Create the API object with the MediaEngine 103 let api = APIBuilder::new() 104 .with_media_engine(m) 105 .with_interceptor_registry(registry) 106 .build(); 107 108 // Prepare the configuration 109 let config = RTCConfiguration { 110 ice_servers: vec![RTCIceServer { 111 urls: vec!["stun:stun.l.google.com:19302".to_owned()], 112 ..Default::default() 113 }], 114 ..Default::default() 115 }; 116 117 // Create a new RTCPeerConnection 118 let peer_connection = Arc::new(api.new_peer_connection(config).await?); 119 120 let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1); 121 let video_done_tx = done_tx.clone(); 122 123 // Create a video track 124 let video_track = Arc::new(TrackLocalStaticSample::new( 125 RTCRtpCodecCapability { 126 mime_type: MIME_TYPE_VP8.to_owned(), 127 ..Default::default() 128 }, 129 "video".to_owned(), 130 "webrtc-rs".to_owned(), 131 )); 132 133 // Add this newly created track to the PeerConnection 134 let rtp_sender = peer_connection 135 .add_track(Arc::clone(&video_track) as Arc<dyn TrackLocal + Send + Sync>) 136 .await?; 137 138 // Read incoming RTCP packets 139 // Before these packets are returned they are processed by interceptors. For things 140 // like NACK this needs to be called. 141 tokio::spawn(async move { 142 let mut rtcp_buf = vec![0u8; 1500]; 143 while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {} 144 Result::<()>::Ok(()) 145 }); 146 147 let notify_tx = Arc::new(Notify::new()); 148 let notify_video = notify_tx.clone(); 149 150 let video_file_name = video_file.to_owned(); 151 tokio::spawn(async move { 152 // Open a IVF file and start reading using our IVFReader 153 let file = File::open(video_file_name)?; 154 let reader = BufReader::new(file); 155 let (mut ivf, header) = IVFReader::new(reader)?; 156 157 // Wait for connection established 158 notify_video.notified().await; 159 160 println!("play video from disk file output.ivf"); 161 162 // Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as. 163 // This isn't required since the video is timestamped, but we will such much higher loss if we send all at once. 164 let sleep_time = Duration::from_millis( 165 ((1000 * header.timebase_numerator) / header.timebase_denominator) as u64, 166 ); 167 loop { 168 let mut frame = match ivf.parse_next_frame() { 169 Ok((frame, _)) => frame, 170 Err(err) => { 171 println!("All video frames parsed and sent: {}", err); 172 break; 173 } 174 }; 175 176 // Encrypt video using XOR Cipher 177 for b in &mut frame[..] { 178 *b ^= CIPHER_KEY; 179 } 180 181 tokio::time::sleep(sleep_time).await; 182 183 video_track 184 .write_sample(&Sample { 185 data: frame.freeze(), 186 duration: Duration::from_secs(1), 187 ..Default::default() 188 }) 189 .await?; 190 } 191 192 let _ = video_done_tx.try_send(()); 193 194 Result::<()>::Ok(()) 195 }); 196 197 // Set the handler for ICE connection state 198 // This will notify you when the peer has connected/disconnected 199 peer_connection.on_ice_connection_state_change(Box::new( 200 move |connection_state: RTCIceConnectionState| { 201 println!("Connection State has changed {}", connection_state); 202 if connection_state == RTCIceConnectionState::Connected { 203 notify_tx.notify_waiters(); 204 } 205 Box::pin(async {}) 206 }, 207 )); 208 209 // Set the handler for Peer connection state 210 // This will notify you when the peer has connected/disconnected 211 peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { 212 println!("Peer Connection State has changed: {}", s); 213 214 if s == RTCPeerConnectionState::Failed { 215 // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. 216 // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. 217 // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. 218 println!("Peer Connection has gone to failed exiting"); 219 let _ = done_tx.try_send(()); 220 } 221 222 Box::pin(async {}) 223 })); 224 225 // Wait for the offer to be pasted 226 let line = signal::must_read_stdin()?; 227 let desc_data = signal::decode(line.as_str())?; 228 let offer = serde_json::from_str::<RTCSessionDescription>(&desc_data)?; 229 230 // Set the remote SessionDescription 231 peer_connection.set_remote_description(offer).await?; 232 233 // Create an answer 234 let answer = peer_connection.create_answer(None).await?; 235 236 // Create channel that is blocked until ICE Gathering is complete 237 let mut gather_complete = peer_connection.gathering_complete_promise().await; 238 239 // Sets the LocalDescription, and starts our UDP listeners 240 peer_connection.set_local_description(answer).await?; 241 242 // Block until ICE Gathering is complete, disabling trickle ICE 243 // we do this because we only can exchange one signaling message 244 // in a production application you should exchange ICE Candidates via OnICECandidate 245 let _ = gather_complete.recv().await; 246 247 // Output the answer in base64 so we can paste it in browser 248 if let Some(local_desc) = peer_connection.local_description().await { 249 let json_str = serde_json::to_string(&local_desc)?; 250 let b64 = signal::encode(&json_str); 251 println!("{}", b64); 252 } else { 253 println!("generate local_description failed!"); 254 } 255 256 println!("Press ctrl-c to stop"); 257 tokio::select! { 258 _ = done_rx.recv() => { 259 println!("received done signal!"); 260 } 261 _ = tokio::signal::ctrl_c() => { 262 println!(); 263 } 264 }; 265 266 peer_connection.close().await?; 267 268 Ok(()) 269 } 270