xref: /xiu/application/pprtmp/src/main.rs (revision 8e71d710)
1 use {
2     anyhow::Result,
3     clap::{value_parser, Arg, Command},
4     rtmp::session::client_session::ClientSession,
5     rtmp::session::client_session::ClientType,
6     rtmp::utils::RtmpUrlParser,
7     std::env,
8     std::process::exit,
9     streamhub::StreamsHub,
10     tokio::net::TcpStream,
11     tokio::signal,
12     tokio::time::Duration,
13 };
14 
15 #[tokio::main]
main() -> Result<()>16 async fn main() -> Result<()> {
17     env::set_var("RUST_LOG", "info");
18     env_logger::init();
19 
20     let mut cmd = Command::new("pprtmp")
21         .bin_name("pprtmp")
22         .version("0.1.0")
23         .author("HarlanC <[email protected]>")
24         .about("pull and push rtmp!!!")
25         .arg(
26             Arg::new("pullrtmp")
27                 .long("pull_rtmp_url")
28                 .short('i')
29                 .value_name("path")
30                 .help("Specify the pull rtmp url.")
31                 .value_parser(value_parser!(String))
32                 .required(true),
33         )
34         .arg(
35             Arg::new("pushrtmp")
36                 .long("push_rtmp_url")
37                 .short('o')
38                 .value_name("path")
39                 .help("Specify the push rtmp url.")
40                 .value_parser(value_parser!(String))
41                 .required(true),
42         );
43 
44     let args: Vec<String> = env::args().collect();
45     if 1 == args.len() {
46         cmd.print_help()?;
47         return Ok(());
48     }
49     let matches = cmd.clone().get_matches();
50     let pull_rtmp_url = matches.get_one::<String>("pullrtmp").unwrap().clone();
51     let push_rtmp_url = matches.get_one::<String>("pushrtmp").unwrap().clone();
52 
53     let mut stream_hub = StreamsHub::new(None);
54     let producer = stream_hub.get_hub_event_sender();
55     tokio::spawn(async move { stream_hub.run().await });
56 
57     let mut pull_parser = RtmpUrlParser::new(pull_rtmp_url);
58     if let Err(err) = pull_parser.parse_url() {
59         log::error!("err: {}", err);
60     }
61     pull_parser.append_port(String::from("1935"));
62     let stream1 = TcpStream::connect(pull_parser.raw_domain_name.clone()).await?;
63     let mut pull_client_session = ClientSession::new(
64         stream1,
65         ClientType::Play,
66         pull_parser.raw_domain_name,
67         pull_parser.app_name.clone(),
68         pull_parser.raw_stream_name,
69         producer.clone(),
70         0,
71     );
72     tokio::spawn(async move {
73         if let Err(err) = pull_client_session.run().await {
74             log::error!("pull_client_session as pull client run error: {}", err);
75         }
76     });
77 
78     tokio::time::sleep(Duration::from_secs(2)).await;
79 
80     let mut push_parser = RtmpUrlParser::new(push_rtmp_url);
81     if let Err(err) = push_parser.parse_url() {
82         log::error!("err: {}", err);
83     }
84     push_parser.append_port(String::from("1935"));
85     // push the rtmp stream from local to remote rtmp server
86     let stream2 = TcpStream::connect(push_parser.raw_domain_name.clone()).await?;
87     let mut push_client_session = ClientSession::new(
88         stream2,
89         ClientType::Publish,
90         push_parser.raw_domain_name,
91         push_parser.app_name,
92         push_parser.raw_stream_name,
93         producer.clone(),
94         0,
95     );
96 
97     push_client_session.subscribe(pull_parser.app_name, pull_parser.stream_name);
98     tokio::spawn(async move {
99         if let Err(err) = push_client_session.run().await {
100             log::error!("push_client_session as push client run error: {}", err);
101             exit(0);
102         }
103     });
104 
105     signal::ctrl_c().await?;
106     Ok(())
107 }
108