1 use clap::{App, AppSettings, Arg};
2 use std::io::Write;
3 use std::sync::Arc;
4 use tokio::net::UdpSocket;
5 use util::{conn::conn_disconnected_packet::DisconnectedPacketConn, Conn};
6 use webrtc_sctp::association::*;
7 use webrtc_sctp::chunk::chunk_payload_data::PayloadProtocolIdentifier;
8 use webrtc_sctp::stream::*;
9 use webrtc_sctp::Error;
10
main() -> Result<(), Error>11 fn main() -> Result<(), Error> {
12 env_logger::Builder::new()
13 .format(|buf, record| {
14 writeln!(
15 buf,
16 "{}:{} [{}] {} - {}",
17 record.file().unwrap_or("unknown"),
18 record.line().unwrap_or(0),
19 record.level(),
20 chrono::Local::now().format("%H:%M:%S.%6f"),
21 record.args()
22 )
23 })
24 .filter(None, log::LevelFilter::Warn)
25 .init();
26
27 let mut app = App::new("SCTP Throughput")
28 .version("0.1.0")
29 .about("An example of SCTP Server")
30 .setting(AppSettings::DeriveDisplayOrder)
31 .setting(AppSettings::SubcommandsNegateReqs)
32 .arg(
33 Arg::with_name("FULLHELP")
34 .help("Prints more detailed help information")
35 .long("fullhelp"),
36 )
37 .arg(
38 Arg::with_name("port")
39 .required_unless("FULLHELP")
40 .takes_value(true)
41 .long("port")
42 .help("use port ."),
43 );
44
45 let matches = app.clone().get_matches();
46
47 if matches.is_present("FULLHELP") {
48 app.print_long_help().unwrap();
49 std::process::exit(0);
50 }
51
52 let port1 = matches.value_of("port").unwrap().to_owned();
53 let port2 = port1.clone();
54
55 std::thread::spawn(|| {
56 tokio::runtime::Runtime::new()
57 .unwrap()
58 .block_on(async move {
59 let conn = DisconnectedPacketConn::new(Arc::new(
60 UdpSocket::bind(format!("127.0.0.1:{port1}")).await.unwrap(),
61 ));
62 println!("listening {}...", conn.local_addr().unwrap());
63
64 let config = Config {
65 net_conn: Arc::new(conn),
66 max_receive_buffer_size: 0,
67 max_message_size: 0,
68 name: "recver".to_owned(),
69 };
70 let a = Association::server(config).await?;
71 println!("created a server");
72
73 let stream = a.accept_stream().await.unwrap();
74 println!("accepted a stream");
75
76 // set unordered = true and 10ms treshold for dropping packets
77 stream.set_reliability_params(true, ReliabilityType::Rexmit, 0);
78
79 let mut buff = [0u8; 65535];
80 let mut recv = 0;
81 let mut pkt_num = 0;
82 let mut loop_num = 0;
83 let mut now = tokio::time::Instant::now();
84 while let Ok(n) = stream.read(&mut buff).await {
85 recv += n;
86 if n != 0 {
87 pkt_num += 1;
88 }
89 loop_num += 1;
90 if now.elapsed().as_secs() == 1 {
91 println!("Throughput: {recv} Bytes/s, {pkt_num} pkts, {loop_num} loops");
92 now = tokio::time::Instant::now();
93 recv = 0;
94 loop_num = 0;
95 pkt_num = 0;
96 }
97 }
98 Result::<(), Error>::Ok(())
99 })
100 });
101
102 std::thread::spawn(|| {
103 tokio::runtime::Runtime::new()
104 .unwrap()
105 .block_on(async move {
106 let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap());
107 conn.connect(format!("127.0.0.1:{port2}")).await.unwrap();
108 println!("connecting 127.0.0.1:{port2}..");
109
110 let config = Config {
111 net_conn: conn,
112 max_receive_buffer_size: 0,
113 max_message_size: 0,
114 name: "sender".to_owned(),
115 };
116 let a = Association::client(config).await.unwrap();
117 println!("created a client");
118
119 let stream = a
120 .open_stream(0, PayloadProtocolIdentifier::Binary)
121 .await
122 .unwrap();
123 println!("opened a stream");
124
125 //const LEN: usize = 1200;
126 const LEN: usize = 65535;
127 let buf = vec![0; LEN];
128 let bytes = bytes::Bytes::from(buf);
129
130 let mut now = tokio::time::Instant::now();
131 let mut pkt_num = 0;
132 while stream.write(&bytes).await.is_ok() {
133 pkt_num += 1;
134 if now.elapsed().as_secs() == 1 {
135 println!("Send {pkt_num} pkts");
136 now = tokio::time::Instant::now();
137 pkt_num = 0;
138 }
139 }
140 Result::<(), Error>::Ok(())
141 })
142 });
143 #[allow(clippy::empty_loop)]
144 loop {}
145 }
146