xref: /xiu/protocol/rtmp/src/relay/pull_client.rs (revision b36cf5da)
18e71d710SHarlan use streamhub::stream::StreamIdentifier;
28e71d710SHarlan 
3b1840569SHarlanC use {
4b1840569SHarlanC     super::errors::ClientError,
58e71d710SHarlan     crate::session::client_session::{ClientSession, ClientType},
6*b36cf5daSHarlan     streamhub::define::{BroadcastEvent, BroadcastEventReceiver, StreamHubEventSender},
7b1840569SHarlanC     tokio::net::TcpStream,
8b1840569SHarlanC };
96645fa44SHarlanC 
106645fa44SHarlanC pub struct PullClient {
116645fa44SHarlanC     address: String,
1213bac29aSHarlan     client_event_consumer: BroadcastEventReceiver,
138e71d710SHarlan     channel_event_producer: StreamHubEventSender,
146645fa44SHarlanC }
156645fa44SHarlanC 
166645fa44SHarlanC impl PullClient {
new( address: String, consumer: BroadcastEventReceiver, producer: StreamHubEventSender, ) -> Self176645fa44SHarlanC     pub fn new(
186645fa44SHarlanC         address: String,
1913bac29aSHarlan         consumer: BroadcastEventReceiver,
208e71d710SHarlan         producer: StreamHubEventSender,
216645fa44SHarlanC     ) -> Self {
226645fa44SHarlanC         Self {
2385c0af6aSLuca Barbato             address,
246645fa44SHarlanC 
256645fa44SHarlanC             client_event_consumer: consumer,
266645fa44SHarlanC             channel_event_producer: producer,
276645fa44SHarlanC         }
286645fa44SHarlanC     }
296645fa44SHarlanC 
run(&mut self) -> Result<(), ClientError>30b1840569SHarlanC     pub async fn run(&mut self) -> Result<(), ClientError> {
316645fa44SHarlanC         loop {
32*b36cf5daSHarlan             let event = self.client_event_consumer.recv().await?;
330ca99c20SHarlan 
34*b36cf5daSHarlan             if let BroadcastEvent::Subscribe {
35*b36cf5daSHarlan                 identifier:
36*b36cf5daSHarlan                     StreamIdentifier::Rtmp {
376645fa44SHarlanC                         app_name,
386645fa44SHarlanC                         stream_name,
39*b36cf5daSHarlan                     },
40*b36cf5daSHarlan             } = event
410ca99c20SHarlan             {
422d9c981bSHarlanC                 log::info!(
432d9c981bSHarlanC                     "receive pull event, app_name :{}, stream_name: {}",
442d9c981bSHarlanC                     app_name,
452d9c981bSHarlanC                     stream_name
462d9c981bSHarlanC                 );
47b97dcde8SHarlanC                 let stream = TcpStream::connect(self.address.clone()).await?;
48b97dcde8SHarlanC 
49b97dcde8SHarlanC                 let mut client_session = ClientSession::new(
50b97dcde8SHarlanC                     stream,
51b97dcde8SHarlanC                     ClientType::Play,
52c8d4d932SHarlan                     self.address.clone(),
53b97dcde8SHarlanC                     app_name.clone(),
54b97dcde8SHarlanC                     stream_name.clone(),
55b97dcde8SHarlanC                     self.channel_event_producer.clone(),
568e71d710SHarlan                     0,
57b97dcde8SHarlanC                 );
58b97dcde8SHarlanC 
59b97dcde8SHarlanC                 tokio::spawn(async move {
60b97dcde8SHarlanC                     if let Err(err) = client_session.run().await {
615de1eabbSHarlanC                         log::error!("client_session as pull client run error: {}", err);
62b97dcde8SHarlanC                     }
63b97dcde8SHarlanC                 });
64b97dcde8SHarlanC             }
656645fa44SHarlanC         }
666645fa44SHarlanC     }
676645fa44SHarlanC }
68