1 #![allow(missing_docs)]
2
3 use bencher::{benchmark_group, benchmark_main, Bencher};
4 use bytes::{Buf, BufMut, Bytes, BytesMut};
5 use http_body::{Body, Frame, SizeHint};
6 use std::{
7 fmt::{Error, Formatter},
8 pin::Pin,
9 task::{Context, Poll},
10 };
11 use tonic::{codec::DecodeBuf, codec::Decoder, Status, Streaming};
12
/// Defines a benchmark function named `$name`.
///
/// The generated function builds a payload of `$message_count` framed messages of
/// `$message_size` bytes each, serves it through a `MockBody` in chunks of at most
/// `$chunk_size` bytes, and measures how long it takes to decode the whole stream.
macro_rules! bench {
    ($name:ident, $message_size:expr, $chunk_size:expr, $message_count:expr) => {
        fn $name(b: &mut Bencher) {
            let runtime = tokio::runtime::Builder::new_multi_thread()
                .build()
                .expect("runtime");

            // Built once up front; each iteration below decodes its own clone.
            let source = MockBody::new(make_payload($message_size, $message_count), $chunk_size);
            b.bytes = source.len() as u64;

            b.iter(|| {
                runtime.block_on(async {
                    let mut stream = Streaming::new_request(
                        MockDecoder::new($message_size),
                        source.clone(),
                        None,
                        None,
                    );

                    // Drain the stream, checking every decoded message on the way.
                    let mut decoded = 0;
                    while let Some(message) = stream.message().await.unwrap() {
                        assert_eq!($message_size, message.len());
                        decoded += 1;
                    }

                    assert_eq!(decoded, $message_count);
                    assert!(stream.trailers().await.unwrap().is_none());
                })
            })
        }
    };
}
42
43 #[derive(Clone)]
44 struct MockBody {
45 data: Bytes,
46 chunk_size: usize,
47 }
48
49 impl MockBody {
new(data: Bytes, chunk_size: usize) -> Self50 fn new(data: Bytes, chunk_size: usize) -> Self {
51 MockBody { data, chunk_size }
52 }
53
len(&self) -> usize54 fn len(&self) -> usize {
55 self.data.len()
56 }
57 }
58
59 impl Body for MockBody {
60 type Data = Bytes;
61 type Error = Status;
62
poll_frame( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>63 fn poll_frame(
64 mut self: Pin<&mut Self>,
65 _cx: &mut Context<'_>,
66 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
67 if self.data.has_remaining() {
68 let split = std::cmp::min(self.chunk_size, self.data.remaining());
69 Poll::Ready(Some(Ok(Frame::data(self.data.split_to(split)))))
70 } else {
71 Poll::Ready(None)
72 }
73 }
74
is_end_stream(&self) -> bool75 fn is_end_stream(&self) -> bool {
76 !self.data.is_empty()
77 }
78
size_hint(&self) -> SizeHint79 fn size_hint(&self) -> SizeHint {
80 SizeHint::with_exact(self.data.len() as u64)
81 }
82 }
83
84 impl std::fmt::Debug for MockBody {
fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>85 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
86 let sample = self.data.iter().take(10).collect::<Vec<_>>();
87 write!(f, "{:?}...({})", sample, self.data.len())
88 }
89 }
90
/// Decoder stand-in that is configured with a fixed message size.
#[derive(Debug, Clone)]
struct MockDecoder {
    /// Number of bytes consumed from the input buffer per decoded message.
    message_size: usize,
}

impl MockDecoder {
    /// Creates a decoder that consumes `message_size` bytes per message.
    fn new(message_size: usize) -> Self {
        Self { message_size }
    }
}
101
102 impl Decoder for MockDecoder {
103 type Item = Vec<u8>;
104 type Error = Status;
105
decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error>106 fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
107 let out = Vec::from(buf.chunk());
108 buf.advance(self.message_size);
109 Ok(Some(out))
110 }
111 }
112
make_payload(message_length: usize, message_count: usize) -> Bytes113 fn make_payload(message_length: usize, message_count: usize) -> Bytes {
114 let mut buf = BytesMut::new();
115
116 for _ in 0..message_count {
117 let msg = vec![97u8; message_length];
118 buf.reserve(msg.len() + 5);
119 buf.put_u8(0);
120 buf.put_u32(msg.len() as u32);
121 buf.put(&msg[..]);
122 }
123
124 buf.freeze()
125 }
126
127 // change body chunk size only
128 bench!(chunk_size_100, 1_000, 100, 1);
129 bench!(chunk_size_500, 1_000, 500, 1);
130 bench!(chunk_size_1005, 1_000, 1_005, 1);
131
132 // change message size only
133 bench!(message_size_1k, 1_000, 1_005, 2);
134 bench!(message_size_5k, 5_000, 1_005, 2);
135 bench!(message_size_10k, 10_000, 1_005, 2);
136
137 // change message count only
138 bench!(message_count_1, 500, 505, 1);
139 bench!(message_count_10, 500, 505, 10);
140 bench!(message_count_20, 500, 505, 20);
141
142 benchmark_group!(chunk_size, chunk_size_100, chunk_size_500, chunk_size_1005);
143
144 benchmark_group!(
145 message_size,
146 message_size_1k,
147 message_size_5k,
148 message_size_10k
149 );
150
151 benchmark_group!(
152 message_count,
153 message_count_1,
154 message_count_10,
155 message_count_20
156 );
157
158 benchmark_main!(chunk_size, message_size, message_count);
159