1 use core::time; 2 3 use crate::define::HttpResponseDataConsumer; 4 5 use super::define::tag_type; 6 use super::define::HttpResponseDataProducer; 7 use super::errors::HttpFLvError; 8 use super::errors::HttpFLvErrorValue; 9 use byteorder::BigEndian; 10 use networkio::bytes_writer::BytesWriter; 11 use rtmp::amf0::amf0_writer::Amf0Writer; 12 use rtmp::cache::metadata::MetaData; 13 use rtmp::session::common::SessionInfo; 14 use rtmp::session::define::SessionSubType; 15 use rtmp::session::errors::SessionError; 16 use rtmp::session::errors::SessionErrorValue; 17 use { 18 crate::rtmp::channels::define::{ 19 ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent, ChannelEventProducer, 20 }, 21 bytes::BytesMut, 22 networkio::networkio::NetworkIO, 23 std::{sync::Arc, time::Duration}, 24 tokio::{ 25 sync::{mpsc, oneshot, Mutex}, 26 time::sleep, 27 }, 28 }; 29 30 const FLV_HEADER: [u8; 9] = [ 31 0x46, // 'F' 32 0x4c, //'L' 33 0x56, //'V' 34 0x01, //version 35 0x05, //00000101 audio tag and video tag 36 0x00, 0x00, 0x00, 0x09, //flv header size 37 ]; // 9 38 const HEADER_LENGTH: u32 = 11; 39 pub struct HttpFlv { 40 app_name: String, 41 stream_name: String, 42 writer: BytesWriter, 43 event_producer: ChannelEventProducer, 44 data_consumer: ChannelDataConsumer, 45 46 http_response_data_producer: HttpResponseDataProducer, 47 } 48 49 impl HttpFlv { 50 pub fn new( 51 app_name: String, 52 stream_name: String, 53 event_producer: ChannelEventProducer, 54 http_response_data_producer: HttpResponseDataProducer, 55 ) -> Self { 56 let (_, data_consumer) = mpsc::unbounded_channel(); 57 58 Self { 59 app_name, 60 stream_name, 61 writer: BytesWriter::new(), 62 data_consumer, 63 event_producer, 64 http_response_data_producer, 65 } 66 } 67 68 pub async fn run(&mut self) -> Result<(), HttpFLvError> { 69 self.subscribe_from_rtmp_channels(self.app_name.clone(), self.stream_name.clone(), 50) 70 .await?; 71 72 self.send_rtmp_channel_data().await?; 73 74 Ok(()) 75 } 76 77 pub fn write_flv_header(&mut self) -> Result<(), HttpFLvError> { 78 self.writer.write(&FLV_HEADER)?; 79 Ok(()) 80 } 81 82 pub fn write_previous_tag_size(&mut self, size: u32) -> Result<(), HttpFLvError> { 83 self.writer.write_u32::<BigEndian>(size)?; 84 Ok(()) 85 } 86 87 pub fn write_flv_tag(&mut self, channel_data: ChannelData) -> Result<(), HttpFLvError> { 88 match channel_data { 89 ChannelData::Audio { timestamp, data } => { 90 let len = data.len() as u32; 91 self.write_flv_tag_header(tag_type::audio, len, timestamp)?; 92 self.write_previous_tag_size(len + HEADER_LENGTH)?; 93 } 94 ChannelData::Video { timestamp, data } => { 95 let len = data.len() as u32; 96 self.write_flv_tag_header(tag_type::video, len, timestamp)?; 97 self.write_previous_tag_size(len + HEADER_LENGTH)?; 98 } 99 ChannelData::MetaData { body } => { 100 let mut metadata = MetaData::default(); 101 metadata.save(body); 102 let body = metadata.remove_set_data_frame()?; 103 } 104 } 105 106 self.flush_response_data()?; 107 108 Ok(()) 109 } 110 111 pub fn write_flv_tag_header( 112 &mut self, 113 tag_type: u8, 114 data_size: u32, 115 timestamp: u32, 116 ) -> Result<(), SessionError> { 117 //tag type 118 self.writer.write_u8(tag_type)?; 119 //data size 120 self.writer.write_u24::<BigEndian>(data_size)?; 121 //timestamp 122 self.writer.write_u24::<BigEndian>(timestamp & 0xffffff)?; 123 //timestamp extended. 124 let timestamp_ext = (timestamp >> 24 & 0xff) as u8; 125 self.writer.write_u8(timestamp_ext)?; 126 127 Ok(()) 128 } 129 130 pub fn flush_response_data(&mut self) -> Result<(), HttpFLvError> { 131 let data = self.writer.extract_current_bytes(); 132 self.http_response_data_producer.send(data)?; 133 Ok(()) 134 } 135 136 pub async fn send_rtmp_channel_data(&mut self) -> Result<(), HttpFLvError> { 137 loop { 138 if let Some(data) = self.data_consumer.recv().await { 139 self.write_flv_tag(data)?; 140 } 141 } 142 } 143 144 pub async fn subscribe_from_rtmp_channels( 145 &mut self, 146 app_name: String, 147 stream_name: String, 148 session_id: u64, 149 ) -> Result<(), HttpFLvError> { 150 let mut retry_count: u8 = 0; 151 152 loop { 153 let (sender, receiver) = oneshot::channel(); 154 155 let session_info = SessionInfo { 156 session_id: session_id, 157 session_sub_type: SessionSubType::Player, 158 }; 159 160 let subscribe_event = ChannelEvent::Subscribe { 161 app_name: app_name.clone(), 162 stream_name: stream_name.clone(), 163 session_info: session_info, 164 responder: sender, 165 }; 166 let rv = self.event_producer.send(subscribe_event); 167 match rv { 168 Err(_) => { 169 let session_error = SessionError { 170 value: SessionErrorValue::SendChannelDataErr, 171 }; 172 return Err(HttpFLvError { 173 value: HttpFLvErrorValue::SessionError(session_error), 174 }); 175 } 176 _ => {} 177 } 178 179 match receiver.await { 180 Ok(consumer) => { 181 self.data_consumer = consumer; 182 break; 183 } 184 Err(_) => { 185 if retry_count > 10 { 186 let session_error = SessionError { 187 value: SessionErrorValue::SubscribeCountLimitReach, 188 }; 189 return Err(HttpFLvError { 190 value: HttpFLvErrorValue::SessionError(session_error), 191 }); 192 } 193 } 194 } 195 196 sleep(Duration::from_millis(800)).await; 197 retry_count = retry_count + 1; 198 } 199 200 Ok(()) 201 } 202 } 203