xref: /xiu/protocol/rtmp/src/rtmp.rs (revision 69de9bbd)
1 use streamhub::define::StreamHubEventSender;
2 
3 use super::session::server_session;
4 use std::net::SocketAddr;
5 use tokio::io::Error;
6 use tokio::net::TcpListener;
7 
8 pub struct RtmpServer {
9     address: String,
10     event_producer: StreamHubEventSender,
11     gop_num: usize,
12 }
13 
14 impl RtmpServer {
new(address: String, event_producer: StreamHubEventSender, gop_num: usize) -> Self15     pub fn new(address: String, event_producer: StreamHubEventSender, gop_num: usize) -> Self {
16         Self {
17             address,
18             event_producer,
19             gop_num,
20         }
21     }
22 
run(&mut self) -> Result<(), Error>23     pub async fn run(&mut self) -> Result<(), Error> {
24         let socket_addr: &SocketAddr = &self.address.parse().unwrap();
25         let listener = TcpListener::bind(socket_addr).await?;
26 
27         log::info!("Rtmp server listening on tcp://{}", socket_addr);
28         loop {
29             let (tcp_stream, _) = listener.accept().await?;
30             //tcp_stream.set_keepalive(Some(Duration::from_secs(30)))?;
31 
32             let mut session = server_session::ServerSession::new(
33                 tcp_stream,
34                 self.event_producer.clone(),
35                 self.gop_num,
36             );
37             tokio::spawn(async move {
38                 if let Err(err) = session.run().await {
39                     log::info!(
40                         "session run error: session_type: {}, app_name: {}, stream_name: {}, err: {}",
41                         session.common.session_type,
42                         session.app_name,
43                         session.stream_name,
44                         err
45                     );
46                 }
47             });
48         }
49     }
50 }
51