xref: /xiu/protocol/webrtc/src/session/mod.rs (revision f3f517c7)
1 pub mod errors;
2 use streamhub::{
3     define::{
4         DataSender, InformationSender, NotifyInfo, PublishType, PublisherInfo, StreamHubEvent,
5         StreamHubEventSender, SubscribeType, SubscriberInfo, TStreamHandler,
6     },
7     errors::ChannelError,
8     statistics::StreamStatistics,
9     stream::StreamIdentifier,
10     utils::{RandomDigitCount, Uuid},
11 };
12 use tokio::sync::Mutex;
13 use tokio::sync::{broadcast, oneshot};
14 
15 use bytesio::bytesio::TNetIO;
16 use bytesio::bytesio::TcpIO;
17 use std::io::Read;
18 use std::{collections::HashMap, fs::File, sync::Arc};
19 use tokio::net::TcpStream;
20 
21 use super::http::define::http_method_name;
22 use super::http::parse_content_length;
23 use super::http::{HttpRequest, HttpResponse, Marshal, Unmarshal};
24 
25 use super::whep::handle_whep;
26 use super::whip::handle_whip;
27 use async_trait::async_trait;
28 
29 use bytes::BytesMut;
30 use bytesio::bytes_reader::BytesReader;
31 use bytesio::bytes_writer::AsyncBytesWriter;
32 use errors::SessionError;
33 use errors::SessionErrorValue;
34 use http::StatusCode;
35 use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
36 use webrtc::peer_connection::{sdp::session_description::RTCSessionDescription, RTCPeerConnection};
37 
38 pub struct WebRTCServerSession {
39     io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
40     reader: BytesReader,
41     writer: AsyncBytesWriter,
42 
43     event_sender: StreamHubEventSender,
44     stream_handler: Arc<WebRTCStreamHandler>,
45 
46     pub session_id: Option<Uuid>,
47     pub http_request_data: Option<HttpRequest>,
48     pub peer_connection: Option<Arc<RTCPeerConnection>>,
49 }
50 
51 impl WebRTCServerSession {
new(stream: TcpStream, event_producer: StreamHubEventSender) -> Self52     pub fn new(stream: TcpStream, event_producer: StreamHubEventSender) -> Self {
53         let net_io: Box<dyn TNetIO + Send + Sync> = Box::new(TcpIO::new(stream));
54         let io = Arc::new(Mutex::new(net_io));
55 
56         Self {
57             io: io.clone(),
58             reader: BytesReader::new(BytesMut::default()),
59             writer: AsyncBytesWriter::new(io),
60             event_sender: event_producer,
61             stream_handler: Arc::new(WebRTCStreamHandler::new()),
62             session_id: None,
63             http_request_data: None,
64             peer_connection: None,
65         }
66     }
67 
close_peer_connection(&self) -> Result<(), SessionError>68     pub async fn close_peer_connection(&self) -> Result<(), SessionError> {
69         if let Some(pc) = &self.peer_connection {
70             pc.close().await?;
71         }
72         Ok(())
73     }
74 
run( &mut self, uuid_2_sessions: Arc<Mutex<HashMap<Uuid, Arc<Mutex<WebRTCServerSession>>>>>, ) -> Result<(), SessionError>75     pub async fn run(
76         &mut self,
77         uuid_2_sessions: Arc<Mutex<HashMap<Uuid, Arc<Mutex<WebRTCServerSession>>>>>,
78     ) -> Result<(), SessionError> {
79         while self.reader.len() < 4 {
80             let data = self.io.lock().await.read().await?;
81             self.reader.extend_from_slice(&data[..]);
82         }
83 
84         let mut remaining_data = self.reader.get_remaining_bytes();
85 
86         if let Some(content_length) = parse_content_length(std::str::from_utf8(&remaining_data)?) {
87             while remaining_data.len() < content_length as usize {
88                 log::trace!(
89                     "content_length: {} {}",
90                     content_length,
91                     remaining_data.len()
92                 );
93                 let data = self.io.lock().await.read().await?;
94                 self.reader.extend_from_slice(&data[..]);
95                 remaining_data = self.reader.get_remaining_bytes();
96             }
97         }
98 
99         let request_data = self.reader.extract_remaining_bytes();
100 
101         if let Some(http_request) = HttpRequest::unmarshal(std::str::from_utf8(&request_data)?) {
102             //POST /whip?app=live&stream=test HTTP/1.1
103             let eles: Vec<&str> = http_request.path.splitn(2, '/').collect();
104             let pars_map = &http_request.path_parameters_map;
105 
106             let request_method = http_request.method.as_str();
107             if request_method == http_method_name::GET {
108                 let response = match http_request.path.as_str() {
109                     "/" => Self::gen_file_response("./index.html"),
110                     "/whip.js" => Self::gen_file_response("./whip.js"),
111                     "/whep.js" => Self::gen_file_response("./whep.js"),
112                     _ => {
113                         log::warn!("the http get path: {} is not supported.", http_request.path);
114                         return Ok(());
115                     }
116                 };
117 
118                 self.send_response(&response).await?;
119                 return Ok(());
120             }
121 
122             if eles.len() < 2 || pars_map.get("app").is_none() || pars_map.get("stream").is_none() {
123                 log::error!(
124                     "WebRTCServerSession::run the http path is not correct: {}",
125                     http_request.path
126                 );
127 
128                 return Err(SessionError {
129                     value: errors::SessionErrorValue::HttpRequestPathError,
130                 });
131             }
132 
133             let t = eles[1];
134             let app_name = pars_map.get("app").unwrap().clone();
135             let stream_name = pars_map.get("stream").unwrap().clone();
136 
137             log::info!("1:{},2:{},3:{}", t, app_name, stream_name);
138 
139             match request_method {
140                 http_method_name::POST => {
141                     let sdp_data = if let Some(body) = http_request.body.as_ref() {
142                         body
143                     } else {
144                         return Err(SessionError {
145                             value: errors::SessionErrorValue::HttpRequestEmptySdp,
146                         });
147                     };
148                     self.session_id = Some(Uuid::new(RandomDigitCount::Zero));
149 
150                     let path = format!(
151                         "{}?{}&session_id={}",
152                         http_request.path,
153                         http_request.path_parameters.as_ref().unwrap(),
154                         self.session_id.unwrap()
155                     );
156                     let offer = RTCSessionDescription::offer(sdp_data.clone())?;
157 
158                     match t.to_lowercase().as_str() {
159                         "whip" => {
160                             self.publish_whip(app_name, stream_name, path, offer)
161                                 .await?;
162                         }
163                         "whep" => {
164                             self.subscribe_whep(app_name, stream_name, path, offer)
165                                 .await?;
166                         }
167                         _ => {
168                             log::error!(
169                                 "current path: {}, method: {}",
170                                 http_request.path,
171                                 t.to_lowercase()
172                             );
173                             return Err(SessionError {
174                                 value: errors::SessionErrorValue::HttpRequestNotSupported,
175                             });
176                         }
177                     }
178                 }
179                 http_method_name::OPTIONS => {}
180                 http_method_name::PATCH => {}
181                 http_method_name::DELETE => {
182                     if let Some(session_id) = pars_map.get("session_id") {
183                         if let Some(uuid) = Uuid::from_str2(session_id) {
184                             //stop the running session and delete it.
185                             let mut uuid_2_sessions_unlock = uuid_2_sessions.lock().await;
186                             if let Some(session) = uuid_2_sessions_unlock.get(&uuid) {
187                                 if let Err(err) = session.lock().await.close_peer_connection().await
188                                 {
189                                     log::error!("close peer connection failed: {}", err);
190                                 } else {
191                                     log::info!("close peer connection successfully.");
192                                 }
193                                 uuid_2_sessions_unlock.remove(&uuid);
194                             } else {
195                                 log::warn!("the session :{}  is not exited.", uuid);
196                             }
197                         }
198                     } else {
199                         log::error!(
200                             "the delete path does not contain session id: {}?{}",
201                             http_request.path,
202                             http_request.path_parameters.as_ref().unwrap()
203                         );
204                     }
205 
206                     match t.to_lowercase().as_str() {
207                         "whip" => {
208                             Self::unpublish_whip(
209                                 app_name,
210                                 stream_name,
211                                 self.get_publisher_info(),
212                                 self.event_sender.clone(),
213                             )?;
214                         }
215                         "whep" => {}
216                         _ => {
217                             log::error!(
218                                 "current path: {}, method: {}",
219                                 http_request.path,
220                                 t.to_lowercase()
221                             );
222                             return Err(SessionError {
223                                 value: errors::SessionErrorValue::HttpRequestNotSupported,
224                             });
225                         }
226                     }
227 
228                     let status_code = http::StatusCode::OK;
229                     let response = Self::gen_response(status_code);
230                     self.send_response(&response).await?;
231                 }
232                 _ => {
233                     log::warn!(
234                         "WebRTCServerSession::unsupported method name: {}",
235                         http_request.method
236                     );
237                 }
238             }
239 
240             self.http_request_data = Some(http_request);
241         }
242 
243         Ok(())
244     }
245 
publish_whip( &mut self, app_name: String, stream_name: String, path: String, offer: RTCSessionDescription, ) -> Result<(), SessionError>246     async fn publish_whip(
247         &mut self,
248         app_name: String,
249         stream_name: String,
250         path: String,
251         offer: RTCSessionDescription,
252     ) -> Result<(), SessionError> {
253         let (event_result_sender, event_result_receiver) = oneshot::channel();
254 
255         let publish_event = StreamHubEvent::Publish {
256             identifier: StreamIdentifier::WebRTC {
257                 app_name,
258                 stream_name,
259             },
260             result_sender: event_result_sender,
261             info: self.get_publisher_info(),
262             stream_handler: self.stream_handler.clone(),
263         };
264 
265         if self.event_sender.send(publish_event).is_err() {
266             return Err(SessionError {
267                 value: SessionErrorValue::StreamHubEventSendErr,
268             });
269         }
270 
271         let sender = event_result_receiver.await??.1.unwrap();
272 
273         let response = match handle_whip(offer, sender).await {
274             Ok((session_description, peer_connection)) => {
275                 self.peer_connection = Some(peer_connection);
276 
277                 let status_code = http::StatusCode::CREATED;
278                 let mut response = Self::gen_response(status_code);
279 
280                 response
281                     .headers
282                     .insert("Content-Type".to_string(), "application/sdp".to_string());
283                 response.headers.insert("Location".to_string(), path);
284                 response.body = Some(session_description.sdp);
285 
286                 response
287             }
288             Err(err) => {
289                 log::error!("handle whip err: {}", err);
290                 let status_code = http::StatusCode::SERVICE_UNAVAILABLE;
291                 Self::gen_response(status_code)
292             }
293         };
294 
295         self.send_response(&response).await
296     }
297 
unpublish_whip( app_name: String, stream_name: String, publish_info: PublisherInfo, sender: StreamHubEventSender, ) -> Result<(), SessionError>298     fn unpublish_whip(
299         app_name: String,
300         stream_name: String,
301         publish_info: PublisherInfo,
302         sender: StreamHubEventSender,
303     ) -> Result<(), SessionError> {
304         let unpublish_event = StreamHubEvent::UnPublish {
305             identifier: StreamIdentifier::WebRTC {
306                 app_name,
307                 stream_name,
308             },
309             info: publish_info,
310         };
311 
312         if sender.send(unpublish_event).is_err() {
313             return Err(SessionError {
314                 value: SessionErrorValue::StreamHubEventSendErr,
315             });
316         }
317 
318         Ok(())
319     }
320 
subscribe_whep( &mut self, app_name: String, stream_name: String, path: String, offer: RTCSessionDescription, ) -> Result<(), SessionError>321     async fn subscribe_whep(
322         &mut self,
323         app_name: String,
324         stream_name: String,
325         path: String,
326         offer: RTCSessionDescription,
327     ) -> Result<(), SessionError> {
328         let subscriber_info = self.get_subscriber_info();
329 
330         let (event_result_sender, event_result_receiver) = oneshot::channel();
331 
332         let subscribe_event = StreamHubEvent::Subscribe {
333             identifier: StreamIdentifier::WebRTC {
334                 app_name: app_name.clone(),
335                 stream_name: stream_name.clone(),
336             },
337             info: subscriber_info.clone(),
338             result_sender: event_result_sender,
339         };
340 
341         if self.event_sender.send(subscribe_event).is_err() {
342             return Err(SessionError {
343                 value: SessionErrorValue::StreamHubEventSendErr,
344             });
345         }
346 
347         let receiver = event_result_receiver.await??.packet_receiver.unwrap();
348 
349         let (pc_state_sender, mut pc_state_receiver) = broadcast::channel(1);
350 
351         let response = match handle_whep(offer, receiver, pc_state_sender).await {
352             Ok((session_description, peer_connection)) => {
353                 let pc_clone = peer_connection.clone();
354 
355                 let app_name_out = app_name.clone();
356                 let stream_name_out = stream_name.clone();
357                 let subscriber_info_out = subscriber_info.clone();
358                 let sender_out = self.event_sender.clone();
359 
360                 tokio::spawn(async move {
361                     loop {
362                         if let Ok(state) = pc_state_receiver.recv().await {
363                             log::info!("state: {}", state);
364                             match state {
365                                 RTCPeerConnectionState::Disconnected
366                                 | RTCPeerConnectionState::Failed => {
367                                     if let Err(err) = pc_clone.close().await {
368                                         log::error!("peer connection close error: {}", err);
369                                     }
370                                 }
371                                 RTCPeerConnectionState::Closed => {
372                                     if let Err(err) = Self::unsubscribe_whep(
373                                         app_name_out,
374                                         stream_name_out,
375                                         subscriber_info_out,
376                                         sender_out,
377                                     ) {
378                                         log::error!("unsubscribe whep error: {}", err);
379                                     }
380                                     break;
381                                 }
382                                 _ => {}
383                             }
384                         } else {
385                             log::info!("recv");
386                         }
387                     }
388                 });
389 
390                 self.peer_connection = Some(peer_connection);
391 
392                 let status_code = http::StatusCode::CREATED;
393                 let mut response = Self::gen_response(status_code);
394                 response
395                     .headers
396                     .insert("Content-Type".to_string(), "application/sdp".to_string());
397                 response.headers.insert("Location".to_string(), path);
398                 response.body = Some(session_description.sdp);
399                 log::info!("before whep 1");
400                 response
401             }
402             Err(err) => {
403                 log::error!("handle whep err: {}", err);
404                 let status_code = http::StatusCode::SERVICE_UNAVAILABLE;
405                 Self::gen_response(status_code)
406             }
407         };
408         self.send_response(&response).await
409     }
410 
unsubscribe_whep( app_name: String, stream_name: String, subscriber_info: SubscriberInfo, sender: StreamHubEventSender, ) -> Result<(), SessionError>411     fn unsubscribe_whep(
412         app_name: String,
413         stream_name: String,
414         subscriber_info: SubscriberInfo,
415         sender: StreamHubEventSender,
416     ) -> Result<(), SessionError> {
417         let unsubscribe_event = StreamHubEvent::UnSubscribe {
418             identifier: StreamIdentifier::WebRTC {
419                 app_name,
420                 stream_name,
421             },
422             info: subscriber_info,
423         };
424 
425         if sender.send(unsubscribe_event).is_err() {
426             return Err(SessionError {
427                 value: SessionErrorValue::StreamHubEventSendErr,
428             });
429         }
430         Ok(())
431     }
432 
get_subscriber_info(&self) -> SubscriberInfo433     fn get_subscriber_info(&self) -> SubscriberInfo {
434         let id = if let Some(session_id) = &self.session_id {
435             *session_id
436         } else {
437             Uuid::new(RandomDigitCount::Zero)
438         };
439 
440         SubscriberInfo {
441             id,
442             sub_type: SubscribeType::PlayerWebrtc,
443             sub_data_type: streamhub::define::SubDataType::Packet,
444             notify_info: NotifyInfo {
445                 request_url: String::from(""),
446                 remote_addr: String::from(""),
447             },
448         }
449     }
450 
get_publisher_info(&self) -> PublisherInfo451     fn get_publisher_info(&self) -> PublisherInfo {
452         let id = if let Some(session_id) = &self.session_id {
453             *session_id
454         } else {
455             Uuid::new(RandomDigitCount::Zero)
456         };
457 
458         PublisherInfo {
459             id,
460             pub_type: PublishType::PushWebRTC,
461             pub_data_type: streamhub::define::PubDataType::Packet,
462             notify_info: NotifyInfo {
463                 request_url: String::from(""),
464                 remote_addr: String::from(""),
465             },
466         }
467     }
468 
gen_response(status_code: StatusCode) -> HttpResponse469     fn gen_response(status_code: StatusCode) -> HttpResponse {
470         let reason_phrase = if let Some(reason) = status_code.canonical_reason() {
471             reason.to_string()
472         } else {
473             "".to_string()
474         };
475 
476         HttpResponse {
477             version: "HTTP/1.1".to_string(),
478             status_code: status_code.as_u16(),
479             reason_phrase,
480             ..Default::default()
481         }
482     }
483 
gen_file_response(file_path: &str) -> HttpResponse484     fn gen_file_response(file_path: &str) -> HttpResponse {
485         let mut response = Self::gen_response(http::StatusCode::OK);
486 
487         let mut file = File::open(file_path).expect("Failed to open file");
488         let mut contents = Vec::new();
489         file.read_to_end(&mut contents)
490             .expect("Failed to read file");
491 
492         let contents_str = String::from_utf8_lossy(&contents).to_string();
493 
494         response
495             .headers
496             .insert("Content-Type".to_string(), "text/html".to_string());
497         response.body = Some(contents_str);
498 
499         response
500     }
501 
send_response(&mut self, response: &HttpResponse) -> Result<(), SessionError>502     async fn send_response(&mut self, response: &HttpResponse) -> Result<(), SessionError> {
503         self.writer.write(response.marshal().as_bytes())?;
504         self.writer.flush().await?;
505         Ok(())
506     }
507 }
508 
509 #[derive(Default)]
510 pub struct WebRTCStreamHandler {}
511 
512 impl WebRTCStreamHandler {
new() -> Self513     pub fn new() -> Self {
514         Self {}
515     }
516 }
517 
518 #[async_trait]
519 impl TStreamHandler for WebRTCStreamHandler {
send_prior_data( &self, _sender: DataSender, _sub_type: SubscribeType, ) -> Result<(), ChannelError>520     async fn send_prior_data(
521         &self,
522         _sender: DataSender,
523         _sub_type: SubscribeType,
524     ) -> Result<(), ChannelError> {
525         Ok(())
526     }
get_statistic_data(&self) -> Option<StreamStatistics>527     async fn get_statistic_data(&self) -> Option<StreamStatistics> {
528         None
529     }
530 
send_information(&self, _sender: InformationSender)531     async fn send_information(&self, _sender: InformationSender) {}
532 }
533