xref: /xiu/protocol/rtmp/src/session/common.rs (revision a4ef5d6c)
1*a4ef5d6cSHarlanC use streamhub::define::DataSender;
2*a4ef5d6cSHarlanC use tokio::sync::oneshot;
380f20d70SHarlanC 
492df423eSHarlanC use {
592df423eSHarlanC     super::{
68e71d710SHarlan         define::SessionType,
792df423eSHarlanC         errors::{SessionError, SessionErrorValue},
892df423eSHarlanC     },
992df423eSHarlanC     crate::{
108e71d710SHarlan         cache::errors::CacheError,
118e71d710SHarlan         cache::Cache,
1292df423eSHarlanC         chunk::{
13b1840569SHarlanC             define::{chunk_type, csid_type},
1492df423eSHarlanC             packetizer::ChunkPacketizer,
1592df423eSHarlanC             ChunkInfo,
1692df423eSHarlanC         },
17b1840569SHarlanC         messages::define::msg_type_id,
1892df423eSHarlanC     },
198e71d710SHarlan     async_trait::async_trait,
2092df423eSHarlanC     bytes::BytesMut,
218e71d710SHarlan     std::fmt,
228e71d710SHarlan     std::{net::SocketAddr, sync::Arc},
238e71d710SHarlan     streamhub::{
248e71d710SHarlan         define::{
258e71d710SHarlan             FrameData, FrameDataReceiver, FrameDataSender, InformationSender, NotifyInfo,
268e71d710SHarlan             PublishType, PublisherInfo, StreamHubEvent, StreamHubEventSender, SubscribeType,
278e71d710SHarlan             SubscriberInfo, TStreamHandler,
2892df423eSHarlanC         },
298e71d710SHarlan         errors::{ChannelError, ChannelErrorValue},
308e71d710SHarlan         statistics::StreamStatistics,
318e71d710SHarlan         stream::StreamIdentifier,
328e71d710SHarlan         utils::Uuid,
338e71d710SHarlan     },
348e71d710SHarlan     tokio::sync::{mpsc, Mutex},
3592df423eSHarlanC };
36976f65a6SHarlan 
3792df423eSHarlanC pub struct Common {
3813bac29aSHarlan     //only Server Subscriber or Client Publisher needs to send out trunck data.
3913bac29aSHarlan     packetizer: Option<ChunkPacketizer>,
4092df423eSHarlanC 
418e71d710SHarlan     data_receiver: FrameDataReceiver,
428e71d710SHarlan     data_sender: FrameDataSender,
4392df423eSHarlanC 
448e71d710SHarlan     event_producer: StreamHubEventSender,
45740804e8SHarlanC     pub session_type: SessionType,
46976f65a6SHarlan 
47976f65a6SHarlan     /*save the client side socket connected to the SeverSession */
48976f65a6SHarlan     remote_addr: Option<SocketAddr>,
49976f65a6SHarlan     /*request URL from client*/
50976f65a6SHarlan     pub request_url: String,
518e71d710SHarlan     pub stream_handler: Arc<RtmpStreamHandler>,
5292df423eSHarlanC }
5392df423eSHarlanC 
5492df423eSHarlanC impl Common {
new( packetizer: Option<ChunkPacketizer>, event_producer: StreamHubEventSender, session_type: SessionType, remote_addr: Option<SocketAddr>, ) -> Self5516394c08SHarlanC     pub fn new(
5613bac29aSHarlan         packetizer: Option<ChunkPacketizer>,
578e71d710SHarlan         event_producer: StreamHubEventSender,
5816394c08SHarlanC         session_type: SessionType,
59976f65a6SHarlan         remote_addr: Option<SocketAddr>,
6016394c08SHarlanC     ) -> Self {
6192df423eSHarlanC         //only used for init,since I don't found a better way to deal with this.
6292df423eSHarlanC         let (init_producer, init_consumer) = mpsc::unbounded_channel();
6392df423eSHarlanC 
6492df423eSHarlanC         Self {
6513bac29aSHarlan             packetizer,
6692df423eSHarlanC 
678e71d710SHarlan             data_sender: init_producer,
688e71d710SHarlan             data_receiver: init_consumer,
6992df423eSHarlanC 
7092df423eSHarlanC             event_producer,
7116394c08SHarlanC             session_type,
72976f65a6SHarlan             remote_addr,
73976f65a6SHarlan             request_url: String::default(),
748e71d710SHarlan             stream_handler: Arc::new(RtmpStreamHandler::new()),
758e71d710SHarlan             //cache: None,
7692df423eSHarlanC         }
7792df423eSHarlanC     }
send_channel_data(&mut self) -> Result<(), SessionError>7892df423eSHarlanC     pub async fn send_channel_data(&mut self) -> Result<(), SessionError> {
795359bb99Swawacry         let mut retry_times = 0;
8092df423eSHarlanC         loop {
818e71d710SHarlan             if let Some(data) = self.data_receiver.recv().await {
8292df423eSHarlanC                 match data {
838e71d710SHarlan                     FrameData::Audio { timestamp, data } => {
8492df423eSHarlanC                         self.send_audio(data, timestamp).await?;
8592df423eSHarlanC                     }
868e71d710SHarlan                     FrameData::Video { timestamp, data } => {
8792df423eSHarlanC                         self.send_video(data, timestamp).await?;
8892df423eSHarlanC                     }
898e71d710SHarlan                     FrameData::MetaData { timestamp, data } => {
9087f493cdSHarlanC                         self.send_metadata(data, timestamp).await?;
9192df423eSHarlanC                     }
92b36cf5daSHarlan                     _ => {}
9392df423eSHarlanC                 }
945359bb99Swawacry             } else {
955359bb99Swawacry                 retry_times += 1;
965359bb99Swawacry                 log::debug!(
975359bb99Swawacry                     "send_channel_data: no data receives ,retry {} times!",
985359bb99Swawacry                     retry_times
995359bb99Swawacry                 );
1005359bb99Swawacry 
1015359bb99Swawacry                 if retry_times > 10 {
1025359bb99Swawacry                     return Err(SessionError {
1035359bb99Swawacry                         value: SessionErrorValue::NoMediaDataReceived,
1045359bb99Swawacry                     });
1055359bb99Swawacry                 }
10692df423eSHarlanC             }
10792df423eSHarlanC         }
10892df423eSHarlanC     }
10992df423eSHarlanC 
send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError>11092df423eSHarlanC     pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
11192df423eSHarlanC         let mut chunk_info = ChunkInfo::new(
11292df423eSHarlanC             csid_type::AUDIO,
11392df423eSHarlanC             chunk_type::TYPE_0,
11492df423eSHarlanC             timestamp,
11592df423eSHarlanC             data.len() as u32,
11692df423eSHarlanC             msg_type_id::AUDIO,
11792df423eSHarlanC             0,
11892df423eSHarlanC             data,
11992df423eSHarlanC         );
12092df423eSHarlanC 
12113bac29aSHarlan         if let Some(packetizer) = &mut self.packetizer {
12213bac29aSHarlan             packetizer.write_chunk(&mut chunk_info).await?;
12313bac29aSHarlan         }
12492df423eSHarlanC 
12592df423eSHarlanC         Ok(())
12692df423eSHarlanC     }
12792df423eSHarlanC 
send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError>12892df423eSHarlanC     pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> {
12992df423eSHarlanC         let mut chunk_info = ChunkInfo::new(
13092df423eSHarlanC             csid_type::VIDEO,
13192df423eSHarlanC             chunk_type::TYPE_0,
13292df423eSHarlanC             timestamp,
13392df423eSHarlanC             data.len() as u32,
13492df423eSHarlanC             msg_type_id::VIDEO,
13592df423eSHarlanC             0,
13692df423eSHarlanC             data,
13792df423eSHarlanC         );
13892df423eSHarlanC 
13913bac29aSHarlan         if let Some(packetizer) = &mut self.packetizer {
14013bac29aSHarlan             packetizer.write_chunk(&mut chunk_info).await?;
14113bac29aSHarlan         }
14292df423eSHarlanC 
14392df423eSHarlanC         Ok(())
14492df423eSHarlanC     }
14592df423eSHarlanC 
send_metadata( &mut self, data: BytesMut, timestamp: u32, ) -> Result<(), SessionError>14687f493cdSHarlanC     pub async fn send_metadata(
14787f493cdSHarlanC         &mut self,
14887f493cdSHarlanC         data: BytesMut,
14987f493cdSHarlanC         timestamp: u32,
15087f493cdSHarlanC     ) -> Result<(), SessionError> {
15192df423eSHarlanC         let mut chunk_info = ChunkInfo::new(
15292df423eSHarlanC             csid_type::DATA_AMF0_AMF3,
15392df423eSHarlanC             chunk_type::TYPE_0,
15487f493cdSHarlanC             timestamp,
15592df423eSHarlanC             data.len() as u32,
15692df423eSHarlanC             msg_type_id::DATA_AMF0,
15792df423eSHarlanC             0,
15892df423eSHarlanC             data,
15992df423eSHarlanC         );
16092df423eSHarlanC 
16113bac29aSHarlan         if let Some(packetizer) = &mut self.packetizer {
16213bac29aSHarlan             packetizer.write_chunk(&mut chunk_info).await?;
16313bac29aSHarlan         }
16413bac29aSHarlan 
16592df423eSHarlanC         Ok(())
16692df423eSHarlanC     }
16792df423eSHarlanC 
on_video_data( &mut self, data: &mut BytesMut, timestamp: &u32, ) -> Result<(), SessionError>1688e71d710SHarlan     pub async fn on_video_data(
16992df423eSHarlanC         &mut self,
17092df423eSHarlanC         data: &mut BytesMut,
17192df423eSHarlanC         timestamp: &u32,
17292df423eSHarlanC     ) -> Result<(), SessionError> {
1738e71d710SHarlan         let channel_data = FrameData::Video {
17485c0af6aSLuca Barbato             timestamp: *timestamp,
17592df423eSHarlanC             data: data.clone(),
17692df423eSHarlanC         };
17792df423eSHarlanC 
1788e71d710SHarlan         match self.data_sender.send(channel_data) {
17992df423eSHarlanC             Ok(_) => {}
18092df423eSHarlanC             Err(err) => {
1815de1eabbSHarlanC                 log::error!("send video err: {}", err);
18292df423eSHarlanC                 return Err(SessionError {
1838e71d710SHarlan                     value: SessionErrorValue::SendFrameDataErr,
18492df423eSHarlanC                 });
18592df423eSHarlanC             }
18692df423eSHarlanC         }
18792df423eSHarlanC 
1888e71d710SHarlan         self.stream_handler
1898e71d710SHarlan             .save_video_data(data, *timestamp)
1908e71d710SHarlan             .await?;
1918e71d710SHarlan 
19292df423eSHarlanC         Ok(())
19392df423eSHarlanC     }
19492df423eSHarlanC 
on_audio_data( &mut self, data: &mut BytesMut, timestamp: &u32, ) -> Result<(), SessionError>1958e71d710SHarlan     pub async fn on_audio_data(
19692df423eSHarlanC         &mut self,
19792df423eSHarlanC         data: &mut BytesMut,
19892df423eSHarlanC         timestamp: &u32,
19992df423eSHarlanC     ) -> Result<(), SessionError> {
2008e71d710SHarlan         let channel_data = FrameData::Audio {
20185c0af6aSLuca Barbato             timestamp: *timestamp,
20292df423eSHarlanC             data: data.clone(),
20392df423eSHarlanC         };
20492df423eSHarlanC 
2058e71d710SHarlan         match self.data_sender.send(channel_data) {
20692df423eSHarlanC             Ok(_) => {}
20792df423eSHarlanC             Err(err) => {
20869de9bbdSHarlanC                 log::error!("receive audio err {}", err);
20992df423eSHarlanC                 return Err(SessionError {
2108e71d710SHarlan                     value: SessionErrorValue::SendFrameDataErr,
21192df423eSHarlanC                 });
21292df423eSHarlanC             }
21392df423eSHarlanC         }
21492df423eSHarlanC 
2158e71d710SHarlan         self.stream_handler
2168e71d710SHarlan             .save_audio_data(data, *timestamp)
2178e71d710SHarlan             .await?;
2188e71d710SHarlan 
21992df423eSHarlanC         Ok(())
22092df423eSHarlanC     }
22192df423eSHarlanC 
on_meta_data( &mut self, data: &mut BytesMut, timestamp: &u32, ) -> Result<(), SessionError>2228e71d710SHarlan     pub async fn on_meta_data(
22387f493cdSHarlanC         &mut self,
2248e71d710SHarlan         data: &mut BytesMut,
22587f493cdSHarlanC         timestamp: &u32,
22687f493cdSHarlanC     ) -> Result<(), SessionError> {
2278e71d710SHarlan         let channel_data = FrameData::MetaData {
22885c0af6aSLuca Barbato             timestamp: *timestamp,
2298e71d710SHarlan             data: data.clone(),
23087f493cdSHarlanC         };
23192df423eSHarlanC 
2328e71d710SHarlan         match self.data_sender.send(channel_data) {
23392df423eSHarlanC             Ok(_) => {}
23492df423eSHarlanC             Err(_) => {
23592df423eSHarlanC                 return Err(SessionError {
2368e71d710SHarlan                     value: SessionErrorValue::SendFrameDataErr,
23792df423eSHarlanC                 })
23892df423eSHarlanC             }
23992df423eSHarlanC         }
24092df423eSHarlanC 
2418e71d710SHarlan         self.stream_handler.save_metadata(data, *timestamp).await;
2428e71d710SHarlan 
24392df423eSHarlanC         Ok(())
24492df423eSHarlanC     }
24592df423eSHarlanC 
get_subscriber_info(&mut self, sub_id: Uuid) -> SubscriberInfo246ccd9a1faSHarlan     fn get_subscriber_info(&mut self, sub_id: Uuid) -> SubscriberInfo {
247976f65a6SHarlan         let remote_addr = if let Some(addr) = self.remote_addr {
248976f65a6SHarlan             addr.to_string()
249976f65a6SHarlan         } else {
250976f65a6SHarlan             String::from("unknown")
251976f65a6SHarlan         };
252976f65a6SHarlan 
2538e71d710SHarlan         let sub_type = match self.session_type {
2548e71d710SHarlan             SessionType::Client => SubscribeType::PublisherRtmp,
2558e71d710SHarlan             SessionType::Server => SubscribeType::PlayerRtmp,
2568e71d710SHarlan         };
2578e71d710SHarlan 
2588e71d710SHarlan         SubscriberInfo {
259ccd9a1faSHarlan             id: sub_id,
260976f65a6SHarlan             /*rtmp local client subscribe from local rtmp session
261976f65a6SHarlan             and publish(relay) the rtmp steam to remote RTMP server*/
2628e71d710SHarlan             sub_type,
263*a4ef5d6cSHarlanC             sub_data_type: streamhub::define::SubDataType::Frame,
264976f65a6SHarlan             notify_info: NotifyInfo {
265976f65a6SHarlan                 request_url: self.request_url.clone(),
266976f65a6SHarlan                 remote_addr,
267976f65a6SHarlan             },
26816394c08SHarlanC         }
26916394c08SHarlanC     }
27016394c08SHarlanC 
get_publisher_info(&mut self, sub_id: Uuid) -> PublisherInfo271976f65a6SHarlan     fn get_publisher_info(&mut self, sub_id: Uuid) -> PublisherInfo {
272976f65a6SHarlan         let remote_addr = if let Some(addr) = self.remote_addr {
273976f65a6SHarlan             addr.to_string()
274976f65a6SHarlan         } else {
275976f65a6SHarlan             String::from("unknown")
276976f65a6SHarlan         };
277976f65a6SHarlan 
2788e71d710SHarlan         let pub_type = match self.session_type {
2798e71d710SHarlan             SessionType::Client => PublishType::RelayRtmp,
2808e71d710SHarlan             SessionType::Server => PublishType::PushRtmp,
2818e71d710SHarlan         };
2828e71d710SHarlan 
2838e71d710SHarlan         PublisherInfo {
284976f65a6SHarlan             id: sub_id,
2858e71d710SHarlan             pub_type,
286*a4ef5d6cSHarlanC             pub_data_type: streamhub::define::PubDataType::Frame,
287976f65a6SHarlan             notify_info: NotifyInfo {
288976f65a6SHarlan                 request_url: self.request_url.clone(),
289976f65a6SHarlan                 remote_addr,
290976f65a6SHarlan             },
291976f65a6SHarlan         }
292976f65a6SHarlan     }
293976f65a6SHarlan 
294248cdac6SHarlan     /*Subscribe from local channels and then send data to retmote common player or local RTMP relay push client*/
subscribe_from_channels( &mut self, app_name: String, stream_name: String, sub_id: Uuid, ) -> Result<(), SessionError>29592df423eSHarlanC     pub async fn subscribe_from_channels(
29692df423eSHarlanC         &mut self,
29792df423eSHarlanC         app_name: String,
29892df423eSHarlanC         stream_name: String,
29988d91efdSHarlanC         sub_id: Uuid,
30092df423eSHarlanC     ) -> Result<(), SessionError> {
3015de1eabbSHarlanC         log::info!(
30288d91efdSHarlanC             "subscribe_from_channels, app_name: {} stream_name: {} subscribe_id: {}",
30392df423eSHarlanC             app_name,
304b36cf5daSHarlan             stream_name,
30588d91efdSHarlanC             sub_id
30692df423eSHarlanC         );
30792df423eSHarlanC 
3088e71d710SHarlan         let identifier = StreamIdentifier::Rtmp {
309b36cf5daSHarlan             app_name,
310b36cf5daSHarlan             stream_name,
3118e71d710SHarlan         };
3128e71d710SHarlan 
313*a4ef5d6cSHarlanC         let (event_result_sender, event_result_receiver) = oneshot::channel();
314*a4ef5d6cSHarlanC 
3158e71d710SHarlan         let subscribe_event = StreamHubEvent::Subscribe {
3168e71d710SHarlan             identifier,
317ccd9a1faSHarlan             info: self.get_subscriber_info(sub_id),
318*a4ef5d6cSHarlanC             result_sender: event_result_sender,
31992df423eSHarlanC         };
32092df423eSHarlanC         let rv = self.event_producer.send(subscribe_event);
3210ca99c20SHarlan 
3220ca99c20SHarlan         if rv.is_err() {
32392df423eSHarlanC             return Err(SessionError {
3248e71d710SHarlan                 value: SessionErrorValue::StreamHubEventSendErr,
3250ca99c20SHarlan             });
32692df423eSHarlanC         }
32792df423eSHarlanC 
328*a4ef5d6cSHarlanC         let recv = event_result_receiver.await??;
329*a4ef5d6cSHarlanC         self.data_receiver = recv.frame_receiver.unwrap();
330b1840569SHarlanC 
33192df423eSHarlanC         Ok(())
33292df423eSHarlanC     }
33392df423eSHarlanC 
unsubscribe_from_channels( &mut self, app_name: String, stream_name: String, sub_id: Uuid, ) -> Result<(), SessionError>33492df423eSHarlanC     pub async fn unsubscribe_from_channels(
33592df423eSHarlanC         &mut self,
33692df423eSHarlanC         app_name: String,
33792df423eSHarlanC         stream_name: String,
33888d91efdSHarlanC         sub_id: Uuid,
33992df423eSHarlanC     ) -> Result<(), SessionError> {
3408e71d710SHarlan         let identifier = StreamIdentifier::Rtmp {
34192df423eSHarlanC             app_name,
34292df423eSHarlanC             stream_name,
3438e71d710SHarlan         };
3448e71d710SHarlan 
3458e71d710SHarlan         let subscribe_event = StreamHubEvent::UnSubscribe {
3468e71d710SHarlan             identifier,
347ccd9a1faSHarlan             info: self.get_subscriber_info(sub_id),
34892df423eSHarlanC         };
34930c61c6eSHarlanC         if let Err(err) = self.event_producer.send(subscribe_event) {
35069de9bbdSHarlanC             log::error!("unsubscribe_from_channels err {}", err);
35130c61c6eSHarlanC         }
35292df423eSHarlanC 
35392df423eSHarlanC         Ok(())
35492df423eSHarlanC     }
35592df423eSHarlanC 
356976f65a6SHarlan     /*Begin to receive stream data from remote RTMP push client or local RTMP relay pull client*/
publish_to_channels( &mut self, app_name: String, stream_name: String, pub_id: Uuid, gop_num: usize, ) -> Result<(), SessionError>35792df423eSHarlanC     pub async fn publish_to_channels(
35892df423eSHarlanC         &mut self,
35992df423eSHarlanC         app_name: String,
36092df423eSHarlanC         stream_name: String,
361976f65a6SHarlan         pub_id: Uuid,
3628e71d710SHarlan         gop_num: usize,
36392df423eSHarlanC     ) -> Result<(), SessionError> {
3648e71d710SHarlan         self.stream_handler
3658e71d710SHarlan             .set_cache(Cache::new(app_name.clone(), stream_name.clone(), gop_num))
3668e71d710SHarlan             .await;
3678e71d710SHarlan 
368*a4ef5d6cSHarlanC         let (event_result_sender, event_result_receiver) = oneshot::channel();
3698e71d710SHarlan         let publish_event = StreamHubEvent::Publish {
3708e71d710SHarlan             identifier: StreamIdentifier::Rtmp {
37192df423eSHarlanC                 app_name,
37292df423eSHarlanC                 stream_name,
3738e71d710SHarlan             },
374976f65a6SHarlan             info: self.get_publisher_info(pub_id),
3758e71d710SHarlan             stream_handler: self.stream_handler.clone(),
376*a4ef5d6cSHarlanC             result_sender: event_result_sender,
37792df423eSHarlanC         };
37892df423eSHarlanC 
37913bac29aSHarlan         if self.event_producer.send(publish_event).is_err() {
38092df423eSHarlanC             return Err(SessionError {
3818e71d710SHarlan                 value: SessionErrorValue::StreamHubEventSendErr,
3820ca99c20SHarlan             });
38392df423eSHarlanC         }
38492df423eSHarlanC 
385*a4ef5d6cSHarlanC         let result = event_result_receiver.await??;
386*a4ef5d6cSHarlanC         self.data_sender = result.0.unwrap();
38792df423eSHarlanC         Ok(())
38892df423eSHarlanC     }
38992df423eSHarlanC 
unpublish_to_channels( &mut self, app_name: String, stream_name: String, pub_id: Uuid, ) -> Result<(), SessionError>39092df423eSHarlanC     pub async fn unpublish_to_channels(
39192df423eSHarlanC         &mut self,
39292df423eSHarlanC         app_name: String,
39392df423eSHarlanC         stream_name: String,
394976f65a6SHarlan         pub_id: Uuid,
39592df423eSHarlanC     ) -> Result<(), SessionError> {
396ccd9a1faSHarlan         log::info!(
397ccd9a1faSHarlan             "unpublish_to_channels, app_name:{}, stream_name:{}",
398ccd9a1faSHarlan             app_name,
399ccd9a1faSHarlan             stream_name
400ccd9a1faSHarlan         );
4018e71d710SHarlan         let unpublish_event = StreamHubEvent::UnPublish {
4028e71d710SHarlan             identifier: StreamIdentifier::Rtmp {
40388325f54SHarlanC                 app_name: app_name.clone(),
40488325f54SHarlanC                 stream_name: stream_name.clone(),
4058e71d710SHarlan             },
406976f65a6SHarlan             info: self.get_publisher_info(pub_id),
40792df423eSHarlanC         };
40892df423eSHarlanC 
40913bac29aSHarlan         match self.event_producer.send(unpublish_event) {
41092df423eSHarlanC             Err(_) => {
41188325f54SHarlanC                 log::error!(
41288325f54SHarlanC                     "unpublish_to_channels error.app_name: {}, stream_name: {}",
41388325f54SHarlanC                     app_name,
41488325f54SHarlanC                     stream_name
41588325f54SHarlanC                 );
41692df423eSHarlanC                 return Err(SessionError {
4178e71d710SHarlan                     value: SessionErrorValue::StreamHubEventSendErr,
41892df423eSHarlanC                 });
41992df423eSHarlanC             }
42092df423eSHarlanC             _ => {
42188325f54SHarlanC                 log::info!(
42288325f54SHarlanC                     "unpublish_to_channels successfully.app_name: {}, stream_name: {}",
42388325f54SHarlanC                     app_name,
42488325f54SHarlanC                     stream_name
42588325f54SHarlanC                 );
42692df423eSHarlanC             }
42792df423eSHarlanC         }
42892df423eSHarlanC         Ok(())
42992df423eSHarlanC     }
43092df423eSHarlanC }
4318e71d710SHarlan 
432b36cf5daSHarlan #[derive(Default)]
4338e71d710SHarlan pub struct RtmpStreamHandler {
4348e71d710SHarlan     /*cache is used to save RTMP sequence/gops/meta data
4358e71d710SHarlan     which needs to be send to client(player) */
4368e71d710SHarlan     /*The cache will be used in different threads(save
4378e71d710SHarlan     cache in one thread and send cache data to different clients
4388e71d710SHarlan     in other threads) */
4398e71d710SHarlan     pub cache: Mutex<Option<Cache>>,
4408e71d710SHarlan }
4418e71d710SHarlan 
4428e71d710SHarlan impl RtmpStreamHandler {
new() -> Self4438e71d710SHarlan     pub fn new() -> Self {
4448e71d710SHarlan         Self {
4458e71d710SHarlan             cache: Mutex::new(None),
4468e71d710SHarlan         }
4478e71d710SHarlan     }
4488e71d710SHarlan 
set_cache(&self, cache: Cache)4498e71d710SHarlan     pub async fn set_cache(&self, cache: Cache) {
4508e71d710SHarlan         *self.cache.lock().await = Some(cache);
4518e71d710SHarlan     }
4528e71d710SHarlan 
save_video_data( &self, chunk_body: &BytesMut, timestamp: u32, ) -> Result<(), CacheError>4538e71d710SHarlan     pub async fn save_video_data(
4548e71d710SHarlan         &self,
4558e71d710SHarlan         chunk_body: &BytesMut,
4568e71d710SHarlan         timestamp: u32,
4578e71d710SHarlan     ) -> Result<(), CacheError> {
4588e71d710SHarlan         if let Some(cache) = &mut *self.cache.lock().await {
4598e71d710SHarlan             cache.save_video_data(chunk_body, timestamp).await?;
4608e71d710SHarlan         }
4618e71d710SHarlan         Ok(())
4628e71d710SHarlan     }
4638e71d710SHarlan 
save_audio_data( &self, chunk_body: &BytesMut, timestamp: u32, ) -> Result<(), CacheError>4648e71d710SHarlan     pub async fn save_audio_data(
4658e71d710SHarlan         &self,
4668e71d710SHarlan         chunk_body: &BytesMut,
4678e71d710SHarlan         timestamp: u32,
4688e71d710SHarlan     ) -> Result<(), CacheError> {
4698e71d710SHarlan         if let Some(cache) = &mut *self.cache.lock().await {
4708e71d710SHarlan             cache.save_audio_data(chunk_body, timestamp).await?;
4718e71d710SHarlan         }
4728e71d710SHarlan         Ok(())
4738e71d710SHarlan     }
4748e71d710SHarlan 
save_metadata(&self, chunk_body: &BytesMut, timestamp: u32)4758e71d710SHarlan     pub async fn save_metadata(&self, chunk_body: &BytesMut, timestamp: u32) {
4768e71d710SHarlan         if let Some(cache) = &mut *self.cache.lock().await {
4778e71d710SHarlan             cache.save_metadata(chunk_body, timestamp);
4788e71d710SHarlan         }
4798e71d710SHarlan     }
4808e71d710SHarlan }
4818e71d710SHarlan 
4828e71d710SHarlan #[async_trait]
4838e71d710SHarlan impl TStreamHandler for RtmpStreamHandler {
send_prior_data( &self, data_sender: DataSender, sub_type: SubscribeType, ) -> Result<(), ChannelError>484b36cf5daSHarlan     async fn send_prior_data(
4858e71d710SHarlan         &self,
48680f20d70SHarlanC         data_sender: DataSender,
4878e71d710SHarlan         sub_type: SubscribeType,
4888e71d710SHarlan     ) -> Result<(), ChannelError> {
48980f20d70SHarlanC         let sender = match data_sender {
49080f20d70SHarlanC             DataSender::Frame { sender } => sender,
49180f20d70SHarlanC             DataSender::Packet { sender: _ } => {
49280f20d70SHarlanC                 return Err(ChannelError {
49380f20d70SHarlanC                     value: ChannelErrorValue::NotCorrectDataSenderType,
49480f20d70SHarlanC                 });
49580f20d70SHarlanC             }
49680f20d70SHarlanC         };
4978e71d710SHarlan         if let Some(cache) = &mut *self.cache.lock().await {
4988e71d710SHarlan             if let Some(meta_body_data) = cache.get_metadata() {
4998e71d710SHarlan                 sender.send(meta_body_data).map_err(|_| ChannelError {
5008e71d710SHarlan                     value: ChannelErrorValue::SendError,
5018e71d710SHarlan                 })?;
5028e71d710SHarlan             }
5038e71d710SHarlan             if let Some(audio_seq_data) = cache.get_audio_seq() {
5048e71d710SHarlan                 sender.send(audio_seq_data).map_err(|_| ChannelError {
5058e71d710SHarlan                     value: ChannelErrorValue::SendError,
5068e71d710SHarlan                 })?;
5078e71d710SHarlan             }
5088e71d710SHarlan             if let Some(video_seq_data) = cache.get_video_seq() {
5098e71d710SHarlan                 sender.send(video_seq_data).map_err(|_| ChannelError {
5108e71d710SHarlan                     value: ChannelErrorValue::SendError,
5118e71d710SHarlan                 })?;
5128e71d710SHarlan             }
5138e71d710SHarlan             match sub_type {
5148e71d710SHarlan                 SubscribeType::PlayerRtmp
5158e71d710SHarlan                 | SubscribeType::PlayerHttpFlv
5168e71d710SHarlan                 | SubscribeType::PlayerHls
5178e71d710SHarlan                 | SubscribeType::GenerateHls => {
5188e71d710SHarlan                     if let Some(gops_data) = cache.get_gops_data() {
5198e71d710SHarlan                         for gop in gops_data {
5208e71d710SHarlan                             for channel_data in gop.get_frame_data() {
5218e71d710SHarlan                                 sender.send(channel_data).map_err(|_| ChannelError {
5228e71d710SHarlan                                     value: ChannelErrorValue::SendError,
5238e71d710SHarlan                                 })?;
5248e71d710SHarlan                             }
5258e71d710SHarlan                         }
5268e71d710SHarlan                     }
5278e71d710SHarlan                 }
5288e71d710SHarlan                 _ => {}
5298e71d710SHarlan             }
5308e71d710SHarlan         }
5318e71d710SHarlan 
5328e71d710SHarlan         Ok(())
5338e71d710SHarlan     }
get_statistic_data(&self) -> Option<StreamStatistics>5348e71d710SHarlan     async fn get_statistic_data(&self) -> Option<StreamStatistics> {
5358e71d710SHarlan         if let Some(cache) = &mut *self.cache.lock().await {
5368e71d710SHarlan             return Some(cache.av_statistics.get_avstatistic_data().await);
5378e71d710SHarlan         }
5388e71d710SHarlan 
5398e71d710SHarlan         None
5408e71d710SHarlan     }
5418e71d710SHarlan 
send_information(&self, _: InformationSender)542b36cf5daSHarlan     async fn send_information(&self, _: InformationSender) {}
5438e71d710SHarlan }
5448e71d710SHarlan 
5458e71d710SHarlan impl fmt::Debug for Common {
fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>5468e71d710SHarlan     fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
5478e71d710SHarlan         write!(fmt, "S2 {{ member: {:?} }}", self.request_url)
5488e71d710SHarlan     }
5498e71d710SHarlan }
550