xref: /tonic/tests/compression/src/lib.rs (revision 23602b47)
1 #![allow(unused_imports)]
2 
3 use self::util::*;
4 use crate::util::mock_io_channel;
5 use std::{
6     pin::Pin,
7     sync::{
8         atomic::{AtomicUsize, Ordering::SeqCst},
9         Arc,
10     },
11 };
12 use tokio::net::TcpListener;
13 use tokio_stream::{Stream, StreamExt};
14 use tonic::{
15     transport::{Channel, Endpoint, Server, Uri},
16     Request, Response, Status, Streaming,
17 };
18 use tower::{layer::layer_fn, service_fn, Service, ServiceBuilder};
19 use tower_http::{map_request_body::MapRequestBodyLayer, map_response_body::MapResponseBodyLayer};
20 
21 mod bidirectional_stream;
22 mod client_stream;
23 mod compressing_request;
24 mod compressing_response;
25 mod server_stream;
26 mod util;
27 
28 tonic::include_proto!("test");
29 
30 #[derive(Debug, Default)]
31 struct Svc {
32     disable_compressing_on_response: bool,
33 }
34 
35 const UNCOMPRESSED_MIN_BODY_SIZE: usize = 1024;
36 
37 impl Svc {
prepare_response<B>(&self, mut res: Response<B>) -> Response<B>38     fn prepare_response<B>(&self, mut res: Response<B>) -> Response<B> {
39         if self.disable_compressing_on_response {
40             res.disable_compression();
41         }
42 
43         res
44     }
45 }
46 
47 #[tonic::async_trait]
48 impl test_server::Test for Svc {
compress_output_unary(&self, _req: Request<()>) -> Result<Response<SomeData>, Status>49     async fn compress_output_unary(&self, _req: Request<()>) -> Result<Response<SomeData>, Status> {
50         let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE];
51 
52         Ok(self.prepare_response(Response::new(SomeData {
53             data: data.to_vec(),
54         })))
55     }
56 
compress_input_unary(&self, req: Request<SomeData>) -> Result<Response<()>, Status>57     async fn compress_input_unary(&self, req: Request<SomeData>) -> Result<Response<()>, Status> {
58         assert_eq!(req.into_inner().data.len(), UNCOMPRESSED_MIN_BODY_SIZE);
59         Ok(Response::new(()))
60     }
61 
62     type CompressOutputServerStreamStream =
63         Pin<Box<dyn Stream<Item = Result<SomeData, Status>> + Send + 'static>>;
64 
compress_output_server_stream( &self, _req: Request<()>, ) -> Result<Response<Self::CompressOutputServerStreamStream>, Status>65     async fn compress_output_server_stream(
66         &self,
67         _req: Request<()>,
68     ) -> Result<Response<Self::CompressOutputServerStreamStream>, Status> {
69         let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
70         let stream = tokio_stream::iter(std::iter::repeat(SomeData { data }))
71             .take(2)
72             .map(Ok::<_, Status>);
73         Ok(self.prepare_response(Response::new(Box::pin(stream))))
74     }
75 
compress_input_client_stream( &self, req: Request<Streaming<SomeData>>, ) -> Result<Response<()>, Status>76     async fn compress_input_client_stream(
77         &self,
78         req: Request<Streaming<SomeData>>,
79     ) -> Result<Response<()>, Status> {
80         let mut stream = req.into_inner();
81         while let Some(item) = stream.next().await {
82             item.unwrap();
83         }
84         Ok(self.prepare_response(Response::new(())))
85     }
86 
compress_output_client_stream( &self, req: Request<Streaming<SomeData>>, ) -> Result<Response<SomeData>, Status>87     async fn compress_output_client_stream(
88         &self,
89         req: Request<Streaming<SomeData>>,
90     ) -> Result<Response<SomeData>, Status> {
91         let mut stream = req.into_inner();
92         while let Some(item) = stream.next().await {
93             item.unwrap();
94         }
95 
96         let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE];
97 
98         Ok(self.prepare_response(Response::new(SomeData {
99             data: data.to_vec(),
100         })))
101     }
102 
103     type CompressInputOutputBidirectionalStreamStream =
104         Pin<Box<dyn Stream<Item = Result<SomeData, Status>> + Send + 'static>>;
105 
compress_input_output_bidirectional_stream( &self, req: Request<Streaming<SomeData>>, ) -> Result<Response<Self::CompressInputOutputBidirectionalStreamStream>, Status>106     async fn compress_input_output_bidirectional_stream(
107         &self,
108         req: Request<Streaming<SomeData>>,
109     ) -> Result<Response<Self::CompressInputOutputBidirectionalStreamStream>, Status> {
110         let mut stream = req.into_inner();
111         while let Some(item) = stream.next().await {
112             item.unwrap();
113         }
114 
115         let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
116         let stream = tokio_stream::iter(std::iter::repeat(SomeData { data }))
117             .take(2)
118             .map(Ok::<_, Status>);
119         Ok(self.prepare_response(Response::new(Box::pin(stream))))
120     }
121 }
122