1 use { 2 super::errors::ClientError, 3 crate::{ 4 channels::define::{ChannelEventProducer, ClientEvent, ClientEventConsumer}, 5 session::client_session::{ClientSession, ClientType}, 6 }, 7 tokio::net::TcpStream, 8 }; 9 10 pub struct PullClient { 11 address: String, 12 client_event_consumer: ClientEventConsumer, 13 channel_event_producer: ChannelEventProducer, 14 } 15 16 impl PullClient { 17 pub fn new( 18 address: String, 19 consumer: ClientEventConsumer, 20 producer: ChannelEventProducer, 21 ) -> Self { 22 Self { 23 address: address, 24 25 client_event_consumer: consumer, 26 channel_event_producer: producer, 27 } 28 } 29 30 pub async fn run(&mut self) -> Result<(), ClientError> { 31 loop { 32 let val = self.client_event_consumer.recv().await?; 33 match val { 34 ClientEvent::Subscribe { 35 app_name, 36 stream_name, 37 } => { 38 let stream = TcpStream::connect(self.address.clone()).await?; 39 40 let mut client_session = ClientSession::new( 41 stream, 42 ClientType::Play, 43 app_name.clone(), 44 stream_name.clone(), 45 self.channel_event_producer.clone(), 46 ); 47 48 tokio::spawn(async move { 49 if let Err(err) = client_session.run().await { 50 log::error!("client_session as pull client run error: {}", err); 51 } 52 }); 53 } 54 _ => {} 55 } 56 } 57 } 58 } 59