xref: /tonic/tests/compression/src/lib.rs (revision 23602b47)
10583cff8SDavid Pedersen #![allow(unused_imports)]
20583cff8SDavid Pedersen 
30583cff8SDavid Pedersen use self::util::*;
4d4973642SLucio Franco use crate::util::mock_io_channel;
50583cff8SDavid Pedersen use std::{
60583cff8SDavid Pedersen     pin::Pin,
70583cff8SDavid Pedersen     sync::{
80583cff8SDavid Pedersen         atomic::{AtomicUsize, Ordering::SeqCst},
90583cff8SDavid Pedersen         Arc,
100583cff8SDavid Pedersen     },
110583cff8SDavid Pedersen };
120583cff8SDavid Pedersen use tokio::net::TcpListener;
13*23602b47Stottoto use tokio_stream::{Stream, StreamExt};
140583cff8SDavid Pedersen use tonic::{
150583cff8SDavid Pedersen     transport::{Channel, Endpoint, Server, Uri},
160583cff8SDavid Pedersen     Request, Response, Status, Streaming,
170583cff8SDavid Pedersen };
180583cff8SDavid Pedersen use tower::{layer::layer_fn, service_fn, Service, ServiceBuilder};
190583cff8SDavid Pedersen use tower_http::{map_request_body::MapRequestBodyLayer, map_response_body::MapResponseBodyLayer};
200583cff8SDavid Pedersen 
210583cff8SDavid Pedersen mod bidirectional_stream;
220583cff8SDavid Pedersen mod client_stream;
230583cff8SDavid Pedersen mod compressing_request;
240583cff8SDavid Pedersen mod compressing_response;
250583cff8SDavid Pedersen mod server_stream;
260583cff8SDavid Pedersen mod util;
270583cff8SDavid Pedersen 
280583cff8SDavid Pedersen tonic::include_proto!("test");
290583cff8SDavid Pedersen 
30366d888aStottoto #[derive(Debug, Default)]
310583cff8SDavid Pedersen struct Svc {
320583cff8SDavid Pedersen     disable_compressing_on_response: bool,
330583cff8SDavid Pedersen }
340583cff8SDavid Pedersen 
350583cff8SDavid Pedersen const UNCOMPRESSED_MIN_BODY_SIZE: usize = 1024;
360583cff8SDavid Pedersen 
370583cff8SDavid Pedersen impl Svc {
prepare_response<B>(&self, mut res: Response<B>) -> Response<B>380583cff8SDavid Pedersen     fn prepare_response<B>(&self, mut res: Response<B>) -> Response<B> {
390583cff8SDavid Pedersen         if self.disable_compressing_on_response {
400583cff8SDavid Pedersen             res.disable_compression();
410583cff8SDavid Pedersen         }
420583cff8SDavid Pedersen 
430583cff8SDavid Pedersen         res
440583cff8SDavid Pedersen     }
450583cff8SDavid Pedersen }
460583cff8SDavid Pedersen 
470583cff8SDavid Pedersen #[tonic::async_trait]
480583cff8SDavid Pedersen impl test_server::Test for Svc {
compress_output_unary(&self, _req: Request<()>) -> Result<Response<SomeData>, Status>490583cff8SDavid Pedersen     async fn compress_output_unary(&self, _req: Request<()>) -> Result<Response<SomeData>, Status> {
500583cff8SDavid Pedersen         let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE];
510583cff8SDavid Pedersen 
520583cff8SDavid Pedersen         Ok(self.prepare_response(Response::new(SomeData {
530583cff8SDavid Pedersen             data: data.to_vec(),
540583cff8SDavid Pedersen         })))
550583cff8SDavid Pedersen     }
560583cff8SDavid Pedersen 
compress_input_unary(&self, req: Request<SomeData>) -> Result<Response<()>, Status>570583cff8SDavid Pedersen     async fn compress_input_unary(&self, req: Request<SomeData>) -> Result<Response<()>, Status> {
580583cff8SDavid Pedersen         assert_eq!(req.into_inner().data.len(), UNCOMPRESSED_MIN_BODY_SIZE);
590583cff8SDavid Pedersen         Ok(Response::new(()))
600583cff8SDavid Pedersen     }
610583cff8SDavid Pedersen 
620583cff8SDavid Pedersen     type CompressOutputServerStreamStream =
6323c1392fSLucio Franco         Pin<Box<dyn Stream<Item = Result<SomeData, Status>> + Send + 'static>>;
640583cff8SDavid Pedersen 
compress_output_server_stream( &self, _req: Request<()>, ) -> Result<Response<Self::CompressOutputServerStreamStream>, Status>650583cff8SDavid Pedersen     async fn compress_output_server_stream(
660583cff8SDavid Pedersen         &self,
670583cff8SDavid Pedersen         _req: Request<()>,
680583cff8SDavid Pedersen     ) -> Result<Response<Self::CompressOutputServerStreamStream>, Status> {
690583cff8SDavid Pedersen         let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
70*23602b47Stottoto         let stream = tokio_stream::iter(std::iter::repeat(SomeData { data }))
710583cff8SDavid Pedersen             .take(2)
720583cff8SDavid Pedersen             .map(Ok::<_, Status>);
730583cff8SDavid Pedersen         Ok(self.prepare_response(Response::new(Box::pin(stream))))
740583cff8SDavid Pedersen     }
750583cff8SDavid Pedersen 
compress_input_client_stream( &self, req: Request<Streaming<SomeData>>, ) -> Result<Response<()>, Status>760583cff8SDavid Pedersen     async fn compress_input_client_stream(
770583cff8SDavid Pedersen         &self,
780583cff8SDavid Pedersen         req: Request<Streaming<SomeData>>,
790583cff8SDavid Pedersen     ) -> Result<Response<()>, Status> {
800583cff8SDavid Pedersen         let mut stream = req.into_inner();
810583cff8SDavid Pedersen         while let Some(item) = stream.next().await {
820583cff8SDavid Pedersen             item.unwrap();
830583cff8SDavid Pedersen         }
840583cff8SDavid Pedersen         Ok(self.prepare_response(Response::new(())))
850583cff8SDavid Pedersen     }
860583cff8SDavid Pedersen 
compress_output_client_stream( &self, req: Request<Streaming<SomeData>>, ) -> Result<Response<SomeData>, Status>870583cff8SDavid Pedersen     async fn compress_output_client_stream(
880583cff8SDavid Pedersen         &self,
890583cff8SDavid Pedersen         req: Request<Streaming<SomeData>>,
900583cff8SDavid Pedersen     ) -> Result<Response<SomeData>, Status> {
910583cff8SDavid Pedersen         let mut stream = req.into_inner();
920583cff8SDavid Pedersen         while let Some(item) = stream.next().await {
930583cff8SDavid Pedersen             item.unwrap();
940583cff8SDavid Pedersen         }
950583cff8SDavid Pedersen 
960583cff8SDavid Pedersen         let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE];
970583cff8SDavid Pedersen 
980583cff8SDavid Pedersen         Ok(self.prepare_response(Response::new(SomeData {
990583cff8SDavid Pedersen             data: data.to_vec(),
1000583cff8SDavid Pedersen         })))
1010583cff8SDavid Pedersen     }
1020583cff8SDavid Pedersen 
1030583cff8SDavid Pedersen     type CompressInputOutputBidirectionalStreamStream =
10423c1392fSLucio Franco         Pin<Box<dyn Stream<Item = Result<SomeData, Status>> + Send + 'static>>;
1050583cff8SDavid Pedersen 
compress_input_output_bidirectional_stream( &self, req: Request<Streaming<SomeData>>, ) -> Result<Response<Self::CompressInputOutputBidirectionalStreamStream>, Status>1060583cff8SDavid Pedersen     async fn compress_input_output_bidirectional_stream(
1070583cff8SDavid Pedersen         &self,
1080583cff8SDavid Pedersen         req: Request<Streaming<SomeData>>,
1090583cff8SDavid Pedersen     ) -> Result<Response<Self::CompressInputOutputBidirectionalStreamStream>, Status> {
1100583cff8SDavid Pedersen         let mut stream = req.into_inner();
1110583cff8SDavid Pedersen         while let Some(item) = stream.next().await {
1120583cff8SDavid Pedersen             item.unwrap();
1130583cff8SDavid Pedersen         }
1140583cff8SDavid Pedersen 
1150583cff8SDavid Pedersen         let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
116*23602b47Stottoto         let stream = tokio_stream::iter(std::iter::repeat(SomeData { data }))
1170583cff8SDavid Pedersen             .take(2)
1180583cff8SDavid Pedersen             .map(Ok::<_, Status>);
1190583cff8SDavid Pedersen         Ok(self.prepare_response(Response::new(Box::pin(stream))))
1200583cff8SDavid Pedersen     }
1210583cff8SDavid Pedersen }
122