xref: /xiu/protocol/hls/src/remuxer.rs (revision b754b692)
1 use {
2     super::{errors::HlsError, flv_data_receiver::FlvDataReceiver},
3     streamhub::{
4         define::{BroadcastEvent, BroadcastEventReceiver, StreamHubEventSender},
5         stream::StreamIdentifier,
6     },
7 };
8 
9 pub struct HlsRemuxer {
10     client_event_consumer: BroadcastEventReceiver,
11     event_producer: StreamHubEventSender,
12     need_record: bool,
13 }
14 
15 impl HlsRemuxer {
new( consumer: BroadcastEventReceiver, event_producer: StreamHubEventSender, need_record: bool, ) -> Self16     pub fn new(
17         consumer: BroadcastEventReceiver,
18         event_producer: StreamHubEventSender,
19         need_record: bool,
20     ) -> Self {
21         Self {
22             client_event_consumer: consumer,
23             event_producer,
24             need_record,
25         }
26     }
27 
run(&mut self) -> Result<(), HlsError>28     pub async fn run(&mut self) -> Result<(), HlsError> {
29         loop {
30             let val = self.client_event_consumer.recv().await?;
31             match val {
32                 BroadcastEvent::Publish { identifier } => {
33                     if let StreamIdentifier::Rtmp {
34                         app_name,
35                         stream_name,
36                     } = identifier
37                     {
38                         let mut rtmp_subscriber = FlvDataReceiver::new(
39                             app_name,
40                             stream_name,
41                             self.event_producer.clone(),
42                             5,
43                             self.need_record,
44                         );
45 
46                         tokio::spawn(async move {
47                             if let Err(err) = rtmp_subscriber.run().await {
48                                 println!("hls handler run error {err}");
49                             }
50                         });
51                     }
52                 }
53                 _ => {
54                     log::trace!("other infos...");
55                 }
56             }
57         }
58     }
59 }
60