1 use super::*; 2 use tonic::Streaming; 3 4 #[tokio::test(flavor = "multi_thread")] 5 async fn client_enabled_server_enabled() { 6 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 7 8 let svc = test_server::TestServer::new(Svc::default()).send_gzip(); 9 10 let response_bytes_counter = Arc::new(AtomicUsize::new(0)); 11 12 tokio::spawn({ 13 let response_bytes_counter = response_bytes_counter.clone(); 14 async move { 15 Server::builder() 16 .layer( 17 ServiceBuilder::new() 18 .layer(MapResponseBodyLayer::new(move |body| { 19 util::CountBytesBody { 20 inner: body, 21 counter: response_bytes_counter.clone(), 22 } 23 })) 24 .into_inner(), 25 ) 26 .add_service(svc) 27 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( 28 MockStream(server), 29 )])) 30 .await 31 .unwrap(); 32 } 33 }); 34 35 let mut client = test_client::TestClient::new(mock_io_channel(client).await).accept_gzip(); 36 37 let res = client.compress_output_server_stream(()).await.unwrap(); 38 39 assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip"); 40 41 let mut stream: Streaming<SomeData> = res.into_inner(); 42 43 stream 44 .next() 45 .await 46 .expect("stream empty") 47 .expect("item was error"); 48 assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE); 49 50 stream 51 .next() 52 .await 53 .expect("stream empty") 54 .expect("item was error"); 55 assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE); 56 } 57 58 #[tokio::test(flavor = "multi_thread")] 59 async fn client_disabled_server_enabled() { 60 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 61 62 let svc = test_server::TestServer::new(Svc::default()).send_gzip(); 63 64 let response_bytes_counter = Arc::new(AtomicUsize::new(0)); 65 66 tokio::spawn({ 67 let response_bytes_counter = response_bytes_counter.clone(); 68 async move { 69 Server::builder() 70 .layer( 71 ServiceBuilder::new() 72 .layer(MapResponseBodyLayer::new(move |body| { 73 util::CountBytesBody { 74 inner: body, 75 counter: response_bytes_counter.clone(), 76 } 77 })) 78 .into_inner(), 79 ) 80 .add_service(svc) 81 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( 82 MockStream(server), 83 )])) 84 .await 85 .unwrap(); 86 } 87 }); 88 89 let mut client = test_client::TestClient::new(mock_io_channel(client).await); 90 91 let res = client.compress_output_server_stream(()).await.unwrap(); 92 93 assert!(res.metadata().get("grpc-encoding").is_none()); 94 95 let mut stream: Streaming<SomeData> = res.into_inner(); 96 97 stream 98 .next() 99 .await 100 .expect("stream empty") 101 .expect("item was error"); 102 assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE); 103 } 104 105 #[tokio::test(flavor = "multi_thread")] 106 async fn client_enabled_server_disabled() { 107 let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10); 108 109 let svc = test_server::TestServer::new(Svc::default()); 110 111 let response_bytes_counter = Arc::new(AtomicUsize::new(0)); 112 113 tokio::spawn({ 114 let response_bytes_counter = response_bytes_counter.clone(); 115 async move { 116 Server::builder() 117 .layer( 118 ServiceBuilder::new() 119 .layer(MapResponseBodyLayer::new(move |body| { 120 util::CountBytesBody { 121 inner: body, 122 counter: response_bytes_counter.clone(), 123 } 124 })) 125 .into_inner(), 126 ) 127 .add_service(svc) 128 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>( 129 MockStream(server), 130 )])) 131 .await 132 .unwrap(); 133 } 134 }); 135 136 let mut client = test_client::TestClient::new(mock_io_channel(client).await).accept_gzip(); 137 138 let res = client.compress_output_server_stream(()).await.unwrap(); 139 140 assert!(res.metadata().get("grpc-encoding").is_none()); 141 142 let mut stream: Streaming<SomeData> = res.into_inner(); 143 144 stream 145 .next() 146 .await 147 .expect("stream empty") 148 .expect("item was error"); 149 assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE); 150 } 151