1 use super::*;
2 use tonic::codec::CompressionEncoding;
3 use tonic::Streaming;
4
5 util::parametrized_tests! {
6 client_enabled_server_enabled,
7 zstd: CompressionEncoding::Zstd,
8 gzip: CompressionEncoding::Gzip,
9 deflate: CompressionEncoding::Deflate,
10 }
11
12 #[allow(dead_code)]
client_enabled_server_enabled(encoding: CompressionEncoding)13 async fn client_enabled_server_enabled(encoding: CompressionEncoding) {
14 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
15
16 let svc = test_server::TestServer::new(Svc::default()).send_compressed(encoding);
17
18 let response_bytes_counter = Arc::new(AtomicUsize::new(0));
19
20 tokio::spawn({
21 let response_bytes_counter = response_bytes_counter.clone();
22 async move {
23 Server::builder()
24 .layer(
25 ServiceBuilder::new()
26 .layer(MapResponseBodyLayer::new(move |body| {
27 util::CountBytesBody {
28 inner: body,
29 counter: response_bytes_counter.clone(),
30 }
31 }))
32 .into_inner(),
33 )
34 .add_service(svc)
35 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
36 .await
37 .unwrap();
38 }
39 });
40
41 let mut client =
42 test_client::TestClient::new(mock_io_channel(client).await).accept_compressed(encoding);
43
44 let res = client.compress_output_server_stream(()).await.unwrap();
45
46 let expected = match encoding {
47 CompressionEncoding::Gzip => "gzip",
48 CompressionEncoding::Zstd => "zstd",
49 CompressionEncoding::Deflate => "deflate",
50 _ => panic!("unexpected encoding {:?}", encoding),
51 };
52 assert_eq!(res.metadata().get("grpc-encoding").unwrap(), expected);
53
54 let mut stream: Streaming<SomeData> = res.into_inner();
55
56 stream
57 .next()
58 .await
59 .expect("stream empty")
60 .expect("item was error");
61 assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
62
63 stream
64 .next()
65 .await
66 .expect("stream empty")
67 .expect("item was error");
68 assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
69 }
70
71 util::parametrized_tests! {
72 client_disabled_server_enabled,
73 zstd: CompressionEncoding::Zstd,
74 gzip: CompressionEncoding::Gzip,
75 }
76
77 #[allow(dead_code)]
client_disabled_server_enabled(encoding: CompressionEncoding)78 async fn client_disabled_server_enabled(encoding: CompressionEncoding) {
79 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
80
81 let svc = test_server::TestServer::new(Svc::default()).send_compressed(encoding);
82
83 let response_bytes_counter = Arc::new(AtomicUsize::new(0));
84
85 tokio::spawn({
86 let response_bytes_counter = response_bytes_counter.clone();
87 async move {
88 Server::builder()
89 .layer(
90 ServiceBuilder::new()
91 .layer(MapResponseBodyLayer::new(move |body| {
92 util::CountBytesBody {
93 inner: body,
94 counter: response_bytes_counter.clone(),
95 }
96 }))
97 .into_inner(),
98 )
99 .add_service(svc)
100 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
101 .await
102 .unwrap();
103 }
104 });
105
106 let mut client = test_client::TestClient::new(mock_io_channel(client).await);
107
108 let res = client.compress_output_server_stream(()).await.unwrap();
109
110 assert!(res.metadata().get("grpc-encoding").is_none());
111
112 let mut stream: Streaming<SomeData> = res.into_inner();
113
114 stream
115 .next()
116 .await
117 .expect("stream empty")
118 .expect("item was error");
119 assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE);
120 }
121
122 util::parametrized_tests! {
123 client_enabled_server_disabled,
124 zstd: CompressionEncoding::Zstd,
125 gzip: CompressionEncoding::Gzip,
126 deflate: CompressionEncoding::Deflate,
127 }
128
129 #[allow(dead_code)]
client_enabled_server_disabled(encoding: CompressionEncoding)130 async fn client_enabled_server_disabled(encoding: CompressionEncoding) {
131 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
132
133 let svc = test_server::TestServer::new(Svc::default());
134
135 let response_bytes_counter = Arc::new(AtomicUsize::new(0));
136
137 tokio::spawn({
138 let response_bytes_counter = response_bytes_counter.clone();
139 async move {
140 Server::builder()
141 .layer(
142 ServiceBuilder::new()
143 .layer(MapResponseBodyLayer::new(move |body| {
144 util::CountBytesBody {
145 inner: body,
146 counter: response_bytes_counter.clone(),
147 }
148 }))
149 .into_inner(),
150 )
151 .add_service(svc)
152 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
153 .await
154 .unwrap();
155 }
156 });
157
158 let mut client =
159 test_client::TestClient::new(mock_io_channel(client).await).accept_compressed(encoding);
160
161 let res = client.compress_output_server_stream(()).await.unwrap();
162
163 assert!(res.metadata().get("grpc-encoding").is_none());
164
165 let mut stream: Streaming<SomeData> = res.into_inner();
166
167 stream
168 .next()
169 .await
170 .expect("stream empty")
171 .expect("item was error");
172 assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE);
173 }
174