1*a4ef5d6cSHarlanC use tokio::sync::oneshot; 20c504437SHarlanC use { 3a3d19cccSHarlanC super::{ 4a3d19cccSHarlanC define::{tag_type, HttpResponseDataProducer}, 5a3d19cccSHarlanC errors::{HttpFLvError, HttpFLvErrorValue}, 6a3d19cccSHarlanC }, 7a3d19cccSHarlanC crate::rtmp::{ 8a3d19cccSHarlanC cache::metadata::MetaData, 98e71d710SHarlan session::errors::{SessionError, SessionErrorValue}, 100c504437SHarlanC }, 110c504437SHarlanC bytes::BytesMut, 128e71d710SHarlan std::net::SocketAddr, 138e71d710SHarlan streamhub::define::{ 148e71d710SHarlan FrameData, FrameDataReceiver, NotifyInfo, StreamHubEvent, StreamHubEventSender, 15*a4ef5d6cSHarlanC SubDataType, SubscribeType, SubscriberInfo, 160c504437SHarlanC }, 178e71d710SHarlan streamhub::{ 188e71d710SHarlan stream::StreamIdentifier, 198e71d710SHarlan utils::{RandomDigitCount, Uuid}, 208e71d710SHarlan }, 218e71d710SHarlan tokio::sync::mpsc, 22740804e8SHarlanC xflv::muxer::{FlvMuxer, HEADER_LENGTH}, 230c504437SHarlanC }; 240c504437SHarlanC 250c504437SHarlanC pub struct HttpFlv { 26e05ab47bSHarlanC app_name: String, 27e05ab47bSHarlanC stream_name: String, 2888d91efdSHarlanC 299eba8d65SHarlanC muxer: FlvMuxer, 3088d91efdSHarlanC 318e71d710SHarlan event_producer: StreamHubEventSender, 328e71d710SHarlan data_consumer: FrameDataReceiver, 33e05ab47bSHarlanC http_response_data_producer: HttpResponseDataProducer, 3488d91efdSHarlanC subscriber_id: Uuid, 35976f65a6SHarlan request_url: String, 36976f65a6SHarlan remote_addr: SocketAddr, 370c504437SHarlanC } 380c504437SHarlanC 390c504437SHarlanC impl HttpFlv { new( app_name: String, stream_name: String, event_producer: StreamHubEventSender, http_response_data_producer: HttpResponseDataProducer, request_url: String, remote_addr: SocketAddr, ) -> Self40e05ab47bSHarlanC pub fn new( 41e05ab47bSHarlanC app_name: String, 42e05ab47bSHarlanC stream_name: String, 438e71d710SHarlan event_producer: StreamHubEventSender, 44e05ab47bSHarlanC http_response_data_producer: HttpResponseDataProducer, 45976f65a6SHarlan request_url: String, 46976f65a6SHarlan remote_addr: SocketAddr, 47e05ab47bSHarlanC ) -> Self { 480c504437SHarlanC let (_, data_consumer) = mpsc::unbounded_channel(); 498e71d710SHarlan let subscriber_id = Uuid::new(RandomDigitCount::Four); 50e05ab47bSHarlanC 510c504437SHarlanC Self { 52e05ab47bSHarlanC app_name, 53e05ab47bSHarlanC stream_name, 549eba8d65SHarlanC muxer: FlvMuxer::new(), 550c504437SHarlanC data_consumer, 560c504437SHarlanC event_producer, 57e05ab47bSHarlanC http_response_data_producer, 5888d91efdSHarlanC subscriber_id, 59976f65a6SHarlan request_url, 60976f65a6SHarlan remote_addr, 610c504437SHarlanC } 620c504437SHarlanC } 630c504437SHarlanC run(&mut self) -> Result<(), HttpFLvError>64e05ab47bSHarlanC pub async fn run(&mut self) -> Result<(), HttpFLvError> { 6588d91efdSHarlanC self.subscribe_from_rtmp_channels().await?; 66740804e8SHarlanC self.send_media_stream().await?; 67e05ab47bSHarlanC 68e05ab47bSHarlanC Ok(()) 69e05ab47bSHarlanC } 70e05ab47bSHarlanC send_media_stream(&mut self) -> Result<(), HttpFLvError>71740804e8SHarlanC pub async fn send_media_stream(&mut self) -> Result<(), HttpFLvError> { 729eba8d65SHarlanC self.muxer.write_flv_header()?; 739eba8d65SHarlanC self.muxer.write_previous_tag_size(0)?; 74740804e8SHarlanC 75b10c2c1dSHarlanC self.flush_response_data()?; 76740804e8SHarlanC let mut retry_count = 0; 77b10c2c1dSHarlanC //write flv body 78b10c2c1dSHarlanC loop { 79b10c2c1dSHarlanC if let Some(data) = self.data_consumer.recv().await { 80740804e8SHarlanC if let Err(err) = self.write_flv_tag(data) { 81*a4ef5d6cSHarlanC if let HttpFLvErrorValue::MpscSendError(err_in) = &err.value { 82*a4ef5d6cSHarlanC if err_in.is_disconnected() { 83*a4ef5d6cSHarlanC log::info!("write_flv_tag: {}", err_in); 84dcd08966Sningnao break; 85dcd08966Sningnao } 86*a4ef5d6cSHarlanC } 87740804e8SHarlanC log::error!("write_flv_tag err: {}", err); 88740804e8SHarlanC retry_count += 1; 89740804e8SHarlanC } else { 90740804e8SHarlanC retry_count = 0; 91740804e8SHarlanC } 92740804e8SHarlanC } else { 93740804e8SHarlanC retry_count += 1; 94740804e8SHarlanC } 95740804e8SHarlanC if retry_count > 10 { 96740804e8SHarlanC break; 97b10c2c1dSHarlanC } 98b10c2c1dSHarlanC } 9988d91efdSHarlanC self.unsubscribe_from_rtmp_channels().await 100b10c2c1dSHarlanC } 101b10c2c1dSHarlanC write_flv_tag(&mut self, channel_data: FrameData) -> Result<(), HttpFLvError>1028e71d710SHarlan pub fn write_flv_tag(&mut self, channel_data: FrameData) -> Result<(), HttpFLvError> { 103b36cf5daSHarlan let (common_data, common_timestamp, tag_type) = match channel_data { 104b36cf5daSHarlan FrameData::Audio { timestamp, data } => (data, timestamp, tag_type::AUDIO), 105b36cf5daSHarlan FrameData::Video { timestamp, data } => (data, timestamp, tag_type::VIDEO), 1068e71d710SHarlan FrameData::MetaData { timestamp, data } => { 1070ca99c20SHarlan let mut metadata = MetaData::new(); 1088e71d710SHarlan metadata.save(&data); 10995a688c2SHarlanC let data = metadata.remove_set_data_frame()?; 11056ad483aSHarlanC 111b36cf5daSHarlan (data, timestamp, tag_type::SCRIPT_DATA_AMF) 112783195d9SHarlanC } 113b36cf5daSHarlan _ => { 114b36cf5daSHarlan log::error!("should not be here!!!"); 115b36cf5daSHarlan (BytesMut::new(), 0, 0) 116783195d9SHarlanC } 117b36cf5daSHarlan }; 118783195d9SHarlanC 11956ad483aSHarlanC let common_data_len = common_data.len() as u32; 12056ad483aSHarlanC 12156ad483aSHarlanC self.muxer 12256ad483aSHarlanC .write_flv_tag_header(tag_type, common_data_len, common_timestamp)?; 12356ad483aSHarlanC self.muxer.write_flv_tag_body(common_data)?; 12456ad483aSHarlanC self.muxer 12556ad483aSHarlanC .write_previous_tag_size(common_data_len + HEADER_LENGTH)?; 12656ad483aSHarlanC 127e05ab47bSHarlanC self.flush_response_data()?; 12856ad483aSHarlanC 129783195d9SHarlanC Ok(()) 1300c504437SHarlanC } 1310c504437SHarlanC flush_response_data(&mut self) -> Result<(), HttpFLvError>132e05ab47bSHarlanC pub fn flush_response_data(&mut self) -> Result<(), HttpFLvError> { 1339eba8d65SHarlanC let data = self.muxer.writer.extract_current_bytes(); 134*a4ef5d6cSHarlanC self.http_response_data_producer.start_send(Ok(data))?; 135740804e8SHarlanC 136740804e8SHarlanC Ok(()) 137740804e8SHarlanC } 138740804e8SHarlanC unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError>13988d91efdSHarlanC pub async fn unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> { 140ccd9a1faSHarlan let sub_info = SubscriberInfo { 141ccd9a1faSHarlan id: self.subscriber_id, 142ccd9a1faSHarlan sub_type: SubscribeType::PlayerHttpFlv, 143*a4ef5d6cSHarlanC sub_data_type: SubDataType::Frame, 144976f65a6SHarlan notify_info: NotifyInfo { 145976f65a6SHarlan request_url: self.request_url.clone(), 146976f65a6SHarlan remote_addr: self.remote_addr.to_string(), 147976f65a6SHarlan }, 148740804e8SHarlanC }; 149740804e8SHarlanC 1508e71d710SHarlan let identifier = StreamIdentifier::Rtmp { 151740804e8SHarlanC app_name: self.app_name.clone(), 152740804e8SHarlanC stream_name: self.stream_name.clone(), 1538e71d710SHarlan }; 1548e71d710SHarlan 1558e71d710SHarlan let subscribe_event = StreamHubEvent::UnSubscribe { 1568e71d710SHarlan identifier, 157ccd9a1faSHarlan info: sub_info, 158740804e8SHarlanC }; 159740804e8SHarlanC if let Err(err) = self.event_producer.send(subscribe_event) { 16069de9bbdSHarlanC log::error!("unsubscribe_from_channels err {}", err); 161740804e8SHarlanC } 162740804e8SHarlanC 163e05ab47bSHarlanC Ok(()) 164e05ab47bSHarlanC } 165e05ab47bSHarlanC subscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError>16688d91efdSHarlanC pub async fn subscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> { 167ccd9a1faSHarlan let sub_info = SubscriberInfo { 168ccd9a1faSHarlan id: self.subscriber_id, 169ccd9a1faSHarlan sub_type: SubscribeType::PlayerHttpFlv, 170*a4ef5d6cSHarlanC sub_data_type: SubDataType::Frame, 171976f65a6SHarlan notify_info: NotifyInfo { 172976f65a6SHarlan request_url: self.request_url.clone(), 173976f65a6SHarlan remote_addr: self.remote_addr.to_string(), 174976f65a6SHarlan }, 1750c504437SHarlanC }; 1760c504437SHarlanC 1778e71d710SHarlan let identifier = StreamIdentifier::Rtmp { 178740804e8SHarlanC app_name: self.app_name.clone(), 179740804e8SHarlanC stream_name: self.stream_name.clone(), 1808e71d710SHarlan }; 1818e71d710SHarlan 182*a4ef5d6cSHarlanC let (event_result_sender, event_result_receiver) = oneshot::channel(); 183*a4ef5d6cSHarlanC 1848e71d710SHarlan let subscribe_event = StreamHubEvent::Subscribe { 1858e71d710SHarlan identifier, 186ccd9a1faSHarlan info: sub_info, 187*a4ef5d6cSHarlanC result_sender: event_result_sender, 1880c504437SHarlanC }; 18988325f54SHarlanC 1900c504437SHarlanC let rv = self.event_producer.send(subscribe_event); 191a6684c8dSHarlan if rv.is_err() { 1920c504437SHarlanC let session_error = SessionError { 1938e71d710SHarlan value: SessionErrorValue::SendFrameDataErr, 1940c504437SHarlanC }; 1950c504437SHarlanC return Err(HttpFLvError { 1960c504437SHarlanC value: HttpFLvErrorValue::SessionError(session_error), 1970c504437SHarlanC }); 1980c504437SHarlanC } 1990c504437SHarlanC 200*a4ef5d6cSHarlanC let receiver = event_result_receiver.await??.frame_receiver.unwrap(); 2018e71d710SHarlan self.data_consumer = receiver; 2020c504437SHarlanC 2030c504437SHarlanC Ok(()) 2040c504437SHarlanC } 2050c504437SHarlanC } 206