1 use super::*;
2 use tonic::codec::CompressionEncoding;
3 
4 #[tokio::test(flavor = "multi_thread")]
5 async fn client_enabled_server_enabled() {
6     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
7 
8     let svc = test_server::TestServer::new(Svc::default())
9         .accept_compressed(CompressionEncoding::Gzip)
10         .send_compressed(CompressionEncoding::Gzip);
11 
12     let request_bytes_counter = Arc::new(AtomicUsize::new(0));
13     let response_bytes_counter = Arc::new(AtomicUsize::new(0));
14 
15     fn assert_right_encoding<B>(req: http::Request<B>) -> http::Request<B> {
16         assert_eq!(req.headers().get("grpc-encoding").unwrap(), "gzip");
17         req
18     }
19 
20     tokio::spawn({
21         let request_bytes_counter = request_bytes_counter.clone();
22         let response_bytes_counter = response_bytes_counter.clone();
23         async move {
24             Server::builder()
25                 .layer(
26                     ServiceBuilder::new()
27                         .map_request(assert_right_encoding)
28                         .layer(measure_request_body_size_layer(
29                             request_bytes_counter.clone(),
30                         ))
31                         .layer(MapResponseBodyLayer::new(move |body| {
32                             util::CountBytesBody {
33                                 inner: body,
34                                 counter: response_bytes_counter.clone(),
35                             }
36                         }))
37                         .into_inner(),
38                 )
39                 .add_service(svc)
40                 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
41                 .await
42                 .unwrap();
43         }
44     });
45 
46     let mut client = test_client::TestClient::new(mock_io_channel(client).await)
47         .send_compressed(CompressionEncoding::Gzip)
48         .accept_compressed(CompressionEncoding::Gzip);
49 
50     let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
51     let stream = tokio_stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
52     let req = Request::new(stream);
53 
54     let res = client
55         .compress_input_output_bidirectional_stream(req)
56         .await
57         .unwrap();
58 
59     assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip");
60 
61     let mut stream: Streaming<SomeData> = res.into_inner();
62 
63     stream
64         .next()
65         .await
66         .expect("stream empty")
67         .expect("item was error");
68 
69     stream
70         .next()
71         .await
72         .expect("stream empty")
73         .expect("item was error");
74 
75     assert!(request_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
76     assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
77 }
78