1 use webrtc_sctp::association::*;
2 use webrtc_sctp::chunk::chunk_payload_data::PayloadProtocolIdentifier;
3 use webrtc_sctp::stream::*;
4 use webrtc_sctp::Error;
5
6 use bytes::Bytes;
7 use clap::{App, AppSettings, Arg};
8 use std::net::Shutdown;
9 use std::sync::Arc;
10 use tokio::net::UdpSocket;
11 use tokio::signal;
12 use tokio::sync::mpsc;
13
14 // RUST_LOG=trace cargo run --color=always --package webrtc-sctp --example ping -- --server 0.0.0.0:5678
15
16 #[tokio::main]
main() -> Result<(), Error>17 async fn main() -> Result<(), Error> {
18 /*env_logger::Builder::new()
19 .format(|buf, record| {
20 writeln!(
21 buf,
22 "{}:{} [{}] {} - {}",
23 record.file().unwrap_or("unknown"),
24 record.line().unwrap_or(0),
25 record.level(),
26 chrono::Local::now().format("%H:%M:%S.%6f"),
27 record.args()
28 )
29 })
30 .filter(None, log::LevelFilter::Trace)
31 .init();*/
32
33 let mut app = App::new("SCTP Ping")
34 .version("0.1.0")
35 .author("Rain Liu <[email protected]>")
36 .about("An example of SCTP Client")
37 .setting(AppSettings::DeriveDisplayOrder)
38 .setting(AppSettings::SubcommandsNegateReqs)
39 .arg(
40 Arg::with_name("FULLHELP")
41 .help("Prints more detailed help information")
42 .long("fullhelp"),
43 )
44 .arg(
45 Arg::with_name("server")
46 .required_unless("FULLHELP")
47 .takes_value(true)
48 .long("server")
49 .help("SCTP Server name."),
50 );
51
52 let matches = app.clone().get_matches();
53
54 if matches.is_present("FULLHELP") {
55 app.print_long_help().unwrap();
56 std::process::exit(0);
57 }
58
59 let server = matches.value_of("server").unwrap();
60
61 let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap());
62 conn.connect(server).await.unwrap();
63 println!("connecting {server}..");
64
65 let config = Config {
66 net_conn: conn,
67 max_receive_buffer_size: 0,
68 max_message_size: 0,
69 name: "client".to_owned(),
70 };
71 let a = Association::client(config).await?;
72 println!("created a client");
73
74 let stream = a.open_stream(0, PayloadProtocolIdentifier::String).await?;
75 println!("opened a stream");
76
77 // set unordered = true and 10ms treshold for dropping packets
78 stream.set_reliability_params(true, ReliabilityType::Timed, 10);
79
80 let stream_tx = Arc::clone(&stream);
81 tokio::spawn(async move {
82 let mut ping_seq_num = 0;
83 while ping_seq_num < 10 {
84 let ping_msg = format!("ping {ping_seq_num}");
85 println!("sent: {ping_msg}");
86 stream_tx.write(&Bytes::from(ping_msg)).await?;
87
88 ping_seq_num += 1;
89 }
90
91 println!("finished send ping");
92 Result::<(), Error>::Ok(())
93 });
94
95 let (done_tx, mut done_rx) = mpsc::channel::<()>(1);
96 let stream_rx = Arc::clone(&stream);
97 tokio::spawn(async move {
98 let mut buff = vec![0u8; 1024];
99 while let Ok(n) = stream_rx.read(&mut buff).await {
100 let pong_msg = String::from_utf8(buff[..n].to_vec()).unwrap();
101 println!("received: {pong_msg}");
102 }
103
104 println!("finished recv pong");
105 drop(done_tx);
106 });
107
108 println!("Waiting for Ctrl-C...");
109 signal::ctrl_c().await.expect("failed to listen for event");
110 println!("Closing stream and association...");
111
112 stream.shutdown(Shutdown::Both).await?;
113 a.close().await?;
114
115 let _ = done_rx.recv().await;
116
117 Ok(())
118 }
119