1 #![allow(unused_imports)] 2 3 use self::util::*; 4 use crate::util::{mock_io_channel, MockStream}; 5 use futures::{Stream, StreamExt}; 6 use std::convert::TryFrom; 7 use std::{ 8 pin::Pin, 9 sync::{ 10 atomic::{AtomicUsize, Ordering::SeqCst}, 11 Arc, 12 }, 13 }; 14 use tokio::net::TcpListener; 15 use tonic::{ 16 transport::{Channel, Endpoint, Server, Uri}, 17 Request, Response, Status, Streaming, 18 }; 19 use tower::{layer::layer_fn, service_fn, Service, ServiceBuilder}; 20 use tower_http::{map_request_body::MapRequestBodyLayer, map_response_body::MapResponseBodyLayer}; 21 22 mod bidirectional_stream; 23 mod client_stream; 24 mod compressing_request; 25 mod compressing_response; 26 mod server_stream; 27 mod util; 28 29 tonic::include_proto!("test"); 30 31 #[derive(Debug)] 32 struct Svc { 33 disable_compressing_on_response: bool, 34 } 35 36 impl Default for Svc { 37 fn default() -> Self { 38 Self { 39 disable_compressing_on_response: false, 40 } 41 } 42 } 43 44 const UNCOMPRESSED_MIN_BODY_SIZE: usize = 1024; 45 46 impl Svc { 47 fn prepare_response<B>(&self, mut res: Response<B>) -> Response<B> { 48 if self.disable_compressing_on_response { 49 res.disable_compression(); 50 } 51 52 res 53 } 54 } 55 56 #[tonic::async_trait] 57 impl test_server::Test for Svc { 58 async fn compress_output_unary(&self, _req: Request<()>) -> Result<Response<SomeData>, Status> { 59 let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE]; 60 61 Ok(self.prepare_response(Response::new(SomeData { 62 data: data.to_vec(), 63 }))) 64 } 65 66 async fn compress_input_unary(&self, req: Request<SomeData>) -> Result<Response<()>, Status> { 67 assert_eq!(req.into_inner().data.len(), UNCOMPRESSED_MIN_BODY_SIZE); 68 Ok(Response::new(())) 69 } 70 71 type CompressOutputServerStreamStream = 72 Pin<Box<dyn Stream<Item = Result<SomeData, Status>> + Send + Sync + 'static>>; 73 74 async fn compress_output_server_stream( 75 &self, 76 _req: Request<()>, 77 ) -> Result<Response<Self::CompressOutputServerStreamStream>, Status> { 78 let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); 79 let stream = futures::stream::repeat(SomeData { data }) 80 .take(2) 81 .map(Ok::<_, Status>); 82 Ok(self.prepare_response(Response::new(Box::pin(stream)))) 83 } 84 85 async fn compress_input_client_stream( 86 &self, 87 req: Request<Streaming<SomeData>>, 88 ) -> Result<Response<()>, Status> { 89 let mut stream = req.into_inner(); 90 while let Some(item) = stream.next().await { 91 item.unwrap(); 92 } 93 Ok(self.prepare_response(Response::new(()))) 94 } 95 96 async fn compress_output_client_stream( 97 &self, 98 req: Request<Streaming<SomeData>>, 99 ) -> Result<Response<SomeData>, Status> { 100 let mut stream = req.into_inner(); 101 while let Some(item) = stream.next().await { 102 item.unwrap(); 103 } 104 105 let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE]; 106 107 Ok(self.prepare_response(Response::new(SomeData { 108 data: data.to_vec(), 109 }))) 110 } 111 112 type CompressInputOutputBidirectionalStreamStream = 113 Pin<Box<dyn Stream<Item = Result<SomeData, Status>> + Send + Sync + 'static>>; 114 115 async fn compress_input_output_bidirectional_stream( 116 &self, 117 req: Request<Streaming<SomeData>>, 118 ) -> Result<Response<Self::CompressInputOutputBidirectionalStreamStream>, Status> { 119 let mut stream = req.into_inner(); 120 while let Some(item) = stream.next().await { 121 item.unwrap(); 122 } 123 124 let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); 125 let stream = futures::stream::repeat(SomeData { data }) 126 .take(2) 127 .map(Ok::<_, Status>); 128 Ok(self.prepare_response(Response::new(Box::pin(stream)))) 129 } 130 } 131