1 use {
2 anyhow::Result,
3 clap::{value_parser, Arg, Command},
4 rtmp::session::client_session::ClientSession,
5 rtmp::session::client_session::ClientType,
6 rtmp::utils::RtmpUrlParser,
7 std::env,
8 std::process::exit,
9 streamhub::StreamsHub,
10 tokio::net::TcpStream,
11 tokio::signal,
12 tokio::time::Duration,
13 };
14
15 #[tokio::main]
main() -> Result<()>16 async fn main() -> Result<()> {
17 env::set_var("RUST_LOG", "info");
18 env_logger::init();
19
20 let mut cmd = Command::new("pprtmp")
21 .bin_name("pprtmp")
22 .version("0.1.0")
23 .author("HarlanC <[email protected]>")
24 .about("pull and push rtmp!!!")
25 .arg(
26 Arg::new("pullrtmp")
27 .long("pull_rtmp_url")
28 .short('i')
29 .value_name("path")
30 .help("Specify the pull rtmp url.")
31 .value_parser(value_parser!(String))
32 .required(true),
33 )
34 .arg(
35 Arg::new("pushrtmp")
36 .long("push_rtmp_url")
37 .short('o')
38 .value_name("path")
39 .help("Specify the push rtmp url.")
40 .value_parser(value_parser!(String))
41 .required(true),
42 );
43
44 let args: Vec<String> = env::args().collect();
45 if 1 == args.len() {
46 cmd.print_help()?;
47 return Ok(());
48 }
49 let matches = cmd.clone().get_matches();
50 let pull_rtmp_url = matches.get_one::<String>("pullrtmp").unwrap().clone();
51 let push_rtmp_url = matches.get_one::<String>("pushrtmp").unwrap().clone();
52
53 let mut stream_hub = StreamsHub::new(None);
54 let producer = stream_hub.get_hub_event_sender();
55 tokio::spawn(async move { stream_hub.run().await });
56
57 let mut pull_parser = RtmpUrlParser::new(pull_rtmp_url);
58 if let Err(err) = pull_parser.parse_url() {
59 log::error!("err: {}", err);
60 }
61 pull_parser.append_port(String::from("1935"));
62 let stream1 = TcpStream::connect(pull_parser.raw_domain_name.clone()).await?;
63 let mut pull_client_session = ClientSession::new(
64 stream1,
65 ClientType::Play,
66 pull_parser.raw_domain_name,
67 pull_parser.app_name.clone(),
68 pull_parser.raw_stream_name,
69 producer.clone(),
70 0,
71 );
72 tokio::spawn(async move {
73 if let Err(err) = pull_client_session.run().await {
74 log::error!("pull_client_session as pull client run error: {}", err);
75 }
76 });
77
78 tokio::time::sleep(Duration::from_secs(2)).await;
79
80 let mut push_parser = RtmpUrlParser::new(push_rtmp_url);
81 if let Err(err) = push_parser.parse_url() {
82 log::error!("err: {}", err);
83 }
84 push_parser.append_port(String::from("1935"));
85 // push the rtmp stream from local to remote rtmp server
86 let stream2 = TcpStream::connect(push_parser.raw_domain_name.clone()).await?;
87 let mut push_client_session = ClientSession::new(
88 stream2,
89 ClientType::Publish,
90 push_parser.raw_domain_name,
91 push_parser.app_name,
92 push_parser.raw_stream_name,
93 producer.clone(),
94 0,
95 );
96
97 push_client_session.subscribe(pull_parser.app_name, pull_parser.stream_name);
98 tokio::spawn(async move {
99 if let Err(err) = push_client_session.run().await {
100 log::error!("push_client_session as push client run error: {}", err);
101 exit(0);
102 }
103 });
104
105 signal::ctrl_c().await?;
106 Ok(())
107 }
108