xref: /xiu/protocol/rtmp/src/relay/pull_client.rs (revision 88d91efd)
1 use {
2     super::errors::ClientError,
3     crate::{
4         channels::define::{ChannelEventProducer, ClientEvent, ClientEventConsumer},
5         session::client_session::{ClientSession, ClientType},
6     },
7     tokio::net::TcpStream,
8 };
9 
10 pub struct PullClient {
11     address: String,
12     client_event_consumer: ClientEventConsumer,
13     channel_event_producer: ChannelEventProducer,
14 }
15 
16 impl PullClient {
17     pub fn new(
18         address: String,
19         consumer: ClientEventConsumer,
20         producer: ChannelEventProducer,
21     ) -> Self {
22         Self {
23             address: address,
24 
25             client_event_consumer: consumer,
26             channel_event_producer: producer,
27         }
28     }
29 
30     pub async fn run(&mut self) -> Result<(), ClientError> {
31         loop {
32             let val = self.client_event_consumer.recv().await?;
33             match val {
34                 ClientEvent::Subscribe {
35                     app_name,
36                     stream_name,
37                 } => {
38                     let stream = TcpStream::connect(self.address.clone()).await?;
39 
40                     let mut client_session = ClientSession::new(
41                         stream,
42                         ClientType::Play,
43                         app_name.clone(),
44                         stream_name.clone(),
45                         self.channel_event_producer.clone(),
46                     );
47 
48                     tokio::spawn(async move {
49                         if let Err(err) = client_session.run().await {
50                             log::error!("client_session as pull client run error: {}", err);
51                         }
52                     });
53                 }
54                 _ => {}
55             }
56         }
57     }
58 }
59