xref: /xiu/protocol/httpflv/src/httpflv.rs (revision 6623164a)
1 use {
2     super::{
3         define::{tag_type, HttpResponseDataProducer},
4         errors::{HttpFLvError, HttpFLvErrorValue},
5     },
6     crate::rtmp::{
7         cache::metadata::MetaData,
8         channels::define::{ChannelData, ChannelDataConsumer, ChannelEvent, ChannelEventProducer},
9         session::{
10             common::SessionInfo,
11             define::SessionSubType,
12             errors::{SessionError, SessionErrorValue},
13         },
14     },
15     bytes::BytesMut,
16     std::time::Duration,
17     tokio::{
18         sync::{mpsc, oneshot},
19         time::sleep,
20     },
21     uuid::Uuid,
22     xflv::muxer::{FlvMuxer, HEADER_LENGTH},
23 };
24 
25 pub struct HttpFlv {
26     app_name: String,
27     stream_name: String,
28 
29     muxer: FlvMuxer,
30 
31     event_producer: ChannelEventProducer,
32     data_consumer: ChannelDataConsumer,
33     http_response_data_producer: HttpResponseDataProducer,
34     subscriber_id: Uuid,
35 }
36 
37 impl HttpFlv {
38     pub fn new(
39         app_name: String,
40         stream_name: String,
41         event_producer: ChannelEventProducer,
42         http_response_data_producer: HttpResponseDataProducer,
43     ) -> Self {
44         let (_, data_consumer) = mpsc::unbounded_channel();
45         let subscriber_id = Uuid::new_v4();
46 
47         Self {
48             app_name,
49             stream_name,
50             muxer: FlvMuxer::new(),
51             data_consumer,
52             event_producer,
53             http_response_data_producer,
54             subscriber_id,
55         }
56     }
57 
58     pub async fn run(&mut self) -> Result<(), HttpFLvError> {
59         self.subscribe_from_rtmp_channels().await?;
60         self.send_media_stream().await?;
61 
62         Ok(())
63     }
64 
65     pub async fn send_media_stream(&mut self) -> Result<(), HttpFLvError> {
66         self.muxer.write_flv_header()?;
67         self.muxer.write_previous_tag_size(0)?;
68 
69         self.flush_response_data()?;
70         let mut retry_count = 0;
71         //write flv body
72         loop {
73             if let Some(data) = self.data_consumer.recv().await {
74                 if let Err(err) = self.write_flv_tag(data) {
75                     log::error!("write_flv_tag err: {}", err);
76                     retry_count += 1;
77                 } else {
78                     retry_count = 0;
79                 }
80             } else {
81                 retry_count += 1;
82             }
83             if retry_count > 10 {
84                 break;
85             }
86         }
87         self.unsubscribe_from_rtmp_channels().await
88     }
89 
90     pub fn write_flv_tag(&mut self, channel_data: ChannelData) -> Result<(), HttpFLvError> {
91         let common_data: BytesMut;
92         let common_timestamp: u32;
93         let tag_type: u8;
94 
95         match channel_data {
96             ChannelData::Audio { timestamp, data } => {
97                 common_data = data;
98                 common_timestamp = timestamp;
99                 tag_type = tag_type::AUDIO;
100             }
101 
102             ChannelData::Video { timestamp, data } => {
103                 common_data = data;
104                 common_timestamp = timestamp;
105                 tag_type = tag_type::VIDEO;
106             }
107 
108             ChannelData::MetaData { timestamp, data } => {
109                 let mut metadata = MetaData::default();
110                 metadata.save(data);
111                 let data = metadata.remove_set_data_frame()?;
112 
113                 common_data = data;
114                 common_timestamp = timestamp;
115                 tag_type = tag_type::SCRIPT_DATA_AMF;
116             }
117         }
118 
119         let common_data_len = common_data.len() as u32;
120 
121         self.muxer
122             .write_flv_tag_header(tag_type, common_data_len, common_timestamp)?;
123         self.muxer.write_flv_tag_body(common_data)?;
124         self.muxer
125             .write_previous_tag_size(common_data_len + HEADER_LENGTH)?;
126 
127         self.flush_response_data()?;
128 
129         Ok(())
130     }
131 
132     pub fn flush_response_data(&mut self) -> Result<(), HttpFLvError> {
133         let data = self.muxer.writer.extract_current_bytes();
134         self.http_response_data_producer.start_send(Ok(data))?;
135 
136         Ok(())
137     }
138 
139     pub async fn unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> {
140         let session_info = SessionInfo {
141             subscriber_id: self.subscriber_id,
142             session_sub_type: SessionSubType::Player,
143         };
144 
145         let subscribe_event = ChannelEvent::UnSubscribe {
146             app_name: self.app_name.clone(),
147             stream_name: self.stream_name.clone(),
148             session_info,
149         };
150         if let Err(err) = self.event_producer.send(subscribe_event) {
151             log::error!("unsubscribe_from_channels err {}\n", err);
152         }
153 
154         Ok(())
155     }
156 
157     pub async fn subscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> {
158         let mut retry_count: u8 = 0;
159 
160         loop {
161             let (sender, receiver) = oneshot::channel();
162 
163             let session_info = SessionInfo {
164                 subscriber_id: self.subscriber_id,
165                 session_sub_type: SessionSubType::Player,
166             };
167 
168             let subscribe_event = ChannelEvent::Subscribe {
169                 app_name: self.app_name.clone(),
170                 stream_name: self.stream_name.clone(),
171                 session_info: session_info,
172                 responder: sender,
173             };
174 
175             let rv = self.event_producer.send(subscribe_event);
176             match rv {
177                 Err(_) => {
178                     let session_error = SessionError {
179                         value: SessionErrorValue::SendChannelDataErr,
180                     };
181                     return Err(HttpFLvError {
182                         value: HttpFLvErrorValue::SessionError(session_error),
183                     });
184                 }
185                 _ => {}
186             }
187 
188             match receiver.await {
189                 Ok(consumer) => {
190                     self.data_consumer = consumer;
191                     break;
192                 }
193                 Err(_) => {
194                     if retry_count > 10 {
195                         let session_error = SessionError {
196                             value: SessionErrorValue::SubscribeCountLimitReach,
197                         };
198                         return Err(HttpFLvError {
199                             value: HttpFLvErrorValue::SessionError(session_error),
200                         });
201                     }
202                 }
203             }
204 
205             sleep(Duration::from_millis(800)).await;
206             retry_count = retry_count + 1;
207         }
208 
209         Ok(())
210     }
211 }
212