xref: /xiu/library/streamhub/src/lib.rs (revision a4ef5d6c)
1 use define::{FrameDataReceiver, PacketDataReceiver, PacketDataSender};
2 
3 use crate::define::PacketData;
4 
5 pub mod define;
6 pub mod errors;
7 pub mod notify;
8 pub mod statistics;
9 pub mod stream;
10 pub mod utils;
11 
12 use {
13     crate::notify::Notifier,
14     define::{
15         AvStatisticSender, BroadcastEvent, BroadcastEventReceiver, BroadcastEventSender,
16         DataReceiver, DataSender, FrameData, FrameDataSender, Information, PubSubInfo,
17         StreamHubEvent, StreamHubEventReceiver, StreamHubEventSender, StreamStatisticSizeSender,
18         SubscribeType, SubscriberInfo, TStreamHandler, TransmitterEvent, TransmitterEventReceiver,
19         TransmitterEventSender,
20     },
21     errors::{ChannelError, ChannelErrorValue},
22     std::collections::HashMap,
23     std::sync::Arc,
24     stream::StreamIdentifier,
25     tokio::sync::{broadcast, mpsc, mpsc::UnboundedReceiver, Mutex},
26     utils::Uuid,
27 };
28 
/// Receives the data of one published stream and forwards it to the
/// registered players/subscribers.
pub struct Transmitter {
    /// Receiving side of the publisher's audio/video data (frame and/or packet channel).
    data_receiver: DataReceiver,
    /// Receives the control events (`TransmitterEvent`) addressed to this transmitter.
    event_receiver: TransmitterEventReceiver,
    /// Senders used to deliver audio/video frame data, keyed by subscriber id.
    id_to_frame_sender: Arc<Mutex<HashMap<Uuid, FrameDataSender>>>,
    /// Senders used to deliver audio/video packet data, keyed by subscriber id.
    id_to_packet_sender: Arc<Mutex<HashMap<Uuid, PacketDataSender>>>,
    /// Handler asked for prior data, statistics and stream information.
    stream_handler: Arc<dyn TStreamHandler>,
}
41 
42 impl Transmitter {
new( data_receiver: DataReceiver, event_receiver: UnboundedReceiver<TransmitterEvent>, h: Arc<dyn TStreamHandler>, ) -> Self43     fn new(
44         data_receiver: DataReceiver,
45         event_receiver: UnboundedReceiver<TransmitterEvent>,
46         h: Arc<dyn TStreamHandler>,
47     ) -> Self {
48         Self {
49             data_receiver,
50             event_receiver,
51             id_to_frame_sender: Arc::new(Mutex::new(HashMap::new())),
52             id_to_packet_sender: Arc::new(Mutex::new(HashMap::new())),
53             stream_handler: h,
54         }
55     }
56 
receive_frame_data_loop( mut exit: broadcast::Receiver<()>, mut receiver: FrameDataReceiver, frame_senders: Arc<Mutex<HashMap<Uuid, FrameDataSender>>>, )57     pub async fn receive_frame_data_loop(
58         mut exit: broadcast::Receiver<()>,
59         mut receiver: FrameDataReceiver,
60         frame_senders: Arc<Mutex<HashMap<Uuid, FrameDataSender>>>,
61     ) {
62         tokio::spawn(async move {
63             loop {
64                 tokio::select! {
65                     data = receiver.recv() => {
66                         if let Some(val) = data {
67                             match val {
68                                 FrameData::MetaData {
69                                     timestamp: _,
70                                     data: _,
71                                 } => {}
72                                 FrameData::Audio { timestamp, data } => {
73                                     let data = FrameData::Audio {
74                                         timestamp,
75                                         data: data.clone(),
76                                     };
77 
78                                     for (_, v) in frame_senders.lock().await.iter() {
79                                         if let Err(audio_err) = v.send(data.clone()).map_err(|_| ChannelError {
80                                             value: ChannelErrorValue::SendAudioError,
81                                         }) {
82                                             log::error!("Transmiter send error: {}", audio_err);
83                                         }
84                                     }
85                                 }
86                                 FrameData::Video { timestamp, data } => {
87                                     let data = FrameData::Video {
88                                         timestamp,
89                                         data: data.clone(),
90                                     };
91                                     for (_, v) in frame_senders.lock().await.iter() {
92                                         if let Err(video_err) = v.send(data.clone()).map_err(|_| ChannelError {
93                                             value: ChannelErrorValue::SendVideoError,
94                                         }) {
95                                             log::error!("Transmiter send error: {}", video_err);
96                                         }
97                                     }
98                                 }
99                                 FrameData::MediaInfo { media_info: _ } => {}
100                             }
101                         }
102                     }
103                     _ = exit.recv()=>{
104                         break;
105                     }
106                 }
107             }
108         });
109     }
110 
receive_packet_data_loop( mut exit: broadcast::Receiver<()>, mut receiver: PacketDataReceiver, packet_senders: Arc<Mutex<HashMap<Uuid, PacketDataSender>>>, )111     pub async fn receive_packet_data_loop(
112         mut exit: broadcast::Receiver<()>,
113         mut receiver: PacketDataReceiver,
114         packet_senders: Arc<Mutex<HashMap<Uuid, PacketDataSender>>>,
115     ) {
116         tokio::spawn(async move {
117             loop {
118                 tokio::select! {
119                     data = receiver.recv() => {
120                         if let Some(val) = data {
121                             match val {
122 
123                                 PacketData::Audio { timestamp, data } => {
124                                     let data = PacketData::Audio {
125                                         timestamp,
126                                         data: data.clone(),
127                                     };
128 
129                                     for (_, v) in packet_senders.lock().await.iter() {
130                                         if let Err(audio_err) = v.send(data.clone()).map_err(|_| ChannelError {
131                                             value: ChannelErrorValue::SendAudioError,
132                                         }) {
133                                             log::error!("Transmiter send error: {}", audio_err);
134                                         }
135                                     }
136                                 }
137                                 PacketData::Video { timestamp, data } => {
138                                     let data = PacketData::Video {
139                                         timestamp,
140                                         data: data.clone(),
141                                     };
142                                     for (_, v) in packet_senders.lock().await.iter() {
143                                         if let Err(video_err) = v.send(data.clone()).map_err(|_| ChannelError {
144                                             value: ChannelErrorValue::SendVideoError,
145                                         }) {
146                                             log::error!("Transmiter send error: {}", video_err);
147                                         }
148                                     }
149                                 }
150 
151                             }
152                         }
153                     }
154                     _ = exit.recv()=>{
155                         break;
156                     }
157                 }
158             }
159         });
160     }
receive_event_loop( stream_handler: Arc<dyn TStreamHandler>, exit: broadcast::Sender<()>, mut receiver: TransmitterEventReceiver, packet_senders: Arc<Mutex<HashMap<Uuid, PacketDataSender>>>, frame_senders: Arc<Mutex<HashMap<Uuid, FrameDataSender>>>, )161     pub async fn receive_event_loop(
162         stream_handler: Arc<dyn TStreamHandler>,
163         exit: broadcast::Sender<()>,
164         mut receiver: TransmitterEventReceiver,
165         packet_senders: Arc<Mutex<HashMap<Uuid, PacketDataSender>>>,
166         frame_senders: Arc<Mutex<HashMap<Uuid, FrameDataSender>>>,
167     ) {
168         tokio::spawn(async move {
169             loop {
170                 if let Some(val) = receiver.recv().await {
171                     match val {
172                         TransmitterEvent::Subscribe { sender, info } => {
173                             if let Err(err) = stream_handler
174                                 .send_prior_data(sender.clone(), info.sub_type)
175                                 .await
176                             {
177                                 log::error!("receive_event_loop send_prior_data err: {}", err);
178                                 break;
179                             }
180                             match sender {
181                                 DataSender::Frame {
182                                     sender: frame_sender,
183                                 } => {
184                                     frame_senders.lock().await.insert(info.id, frame_sender);
185                                 }
186                                 DataSender::Packet {
187                                     sender: packet_sender,
188                                 } => {
189                                     packet_senders.lock().await.insert(info.id, packet_sender);
190                                 }
191                             }
192                         }
193                         TransmitterEvent::UnSubscribe { info } => match info.sub_type {
194                             SubscribeType::PlayerRtp | SubscribeType::PlayerWebrtc => {
195                                 packet_senders.lock().await.remove(&info.id);
196                             }
197                             _ => {
198                                 frame_senders.lock().await.remove(&info.id);
199                             }
200                         },
201                         TransmitterEvent::UnPublish {} => {
202                             if let Err(err) = exit.send(()) {
203                                 log::error!("TransmitterEvent::UnPublish send error: {}", err);
204                             }
205                             break;
206                         }
207                         TransmitterEvent::Api { sender } => {
208                             if let Some(avstatistic_data) =
209                                 stream_handler.get_statistic_data().await
210                             {
211                                 if let Err(err) = sender.send(avstatistic_data) {
212                                     log::info!("Transmitter send avstatistic data err: {}", err);
213                                 }
214                             }
215                         }
216                         TransmitterEvent::Request { sender } => {
217                             stream_handler.send_information(sender).await;
218                         }
219                     }
220                 }
221             }
222         });
223     }
224 
run(self) -> Result<(), ChannelError>225     pub async fn run(self) -> Result<(), ChannelError> {
226         let (tx, _) = broadcast::channel::<()>(1);
227 
228         if let Some(receiver) = self.data_receiver.frame_receiver {
229             Self::receive_frame_data_loop(
230                 tx.subscribe(),
231                 receiver,
232                 self.id_to_frame_sender.clone(),
233             )
234             .await;
235         }
236 
237         if let Some(receiver) = self.data_receiver.packet_receiver {
238             Self::receive_packet_data_loop(
239                 tx.subscribe(),
240                 receiver,
241                 self.id_to_packet_sender.clone(),
242             )
243             .await;
244         }
245 
246         Self::receive_event_loop(
247             self.stream_handler,
248             tx,
249             self.event_receiver,
250             self.id_to_packet_sender,
251             self.id_to_frame_sender,
252         )
253         .await;
254 
255         Ok(())
256     }
257 }
258 
/// Registry of the published streams; dispatches hub events to the
/// transmitter of the stream they refer to.
pub struct StreamsHub {
    /// Event sender of the transmitter of each published stream, keyed by stream identifier.
    streams: HashMap<StreamIdentifier, TransmitterEventSender>,
    /// Publish/subscribe info per client id, saved to kick off clients.
    streams_info: HashMap<Uuid, PubSubInfo>,
    /// Receiving side of the hub event channel, consumed by `event_loop`.
    hub_event_receiver: StreamHubEventReceiver,
    /// Sending side of the hub event channel; clones are handed out to the sessions.
    hub_event_sender: StreamHubEventSender,
    /// Broadcasts publish/subscribe events to the client event consumers.
    client_event_producer: BroadcastEventSender,
    // The rtmp static push/pull and the hls transfer are triggered actively,
    // so each of them has a separate control switch.
    /// Enables rtmp push.
    rtmp_push_enabled: bool,
    /// Enables the rtmp remuxer.
    rtmp_remuxer_enabled: bool,
    /// Enables rtmp pull.
    rtmp_pull_enabled: bool,
    /// Enables hls.
    hls_enabled: bool,
    /// Optional http notifier called on sub/pub events.
    notifier: Option<Notifier>,
}
281 
282 impl StreamsHub {
new(notifier: Option<Notifier>) -> Self283     pub fn new(notifier: Option<Notifier>) -> Self {
284         let (event_producer, event_consumer) = mpsc::unbounded_channel();
285         let (client_producer, _) = broadcast::channel(100);
286 
287         Self {
288             streams: HashMap::new(),
289             streams_info: HashMap::new(),
290             hub_event_receiver: event_consumer,
291             hub_event_sender: event_producer,
292             client_event_producer: client_producer,
293             rtmp_push_enabled: false,
294             rtmp_pull_enabled: false,
295             rtmp_remuxer_enabled: false,
296             hls_enabled: false,
297             notifier,
298         }
299     }
    /// Runs the hub by driving `event_loop`; returns when that loop returns.
    pub async fn run(&mut self) {
        self.event_loop().await;
    }
303 
    /// Sets the rtmp push switch. When on, `publish` broadcasts a publish
    /// event to the client event consumers.
    pub fn set_rtmp_push_enabled(&mut self, enabled: bool) {
        self.rtmp_push_enabled = enabled;
    }
307 
    /// Sets the rtmp pull switch. When on, `subscribe` broadcasts a subscribe
    /// event for streams that are not published on this hub.
    pub fn set_rtmp_pull_enabled(&mut self, enabled: bool) {
        self.rtmp_pull_enabled = enabled;
    }
311 
    /// Sets the rtmp remuxer switch. When on, `publish` broadcasts a publish
    /// event to the client event consumers.
    pub fn set_rtmp_remuxer_enabled(&mut self, enabled: bool) {
        self.rtmp_remuxer_enabled = enabled;
    }
315 
    /// Sets the hls switch. When on, `publish` broadcasts a publish event to
    /// the client event consumers.
    pub fn set_hls_enabled(&mut self, enabled: bool) {
        self.hls_enabled = enabled;
    }
319 
    /// Returns a clone of the sender used to post events to this hub.
    pub fn get_hub_event_sender(&mut self) -> StreamHubEventSender {
        self.hub_event_sender.clone()
    }
323 
    /// Returns a new receiver subscribed to the client event broadcast channel.
    pub fn get_client_event_consumer(&mut self) -> BroadcastEventReceiver {
        self.client_event_producer.subscribe()
    }
327 
event_loop(&mut self)328     pub async fn event_loop(&mut self) {
329         while let Some(message) = self.hub_event_receiver.recv().await {
330             let event_serialize_str = if let Ok(data) = serde_json::to_string(&message) {
331                 log::info!("event data: {}", data);
332                 data
333             } else {
334                 String::from("empty body")
335             };
336 
337             match message {
338                 StreamHubEvent::Publish {
339                     identifier,
340                     info,
341                     result_sender,
342                     stream_handler,
343                 } => {
344                     let (frame_sender, packet_sender, receiver) = match info.pub_data_type {
345                         define::PubDataType::Frame => {
346                             let (sender_chan, receiver_chan) = mpsc::unbounded_channel();
347                             (
348                                 Some(sender_chan),
349                                 None,
350                                 DataReceiver {
351                                     frame_receiver: Some(receiver_chan),
352                                     packet_receiver: None,
353                                 },
354                             )
355                         }
356                         define::PubDataType::Packet => {
357                             let (sender_chan, receiver_chan) = mpsc::unbounded_channel();
358                             (
359                                 None,
360                                 Some(sender_chan),
361                                 DataReceiver {
362                                     frame_receiver: None,
363                                     packet_receiver: Some(receiver_chan),
364                                 },
365                             )
366                         }
367                         define::PubDataType::Both => {
368                             let (sender_frame_chan, receiver_frame_chan) =
369                                 mpsc::unbounded_channel();
370                             let (sender_packet_chan, receiver_packet_chan) =
371                                 mpsc::unbounded_channel();
372 
373                             (
374                                 Some(sender_frame_chan),
375                                 Some(sender_packet_chan),
376                                 DataReceiver {
377                                     frame_receiver: Some(receiver_frame_chan),
378                                     packet_receiver: Some(receiver_packet_chan),
379                                 },
380                             )
381                         }
382                     };
383 
384                     let result = match self
385                         .publish(identifier.clone(), receiver, stream_handler)
386                         .await
387                     {
388                         Ok(()) => {
389                             if let Some(notifier) = &self.notifier {
390                                 notifier.on_publish_notify(event_serialize_str).await;
391                             }
392                             self.streams_info
393                                 .insert(info.id, PubSubInfo::Publish { identifier });
394 
395                             Ok((frame_sender, packet_sender))
396                         }
397                         Err(err) => {
398                             log::error!("event_loop Publish err: {}", err);
399                             Err(err)
400                         }
401                     };
402 
403                     if result_sender.send(result).is_err() {
404                         log::error!("event_loop Subscribe error: The receiver dropped.")
405                     }
406                 }
407 
408                 StreamHubEvent::UnPublish {
409                     identifier,
410                     info: _,
411                 } => {
412                     if let Err(err) = self.unpublish(&identifier) {
413                         log::error!(
414                             "event_loop Unpublish err: {} with identifier: {}",
415                             err,
416                             identifier
417                         );
418                     }
419 
420                     if let Some(notifier) = &self.notifier {
421                         notifier.on_unpublish_notify(event_serialize_str).await;
422                     }
423                 }
424                 StreamHubEvent::Subscribe {
425                     identifier,
426                     info,
427                     result_sender,
428                 } => {
429                     let sub_id = info.id;
430                     let info_clone = info.clone();
431 
432                     //new chan for Frame/Packet sender and receiver
433                     let (sender, receiver) = match info.sub_data_type {
434                         define::SubDataType::Frame => {
435                             let (sender_chan, receiver_chan) = mpsc::unbounded_channel();
436                             (
437                                 DataSender::Frame {
438                                     sender: sender_chan,
439                                 },
440                                 DataReceiver {
441                                     frame_receiver: Some(receiver_chan),
442                                     packet_receiver: None,
443                                 },
444                             )
445                         }
446                         define::SubDataType::Packet => {
447                             let (sender_chan, receiver_chan) = mpsc::unbounded_channel();
448                             (
449                                 DataSender::Packet {
450                                     sender: sender_chan,
451                                 },
452                                 DataReceiver {
453                                     frame_receiver: None,
454                                     packet_receiver: Some(receiver_chan),
455                                 },
456                             )
457                         }
458                     };
459 
460                     let rv = match self.subscribe(&identifier, info_clone, sender).await {
461                         Ok(()) => {
462                             if let Some(notifier) = &self.notifier {
463                                 notifier.on_play_notify(event_serialize_str).await;
464                             }
465 
466                             self.streams_info.insert(
467                                 sub_id,
468                                 PubSubInfo::Subscribe {
469                                     identifier,
470                                     sub_info: info,
471                                 },
472                             );
473                             Ok(receiver)
474                         }
475                         Err(err) => {
476                             log::error!("event_loop Subscribe error: {}", err);
477                             Err(err)
478                         }
479                     };
480 
481                     if result_sender.send(rv).is_err() {
482                         log::error!("event_loop Subscribe error: The receiver dropped.")
483                     }
484                 }
485                 StreamHubEvent::UnSubscribe { identifier, info } => {
486                     if self.unsubscribe(&identifier, info).is_ok() {
487                         if let Some(notifier) = &self.notifier {
488                             notifier.on_stop_notify(event_serialize_str).await;
489                         }
490                     }
491                 }
492 
493                 StreamHubEvent::ApiStatistic {
494                     data_sender,
495                     size_sender,
496                 } => {
497                     if let Err(err) = self.api_statistic(data_sender, size_sender) {
498                         log::error!("event_loop api error: {}", err);
499                     }
500                 }
501                 StreamHubEvent::ApiKickClient { id } => {
502                     self.api_kick_off_client(id);
503 
504                     if let Some(notifier) = &self.notifier {
505                         notifier.on_unpublish_notify(event_serialize_str).await;
506                     }
507                 }
508                 StreamHubEvent::Request { identifier, sender } => {
509                     if let Err(err) = self.request(&identifier, sender) {
510                         log::error!("event_loop request error: {}", err);
511                     }
512                 }
513             }
514         }
515     }
516 
request( &mut self, identifier: &StreamIdentifier, sender: mpsc::UnboundedSender<Information>, ) -> Result<(), ChannelError>517     fn request(
518         &mut self,
519         identifier: &StreamIdentifier,
520         sender: mpsc::UnboundedSender<Information>,
521     ) -> Result<(), ChannelError> {
522         if let Some(producer) = self.streams.get_mut(identifier) {
523             let event = TransmitterEvent::Request { sender };
524             log::info!("Request:  stream identifier: {}", identifier);
525             producer.send(event).map_err(|_| ChannelError {
526                 value: ChannelErrorValue::SendError,
527             })?;
528         }
529         Ok(())
530     }
531 
api_statistic( &mut self, data_sender: AvStatisticSender, size_sender: StreamStatisticSizeSender, ) -> Result<(), ChannelError>532     fn api_statistic(
533         &mut self,
534         data_sender: AvStatisticSender,
535         size_sender: StreamStatisticSizeSender,
536     ) -> Result<(), ChannelError> {
537         let mut stream_count: usize = 0;
538         for v in self.streams.values() {
539             stream_count += 1;
540             if let Err(err) = v.send(TransmitterEvent::Api {
541                 sender: data_sender.clone(),
542             }) {
543                 log::error!("TransmitterEvent  api send data err: {}", err);
544                 return Err(ChannelError {
545                     value: ChannelErrorValue::SendError,
546                 });
547             }
548         }
549 
550         if let Err(err) = size_sender.send(stream_count) {
551             log::error!("TransmitterEvent api send size err: {}", err);
552             return Err(ChannelError {
553                 value: ChannelErrorValue::SendError,
554             });
555         }
556 
557         Ok(())
558     }
559 
api_kick_off_client(&mut self, uid: Uuid)560     fn api_kick_off_client(&mut self, uid: Uuid) {
561         let info = if let Some(info) = self.streams_info.get(&uid) {
562             info.clone()
563         } else {
564             return;
565         };
566 
567         match info {
568             PubSubInfo::Publish { identifier } => {
569                 if let Err(err) = self.unpublish(&identifier) {
570                     log::error!(
571                         "event_loop ApiKickClient pub err: {} with identifier: {}",
572                         err,
573                         identifier
574                     );
575                 }
576             }
577             PubSubInfo::Subscribe {
578                 identifier,
579                 sub_info,
580             } => {
581                 if let Err(err) = self.unsubscribe(&identifier, sub_info) {
582                     log::error!(
583                         "event_loop ApiKickClient pub err: {} with identifier: {}",
584                         err,
585                         identifier
586                     );
587                 }
588             }
589         }
590     }
591 
592     //player subscribe a stream
subscribe( &mut self, identifer: &StreamIdentifier, sub_info: SubscriberInfo, sender: DataSender, ) -> Result<(), ChannelError>593     pub async fn subscribe(
594         &mut self,
595         identifer: &StreamIdentifier,
596         sub_info: SubscriberInfo,
597         sender: DataSender,
598     ) -> Result<(), ChannelError> {
599         if let Some(producer) = self.streams.get_mut(identifer) {
600             let event = TransmitterEvent::Subscribe {
601                 sender,
602                 info: sub_info,
603             };
604             log::info!("subscribe:  stream identifier: {}", identifer);
605             producer.send(event).map_err(|_| ChannelError {
606                 value: ChannelErrorValue::SendError,
607             })?;
608 
609             return Ok(());
610         }
611 
612         if self.rtmp_pull_enabled {
613             log::info!("subscribe: try to pull stream, identifier: {}", identifer);
614 
615             let client_event = BroadcastEvent::Subscribe {
616                 identifier: identifer.clone(),
617             };
618 
619             //send subscribe info to pull clients
620             self.client_event_producer
621                 .send(client_event)
622                 .map_err(|_| ChannelError {
623                     value: ChannelErrorValue::SendError,
624                 })?;
625         }
626 
627         Err(ChannelError {
628             value: ChannelErrorValue::NoAppOrStreamName,
629         })
630     }
631 
unsubscribe( &mut self, identifer: &StreamIdentifier, sub_info: SubscriberInfo, ) -> Result<(), ChannelError>632     pub fn unsubscribe(
633         &mut self,
634         identifer: &StreamIdentifier,
635         sub_info: SubscriberInfo,
636     ) -> Result<(), ChannelError> {
637         match self.streams.get_mut(identifer) {
638             Some(producer) => {
639                 log::info!("unsubscribe....:{}", identifer);
640                 let event = TransmitterEvent::UnSubscribe { info: sub_info };
641                 producer.send(event).map_err(|_| ChannelError {
642                     value: ChannelErrorValue::SendError,
643                 })?;
644             }
645             None => {
646                 return Err(ChannelError {
647                     value: ChannelErrorValue::NoAppName,
648                 })
649             }
650         }
651 
652         Ok(())
653     }
654 
655     //publish a stream
publish( &mut self, identifier: StreamIdentifier, receiver: DataReceiver, handler: Arc<dyn TStreamHandler>, ) -> Result<(), ChannelError>656     pub async fn publish(
657         &mut self,
658         identifier: StreamIdentifier,
659         receiver: DataReceiver,
660         handler: Arc<dyn TStreamHandler>,
661     ) -> Result<(), ChannelError> {
662         if self.streams.get(&identifier).is_some() {
663             return Err(ChannelError {
664                 value: ChannelErrorValue::Exists,
665             });
666         }
667 
668         let (event_publisher, event_consumer) = mpsc::unbounded_channel();
669         let transmitter = Transmitter::new(receiver, event_consumer, handler);
670 
671         let identifier_clone = identifier.clone();
672 
673         if let Err(err) = transmitter.run().await {
674             log::error!(
675                 "transmiter run error, idetifier: {}, error: {}",
676                 identifier_clone,
677                 err,
678             );
679         } else {
680             log::info!("transmiter exits: idetifier: {}", identifier_clone);
681         }
682 
683         self.streams.insert(identifier.clone(), event_publisher);
684 
685         if self.rtmp_push_enabled || self.hls_enabled || self.rtmp_remuxer_enabled {
686             let client_event = BroadcastEvent::Publish { identifier };
687 
688             //send publish info to push clients
689             self.client_event_producer
690                 .send(client_event)
691                 .map_err(|_| ChannelError {
692                     value: ChannelErrorValue::SendError,
693                 })?;
694         }
695 
696         Ok(())
697     }
698 
unpublish(&mut self, identifier: &StreamIdentifier) -> Result<(), ChannelError>699     fn unpublish(&mut self, identifier: &StreamIdentifier) -> Result<(), ChannelError> {
700         match self.streams.get_mut(identifier) {
701             Some(producer) => {
702                 let event = TransmitterEvent::UnPublish {};
703                 producer.send(event).map_err(|_| ChannelError {
704                     value: ChannelErrorValue::SendError,
705                 })?;
706                 self.streams.remove(identifier);
707                 log::info!("unpublish remove stream, stream identifier: {}", identifier);
708             }
709             None => {
710                 return Err(ChannelError {
711                     value: ChannelErrorValue::NoAppName,
712                 })
713             }
714         }
715 
716         Ok(())
717     }
718 }
719