1 use tokio::sync::oneshot; 2 3 use { 4 super::{ 5 errors::{HlsError, HlsErrorValue}, 6 flv2hls::Flv2HlsRemuxer, 7 }, 8 rtmp::session::errors::{SessionError, SessionErrorValue}, 9 std::time::Duration, 10 streamhub::{ 11 define::{ 12 FrameData, FrameDataReceiver, NotifyInfo, StreamHubEvent, StreamHubEventSender, 13 SubscribeType, SubscriberInfo, 14 }, 15 stream::StreamIdentifier, 16 utils::{RandomDigitCount, Uuid}, 17 }, 18 tokio::{sync::mpsc, time::sleep}, 19 xflv::define::FlvData, 20 }; 21 22 ////https://www.jianshu.com/p/d6311f03b81f 23 24 pub struct FlvDataReceiver { 25 app_name: String, 26 stream_name: String, 27 event_producer: StreamHubEventSender, 28 data_consumer: FrameDataReceiver, 29 media_processor: Flv2HlsRemuxer, 30 subscriber_id: Uuid, 31 } 32 33 impl FlvDataReceiver { new( app_name: String, stream_name: String, event_producer: StreamHubEventSender, duration: i64, need_record: bool, ) -> Self34 pub fn new( 35 app_name: String, 36 stream_name: String, 37 event_producer: StreamHubEventSender, 38 duration: i64, 39 need_record: bool, 40 ) -> Self { 41 let (_, data_consumer) = mpsc::unbounded_channel(); 42 let subscriber_id = Uuid::new(RandomDigitCount::Four); 43 44 Self { 45 app_name: app_name.clone(), 46 stream_name: stream_name.clone(), 47 data_consumer, 48 event_producer, 49 media_processor: Flv2HlsRemuxer::new(duration, app_name, stream_name, need_record), 50 subscriber_id, 51 } 52 } 53 run(&mut self) -> Result<(), HlsError>54 pub async fn run(&mut self) -> Result<(), HlsError> { 55 self.subscribe_from_rtmp_channels(self.app_name.clone(), self.stream_name.clone()) 56 .await?; 57 self.receive_flv_data().await?; 58 59 Ok(()) 60 } 61 receive_flv_data(&mut self) -> Result<(), HlsError>62 pub async fn receive_flv_data(&mut self) -> Result<(), HlsError> { 63 let mut retry_count = 0; 64 65 loop { 66 if let Some(data) = self.data_consumer.recv().await { 67 let flv_data: FlvData = match data { 68 FrameData::Audio { timestamp, data } => FlvData::Audio { timestamp, data }, 69 FrameData::Video { timestamp, data } => FlvData::Video { timestamp, data }, 70 _ => continue, 71 }; 72 retry_count = 0; 73 self.media_processor.process_flv_data(flv_data)?; 74 } else { 75 sleep(Duration::from_millis(100)).await; 76 retry_count += 1; 77 } 78 //When rtmp stream is interupted here we retry 10 times. 79 //maybe have a better way to judge the stream status. 80 //will do an optimization in the future. 81 //todo 82 if retry_count > 10 { 83 break; 84 } 85 } 86 87 self.media_processor.clear_files()?; 88 self.unsubscribe_from_rtmp_channels().await 89 } 90 flush_response_data(&mut self) -> Result<(), HlsError>91 pub fn flush_response_data(&mut self) -> Result<(), HlsError> { 92 Ok(()) 93 } 94 subscribe_from_rtmp_channels( &mut self, app_name: String, stream_name: String, ) -> Result<(), HlsError>95 pub async fn subscribe_from_rtmp_channels( 96 &mut self, 97 app_name: String, 98 stream_name: String, 99 ) -> Result<(), HlsError> { 100 /*the sub info is only used to transfer from RTMP to HLS, but not for client player */ 101 let sub_info = SubscriberInfo { 102 id: self.subscriber_id, 103 sub_type: SubscribeType::GenerateHls, 104 sub_data_type: streamhub::define::SubDataType::Frame, 105 notify_info: NotifyInfo { 106 request_url: String::from(""), 107 remote_addr: String::from(""), 108 }, 109 }; 110 111 let identifier = StreamIdentifier::Rtmp { 112 app_name, 113 stream_name, 114 }; 115 116 let (event_result_sender, event_result_receiver) = oneshot::channel(); 117 118 let subscribe_event = StreamHubEvent::Subscribe { 119 identifier, 120 info: sub_info, 121 result_sender: event_result_sender, 122 }; 123 124 let rv = self.event_producer.send(subscribe_event); 125 if rv.is_err() { 126 let session_error = SessionError { 127 value: SessionErrorValue::StreamHubEventSendErr, 128 }; 129 return Err(HlsError { 130 value: HlsErrorValue::SessionError(session_error), 131 }); 132 } 133 134 let receiver = event_result_receiver.await??.frame_receiver.unwrap(); 135 136 self.data_consumer = receiver; 137 138 Ok(()) 139 } 140 unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HlsError>141 pub async fn unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HlsError> { 142 let sub_info = SubscriberInfo { 143 id: self.subscriber_id, 144 sub_type: SubscribeType::PlayerHls, 145 sub_data_type: streamhub::define::SubDataType::Frame, 146 notify_info: NotifyInfo { 147 request_url: String::from(""), 148 remote_addr: String::from(""), 149 }, 150 }; 151 152 let identifier = StreamIdentifier::Rtmp { 153 app_name: self.app_name.clone(), 154 stream_name: self.stream_name.clone(), 155 }; 156 157 let subscribe_event = StreamHubEvent::UnSubscribe { 158 identifier, 159 info: sub_info, 160 }; 161 if let Err(err) = self.event_producer.send(subscribe_event) { 162 log::error!("unsubscribe_from_channels err {}", err); 163 } 164 165 Ok(()) 166 } 167 } 168