xref: /tonic/tonic/benches/decode.rs (revision 5e9a5bcd)
1 #![allow(missing_docs)]
2 
3 use bencher::{benchmark_group, benchmark_main, Bencher};
4 use bytes::{Buf, BufMut, Bytes, BytesMut};
5 use http_body::{Body, Frame, SizeHint};
6 use std::{
7     fmt::{Error, Formatter},
8     pin::Pin,
9     task::{Context, Poll},
10 };
11 use tonic::{codec::DecodeBuf, codec::Decoder, Status, Streaming};
12 
// Defines a benchmark function `$name` that repeatedly decodes a mock request
// body made of `$message_count` messages of `$message_size` bytes each, served
// in chunks of at most `$chunk_size` bytes.
macro_rules! bench {
    ($name:ident, $message_size:expr, $chunk_size:expr, $message_count:expr) => {
        fn $name(b: &mut Bencher) {
            // Give the macro arguments readable, typed names.
            let message_size: usize = $message_size;
            let chunk_size: usize = $chunk_size;
            let message_count: usize = $message_count;

            let rt = tokio::runtime::Builder::new_multi_thread()
                .build()
                .expect("runtime");

            // The payload is built once; each iteration decodes a clone of it.
            let body = MockBody::new(make_payload(message_size, message_count), chunk_size);
            b.bytes = body.len() as u64;

            b.iter(|| {
                rt.block_on(async {
                    let mut stream = Streaming::new_request(
                        MockDecoder::new(message_size),
                        body.clone(),
                        None,
                        None,
                    );

                    // Drain the stream, checking every decoded message's size.
                    let mut decoded: usize = 0;
                    while let Some(msg) = stream.message().await.unwrap() {
                        assert_eq!(message_size, msg.len());
                        decoded += 1;
                    }

                    assert_eq!(decoded, message_count);
                    assert!(stream.trailers().await.unwrap().is_none());
                })
            })
        }
    };
}
42 
43 #[derive(Clone)]
44 struct MockBody {
45     data: Bytes,
46     chunk_size: usize,
47 }
48 
49 impl MockBody {
new(data: Bytes, chunk_size: usize) -> Self50     fn new(data: Bytes, chunk_size: usize) -> Self {
51         MockBody { data, chunk_size }
52     }
53 
len(&self) -> usize54     fn len(&self) -> usize {
55         self.data.len()
56     }
57 }
58 
59 impl Body for MockBody {
60     type Data = Bytes;
61     type Error = Status;
62 
poll_frame( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>63     fn poll_frame(
64         mut self: Pin<&mut Self>,
65         _cx: &mut Context<'_>,
66     ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
67         if self.data.has_remaining() {
68             let split = std::cmp::min(self.chunk_size, self.data.remaining());
69             Poll::Ready(Some(Ok(Frame::data(self.data.split_to(split)))))
70         } else {
71             Poll::Ready(None)
72         }
73     }
74 
is_end_stream(&self) -> bool75     fn is_end_stream(&self) -> bool {
76         !self.data.is_empty()
77     }
78 
size_hint(&self) -> SizeHint79     fn size_hint(&self) -> SizeHint {
80         SizeHint::with_exact(self.data.len() as u64)
81     }
82 }
83 
84 impl std::fmt::Debug for MockBody {
fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>85     fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
86         let sample = self.data.iter().take(10).collect::<Vec<_>>();
87         write!(f, "{:?}...({})", sample, self.data.len())
88     }
89 }
90 
/// Decoder state for the benchmarks: the size of each message to consume.
#[derive(Debug, Clone)]
struct MockDecoder {
    /// Number of bytes consumed from the buffer per decoded message.
    message_size: usize,
}

impl MockDecoder {
    /// Creates a decoder for messages of `message_size` bytes.
    fn new(message_size: usize) -> Self {
        Self { message_size }
    }
}
101 
102 impl Decoder for MockDecoder {
103     type Item = Vec<u8>;
104     type Error = Status;
105 
decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error>106     fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
107         let out = Vec::from(buf.chunk());
108         buf.advance(self.message_size);
109         Ok(Some(out))
110     }
111 }
112 
make_payload(message_length: usize, message_count: usize) -> Bytes113 fn make_payload(message_length: usize, message_count: usize) -> Bytes {
114     let mut buf = BytesMut::new();
115 
116     for _ in 0..message_count {
117         let msg = vec![97u8; message_length];
118         buf.reserve(msg.len() + 5);
119         buf.put_u8(0);
120         buf.put_u32(msg.len() as u32);
121         buf.put(&msg[..]);
122     }
123 
124     buf.freeze()
125 }
126 
// Arguments to `bench!`: name, message size, body chunk size, message count.
// `make_payload` adds a 5-byte header to every message, so a chunk size of
// message size + 5 holds exactly one framed message.

// Vary only the body chunk size (one 1_000-byte message).
bench!(chunk_size_100, 1_000, 100, 1);
bench!(chunk_size_500, 1_000, 500, 1);
bench!(chunk_size_1005, 1_000, 1_005, 1);

// Vary only the message size (two messages, 1_005-byte chunks).
bench!(message_size_1k, 1_000, 1_005, 2);
bench!(message_size_5k, 5_000, 1_005, 2);
bench!(message_size_10k, 10_000, 1_005, 2);

// Vary only the message count (500-byte messages, 505-byte chunks).
bench!(message_count_1, 500, 505, 1);
bench!(message_count_10, 500, 505, 10);
bench!(message_count_20, 500, 505, 20);
141 
// Group the benchmark functions by the single parameter each set varies.
benchmark_group!(chunk_size, chunk_size_100, chunk_size_500, chunk_size_1005);

benchmark_group!(
    message_size,
    message_size_1k,
    message_size_5k,
    message_size_10k
);

benchmark_group!(
    message_count,
    message_count_1,
    message_count_10,
    message_count_20
);

// Pass all three groups to bencher's `benchmark_main!`.
benchmark_main!(chunk_size, message_size, message_count);
159