xref: /xiu/protocol/httpflv/src/httpflv.rs (revision a4ef5d6c)
1 use tokio::sync::oneshot;
2 use {
3     super::{
4         define::{tag_type, HttpResponseDataProducer},
5         errors::{HttpFLvError, HttpFLvErrorValue},
6     },
7     crate::rtmp::{
8         cache::metadata::MetaData,
9         session::errors::{SessionError, SessionErrorValue},
10     },
11     bytes::BytesMut,
12     std::net::SocketAddr,
13     streamhub::define::{
14         FrameData, FrameDataReceiver, NotifyInfo, StreamHubEvent, StreamHubEventSender,
15         SubDataType, SubscribeType, SubscriberInfo,
16     },
17     streamhub::{
18         stream::StreamIdentifier,
19         utils::{RandomDigitCount, Uuid},
20     },
21     tokio::sync::mpsc,
22     xflv::muxer::{FlvMuxer, HEADER_LENGTH},
23 };
24 
25 pub struct HttpFlv {
26     app_name: String,
27     stream_name: String,
28 
29     muxer: FlvMuxer,
30 
31     event_producer: StreamHubEventSender,
32     data_consumer: FrameDataReceiver,
33     http_response_data_producer: HttpResponseDataProducer,
34     subscriber_id: Uuid,
35     request_url: String,
36     remote_addr: SocketAddr,
37 }
38 
39 impl HttpFlv {
new( app_name: String, stream_name: String, event_producer: StreamHubEventSender, http_response_data_producer: HttpResponseDataProducer, request_url: String, remote_addr: SocketAddr, ) -> Self40     pub fn new(
41         app_name: String,
42         stream_name: String,
43         event_producer: StreamHubEventSender,
44         http_response_data_producer: HttpResponseDataProducer,
45         request_url: String,
46         remote_addr: SocketAddr,
47     ) -> Self {
48         let (_, data_consumer) = mpsc::unbounded_channel();
49         let subscriber_id = Uuid::new(RandomDigitCount::Four);
50 
51         Self {
52             app_name,
53             stream_name,
54             muxer: FlvMuxer::new(),
55             data_consumer,
56             event_producer,
57             http_response_data_producer,
58             subscriber_id,
59             request_url,
60             remote_addr,
61         }
62     }
63 
run(&mut self) -> Result<(), HttpFLvError>64     pub async fn run(&mut self) -> Result<(), HttpFLvError> {
65         self.subscribe_from_rtmp_channels().await?;
66         self.send_media_stream().await?;
67 
68         Ok(())
69     }
70 
send_media_stream(&mut self) -> Result<(), HttpFLvError>71     pub async fn send_media_stream(&mut self) -> Result<(), HttpFLvError> {
72         self.muxer.write_flv_header()?;
73         self.muxer.write_previous_tag_size(0)?;
74 
75         self.flush_response_data()?;
76         let mut retry_count = 0;
77         //write flv body
78         loop {
79             if let Some(data) = self.data_consumer.recv().await {
80                 if let Err(err) = self.write_flv_tag(data) {
81                     if let HttpFLvErrorValue::MpscSendError(err_in) = &err.value {
82                         if err_in.is_disconnected() {
83                             log::info!("write_flv_tag: {}", err_in);
84                             break;
85                         }
86                     }
87                     log::error!("write_flv_tag err: {}", err);
88                     retry_count += 1;
89                 } else {
90                     retry_count = 0;
91                 }
92             } else {
93                 retry_count += 1;
94             }
95             if retry_count > 10 {
96                 break;
97             }
98         }
99         self.unsubscribe_from_rtmp_channels().await
100     }
101 
write_flv_tag(&mut self, channel_data: FrameData) -> Result<(), HttpFLvError>102     pub fn write_flv_tag(&mut self, channel_data: FrameData) -> Result<(), HttpFLvError> {
103         let (common_data, common_timestamp, tag_type) = match channel_data {
104             FrameData::Audio { timestamp, data } => (data, timestamp, tag_type::AUDIO),
105             FrameData::Video { timestamp, data } => (data, timestamp, tag_type::VIDEO),
106             FrameData::MetaData { timestamp, data } => {
107                 let mut metadata = MetaData::new();
108                 metadata.save(&data);
109                 let data = metadata.remove_set_data_frame()?;
110 
111                 (data, timestamp, tag_type::SCRIPT_DATA_AMF)
112             }
113             _ => {
114                 log::error!("should not be here!!!");
115                 (BytesMut::new(), 0, 0)
116             }
117         };
118 
119         let common_data_len = common_data.len() as u32;
120 
121         self.muxer
122             .write_flv_tag_header(tag_type, common_data_len, common_timestamp)?;
123         self.muxer.write_flv_tag_body(common_data)?;
124         self.muxer
125             .write_previous_tag_size(common_data_len + HEADER_LENGTH)?;
126 
127         self.flush_response_data()?;
128 
129         Ok(())
130     }
131 
flush_response_data(&mut self) -> Result<(), HttpFLvError>132     pub fn flush_response_data(&mut self) -> Result<(), HttpFLvError> {
133         let data = self.muxer.writer.extract_current_bytes();
134         self.http_response_data_producer.start_send(Ok(data))?;
135 
136         Ok(())
137     }
138 
unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError>139     pub async fn unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> {
140         let sub_info = SubscriberInfo {
141             id: self.subscriber_id,
142             sub_type: SubscribeType::PlayerHttpFlv,
143             sub_data_type: SubDataType::Frame,
144             notify_info: NotifyInfo {
145                 request_url: self.request_url.clone(),
146                 remote_addr: self.remote_addr.to_string(),
147             },
148         };
149 
150         let identifier = StreamIdentifier::Rtmp {
151             app_name: self.app_name.clone(),
152             stream_name: self.stream_name.clone(),
153         };
154 
155         let subscribe_event = StreamHubEvent::UnSubscribe {
156             identifier,
157             info: sub_info,
158         };
159         if let Err(err) = self.event_producer.send(subscribe_event) {
160             log::error!("unsubscribe_from_channels err {}", err);
161         }
162 
163         Ok(())
164     }
165 
subscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError>166     pub async fn subscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> {
167         let sub_info = SubscriberInfo {
168             id: self.subscriber_id,
169             sub_type: SubscribeType::PlayerHttpFlv,
170             sub_data_type: SubDataType::Frame,
171             notify_info: NotifyInfo {
172                 request_url: self.request_url.clone(),
173                 remote_addr: self.remote_addr.to_string(),
174             },
175         };
176 
177         let identifier = StreamIdentifier::Rtmp {
178             app_name: self.app_name.clone(),
179             stream_name: self.stream_name.clone(),
180         };
181 
182         let (event_result_sender, event_result_receiver) = oneshot::channel();
183 
184         let subscribe_event = StreamHubEvent::Subscribe {
185             identifier,
186             info: sub_info,
187             result_sender: event_result_sender,
188         };
189 
190         let rv = self.event_producer.send(subscribe_event);
191         if rv.is_err() {
192             let session_error = SessionError {
193                 value: SessionErrorValue::SendFrameDataErr,
194             };
195             return Err(HttpFLvError {
196                 value: HttpFLvErrorValue::SessionError(session_error),
197             });
198         }
199 
200         let receiver = event_result_receiver.await??.frame_receiver.unwrap();
201         self.data_consumer = receiver;
202 
203         Ok(())
204     }
205 }
206