1 use { 2 super::{errors::HlsError, flv_data_receiver::FlvDataReceiver}, 3 streamhub::{ 4 define::{BroadcastEvent, BroadcastEventReceiver, StreamHubEventSender}, 5 stream::StreamIdentifier, 6 }, 7 }; 8 9 pub struct HlsRemuxer { 10 client_event_consumer: BroadcastEventReceiver, 11 event_producer: StreamHubEventSender, 12 need_record: bool, 13 } 14 15 impl HlsRemuxer { new( consumer: BroadcastEventReceiver, event_producer: StreamHubEventSender, need_record: bool, ) -> Self16 pub fn new( 17 consumer: BroadcastEventReceiver, 18 event_producer: StreamHubEventSender, 19 need_record: bool, 20 ) -> Self { 21 Self { 22 client_event_consumer: consumer, 23 event_producer, 24 need_record, 25 } 26 } 27 run(&mut self) -> Result<(), HlsError>28 pub async fn run(&mut self) -> Result<(), HlsError> { 29 loop { 30 let val = self.client_event_consumer.recv().await?; 31 match val { 32 BroadcastEvent::Publish { identifier } => { 33 if let StreamIdentifier::Rtmp { 34 app_name, 35 stream_name, 36 } = identifier 37 { 38 let mut rtmp_subscriber = FlvDataReceiver::new( 39 app_name, 40 stream_name, 41 self.event_producer.clone(), 42 5, 43 self.need_record, 44 ); 45 46 tokio::spawn(async move { 47 if let Err(err) = rtmp_subscriber.run().await { 48 println!("hls handler run error {err}"); 49 } 50 }); 51 } 52 } 53 _ => { 54 log::trace!("other infos..."); 55 } 56 } 57 } 58 } 59 } 60