xref: /tonic/tests/compression/src/lib.rs (revision bbcacd06)
1 #![allow(unused_imports)]
2 
3 use self::util::*;
4 use crate::util::{mock_io_channel, MockStream};
5 use futures::{Stream, StreamExt};
6 use std::convert::TryFrom;
7 use std::{
8     pin::Pin,
9     sync::{
10         atomic::{AtomicUsize, Ordering::SeqCst},
11         Arc,
12     },
13 };
14 use tokio::net::TcpListener;
15 use tonic::{
16     transport::{Channel, Endpoint, Server, Uri},
17     Request, Response, Status, Streaming,
18 };
19 use tower::{layer::layer_fn, service_fn, Service, ServiceBuilder};
20 use tower_http::{map_request_body::MapRequestBodyLayer, map_response_body::MapResponseBodyLayer};
21 
22 mod bidirectional_stream;
23 mod client_stream;
24 mod compressing_request;
25 mod compressing_response;
26 mod server_stream;
27 mod util;
28 
29 tonic::include_proto!("test");
30 
/// Test implementation of the generated `test_server::Test` service.
///
/// `Default` is derived: the only field is a `bool`, whose default is `false`,
/// which is the same value the previous hand-written impl produced.
#[derive(Debug, Default)]
struct Svc {
    /// When `true`, `Svc::prepare_response` calls `disable_compression()` on
    /// the responses passed through it before they are returned.
    disable_compressing_on_response: bool,
}
43 
/// Length in bytes (1 KiB) of the zero-filled payload that the handlers in
/// this file send, and that `compress_input_unary` asserts it received.
const UNCOMPRESSED_MIN_BODY_SIZE: usize = 1 << 10;
45 
46 impl Svc {
47     fn prepare_response<B>(&self, mut res: Response<B>) -> Response<B> {
48         if self.disable_compressing_on_response {
49             res.disable_compression();
50         }
51 
52         res
53     }
54 }
55 
56 #[tonic::async_trait]
57 impl test_server::Test for Svc {
58     async fn compress_output_unary(&self, _req: Request<()>) -> Result<Response<SomeData>, Status> {
59         let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE];
60 
61         Ok(self.prepare_response(Response::new(SomeData {
62             data: data.to_vec(),
63         })))
64     }
65 
66     async fn compress_input_unary(&self, req: Request<SomeData>) -> Result<Response<()>, Status> {
67         assert_eq!(req.into_inner().data.len(), UNCOMPRESSED_MIN_BODY_SIZE);
68         Ok(Response::new(()))
69     }
70 
71     type CompressOutputServerStreamStream =
72         Pin<Box<dyn Stream<Item = Result<SomeData, Status>> + Send + Sync + 'static>>;
73 
74     async fn compress_output_server_stream(
75         &self,
76         _req: Request<()>,
77     ) -> Result<Response<Self::CompressOutputServerStreamStream>, Status> {
78         let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
79         let stream = futures::stream::repeat(SomeData { data })
80             .take(2)
81             .map(Ok::<_, Status>);
82         Ok(self.prepare_response(Response::new(Box::pin(stream))))
83     }
84 
85     async fn compress_input_client_stream(
86         &self,
87         req: Request<Streaming<SomeData>>,
88     ) -> Result<Response<()>, Status> {
89         let mut stream = req.into_inner();
90         while let Some(item) = stream.next().await {
91             item.unwrap();
92         }
93         Ok(self.prepare_response(Response::new(())))
94     }
95 
96     async fn compress_output_client_stream(
97         &self,
98         req: Request<Streaming<SomeData>>,
99     ) -> Result<Response<SomeData>, Status> {
100         let mut stream = req.into_inner();
101         while let Some(item) = stream.next().await {
102             item.unwrap();
103         }
104 
105         let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE];
106 
107         Ok(self.prepare_response(Response::new(SomeData {
108             data: data.to_vec(),
109         })))
110     }
111 
112     type CompressInputOutputBidirectionalStreamStream =
113         Pin<Box<dyn Stream<Item = Result<SomeData, Status>> + Send + Sync + 'static>>;
114 
115     async fn compress_input_output_bidirectional_stream(
116         &self,
117         req: Request<Streaming<SomeData>>,
118     ) -> Result<Response<Self::CompressInputOutputBidirectionalStreamStream>, Status> {
119         let mut stream = req.into_inner();
120         while let Some(item) = stream.next().await {
121             item.unwrap();
122         }
123 
124         let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
125         let stream = futures::stream::repeat(SomeData { data })
126             .take(2)
127             .map(Ok::<_, Status>);
128         Ok(self.prepare_response(Response::new(Box::pin(stream))))
129     }
130 }
131