1 use super::*; 2 use http_body::Body as _; 3 use tonic::codec::CompressionEncoding; 4 5 #[tokio::test(flavor = "multi_thread")] 6 async fn client_enabled_server_enabled() { 7 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 8 9 let svc = 10 test_server::TestServer::new(Svc::default()).accept_compressed(CompressionEncoding::Gzip); 11 12 let request_bytes_counter = Arc::new(AtomicUsize::new(0)); 13 14 fn assert_right_encoding<B>(req: http::Request<B>) -> http::Request<B> { 15 assert_eq!(req.headers().get("grpc-encoding").unwrap(), "gzip"); 16 req 17 } 18 19 tokio::spawn({ 20 let request_bytes_counter = request_bytes_counter.clone(); 21 async move { 22 Server::builder() 23 .layer( 24 ServiceBuilder::new() 25 .layer( 26 ServiceBuilder::new() 27 .map_request(assert_right_encoding) 28 .layer(measure_request_body_size_layer(request_bytes_counter)) 29 .into_inner(), 30 ) 31 .into_inner(), 32 ) 33 .add_service(svc) 34 .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) 35 .await 36 .unwrap(); 37 } 38 }); 39 40 let mut client = test_client::TestClient::new(mock_io_channel(client).await) 41 .send_compressed(CompressionEncoding::Gzip); 42 43 for _ in 0..3 { 44 client 45 .compress_input_unary(SomeData { 46 data: [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(), 47 }) 48 .await 49 .unwrap(); 50 let bytes_sent = request_bytes_counter.load(SeqCst); 51 assert!(bytes_sent < UNCOMPRESSED_MIN_BODY_SIZE); 52 } 53 } 54 55 #[tokio::test(flavor = "multi_thread")] 56 async fn client_enabled_server_disabled() { 57 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 58 59 let svc = test_server::TestServer::new(Svc::default()); 60 61 tokio::spawn(async move { 62 Server::builder() 63 .add_service(svc) 64 .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) 65 .await 66 .unwrap(); 67 }); 68 69 let mut client = test_client::TestClient::new(mock_io_channel(client).await) 70 .send_compressed(CompressionEncoding::Gzip); 71 72 let status = client 73 .compress_input_unary(SomeData { 74 data: [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(), 75 }) 76 .await 77 .unwrap_err(); 78 79 assert_eq!(status.code(), tonic::Code::Unimplemented); 80 assert_eq!( 81 status.message(), 82 "Content is compressed with `gzip` which isn't supported" 83 ); 84 85 assert_eq!( 86 status.metadata().get("grpc-accept-encoding").unwrap(), 87 "identity" 88 ); 89 } 90 91 #[tokio::test(flavor = "multi_thread")] 92 async fn client_mark_compressed_without_header_server_enabled() { 93 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 94 95 let svc = 96 test_server::TestServer::new(Svc::default()).accept_compressed(CompressionEncoding::Gzip); 97 98 tokio::spawn({ 99 async move { 100 Server::builder() 101 .add_service(svc) 102 .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) 103 .await 104 .unwrap(); 105 } 106 }); 107 108 let mut client = test_client::TestClient::with_interceptor( 109 mock_io_channel(client).await, 110 move |mut req: Request<()>| { 111 req.metadata_mut().remove("grpc-encoding"); 112 Ok(req) 113 }, 114 ) 115 .send_compressed(CompressionEncoding::Gzip); 116 117 let status = client 118 .compress_input_unary(SomeData { 119 data: [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(), 120 }) 121 .await 122 .unwrap_err(); 123 124 assert_eq!(status.code(), tonic::Code::Internal); 125 assert_eq!( 126 status.message(), 127 "protocol error: received message with compressed-flag but no grpc-encoding was specified" 128 ); 129 } 130