10583cff8SDavid Pedersen use super::*;
2a585a722SMarcus Griep use tonic::codec::CompressionEncoding;
30583cff8SDavid Pedersen use tonic::Streaming;
40583cff8SDavid Pedersen
5e8cb48fcSQuentin Perez util::parametrized_tests! {
6e8cb48fcSQuentin Perez client_enabled_server_enabled,
7e8cb48fcSQuentin Perez zstd: CompressionEncoding::Zstd,
8e8cb48fcSQuentin Perez gzip: CompressionEncoding::Gzip,
9*79a06cc8SIlya Averyanov deflate: CompressionEncoding::Deflate,
10e8cb48fcSQuentin Perez }
11e8cb48fcSQuentin Perez
12e8cb48fcSQuentin Perez #[allow(dead_code)]
client_enabled_server_enabled(encoding: CompressionEncoding)13e8cb48fcSQuentin Perez async fn client_enabled_server_enabled(encoding: CompressionEncoding) {
140583cff8SDavid Pedersen let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
150583cff8SDavid Pedersen
16e8cb48fcSQuentin Perez let svc = test_server::TestServer::new(Svc::default()).send_compressed(encoding);
170583cff8SDavid Pedersen
180583cff8SDavid Pedersen let response_bytes_counter = Arc::new(AtomicUsize::new(0));
190583cff8SDavid Pedersen
200583cff8SDavid Pedersen tokio::spawn({
210583cff8SDavid Pedersen let response_bytes_counter = response_bytes_counter.clone();
220583cff8SDavid Pedersen async move {
230583cff8SDavid Pedersen Server::builder()
240583cff8SDavid Pedersen .layer(
250583cff8SDavid Pedersen ServiceBuilder::new()
260583cff8SDavid Pedersen .layer(MapResponseBodyLayer::new(move |body| {
270583cff8SDavid Pedersen util::CountBytesBody {
280583cff8SDavid Pedersen inner: body,
290583cff8SDavid Pedersen counter: response_bytes_counter.clone(),
300583cff8SDavid Pedersen }
310583cff8SDavid Pedersen }))
320583cff8SDavid Pedersen .into_inner(),
330583cff8SDavid Pedersen )
340583cff8SDavid Pedersen .add_service(svc)
35f089e7a0Stottoto .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
360583cff8SDavid Pedersen .await
370583cff8SDavid Pedersen .unwrap();
380583cff8SDavid Pedersen }
390583cff8SDavid Pedersen });
400583cff8SDavid Pedersen
41e8cb48fcSQuentin Perez let mut client =
42e8cb48fcSQuentin Perez test_client::TestClient::new(mock_io_channel(client).await).accept_compressed(encoding);
430583cff8SDavid Pedersen
440583cff8SDavid Pedersen let res = client.compress_output_server_stream(()).await.unwrap();
450583cff8SDavid Pedersen
46e8cb48fcSQuentin Perez let expected = match encoding {
47e8cb48fcSQuentin Perez CompressionEncoding::Gzip => "gzip",
48e8cb48fcSQuentin Perez CompressionEncoding::Zstd => "zstd",
49*79a06cc8SIlya Averyanov CompressionEncoding::Deflate => "deflate",
50e8cb48fcSQuentin Perez _ => panic!("unexpected encoding {:?}", encoding),
51e8cb48fcSQuentin Perez };
52e8cb48fcSQuentin Perez assert_eq!(res.metadata().get("grpc-encoding").unwrap(), expected);
530583cff8SDavid Pedersen
540583cff8SDavid Pedersen let mut stream: Streaming<SomeData> = res.into_inner();
550583cff8SDavid Pedersen
560583cff8SDavid Pedersen stream
570583cff8SDavid Pedersen .next()
580583cff8SDavid Pedersen .await
590583cff8SDavid Pedersen .expect("stream empty")
600583cff8SDavid Pedersen .expect("item was error");
610583cff8SDavid Pedersen assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
620583cff8SDavid Pedersen
630583cff8SDavid Pedersen stream
640583cff8SDavid Pedersen .next()
650583cff8SDavid Pedersen .await
660583cff8SDavid Pedersen .expect("stream empty")
670583cff8SDavid Pedersen .expect("item was error");
680583cff8SDavid Pedersen assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
690583cff8SDavid Pedersen }
700583cff8SDavid Pedersen
71e8cb48fcSQuentin Perez util::parametrized_tests! {
72e8cb48fcSQuentin Perez client_disabled_server_enabled,
73e8cb48fcSQuentin Perez zstd: CompressionEncoding::Zstd,
74e8cb48fcSQuentin Perez gzip: CompressionEncoding::Gzip,
75e8cb48fcSQuentin Perez }
76e8cb48fcSQuentin Perez
77e8cb48fcSQuentin Perez #[allow(dead_code)]
client_disabled_server_enabled(encoding: CompressionEncoding)78e8cb48fcSQuentin Perez async fn client_disabled_server_enabled(encoding: CompressionEncoding) {
790583cff8SDavid Pedersen let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
800583cff8SDavid Pedersen
81e8cb48fcSQuentin Perez let svc = test_server::TestServer::new(Svc::default()).send_compressed(encoding);
820583cff8SDavid Pedersen
830583cff8SDavid Pedersen let response_bytes_counter = Arc::new(AtomicUsize::new(0));
840583cff8SDavid Pedersen
850583cff8SDavid Pedersen tokio::spawn({
860583cff8SDavid Pedersen let response_bytes_counter = response_bytes_counter.clone();
870583cff8SDavid Pedersen async move {
880583cff8SDavid Pedersen Server::builder()
890583cff8SDavid Pedersen .layer(
900583cff8SDavid Pedersen ServiceBuilder::new()
910583cff8SDavid Pedersen .layer(MapResponseBodyLayer::new(move |body| {
920583cff8SDavid Pedersen util::CountBytesBody {
930583cff8SDavid Pedersen inner: body,
940583cff8SDavid Pedersen counter: response_bytes_counter.clone(),
950583cff8SDavid Pedersen }
960583cff8SDavid Pedersen }))
970583cff8SDavid Pedersen .into_inner(),
980583cff8SDavid Pedersen )
990583cff8SDavid Pedersen .add_service(svc)
100f089e7a0Stottoto .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
1010583cff8SDavid Pedersen .await
1020583cff8SDavid Pedersen .unwrap();
1030583cff8SDavid Pedersen }
1040583cff8SDavid Pedersen });
1050583cff8SDavid Pedersen
1060583cff8SDavid Pedersen let mut client = test_client::TestClient::new(mock_io_channel(client).await);
1070583cff8SDavid Pedersen
1080583cff8SDavid Pedersen let res = client.compress_output_server_stream(()).await.unwrap();
1090583cff8SDavid Pedersen
1100583cff8SDavid Pedersen assert!(res.metadata().get("grpc-encoding").is_none());
1110583cff8SDavid Pedersen
1120583cff8SDavid Pedersen let mut stream: Streaming<SomeData> = res.into_inner();
1130583cff8SDavid Pedersen
1140583cff8SDavid Pedersen stream
1150583cff8SDavid Pedersen .next()
1160583cff8SDavid Pedersen .await
1170583cff8SDavid Pedersen .expect("stream empty")
1180583cff8SDavid Pedersen .expect("item was error");
1190583cff8SDavid Pedersen assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE);
1200583cff8SDavid Pedersen }
1210583cff8SDavid Pedersen
122e8cb48fcSQuentin Perez util::parametrized_tests! {
123e8cb48fcSQuentin Perez client_enabled_server_disabled,
124e8cb48fcSQuentin Perez zstd: CompressionEncoding::Zstd,
125e8cb48fcSQuentin Perez gzip: CompressionEncoding::Gzip,
126*79a06cc8SIlya Averyanov deflate: CompressionEncoding::Deflate,
127e8cb48fcSQuentin Perez }
128e8cb48fcSQuentin Perez
129e8cb48fcSQuentin Perez #[allow(dead_code)]
client_enabled_server_disabled(encoding: CompressionEncoding)130e8cb48fcSQuentin Perez async fn client_enabled_server_disabled(encoding: CompressionEncoding) {
1310583cff8SDavid Pedersen let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
1320583cff8SDavid Pedersen
1330583cff8SDavid Pedersen let svc = test_server::TestServer::new(Svc::default());
1340583cff8SDavid Pedersen
1350583cff8SDavid Pedersen let response_bytes_counter = Arc::new(AtomicUsize::new(0));
1360583cff8SDavid Pedersen
1370583cff8SDavid Pedersen tokio::spawn({
1380583cff8SDavid Pedersen let response_bytes_counter = response_bytes_counter.clone();
1390583cff8SDavid Pedersen async move {
1400583cff8SDavid Pedersen Server::builder()
1410583cff8SDavid Pedersen .layer(
1420583cff8SDavid Pedersen ServiceBuilder::new()
1430583cff8SDavid Pedersen .layer(MapResponseBodyLayer::new(move |body| {
1440583cff8SDavid Pedersen util::CountBytesBody {
1450583cff8SDavid Pedersen inner: body,
1460583cff8SDavid Pedersen counter: response_bytes_counter.clone(),
1470583cff8SDavid Pedersen }
1480583cff8SDavid Pedersen }))
1490583cff8SDavid Pedersen .into_inner(),
1500583cff8SDavid Pedersen )
1510583cff8SDavid Pedersen .add_service(svc)
152f089e7a0Stottoto .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
1530583cff8SDavid Pedersen .await
1540583cff8SDavid Pedersen .unwrap();
1550583cff8SDavid Pedersen }
1560583cff8SDavid Pedersen });
1570583cff8SDavid Pedersen
158e8cb48fcSQuentin Perez let mut client =
159e8cb48fcSQuentin Perez test_client::TestClient::new(mock_io_channel(client).await).accept_compressed(encoding);
1600583cff8SDavid Pedersen
1610583cff8SDavid Pedersen let res = client.compress_output_server_stream(()).await.unwrap();
1620583cff8SDavid Pedersen
1630583cff8SDavid Pedersen assert!(res.metadata().get("grpc-encoding").is_none());
1640583cff8SDavid Pedersen
1650583cff8SDavid Pedersen let mut stream: Streaming<SomeData> = res.into_inner();
1660583cff8SDavid Pedersen
1670583cff8SDavid Pedersen stream
1680583cff8SDavid Pedersen .next()
1690583cff8SDavid Pedersen .await
1700583cff8SDavid Pedersen .expect("stream empty")
1710583cff8SDavid Pedersen .expect("item was error");
1720583cff8SDavid Pedersen assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE);
1730583cff8SDavid Pedersen }
174