1 pub mod errors; 2 pub mod rtsp2rtmp; 3 4 use streamhub::{ 5 define::{BroadcastEvent, BroadcastEventReceiver, StreamHubEventSender}, 6 stream::StreamIdentifier, 7 }; 8 9 use self::{errors::RtmpRemuxerError, rtsp2rtmp::Rtsp2RtmpRemuxerSession}; 10 11 //Receive publish event from stream hub and 12 //remux from other protocols to rtmp 13 pub struct RtmpRemuxer { 14 receiver: BroadcastEventReceiver, 15 event_producer: StreamHubEventSender, 16 } 17 18 impl RtmpRemuxer { new(receiver: BroadcastEventReceiver, event_producer: StreamHubEventSender) -> Self19 pub fn new(receiver: BroadcastEventReceiver, event_producer: StreamHubEventSender) -> Self { 20 Self { 21 receiver, 22 event_producer, 23 } 24 } run(&mut self) -> Result<(), RtmpRemuxerError>25 pub async fn run(&mut self) -> Result<(), RtmpRemuxerError> { 26 log::info!("rtmp remuxer start..."); 27 28 loop { 29 let val = self.receiver.recv().await?; 30 log::info!("{:?}", val); 31 match val { 32 BroadcastEvent::Publish { identifier } => { 33 if let StreamIdentifier::Rtsp { stream_path } = identifier { 34 let mut session = 35 Rtsp2RtmpRemuxerSession::new(stream_path, self.event_producer.clone()); 36 tokio::spawn(async move { 37 if let Err(err) = session.run().await { 38 log::error!("rtsp2rtmp session error: {}", err); 39 } 40 }); 41 } 42 } 43 _ => { 44 log::trace!("other infos..."); 45 } 46 } 47 } 48 } 49 } 50