1 use super::*;
2 use http_body::Body as _;
3 use tonic::codec::CompressionEncoding;
4 
5 #[tokio::test(flavor = "multi_thread")]
6 async fn client_enabled_server_enabled() {
7     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
8 
9     let svc =
10         test_server::TestServer::new(Svc::default()).accept_compressed(CompressionEncoding::Gzip);
11 
12     let request_bytes_counter = Arc::new(AtomicUsize::new(0));
13 
14     fn assert_right_encoding<B>(req: http::Request<B>) -> http::Request<B> {
15         assert_eq!(req.headers().get("grpc-encoding").unwrap(), "gzip");
16         req
17     }
18 
19     tokio::spawn({
20         let request_bytes_counter = request_bytes_counter.clone();
21         async move {
22             Server::builder()
23                 .layer(
24                     ServiceBuilder::new()
25                         .layer(
26                             ServiceBuilder::new()
27                                 .map_request(assert_right_encoding)
28                                 .layer(measure_request_body_size_layer(request_bytes_counter))
29                                 .into_inner(),
30                         )
31                         .into_inner(),
32                 )
33                 .add_service(svc)
34                 .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
35                 .await
36                 .unwrap();
37         }
38     });
39 
40     let mut client = test_client::TestClient::new(mock_io_channel(client).await)
41         .send_compressed(CompressionEncoding::Gzip);
42 
43     for _ in 0..3 {
44         client
45             .compress_input_unary(SomeData {
46                 data: [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(),
47             })
48             .await
49             .unwrap();
50         let bytes_sent = request_bytes_counter.load(SeqCst);
51         assert!(bytes_sent < UNCOMPRESSED_MIN_BODY_SIZE);
52     }
53 }
54 
55 #[tokio::test(flavor = "multi_thread")]
56 async fn client_enabled_server_disabled() {
57     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
58 
59     let svc = test_server::TestServer::new(Svc::default());
60 
61     tokio::spawn(async move {
62         Server::builder()
63             .add_service(svc)
64             .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
65             .await
66             .unwrap();
67     });
68 
69     let mut client = test_client::TestClient::new(mock_io_channel(client).await)
70         .send_compressed(CompressionEncoding::Gzip);
71 
72     let status = client
73         .compress_input_unary(SomeData {
74             data: [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(),
75         })
76         .await
77         .unwrap_err();
78 
79     assert_eq!(status.code(), tonic::Code::Unimplemented);
80     assert_eq!(
81         status.message(),
82         "Content is compressed with `gzip` which isn't supported"
83     );
84 
85     assert_eq!(
86         status.metadata().get("grpc-accept-encoding").unwrap(),
87         "identity"
88     );
89 }
90 
91 #[tokio::test(flavor = "multi_thread")]
92 async fn client_mark_compressed_without_header_server_enabled() {
93     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
94 
95     let svc =
96         test_server::TestServer::new(Svc::default()).accept_compressed(CompressionEncoding::Gzip);
97 
98     tokio::spawn({
99         async move {
100             Server::builder()
101                 .add_service(svc)
102                 .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
103                 .await
104                 .unwrap();
105         }
106     });
107 
108     let mut client = test_client::TestClient::with_interceptor(
109         mock_io_channel(client).await,
110         move |mut req: Request<()>| {
111             req.metadata_mut().remove("grpc-encoding");
112             Ok(req)
113         },
114     )
115     .send_compressed(CompressionEncoding::Gzip);
116 
117     let status = client
118         .compress_input_unary(SomeData {
119             data: [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(),
120         })
121         .await
122         .unwrap_err();
123 
124     assert_eq!(status.code(), tonic::Code::Internal);
125     assert_eq!(
126         status.message(),
127         "protocol error: received message with compressed-flag but no grpc-encoding was specified"
128     );
129 }
130