xref: /xiu/protocol/rtmp/src/relay/pull_client.rs (revision b36cf5da)
1 use streamhub::stream::StreamIdentifier;
2 
3 use {
4     super::errors::ClientError,
5     crate::session::client_session::{ClientSession, ClientType},
6     streamhub::define::{BroadcastEvent, BroadcastEventReceiver, StreamHubEventSender},
7     tokio::net::TcpStream,
8 };
9 
10 pub struct PullClient {
11     address: String,
12     client_event_consumer: BroadcastEventReceiver,
13     channel_event_producer: StreamHubEventSender,
14 }
15 
16 impl PullClient {
new( address: String, consumer: BroadcastEventReceiver, producer: StreamHubEventSender, ) -> Self17     pub fn new(
18         address: String,
19         consumer: BroadcastEventReceiver,
20         producer: StreamHubEventSender,
21     ) -> Self {
22         Self {
23             address,
24 
25             client_event_consumer: consumer,
26             channel_event_producer: producer,
27         }
28     }
29 
run(&mut self) -> Result<(), ClientError>30     pub async fn run(&mut self) -> Result<(), ClientError> {
31         loop {
32             let event = self.client_event_consumer.recv().await?;
33 
34             if let BroadcastEvent::Subscribe {
35                 identifier:
36                     StreamIdentifier::Rtmp {
37                         app_name,
38                         stream_name,
39                     },
40             } = event
41             {
42                 log::info!(
43                     "receive pull event, app_name :{}, stream_name: {}",
44                     app_name,
45                     stream_name
46                 );
47                 let stream = TcpStream::connect(self.address.clone()).await?;
48 
49                 let mut client_session = ClientSession::new(
50                     stream,
51                     ClientType::Play,
52                     self.address.clone(),
53                     app_name.clone(),
54                     stream_name.clone(),
55                     self.channel_event_producer.clone(),
56                     0,
57                 );
58 
59                 tokio::spawn(async move {
60                     if let Err(err) = client_session.run().await {
61                         log::error!("client_session as pull client run error: {}", err);
62                     }
63                 });
64             }
65         }
66     }
67 }
68