1 use super::*;
2 use tonic::Streaming;
3 
4 #[tokio::test(flavor = "multi_thread")]
5 async fn client_enabled_server_enabled() {
6     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
7 
8     let svc = test_server::TestServer::new(Svc::default()).send_gzip();
9 
10     let response_bytes_counter = Arc::new(AtomicUsize::new(0));
11 
12     tokio::spawn({
13         let response_bytes_counter = response_bytes_counter.clone();
14         async move {
15             Server::builder()
16                 .layer(
17                     ServiceBuilder::new()
18                         .layer(MapResponseBodyLayer::new(move |body| {
19                             util::CountBytesBody {
20                                 inner: body,
21                                 counter: response_bytes_counter.clone(),
22                             }
23                         }))
24                         .into_inner(),
25                 )
26                 .add_service(svc)
27                 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(
28                     MockStream(server),
29                 )]))
30                 .await
31                 .unwrap();
32         }
33     });
34 
35     let mut client = test_client::TestClient::new(mock_io_channel(client).await).accept_gzip();
36 
37     let res = client.compress_output_server_stream(()).await.unwrap();
38 
39     assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip");
40 
41     let mut stream: Streaming<SomeData> = res.into_inner();
42 
43     stream
44         .next()
45         .await
46         .expect("stream empty")
47         .expect("item was error");
48     assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
49 
50     stream
51         .next()
52         .await
53         .expect("stream empty")
54         .expect("item was error");
55     assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
56 }
57 
58 #[tokio::test(flavor = "multi_thread")]
59 async fn client_disabled_server_enabled() {
60     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
61 
62     let svc = test_server::TestServer::new(Svc::default()).send_gzip();
63 
64     let response_bytes_counter = Arc::new(AtomicUsize::new(0));
65 
66     tokio::spawn({
67         let response_bytes_counter = response_bytes_counter.clone();
68         async move {
69             Server::builder()
70                 .layer(
71                     ServiceBuilder::new()
72                         .layer(MapResponseBodyLayer::new(move |body| {
73                             util::CountBytesBody {
74                                 inner: body,
75                                 counter: response_bytes_counter.clone(),
76                             }
77                         }))
78                         .into_inner(),
79                 )
80                 .add_service(svc)
81                 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(
82                     MockStream(server),
83                 )]))
84                 .await
85                 .unwrap();
86         }
87     });
88 
89     let mut client = test_client::TestClient::new(mock_io_channel(client).await);
90 
91     let res = client.compress_output_server_stream(()).await.unwrap();
92 
93     assert!(res.metadata().get("grpc-encoding").is_none());
94 
95     let mut stream: Streaming<SomeData> = res.into_inner();
96 
97     stream
98         .next()
99         .await
100         .expect("stream empty")
101         .expect("item was error");
102     assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE);
103 }
104 
105 #[tokio::test(flavor = "multi_thread")]
106 async fn client_enabled_server_disabled() {
107     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
108 
109     let svc = test_server::TestServer::new(Svc::default());
110 
111     let response_bytes_counter = Arc::new(AtomicUsize::new(0));
112 
113     tokio::spawn({
114         let response_bytes_counter = response_bytes_counter.clone();
115         async move {
116             Server::builder()
117                 .layer(
118                     ServiceBuilder::new()
119                         .layer(MapResponseBodyLayer::new(move |body| {
120                             util::CountBytesBody {
121                                 inner: body,
122                                 counter: response_bytes_counter.clone(),
123                             }
124                         }))
125                         .into_inner(),
126                 )
127                 .add_service(svc)
128                 .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(
129                     MockStream(server),
130                 )]))
131                 .await
132                 .unwrap();
133         }
134     });
135 
136     let mut client = test_client::TestClient::new(mock_io_channel(client).await).accept_gzip();
137 
138     let res = client.compress_output_server_stream(()).await.unwrap();
139 
140     assert!(res.metadata().get("grpc-encoding").is_none());
141 
142     let mut stream: Streaming<SomeData> = res.into_inner();
143 
144     stream
145         .next()
146         .await
147         .expect("stream empty")
148         .expect("item was error");
149     assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE);
150 }
151