1 use super::*; 2 use tonic::codec::CompressionEncoding; 3 use tonic::Streaming; 4 5 util::parametrized_tests! { 6 client_enabled_server_enabled, 7 zstd: CompressionEncoding::Zstd, 8 gzip: CompressionEncoding::Gzip, 9 } 10 11 #[allow(dead_code)] 12 async fn client_enabled_server_enabled(encoding: CompressionEncoding) { 13 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 14 15 let svc = test_server::TestServer::new(Svc::default()).send_compressed(encoding); 16 17 let response_bytes_counter = Arc::new(AtomicUsize::new(0)); 18 19 tokio::spawn({ 20 let response_bytes_counter = response_bytes_counter.clone(); 21 async move { 22 Server::builder() 23 .layer( 24 ServiceBuilder::new() 25 .layer(MapResponseBodyLayer::new(move |body| { 26 util::CountBytesBody { 27 inner: body, 28 counter: response_bytes_counter.clone(), 29 } 30 })) 31 .into_inner(), 32 ) 33 .add_service(svc) 34 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) 35 .await 36 .unwrap(); 37 } 38 }); 39 40 let mut client = 41 test_client::TestClient::new(mock_io_channel(client).await).accept_compressed(encoding); 42 43 let res = client.compress_output_server_stream(()).await.unwrap(); 44 45 let expected = match encoding { 46 CompressionEncoding::Gzip => "gzip", 47 CompressionEncoding::Zstd => "zstd", 48 _ => panic!("unexpected encoding {:?}", encoding), 49 }; 50 assert_eq!(res.metadata().get("grpc-encoding").unwrap(), expected); 51 52 let mut stream: Streaming<SomeData> = res.into_inner(); 53 54 stream 55 .next() 56 .await 57 .expect("stream empty") 58 .expect("item was error"); 59 assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE); 60 61 stream 62 .next() 63 .await 64 .expect("stream empty") 65 .expect("item was error"); 66 assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE); 67 } 68 69 util::parametrized_tests! { 70 client_disabled_server_enabled, 71 zstd: CompressionEncoding::Zstd, 72 gzip: CompressionEncoding::Gzip, 73 } 74 75 #[allow(dead_code)] 76 async fn client_disabled_server_enabled(encoding: CompressionEncoding) { 77 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 78 79 let svc = test_server::TestServer::new(Svc::default()).send_compressed(encoding); 80 81 let response_bytes_counter = Arc::new(AtomicUsize::new(0)); 82 83 tokio::spawn({ 84 let response_bytes_counter = response_bytes_counter.clone(); 85 async move { 86 Server::builder() 87 .layer( 88 ServiceBuilder::new() 89 .layer(MapResponseBodyLayer::new(move |body| { 90 util::CountBytesBody { 91 inner: body, 92 counter: response_bytes_counter.clone(), 93 } 94 })) 95 .into_inner(), 96 ) 97 .add_service(svc) 98 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) 99 .await 100 .unwrap(); 101 } 102 }); 103 104 let mut client = test_client::TestClient::new(mock_io_channel(client).await); 105 106 let res = client.compress_output_server_stream(()).await.unwrap(); 107 108 assert!(res.metadata().get("grpc-encoding").is_none()); 109 110 let mut stream: Streaming<SomeData> = res.into_inner(); 111 112 stream 113 .next() 114 .await 115 .expect("stream empty") 116 .expect("item was error"); 117 assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE); 118 } 119 120 util::parametrized_tests! { 121 client_enabled_server_disabled, 122 zstd: CompressionEncoding::Zstd, 123 gzip: CompressionEncoding::Gzip, 124 } 125 126 #[allow(dead_code)] 127 async fn client_enabled_server_disabled(encoding: CompressionEncoding) { 128 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 129 130 let svc = test_server::TestServer::new(Svc::default()); 131 132 let response_bytes_counter = Arc::new(AtomicUsize::new(0)); 133 134 tokio::spawn({ 135 let response_bytes_counter = response_bytes_counter.clone(); 136 async move { 137 Server::builder() 138 .layer( 139 ServiceBuilder::new() 140 .layer(MapResponseBodyLayer::new(move |body| { 141 util::CountBytesBody { 142 inner: body, 143 counter: response_bytes_counter.clone(), 144 } 145 })) 146 .into_inner(), 147 ) 148 .add_service(svc) 149 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) 150 .await 151 .unwrap(); 152 } 153 }); 154 155 let mut client = 156 test_client::TestClient::new(mock_io_channel(client).await).accept_compressed(encoding); 157 158 let res = client.compress_output_server_stream(()).await.unwrap(); 159 160 assert!(res.metadata().get("grpc-encoding").is_none()); 161 162 let mut stream: Streaming<SomeData> = res.into_inner(); 163 164 stream 165 .next() 166 .await 167 .expect("stream empty") 168 .expect("item was error"); 169 assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE); 170 } 171