1 use { 2 super::{ 3 define::{tag_type, HttpResponseDataProducer}, 4 errors::{HttpFLvError, HttpFLvErrorValue}, 5 }, 6 crate::rtmp::{ 7 cache::metadata::MetaData, 8 channels::define::{ChannelData, ChannelDataConsumer, ChannelEvent, ChannelEventProducer}, 9 session::{ 10 common::SessionInfo, 11 define::SessionSubType, 12 errors::{SessionError, SessionErrorValue}, 13 }, 14 }, 15 bytes::BytesMut, 16 std::time::Duration, 17 tokio::{ 18 sync::{mpsc, oneshot}, 19 time::sleep, 20 }, 21 uuid::Uuid, 22 xflv::muxer::{FlvMuxer, HEADER_LENGTH}, 23 }; 24 25 pub struct HttpFlv { 26 app_name: String, 27 stream_name: String, 28 29 muxer: FlvMuxer, 30 31 event_producer: ChannelEventProducer, 32 data_consumer: ChannelDataConsumer, 33 http_response_data_producer: HttpResponseDataProducer, 34 subscriber_id: Uuid, 35 } 36 37 impl HttpFlv { 38 pub fn new( 39 app_name: String, 40 stream_name: String, 41 event_producer: ChannelEventProducer, 42 http_response_data_producer: HttpResponseDataProducer, 43 ) -> Self { 44 let (_, data_consumer) = mpsc::unbounded_channel(); 45 let subscriber_id = Uuid::new_v4(); 46 47 Self { 48 app_name, 49 stream_name, 50 muxer: FlvMuxer::new(), 51 data_consumer, 52 event_producer, 53 http_response_data_producer, 54 subscriber_id, 55 } 56 } 57 58 pub async fn run(&mut self) -> Result<(), HttpFLvError> { 59 self.subscribe_from_rtmp_channels().await?; 60 self.send_media_stream().await?; 61 62 Ok(()) 63 } 64 65 pub async fn send_media_stream(&mut self) -> Result<(), HttpFLvError> { 66 self.muxer.write_flv_header()?; 67 self.muxer.write_previous_tag_size(0)?; 68 69 self.flush_response_data()?; 70 let mut retry_count = 0; 71 //write flv body 72 loop { 73 if let Some(data) = self.data_consumer.recv().await { 74 if let Err(err) = self.write_flv_tag(data) { 75 log::error!("write_flv_tag err: {}", err); 76 retry_count += 1; 77 } else { 78 retry_count = 0; 79 } 80 } else { 81 retry_count += 1; 82 } 83 if retry_count > 10 { 84 break; 85 } 86 } 87 self.unsubscribe_from_rtmp_channels().await 88 } 89 90 pub fn write_flv_tag(&mut self, channel_data: ChannelData) -> Result<(), HttpFLvError> { 91 let common_data: BytesMut; 92 let common_timestamp: u32; 93 let tag_type: u8; 94 95 match channel_data { 96 ChannelData::Audio { timestamp, data } => { 97 common_data = data; 98 common_timestamp = timestamp; 99 tag_type = tag_type::AUDIO; 100 } 101 102 ChannelData::Video { timestamp, data } => { 103 common_data = data; 104 common_timestamp = timestamp; 105 tag_type = tag_type::VIDEO; 106 } 107 108 ChannelData::MetaData { timestamp, data } => { 109 let mut metadata = MetaData::default(); 110 metadata.save(data); 111 let data = metadata.remove_set_data_frame()?; 112 113 common_data = data; 114 common_timestamp = timestamp; 115 tag_type = tag_type::SCRIPT_DATA_AMF; 116 } 117 } 118 119 let common_data_len = common_data.len() as u32; 120 121 self.muxer 122 .write_flv_tag_header(tag_type, common_data_len, common_timestamp)?; 123 self.muxer.write_flv_tag_body(common_data)?; 124 self.muxer 125 .write_previous_tag_size(common_data_len + HEADER_LENGTH)?; 126 127 self.flush_response_data()?; 128 129 Ok(()) 130 } 131 132 pub fn flush_response_data(&mut self) -> Result<(), HttpFLvError> { 133 let data = self.muxer.writer.extract_current_bytes(); 134 self.http_response_data_producer.start_send(Ok(data))?; 135 136 Ok(()) 137 } 138 139 pub async fn unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> { 140 let session_info = SessionInfo { 141 subscriber_id: self.subscriber_id, 142 session_sub_type: SessionSubType::Player, 143 }; 144 145 let subscribe_event = ChannelEvent::UnSubscribe { 146 app_name: self.app_name.clone(), 147 stream_name: self.stream_name.clone(), 148 session_info, 149 }; 150 if let Err(err) = self.event_producer.send(subscribe_event) { 151 log::error!("unsubscribe_from_channels err {}\n", err); 152 } 153 154 Ok(()) 155 } 156 157 pub async fn subscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> { 158 let mut retry_count: u8 = 0; 159 160 loop { 161 let (sender, receiver) = oneshot::channel(); 162 163 let session_info = SessionInfo { 164 subscriber_id: self.subscriber_id, 165 session_sub_type: SessionSubType::Player, 166 }; 167 168 let subscribe_event = ChannelEvent::Subscribe { 169 app_name: self.app_name.clone(), 170 stream_name: self.stream_name.clone(), 171 session_info: session_info, 172 responder: sender, 173 }; 174 175 let rv = self.event_producer.send(subscribe_event); 176 match rv { 177 Err(_) => { 178 let session_error = SessionError { 179 value: SessionErrorValue::SendChannelDataErr, 180 }; 181 return Err(HttpFLvError { 182 value: HttpFLvErrorValue::SessionError(session_error), 183 }); 184 } 185 _ => {} 186 } 187 188 match receiver.await { 189 Ok(consumer) => { 190 self.data_consumer = consumer; 191 break; 192 } 193 Err(_) => { 194 if retry_count > 10 { 195 let session_error = SessionError { 196 value: SessionErrorValue::SubscribeCountLimitReach, 197 }; 198 return Err(HttpFLvError { 199 value: HttpFLvErrorValue::SessionError(session_error), 200 }); 201 } 202 } 203 } 204 205 sleep(Duration::from_millis(800)).await; 206 retry_count = retry_count + 1; 207 } 208 209 Ok(()) 210 } 211 } 212