1 use define::{FrameDataReceiver, PacketDataReceiver, PacketDataSender}; 2 3 use crate::define::PacketData; 4 5 pub mod define; 6 pub mod errors; 7 pub mod notify; 8 pub mod statistics; 9 pub mod stream; 10 pub mod utils; 11 12 use { 13 crate::notify::Notifier, 14 define::{ 15 AvStatisticSender, BroadcastEvent, BroadcastEventReceiver, BroadcastEventSender, 16 DataReceiver, DataSender, FrameData, FrameDataSender, Information, PubSubInfo, 17 StreamHubEvent, StreamHubEventReceiver, StreamHubEventSender, StreamStatisticSizeSender, 18 SubscribeType, SubscriberInfo, TStreamHandler, TransmitterEvent, TransmitterEventReceiver, 19 TransmitterEventSender, 20 }, 21 errors::{ChannelError, ChannelErrorValue}, 22 std::collections::HashMap, 23 std::sync::Arc, 24 stream::StreamIdentifier, 25 tokio::sync::{broadcast, mpsc, mpsc::UnboundedReceiver, Mutex}, 26 utils::Uuid, 27 }; 28 29 //receive data from ChannelsManager and send to players/subscribers 30 pub struct Transmitter { 31 //used for receiving Audio/Video data from publishers 32 data_receiver: DataReceiver, 33 //used for receiving event 34 event_receiver: TransmitterEventReceiver, 35 //used for sending audio/video frame data to players/subscribers 36 id_to_frame_sender: Arc<Mutex<HashMap<Uuid, FrameDataSender>>>, 37 //used for sending audio/video packet data to players/subscribers 38 id_to_packet_sender: Arc<Mutex<HashMap<Uuid, PacketDataSender>>>, 39 stream_handler: Arc<dyn TStreamHandler>, 40 } 41 42 impl Transmitter { new( data_receiver: DataReceiver, event_receiver: UnboundedReceiver<TransmitterEvent>, h: Arc<dyn TStreamHandler>, ) -> Self43 fn new( 44 data_receiver: DataReceiver, 45 event_receiver: UnboundedReceiver<TransmitterEvent>, 46 h: Arc<dyn TStreamHandler>, 47 ) -> Self { 48 Self { 49 data_receiver, 50 event_receiver, 51 id_to_frame_sender: Arc::new(Mutex::new(HashMap::new())), 52 id_to_packet_sender: Arc::new(Mutex::new(HashMap::new())), 53 stream_handler: h, 54 } 55 } 56 receive_frame_data_loop( mut exit: broadcast::Receiver<()>, mut receiver: FrameDataReceiver, frame_senders: Arc<Mutex<HashMap<Uuid, FrameDataSender>>>, )57 pub async fn receive_frame_data_loop( 58 mut exit: broadcast::Receiver<()>, 59 mut receiver: FrameDataReceiver, 60 frame_senders: Arc<Mutex<HashMap<Uuid, FrameDataSender>>>, 61 ) { 62 tokio::spawn(async move { 63 loop { 64 tokio::select! { 65 data = receiver.recv() => { 66 if let Some(val) = data { 67 match val { 68 FrameData::MetaData { 69 timestamp: _, 70 data: _, 71 } => {} 72 FrameData::Audio { timestamp, data } => { 73 let data = FrameData::Audio { 74 timestamp, 75 data: data.clone(), 76 }; 77 78 for (_, v) in frame_senders.lock().await.iter() { 79 if let Err(audio_err) = v.send(data.clone()).map_err(|_| ChannelError { 80 value: ChannelErrorValue::SendAudioError, 81 }) { 82 log::error!("Transmiter send error: {}", audio_err); 83 } 84 } 85 } 86 FrameData::Video { timestamp, data } => { 87 let data = FrameData::Video { 88 timestamp, 89 data: data.clone(), 90 }; 91 for (_, v) in frame_senders.lock().await.iter() { 92 if let Err(video_err) = v.send(data.clone()).map_err(|_| ChannelError { 93 value: ChannelErrorValue::SendVideoError, 94 }) { 95 log::error!("Transmiter send error: {}", video_err); 96 } 97 } 98 } 99 FrameData::MediaInfo { media_info: _ } => {} 100 } 101 } 102 } 103 _ = exit.recv()=>{ 104 break; 105 } 106 } 107 } 108 }); 109 } 110 receive_packet_data_loop( mut exit: broadcast::Receiver<()>, mut receiver: PacketDataReceiver, packet_senders: Arc<Mutex<HashMap<Uuid, PacketDataSender>>>, )111 pub async fn receive_packet_data_loop( 112 mut exit: broadcast::Receiver<()>, 113 mut receiver: PacketDataReceiver, 114 packet_senders: Arc<Mutex<HashMap<Uuid, PacketDataSender>>>, 115 ) { 116 tokio::spawn(async move { 117 loop { 118 tokio::select! { 119 data = receiver.recv() => { 120 if let Some(val) = data { 121 match val { 122 123 PacketData::Audio { timestamp, data } => { 124 let data = PacketData::Audio { 125 timestamp, 126 data: data.clone(), 127 }; 128 129 for (_, v) in packet_senders.lock().await.iter() { 130 if let Err(audio_err) = v.send(data.clone()).map_err(|_| ChannelError { 131 value: ChannelErrorValue::SendAudioError, 132 }) { 133 log::error!("Transmiter send error: {}", audio_err); 134 } 135 } 136 } 137 PacketData::Video { timestamp, data } => { 138 let data = PacketData::Video { 139 timestamp, 140 data: data.clone(), 141 }; 142 for (_, v) in packet_senders.lock().await.iter() { 143 if let Err(video_err) = v.send(data.clone()).map_err(|_| ChannelError { 144 value: ChannelErrorValue::SendVideoError, 145 }) { 146 log::error!("Transmiter send error: {}", video_err); 147 } 148 } 149 } 150 151 } 152 } 153 } 154 _ = exit.recv()=>{ 155 break; 156 } 157 } 158 } 159 }); 160 } receive_event_loop( stream_handler: Arc<dyn TStreamHandler>, exit: broadcast::Sender<()>, mut receiver: TransmitterEventReceiver, packet_senders: Arc<Mutex<HashMap<Uuid, PacketDataSender>>>, frame_senders: Arc<Mutex<HashMap<Uuid, FrameDataSender>>>, )161 pub async fn receive_event_loop( 162 stream_handler: Arc<dyn TStreamHandler>, 163 exit: broadcast::Sender<()>, 164 mut receiver: TransmitterEventReceiver, 165 packet_senders: Arc<Mutex<HashMap<Uuid, PacketDataSender>>>, 166 frame_senders: Arc<Mutex<HashMap<Uuid, FrameDataSender>>>, 167 ) { 168 tokio::spawn(async move { 169 loop { 170 if let Some(val) = receiver.recv().await { 171 match val { 172 TransmitterEvent::Subscribe { sender, info } => { 173 if let Err(err) = stream_handler 174 .send_prior_data(sender.clone(), info.sub_type) 175 .await 176 { 177 log::error!("receive_event_loop send_prior_data err: {}", err); 178 break; 179 } 180 match sender { 181 DataSender::Frame { 182 sender: frame_sender, 183 } => { 184 frame_senders.lock().await.insert(info.id, frame_sender); 185 } 186 DataSender::Packet { 187 sender: packet_sender, 188 } => { 189 packet_senders.lock().await.insert(info.id, packet_sender); 190 } 191 } 192 } 193 TransmitterEvent::UnSubscribe { info } => match info.sub_type { 194 SubscribeType::PlayerRtp | SubscribeType::PlayerWebrtc => { 195 packet_senders.lock().await.remove(&info.id); 196 } 197 _ => { 198 frame_senders.lock().await.remove(&info.id); 199 } 200 }, 201 TransmitterEvent::UnPublish {} => { 202 if let Err(err) = exit.send(()) { 203 log::error!("TransmitterEvent::UnPublish send error: {}", err); 204 } 205 break; 206 } 207 TransmitterEvent::Api { sender } => { 208 if let Some(avstatistic_data) = 209 stream_handler.get_statistic_data().await 210 { 211 if let Err(err) = sender.send(avstatistic_data) { 212 log::info!("Transmitter send avstatistic data err: {}", err); 213 } 214 } 215 } 216 TransmitterEvent::Request { sender } => { 217 stream_handler.send_information(sender).await; 218 } 219 } 220 } 221 } 222 }); 223 } 224 run(self) -> Result<(), ChannelError>225 pub async fn run(self) -> Result<(), ChannelError> { 226 let (tx, _) = broadcast::channel::<()>(1); 227 228 if let Some(receiver) = self.data_receiver.frame_receiver { 229 Self::receive_frame_data_loop( 230 tx.subscribe(), 231 receiver, 232 self.id_to_frame_sender.clone(), 233 ) 234 .await; 235 } 236 237 if let Some(receiver) = self.data_receiver.packet_receiver { 238 Self::receive_packet_data_loop( 239 tx.subscribe(), 240 receiver, 241 self.id_to_packet_sender.clone(), 242 ) 243 .await; 244 } 245 246 Self::receive_event_loop( 247 self.stream_handler, 248 tx, 249 self.event_receiver, 250 self.id_to_packet_sender, 251 self.id_to_frame_sender, 252 ) 253 .await; 254 255 Ok(()) 256 } 257 } 258 259 pub struct StreamsHub { 260 //app_name to stream_name to producer 261 streams: HashMap<StreamIdentifier, TransmitterEventSender>, 262 //save info to kick off client 263 streams_info: HashMap<Uuid, PubSubInfo>, 264 //event is consumed in Channels, produced from other rtmp sessions 265 hub_event_receiver: StreamHubEventReceiver, 266 //event is produced from other rtmp sessions 267 hub_event_sender: StreamHubEventSender, 268 //client_event_producer: client_event_producer 269 client_event_producer: BroadcastEventSender, 270 //The rtmp static push/pull and the hls transfer is triggered actively, 271 //add a control switches separately. 272 rtmp_push_enabled: bool, 273 rtmp_remuxer_enabled: bool, 274 //enable rtmp pull 275 rtmp_pull_enabled: bool, 276 //enable hls 277 hls_enabled: bool, 278 //http notifier on sub/pub event 279 notifier: Option<Notifier>, 280 } 281 282 impl StreamsHub { new(notifier: Option<Notifier>) -> Self283 pub fn new(notifier: Option<Notifier>) -> Self { 284 let (event_producer, event_consumer) = mpsc::unbounded_channel(); 285 let (client_producer, _) = broadcast::channel(100); 286 287 Self { 288 streams: HashMap::new(), 289 streams_info: HashMap::new(), 290 hub_event_receiver: event_consumer, 291 hub_event_sender: event_producer, 292 client_event_producer: client_producer, 293 rtmp_push_enabled: false, 294 rtmp_pull_enabled: false, 295 rtmp_remuxer_enabled: false, 296 hls_enabled: false, 297 notifier, 298 } 299 } run(&mut self)300 pub async fn run(&mut self) { 301 self.event_loop().await; 302 } 303 set_rtmp_push_enabled(&mut self, enabled: bool)304 pub fn set_rtmp_push_enabled(&mut self, enabled: bool) { 305 self.rtmp_push_enabled = enabled; 306 } 307 set_rtmp_pull_enabled(&mut self, enabled: bool)308 pub fn set_rtmp_pull_enabled(&mut self, enabled: bool) { 309 self.rtmp_pull_enabled = enabled; 310 } 311 set_rtmp_remuxer_enabled(&mut self, enabled: bool)312 pub fn set_rtmp_remuxer_enabled(&mut self, enabled: bool) { 313 self.rtmp_remuxer_enabled = enabled; 314 } 315 set_hls_enabled(&mut self, enabled: bool)316 pub fn set_hls_enabled(&mut self, enabled: bool) { 317 self.hls_enabled = enabled; 318 } 319 get_hub_event_sender(&mut self) -> StreamHubEventSender320 pub fn get_hub_event_sender(&mut self) -> StreamHubEventSender { 321 self.hub_event_sender.clone() 322 } 323 get_client_event_consumer(&mut self) -> BroadcastEventReceiver324 pub fn get_client_event_consumer(&mut self) -> BroadcastEventReceiver { 325 self.client_event_producer.subscribe() 326 } 327 event_loop(&mut self)328 pub async fn event_loop(&mut self) { 329 while let Some(message) = self.hub_event_receiver.recv().await { 330 let event_serialize_str = if let Ok(data) = serde_json::to_string(&message) { 331 log::info!("event data: {}", data); 332 data 333 } else { 334 String::from("empty body") 335 }; 336 337 match message { 338 StreamHubEvent::Publish { 339 identifier, 340 info, 341 result_sender, 342 stream_handler, 343 } => { 344 let (frame_sender, packet_sender, receiver) = match info.pub_data_type { 345 define::PubDataType::Frame => { 346 let (sender_chan, receiver_chan) = mpsc::unbounded_channel(); 347 ( 348 Some(sender_chan), 349 None, 350 DataReceiver { 351 frame_receiver: Some(receiver_chan), 352 packet_receiver: None, 353 }, 354 ) 355 } 356 define::PubDataType::Packet => { 357 let (sender_chan, receiver_chan) = mpsc::unbounded_channel(); 358 ( 359 None, 360 Some(sender_chan), 361 DataReceiver { 362 frame_receiver: None, 363 packet_receiver: Some(receiver_chan), 364 }, 365 ) 366 } 367 define::PubDataType::Both => { 368 let (sender_frame_chan, receiver_frame_chan) = 369 mpsc::unbounded_channel(); 370 let (sender_packet_chan, receiver_packet_chan) = 371 mpsc::unbounded_channel(); 372 373 ( 374 Some(sender_frame_chan), 375 Some(sender_packet_chan), 376 DataReceiver { 377 frame_receiver: Some(receiver_frame_chan), 378 packet_receiver: Some(receiver_packet_chan), 379 }, 380 ) 381 } 382 }; 383 384 let result = match self 385 .publish(identifier.clone(), receiver, stream_handler) 386 .await 387 { 388 Ok(()) => { 389 if let Some(notifier) = &self.notifier { 390 notifier.on_publish_notify(event_serialize_str).await; 391 } 392 self.streams_info 393 .insert(info.id, PubSubInfo::Publish { identifier }); 394 395 Ok((frame_sender, packet_sender)) 396 } 397 Err(err) => { 398 log::error!("event_loop Publish err: {}", err); 399 Err(err) 400 } 401 }; 402 403 if result_sender.send(result).is_err() { 404 log::error!("event_loop Subscribe error: The receiver dropped.") 405 } 406 } 407 408 StreamHubEvent::UnPublish { 409 identifier, 410 info: _, 411 } => { 412 if let Err(err) = self.unpublish(&identifier) { 413 log::error!( 414 "event_loop Unpublish err: {} with identifier: {}", 415 err, 416 identifier 417 ); 418 } 419 420 if let Some(notifier) = &self.notifier { 421 notifier.on_unpublish_notify(event_serialize_str).await; 422 } 423 } 424 StreamHubEvent::Subscribe { 425 identifier, 426 info, 427 result_sender, 428 } => { 429 let sub_id = info.id; 430 let info_clone = info.clone(); 431 432 //new chan for Frame/Packet sender and receiver 433 let (sender, receiver) = match info.sub_data_type { 434 define::SubDataType::Frame => { 435 let (sender_chan, receiver_chan) = mpsc::unbounded_channel(); 436 ( 437 DataSender::Frame { 438 sender: sender_chan, 439 }, 440 DataReceiver { 441 frame_receiver: Some(receiver_chan), 442 packet_receiver: None, 443 }, 444 ) 445 } 446 define::SubDataType::Packet => { 447 let (sender_chan, receiver_chan) = mpsc::unbounded_channel(); 448 ( 449 DataSender::Packet { 450 sender: sender_chan, 451 }, 452 DataReceiver { 453 frame_receiver: None, 454 packet_receiver: Some(receiver_chan), 455 }, 456 ) 457 } 458 }; 459 460 let rv = match self.subscribe(&identifier, info_clone, sender).await { 461 Ok(()) => { 462 if let Some(notifier) = &self.notifier { 463 notifier.on_play_notify(event_serialize_str).await; 464 } 465 466 self.streams_info.insert( 467 sub_id, 468 PubSubInfo::Subscribe { 469 identifier, 470 sub_info: info, 471 }, 472 ); 473 Ok(receiver) 474 } 475 Err(err) => { 476 log::error!("event_loop Subscribe error: {}", err); 477 Err(err) 478 } 479 }; 480 481 if result_sender.send(rv).is_err() { 482 log::error!("event_loop Subscribe error: The receiver dropped.") 483 } 484 } 485 StreamHubEvent::UnSubscribe { identifier, info } => { 486 if self.unsubscribe(&identifier, info).is_ok() { 487 if let Some(notifier) = &self.notifier { 488 notifier.on_stop_notify(event_serialize_str).await; 489 } 490 } 491 } 492 493 StreamHubEvent::ApiStatistic { 494 data_sender, 495 size_sender, 496 } => { 497 if let Err(err) = self.api_statistic(data_sender, size_sender) { 498 log::error!("event_loop api error: {}", err); 499 } 500 } 501 StreamHubEvent::ApiKickClient { id } => { 502 self.api_kick_off_client(id); 503 504 if let Some(notifier) = &self.notifier { 505 notifier.on_unpublish_notify(event_serialize_str).await; 506 } 507 } 508 StreamHubEvent::Request { identifier, sender } => { 509 if let Err(err) = self.request(&identifier, sender) { 510 log::error!("event_loop request error: {}", err); 511 } 512 } 513 } 514 } 515 } 516 request( &mut self, identifier: &StreamIdentifier, sender: mpsc::UnboundedSender<Information>, ) -> Result<(), ChannelError>517 fn request( 518 &mut self, 519 identifier: &StreamIdentifier, 520 sender: mpsc::UnboundedSender<Information>, 521 ) -> Result<(), ChannelError> { 522 if let Some(producer) = self.streams.get_mut(identifier) { 523 let event = TransmitterEvent::Request { sender }; 524 log::info!("Request: stream identifier: {}", identifier); 525 producer.send(event).map_err(|_| ChannelError { 526 value: ChannelErrorValue::SendError, 527 })?; 528 } 529 Ok(()) 530 } 531 api_statistic( &mut self, data_sender: AvStatisticSender, size_sender: StreamStatisticSizeSender, ) -> Result<(), ChannelError>532 fn api_statistic( 533 &mut self, 534 data_sender: AvStatisticSender, 535 size_sender: StreamStatisticSizeSender, 536 ) -> Result<(), ChannelError> { 537 let mut stream_count: usize = 0; 538 for v in self.streams.values() { 539 stream_count += 1; 540 if let Err(err) = v.send(TransmitterEvent::Api { 541 sender: data_sender.clone(), 542 }) { 543 log::error!("TransmitterEvent api send data err: {}", err); 544 return Err(ChannelError { 545 value: ChannelErrorValue::SendError, 546 }); 547 } 548 } 549 550 if let Err(err) = size_sender.send(stream_count) { 551 log::error!("TransmitterEvent api send size err: {}", err); 552 return Err(ChannelError { 553 value: ChannelErrorValue::SendError, 554 }); 555 } 556 557 Ok(()) 558 } 559 api_kick_off_client(&mut self, uid: Uuid)560 fn api_kick_off_client(&mut self, uid: Uuid) { 561 let info = if let Some(info) = self.streams_info.get(&uid) { 562 info.clone() 563 } else { 564 return; 565 }; 566 567 match info { 568 PubSubInfo::Publish { identifier } => { 569 if let Err(err) = self.unpublish(&identifier) { 570 log::error!( 571 "event_loop ApiKickClient pub err: {} with identifier: {}", 572 err, 573 identifier 574 ); 575 } 576 } 577 PubSubInfo::Subscribe { 578 identifier, 579 sub_info, 580 } => { 581 if let Err(err) = self.unsubscribe(&identifier, sub_info) { 582 log::error!( 583 "event_loop ApiKickClient pub err: {} with identifier: {}", 584 err, 585 identifier 586 ); 587 } 588 } 589 } 590 } 591 592 //player subscribe a stream subscribe( &mut self, identifer: &StreamIdentifier, sub_info: SubscriberInfo, sender: DataSender, ) -> Result<(), ChannelError>593 pub async fn subscribe( 594 &mut self, 595 identifer: &StreamIdentifier, 596 sub_info: SubscriberInfo, 597 sender: DataSender, 598 ) -> Result<(), ChannelError> { 599 if let Some(producer) = self.streams.get_mut(identifer) { 600 let event = TransmitterEvent::Subscribe { 601 sender, 602 info: sub_info, 603 }; 604 log::info!("subscribe: stream identifier: {}", identifer); 605 producer.send(event).map_err(|_| ChannelError { 606 value: ChannelErrorValue::SendError, 607 })?; 608 609 return Ok(()); 610 } 611 612 if self.rtmp_pull_enabled { 613 log::info!("subscribe: try to pull stream, identifier: {}", identifer); 614 615 let client_event = BroadcastEvent::Subscribe { 616 identifier: identifer.clone(), 617 }; 618 619 //send subscribe info to pull clients 620 self.client_event_producer 621 .send(client_event) 622 .map_err(|_| ChannelError { 623 value: ChannelErrorValue::SendError, 624 })?; 625 } 626 627 Err(ChannelError { 628 value: ChannelErrorValue::NoAppOrStreamName, 629 }) 630 } 631 unsubscribe( &mut self, identifer: &StreamIdentifier, sub_info: SubscriberInfo, ) -> Result<(), ChannelError>632 pub fn unsubscribe( 633 &mut self, 634 identifer: &StreamIdentifier, 635 sub_info: SubscriberInfo, 636 ) -> Result<(), ChannelError> { 637 match self.streams.get_mut(identifer) { 638 Some(producer) => { 639 log::info!("unsubscribe....:{}", identifer); 640 let event = TransmitterEvent::UnSubscribe { info: sub_info }; 641 producer.send(event).map_err(|_| ChannelError { 642 value: ChannelErrorValue::SendError, 643 })?; 644 } 645 None => { 646 return Err(ChannelError { 647 value: ChannelErrorValue::NoAppName, 648 }) 649 } 650 } 651 652 Ok(()) 653 } 654 655 //publish a stream publish( &mut self, identifier: StreamIdentifier, receiver: DataReceiver, handler: Arc<dyn TStreamHandler>, ) -> Result<(), ChannelError>656 pub async fn publish( 657 &mut self, 658 identifier: StreamIdentifier, 659 receiver: DataReceiver, 660 handler: Arc<dyn TStreamHandler>, 661 ) -> Result<(), ChannelError> { 662 if self.streams.get(&identifier).is_some() { 663 return Err(ChannelError { 664 value: ChannelErrorValue::Exists, 665 }); 666 } 667 668 let (event_publisher, event_consumer) = mpsc::unbounded_channel(); 669 let transmitter = Transmitter::new(receiver, event_consumer, handler); 670 671 let identifier_clone = identifier.clone(); 672 673 if let Err(err) = transmitter.run().await { 674 log::error!( 675 "transmiter run error, idetifier: {}, error: {}", 676 identifier_clone, 677 err, 678 ); 679 } else { 680 log::info!("transmiter exits: idetifier: {}", identifier_clone); 681 } 682 683 self.streams.insert(identifier.clone(), event_publisher); 684 685 if self.rtmp_push_enabled || self.hls_enabled || self.rtmp_remuxer_enabled { 686 let client_event = BroadcastEvent::Publish { identifier }; 687 688 //send publish info to push clients 689 self.client_event_producer 690 .send(client_event) 691 .map_err(|_| ChannelError { 692 value: ChannelErrorValue::SendError, 693 })?; 694 } 695 696 Ok(()) 697 } 698 unpublish(&mut self, identifier: &StreamIdentifier) -> Result<(), ChannelError>699 fn unpublish(&mut self, identifier: &StreamIdentifier) -> Result<(), ChannelError> { 700 match self.streams.get_mut(identifier) { 701 Some(producer) => { 702 let event = TransmitterEvent::UnPublish {}; 703 producer.send(event).map_err(|_| ChannelError { 704 value: ChannelErrorValue::SendError, 705 })?; 706 self.streams.remove(identifier); 707 log::info!("unpublish remove stream, stream identifier: {}", identifier); 708 } 709 None => { 710 return Err(ChannelError { 711 value: ChannelErrorValue::NoAppName, 712 }) 713 } 714 } 715 716 Ok(()) 717 } 718 } 719