18e71d710SHarlan use streamhub::stream::StreamIdentifier; 28e71d710SHarlan 3b1840569SHarlanC use { 4b1840569SHarlanC super::errors::ClientError, 58e71d710SHarlan crate::session::client_session::{ClientSession, ClientType}, 6*b36cf5daSHarlan streamhub::define::{BroadcastEvent, BroadcastEventReceiver, StreamHubEventSender}, 7b1840569SHarlanC tokio::net::TcpStream, 8b1840569SHarlanC }; 96645fa44SHarlanC 106645fa44SHarlanC pub struct PullClient { 116645fa44SHarlanC address: String, 1213bac29aSHarlan client_event_consumer: BroadcastEventReceiver, 138e71d710SHarlan channel_event_producer: StreamHubEventSender, 146645fa44SHarlanC } 156645fa44SHarlanC 166645fa44SHarlanC impl PullClient { new( address: String, consumer: BroadcastEventReceiver, producer: StreamHubEventSender, ) -> Self176645fa44SHarlanC pub fn new( 186645fa44SHarlanC address: String, 1913bac29aSHarlan consumer: BroadcastEventReceiver, 208e71d710SHarlan producer: StreamHubEventSender, 216645fa44SHarlanC ) -> Self { 226645fa44SHarlanC Self { 2385c0af6aSLuca Barbato address, 246645fa44SHarlanC 256645fa44SHarlanC client_event_consumer: consumer, 266645fa44SHarlanC channel_event_producer: producer, 276645fa44SHarlanC } 286645fa44SHarlanC } 296645fa44SHarlanC run(&mut self) -> Result<(), ClientError>30b1840569SHarlanC pub async fn run(&mut self) -> Result<(), ClientError> { 316645fa44SHarlanC loop { 32*b36cf5daSHarlan let event = self.client_event_consumer.recv().await?; 330ca99c20SHarlan 34*b36cf5daSHarlan if let BroadcastEvent::Subscribe { 35*b36cf5daSHarlan identifier: 36*b36cf5daSHarlan StreamIdentifier::Rtmp { 376645fa44SHarlanC app_name, 386645fa44SHarlanC stream_name, 39*b36cf5daSHarlan }, 40*b36cf5daSHarlan } = event 410ca99c20SHarlan { 422d9c981bSHarlanC log::info!( 432d9c981bSHarlanC "receive pull event, app_name :{}, stream_name: {}", 442d9c981bSHarlanC app_name, 452d9c981bSHarlanC stream_name 462d9c981bSHarlanC ); 47b97dcde8SHarlanC let stream = TcpStream::connect(self.address.clone()).await?; 48b97dcde8SHarlanC 49b97dcde8SHarlanC let mut client_session = ClientSession::new( 50b97dcde8SHarlanC stream, 51b97dcde8SHarlanC ClientType::Play, 52c8d4d932SHarlan self.address.clone(), 53b97dcde8SHarlanC app_name.clone(), 54b97dcde8SHarlanC stream_name.clone(), 55b97dcde8SHarlanC self.channel_event_producer.clone(), 568e71d710SHarlan 0, 57b97dcde8SHarlanC ); 58b97dcde8SHarlanC 59b97dcde8SHarlanC tokio::spawn(async move { 60b97dcde8SHarlanC if let Err(err) = client_session.run().await { 615de1eabbSHarlanC log::error!("client_session as pull client run error: {}", err); 62b97dcde8SHarlanC } 63b97dcde8SHarlanC }); 64b97dcde8SHarlanC } 656645fa44SHarlanC } 666645fa44SHarlanC } 676645fa44SHarlanC } 68