1 use streamhub::define::DataSender; 2 use tokio::sync::oneshot; 3 4 use { 5 super::{ 6 define::SessionType, 7 errors::{SessionError, SessionErrorValue}, 8 }, 9 crate::{ 10 cache::errors::CacheError, 11 cache::Cache, 12 chunk::{ 13 define::{chunk_type, csid_type}, 14 packetizer::ChunkPacketizer, 15 ChunkInfo, 16 }, 17 messages::define::msg_type_id, 18 }, 19 async_trait::async_trait, 20 bytes::BytesMut, 21 std::fmt, 22 std::{net::SocketAddr, sync::Arc}, 23 streamhub::{ 24 define::{ 25 FrameData, FrameDataReceiver, FrameDataSender, InformationSender, NotifyInfo, 26 PublishType, PublisherInfo, StreamHubEvent, StreamHubEventSender, SubscribeType, 27 SubscriberInfo, TStreamHandler, 28 }, 29 errors::{ChannelError, ChannelErrorValue}, 30 statistics::StreamStatistics, 31 stream::StreamIdentifier, 32 utils::Uuid, 33 }, 34 tokio::sync::{mpsc, Mutex}, 35 }; 36 37 pub struct Common { 38 //only Server Subscriber or Client Publisher needs to send out trunck data. 39 packetizer: Option<ChunkPacketizer>, 40 41 data_receiver: FrameDataReceiver, 42 data_sender: FrameDataSender, 43 44 event_producer: StreamHubEventSender, 45 pub session_type: SessionType, 46 47 /*save the client side socket connected to the SeverSession */ 48 remote_addr: Option<SocketAddr>, 49 /*request URL from client*/ 50 pub request_url: String, 51 pub stream_handler: Arc<RtmpStreamHandler>, 52 } 53 54 impl Common { new( packetizer: Option<ChunkPacketizer>, event_producer: StreamHubEventSender, session_type: SessionType, remote_addr: Option<SocketAddr>, ) -> Self55 pub fn new( 56 packetizer: Option<ChunkPacketizer>, 57 event_producer: StreamHubEventSender, 58 session_type: SessionType, 59 remote_addr: Option<SocketAddr>, 60 ) -> Self { 61 //only used for init,since I don't found a better way to deal with this. 62 let (init_producer, init_consumer) = mpsc::unbounded_channel(); 63 64 Self { 65 packetizer, 66 67 data_sender: init_producer, 68 data_receiver: init_consumer, 69 70 event_producer, 71 session_type, 72 remote_addr, 73 request_url: String::default(), 74 stream_handler: Arc::new(RtmpStreamHandler::new()), 75 //cache: None, 76 } 77 } send_channel_data(&mut self) -> Result<(), SessionError>78 pub async fn send_channel_data(&mut self) -> Result<(), SessionError> { 79 let mut retry_times = 0; 80 loop { 81 if let Some(data) = self.data_receiver.recv().await { 82 match data { 83 FrameData::Audio { timestamp, data } => { 84 self.send_audio(data, timestamp).await?; 85 } 86 FrameData::Video { timestamp, data } => { 87 self.send_video(data, timestamp).await?; 88 } 89 FrameData::MetaData { timestamp, data } => { 90 self.send_metadata(data, timestamp).await?; 91 } 92 _ => {} 93 } 94 } else { 95 retry_times += 1; 96 log::debug!( 97 "send_channel_data: no data receives ,retry {} times!", 98 retry_times 99 ); 100 101 if retry_times > 10 { 102 return Err(SessionError { 103 value: SessionErrorValue::NoMediaDataReceived, 104 }); 105 } 106 } 107 } 108 } 109 send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError>110 pub async fn send_audio(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 111 let mut chunk_info = ChunkInfo::new( 112 csid_type::AUDIO, 113 chunk_type::TYPE_0, 114 timestamp, 115 data.len() as u32, 116 msg_type_id::AUDIO, 117 0, 118 data, 119 ); 120 121 if let Some(packetizer) = &mut self.packetizer { 122 packetizer.write_chunk(&mut chunk_info).await?; 123 } 124 125 Ok(()) 126 } 127 send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError>128 pub async fn send_video(&mut self, data: BytesMut, timestamp: u32) -> Result<(), SessionError> { 129 let mut chunk_info = ChunkInfo::new( 130 csid_type::VIDEO, 131 chunk_type::TYPE_0, 132 timestamp, 133 data.len() as u32, 134 msg_type_id::VIDEO, 135 0, 136 data, 137 ); 138 139 if let Some(packetizer) = &mut self.packetizer { 140 packetizer.write_chunk(&mut chunk_info).await?; 141 } 142 143 Ok(()) 144 } 145 send_metadata( &mut self, data: BytesMut, timestamp: u32, ) -> Result<(), SessionError>146 pub async fn send_metadata( 147 &mut self, 148 data: BytesMut, 149 timestamp: u32, 150 ) -> Result<(), SessionError> { 151 let mut chunk_info = ChunkInfo::new( 152 csid_type::DATA_AMF0_AMF3, 153 chunk_type::TYPE_0, 154 timestamp, 155 data.len() as u32, 156 msg_type_id::DATA_AMF0, 157 0, 158 data, 159 ); 160 161 if let Some(packetizer) = &mut self.packetizer { 162 packetizer.write_chunk(&mut chunk_info).await?; 163 } 164 165 Ok(()) 166 } 167 on_video_data( &mut self, data: &mut BytesMut, timestamp: &u32, ) -> Result<(), SessionError>168 pub async fn on_video_data( 169 &mut self, 170 data: &mut BytesMut, 171 timestamp: &u32, 172 ) -> Result<(), SessionError> { 173 let channel_data = FrameData::Video { 174 timestamp: *timestamp, 175 data: data.clone(), 176 }; 177 178 match self.data_sender.send(channel_data) { 179 Ok(_) => {} 180 Err(err) => { 181 log::error!("send video err: {}", err); 182 return Err(SessionError { 183 value: SessionErrorValue::SendFrameDataErr, 184 }); 185 } 186 } 187 188 self.stream_handler 189 .save_video_data(data, *timestamp) 190 .await?; 191 192 Ok(()) 193 } 194 on_audio_data( &mut self, data: &mut BytesMut, timestamp: &u32, ) -> Result<(), SessionError>195 pub async fn on_audio_data( 196 &mut self, 197 data: &mut BytesMut, 198 timestamp: &u32, 199 ) -> Result<(), SessionError> { 200 let channel_data = FrameData::Audio { 201 timestamp: *timestamp, 202 data: data.clone(), 203 }; 204 205 match self.data_sender.send(channel_data) { 206 Ok(_) => {} 207 Err(err) => { 208 log::error!("receive audio err {}", err); 209 return Err(SessionError { 210 value: SessionErrorValue::SendFrameDataErr, 211 }); 212 } 213 } 214 215 self.stream_handler 216 .save_audio_data(data, *timestamp) 217 .await?; 218 219 Ok(()) 220 } 221 on_meta_data( &mut self, data: &mut BytesMut, timestamp: &u32, ) -> Result<(), SessionError>222 pub async fn on_meta_data( 223 &mut self, 224 data: &mut BytesMut, 225 timestamp: &u32, 226 ) -> Result<(), SessionError> { 227 let channel_data = FrameData::MetaData { 228 timestamp: *timestamp, 229 data: data.clone(), 230 }; 231 232 match self.data_sender.send(channel_data) { 233 Ok(_) => {} 234 Err(_) => { 235 return Err(SessionError { 236 value: SessionErrorValue::SendFrameDataErr, 237 }) 238 } 239 } 240 241 self.stream_handler.save_metadata(data, *timestamp).await; 242 243 Ok(()) 244 } 245 get_subscriber_info(&mut self, sub_id: Uuid) -> SubscriberInfo246 fn get_subscriber_info(&mut self, sub_id: Uuid) -> SubscriberInfo { 247 let remote_addr = if let Some(addr) = self.remote_addr { 248 addr.to_string() 249 } else { 250 String::from("unknown") 251 }; 252 253 let sub_type = match self.session_type { 254 SessionType::Client => SubscribeType::PublisherRtmp, 255 SessionType::Server => SubscribeType::PlayerRtmp, 256 }; 257 258 SubscriberInfo { 259 id: sub_id, 260 /*rtmp local client subscribe from local rtmp session 261 and publish(relay) the rtmp steam to remote RTMP server*/ 262 sub_type, 263 sub_data_type: streamhub::define::SubDataType::Frame, 264 notify_info: NotifyInfo { 265 request_url: self.request_url.clone(), 266 remote_addr, 267 }, 268 } 269 } 270 get_publisher_info(&mut self, sub_id: Uuid) -> PublisherInfo271 fn get_publisher_info(&mut self, sub_id: Uuid) -> PublisherInfo { 272 let remote_addr = if let Some(addr) = self.remote_addr { 273 addr.to_string() 274 } else { 275 String::from("unknown") 276 }; 277 278 let pub_type = match self.session_type { 279 SessionType::Client => PublishType::RelayRtmp, 280 SessionType::Server => PublishType::PushRtmp, 281 }; 282 283 PublisherInfo { 284 id: sub_id, 285 pub_type, 286 pub_data_type: streamhub::define::PubDataType::Frame, 287 notify_info: NotifyInfo { 288 request_url: self.request_url.clone(), 289 remote_addr, 290 }, 291 } 292 } 293 294 /*Subscribe from local channels and then send data to retmote common player or local RTMP relay push client*/ subscribe_from_channels( &mut self, app_name: String, stream_name: String, sub_id: Uuid, ) -> Result<(), SessionError>295 pub async fn subscribe_from_channels( 296 &mut self, 297 app_name: String, 298 stream_name: String, 299 sub_id: Uuid, 300 ) -> Result<(), SessionError> { 301 log::info!( 302 "subscribe_from_channels, app_name: {} stream_name: {} subscribe_id: {}", 303 app_name, 304 stream_name, 305 sub_id 306 ); 307 308 let identifier = StreamIdentifier::Rtmp { 309 app_name, 310 stream_name, 311 }; 312 313 let (event_result_sender, event_result_receiver) = oneshot::channel(); 314 315 let subscribe_event = StreamHubEvent::Subscribe { 316 identifier, 317 info: self.get_subscriber_info(sub_id), 318 result_sender: event_result_sender, 319 }; 320 let rv = self.event_producer.send(subscribe_event); 321 322 if rv.is_err() { 323 return Err(SessionError { 324 value: SessionErrorValue::StreamHubEventSendErr, 325 }); 326 } 327 328 let recv = event_result_receiver.await??; 329 self.data_receiver = recv.frame_receiver.unwrap(); 330 331 Ok(()) 332 } 333 unsubscribe_from_channels( &mut self, app_name: String, stream_name: String, sub_id: Uuid, ) -> Result<(), SessionError>334 pub async fn unsubscribe_from_channels( 335 &mut self, 336 app_name: String, 337 stream_name: String, 338 sub_id: Uuid, 339 ) -> Result<(), SessionError> { 340 let identifier = StreamIdentifier::Rtmp { 341 app_name, 342 stream_name, 343 }; 344 345 let subscribe_event = StreamHubEvent::UnSubscribe { 346 identifier, 347 info: self.get_subscriber_info(sub_id), 348 }; 349 if let Err(err) = self.event_producer.send(subscribe_event) { 350 log::error!("unsubscribe_from_channels err {}", err); 351 } 352 353 Ok(()) 354 } 355 356 /*Begin to receive stream data from remote RTMP push client or local RTMP relay pull client*/ publish_to_channels( &mut self, app_name: String, stream_name: String, pub_id: Uuid, gop_num: usize, ) -> Result<(), SessionError>357 pub async fn publish_to_channels( 358 &mut self, 359 app_name: String, 360 stream_name: String, 361 pub_id: Uuid, 362 gop_num: usize, 363 ) -> Result<(), SessionError> { 364 self.stream_handler 365 .set_cache(Cache::new(app_name.clone(), stream_name.clone(), gop_num)) 366 .await; 367 368 let (event_result_sender, event_result_receiver) = oneshot::channel(); 369 let publish_event = StreamHubEvent::Publish { 370 identifier: StreamIdentifier::Rtmp { 371 app_name, 372 stream_name, 373 }, 374 info: self.get_publisher_info(pub_id), 375 stream_handler: self.stream_handler.clone(), 376 result_sender: event_result_sender, 377 }; 378 379 if self.event_producer.send(publish_event).is_err() { 380 return Err(SessionError { 381 value: SessionErrorValue::StreamHubEventSendErr, 382 }); 383 } 384 385 let result = event_result_receiver.await??; 386 self.data_sender = result.0.unwrap(); 387 Ok(()) 388 } 389 unpublish_to_channels( &mut self, app_name: String, stream_name: String, pub_id: Uuid, ) -> Result<(), SessionError>390 pub async fn unpublish_to_channels( 391 &mut self, 392 app_name: String, 393 stream_name: String, 394 pub_id: Uuid, 395 ) -> Result<(), SessionError> { 396 log::info!( 397 "unpublish_to_channels, app_name:{}, stream_name:{}", 398 app_name, 399 stream_name 400 ); 401 let unpublish_event = StreamHubEvent::UnPublish { 402 identifier: StreamIdentifier::Rtmp { 403 app_name: app_name.clone(), 404 stream_name: stream_name.clone(), 405 }, 406 info: self.get_publisher_info(pub_id), 407 }; 408 409 match self.event_producer.send(unpublish_event) { 410 Err(_) => { 411 log::error!( 412 "unpublish_to_channels error.app_name: {}, stream_name: {}", 413 app_name, 414 stream_name 415 ); 416 return Err(SessionError { 417 value: SessionErrorValue::StreamHubEventSendErr, 418 }); 419 } 420 _ => { 421 log::info!( 422 "unpublish_to_channels successfully.app_name: {}, stream_name: {}", 423 app_name, 424 stream_name 425 ); 426 } 427 } 428 Ok(()) 429 } 430 } 431 432 #[derive(Default)] 433 pub struct RtmpStreamHandler { 434 /*cache is used to save RTMP sequence/gops/meta data 435 which needs to be send to client(player) */ 436 /*The cache will be used in different threads(save 437 cache in one thread and send cache data to different clients 438 in other threads) */ 439 pub cache: Mutex<Option<Cache>>, 440 } 441 442 impl RtmpStreamHandler { new() -> Self443 pub fn new() -> Self { 444 Self { 445 cache: Mutex::new(None), 446 } 447 } 448 set_cache(&self, cache: Cache)449 pub async fn set_cache(&self, cache: Cache) { 450 *self.cache.lock().await = Some(cache); 451 } 452 save_video_data( &self, chunk_body: &BytesMut, timestamp: u32, ) -> Result<(), CacheError>453 pub async fn save_video_data( 454 &self, 455 chunk_body: &BytesMut, 456 timestamp: u32, 457 ) -> Result<(), CacheError> { 458 if let Some(cache) = &mut *self.cache.lock().await { 459 cache.save_video_data(chunk_body, timestamp).await?; 460 } 461 Ok(()) 462 } 463 save_audio_data( &self, chunk_body: &BytesMut, timestamp: u32, ) -> Result<(), CacheError>464 pub async fn save_audio_data( 465 &self, 466 chunk_body: &BytesMut, 467 timestamp: u32, 468 ) -> Result<(), CacheError> { 469 if let Some(cache) = &mut *self.cache.lock().await { 470 cache.save_audio_data(chunk_body, timestamp).await?; 471 } 472 Ok(()) 473 } 474 save_metadata(&self, chunk_body: &BytesMut, timestamp: u32)475 pub async fn save_metadata(&self, chunk_body: &BytesMut, timestamp: u32) { 476 if let Some(cache) = &mut *self.cache.lock().await { 477 cache.save_metadata(chunk_body, timestamp); 478 } 479 } 480 } 481 482 #[async_trait] 483 impl TStreamHandler for RtmpStreamHandler { send_prior_data( &self, data_sender: DataSender, sub_type: SubscribeType, ) -> Result<(), ChannelError>484 async fn send_prior_data( 485 &self, 486 data_sender: DataSender, 487 sub_type: SubscribeType, 488 ) -> Result<(), ChannelError> { 489 let sender = match data_sender { 490 DataSender::Frame { sender } => sender, 491 DataSender::Packet { sender: _ } => { 492 return Err(ChannelError { 493 value: ChannelErrorValue::NotCorrectDataSenderType, 494 }); 495 } 496 }; 497 if let Some(cache) = &mut *self.cache.lock().await { 498 if let Some(meta_body_data) = cache.get_metadata() { 499 sender.send(meta_body_data).map_err(|_| ChannelError { 500 value: ChannelErrorValue::SendError, 501 })?; 502 } 503 if let Some(audio_seq_data) = cache.get_audio_seq() { 504 sender.send(audio_seq_data).map_err(|_| ChannelError { 505 value: ChannelErrorValue::SendError, 506 })?; 507 } 508 if let Some(video_seq_data) = cache.get_video_seq() { 509 sender.send(video_seq_data).map_err(|_| ChannelError { 510 value: ChannelErrorValue::SendError, 511 })?; 512 } 513 match sub_type { 514 SubscribeType::PlayerRtmp 515 | SubscribeType::PlayerHttpFlv 516 | SubscribeType::PlayerHls 517 | SubscribeType::GenerateHls => { 518 if let Some(gops_data) = cache.get_gops_data() { 519 for gop in gops_data { 520 for channel_data in gop.get_frame_data() { 521 sender.send(channel_data).map_err(|_| ChannelError { 522 value: ChannelErrorValue::SendError, 523 })?; 524 } 525 } 526 } 527 } 528 _ => {} 529 } 530 } 531 532 Ok(()) 533 } get_statistic_data(&self) -> Option<StreamStatistics>534 async fn get_statistic_data(&self) -> Option<StreamStatistics> { 535 if let Some(cache) = &mut *self.cache.lock().await { 536 return Some(cache.av_statistics.get_avstatistic_data().await); 537 } 538 539 None 540 } 541 send_information(&self, _: InformationSender)542 async fn send_information(&self, _: InformationSender) {} 543 } 544 545 impl fmt::Debug for Common { fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error>546 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { 547 write!(fmt, "S2 {{ member: {:?} }}", self.request_url) 548 } 549 } 550