1 use super::*; 2 use tonic::codec::CompressionEncoding; 3 use tonic::Streaming; 4 5 #[tokio::test(flavor = "multi_thread")] 6 async fn client_enabled_server_enabled() { 7 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 8 9 let svc = 10 test_server::TestServer::new(Svc::default()).send_compressed(CompressionEncoding::Gzip); 11 12 let response_bytes_counter = Arc::new(AtomicUsize::new(0)); 13 14 tokio::spawn({ 15 let response_bytes_counter = response_bytes_counter.clone(); 16 async move { 17 Server::builder() 18 .layer( 19 ServiceBuilder::new() 20 .layer(MapResponseBodyLayer::new(move |body| { 21 util::CountBytesBody { 22 inner: body, 23 counter: response_bytes_counter.clone(), 24 } 25 })) 26 .into_inner(), 27 ) 28 .add_service(svc) 29 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) 30 .await 31 .unwrap(); 32 } 33 }); 34 35 let mut client = test_client::TestClient::new(mock_io_channel(client).await) 36 .accept_compressed(CompressionEncoding::Gzip); 37 38 let res = client.compress_output_server_stream(()).await.unwrap(); 39 40 assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip"); 41 42 let mut stream: Streaming<SomeData> = res.into_inner(); 43 44 stream 45 .next() 46 .await 47 .expect("stream empty") 48 .expect("item was error"); 49 assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE); 50 51 stream 52 .next() 53 .await 54 .expect("stream empty") 55 .expect("item was error"); 56 assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE); 57 } 58 59 #[tokio::test(flavor = "multi_thread")] 60 async fn client_disabled_server_enabled() { 61 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 62 63 let svc = 64 test_server::TestServer::new(Svc::default()).send_compressed(CompressionEncoding::Gzip); 65 66 let response_bytes_counter = Arc::new(AtomicUsize::new(0)); 67 68 tokio::spawn({ 69 let response_bytes_counter = response_bytes_counter.clone(); 70 async move { 71 Server::builder() 72 .layer( 73 ServiceBuilder::new() 74 .layer(MapResponseBodyLayer::new(move |body| { 75 util::CountBytesBody { 76 inner: body, 77 counter: response_bytes_counter.clone(), 78 } 79 })) 80 .into_inner(), 81 ) 82 .add_service(svc) 83 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) 84 .await 85 .unwrap(); 86 } 87 }); 88 89 let mut client = test_client::TestClient::new(mock_io_channel(client).await); 90 91 let res = client.compress_output_server_stream(()).await.unwrap(); 92 93 assert!(res.metadata().get("grpc-encoding").is_none()); 94 95 let mut stream: Streaming<SomeData> = res.into_inner(); 96 97 stream 98 .next() 99 .await 100 .expect("stream empty") 101 .expect("item was error"); 102 assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE); 103 } 104 105 #[tokio::test(flavor = "multi_thread")] 106 async fn client_enabled_server_disabled() { 107 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 108 109 let svc = test_server::TestServer::new(Svc::default()); 110 111 let response_bytes_counter = Arc::new(AtomicUsize::new(0)); 112 113 tokio::spawn({ 114 let response_bytes_counter = response_bytes_counter.clone(); 115 async move { 116 Server::builder() 117 .layer( 118 ServiceBuilder::new() 119 .layer(MapResponseBodyLayer::new(move |body| { 120 util::CountBytesBody { 121 inner: body, 122 counter: response_bytes_counter.clone(), 123 } 124 })) 125 .into_inner(), 126 ) 127 .add_service(svc) 128 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) 129 .await 130 .unwrap(); 131 } 132 }); 133 134 let mut client = test_client::TestClient::new(mock_io_channel(client).await) 135 .accept_compressed(CompressionEncoding::Gzip); 136 137 let res = client.compress_output_server_stream(()).await.unwrap(); 138 139 assert!(res.metadata().get("grpc-encoding").is_none()); 140 141 let mut stream: Streaming<SomeData> = res.into_inner(); 142 143 stream 144 .next() 145 .await 146 .expect("stream empty") 147 .expect("item was error"); 148 assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE); 149 } 150