xref: /xiu/protocol/hls/src/flv_data_receiver.rs (revision a4ef5d6c)
1 use tokio::sync::oneshot;
2 
3 use {
4     super::{
5         errors::{HlsError, HlsErrorValue},
6         flv2hls::Flv2HlsRemuxer,
7     },
8     rtmp::session::errors::{SessionError, SessionErrorValue},
9     std::time::Duration,
10     streamhub::{
11         define::{
12             FrameData, FrameDataReceiver, NotifyInfo, StreamHubEvent, StreamHubEventSender,
13             SubscribeType, SubscriberInfo,
14         },
15         stream::StreamIdentifier,
16         utils::{RandomDigitCount, Uuid},
17     },
18     tokio::{sync::mpsc, time::sleep},
19     xflv::define::FlvData,
20 };
21 
22 ////https://www.jianshu.com/p/d6311f03b81f
23 
24 pub struct FlvDataReceiver {
25     app_name: String,
26     stream_name: String,
27     event_producer: StreamHubEventSender,
28     data_consumer: FrameDataReceiver,
29     media_processor: Flv2HlsRemuxer,
30     subscriber_id: Uuid,
31 }
32 
33 impl FlvDataReceiver {
new( app_name: String, stream_name: String, event_producer: StreamHubEventSender, duration: i64, need_record: bool, ) -> Self34     pub fn new(
35         app_name: String,
36         stream_name: String,
37         event_producer: StreamHubEventSender,
38         duration: i64,
39         need_record: bool,
40     ) -> Self {
41         let (_, data_consumer) = mpsc::unbounded_channel();
42         let subscriber_id = Uuid::new(RandomDigitCount::Four);
43 
44         Self {
45             app_name: app_name.clone(),
46             stream_name: stream_name.clone(),
47             data_consumer,
48             event_producer,
49             media_processor: Flv2HlsRemuxer::new(duration, app_name, stream_name, need_record),
50             subscriber_id,
51         }
52     }
53 
run(&mut self) -> Result<(), HlsError>54     pub async fn run(&mut self) -> Result<(), HlsError> {
55         self.subscribe_from_rtmp_channels(self.app_name.clone(), self.stream_name.clone())
56             .await?;
57         self.receive_flv_data().await?;
58 
59         Ok(())
60     }
61 
receive_flv_data(&mut self) -> Result<(), HlsError>62     pub async fn receive_flv_data(&mut self) -> Result<(), HlsError> {
63         let mut retry_count = 0;
64 
65         loop {
66             if let Some(data) = self.data_consumer.recv().await {
67                 let flv_data: FlvData = match data {
68                     FrameData::Audio { timestamp, data } => FlvData::Audio { timestamp, data },
69                     FrameData::Video { timestamp, data } => FlvData::Video { timestamp, data },
70                     _ => continue,
71                 };
72                 retry_count = 0;
73                 self.media_processor.process_flv_data(flv_data)?;
74             } else {
75                 sleep(Duration::from_millis(100)).await;
76                 retry_count += 1;
77             }
78             //When rtmp stream is interupted here we retry 10 times.
79             //maybe have a better way to judge the stream status.
80             //will do an optimization in the future.
81             //todo
82             if retry_count > 10 {
83                 break;
84             }
85         }
86 
87         self.media_processor.clear_files()?;
88         self.unsubscribe_from_rtmp_channels().await
89     }
90 
flush_response_data(&mut self) -> Result<(), HlsError>91     pub fn flush_response_data(&mut self) -> Result<(), HlsError> {
92         Ok(())
93     }
94 
subscribe_from_rtmp_channels( &mut self, app_name: String, stream_name: String, ) -> Result<(), HlsError>95     pub async fn subscribe_from_rtmp_channels(
96         &mut self,
97         app_name: String,
98         stream_name: String,
99     ) -> Result<(), HlsError> {
100         /*the sub info is only used to transfer from RTMP to HLS, but not for client player */
101         let sub_info = SubscriberInfo {
102             id: self.subscriber_id,
103             sub_type: SubscribeType::GenerateHls,
104             sub_data_type: streamhub::define::SubDataType::Frame,
105             notify_info: NotifyInfo {
106                 request_url: String::from(""),
107                 remote_addr: String::from(""),
108             },
109         };
110 
111         let identifier = StreamIdentifier::Rtmp {
112             app_name,
113             stream_name,
114         };
115 
116         let (event_result_sender, event_result_receiver) = oneshot::channel();
117 
118         let subscribe_event = StreamHubEvent::Subscribe {
119             identifier,
120             info: sub_info,
121             result_sender: event_result_sender,
122         };
123 
124         let rv = self.event_producer.send(subscribe_event);
125         if rv.is_err() {
126             let session_error = SessionError {
127                 value: SessionErrorValue::StreamHubEventSendErr,
128             };
129             return Err(HlsError {
130                 value: HlsErrorValue::SessionError(session_error),
131             });
132         }
133 
134         let receiver = event_result_receiver.await??.frame_receiver.unwrap();
135 
136         self.data_consumer = receiver;
137 
138         Ok(())
139     }
140 
unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HlsError>141     pub async fn unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HlsError> {
142         let sub_info = SubscriberInfo {
143             id: self.subscriber_id,
144             sub_type: SubscribeType::PlayerHls,
145             sub_data_type: streamhub::define::SubDataType::Frame,
146             notify_info: NotifyInfo {
147                 request_url: String::from(""),
148                 remote_addr: String::from(""),
149             },
150         };
151 
152         let identifier = StreamIdentifier::Rtmp {
153             app_name: self.app_name.clone(),
154             stream_name: self.stream_name.clone(),
155         };
156 
157         let subscribe_event = StreamHubEvent::UnSubscribe {
158             identifier,
159             info: sub_info,
160         };
161         if let Err(err) = self.event_producer.send(subscribe_event) {
162             log::error!("unsubscribe_from_channels err {}", err);
163         }
164 
165         Ok(())
166     }
167 }
168