1 use super::*; 2 use http_body::Body as _; 3 4 #[tokio::test(flavor = "multi_thread")] 5 async fn client_enabled_server_enabled() { 6 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 7 8 let svc = test_server::TestServer::new(Svc::default()).accept_gzip(); 9 10 let request_bytes_counter = Arc::new(AtomicUsize::new(0)); 11 12 fn assert_right_encoding<B>(req: http::Request<B>) -> http::Request<B> { 13 assert_eq!(req.headers().get("grpc-encoding").unwrap(), "gzip"); 14 req 15 } 16 17 tokio::spawn({ 18 let request_bytes_counter = request_bytes_counter.clone(); 19 async move { 20 Server::builder() 21 .layer( 22 ServiceBuilder::new() 23 .layer( 24 ServiceBuilder::new() 25 .map_request(assert_right_encoding) 26 .layer(measure_request_body_size_layer(request_bytes_counter)) 27 .into_inner(), 28 ) 29 .into_inner(), 30 ) 31 .add_service(svc) 32 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( 33 MockStream(server), 34 )])) 35 .await 36 .unwrap(); 37 } 38 }); 39 40 let mut client = test_client::TestClient::new(mock_io_channel(client).await).send_gzip(); 41 42 for _ in 0..3 { 43 client 44 .compress_input_unary(SomeData { 45 data: [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(), 46 }) 47 .await 48 .unwrap(); 49 let bytes_sent = request_bytes_counter.load(SeqCst); 50 assert!(bytes_sent < UNCOMPRESSED_MIN_BODY_SIZE); 51 } 52 } 53 54 #[tokio::test(flavor = "multi_thread")] 55 async fn client_enabled_server_disabled() { 56 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 57 58 let svc = test_server::TestServer::new(Svc::default()); 59 60 tokio::spawn(async move { 61 Server::builder() 62 .add_service(svc) 63 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( 64 MockStream(server), 65 )])) 66 .await 67 .unwrap(); 68 }); 69 70 let mut client = test_client::TestClient::new(mock_io_channel(client).await).send_gzip(); 71 72 let status = client 73 .compress_input_unary(SomeData { 74 data: [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(), 75 }) 76 .await 77 .unwrap_err(); 78 79 assert_eq!(status.code(), tonic::Code::Unimplemented); 80 assert_eq!( 81 status.message(), 82 "Content is compressed with `gzip` which isn't supported" 83 ); 84 85 assert_eq!( 86 status.metadata().get("grpc-accept-encoding").unwrap(), 87 "identity" 88 ); 89 } 90 91 #[tokio::test(flavor = "multi_thread")] 92 async fn client_mark_compressed_without_header_server_enabled() { 93 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 94 95 let svc = test_server::TestServer::new(Svc::default()).accept_gzip(); 96 97 tokio::spawn({ 98 async move { 99 Server::builder() 100 .add_service(svc) 101 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( 102 MockStream(server), 103 )])) 104 .await 105 .unwrap(); 106 } 107 }); 108 109 let mut client = test_client::TestClient::with_interceptor( 110 mock_io_channel(client).await, 111 move |mut req: Request<()>| { 112 req.metadata_mut().remove("grpc-encoding"); 113 Ok(req) 114 }, 115 ) 116 .send_gzip(); 117 118 let status = client 119 .compress_input_unary(SomeData { 120 data: [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(), 121 }) 122 .await 123 .unwrap_err(); 124 125 assert_eq!(status.code(), tonic::Code::Internal); 126 assert_eq!( 127 status.message(), 128 "protocol error: received message with compressed-flag but no grpc-encoding was specified" 129 ); 130 } 131