xref: /xiu/protocol/rtmp/src/relay/push_client.rs (revision 13bac29a)
1 use {
2     super::errors::ClientError,
3     crate::session::client_session::{ClientSession, ClientType},
4     streamhub::{
5         define::{StreamHubEventSender, BroadcastEvent, BroadcastEventReceiver},
6         stream::StreamIdentifier,
7     },
8     tokio::net::TcpStream,
9 };
10 
11 pub struct PushClient {
12     address: String,
13     client_event_consumer: BroadcastEventReceiver,
14     channel_event_producer: StreamHubEventSender,
15 }
16 
17 impl PushClient {
new( address: String, consumer: BroadcastEventReceiver, producer: StreamHubEventSender, ) -> Self18     pub fn new(
19         address: String,
20         consumer: BroadcastEventReceiver,
21         producer: StreamHubEventSender,
22     ) -> Self {
23         Self {
24             address,
25 
26             client_event_consumer: consumer,
27             channel_event_producer: producer,
28         }
29     }
30 
run(&mut self) -> Result<(), ClientError>31     pub async fn run(&mut self) -> Result<(), ClientError> {
32         log::info!("push client run...");
33 
34         loop {
35             let val = self.client_event_consumer.recv().await?;
36 
37             match val {
38                 BroadcastEvent::Publish { identifier } => {
39                     if let StreamIdentifier::Rtmp {
40                         app_name,
41                         stream_name,
42                     } = identifier
43                     {
44                         log::info!(
45                             "publish app_name: {} stream_name: {} address: {}",
46                             app_name.clone(),
47                             stream_name.clone(),
48                             self.address.clone()
49                         );
50                         let stream = TcpStream::connect(self.address.clone()).await?;
51 
52                         let mut client_session = ClientSession::new(
53                             stream,
54                             ClientType::Publish,
55                             self.address.clone(),
56                             app_name.clone(),
57                             stream_name.clone(),
58                             self.channel_event_producer.clone(),
59                             0,
60                         );
61 
62                         tokio::spawn(async move {
63                             if let Err(err) = client_session.run().await {
64                                 log::error!("client_session as push client run error: {}", err);
65                             }
66                         });
67                     }
68                 }
69 
70                 _ => {
71                     log::info!("push client receive other events");
72                 }
73             }
74         }
75     }
76 }
77