1 use super::*;
2 use tonic::codec::CompressionEncoding;
3 use tonic::Streaming;
4 
5 #[tokio::test(flavor = "multi_thread")]
6 async fn client_enabled_server_enabled() {
7     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
8 
9     let svc =
10         test_server::TestServer::new(Svc::default()).send_compressed(CompressionEncoding::Gzip);
11 
12     let response_bytes_counter = Arc::new(AtomicUsize::new(0));
13 
14     tokio::spawn({
15         let response_bytes_counter = response_bytes_counter.clone();
16         async move {
17             Server::builder()
18                 .layer(
19                     ServiceBuilder::new()
20                         .layer(MapResponseBodyLayer::new(move |body| {
21                             util::CountBytesBody {
22                                 inner: body,
23                                 counter: response_bytes_counter.clone(),
24                             }
25                         }))
26                         .into_inner(),
27                 )
28                 .add_service(svc)
29                 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
30                 .await
31                 .unwrap();
32         }
33     });
34 
35     let mut client = test_client::TestClient::new(mock_io_channel(client).await)
36         .accept_compressed(CompressionEncoding::Gzip);
37 
38     let res = client.compress_output_server_stream(()).await.unwrap();
39 
40     assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip");
41 
42     let mut stream: Streaming<SomeData> = res.into_inner();
43 
44     stream
45         .next()
46         .await
47         .expect("stream empty")
48         .expect("item was error");
49     assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
50 
51     stream
52         .next()
53         .await
54         .expect("stream empty")
55         .expect("item was error");
56     assert!(response_bytes_counter.load(SeqCst) < UNCOMPRESSED_MIN_BODY_SIZE);
57 }
58 
59 #[tokio::test(flavor = "multi_thread")]
60 async fn client_disabled_server_enabled() {
61     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
62 
63     let svc =
64         test_server::TestServer::new(Svc::default()).send_compressed(CompressionEncoding::Gzip);
65 
66     let response_bytes_counter = Arc::new(AtomicUsize::new(0));
67 
68     tokio::spawn({
69         let response_bytes_counter = response_bytes_counter.clone();
70         async move {
71             Server::builder()
72                 .layer(
73                     ServiceBuilder::new()
74                         .layer(MapResponseBodyLayer::new(move |body| {
75                             util::CountBytesBody {
76                                 inner: body,
77                                 counter: response_bytes_counter.clone(),
78                             }
79                         }))
80                         .into_inner(),
81                 )
82                 .add_service(svc)
83                 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
84                 .await
85                 .unwrap();
86         }
87     });
88 
89     let mut client = test_client::TestClient::new(mock_io_channel(client).await);
90 
91     let res = client.compress_output_server_stream(()).await.unwrap();
92 
93     assert!(res.metadata().get("grpc-encoding").is_none());
94 
95     let mut stream: Streaming<SomeData> = res.into_inner();
96 
97     stream
98         .next()
99         .await
100         .expect("stream empty")
101         .expect("item was error");
102     assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE);
103 }
104 
105 #[tokio::test(flavor = "multi_thread")]
106 async fn client_enabled_server_disabled() {
107     let (client, server) = tokio::io::duplex(UNCOMPRESSED_MIN_BODY_SIZE * 10);
108 
109     let svc = test_server::TestServer::new(Svc::default());
110 
111     let response_bytes_counter = Arc::new(AtomicUsize::new(0));
112 
113     tokio::spawn({
114         let response_bytes_counter = response_bytes_counter.clone();
115         async move {
116             Server::builder()
117                 .layer(
118                     ServiceBuilder::new()
119                         .layer(MapResponseBodyLayer::new(move |body| {
120                             util::CountBytesBody {
121                                 inner: body,
122                                 counter: response_bytes_counter.clone(),
123                             }
124                         }))
125                         .into_inner(),
126                 )
127                 .add_service(svc)
128                 .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
129                 .await
130                 .unwrap();
131         }
132     });
133 
134     let mut client = test_client::TestClient::new(mock_io_channel(client).await)
135         .accept_compressed(CompressionEncoding::Gzip);
136 
137     let res = client.compress_output_server_stream(()).await.unwrap();
138 
139     assert!(res.metadata().get("grpc-encoding").is_none());
140 
141     let mut stream: Streaming<SomeData> = res.into_inner();
142 
143     stream
144         .next()
145         .await
146         .expect("stream empty")
147         .expect("item was error");
148     assert!(response_bytes_counter.load(SeqCst) > UNCOMPRESSED_MIN_BODY_SIZE);
149 }
150