1 use { 2 super::errors::ClientError, 3 crate::session::client_session::{ClientSession, ClientType}, 4 streamhub::{ 5 define::{StreamHubEventSender, BroadcastEvent, BroadcastEventReceiver}, 6 stream::StreamIdentifier, 7 }, 8 tokio::net::TcpStream, 9 }; 10 11 pub struct PushClient { 12 address: String, 13 client_event_consumer: BroadcastEventReceiver, 14 channel_event_producer: StreamHubEventSender, 15 } 16 17 impl PushClient { new( address: String, consumer: BroadcastEventReceiver, producer: StreamHubEventSender, ) -> Self18 pub fn new( 19 address: String, 20 consumer: BroadcastEventReceiver, 21 producer: StreamHubEventSender, 22 ) -> Self { 23 Self { 24 address, 25 26 client_event_consumer: consumer, 27 channel_event_producer: producer, 28 } 29 } 30 run(&mut self) -> Result<(), ClientError>31 pub async fn run(&mut self) -> Result<(), ClientError> { 32 log::info!("push client run..."); 33 34 loop { 35 let val = self.client_event_consumer.recv().await?; 36 37 match val { 38 BroadcastEvent::Publish { identifier } => { 39 if let StreamIdentifier::Rtmp { 40 app_name, 41 stream_name, 42 } = identifier 43 { 44 log::info!( 45 "publish app_name: {} stream_name: {} address: {}", 46 app_name.clone(), 47 stream_name.clone(), 48 self.address.clone() 49 ); 50 let stream = TcpStream::connect(self.address.clone()).await?; 51 52 let mut client_session = ClientSession::new( 53 stream, 54 ClientType::Publish, 55 self.address.clone(), 56 app_name.clone(), 57 stream_name.clone(), 58 self.channel_event_producer.clone(), 59 0, 60 ); 61 62 tokio::spawn(async move { 63 if let Err(err) = client_session.run().await { 64 log::error!("client_session as push client run error: {}", err); 65 } 66 }); 67 } 68 } 69 70 _ => { 71 log::info!("push client receive other events"); 72 } 73 } 74 } 75 } 76 } 77