xref: /xiu/protocol/httpflv/src/httpflv.rs (revision e05ab47b)
1 use core::time;
2 
3 use crate::define::HttpResponseDataConsumer;
4 
5 use super::define::tag_type;
6 use super::define::HttpResponseDataProducer;
7 use super::errors::HttpFLvError;
8 use super::errors::HttpFLvErrorValue;
9 use byteorder::BigEndian;
10 use networkio::bytes_writer::BytesWriter;
11 use rtmp::amf0::amf0_writer::Amf0Writer;
12 use rtmp::cache::metadata::MetaData;
13 use rtmp::session::common::SessionInfo;
14 use rtmp::session::define::SessionSubType;
15 use rtmp::session::errors::SessionError;
16 use rtmp::session::errors::SessionErrorValue;
17 use {
18     crate::rtmp::channels::define::{
19         ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent, ChannelEventProducer,
20     },
21     bytes::BytesMut,
22     networkio::networkio::NetworkIO,
23     std::{sync::Arc, time::Duration},
24     tokio::{
25         sync::{mpsc, oneshot, Mutex},
26         time::sleep,
27     },
28 };
29 
30 const FLV_HEADER: [u8; 9] = [
31     0x46, // 'F'
32     0x4c, //'L'
33     0x56, //'V'
34     0x01, //version
35     0x05, //00000101  audio tag  and video tag
36     0x00, 0x00, 0x00, 0x09, //flv header size
37 ]; // 9
38 const HEADER_LENGTH: u32 = 11;
39 pub struct HttpFlv {
40     app_name: String,
41     stream_name: String,
42     writer: BytesWriter,
43     event_producer: ChannelEventProducer,
44     data_consumer: ChannelDataConsumer,
45 
46     http_response_data_producer: HttpResponseDataProducer,
47 }
48 
49 impl HttpFlv {
50     pub fn new(
51         app_name: String,
52         stream_name: String,
53         event_producer: ChannelEventProducer,
54         http_response_data_producer: HttpResponseDataProducer,
55     ) -> Self {
56         let (_, data_consumer) = mpsc::unbounded_channel();
57 
58         Self {
59             app_name,
60             stream_name,
61             writer: BytesWriter::new(),
62             data_consumer,
63             event_producer,
64             http_response_data_producer,
65         }
66     }
67 
68     pub async fn run(&mut self) -> Result<(), HttpFLvError> {
69         self.subscribe_from_rtmp_channels(self.app_name.clone(), self.stream_name.clone(), 50)
70             .await?;
71 
72         self.send_rtmp_channel_data().await?;
73 
74         Ok(())
75     }
76 
77     pub fn write_flv_header(&mut self) -> Result<(), HttpFLvError> {
78         self.writer.write(&FLV_HEADER)?;
79         Ok(())
80     }
81 
82     pub fn write_previous_tag_size(&mut self, size: u32) -> Result<(), HttpFLvError> {
83         self.writer.write_u32::<BigEndian>(size)?;
84         Ok(())
85     }
86 
87     pub fn write_flv_tag(&mut self, channel_data: ChannelData) -> Result<(), HttpFLvError> {
88         match channel_data {
89             ChannelData::Audio { timestamp, data } => {
90                 let len = data.len() as u32;
91                 self.write_flv_tag_header(tag_type::audio, len, timestamp)?;
92                 self.write_previous_tag_size(len + HEADER_LENGTH)?;
93             }
94             ChannelData::Video { timestamp, data } => {
95                 let len = data.len() as u32;
96                 self.write_flv_tag_header(tag_type::video, len, timestamp)?;
97                 self.write_previous_tag_size(len + HEADER_LENGTH)?;
98             }
99             ChannelData::MetaData { body } => {
100                 let mut metadata = MetaData::default();
101                 metadata.save(body);
102                 let body = metadata.remove_set_data_frame()?;
103             }
104         }
105 
106         self.flush_response_data()?;
107 
108         Ok(())
109     }
110 
111     pub fn write_flv_tag_header(
112         &mut self,
113         tag_type: u8,
114         data_size: u32,
115         timestamp: u32,
116     ) -> Result<(), SessionError> {
117         //tag type
118         self.writer.write_u8(tag_type)?;
119         //data size
120         self.writer.write_u24::<BigEndian>(data_size)?;
121         //timestamp
122         self.writer.write_u24::<BigEndian>(timestamp & 0xffffff)?;
123         //timestamp extended.
124         let timestamp_ext = (timestamp >> 24 & 0xff) as u8;
125         self.writer.write_u8(timestamp_ext)?;
126 
127         Ok(())
128     }
129 
130     pub fn flush_response_data(&mut self) -> Result<(), HttpFLvError> {
131         let data = self.writer.extract_current_bytes();
132         self.http_response_data_producer.send(data)?;
133         Ok(())
134     }
135 
136     pub async fn send_rtmp_channel_data(&mut self) -> Result<(), HttpFLvError> {
137         loop {
138             if let Some(data) = self.data_consumer.recv().await {
139                 self.write_flv_tag(data)?;
140             }
141         }
142     }
143 
144     pub async fn subscribe_from_rtmp_channels(
145         &mut self,
146         app_name: String,
147         stream_name: String,
148         session_id: u64,
149     ) -> Result<(), HttpFLvError> {
150         let mut retry_count: u8 = 0;
151 
152         loop {
153             let (sender, receiver) = oneshot::channel();
154 
155             let session_info = SessionInfo {
156                 session_id: session_id,
157                 session_sub_type: SessionSubType::Player,
158             };
159 
160             let subscribe_event = ChannelEvent::Subscribe {
161                 app_name: app_name.clone(),
162                 stream_name: stream_name.clone(),
163                 session_info: session_info,
164                 responder: sender,
165             };
166             let rv = self.event_producer.send(subscribe_event);
167             match rv {
168                 Err(_) => {
169                     let session_error = SessionError {
170                         value: SessionErrorValue::SendChannelDataErr,
171                     };
172                     return Err(HttpFLvError {
173                         value: HttpFLvErrorValue::SessionError(session_error),
174                     });
175                 }
176                 _ => {}
177             }
178 
179             match receiver.await {
180                 Ok(consumer) => {
181                     self.data_consumer = consumer;
182                     break;
183                 }
184                 Err(_) => {
185                     if retry_count > 10 {
186                         let session_error = SessionError {
187                             value: SessionErrorValue::SubscribeCountLimitReach,
188                         };
189                         return Err(HttpFLvError {
190                             value: HttpFLvErrorValue::SessionError(session_error),
191                         });
192                     }
193                 }
194             }
195 
196             sleep(Duration::from_millis(800)).await;
197             retry_count = retry_count + 1;
198         }
199 
200         Ok(())
201     }
202 }
203