1 use super::*;
2 use tonic::codec::CompressionEncoding;
3 use tonic::Streaming;
4 
5 util::parametrized_tests! {
6     client_enabled_server_enabled,
7     zstd: CompressionEncoding::Zstd,
8     gzip: CompressionEncoding::Gzip,
9 }
10 
11 #[allow(dead_code)]
12 async fn client_enabled_server_enabled(encoding: CompressionEncoding) {
13     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
14 
15     let svc = test_server::TestServer::new(Svc::default()).send_compressed(encoding);
16 
17     let response_bytes_counter = Arc::new(AtomicUsize::new(0));
18 
19     tokio::spawn({
20         let response_bytes_counter = response_bytes_counter.clone();
21         async move {
22             Server::builder()
23                 .layer(
24                     ServiceBuilder::new()
25                         .layer(MapResponseBodyLayer::new(move |body| {
26                             util::CountBytesBody {
27                                 inner: body,
28                                 counter: response_bytes_counter.clone(),
29                             }
30                         }))
31                         .into_inner(),
32                 )
33                 .add_service(svc)
34                 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
35                 .await
36                 .unwrap();
37         }
38     });
39 
40     let mut client =
41         test_client::TestClient::new(mock_io_channel(client).await).accept_compressed(encoding);
42 
43     let res = client.compress_output_server_stream(()).await.unwrap();
44 
45     let expected = match encoding {
46         CompressionEncoding::Gzip => "gzip",
47         CompressionEncoding::Zstd => "zstd",
48         _ => panic!("unexpected encoding {:?}", encoding),
49     };
50     assert_eq!(res.metadata().get("grpc-encoding").unwrap(), expected);
51 
52     let mut stream: Streaming<SomeData> = res.into_inner();
53 
54     stream
55         .next()
56         .await
57         .expect("stream empty")
58         .expect("item was error");
59     assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
60 
61     stream
62         .next()
63         .await
64         .expect("stream empty")
65         .expect("item was error");
66     assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
67 }
68 
69 util::parametrized_tests! {
70     client_disabled_server_enabled,
71     zstd: CompressionEncoding::Zstd,
72     gzip: CompressionEncoding::Gzip,
73 }
74 
75 #[allow(dead_code)]
76 async fn client_disabled_server_enabled(encoding: CompressionEncoding) {
77     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
78 
79     let svc = test_server::TestServer::new(Svc::default()).send_compressed(encoding);
80 
81     let response_bytes_counter = Arc::new(AtomicUsize::new(0));
82 
83     tokio::spawn({
84         let response_bytes_counter = response_bytes_counter.clone();
85         async move {
86             Server::builder()
87                 .layer(
88                     ServiceBuilder::new()
89                         .layer(MapResponseBodyLayer::new(move |body| {
90                             util::CountBytesBody {
91                                 inner: body,
92                                 counter: response_bytes_counter.clone(),
93                             }
94                         }))
95                         .into_inner(),
96                 )
97                 .add_service(svc)
98                 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
99                 .await
100                 .unwrap();
101         }
102     });
103 
104     let mut client = test_client::TestClient::new(mock_io_channel(client).await);
105 
106     let res = client.compress_output_server_stream(()).await.unwrap();
107 
108     assert!(res.metadata().get("grpc-encoding").is_none());
109 
110     let mut stream: Streaming<SomeData> = res.into_inner();
111 
112     stream
113         .next()
114         .await
115         .expect("stream empty")
116         .expect("item was error");
117     assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE);
118 }
119 
120 util::parametrized_tests! {
121     client_enabled_server_disabled,
122     zstd: CompressionEncoding::Zstd,
123     gzip: CompressionEncoding::Gzip,
124 }
125 
126 #[allow(dead_code)]
127 async fn client_enabled_server_disabled(encoding: CompressionEncoding) {
128     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
129 
130     let svc = test_server::TestServer::new(Svc::default());
131 
132     let response_bytes_counter = Arc::new(AtomicUsize::new(0));
133 
134     tokio::spawn({
135         let response_bytes_counter = response_bytes_counter.clone();
136         async move {
137             Server::builder()
138                 .layer(
139                     ServiceBuilder::new()
140                         .layer(MapResponseBodyLayer::new(move |body| {
141                             util::CountBytesBody {
142                                 inner: body,
143                                 counter: response_bytes_counter.clone(),
144                             }
145                         }))
146                         .into_inner(),
147                 )
148                 .add_service(svc)
149                 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
150                 .await
151                 .unwrap();
152         }
153     });
154 
155     let mut client =
156         test_client::TestClient::new(mock_io_channel(client).await).accept_compressed(encoding);
157 
158     let res = client.compress_output_server_stream(()).await.unwrap();
159 
160     assert!(res.metadata().get("grpc-encoding").is_none());
161 
162     let mut stream: Streaming<SomeData> = res.into_inner();
163 
164     stream
165         .next()
166         .await
167         .expect("stream empty")
168         .expect("item was error");
169     assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE);
170 }
171