1 pub mod errors; 2 use streamhub::{ 3 define::{ 4 DataSender, InformationSender, NotifyInfo, PublishType, PublisherInfo, StreamHubEvent, 5 StreamHubEventSender, SubscribeType, SubscriberInfo, TStreamHandler, 6 }, 7 errors::ChannelError, 8 statistics::StreamStatistics, 9 stream::StreamIdentifier, 10 utils::{RandomDigitCount, Uuid}, 11 }; 12 use tokio::sync::Mutex; 13 use tokio::sync::{broadcast, oneshot}; 14 15 use bytesio::bytesio::TNetIO; 16 use bytesio::bytesio::TcpIO; 17 use std::io::Read; 18 use std::{collections::HashMap, fs::File, sync::Arc}; 19 use tokio::net::TcpStream; 20 21 use super::http::define::http_method_name; 22 use super::http::parse_content_length; 23 use super::http::{HttpRequest, HttpResponse, Marshal, Unmarshal}; 24 25 use super::whep::handle_whep; 26 use super::whip::handle_whip; 27 use async_trait::async_trait; 28 29 use bytes::BytesMut; 30 use bytesio::bytes_reader::BytesReader; 31 use bytesio::bytes_writer::AsyncBytesWriter; 32 use errors::SessionError; 33 use errors::SessionErrorValue; 34 use http::StatusCode; 35 use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState; 36 use webrtc::peer_connection::{sdp::session_description::RTCSessionDescription, RTCPeerConnection}; 37 38 pub struct WebRTCServerSession { 39 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>, 40 reader: BytesReader, 41 writer: AsyncBytesWriter, 42 43 event_sender: StreamHubEventSender, 44 stream_handler: Arc<WebRTCStreamHandler>, 45 46 pub session_id: Option<Uuid>, 47 pub http_request_data: Option<HttpRequest>, 48 pub peer_connection: Option<Arc<RTCPeerConnection>>, 49 } 50 51 impl WebRTCServerSession { new(stream: TcpStream, event_producer: StreamHubEventSender) -> Self52 pub fn new(stream: TcpStream, event_producer: StreamHubEventSender) -> Self { 53 let net_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream)); 54 let io = Arc::new(Mutex::new(net_io)); 55 56 Self { 57 io: io.clone(), 58 reader: BytesReader::new(BytesMut::default()), 59 writer: AsyncBytesWriter::new(io), 60 event_sender: event_producer, 61 stream_handler: Arc::new(WebRTCStreamHandler::new()), 62 session_id: None, 63 http_request_data: None, 64 peer_connection: None, 65 } 66 } 67 close_peer_connection(&self) -> Result<(), SessionError>68 pub async fn close_peer_connection(&self) -> Result<(), SessionError> { 69 if let Some(pc) = &self.peer_connection { 70 pc.close().await?; 71 } 72 Ok(()) 73 } 74 run( &mut self, uuid_2_sessions: Arc<Mutex<HashMap<Uuid, Arc<Mutex<WebRTCServerSession>>>>>, ) -> Result<(), SessionError>75 pub async fn run( 76 &mut self, 77 uuid_2_sessions: Arc<Mutex<HashMap<Uuid, Arc<Mutex<WebRTCServerSession>>>>>, 78 ) -> Result<(), SessionError> { 79 while self.reader.len() < 4 { 80 let data = self.io.lock().await.read().await?; 81 self.reader.extend_from_slice(&data[..]); 82 } 83 84 let mut remaining_data = self.reader.get_remaining_bytes(); 85 86 if let Some(content_length) = parse_content_length(std::str::from_utf8(&remaining_data)?) { 87 while remaining_data.len() < content_length as usize { 88 log::trace!( 89 "content_length: {} {}", 90 content_length, 91 remaining_data.len() 92 ); 93 let data = self.io.lock().await.read().await?; 94 self.reader.extend_from_slice(&data[..]); 95 remaining_data = self.reader.get_remaining_bytes(); 96 } 97 } 98 99 let request_data = self.reader.extract_remaining_bytes(); 100 101 if let Some(http_request) = HttpRequest::unmarshal(std::str::from_utf8(&request_data)?) { 102 //POST /whip?app=live&stream=test HTTP/1.1 103 let eles: Vec<&str> = http_request.path.splitn(2, '/').collect(); 104 let pars_map = &http_request.path_parameters_map; 105 106 let request_method = http_request.method.as_str(); 107 if request_method == http_method_name::GET { 108 let response = match http_request.path.as_str() { 109 "/" => Self::gen_file_response("./index.html"), 110 "/whip.js" => Self::gen_file_response("./whip.js"), 111 "/whep.js" => Self::gen_file_response("./whep.js"), 112 _ => { 113 log::warn!("the http get path: {} is not supported.", http_request.path); 114 return Ok(()); 115 } 116 }; 117 118 self.send_response(&response).await?; 119 return Ok(()); 120 } 121 122 if eles.len() < 2 || pars_map.get("app").is_none() || pars_map.get("stream").is_none() { 123 log::error!( 124 "WebRTCServerSession::run the http path is not correct: {}", 125 http_request.path 126 ); 127 128 return Err(SessionError { 129 value: errors::SessionErrorValue::HttpRequestPathError, 130 }); 131 } 132 133 let t = eles[1]; 134 let app_name = pars_map.get("app").unwrap().clone(); 135 let stream_name = pars_map.get("stream").unwrap().clone(); 136 137 log::info!("1:{},2:{},3:{}", t, app_name, stream_name); 138 139 match request_method { 140 http_method_name::POST => { 141 let sdp_data = if let Some(body) = http_request.body.as_ref() { 142 body 143 } else { 144 return Err(SessionError { 145 value: errors::SessionErrorValue::HttpRequestEmptySdp, 146 }); 147 }; 148 self.session_id = Some(Uuid::new(RandomDigitCount::Zero)); 149 150 let path = format!( 151 "{}?{}&session_id={}", 152 http_request.path, 153 http_request.path_parameters.as_ref().unwrap(), 154 self.session_id.unwrap() 155 ); 156 let offer = RTCSessionDescription::offer(sdp_data.clone())?; 157 158 match t.to_lowercase().as_str() { 159 "whip" => { 160 self.publish_whip(app_name, stream_name, path, offer) 161 .await?; 162 } 163 "whep" => { 164 self.subscribe_whep(app_name, stream_name, path, offer) 165 .await?; 166 } 167 _ => { 168 log::error!( 169 "current path: {}, method: {}", 170 http_request.path, 171 t.to_lowercase() 172 ); 173 return Err(SessionError { 174 value: errors::SessionErrorValue::HttpRequestNotSupported, 175 }); 176 } 177 } 178 } 179 http_method_name::OPTIONS => {} 180 http_method_name::PATCH => {} 181 http_method_name::DELETE => { 182 if let Some(session_id) = pars_map.get("session_id") { 183 if let Some(uuid) = Uuid::from_str2(session_id) { 184 //stop the running session and delete it. 185 let mut uuid_2_sessions_unlock = uuid_2_sessions.lock().await; 186 if let Some(session) = uuid_2_sessions_unlock.get(&uuid) { 187 if let Err(err) = session.lock().await.close_peer_connection().await 188 { 189 log::error!("close peer connection failed: {}", err); 190 } else { 191 log::info!("close peer connection successfully."); 192 } 193 uuid_2_sessions_unlock.remove(&uuid); 194 } else { 195 log::warn!("the session :{} is not exited.", uuid); 196 } 197 } 198 } else { 199 log::error!( 200 "the delete path does not contain session id: {}?{}", 201 http_request.path, 202 http_request.path_parameters.as_ref().unwrap() 203 ); 204 } 205 206 match t.to_lowercase().as_str() { 207 "whip" => { 208 Self::unpublish_whip( 209 app_name, 210 stream_name, 211 self.get_publisher_info(), 212 self.event_sender.clone(), 213 )?; 214 } 215 "whep" => {} 216 _ => { 217 log::error!( 218 "current path: {}, method: {}", 219 http_request.path, 220 t.to_lowercase() 221 ); 222 return Err(SessionError { 223 value: errors::SessionErrorValue::HttpRequestNotSupported, 224 }); 225 } 226 } 227 228 let status_code = http::StatusCode::OK; 229 let response = Self::gen_response(status_code); 230 self.send_response(&response).await?; 231 } 232 _ => { 233 log::warn!( 234 "WebRTCServerSession::unsupported method name: {}", 235 http_request.method 236 ); 237 } 238 } 239 240 self.http_request_data = Some(http_request); 241 } 242 243 Ok(()) 244 } 245 publish_whip( &mut self, app_name: String, stream_name: String, path: String, offer: RTCSessionDescription, ) -> Result<(), SessionError>246 async fn publish_whip( 247 &mut self, 248 app_name: String, 249 stream_name: String, 250 path: String, 251 offer: RTCSessionDescription, 252 ) -> Result<(), SessionError> { 253 let (event_result_sender, event_result_receiver) = oneshot::channel(); 254 255 let publish_event = StreamHubEvent::Publish { 256 identifier: StreamIdentifier::WebRTC { 257 app_name, 258 stream_name, 259 }, 260 result_sender: event_result_sender, 261 info: self.get_publisher_info(), 262 stream_handler: self.stream_handler.clone(), 263 }; 264 265 if self.event_sender.send(publish_event).is_err() { 266 return Err(SessionError { 267 value: SessionErrorValue::StreamHubEventSendErr, 268 }); 269 } 270 271 let sender = event_result_receiver.await??.1.unwrap(); 272 273 let response = match handle_whip(offer, sender).await { 274 Ok((session_description, peer_connection)) => { 275 self.peer_connection = Some(peer_connection); 276 277 let status_code = http::StatusCode::CREATED; 278 let mut response = Self::gen_response(status_code); 279 280 response 281 .headers 282 .insert("Content-Type".to_string(), "application/sdp".to_string()); 283 response.headers.insert("Location".to_string(), path); 284 response.body = Some(session_description.sdp); 285 286 response 287 } 288 Err(err) => { 289 log::error!("handle whip err: {}", err); 290 let status_code = http::StatusCode::SERVICE_UNAVAILABLE; 291 Self::gen_response(status_code) 292 } 293 }; 294 295 self.send_response(&response).await 296 } 297 unpublish_whip( app_name: String, stream_name: String, publish_info: PublisherInfo, sender: StreamHubEventSender, ) -> Result<(), SessionError>298 fn unpublish_whip( 299 app_name: String, 300 stream_name: String, 301 publish_info: PublisherInfo, 302 sender: StreamHubEventSender, 303 ) -> Result<(), SessionError> { 304 let unpublish_event = StreamHubEvent::UnPublish { 305 identifier: StreamIdentifier::WebRTC { 306 app_name, 307 stream_name, 308 }, 309 info: publish_info, 310 }; 311 312 if sender.send(unpublish_event).is_err() { 313 return Err(SessionError { 314 value: SessionErrorValue::StreamHubEventSendErr, 315 }); 316 } 317 318 Ok(()) 319 } 320 subscribe_whep( &mut self, app_name: String, stream_name: String, path: String, offer: RTCSessionDescription, ) -> Result<(), SessionError>321 async fn subscribe_whep( 322 &mut self, 323 app_name: String, 324 stream_name: String, 325 path: String, 326 offer: RTCSessionDescription, 327 ) -> Result<(), SessionError> { 328 let subscriber_info = self.get_subscriber_info(); 329 330 let (event_result_sender, event_result_receiver) = oneshot::channel(); 331 332 let subscribe_event = StreamHubEvent::Subscribe { 333 identifier: StreamIdentifier::WebRTC { 334 app_name: app_name.clone(), 335 stream_name: stream_name.clone(), 336 }, 337 info: subscriber_info.clone(), 338 result_sender: event_result_sender, 339 }; 340 341 if self.event_sender.send(subscribe_event).is_err() { 342 return Err(SessionError { 343 value: SessionErrorValue::StreamHubEventSendErr, 344 }); 345 } 346 347 let receiver = event_result_receiver.await??.packet_receiver.unwrap(); 348 349 let (pc_state_sender, mut pc_state_receiver) = broadcast::channel(1); 350 351 let response = match handle_whep(offer, receiver, pc_state_sender).await { 352 Ok((session_description, peer_connection)) => { 353 let pc_clone = peer_connection.clone(); 354 355 let app_name_out = app_name.clone(); 356 let stream_name_out = stream_name.clone(); 357 let subscriber_info_out = subscriber_info.clone(); 358 let sender_out = self.event_sender.clone(); 359 360 tokio::spawn(async move { 361 loop { 362 if let Ok(state) = pc_state_receiver.recv().await { 363 log::info!("state: {}", state); 364 match state { 365 RTCPeerConnectionState::Disconnected 366 | RTCPeerConnectionState::Failed => { 367 if let Err(err) = pc_clone.close().await { 368 log::error!("peer connection close error: {}", err); 369 } 370 } 371 RTCPeerConnectionState::Closed => { 372 if let Err(err) = Self::unsubscribe_whep( 373 app_name_out, 374 stream_name_out, 375 subscriber_info_out, 376 sender_out, 377 ) { 378 log::error!("unsubscribe whep error: {}", err); 379 } 380 break; 381 } 382 _ => {} 383 } 384 } else { 385 log::info!("recv"); 386 } 387 } 388 }); 389 390 self.peer_connection = Some(peer_connection); 391 392 let status_code = http::StatusCode::CREATED; 393 let mut response = Self::gen_response(status_code); 394 response 395 .headers 396 .insert("Content-Type".to_string(), "application/sdp".to_string()); 397 response.headers.insert("Location".to_string(), path); 398 response.body = Some(session_description.sdp); 399 log::info!("before whep 1"); 400 response 401 } 402 Err(err) => { 403 log::error!("handle whep err: {}", err); 404 let status_code = http::StatusCode::SERVICE_UNAVAILABLE; 405 Self::gen_response(status_code) 406 } 407 }; 408 self.send_response(&response).await 409 } 410 unsubscribe_whep( app_name: String, stream_name: String, subscriber_info: SubscriberInfo, sender: StreamHubEventSender, ) -> Result<(), SessionError>411 fn unsubscribe_whep( 412 app_name: String, 413 stream_name: String, 414 subscriber_info: SubscriberInfo, 415 sender: StreamHubEventSender, 416 ) -> Result<(), SessionError> { 417 let unsubscribe_event = StreamHubEvent::UnSubscribe { 418 identifier: StreamIdentifier::WebRTC { 419 app_name, 420 stream_name, 421 }, 422 info: subscriber_info, 423 }; 424 425 if sender.send(unsubscribe_event).is_err() { 426 return Err(SessionError { 427 value: SessionErrorValue::StreamHubEventSendErr, 428 }); 429 } 430 Ok(()) 431 } 432 get_subscriber_info(&self) -> SubscriberInfo433 fn get_subscriber_info(&self) -> SubscriberInfo { 434 let id = if let Some(session_id) = &self.session_id { 435 *session_id 436 } else { 437 Uuid::new(RandomDigitCount::Zero) 438 }; 439 440 SubscriberInfo { 441 id, 442 sub_type: SubscribeType::PlayerWebrtc, 443 sub_data_type: streamhub::define::SubDataType::Packet, 444 notify_info: NotifyInfo { 445 request_url: String::from(""), 446 remote_addr: String::from(""), 447 }, 448 } 449 } 450 get_publisher_info(&self) -> PublisherInfo451 fn get_publisher_info(&self) -> PublisherInfo { 452 let id = if let Some(session_id) = &self.session_id { 453 *session_id 454 } else { 455 Uuid::new(RandomDigitCount::Zero) 456 }; 457 458 PublisherInfo { 459 id, 460 pub_type: PublishType::PushWebRTC, 461 pub_data_type: streamhub::define::PubDataType::Packet, 462 notify_info: NotifyInfo { 463 request_url: String::from(""), 464 remote_addr: String::from(""), 465 }, 466 } 467 } 468 gen_response(status_code: StatusCode) -> HttpResponse469 fn gen_response(status_code: StatusCode) -> HttpResponse { 470 let reason_phrase = if let Some(reason) = status_code.canonical_reason() { 471 reason.to_string() 472 } else { 473 "".to_string() 474 }; 475 476 HttpResponse { 477 version: "HTTP/1.1".to_string(), 478 status_code: status_code.as_u16(), 479 reason_phrase, 480 ..Default::default() 481 } 482 } 483 gen_file_response(file_path: &str) -> HttpResponse484 fn gen_file_response(file_path: &str) -> HttpResponse { 485 let mut response = Self::gen_response(http::StatusCode::OK); 486 487 let mut file = File::open(file_path).expect("Failed to open file"); 488 let mut contents = Vec::new(); 489 file.read_to_end(&mut contents) 490 .expect("Failed to read file"); 491 492 let contents_str = String::from_utf8_lossy(&contents).to_string(); 493 494 response 495 .headers 496 .insert("Content-Type".to_string(), "text/html".to_string()); 497 response.body = Some(contents_str); 498 499 response 500 } 501 send_response(&mut self, response: &HttpResponse) -> Result<(), SessionError>502 async fn send_response(&mut self, response: &HttpResponse) -> Result<(), SessionError> { 503 self.writer.write(response.marshal().as_bytes())?; 504 self.writer.flush().await?; 505 Ok(()) 506 } 507 } 508 509 #[derive(Default)] 510 pub struct WebRTCStreamHandler {} 511 512 impl WebRTCStreamHandler { new() -> Self513 pub fn new() -> Self { 514 Self {} 515 } 516 } 517 518 #[async_trait] 519 impl TStreamHandler for WebRTCStreamHandler { send_prior_data( &self, _sender: DataSender, _sub_type: SubscribeType, ) -> Result<(), ChannelError>520 async fn send_prior_data( 521 &self, 522 _sender: DataSender, 523 _sub_type: SubscribeType, 524 ) -> Result<(), ChannelError> { 525 Ok(()) 526 } get_statistic_data(&self) -> Option<StreamStatistics>527 async fn get_statistic_data(&self) -> Option<StreamStatistics> { 528 None 529 } 530 send_information(&self, _sender: InformationSender)531 async fn send_information(&self, _sender: InformationSender) {} 532 } 533