xref: /webrtc/sctp/examples/ping.rs (revision 5d8fe953)
1 use webrtc_sctp::association::*;
2 use webrtc_sctp::chunk::chunk_payload_data::PayloadProtocolIdentifier;
3 use webrtc_sctp::stream::*;
4 use webrtc_sctp::Error;
5 
6 use bytes::Bytes;
7 use clap::{App, AppSettings, Arg};
8 use std::net::Shutdown;
9 use std::sync::Arc;
10 use tokio::net::UdpSocket;
11 use tokio::signal;
12 use tokio::sync::mpsc;
13 
14 // RUST_LOG=trace cargo run --color=always --package webrtc-sctp --example ping -- --server 0.0.0.0:5678
15 
16 #[tokio::main]
main() -> Result<(), Error>17 async fn main() -> Result<(), Error> {
18     /*env_logger::Builder::new()
19     .format(|buf, record| {
20         writeln!(
21             buf,
22             "{}:{} [{}] {} - {}",
23             record.file().unwrap_or("unknown"),
24             record.line().unwrap_or(0),
25             record.level(),
26             chrono::Local::now().format("%H:%M:%S.%6f"),
27             record.args()
28         )
29     })
30     .filter(None, log::LevelFilter::Trace)
31     .init();*/
32 
33     let mut app = App::new("SCTP Ping")
34         .version("0.1.0")
35         .author("Rain Liu <[email protected]>")
36         .about("An example of SCTP Client")
37         .setting(AppSettings::DeriveDisplayOrder)
38         .setting(AppSettings::SubcommandsNegateReqs)
39         .arg(
40             Arg::with_name("FULLHELP")
41                 .help("Prints more detailed help information")
42                 .long("fullhelp"),
43         )
44         .arg(
45             Arg::with_name("server")
46                 .required_unless("FULLHELP")
47                 .takes_value(true)
48                 .long("server")
49                 .help("SCTP Server name."),
50         );
51 
52     let matches = app.clone().get_matches();
53 
54     if matches.is_present("FULLHELP") {
55         app.print_long_help().unwrap();
56         std::process::exit(0);
57     }
58 
59     let server = matches.value_of("server").unwrap();
60 
61     let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap());
62     conn.connect(server).await.unwrap();
63     println!("connecting {server}..");
64 
65     let config = Config {
66         net_conn: conn,
67         max_receive_buffer_size: 0,
68         max_message_size: 0,
69         name: "client".to_owned(),
70     };
71     let a = Association::client(config).await?;
72     println!("created a client");
73 
74     let stream = a.open_stream(0, PayloadProtocolIdentifier::String).await?;
75     println!("opened a stream");
76 
77     // set unordered = true and 10ms treshold for dropping packets
78     stream.set_reliability_params(true, ReliabilityType::Timed, 10);
79 
80     let stream_tx = Arc::clone(&stream);
81     tokio::spawn(async move {
82         let mut ping_seq_num = 0;
83         while ping_seq_num < 10 {
84             let ping_msg = format!("ping {ping_seq_num}");
85             println!("sent: {ping_msg}");
86             stream_tx.write(&Bytes::from(ping_msg)).await?;
87 
88             ping_seq_num += 1;
89         }
90 
91         println!("finished send ping");
92         Result::<(), Error>::Ok(())
93     });
94 
95     let (done_tx, mut done_rx) = mpsc::channel::<()>(1);
96     let stream_rx = Arc::clone(&stream);
97     tokio::spawn(async move {
98         let mut buff = vec![0u8; 1024];
99         while let Ok(n) = stream_rx.read(&mut buff).await {
100             let pong_msg = String::from_utf8(buff[..n].to_vec()).unwrap();
101             println!("received: {pong_msg}");
102         }
103 
104         println!("finished recv pong");
105         drop(done_tx);
106     });
107 
108     println!("Waiting for Ctrl-C...");
109     signal::ctrl_c().await.expect("failed to listen for event");
110     println!("Closing stream and association...");
111 
112     stream.shutdown(Shutdown::Both).await?;
113     a.close().await?;
114 
115     let _ = done_rx.recv().await;
116 
117     Ok(())
118 }
119