1 use tokio::sync::oneshot; 2 use { 3 super::{ 4 define::{tag_type, HttpResponseDataProducer}, 5 errors::{HttpFLvError, HttpFLvErrorValue}, 6 }, 7 crate::rtmp::{ 8 cache::metadata::MetaData, 9 session::errors::{SessionError, SessionErrorValue}, 10 }, 11 bytes::BytesMut, 12 std::net::SocketAddr, 13 streamhub::define::{ 14 FrameData, FrameDataReceiver, NotifyInfo, StreamHubEvent, StreamHubEventSender, 15 SubDataType, SubscribeType, SubscriberInfo, 16 }, 17 streamhub::{ 18 stream::StreamIdentifier, 19 utils::{RandomDigitCount, Uuid}, 20 }, 21 tokio::sync::mpsc, 22 xflv::muxer::{FlvMuxer, HEADER_LENGTH}, 23 }; 24 25 pub struct HttpFlv { 26 app_name: String, 27 stream_name: String, 28 29 muxer: FlvMuxer, 30 31 event_producer: StreamHubEventSender, 32 data_consumer: FrameDataReceiver, 33 http_response_data_producer: HttpResponseDataProducer, 34 subscriber_id: Uuid, 35 request_url: String, 36 remote_addr: SocketAddr, 37 } 38 39 impl HttpFlv { new( app_name: String, stream_name: String, event_producer: StreamHubEventSender, http_response_data_producer: HttpResponseDataProducer, request_url: String, remote_addr: SocketAddr, ) -> Self40 pub fn new( 41 app_name: String, 42 stream_name: String, 43 event_producer: StreamHubEventSender, 44 http_response_data_producer: HttpResponseDataProducer, 45 request_url: String, 46 remote_addr: SocketAddr, 47 ) -> Self { 48 let (_, data_consumer) = mpsc::unbounded_channel(); 49 let subscriber_id = Uuid::new(RandomDigitCount::Four); 50 51 Self { 52 app_name, 53 stream_name, 54 muxer: FlvMuxer::new(), 55 data_consumer, 56 event_producer, 57 http_response_data_producer, 58 subscriber_id, 59 request_url, 60 remote_addr, 61 } 62 } 63 run(&mut self) -> Result<(), HttpFLvError>64 pub async fn run(&mut self) -> Result<(), HttpFLvError> { 65 self.subscribe_from_rtmp_channels().await?; 66 self.send_media_stream().await?; 67 68 Ok(()) 69 } 70 send_media_stream(&mut self) -> Result<(), HttpFLvError>71 pub async fn send_media_stream(&mut self) -> Result<(), HttpFLvError> { 72 self.muxer.write_flv_header()?; 73 self.muxer.write_previous_tag_size(0)?; 74 75 self.flush_response_data()?; 76 let mut retry_count = 0; 77 //write flv body 78 loop { 79 if let Some(data) = self.data_consumer.recv().await { 80 if let Err(err) = self.write_flv_tag(data) { 81 if let HttpFLvErrorValue::MpscSendError(err_in) = &err.value { 82 if err_in.is_disconnected() { 83 log::info!("write_flv_tag: {}", err_in); 84 break; 85 } 86 } 87 log::error!("write_flv_tag err: {}", err); 88 retry_count += 1; 89 } else { 90 retry_count = 0; 91 } 92 } else { 93 retry_count += 1; 94 } 95 if retry_count > 10 { 96 break; 97 } 98 } 99 self.unsubscribe_from_rtmp_channels().await 100 } 101 write_flv_tag(&mut self, channel_data: FrameData) -> Result<(), HttpFLvError>102 pub fn write_flv_tag(&mut self, channel_data: FrameData) -> Result<(), HttpFLvError> { 103 let (common_data, common_timestamp, tag_type) = match channel_data { 104 FrameData::Audio { timestamp, data } => (data, timestamp, tag_type::AUDIO), 105 FrameData::Video { timestamp, data } => (data, timestamp, tag_type::VIDEO), 106 FrameData::MetaData { timestamp, data } => { 107 let mut metadata = MetaData::new(); 108 metadata.save(&data); 109 let data = metadata.remove_set_data_frame()?; 110 111 (data, timestamp, tag_type::SCRIPT_DATA_AMF) 112 } 113 _ => { 114 log::error!("should not be here!!!"); 115 (BytesMut::new(), 0, 0) 116 } 117 }; 118 119 let common_data_len = common_data.len() as u32; 120 121 self.muxer 122 .write_flv_tag_header(tag_type, common_data_len, common_timestamp)?; 123 self.muxer.write_flv_tag_body(common_data)?; 124 self.muxer 125 .write_previous_tag_size(common_data_len + HEADER_LENGTH)?; 126 127 self.flush_response_data()?; 128 129 Ok(()) 130 } 131 flush_response_data(&mut self) -> Result<(), HttpFLvError>132 pub fn flush_response_data(&mut self) -> Result<(), HttpFLvError> { 133 let data = self.muxer.writer.extract_current_bytes(); 134 self.http_response_data_producer.start_send(Ok(data))?; 135 136 Ok(()) 137 } 138 unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError>139 pub async fn unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> { 140 let sub_info = SubscriberInfo { 141 id: self.subscriber_id, 142 sub_type: SubscribeType::PlayerHttpFlv, 143 sub_data_type: SubDataType::Frame, 144 notify_info: NotifyInfo { 145 request_url: self.request_url.clone(), 146 remote_addr: self.remote_addr.to_string(), 147 }, 148 }; 149 150 let identifier = StreamIdentifier::Rtmp { 151 app_name: self.app_name.clone(), 152 stream_name: self.stream_name.clone(), 153 }; 154 155 let subscribe_event = StreamHubEvent::UnSubscribe { 156 identifier, 157 info: sub_info, 158 }; 159 if let Err(err) = self.event_producer.send(subscribe_event) { 160 log::error!("unsubscribe_from_channels err {}", err); 161 } 162 163 Ok(()) 164 } 165 subscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError>166 pub async fn subscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> { 167 let sub_info = SubscriberInfo { 168 id: self.subscriber_id, 169 sub_type: SubscribeType::PlayerHttpFlv, 170 sub_data_type: SubDataType::Frame, 171 notify_info: NotifyInfo { 172 request_url: self.request_url.clone(), 173 remote_addr: self.remote_addr.to_string(), 174 }, 175 }; 176 177 let identifier = StreamIdentifier::Rtmp { 178 app_name: self.app_name.clone(), 179 stream_name: self.stream_name.clone(), 180 }; 181 182 let (event_result_sender, event_result_receiver) = oneshot::channel(); 183 184 let subscribe_event = StreamHubEvent::Subscribe { 185 identifier, 186 info: sub_info, 187 result_sender: event_result_sender, 188 }; 189 190 let rv = self.event_producer.send(subscribe_event); 191 if rv.is_err() { 192 let session_error = SessionError { 193 value: SessionErrorValue::SendFrameDataErr, 194 }; 195 return Err(HttpFLvError { 196 value: HttpFLvErrorValue::SessionError(session_error), 197 }); 198 } 199 200 let receiver = event_result_receiver.await??.frame_receiver.unwrap(); 201 self.data_consumer = receiver; 202 203 Ok(()) 204 } 205 } 206