1*a4ef5d6cSHarlanC use streamhub::define::DataSender; 2*a4ef5d6cSHarlanC use tokio::sync::oneshot; 380f20d70SHarlanC 492df423eSHarlanC use { 592df423eSHarlanC super::{ 68e71d710SHarlan define::SessionType, 792df423eSHarlanC errors::{SessionError, SessionErrorValue}, 892df423eSHarlanC }, 992df423eSHarlanC crate::{ 108e71d710SHarlan cache::errors::CacheError, 118e71d710SHarlan cache::Cache, 1292df423eSHarlanC chunk::{ 13b1840569SHarlanC define::{chunk_type, csid_type}, 1492df423eSHarlanC packetizer::ChunkPacketizer, 1592df423eSHarlanC ChunkInfo, 1692df423eSHarlanC }, 17b1840569SHarlanC messages::define::msg_type_id, 1892df423eSHarlanC }, 198e71d710SHarlan async_trait::async_trait, 2092df423eSHarlanC bytes::BytesMut, 218e71d710SHarlan std::fmt, 228e71d710SHarlan std::{net::SocketAddr, sync::Arc}, 238e71d710SHarlan streamhub::{ 248e71d710SHarlan define::{ 258e71d710SHarlan FrameData, FrameDataReceiver, FrameDataSender, InformationSender, NotifyInfo, 268e71d710SHarlan PublishType, PublisherInfo, StreamHubEvent, StreamHubEventSender, SubscribeType, 278e71d710SHarlan SubscriberInfo, TStreamHandler, 2892df423eSHarlanC }, 298e71d710SHarlan errors::{ChannelError, ChannelErrorValue}, 308e71d710SHarlan statistics::StreamStatistics, 318e71d710SHarlan stream::StreamIdentifier, 328e71d710SHarlan utils::Uuid, 338e71d710SHarlan }, 348e71d710SHarlan tokio::sync::{mpsc, Mutex}, 3592df423eSHarlanC }; 36976f65a6SHarlan 3792df423eSHarlanC pub struct Common { 3813bac29aSHarlan //only Server Subscriber or Client Publisher needs to send out trunck data. 3913bac29aSHarlan packetizer: Option<ChunkPacketizer>, 4092df423eSHarlanC 418e71d710SHarlan data_receiver: FrameDataReceiver, 428e71d710SHarlan data_sender: FrameDataSender, 4392df423eSHarlanC 448e71d710SHarlan event_producer: StreamHubEventSender, 45740804e8SHarlanC pub session_type: SessionType, 46976f65a6SHarlan 47976f65a6SHarlan /*save the client side socket connected to the SeverSession */ 48976f65a6SHarlan remote_addr: Option<SocketAddr>, 49976f65a6SHarlan /*request URL from client*/ 50976f65a6SHarlan pub request_url: String, 518e71d710SHarlan pub stream_handler: Arc<RtmpStreamHandler>, 5292df423eSHarlanC } 5392df423eSHarlanC 5492df423eSHarlanC impl Common { new( packetizer: Option<ChunkPacketizer>, event_producer: StreamHubEventSender, session_type: SessionType, remote_addr: Option<SocketAddr>, ) -> Self5516394c08SHarlanC pub fn new( 5613bac29aSHarlan packetizer: Option<ChunkPacketizer>, 578e71d710SHarlan event_producer: StreamHubEventSender, 5816394c08SHarlanC session_type: SessionType, 59976f65a6SHarlan remote_addr: Option<SocketAddr>, 6016394c08SHarlanC ) -> Self { 6192df423eSHarlanC //only used for init,since I don't found a better way to deal with this. 6292df423eSHarlanC let (init_producer, init_consumer) = mpsc::unbounded_channel(); 6392df423eSHarlanC 6492df423eSHarlanC Self { 6513bac29aSHarlan packetizer, 6692df423eSHarlanC 678e71d710SHarlan data_sender: init_producer, 688e71d710SHarlan data_receiver: init_consumer, 6992df423eSHarlanC 7092df423eSHarlanC event_producer, 7116394c08SHarlanC session_type, 72976f65a6SHarlan remote_addr, 73976f65a6SHarlan request_url: String::default(), 748e71d710SHarlan stream_handler: Arc::new(RtmpStreamHandler::new()), 758e71d710SHarlan //cache: None, 7692df423eSHarlanC } 7792df423eSHarlanC } send_channel_data(&mut self) -> Result<(), SessionError>7892df423eSHarlanC pub async fn send_channel_data(&mut self) -> Result<(), SessionError> { 795359bb99Swawacry let mut retry_times = 0; 8092df423eSHarlanC loop { 818e71d710SHarlan if let Some(data) = self.data_receiver.recv().await { 8292df423eSHarlanC match data { 838e71d710SHarlan FrameData::Audio { timestamp, data } => { 8492df423eSHarlanC self.send_audio(data, timestamp).await?; 8592df423eSHarlanC } 868e71d710SHarlan FrameData::Video { timestamp, data } => { 8792df423eSHarlanC self.send_video(data, timestamp).await?; 8892df423eSHarlanC } 898e71d710SHarlan FrameData::MetaData { timestamp, data } => { 9087f493cdSHarlanC self.send_metadata(data, timestamp).await?; 9192df423eSHarlanC } 92b36cf5daSHarlan _ => {} 9392df423eSHarlanC } 945359bb99Swawacry } else { 955359bb99Swawacry retry_times += 1; 965359bb99Swawacry log::debug!( 975359bb99Swawacry "send_channel_data: no data receives ,retry {} times!", 985359bb99Swawacry retry_times 995359bb99Swawacry ); 1005359bb99Swawacry 1015359bb99Swawacry if retry_times > 10 { 1025359bb99Swawacry return Err(SessionError { 1035359bb99Swawacry value: SessionErrorValue::NoMediaDataReceived, 1045359bb99Swawacry }); 1055359bb99Swawacry } 10692df423eSHarlanC } 10792df423eSHarlanC } 10892df423eSHarlanC } 10992df423eSHarlanC send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError>11092df423eSHarlanC pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 11192df423eSHarlanC let mut chunk_info = ChunkInfo::new( 11292df423eSHarlanC csid_type::AUDIO, 11392df423eSHarlanC chunk_type::TYPE_0, 11492df423eSHarlanC timestamp, 11592df423eSHarlanC data.len() as u32, 11692df423eSHarlanC msg_type_id::AUDIO, 11792df423eSHarlanC 0, 11892df423eSHarlanC data, 11992df423eSHarlanC ); 12092df423eSHarlanC 12113bac29aSHarlan if let Some(packetizer) = &mut self.packetizer { 12213bac29aSHarlan packetizer.write_chunk(&mut chunk_info).await?; 12313bac29aSHarlan } 12492df423eSHarlanC 12592df423eSHarlanC Ok(()) 12692df423eSHarlanC } 12792df423eSHarlanC send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError>12892df423eSHarlanC pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 12992df423eSHarlanC let mut chunk_info = ChunkInfo::new( 13092df423eSHarlanC csid_type::VIDEO, 13192df423eSHarlanC chunk_type::TYPE_0, 13292df423eSHarlanC timestamp, 13392df423eSHarlanC data.len() as u32, 13492df423eSHarlanC msg_type_id::VIDEO, 13592df423eSHarlanC 0, 13692df423eSHarlanC data, 13792df423eSHarlanC ); 13892df423eSHarlanC 13913bac29aSHarlan if let Some(packetizer) = &mut self.packetizer { 14013bac29aSHarlan packetizer.write_chunk(&mut chunk_info).await?; 14113bac29aSHarlan } 14292df423eSHarlanC 14392df423eSHarlanC Ok(()) 14492df423eSHarlanC } 14592df423eSHarlanC send_metadata( &mut self, data: BytesMut, timestamp: u32, ) -> Result<(), SessionError>14687f493cdSHarlanC pub async fn send_metadata( 14787f493cdSHarlanC &mut self, 14887f493cdSHarlanC data: BytesMut, 14987f493cdSHarlanC timestamp: u32, 15087f493cdSHarlanC ) -> Result<(), SessionError> { 15192df423eSHarlanC let mut chunk_info = ChunkInfo::new( 15292df423eSHarlanC csid_type::DATA_AMF0_AMF3, 15392df423eSHarlanC chunk_type::TYPE_0, 15487f493cdSHarlanC timestamp, 15592df423eSHarlanC data.len() as u32, 15692df423eSHarlanC msg_type_id::DATA_AMF0, 15792df423eSHarlanC 0, 15892df423eSHarlanC data, 15992df423eSHarlanC ); 16092df423eSHarlanC 16113bac29aSHarlan if let Some(packetizer) = &mut self.packetizer { 16213bac29aSHarlan packetizer.write_chunk(&mut chunk_info).await?; 16313bac29aSHarlan } 16413bac29aSHarlan 16592df423eSHarlanC Ok(()) 16692df423eSHarlanC } 16792df423eSHarlanC on_video_data( &mut self, data: &mut BytesMut, timestamp: &u32, ) -> Result<(), SessionError>1688e71d710SHarlan pub async fn on_video_data( 16992df423eSHarlanC &mut self, 17092df423eSHarlanC data: &mut BytesMut, 17192df423eSHarlanC timestamp: &u32, 17292df423eSHarlanC ) -> Result<(), SessionError> { 1738e71d710SHarlan let channel_data = FrameData::Video { 17485c0af6aSLuca Barbato timestamp: *timestamp, 17592df423eSHarlanC data: data.clone(), 17692df423eSHarlanC }; 17792df423eSHarlanC 1788e71d710SHarlan match self.data_sender.send(channel_data) { 17992df423eSHarlanC Ok(_) => {} 18092df423eSHarlanC Err(err) => { 1815de1eabbSHarlanC log::error!("send video err: {}", err); 18292df423eSHarlanC return Err(SessionError { 1838e71d710SHarlan value: SessionErrorValue::SendFrameDataErr, 18492df423eSHarlanC }); 18592df423eSHarlanC } 18692df423eSHarlanC } 18792df423eSHarlanC 1888e71d710SHarlan self.stream_handler 1898e71d710SHarlan .save_video_data(data, *timestamp) 1908e71d710SHarlan .await?; 1918e71d710SHarlan 19292df423eSHarlanC Ok(()) 19392df423eSHarlanC } 19492df423eSHarlanC on_audio_data( &mut self, data: &mut BytesMut, timestamp: &u32, ) -> Result<(), SessionError>1958e71d710SHarlan pub async fn on_audio_data( 19692df423eSHarlanC &mut self, 19792df423eSHarlanC data: &mut BytesMut, 19892df423eSHarlanC timestamp: &u32, 19992df423eSHarlanC ) -> Result<(), SessionError> { 2008e71d710SHarlan let channel_data = FrameData::Audio { 20185c0af6aSLuca Barbato timestamp: *timestamp, 20292df423eSHarlanC data: data.clone(), 20392df423eSHarlanC }; 20492df423eSHarlanC 2058e71d710SHarlan match self.data_sender.send(channel_data) { 20692df423eSHarlanC Ok(_) => {} 20792df423eSHarlanC Err(err) => { 20869de9bbdSHarlanC log::error!("receive audio err {}", err); 20992df423eSHarlanC return Err(SessionError { 2108e71d710SHarlan value: SessionErrorValue::SendFrameDataErr, 21192df423eSHarlanC }); 21292df423eSHarlanC } 21392df423eSHarlanC } 21492df423eSHarlanC 2158e71d710SHarlan self.stream_handler 2168e71d710SHarlan .save_audio_data(data, *timestamp) 2178e71d710SHarlan .await?; 2188e71d710SHarlan 21992df423eSHarlanC Ok(()) 22092df423eSHarlanC } 22192df423eSHarlanC on_meta_data( &mut self, data: &mut BytesMut, timestamp: &u32, ) -> Result<(), SessionError>2228e71d710SHarlan pub async fn on_meta_data( 22387f493cdSHarlanC &mut self, 2248e71d710SHarlan data: &mut BytesMut, 22587f493cdSHarlanC timestamp: &u32, 22687f493cdSHarlanC ) -> Result<(), SessionError> { 2278e71d710SHarlan let channel_data = FrameData::MetaData { 22885c0af6aSLuca Barbato timestamp: *timestamp, 2298e71d710SHarlan data: data.clone(), 23087f493cdSHarlanC }; 23192df423eSHarlanC 2328e71d710SHarlan match self.data_sender.send(channel_data) { 23392df423eSHarlanC Ok(_) => {} 23492df423eSHarlanC Err(_) => { 23592df423eSHarlanC return Err(SessionError { 2368e71d710SHarlan value: SessionErrorValue::SendFrameDataErr, 23792df423eSHarlanC }) 23892df423eSHarlanC } 23992df423eSHarlanC } 24092df423eSHarlanC 2418e71d710SHarlan self.stream_handler.save_metadata(data, *timestamp).await; 2428e71d710SHarlan 24392df423eSHarlanC Ok(()) 24492df423eSHarlanC } 24592df423eSHarlanC get_subscriber_info(&mut self, sub_id: Uuid) -> SubscriberInfo246ccd9a1faSHarlan fn get_subscriber_info(&mut self, sub_id: Uuid) -> SubscriberInfo { 247976f65a6SHarlan let remote_addr = if let Some(addr) = self.remote_addr { 248976f65a6SHarlan addr.to_string() 249976f65a6SHarlan } else { 250976f65a6SHarlan String::from("unknown") 251976f65a6SHarlan }; 252976f65a6SHarlan 2538e71d710SHarlan let sub_type = match self.session_type { 2548e71d710SHarlan SessionType::Client => SubscribeType::PublisherRtmp, 2558e71d710SHarlan SessionType::Server => SubscribeType::PlayerRtmp, 2568e71d710SHarlan }; 2578e71d710SHarlan 2588e71d710SHarlan SubscriberInfo { 259ccd9a1faSHarlan id: sub_id, 260976f65a6SHarlan /*rtmp local client subscribe from local rtmp session 261976f65a6SHarlan and publish(relay) the rtmp steam to remote RTMP server*/ 2628e71d710SHarlan sub_type, 263*a4ef5d6cSHarlanC sub_data_type: streamhub::define::SubDataType::Frame, 264976f65a6SHarlan notify_info: NotifyInfo { 265976f65a6SHarlan request_url: self.request_url.clone(), 266976f65a6SHarlan remote_addr, 267976f65a6SHarlan }, 26816394c08SHarlanC } 26916394c08SHarlanC } 27016394c08SHarlanC get_publisher_info(&mut self, sub_id: Uuid) -> PublisherInfo271976f65a6SHarlan fn get_publisher_info(&mut self, sub_id: Uuid) -> PublisherInfo { 272976f65a6SHarlan let remote_addr = if let Some(addr) = self.remote_addr { 273976f65a6SHarlan addr.to_string() 274976f65a6SHarlan } else { 275976f65a6SHarlan String::from("unknown") 276976f65a6SHarlan }; 277976f65a6SHarlan 2788e71d710SHarlan let pub_type = match self.session_type { 2798e71d710SHarlan SessionType::Client => PublishType::RelayRtmp, 2808e71d710SHarlan SessionType::Server => PublishType::PushRtmp, 2818e71d710SHarlan }; 2828e71d710SHarlan 2838e71d710SHarlan PublisherInfo { 284976f65a6SHarlan id: sub_id, 2858e71d710SHarlan pub_type, 286*a4ef5d6cSHarlanC pub_data_type: streamhub::define::PubDataType::Frame, 287976f65a6SHarlan notify_info: NotifyInfo { 288976f65a6SHarlan request_url: self.request_url.clone(), 289976f65a6SHarlan remote_addr, 290976f65a6SHarlan }, 291976f65a6SHarlan } 292976f65a6SHarlan } 293976f65a6SHarlan 294248cdac6SHarlan /*Subscribe from local channels and then send data to retmote common player or local RTMP relay push client*/ subscribe_from_channels( &mut self, app_name: String, stream_name: String, sub_id: Uuid, ) -> Result<(), SessionError>29592df423eSHarlanC pub async fn subscribe_from_channels( 29692df423eSHarlanC &mut self, 29792df423eSHarlanC app_name: String, 29892df423eSHarlanC stream_name: String, 29988d91efdSHarlanC sub_id: Uuid, 30092df423eSHarlanC ) -> Result<(), SessionError> { 3015de1eabbSHarlanC log::info!( 30288d91efdSHarlanC "subscribe_from_channels, app_name: {} stream_name: {} subscribe_id: {}", 30392df423eSHarlanC app_name, 304b36cf5daSHarlan stream_name, 30588d91efdSHarlanC sub_id 30692df423eSHarlanC ); 30792df423eSHarlanC 3088e71d710SHarlan let identifier = StreamIdentifier::Rtmp { 309b36cf5daSHarlan app_name, 310b36cf5daSHarlan stream_name, 3118e71d710SHarlan }; 3128e71d710SHarlan 313*a4ef5d6cSHarlanC let (event_result_sender, event_result_receiver) = oneshot::channel(); 314*a4ef5d6cSHarlanC 3158e71d710SHarlan let subscribe_event = StreamHubEvent::Subscribe { 3168e71d710SHarlan identifier, 317ccd9a1faSHarlan info: self.get_subscriber_info(sub_id), 318*a4ef5d6cSHarlanC result_sender: event_result_sender, 31992df423eSHarlanC }; 32092df423eSHarlanC let rv = self.event_producer.send(subscribe_event); 3210ca99c20SHarlan 3220ca99c20SHarlan if rv.is_err() { 32392df423eSHarlanC return Err(SessionError { 3248e71d710SHarlan value: SessionErrorValue::StreamHubEventSendErr, 3250ca99c20SHarlan }); 32692df423eSHarlanC } 32792df423eSHarlanC 328*a4ef5d6cSHarlanC let recv = event_result_receiver.await??; 329*a4ef5d6cSHarlanC self.data_receiver = recv.frame_receiver.unwrap(); 330b1840569SHarlanC 33192df423eSHarlanC Ok(()) 33292df423eSHarlanC } 33392df423eSHarlanC unsubscribe_from_channels( &mut self, app_name: String, stream_name: String, sub_id: Uuid, ) -> Result<(), SessionError>33492df423eSHarlanC pub async fn unsubscribe_from_channels( 33592df423eSHarlanC &mut self, 33692df423eSHarlanC app_name: String, 33792df423eSHarlanC stream_name: String, 33888d91efdSHarlanC sub_id: Uuid, 33992df423eSHarlanC ) -> Result<(), SessionError> { 3408e71d710SHarlan let identifier = StreamIdentifier::Rtmp { 34192df423eSHarlanC app_name, 34292df423eSHarlanC stream_name, 3438e71d710SHarlan }; 3448e71d710SHarlan 3458e71d710SHarlan let subscribe_event = StreamHubEvent::UnSubscribe { 3468e71d710SHarlan identifier, 347ccd9a1faSHarlan info: self.get_subscriber_info(sub_id), 34892df423eSHarlanC }; 34930c61c6eSHarlanC if let Err(err) = self.event_producer.send(subscribe_event) { 35069de9bbdSHarlanC log::error!("unsubscribe_from_channels err {}", err); 35130c61c6eSHarlanC } 35292df423eSHarlanC 35392df423eSHarlanC Ok(()) 35492df423eSHarlanC } 35592df423eSHarlanC 356976f65a6SHarlan /*Begin to receive stream data from remote RTMP push client or local RTMP relay pull client*/ publish_to_channels( &mut self, app_name: String, stream_name: String, pub_id: Uuid, gop_num: usize, ) -> Result<(), SessionError>35792df423eSHarlanC pub async fn publish_to_channels( 35892df423eSHarlanC &mut self, 35992df423eSHarlanC app_name: String, 36092df423eSHarlanC stream_name: String, 361976f65a6SHarlan pub_id: Uuid, 3628e71d710SHarlan gop_num: usize, 36392df423eSHarlanC ) -> Result<(), SessionError> { 3648e71d710SHarlan self.stream_handler 3658e71d710SHarlan .set_cache(Cache::new(app_name.clone(), stream_name.clone(), gop_num)) 3668e71d710SHarlan .await; 3678e71d710SHarlan 368*a4ef5d6cSHarlanC let (event_result_sender, event_result_receiver) = oneshot::channel(); 3698e71d710SHarlan let publish_event = StreamHubEvent::Publish { 3708e71d710SHarlan identifier: StreamIdentifier::Rtmp { 37192df423eSHarlanC app_name, 37292df423eSHarlanC stream_name, 3738e71d710SHarlan }, 374976f65a6SHarlan info: self.get_publisher_info(pub_id), 3758e71d710SHarlan stream_handler: self.stream_handler.clone(), 376*a4ef5d6cSHarlanC result_sender: event_result_sender, 37792df423eSHarlanC }; 37892df423eSHarlanC 37913bac29aSHarlan if self.event_producer.send(publish_event).is_err() { 38092df423eSHarlanC return Err(SessionError { 3818e71d710SHarlan value: SessionErrorValue::StreamHubEventSendErr, 3820ca99c20SHarlan }); 38392df423eSHarlanC } 38492df423eSHarlanC 385*a4ef5d6cSHarlanC let result = event_result_receiver.await??; 386*a4ef5d6cSHarlanC self.data_sender = result.0.unwrap(); 38792df423eSHarlanC Ok(()) 38892df423eSHarlanC } 38992df423eSHarlanC unpublish_to_channels( &mut self, app_name: String, stream_name: String, pub_id: Uuid, ) -> Result<(), SessionError>39092df423eSHarlanC pub async fn unpublish_to_channels( 39192df423eSHarlanC &mut self, 39292df423eSHarlanC app_name: String, 39392df423eSHarlanC stream_name: String, 394976f65a6SHarlan pub_id: Uuid, 39592df423eSHarlanC ) -> Result<(), SessionError> { 396ccd9a1faSHarlan log::info!( 397ccd9a1faSHarlan "unpublish_to_channels, app_name:{}, stream_name:{}", 398ccd9a1faSHarlan app_name, 399ccd9a1faSHarlan stream_name 400ccd9a1faSHarlan ); 4018e71d710SHarlan let unpublish_event = StreamHubEvent::UnPublish { 4028e71d710SHarlan identifier: StreamIdentifier::Rtmp { 40388325f54SHarlanC app_name: app_name.clone(), 40488325f54SHarlanC stream_name: stream_name.clone(), 4058e71d710SHarlan }, 406976f65a6SHarlan info: self.get_publisher_info(pub_id), 40792df423eSHarlanC }; 40892df423eSHarlanC 40913bac29aSHarlan match self.event_producer.send(unpublish_event) { 41092df423eSHarlanC Err(_) => { 41188325f54SHarlanC log::error!( 41288325f54SHarlanC "unpublish_to_channels error.app_name: {}, stream_name: {}", 41388325f54SHarlanC app_name, 41488325f54SHarlanC stream_name 41588325f54SHarlanC ); 41692df423eSHarlanC return Err(SessionError { 4178e71d710SHarlan value: SessionErrorValue::StreamHubEventSendErr, 41892df423eSHarlanC }); 41992df423eSHarlanC } 42092df423eSHarlanC _ => { 42188325f54SHarlanC log::info!( 42288325f54SHarlanC "unpublish_to_channels successfully.app_name: {}, stream_name: {}", 42388325f54SHarlanC app_name, 42488325f54SHarlanC stream_name 42588325f54SHarlanC ); 42692df423eSHarlanC } 42792df423eSHarlanC } 42892df423eSHarlanC Ok(()) 42992df423eSHarlanC } 43092df423eSHarlanC } 4318e71d710SHarlan 432b36cf5daSHarlan #[derive(Default)] 4338e71d710SHarlan pub struct RtmpStreamHandler { 4348e71d710SHarlan /*cache is used to save RTMP sequence/gops/meta data 4358e71d710SHarlan which needs to be send to client(player) */ 4368e71d710SHarlan /*The cache will be used in different threads(save 4378e71d710SHarlan cache in one thread and send cache data to different clients 4388e71d710SHarlan in other threads) */ 4398e71d710SHarlan pub cache: Mutex<Option<Cache>>, 4408e71d710SHarlan } 4418e71d710SHarlan 4428e71d710SHarlan impl RtmpStreamHandler { new() -> Self4438e71d710SHarlan pub fn new() -> Self { 4448e71d710SHarlan Self { 4458e71d710SHarlan cache: Mutex::new(None), 4468e71d710SHarlan } 4478e71d710SHarlan } 4488e71d710SHarlan set_cache(&self, cache: Cache)4498e71d710SHarlan pub async fn set_cache(&self, cache: Cache) { 4508e71d710SHarlan *self.cache.lock().await = Some(cache); 4518e71d710SHarlan } 4528e71d710SHarlan save_video_data( &self, chunk_body: &BytesMut, timestamp: u32, ) -> Result<(), CacheError>4538e71d710SHarlan pub async fn save_video_data( 4548e71d710SHarlan &self, 4558e71d710SHarlan chunk_body: &BytesMut, 4568e71d710SHarlan timestamp: u32, 4578e71d710SHarlan ) -> Result<(), CacheError> { 4588e71d710SHarlan if let Some(cache) = &mut *self.cache.lock().await { 4598e71d710SHarlan cache.save_video_data(chunk_body, timestamp).await?; 4608e71d710SHarlan } 4618e71d710SHarlan Ok(()) 4628e71d710SHarlan } 4638e71d710SHarlan save_audio_data( &self, chunk_body: &BytesMut, timestamp: u32, ) -> Result<(), CacheError>4648e71d710SHarlan pub async fn save_audio_data( 4658e71d710SHarlan &self, 4668e71d710SHarlan chunk_body: &BytesMut, 4678e71d710SHarlan timestamp: u32, 4688e71d710SHarlan ) -> Result<(), CacheError> { 4698e71d710SHarlan if let Some(cache) = &mut *self.cache.lock().await { 4708e71d710SHarlan cache.save_audio_data(chunk_body, timestamp).await?; 4718e71d710SHarlan } 4728e71d710SHarlan Ok(()) 4738e71d710SHarlan } 4748e71d710SHarlan save_metadata(&self, chunk_body: &BytesMut, timestamp: u32)4758e71d710SHarlan pub async fn save_metadata(&self, chunk_body: &BytesMut, timestamp: u32) { 4768e71d710SHarlan if let Some(cache) = &mut *self.cache.lock().await { 4778e71d710SHarlan cache.save_metadata(chunk_body, timestamp); 4788e71d710SHarlan } 4798e71d710SHarlan } 4808e71d710SHarlan } 4818e71d710SHarlan 4828e71d710SHarlan #[async_trait] 4838e71d710SHarlan impl TStreamHandler for RtmpStreamHandler { send_prior_data( &self, data_sender: DataSender, sub_type: SubscribeType, ) -> Result<(), ChannelError>484b36cf5daSHarlan async fn send_prior_data( 4858e71d710SHarlan &self, 48680f20d70SHarlanC data_sender: DataSender, 4878e71d710SHarlan sub_type: SubscribeType, 4888e71d710SHarlan ) -> Result<(), ChannelError> { 48980f20d70SHarlanC let sender = match data_sender { 49080f20d70SHarlanC DataSender::Frame { sender } => sender, 49180f20d70SHarlanC DataSender::Packet { sender: _ } => { 49280f20d70SHarlanC return Err(ChannelError { 49380f20d70SHarlanC value: ChannelErrorValue::NotCorrectDataSenderType, 49480f20d70SHarlanC }); 49580f20d70SHarlanC } 49680f20d70SHarlanC }; 4978e71d710SHarlan if let Some(cache) = &mut *self.cache.lock().await { 4988e71d710SHarlan if let Some(meta_body_data) = cache.get_metadata() { 4998e71d710SHarlan sender.send(meta_body_data).map_err(|_| ChannelError { 5008e71d710SHarlan value: ChannelErrorValue::SendError, 5018e71d710SHarlan })?; 5028e71d710SHarlan } 5038e71d710SHarlan if let Some(audio_seq_data) = cache.get_audio_seq() { 5048e71d710SHarlan sender.send(audio_seq_data).map_err(|_| ChannelError { 5058e71d710SHarlan value: ChannelErrorValue::SendError, 5068e71d710SHarlan })?; 5078e71d710SHarlan } 5088e71d710SHarlan if let Some(video_seq_data) = cache.get_video_seq() { 5098e71d710SHarlan sender.send(video_seq_data).map_err(|_| ChannelError { 5108e71d710SHarlan value: ChannelErrorValue::SendError, 5118e71d710SHarlan })?; 5128e71d710SHarlan } 5138e71d710SHarlan match sub_type { 5148e71d710SHarlan SubscribeType::PlayerRtmp 5158e71d710SHarlan | SubscribeType::PlayerHttpFlv 5168e71d710SHarlan | SubscribeType::PlayerHls 5178e71d710SHarlan | SubscribeType::GenerateHls => { 5188e71d710SHarlan if let Some(gops_data) = cache.get_gops_data() { 5198e71d710SHarlan for gop in gops_data { 5208e71d710SHarlan for channel_data in gop.get_frame_data() { 5218e71d710SHarlan sender.send(channel_data).map_err(|_| ChannelError { 5228e71d710SHarlan value: ChannelErrorValue::SendError, 5238e71d710SHarlan })?; 5248e71d710SHarlan } 5258e71d710SHarlan } 5268e71d710SHarlan } 5278e71d710SHarlan } 5288e71d710SHarlan _ => {} 5298e71d710SHarlan } 5308e71d710SHarlan } 5318e71d710SHarlan 5328e71d710SHarlan Ok(()) 5338e71d710SHarlan } get_statistic_data(&self) -> Option<StreamStatistics>5348e71d710SHarlan async fn get_statistic_data(&self) -> Option<StreamStatistics> { 5358e71d710SHarlan if let Some(cache) = &mut *self.cache.lock().await { 5368e71d710SHarlan return Some(cache.av_statistics.get_avstatistic_data().await); 5378e71d710SHarlan } 5388e71d710SHarlan 5398e71d710SHarlan None 5408e71d710SHarlan } 5418e71d710SHarlan send_information(&self, _: InformationSender)542b36cf5daSHarlan async fn send_information(&self, _: InformationSender) {} 5438e71d710SHarlan } 5448e71d710SHarlan 5458e71d710SHarlan impl fmt::Debug for Common { fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>5468e71d710SHarlan fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { 5478e71d710SHarlan write!(fmt, "S2 {{ member: {:?} }}", self.request_url) 5488e71d710SHarlan } 5498e71d710SHarlan } 550