1 use super::*; 2 use http_body::Body as _; 3 4 #[tokio::test(flavor = "multi_thread")] 5 async fn client_enabled_server_enabled() { 6 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 7 8 let svc = test_server::TestServer::new(Svc::default()).accept_gzip(); 9 10 let request_bytes_counter = Arc::new(AtomicUsize::new(0)); 11 12 fn assert_right_encoding<B>(req: http::Request<B>) -> http::Request<B> { 13 assert_eq!(req.headers().get("grpc-encoding").unwrap(), "gzip"); 14 req 15 } 16 17 tokio::spawn({ 18 let request_bytes_counter = request_bytes_counter.clone(); 19 async move { 20 Server::builder() 21 .layer( 22 ServiceBuilder::new() 23 .layer( 24 ServiceBuilder::new() 25 .map_request(assert_right_encoding) 26 .layer(measure_request_body_size_layer(request_bytes_counter)) 27 .into_inner(), 28 ) 29 .into_inner(), 30 ) 31 .add_service(svc) 32 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) 33 .await 34 .unwrap(); 35 } 36 }); 37 38 let mut client = test_client::TestClient::new(mock_io_channel(client).await).send_gzip(); 39 40 for _ in 0..3 { 41 client 42 .compress_input_unary(SomeData { 43 data: [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(), 44 }) 45 .await 46 .unwrap(); 47 let bytes_sent = request_bytes_counter.load(SeqCst); 48 assert!(bytes_sent < UNCOMPRESSED_MIN_BODY_SIZE); 49 } 50 } 51 52 #[tokio::test(flavor = "multi_thread")] 53 async fn client_enabled_server_disabled() { 54 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 55 56 let svc = test_server::TestServer::new(Svc::default()); 57 58 tokio::spawn(async move { 59 Server::builder() 60 .add_service(svc) 61 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) 62 .await 63 .unwrap(); 64 }); 65 66 let mut client = test_client::TestClient::new(mock_io_channel(client).await).send_gzip(); 67 68 let status = client 69 .compress_input_unary(SomeData { 70 data: [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(), 71 }) 72 .await 73 .unwrap_err(); 74 75 assert_eq!(status.code(), tonic::Code::Unimplemented); 76 assert_eq!( 77 status.message(), 78 "Content is compressed with `gzip` which isn't supported" 79 ); 80 81 assert_eq!( 82 status.metadata().get("grpc-accept-encoding").unwrap(), 83 "identity" 84 ); 85 } 86 87 #[tokio::test(flavor = "multi_thread")] 88 async fn client_mark_compressed_without_header_server_enabled() { 89 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 90 91 let svc = test_server::TestServer::new(Svc::default()).accept_gzip(); 92 93 tokio::spawn({ 94 async move { 95 Server::builder() 96 .add_service(svc) 97 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) 98 .await 99 .unwrap(); 100 } 101 }); 102 103 let mut client = test_client::TestClient::with_interceptor( 104 mock_io_channel(client).await, 105 move |mut req: Request<()>| { 106 req.metadata_mut().remove("grpc-encoding"); 107 Ok(req) 108 }, 109 ) 110 .send_gzip(); 111 112 let status = client 113 .compress_input_unary(SomeData { 114 data: [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(), 115 }) 116 .await 117 .unwrap_err(); 118 119 assert_eq!(status.code(), tonic::Code::Internal); 120 assert_eq!( 121 status.message(), 122 "protocol error: received message with compressed-flag but no grpc-encoding was specified" 123 ); 124 } 125