xref: /xiu/protocol/httpflv/src/httpflv.rs (revision a4ef5d6c)
1*a4ef5d6cSHarlanC use tokio::sync::oneshot;
20c504437SHarlanC use {
3a3d19cccSHarlanC     super::{
4a3d19cccSHarlanC         define::{tag_type, HttpResponseDataProducer},
5a3d19cccSHarlanC         errors::{HttpFLvError, HttpFLvErrorValue},
6a3d19cccSHarlanC     },
7a3d19cccSHarlanC     crate::rtmp::{
8a3d19cccSHarlanC         cache::metadata::MetaData,
98e71d710SHarlan         session::errors::{SessionError, SessionErrorValue},
100c504437SHarlanC     },
110c504437SHarlanC     bytes::BytesMut,
128e71d710SHarlan     std::net::SocketAddr,
138e71d710SHarlan     streamhub::define::{
148e71d710SHarlan         FrameData, FrameDataReceiver, NotifyInfo, StreamHubEvent, StreamHubEventSender,
15*a4ef5d6cSHarlanC         SubDataType, SubscribeType, SubscriberInfo,
160c504437SHarlanC     },
178e71d710SHarlan     streamhub::{
188e71d710SHarlan         stream::StreamIdentifier,
198e71d710SHarlan         utils::{RandomDigitCount, Uuid},
208e71d710SHarlan     },
218e71d710SHarlan     tokio::sync::mpsc,
22740804e8SHarlanC     xflv::muxer::{FlvMuxer, HEADER_LENGTH},
230c504437SHarlanC };
240c504437SHarlanC 
250c504437SHarlanC pub struct HttpFlv {
26e05ab47bSHarlanC     app_name: String,
27e05ab47bSHarlanC     stream_name: String,
2888d91efdSHarlanC 
299eba8d65SHarlanC     muxer: FlvMuxer,
3088d91efdSHarlanC 
318e71d710SHarlan     event_producer: StreamHubEventSender,
328e71d710SHarlan     data_consumer: FrameDataReceiver,
33e05ab47bSHarlanC     http_response_data_producer: HttpResponseDataProducer,
3488d91efdSHarlanC     subscriber_id: Uuid,
35976f65a6SHarlan     request_url: String,
36976f65a6SHarlan     remote_addr: SocketAddr,
370c504437SHarlanC }
380c504437SHarlanC 
390c504437SHarlanC impl HttpFlv {
new( app_name: String, stream_name: String, event_producer: StreamHubEventSender, http_response_data_producer: HttpResponseDataProducer, request_url: String, remote_addr: SocketAddr, ) -> Self40e05ab47bSHarlanC     pub fn new(
41e05ab47bSHarlanC         app_name: String,
42e05ab47bSHarlanC         stream_name: String,
438e71d710SHarlan         event_producer: StreamHubEventSender,
44e05ab47bSHarlanC         http_response_data_producer: HttpResponseDataProducer,
45976f65a6SHarlan         request_url: String,
46976f65a6SHarlan         remote_addr: SocketAddr,
47e05ab47bSHarlanC     ) -> Self {
480c504437SHarlanC         let (_, data_consumer) = mpsc::unbounded_channel();
498e71d710SHarlan         let subscriber_id = Uuid::new(RandomDigitCount::Four);
50e05ab47bSHarlanC 
510c504437SHarlanC         Self {
52e05ab47bSHarlanC             app_name,
53e05ab47bSHarlanC             stream_name,
549eba8d65SHarlanC             muxer: FlvMuxer::new(),
550c504437SHarlanC             data_consumer,
560c504437SHarlanC             event_producer,
57e05ab47bSHarlanC             http_response_data_producer,
5888d91efdSHarlanC             subscriber_id,
59976f65a6SHarlan             request_url,
60976f65a6SHarlan             remote_addr,
610c504437SHarlanC         }
620c504437SHarlanC     }
630c504437SHarlanC 
run(&mut self) -> Result<(), HttpFLvError>64e05ab47bSHarlanC     pub async fn run(&mut self) -> Result<(), HttpFLvError> {
6588d91efdSHarlanC         self.subscribe_from_rtmp_channels().await?;
66740804e8SHarlanC         self.send_media_stream().await?;
67e05ab47bSHarlanC 
68e05ab47bSHarlanC         Ok(())
69e05ab47bSHarlanC     }
70e05ab47bSHarlanC 
send_media_stream(&mut self) -> Result<(), HttpFLvError>71740804e8SHarlanC     pub async fn send_media_stream(&mut self) -> Result<(), HttpFLvError> {
729eba8d65SHarlanC         self.muxer.write_flv_header()?;
739eba8d65SHarlanC         self.muxer.write_previous_tag_size(0)?;
74740804e8SHarlanC 
75b10c2c1dSHarlanC         self.flush_response_data()?;
76740804e8SHarlanC         let mut retry_count = 0;
77b10c2c1dSHarlanC         //write flv body
78b10c2c1dSHarlanC         loop {
79b10c2c1dSHarlanC             if let Some(data) = self.data_consumer.recv().await {
80740804e8SHarlanC                 if let Err(err) = self.write_flv_tag(data) {
81*a4ef5d6cSHarlanC                     if let HttpFLvErrorValue::MpscSendError(err_in) = &err.value {
82*a4ef5d6cSHarlanC                         if err_in.is_disconnected() {
83*a4ef5d6cSHarlanC                             log::info!("write_flv_tag: {}", err_in);
84dcd08966Sningnao                             break;
85dcd08966Sningnao                         }
86*a4ef5d6cSHarlanC                     }
87740804e8SHarlanC                     log::error!("write_flv_tag err: {}", err);
88740804e8SHarlanC                     retry_count += 1;
89740804e8SHarlanC                 } else {
90740804e8SHarlanC                     retry_count = 0;
91740804e8SHarlanC                 }
92740804e8SHarlanC             } else {
93740804e8SHarlanC                 retry_count += 1;
94740804e8SHarlanC             }
95740804e8SHarlanC             if retry_count > 10 {
96740804e8SHarlanC                 break;
97b10c2c1dSHarlanC             }
98b10c2c1dSHarlanC         }
9988d91efdSHarlanC         self.unsubscribe_from_rtmp_channels().await
100b10c2c1dSHarlanC     }
101b10c2c1dSHarlanC 
write_flv_tag(&mut self, channel_data: FrameData) -> Result<(), HttpFLvError>1028e71d710SHarlan     pub fn write_flv_tag(&mut self, channel_data: FrameData) -> Result<(), HttpFLvError> {
103b36cf5daSHarlan         let (common_data, common_timestamp, tag_type) = match channel_data {
104b36cf5daSHarlan             FrameData::Audio { timestamp, data } => (data, timestamp, tag_type::AUDIO),
105b36cf5daSHarlan             FrameData::Video { timestamp, data } => (data, timestamp, tag_type::VIDEO),
1068e71d710SHarlan             FrameData::MetaData { timestamp, data } => {
1070ca99c20SHarlan                 let mut metadata = MetaData::new();
1088e71d710SHarlan                 metadata.save(&data);
10995a688c2SHarlanC                 let data = metadata.remove_set_data_frame()?;
11056ad483aSHarlanC 
111b36cf5daSHarlan                 (data, timestamp, tag_type::SCRIPT_DATA_AMF)
112783195d9SHarlanC             }
113b36cf5daSHarlan             _ => {
114b36cf5daSHarlan                 log::error!("should not be here!!!");
115b36cf5daSHarlan                 (BytesMut::new(), 0, 0)
116783195d9SHarlanC             }
117b36cf5daSHarlan         };
118783195d9SHarlanC 
11956ad483aSHarlanC         let common_data_len = common_data.len() as u32;
12056ad483aSHarlanC 
12156ad483aSHarlanC         self.muxer
12256ad483aSHarlanC             .write_flv_tag_header(tag_type, common_data_len, common_timestamp)?;
12356ad483aSHarlanC         self.muxer.write_flv_tag_body(common_data)?;
12456ad483aSHarlanC         self.muxer
12556ad483aSHarlanC             .write_previous_tag_size(common_data_len + HEADER_LENGTH)?;
12656ad483aSHarlanC 
127e05ab47bSHarlanC         self.flush_response_data()?;
12856ad483aSHarlanC 
129783195d9SHarlanC         Ok(())
1300c504437SHarlanC     }
1310c504437SHarlanC 
flush_response_data(&mut self) -> Result<(), HttpFLvError>132e05ab47bSHarlanC     pub fn flush_response_data(&mut self) -> Result<(), HttpFLvError> {
1339eba8d65SHarlanC         let data = self.muxer.writer.extract_current_bytes();
134*a4ef5d6cSHarlanC         self.http_response_data_producer.start_send(Ok(data))?;
135740804e8SHarlanC 
136740804e8SHarlanC         Ok(())
137740804e8SHarlanC     }
138740804e8SHarlanC 
unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError>13988d91efdSHarlanC     pub async fn unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> {
140ccd9a1faSHarlan         let sub_info = SubscriberInfo {
141ccd9a1faSHarlan             id: self.subscriber_id,
142ccd9a1faSHarlan             sub_type: SubscribeType::PlayerHttpFlv,
143*a4ef5d6cSHarlanC             sub_data_type: SubDataType::Frame,
144976f65a6SHarlan             notify_info: NotifyInfo {
145976f65a6SHarlan                 request_url: self.request_url.clone(),
146976f65a6SHarlan                 remote_addr: self.remote_addr.to_string(),
147976f65a6SHarlan             },
148740804e8SHarlanC         };
149740804e8SHarlanC 
1508e71d710SHarlan         let identifier = StreamIdentifier::Rtmp {
151740804e8SHarlanC             app_name: self.app_name.clone(),
152740804e8SHarlanC             stream_name: self.stream_name.clone(),
1538e71d710SHarlan         };
1548e71d710SHarlan 
1558e71d710SHarlan         let subscribe_event = StreamHubEvent::UnSubscribe {
1568e71d710SHarlan             identifier,
157ccd9a1faSHarlan             info: sub_info,
158740804e8SHarlanC         };
159740804e8SHarlanC         if let Err(err) = self.event_producer.send(subscribe_event) {
16069de9bbdSHarlanC             log::error!("unsubscribe_from_channels err {}", err);
161740804e8SHarlanC         }
162740804e8SHarlanC 
163e05ab47bSHarlanC         Ok(())
164e05ab47bSHarlanC     }
165e05ab47bSHarlanC 
subscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError>16688d91efdSHarlanC     pub async fn subscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> {
167ccd9a1faSHarlan         let sub_info = SubscriberInfo {
168ccd9a1faSHarlan             id: self.subscriber_id,
169ccd9a1faSHarlan             sub_type: SubscribeType::PlayerHttpFlv,
170*a4ef5d6cSHarlanC             sub_data_type: SubDataType::Frame,
171976f65a6SHarlan             notify_info: NotifyInfo {
172976f65a6SHarlan                 request_url: self.request_url.clone(),
173976f65a6SHarlan                 remote_addr: self.remote_addr.to_string(),
174976f65a6SHarlan             },
1750c504437SHarlanC         };
1760c504437SHarlanC 
1778e71d710SHarlan         let identifier = StreamIdentifier::Rtmp {
178740804e8SHarlanC             app_name: self.app_name.clone(),
179740804e8SHarlanC             stream_name: self.stream_name.clone(),
1808e71d710SHarlan         };
1818e71d710SHarlan 
182*a4ef5d6cSHarlanC         let (event_result_sender, event_result_receiver) = oneshot::channel();
183*a4ef5d6cSHarlanC 
1848e71d710SHarlan         let subscribe_event = StreamHubEvent::Subscribe {
1858e71d710SHarlan             identifier,
186ccd9a1faSHarlan             info: sub_info,
187*a4ef5d6cSHarlanC             result_sender: event_result_sender,
1880c504437SHarlanC         };
18988325f54SHarlanC 
1900c504437SHarlanC         let rv = self.event_producer.send(subscribe_event);
191a6684c8dSHarlan         if rv.is_err() {
1920c504437SHarlanC             let session_error = SessionError {
1938e71d710SHarlan                 value: SessionErrorValue::SendFrameDataErr,
1940c504437SHarlanC             };
1950c504437SHarlanC             return Err(HttpFLvError {
1960c504437SHarlanC                 value: HttpFLvErrorValue::SessionError(session_error),
1970c504437SHarlanC             });
1980c504437SHarlanC         }
1990c504437SHarlanC 
200*a4ef5d6cSHarlanC         let receiver = event_result_receiver.await??.frame_receiver.unwrap();
2018e71d710SHarlan         self.data_consumer = receiver;
2020c504437SHarlanC 
2030c504437SHarlanC         Ok(())
2040c504437SHarlanC     }
2050c504437SHarlanC }
206