xref: /xiu/protocol/rtmp/src/remuxer/mod.rs (revision 69de9bbd)
1 pub mod errors;
2 pub mod rtsp2rtmp;
3 
4 use streamhub::{
5     define::{BroadcastEvent, BroadcastEventReceiver, StreamHubEventSender},
6     stream::StreamIdentifier,
7 };
8 
9 use self::{errors::RtmpRemuxerError, rtsp2rtmp::Rtsp2RtmpRemuxerSession};
10 
/// Receives publish events from the stream hub and remuxes streams from
/// other protocols to RTMP.
pub struct RtmpRemuxer {
    /// Source of the broadcast events consumed by `run`.
    receiver: BroadcastEventReceiver,
    /// Stream hub event sender; `run` hands a clone of it to every remuxer
    /// session it spawns.
    event_producer: StreamHubEventSender,
}
17 
18 impl RtmpRemuxer {
new(receiver: BroadcastEventReceiver, event_producer: StreamHubEventSender) -> Self19     pub fn new(receiver: BroadcastEventReceiver, event_producer: StreamHubEventSender) -> Self {
20         Self {
21             receiver,
22             event_producer,
23         }
24     }
run(&mut self) -> Result<(), RtmpRemuxerError>25     pub async fn run(&mut self) -> Result<(), RtmpRemuxerError> {
26         log::info!("rtmp remuxer start...");
27 
28         loop {
29             let val = self.receiver.recv().await?;
30             log::info!("{:?}", val);
31             match val {
32                 BroadcastEvent::Publish { identifier } => {
33                     if let StreamIdentifier::Rtsp { stream_path } = identifier {
34                         let mut session =
35                             Rtsp2RtmpRemuxerSession::new(stream_path, self.event_producer.clone());
36                         tokio::spawn(async move {
37                             if let Err(err) = session.run().await {
38                                 log::error!("rtsp2rtmp session error: {}", err);
39                             }
40                         });
41                     }
42                 }
43                 _ => {
44                     log::trace!("other infos...");
45                 }
46             }
47         }
48     }
49 }
50