1 use streamhub::stream::StreamIdentifier; 2 3 use { 4 super::errors::ClientError, 5 crate::session::client_session::{ClientSession, ClientType}, 6 streamhub::define::{BroadcastEvent, BroadcastEventReceiver, StreamHubEventSender}, 7 tokio::net::TcpStream, 8 }; 9 10 pub struct PullClient { 11 address: String, 12 client_event_consumer: BroadcastEventReceiver, 13 channel_event_producer: StreamHubEventSender, 14 } 15 16 impl PullClient { new( address: String, consumer: BroadcastEventReceiver, producer: StreamHubEventSender, ) -> Self17 pub fn new( 18 address: String, 19 consumer: BroadcastEventReceiver, 20 producer: StreamHubEventSender, 21 ) -> Self { 22 Self { 23 address, 24 25 client_event_consumer: consumer, 26 channel_event_producer: producer, 27 } 28 } 29 run(&mut self) -> Result<(), ClientError>30 pub async fn run(&mut self) -> Result<(), ClientError> { 31 loop { 32 let event = self.client_event_consumer.recv().await?; 33 34 if let BroadcastEvent::Subscribe { 35 identifier: 36 StreamIdentifier::Rtmp { 37 app_name, 38 stream_name, 39 }, 40 } = event 41 { 42 log::info!( 43 "receive pull event, app_name :{}, stream_name: {}", 44 app_name, 45 stream_name 46 ); 47 let stream = TcpStream::connect(self.address.clone()).await?; 48 49 let mut client_session = ClientSession::new( 50 stream, 51 ClientType::Play, 52 self.address.clone(), 53 app_name.clone(), 54 stream_name.clone(), 55 self.channel_event_producer.clone(), 56 0, 57 ); 58 59 tokio::spawn(async move { 60 if let Err(err) = client_session.run().await { 61 log::error!("client_session as pull client run error: {}", err); 62 } 63 }); 64 } 65 } 66 } 67 } 68