1 use { 2 super::{ 3 define::{SessionSubType, SessionType}, 4 errors::{SessionError, SessionErrorValue}, 5 }, 6 crate::{ 7 channels::define::{ 8 ChannelData, ChannelDataConsumer, ChannelDataProducer, ChannelEvent, 9 ChannelEventProducer, 10 }, 11 chunk::{ 12 define::{chunk_type, csid_type}, 13 packetizer::ChunkPacketizer, 14 ChunkInfo, 15 }, 16 messages::define::msg_type_id, 17 }, 18 bytes::BytesMut, 19 bytesio::bytesio::BytesIO, 20 std::{sync::Arc, time::Duration}, 21 tokio::{ 22 sync::{mpsc, oneshot, Mutex}, 23 time::sleep, 24 }, 25 uuid::Uuid, 26 }; 27 #[derive(Debug)] 28 pub struct SessionInfo { 29 pub subscriber_id: Uuid, 30 pub session_sub_type: SessionSubType, 31 } 32 pub struct Common { 33 packetizer: ChunkPacketizer, 34 35 data_consumer: ChannelDataConsumer, 36 data_producer: ChannelDataProducer, 37 38 event_producer: ChannelEventProducer, 39 pub session_type: SessionType, 40 } 41 42 impl Common { 43 pub fn new( 44 net_io: Arc<Mutex<BytesIO>>, 45 event_producer: ChannelEventProducer, 46 session_type: SessionType, 47 ) -> Self { 48 //only used for init,since I don't found a better way to deal with this. 49 let (init_producer, init_consumer) = mpsc::unbounded_channel(); 50 51 Self { 52 packetizer: ChunkPacketizer::new(Arc::clone(&net_io)), 53 54 data_producer: init_producer, 55 data_consumer: init_consumer, 56 57 event_producer, 58 session_type, 59 } 60 } 61 pub async fn send_channel_data(&mut self) -> Result<(), SessionError> { 62 loop { 63 if let Some(data) = self.data_consumer.recv().await { 64 match data { 65 ChannelData::Audio { timestamp, data } => { 66 self.send_audio(data, timestamp).await?; 67 } 68 ChannelData::Video { timestamp, data } => { 69 self.send_video(data, timestamp).await?; 70 } 71 ChannelData::MetaData { timestamp, data } => { 72 self.send_metadata(data, timestamp).await?; 73 } 74 } 75 } 76 } 77 } 78 79 pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 80 let mut chunk_info = ChunkInfo::new( 81 csid_type::AUDIO, 82 chunk_type::TYPE_0, 83 timestamp, 84 data.len() as u32, 85 msg_type_id::AUDIO, 86 0, 87 data, 88 ); 89 90 self.packetizer.write_chunk(&mut chunk_info).await?; 91 92 Ok(()) 93 } 94 95 pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 96 let mut chunk_info = ChunkInfo::new( 97 csid_type::VIDEO, 98 chunk_type::TYPE_0, 99 timestamp, 100 data.len() as u32, 101 msg_type_id::VIDEO, 102 0, 103 data, 104 ); 105 106 self.packetizer.write_chunk(&mut chunk_info).await?; 107 108 Ok(()) 109 } 110 111 pub async fn send_metadata( 112 &mut self, 113 data: BytesMut, 114 timestamp: u32, 115 ) -> Result<(), SessionError> { 116 let mut chunk_info = ChunkInfo::new( 117 csid_type::DATA_AMF0_AMF3, 118 chunk_type::TYPE_0, 119 timestamp, 120 data.len() as u32, 121 msg_type_id::DATA_AMF0, 122 0, 123 data, 124 ); 125 126 self.packetizer.write_chunk(&mut chunk_info).await?; 127 Ok(()) 128 } 129 130 pub fn on_video_data( 131 &mut self, 132 data: &mut BytesMut, 133 timestamp: &u32, 134 ) -> Result<(), SessionError> { 135 let data = ChannelData::Video { 136 timestamp: timestamp.clone(), 137 data: data.clone(), 138 }; 139 140 match self.data_producer.send(data) { 141 Ok(_) => {} 142 Err(err) => { 143 log::error!("send video err: {}", err); 144 return Err(SessionError { 145 value: SessionErrorValue::SendChannelDataErr, 146 }); 147 } 148 } 149 150 Ok(()) 151 } 152 153 pub fn on_audio_data( 154 &mut self, 155 data: &mut BytesMut, 156 timestamp: &u32, 157 ) -> Result<(), SessionError> { 158 let data = ChannelData::Audio { 159 timestamp: timestamp.clone(), 160 data: data.clone(), 161 }; 162 163 match self.data_producer.send(data) { 164 Ok(_) => {} 165 Err(err) => { 166 log::error!("receive audio err {}\n", err); 167 return Err(SessionError { 168 value: SessionErrorValue::SendChannelDataErr, 169 }); 170 } 171 } 172 173 Ok(()) 174 } 175 176 pub fn on_meta_data( 177 &mut self, 178 body: &mut BytesMut, 179 timestamp: &u32, 180 ) -> Result<(), SessionError> { 181 let data = ChannelData::MetaData { 182 timestamp: timestamp.clone(), 183 data: body.clone(), 184 }; 185 186 match self.data_producer.send(data) { 187 Ok(_) => {} 188 Err(_) => { 189 return Err(SessionError { 190 value: SessionErrorValue::SendChannelDataErr, 191 }) 192 } 193 } 194 195 Ok(()) 196 } 197 198 fn get_session_info(&mut self, sub_id: Uuid) -> SessionInfo { 199 match self.session_type { 200 SessionType::Client => SessionInfo { 201 subscriber_id: sub_id, 202 session_sub_type: SessionSubType::Publisher, 203 }, 204 SessionType::Server => SessionInfo { 205 subscriber_id: sub_id, 206 session_sub_type: SessionSubType::Player, 207 }, 208 } 209 } 210 211 /*Begin to send data to common player or relay pull client*/ 212 pub async fn subscribe_from_channels( 213 &mut self, 214 app_name: String, 215 stream_name: String, 216 sub_id: Uuid, 217 ) -> Result<(), SessionError> { 218 log::info!( 219 "subscribe_from_channels, app_name: {} stream_name: {} subscribe_id: {}", 220 app_name, 221 stream_name.clone(), 222 sub_id 223 ); 224 225 let mut retry_count: u8 = 0; 226 227 loop { 228 let (sender, receiver) = oneshot::channel(); 229 230 let subscribe_event = ChannelEvent::Subscribe { 231 app_name: app_name.clone(), 232 stream_name: stream_name.clone(), 233 session_info: self.get_session_info(sub_id), 234 responder: sender, 235 }; 236 let rv = self.event_producer.send(subscribe_event); 237 match rv { 238 Err(_) => { 239 return Err(SessionError { 240 value: SessionErrorValue::ChannelEventSendErr, 241 }) 242 } 243 _ => {} 244 } 245 246 match receiver.await { 247 Ok(consumer) => { 248 self.data_consumer = consumer; 249 break; 250 } 251 Err(_) => { 252 if retry_count > 10 { 253 return Err(SessionError { 254 value: SessionErrorValue::SubscribeCountLimitReach, 255 }); 256 } 257 } 258 } 259 260 sleep(Duration::from_millis(800)).await; 261 retry_count = retry_count + 1; 262 } 263 264 Ok(()) 265 } 266 267 pub async fn unsubscribe_from_channels( 268 &mut self, 269 app_name: String, 270 stream_name: String, 271 sub_id: Uuid, 272 ) -> Result<(), SessionError> { 273 let subscribe_event = ChannelEvent::UnSubscribe { 274 app_name, 275 stream_name, 276 session_info: self.get_session_info(sub_id), 277 }; 278 if let Err(err) = self.event_producer.send(subscribe_event) { 279 log::error!("unsubscribe_from_channels err {}\n", err); 280 } 281 282 Ok(()) 283 } 284 285 /*Begin to receive stream data from RTMP push client or RTMP relay push client*/ 286 pub async fn publish_to_channels( 287 &mut self, 288 app_name: String, 289 stream_name: String, 290 ) -> Result<(), SessionError> { 291 let (sender, receiver) = oneshot::channel(); 292 let publish_event = ChannelEvent::Publish { 293 app_name, 294 stream_name, 295 responder: sender, 296 }; 297 298 let rv = self.event_producer.send(publish_event); 299 match rv { 300 Err(_) => { 301 return Err(SessionError { 302 value: SessionErrorValue::ChannelEventSendErr, 303 }) 304 } 305 _ => {} 306 } 307 308 match receiver.await { 309 Ok(producer) => { 310 self.data_producer = producer; 311 } 312 Err(err) => { 313 log::error!("publish_to_channels err{}\n", err); 314 } 315 } 316 Ok(()) 317 } 318 319 pub async fn unpublish_to_channels( 320 &mut self, 321 app_name: String, 322 stream_name: String, 323 ) -> Result<(), SessionError> { 324 let unpublish_event = ChannelEvent::UnPublish { 325 app_name: app_name.clone(), 326 stream_name: stream_name.clone(), 327 }; 328 329 let rv = self.event_producer.send(unpublish_event); 330 match rv { 331 Err(_) => { 332 log::error!( 333 "unpublish_to_channels error.app_name: {}, stream_name: {}", 334 app_name, 335 stream_name 336 ); 337 return Err(SessionError { 338 value: SessionErrorValue::ChannelEventSendErr, 339 }); 340 } 341 _ => { 342 log::info!( 343 "unpublish_to_channels successfully.app_name: {}, stream_name: {}", 344 app_name, 345 stream_name 346 ); 347 } 348 } 349 Ok(()) 350 } 351 } 352