10583cff8SDavid Pedersen #![allow(unused_imports)] 20583cff8SDavid Pedersen 30583cff8SDavid Pedersen use self::util::*; 4d4973642SLucio Franco use crate::util::mock_io_channel; 50583cff8SDavid Pedersen use std::{ 60583cff8SDavid Pedersen pin::Pin, 70583cff8SDavid Pedersen sync::{ 80583cff8SDavid Pedersen atomic::{AtomicUsize, Ordering::SeqCst}, 90583cff8SDavid Pedersen Arc, 100583cff8SDavid Pedersen }, 110583cff8SDavid Pedersen }; 120583cff8SDavid Pedersen use tokio::net::TcpListener; 13*23602b47Stottoto use tokio_stream::{Stream, StreamExt}; 140583cff8SDavid Pedersen use tonic::{ 150583cff8SDavid Pedersen transport::{Channel, Endpoint, Server, Uri}, 160583cff8SDavid Pedersen Request, Response, Status, Streaming, 170583cff8SDavid Pedersen }; 180583cff8SDavid Pedersen use tower::{layer::layer_fn, service_fn, Service, ServiceBuilder}; 190583cff8SDavid Pedersen use tower_http::{map_request_body::MapRequestBodyLayer, map_response_body::MapResponseBodyLayer}; 200583cff8SDavid Pedersen 210583cff8SDavid Pedersen mod bidirectional_stream; 220583cff8SDavid Pedersen mod client_stream; 230583cff8SDavid Pedersen mod compressing_request; 240583cff8SDavid Pedersen mod compressing_response; 250583cff8SDavid Pedersen mod server_stream; 260583cff8SDavid Pedersen mod util; 270583cff8SDavid Pedersen 280583cff8SDavid Pedersen tonic::include_proto!("test"); 290583cff8SDavid Pedersen 30366d888aStottoto #[derive(Debug, Default)] 310583cff8SDavid Pedersen struct Svc { 320583cff8SDavid Pedersen disable_compressing_on_response: bool, 330583cff8SDavid Pedersen } 340583cff8SDavid Pedersen 350583cff8SDavid Pedersen const UNCOMPRESSED_MIN_BODY_SIZE: usize = 1024; 360583cff8SDavid Pedersen 370583cff8SDavid Pedersen impl Svc { prepare_response<B>(&self, mut res: Response<B>) -> Response<B>380583cff8SDavid Pedersen fn prepare_response<B>(&self, mut res: Response<B>) -> Response<B> { 390583cff8SDavid Pedersen if self.disable_compressing_on_response { 400583cff8SDavid Pedersen res.disable_compression(); 410583cff8SDavid Pedersen } 420583cff8SDavid Pedersen 430583cff8SDavid Pedersen res 440583cff8SDavid Pedersen } 450583cff8SDavid Pedersen } 460583cff8SDavid Pedersen 470583cff8SDavid Pedersen #[tonic::async_trait] 480583cff8SDavid Pedersen impl test_server::Test for Svc { compress_output_unary(&self, _req: Request<()>) -> Result<Response<SomeData>, Status>490583cff8SDavid Pedersen async fn compress_output_unary(&self, _req: Request<()>) -> Result<Response<SomeData>, Status> { 500583cff8SDavid Pedersen let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE]; 510583cff8SDavid Pedersen 520583cff8SDavid Pedersen Ok(self.prepare_response(Response::new(SomeData { 530583cff8SDavid Pedersen data: data.to_vec(), 540583cff8SDavid Pedersen }))) 550583cff8SDavid Pedersen } 560583cff8SDavid Pedersen compress_input_unary(&self, req: Request<SomeData>) -> Result<Response<()>, Status>570583cff8SDavid Pedersen async fn compress_input_unary(&self, req: Request<SomeData>) -> Result<Response<()>, Status> { 580583cff8SDavid Pedersen assert_eq!(req.into_inner().data.len(), UNCOMPRESSED_MIN_BODY_SIZE); 590583cff8SDavid Pedersen Ok(Response::new(())) 600583cff8SDavid Pedersen } 610583cff8SDavid Pedersen 620583cff8SDavid Pedersen type CompressOutputServerStreamStream = 6323c1392fSLucio Franco Pin<Box<dyn Stream<Item = Result<SomeData, Status>> + Send + 'static>>; 640583cff8SDavid Pedersen compress_output_server_stream( &self, _req: Request<()>, ) -> Result<Response<Self::CompressOutputServerStreamStream>, Status>650583cff8SDavid Pedersen async fn compress_output_server_stream( 660583cff8SDavid Pedersen &self, 670583cff8SDavid Pedersen _req: Request<()>, 680583cff8SDavid Pedersen ) -> Result<Response<Self::CompressOutputServerStreamStream>, Status> { 690583cff8SDavid Pedersen let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); 70*23602b47Stottoto let stream = tokio_stream::iter(std::iter::repeat(SomeData { data })) 710583cff8SDavid Pedersen .take(2) 720583cff8SDavid Pedersen .map(Ok::<_, Status>); 730583cff8SDavid Pedersen Ok(self.prepare_response(Response::new(Box::pin(stream)))) 740583cff8SDavid Pedersen } 750583cff8SDavid Pedersen compress_input_client_stream( &self, req: Request<Streaming<SomeData>>, ) -> Result<Response<()>, Status>760583cff8SDavid Pedersen async fn compress_input_client_stream( 770583cff8SDavid Pedersen &self, 780583cff8SDavid Pedersen req: Request<Streaming<SomeData>>, 790583cff8SDavid Pedersen ) -> Result<Response<()>, Status> { 800583cff8SDavid Pedersen let mut stream = req.into_inner(); 810583cff8SDavid Pedersen while let Some(item) = stream.next().await { 820583cff8SDavid Pedersen item.unwrap(); 830583cff8SDavid Pedersen } 840583cff8SDavid Pedersen Ok(self.prepare_response(Response::new(()))) 850583cff8SDavid Pedersen } 860583cff8SDavid Pedersen compress_output_client_stream( &self, req: Request<Streaming<SomeData>>, ) -> Result<Response<SomeData>, Status>870583cff8SDavid Pedersen async fn compress_output_client_stream( 880583cff8SDavid Pedersen &self, 890583cff8SDavid Pedersen req: Request<Streaming<SomeData>>, 900583cff8SDavid Pedersen ) -> Result<Response<SomeData>, Status> { 910583cff8SDavid Pedersen let mut stream = req.into_inner(); 920583cff8SDavid Pedersen while let Some(item) = stream.next().await { 930583cff8SDavid Pedersen item.unwrap(); 940583cff8SDavid Pedersen } 950583cff8SDavid Pedersen 960583cff8SDavid Pedersen let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE]; 970583cff8SDavid Pedersen 980583cff8SDavid Pedersen Ok(self.prepare_response(Response::new(SomeData { 990583cff8SDavid Pedersen data: data.to_vec(), 1000583cff8SDavid Pedersen }))) 1010583cff8SDavid Pedersen } 1020583cff8SDavid Pedersen 1030583cff8SDavid Pedersen type CompressInputOutputBidirectionalStreamStream = 10423c1392fSLucio Franco Pin<Box<dyn Stream<Item = Result<SomeData, Status>> + Send + 'static>>; 1050583cff8SDavid Pedersen compress_input_output_bidirectional_stream( &self, req: Request<Streaming<SomeData>>, ) -> Result<Response<Self::CompressInputOutputBidirectionalStreamStream>, Status>1060583cff8SDavid Pedersen async fn compress_input_output_bidirectional_stream( 1070583cff8SDavid Pedersen &self, 1080583cff8SDavid Pedersen req: Request<Streaming<SomeData>>, 1090583cff8SDavid Pedersen ) -> Result<Response<Self::CompressInputOutputBidirectionalStreamStream>, Status> { 1100583cff8SDavid Pedersen let mut stream = req.into_inner(); 1110583cff8SDavid Pedersen while let Some(item) = stream.next().await { 1120583cff8SDavid Pedersen item.unwrap(); 1130583cff8SDavid Pedersen } 1140583cff8SDavid Pedersen 1150583cff8SDavid Pedersen let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); 116*23602b47Stottoto let stream = tokio_stream::iter(std::iter::repeat(SomeData { data })) 1170583cff8SDavid Pedersen .take(2) 1180583cff8SDavid Pedersen .map(Ok::<_, Status>); 1190583cff8SDavid Pedersen Ok(self.prepare_response(Response::new(Box::pin(stream)))) 1200583cff8SDavid Pedersen } 1210583cff8SDavid Pedersen } 122