10583cff8SDavid Pedersen use super::*;
2a585a722SMarcus Griep use tonic::codec::CompressionEncoding;
30583cff8SDavid Pedersen use tonic::Streaming;
40583cff8SDavid Pedersen 
5e8cb48fcSQuentin Perez util::parametrized_tests! {
6e8cb48fcSQuentin Perez     client_enabled_server_enabled,
7e8cb48fcSQuentin Perez     zstd: CompressionEncoding::Zstd,
8e8cb48fcSQuentin Perez     gzip: CompressionEncoding::Gzip,
9*79a06cc8SIlya Averyanov     deflate: CompressionEncoding::Deflate,
10e8cb48fcSQuentin Perez }
11e8cb48fcSQuentin Perez 
12e8cb48fcSQuentin Perez #[allow(dead_code)]
client_enabled_server_enabled(encoding: CompressionEncoding)13e8cb48fcSQuentin Perez async fn client_enabled_server_enabled(encoding: CompressionEncoding) {
140583cff8SDavid Pedersen     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
150583cff8SDavid Pedersen 
16e8cb48fcSQuentin Perez     let svc = test_server::TestServer::new(Svc::default()).send_compressed(encoding);
170583cff8SDavid Pedersen 
180583cff8SDavid Pedersen     let response_bytes_counter = Arc::new(AtomicUsize::new(0));
190583cff8SDavid Pedersen 
200583cff8SDavid Pedersen     tokio::spawn({
210583cff8SDavid Pedersen         let response_bytes_counter = response_bytes_counter.clone();
220583cff8SDavid Pedersen         async move {
230583cff8SDavid Pedersen             Server::builder()
240583cff8SDavid Pedersen                 .layer(
250583cff8SDavid Pedersen                     ServiceBuilder::new()
260583cff8SDavid Pedersen                         .layer(MapResponseBodyLayer::new(move |body| {
270583cff8SDavid Pedersen                             util::CountBytesBody {
280583cff8SDavid Pedersen                                 inner: body,
290583cff8SDavid Pedersen                                 counter: response_bytes_counter.clone(),
300583cff8SDavid Pedersen                             }
310583cff8SDavid Pedersen                         }))
320583cff8SDavid Pedersen                         .into_inner(),
330583cff8SDavid Pedersen                 )
340583cff8SDavid Pedersen                 .add_service(svc)
35f089e7a0Stottoto                 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
360583cff8SDavid Pedersen                 .await
370583cff8SDavid Pedersen                 .unwrap();
380583cff8SDavid Pedersen         }
390583cff8SDavid Pedersen     });
400583cff8SDavid Pedersen 
41e8cb48fcSQuentin Perez     let mut client =
42e8cb48fcSQuentin Perez         test_client::TestClient::new(mock_io_channel(client).await).accept_compressed(encoding);
430583cff8SDavid Pedersen 
440583cff8SDavid Pedersen     let res = client.compress_output_server_stream(()).await.unwrap();
450583cff8SDavid Pedersen 
46e8cb48fcSQuentin Perez     let expected = match encoding {
47e8cb48fcSQuentin Perez         CompressionEncoding::Gzip => "gzip",
48e8cb48fcSQuentin Perez         CompressionEncoding::Zstd => "zstd",
49*79a06cc8SIlya Averyanov         CompressionEncoding::Deflate => "deflate",
50e8cb48fcSQuentin Perez         _ => panic!("unexpected encoding {:?}", encoding),
51e8cb48fcSQuentin Perez     };
52e8cb48fcSQuentin Perez     assert_eq!(res.metadata().get("grpc-encoding").unwrap(), expected);
530583cff8SDavid Pedersen 
540583cff8SDavid Pedersen     let mut stream: Streaming<SomeData> = res.into_inner();
550583cff8SDavid Pedersen 
560583cff8SDavid Pedersen     stream
570583cff8SDavid Pedersen         .next()
580583cff8SDavid Pedersen         .await
590583cff8SDavid Pedersen         .expect("stream empty")
600583cff8SDavid Pedersen         .expect("item was error");
610583cff8SDavid Pedersen     assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
620583cff8SDavid Pedersen 
630583cff8SDavid Pedersen     stream
640583cff8SDavid Pedersen         .next()
650583cff8SDavid Pedersen         .await
660583cff8SDavid Pedersen         .expect("stream empty")
670583cff8SDavid Pedersen         .expect("item was error");
680583cff8SDavid Pedersen     assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
690583cff8SDavid Pedersen }
700583cff8SDavid Pedersen 
71e8cb48fcSQuentin Perez util::parametrized_tests! {
72e8cb48fcSQuentin Perez     client_disabled_server_enabled,
73e8cb48fcSQuentin Perez     zstd: CompressionEncoding::Zstd,
74e8cb48fcSQuentin Perez     gzip: CompressionEncoding::Gzip,
75e8cb48fcSQuentin Perez }
76e8cb48fcSQuentin Perez 
77e8cb48fcSQuentin Perez #[allow(dead_code)]
client_disabled_server_enabled(encoding: CompressionEncoding)78e8cb48fcSQuentin Perez async fn client_disabled_server_enabled(encoding: CompressionEncoding) {
790583cff8SDavid Pedersen     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
800583cff8SDavid Pedersen 
81e8cb48fcSQuentin Perez     let svc = test_server::TestServer::new(Svc::default()).send_compressed(encoding);
820583cff8SDavid Pedersen 
830583cff8SDavid Pedersen     let response_bytes_counter = Arc::new(AtomicUsize::new(0));
840583cff8SDavid Pedersen 
850583cff8SDavid Pedersen     tokio::spawn({
860583cff8SDavid Pedersen         let response_bytes_counter = response_bytes_counter.clone();
870583cff8SDavid Pedersen         async move {
880583cff8SDavid Pedersen             Server::builder()
890583cff8SDavid Pedersen                 .layer(
900583cff8SDavid Pedersen                     ServiceBuilder::new()
910583cff8SDavid Pedersen                         .layer(MapResponseBodyLayer::new(move |body| {
920583cff8SDavid Pedersen                             util::CountBytesBody {
930583cff8SDavid Pedersen                                 inner: body,
940583cff8SDavid Pedersen                                 counter: response_bytes_counter.clone(),
950583cff8SDavid Pedersen                             }
960583cff8SDavid Pedersen                         }))
970583cff8SDavid Pedersen                         .into_inner(),
980583cff8SDavid Pedersen                 )
990583cff8SDavid Pedersen                 .add_service(svc)
100f089e7a0Stottoto                 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
1010583cff8SDavid Pedersen                 .await
1020583cff8SDavid Pedersen                 .unwrap();
1030583cff8SDavid Pedersen         }
1040583cff8SDavid Pedersen     });
1050583cff8SDavid Pedersen 
1060583cff8SDavid Pedersen     let mut client = test_client::TestClient::new(mock_io_channel(client).await);
1070583cff8SDavid Pedersen 
1080583cff8SDavid Pedersen     let res = client.compress_output_server_stream(()).await.unwrap();
1090583cff8SDavid Pedersen 
1100583cff8SDavid Pedersen     assert!(res.metadata().get("grpc-encoding").is_none());
1110583cff8SDavid Pedersen 
1120583cff8SDavid Pedersen     let mut stream: Streaming<SomeData> = res.into_inner();
1130583cff8SDavid Pedersen 
1140583cff8SDavid Pedersen     stream
1150583cff8SDavid Pedersen         .next()
1160583cff8SDavid Pedersen         .await
1170583cff8SDavid Pedersen         .expect("stream empty")
1180583cff8SDavid Pedersen         .expect("item was error");
1190583cff8SDavid Pedersen     assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE);
1200583cff8SDavid Pedersen }
1210583cff8SDavid Pedersen 
122e8cb48fcSQuentin Perez util::parametrized_tests! {
123e8cb48fcSQuentin Perez     client_enabled_server_disabled,
124e8cb48fcSQuentin Perez     zstd: CompressionEncoding::Zstd,
125e8cb48fcSQuentin Perez     gzip: CompressionEncoding::Gzip,
126*79a06cc8SIlya Averyanov     deflate: CompressionEncoding::Deflate,
127e8cb48fcSQuentin Perez }
128e8cb48fcSQuentin Perez 
129e8cb48fcSQuentin Perez #[allow(dead_code)]
client_enabled_server_disabled(encoding: CompressionEncoding)130e8cb48fcSQuentin Perez async fn client_enabled_server_disabled(encoding: CompressionEncoding) {
1310583cff8SDavid Pedersen     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
1320583cff8SDavid Pedersen 
1330583cff8SDavid Pedersen     let svc = test_server::TestServer::new(Svc::default());
1340583cff8SDavid Pedersen 
1350583cff8SDavid Pedersen     let response_bytes_counter = Arc::new(AtomicUsize::new(0));
1360583cff8SDavid Pedersen 
1370583cff8SDavid Pedersen     tokio::spawn({
1380583cff8SDavid Pedersen         let response_bytes_counter = response_bytes_counter.clone();
1390583cff8SDavid Pedersen         async move {
1400583cff8SDavid Pedersen             Server::builder()
1410583cff8SDavid Pedersen                 .layer(
1420583cff8SDavid Pedersen                     ServiceBuilder::new()
1430583cff8SDavid Pedersen                         .layer(MapResponseBodyLayer::new(move |body| {
1440583cff8SDavid Pedersen                             util::CountBytesBody {
1450583cff8SDavid Pedersen                                 inner: body,
1460583cff8SDavid Pedersen                                 counter: response_bytes_counter.clone(),
1470583cff8SDavid Pedersen                             }
1480583cff8SDavid Pedersen                         }))
1490583cff8SDavid Pedersen                         .into_inner(),
1500583cff8SDavid Pedersen                 )
1510583cff8SDavid Pedersen                 .add_service(svc)
152f089e7a0Stottoto                 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
1530583cff8SDavid Pedersen                 .await
1540583cff8SDavid Pedersen                 .unwrap();
1550583cff8SDavid Pedersen         }
1560583cff8SDavid Pedersen     });
1570583cff8SDavid Pedersen 
158e8cb48fcSQuentin Perez     let mut client =
159e8cb48fcSQuentin Perez         test_client::TestClient::new(mock_io_channel(client).await).accept_compressed(encoding);
1600583cff8SDavid Pedersen 
1610583cff8SDavid Pedersen     let res = client.compress_output_server_stream(()).await.unwrap();
1620583cff8SDavid Pedersen 
1630583cff8SDavid Pedersen     assert!(res.metadata().get("grpc-encoding").is_none());
1640583cff8SDavid Pedersen 
1650583cff8SDavid Pedersen     let mut stream: Streaming<SomeData> = res.into_inner();
1660583cff8SDavid Pedersen 
1670583cff8SDavid Pedersen     stream
1680583cff8SDavid Pedersen         .next()
1690583cff8SDavid Pedersen         .await
1700583cff8SDavid Pedersen         .expect("stream empty")
1710583cff8SDavid Pedersen         .expect("item was error");
1720583cff8SDavid Pedersen     assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE);
1730583cff8SDavid Pedersen }
174