1 use { 2 anyhow::Result, 3 clap::{value_parser, Arg, Command}, 4 rtmp::session::client_session::ClientSession, 5 rtmp::session::client_session::ClientType, 6 rtmp::utils::RtmpUrlParser, 7 std::env, 8 std::process::exit, 9 streamhub::StreamsHub, 10 tokio::net::TcpStream, 11 tokio::signal, 12 tokio::time::Duration, 13 }; 14 15 #[tokio::main] 16 async fn main() -> Result<()> { 17 env::set_var("RUST_LOG", "info"); 18 env_logger::init(); 19 20 let mut cmd = Command::new("pprtmp") 21 .bin_name("pprtmp") 22 .version("0.1.0") 23 .author("HarlanC <[email protected]>") 24 .about("pull and push rtmp!!!") 25 .arg( 26 Arg::new("pullrtmp") 27 .long("pull_rtmp_url") 28 .short('i') 29 .value_name("path") 30 .help("Specify the pull rtmp url.") 31 .value_parser(value_parser!(String)) 32 .required(true), 33 ) 34 .arg( 35 Arg::new("pushrtmp") 36 .long("push_rtmp_url") 37 .short('o') 38 .value_name("path") 39 .help("Specify the push rtmp url.") 40 .value_parser(value_parser!(String)) 41 .required(true), 42 ); 43 44 let args: Vec<String> = env::args().collect(); 45 if 1 == args.len() { 46 cmd.print_help()?; 47 return Ok(()); 48 } 49 let matches = cmd.clone().get_matches(); 50 let pull_rtmp_url = matches.get_one::<String>("pullrtmp").unwrap().clone(); 51 let push_rtmp_url = matches.get_one::<String>("pushrtmp").unwrap().clone(); 52 53 let mut stream_hub = StreamsHub::new(None); 54 let producer = stream_hub.get_hub_event_sender(); 55 tokio::spawn(async move { stream_hub.run().await }); 56 57 let mut pull_parser = RtmpUrlParser::new(pull_rtmp_url); 58 if let Err(err) = pull_parser.parse_url() { 59 log::error!("err: {}", err); 60 } 61 pull_parser.append_port(String::from("1935")); 62 let stream1 = TcpStream::connect(pull_parser.raw_domain_name.clone()).await?; 63 let mut pull_client_session = ClientSession::new( 64 stream1, 65 ClientType::Play, 66 pull_parser.raw_domain_name, 67 pull_parser.app_name.clone(), 68 pull_parser.raw_stream_name, 69 producer.clone(), 70 0, 71 ); 72 tokio::spawn(async move { 73 if let Err(err) = pull_client_session.run().await { 74 log::error!("pull_client_session as pull client run error: {}", err); 75 } 76 }); 77 78 tokio::time::sleep(Duration::from_secs(2)).await; 79 80 let mut push_parser = RtmpUrlParser::new(push_rtmp_url); 81 if let Err(err) = push_parser.parse_url() { 82 log::error!("err: {}", err); 83 } 84 push_parser.append_port(String::from("1935")); 85 // push the rtmp stream from local to remote rtmp server 86 let stream2 = TcpStream::connect(push_parser.raw_domain_name.clone()).await?; 87 let mut push_client_session = ClientSession::new( 88 stream2, 89 ClientType::Publish, 90 push_parser.raw_domain_name, 91 push_parser.app_name, 92 push_parser.raw_stream_name, 93 producer.clone(), 94 0, 95 ); 96 97 push_client_session.subscribe(pull_parser.app_name, pull_parser.stream_name); 98 tokio::spawn(async move { 99 if let Err(err) = push_client_session.run().await { 100 log::error!("push_client_session as push client run error: {}", err); 101 exit(0); 102 } 103 }); 104 105 signal::ctrl_c().await?; 106 Ok(()) 107 } 108