1 #![allow(unused_imports)] 2 3 use self::util::*; 4 use crate::util::mock_io_channel; 5 use std::{ 6 pin::Pin, 7 sync::{ 8 atomic::{AtomicUsize, Ordering::SeqCst}, 9 Arc, 10 }, 11 }; 12 use tokio::net::TcpListener; 13 use tokio_stream::{Stream, StreamExt}; 14 use tonic::{ 15 transport::{Channel, Endpoint, Server, Uri}, 16 Request, Response, Status, Streaming, 17 }; 18 use tower::{layer::layer_fn, service_fn, Service, ServiceBuilder}; 19 use tower_http::{map_request_body::MapRequestBodyLayer, map_response_body::MapResponseBodyLayer}; 20 21 mod bidirectional_stream; 22 mod client_stream; 23 mod compressing_request; 24 mod compressing_response; 25 mod server_stream; 26 mod util; 27 28 tonic::include_proto!("test"); 29 30 #[derive(Debug, Default)] 31 struct Svc { 32 disable_compressing_on_response: bool, 33 } 34 35 const UNCOMPRESSED_MIN_BODY_SIZE: usize = 1024; 36 37 impl Svc { prepare_response<B>(&self, mut res: Response<B>) -> Response<B>38 fn prepare_response<B>(&self, mut res: Response<B>) -> Response<B> { 39 if self.disable_compressing_on_response { 40 res.disable_compression(); 41 } 42 43 res 44 } 45 } 46 47 #[tonic::async_trait] 48 impl test_server::Test for Svc { compress_output_unary(&self, _req: Request<()>) -> Result<Response<SomeData>, Status>49 async fn compress_output_unary(&self, _req: Request<()>) -> Result<Response<SomeData>, Status> { 50 let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE]; 51 52 Ok(self.prepare_response(Response::new(SomeData { 53 data: data.to_vec(), 54 }))) 55 } 56 compress_input_unary(&self, req: Request<SomeData>) -> Result<Response<()>, Status>57 async fn compress_input_unary(&self, req: Request<SomeData>) -> Result<Response<()>, Status> { 58 assert_eq!(req.into_inner().data.len(), UNCOMPRESSED_MIN_BODY_SIZE); 59 Ok(Response::new(())) 60 } 61 62 type CompressOutputServerStreamStream = 63 Pin<Box<dyn Stream<Item = Result<SomeData, Status>> + Send + 'static>>; 64 compress_output_server_stream( &self, _req: Request<()>, ) -> Result<Response<Self::CompressOutputServerStreamStream>, Status>65 async fn compress_output_server_stream( 66 &self, 67 _req: Request<()>, 68 ) -> Result<Response<Self::CompressOutputServerStreamStream>, Status> { 69 let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); 70 let stream = tokio_stream::iter(std::iter::repeat(SomeData { data })) 71 .take(2) 72 .map(Ok::<_, Status>); 73 Ok(self.prepare_response(Response::new(Box::pin(stream)))) 74 } 75 compress_input_client_stream( &self, req: Request<Streaming<SomeData>>, ) -> Result<Response<()>, Status>76 async fn compress_input_client_stream( 77 &self, 78 req: Request<Streaming<SomeData>>, 79 ) -> Result<Response<()>, Status> { 80 let mut stream = req.into_inner(); 81 while let Some(item) = stream.next().await { 82 item.unwrap(); 83 } 84 Ok(self.prepare_response(Response::new(()))) 85 } 86 compress_output_client_stream( &self, req: Request<Streaming<SomeData>>, ) -> Result<Response<SomeData>, Status>87 async fn compress_output_client_stream( 88 &self, 89 req: Request<Streaming<SomeData>>, 90 ) -> Result<Response<SomeData>, Status> { 91 let mut stream = req.into_inner(); 92 while let Some(item) = stream.next().await { 93 item.unwrap(); 94 } 95 96 let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE]; 97 98 Ok(self.prepare_response(Response::new(SomeData { 99 data: data.to_vec(), 100 }))) 101 } 102 103 type CompressInputOutputBidirectionalStreamStream = 104 Pin<Box<dyn Stream<Item = Result<SomeData, Status>> + Send + 'static>>; 105 compress_input_output_bidirectional_stream( &self, req: Request<Streaming<SomeData>>, ) -> Result<Response<Self::CompressInputOutputBidirectionalStreamStream>, Status>106 async fn compress_input_output_bidirectional_stream( 107 &self, 108 req: Request<Streaming<SomeData>>, 109 ) -> Result<Response<Self::CompressInputOutputBidirectionalStreamStream>, Status> { 110 let mut stream = req.into_inner(); 111 while let Some(item) = stream.next().await { 112 item.unwrap(); 113 } 114 115 let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); 116 let stream = tokio_stream::iter(std::iter::repeat(SomeData { data })) 117 .take(2) 118 .map(Ok::<_, Status>); 119 Ok(self.prepare_response(Response::new(Box::pin(stream)))) 120 } 121 } 122