xref: /webrtc/sctp/examples/throughput.rs (revision 5d8fe953)
1 use clap::{App, AppSettings, Arg};
2 use std::io::Write;
3 use std::sync::Arc;
4 use tokio::net::UdpSocket;
5 use util::{conn::conn_disconnected_packet::DisconnectedPacketConn, Conn};
6 use webrtc_sctp::association::*;
7 use webrtc_sctp::chunk::chunk_payload_data::PayloadProtocolIdentifier;
8 use webrtc_sctp::stream::*;
9 use webrtc_sctp::Error;
10 
main() -> Result<(), Error>11 fn main() -> Result<(), Error> {
12     env_logger::Builder::new()
13         .format(|buf, record| {
14             writeln!(
15                 buf,
16                 "{}:{} [{}] {} - {}",
17                 record.file().unwrap_or("unknown"),
18                 record.line().unwrap_or(0),
19                 record.level(),
20                 chrono::Local::now().format("%H:%M:%S.%6f"),
21                 record.args()
22             )
23         })
24         .filter(None, log::LevelFilter::Warn)
25         .init();
26 
27     let mut app = App::new("SCTP Throughput")
28         .version("0.1.0")
29         .about("An example of SCTP Server")
30         .setting(AppSettings::DeriveDisplayOrder)
31         .setting(AppSettings::SubcommandsNegateReqs)
32         .arg(
33             Arg::with_name("FULLHELP")
34                 .help("Prints more detailed help information")
35                 .long("fullhelp"),
36         )
37         .arg(
38             Arg::with_name("port")
39                 .required_unless("FULLHELP")
40                 .takes_value(true)
41                 .long("port")
42                 .help("use port ."),
43         );
44 
45     let matches = app.clone().get_matches();
46 
47     if matches.is_present("FULLHELP") {
48         app.print_long_help().unwrap();
49         std::process::exit(0);
50     }
51 
52     let port1 = matches.value_of("port").unwrap().to_owned();
53     let port2 = port1.clone();
54 
55     std::thread::spawn(|| {
56         tokio::runtime::Runtime::new()
57             .unwrap()
58             .block_on(async move {
59                 let conn = DisconnectedPacketConn::new(Arc::new(
60                     UdpSocket::bind(format!("127.0.0.1:{port1}")).await.unwrap(),
61                 ));
62                 println!("listening {}...", conn.local_addr().unwrap());
63 
64                 let config = Config {
65                     net_conn: Arc::new(conn),
66                     max_receive_buffer_size: 0,
67                     max_message_size: 0,
68                     name: "recver".to_owned(),
69                 };
70                 let a = Association::server(config).await?;
71                 println!("created a server");
72 
73                 let stream = a.accept_stream().await.unwrap();
74                 println!("accepted a stream");
75 
76                 // set unordered = true and 10ms treshold for dropping packets
77                 stream.set_reliability_params(true, ReliabilityType::Rexmit, 0);
78 
79                 let mut buff = [0u8; 65535];
80                 let mut recv = 0;
81                 let mut pkt_num = 0;
82                 let mut loop_num = 0;
83                 let mut now = tokio::time::Instant::now();
84                 while let Ok(n) = stream.read(&mut buff).await {
85                     recv += n;
86                     if n != 0 {
87                         pkt_num += 1;
88                     }
89                     loop_num += 1;
90                     if now.elapsed().as_secs() == 1 {
91                         println!("Throughput: {recv} Bytes/s, {pkt_num} pkts, {loop_num} loops");
92                         now = tokio::time::Instant::now();
93                         recv = 0;
94                         loop_num = 0;
95                         pkt_num = 0;
96                     }
97                 }
98                 Result::<(), Error>::Ok(())
99             })
100     });
101 
102     std::thread::spawn(|| {
103         tokio::runtime::Runtime::new()
104             .unwrap()
105             .block_on(async move {
106                 let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap());
107                 conn.connect(format!("127.0.0.1:{port2}")).await.unwrap();
108                 println!("connecting 127.0.0.1:{port2}..");
109 
110                 let config = Config {
111                     net_conn: conn,
112                     max_receive_buffer_size: 0,
113                     max_message_size: 0,
114                     name: "sender".to_owned(),
115                 };
116                 let a = Association::client(config).await.unwrap();
117                 println!("created a client");
118 
119                 let stream = a
120                     .open_stream(0, PayloadProtocolIdentifier::Binary)
121                     .await
122                     .unwrap();
123                 println!("opened a stream");
124 
125                 //const LEN: usize = 1200;
126                 const LEN: usize = 65535;
127                 let buf = vec![0; LEN];
128                 let bytes = bytes::Bytes::from(buf);
129 
130                 let mut now = tokio::time::Instant::now();
131                 let mut pkt_num = 0;
132                 while stream.write(&bytes).await.is_ok() {
133                     pkt_num += 1;
134                     if now.elapsed().as_secs() == 1 {
135                         println!("Send {pkt_num} pkts");
136                         now = tokio::time::Instant::now();
137                         pkt_num = 0;
138                     }
139                 }
140                 Result::<(), Error>::Ok(())
141             })
142     });
143     #[allow(clippy::empty_loop)]
144     loop {}
145 }
146