xref: /tonic/interop/src/client.rs (revision da92dbf8)
1 use crate::{
2     pb::test_service_client::*, pb::unimplemented_service_client::*, pb::*, test_assert,
3     TestAssertion,
4 };
5 use futures_util::{future, stream, StreamExt};
6 use tokio::sync::mpsc;
7 use tonic::transport::Channel;
8 use tonic::{metadata::MetadataValue, Code, Request, Response, Status};
9 
10 pub type TestClient = TestServiceClient<Channel>;
11 pub type UnimplementedClient = UnimplementedServiceClient<Channel>;
12 
13 const LARGE_REQ_SIZE: usize = 271_828;
14 const LARGE_RSP_SIZE: i32 = 314_159;
15 const REQUEST_LENGTHS: &[i32] = &[27182, 8, 1828, 45904];
16 const RESPONSE_LENGTHS: &[i32] = &[31415, 9, 2653, 58979];
17 const TEST_STATUS_MESSAGE: &str = "test status message";
18 const SPECIAL_TEST_STATUS_MESSAGE: &str =
19     "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP ��\t\n";
20 
21 pub async fn empty_unary(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
22     let result = client.empty_call(Request::new(Empty {})).await;
23 
24     assertions.push(test_assert!(
25         "call must be successful",
26         result.is_ok(),
27         format!("result={:?}", result)
28     ));
29 
30     if let Ok(response) = result {
31         let body = response.into_inner();
32         assertions.push(test_assert!(
33             "body must not be null",
34             body == Empty {},
35             format!("body={:?}", body)
36         ));
37     }
38 }
39 
40 pub async fn large_unary(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
41     use std::mem;
42     let payload = crate::client_payload(LARGE_REQ_SIZE);
43     let req = SimpleRequest {
44         response_type: PayloadType::Compressable as i32,
45         response_size: LARGE_RSP_SIZE,
46         payload: Some(payload),
47         ..Default::default()
48     };
49 
50     let result = client.unary_call(Request::new(req)).await;
51 
52     assertions.push(test_assert!(
53         "call must be successful",
54         result.is_ok(),
55         format!("result={:?}", result)
56     ));
57 
58     if let Ok(response) = result {
59         let body = response.into_inner();
60         let payload_len = body.payload.as_ref().map(|p| p.body.len()).unwrap_or(0);
61 
62         assertions.push(test_assert!(
63             "body must be 314159 bytes",
64             payload_len == LARGE_RSP_SIZE as usize,
65             format!("mem::size_of_val(&body)={:?}", mem::size_of_val(&body))
66         ));
67     }
68 }
69 
70 // pub async fn cachable_unary(client: &mut Client, assertions: &mut Vec<TestAssertion>) {
71 //     let payload = Payload {
72 //         r#type: PayloadType::Compressable as i32,
73 //         body: format!("{:?}", std::time::Instant::now()).into_bytes(),
74 //     };
75 //     let req = SimpleRequest {
76 //         response_type: PayloadType::Compressable as i32,
77 //         payload: Some(payload),
78 //         ..Default::default()
79 //     };
80 
81 //     client.
82 // }
83 
84 pub async fn client_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
85     let requests = REQUEST_LENGTHS.iter().map(|len| StreamingInputCallRequest {
86         payload: Some(crate::client_payload(*len as usize)),
87         ..Default::default()
88     });
89 
90     let stream = stream::iter(requests);
91 
92     let result = client.streaming_input_call(Request::new(stream)).await;
93 
94     assertions.push(test_assert!(
95         "call must be successful",
96         result.is_ok(),
97         format!("result={:?}", result)
98     ));
99 
100     if let Ok(response) = result {
101         let body = response.into_inner();
102 
103         assertions.push(test_assert!(
104             "aggregated payload size must be 74922 bytes",
105             body.aggregated_payload_size == 74922,
106             format!("aggregated_payload_size={:?}", body.aggregated_payload_size)
107         ));
108     }
109 }
110 
111 pub async fn server_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
112     let req = StreamingOutputCallRequest {
113         response_parameters: RESPONSE_LENGTHS
114             .iter()
115             .map(|len| ResponseParameters::with_size(*len))
116             .collect(),
117         ..Default::default()
118     };
119     let req = Request::new(req);
120 
121     let result = client.streaming_output_call(req).await;
122 
123     assertions.push(test_assert!(
124         "call must be successful",
125         result.is_ok(),
126         format!("result={:?}", result)
127     ));
128 
129     if let Ok(response) = result {
130         let responses = response
131             .into_inner()
132             .filter_map(|m| future::ready(m.ok()))
133             .collect::<Vec<_>>()
134             .await;
135         let actual_response_lengths = crate::response_lengths(&responses);
136         let asserts = vec![
137             test_assert!(
138                 "there should be four responses",
139                 responses.len() == 4,
140                 format!("responses.len()={:?}", responses.len())
141             ),
142             test_assert!(
143                 "the response payload sizes should match input",
144                 RESPONSE_LENGTHS == actual_response_lengths.as_slice(),
145                 format!("{:?}={:?}", RESPONSE_LENGTHS, actual_response_lengths)
146             ),
147         ];
148 
149         assertions.extend(asserts);
150     }
151 }
152 
153 pub async fn ping_pong(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
154     let (tx, rx) = mpsc::unbounded_channel();
155     tx.send(make_ping_pong_request(0)).unwrap();
156 
157     let result = client.full_duplex_call(Request::new(rx)).await;
158 
159     assertions.push(test_assert!(
160         "call must be successful",
161         result.is_ok(),
162         format!("result={:?}", result)
163     ));
164 
165     if let Ok(mut response) = result.map(Response::into_inner) {
166         let mut responses = Vec::new();
167 
168         loop {
169             match response.next().await {
170                 Some(result) => {
171                     responses.push(result.unwrap());
172                     if responses.len() == REQUEST_LENGTHS.len() {
173                         drop(tx);
174                         break;
175                     } else {
176                         tx.send(make_ping_pong_request(responses.len())).unwrap();
177                     }
178                 }
179                 None => {
180                     assertions.push(TestAssertion::Failed {
181                         description:
182                             "server should keep the stream open until the client closes it",
183                         expression: "Stream terminated unexpectedly early",
184                         why: None,
185                     });
186                     break;
187                 }
188             }
189         }
190 
191         let actual_response_lengths = crate::response_lengths(&responses);
192         assertions.push(test_assert!(
193             "there should be four responses",
194             responses.len() == RESPONSE_LENGTHS.len(),
195             format!("{:?}={:?}", responses.len(), RESPONSE_LENGTHS.len())
196         ));
197         assertions.push(test_assert!(
198             "the response payload sizes should match input",
199             RESPONSE_LENGTHS == actual_response_lengths.as_slice(),
200             format!("{:?}={:?}", RESPONSE_LENGTHS, actual_response_lengths)
201         ));
202     }
203 }
204 
205 pub async fn empty_stream(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
206     let stream = stream::iter(Vec::new());
207     let result = client.full_duplex_call(Request::new(stream)).await;
208 
209     assertions.push(test_assert!(
210         "call must be successful",
211         result.is_ok(),
212         format!("result={:?}", result)
213     ));
214 
215     if let Ok(response) = result.map(Response::into_inner) {
216         let responses = response.collect::<Vec<_>>().await;
217 
218         assertions.push(test_assert!(
219             "there should be no responses",
220             responses.is_empty(),
221             format!("responses.len()={:?}", responses.len())
222         ));
223     }
224 }
225 
226 pub async fn status_code_and_message(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
227     fn validate_response<T>(result: Result<T, Status>, assertions: &mut Vec<TestAssertion>)
228     where
229         T: std::fmt::Debug,
230     {
231         assertions.push(test_assert!(
232             "call must fail with unknown status code",
233             match &result {
234                 Err(status) => status.code() == Code::Unknown,
235                 _ => false,
236             },
237             format!("result={:?}", result)
238         ));
239 
240         assertions.push(test_assert!(
241             "call must respsond with expected status message",
242             match &result {
243                 Err(status) => status.message() == TEST_STATUS_MESSAGE,
244                 _ => false,
245             },
246             format!("result={:?}", result)
247         ));
248     }
249 
250     let simple_req = SimpleRequest {
251         response_status: Some(EchoStatus {
252             code: 2,
253             message: TEST_STATUS_MESSAGE.to_string(),
254             ..Default::default()
255         }),
256         ..Default::default()
257     };
258 
259     let duplex_req = StreamingOutputCallRequest {
260         response_status: Some(EchoStatus {
261             code: 2,
262             message: TEST_STATUS_MESSAGE.to_string(),
263             ..Default::default()
264         }),
265         ..Default::default()
266     };
267 
268     let result = client.unary_call(Request::new(simple_req)).await;
269     validate_response(result, assertions);
270 
271     let stream = stream::iter(vec![duplex_req]);
272     let result = match client.full_duplex_call(Request::new(stream)).await {
273         Ok(response) => {
274             let stream = response.into_inner();
275             let responses = stream.collect::<Vec<_>>().await;
276             Ok(responses)
277         }
278         Err(e) => Err(e),
279     };
280 
281     validate_response(result, assertions);
282 }
283 
284 pub async fn special_status_message(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
285     let req = SimpleRequest {
286         response_status: Some(EchoStatus {
287             code: 2,
288             message: SPECIAL_TEST_STATUS_MESSAGE.to_string(),
289             ..Default::default()
290         }),
291         ..Default::default()
292     };
293 
294     let result = client.unary_call(Request::new(req)).await;
295 
296     assertions.push(test_assert!(
297         "call must fail with unknown status code",
298         match &result {
299             Err(status) => status.code() == Code::Unknown,
300             _ => false,
301         },
302         format!("result={:?}", result)
303     ));
304 
305     assertions.push(test_assert!(
306         "call must respsond with expected status message",
307         match &result {
308             Err(status) => status.message() == SPECIAL_TEST_STATUS_MESSAGE,
309             _ => false,
310         },
311         format!("result={:?}", result)
312     ));
313 }
314 
315 pub async fn unimplemented_method(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
316     let result = client.unimplemented_call(Request::new(Empty {})).await;
317     assertions.push(test_assert!(
318         "call must fail with unimplemented status code",
319         match &result {
320             Err(status) => status.code() == Code::Unimplemented,
321             _ => false,
322         },
323         format!("result={:?}", result)
324     ));
325 }
326 
327 pub async fn unimplemented_service(
328     client: &mut UnimplementedClient,
329     assertions: &mut Vec<TestAssertion>,
330 ) {
331     let result = client.unimplemented_call(Request::new(Empty {})).await;
332     assertions.push(test_assert!(
333         "call must fail with unimplemented status code",
334         match &result {
335             Err(status) => status.code() == Code::Unimplemented,
336             _ => false,
337         },
338         format!("result={:?}", result)
339     ));
340 }
341 
342 pub async fn custom_metadata(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
343     let key1 = "x-grpc-test-echo-initial";
344     let value1 = MetadataValue::from_str("test_initial_metadata_value").unwrap();
345     let key2 = "x-grpc-test-echo-trailing-bin";
346     let value2 = MetadataValue::from_bytes(&[0xab, 0xab, 0xab]);
347 
348     let req = SimpleRequest {
349         response_type: PayloadType::Compressable as i32,
350         response_size: LARGE_RSP_SIZE,
351         payload: Some(crate::client_payload(LARGE_REQ_SIZE)),
352         ..Default::default()
353     };
354     let mut req_unary = Request::new(req);
355     req_unary.metadata_mut().insert(key1, value1.clone());
356     req_unary.metadata_mut().insert_bin(key2, value2.clone());
357 
358     let stream = stream::iter(vec![make_ping_pong_request(0)]);
359     let mut req_stream = Request::new(stream);
360     req_stream.metadata_mut().insert(key1, value1.clone());
361     req_stream.metadata_mut().insert_bin(key2, value2.clone());
362 
363     let response = client
364         .unary_call(req_unary)
365         .await
366         .expect("call should pass.");
367 
368     assertions.push(test_assert!(
369         "metadata string must match in unary",
370         response.metadata().get(key1) == Some(&value1),
371         format!("result={:?}", response.metadata().get(key1))
372     ));
373     assertions.push(test_assert!(
374         "metadata bin must match in unary",
375         response.metadata().get_bin(key2) == Some(&value2),
376         format!("result={:?}", response.metadata().get_bin(key1))
377     ));
378 
379     let response = client
380         .full_duplex_call(req_stream)
381         .await
382         .expect("call should pass.");
383 
384     assertions.push(test_assert!(
385         "metadata string must match in unary",
386         response.metadata().get(key1) == Some(&value1),
387         format!("result={:?}", response.metadata().get(key1))
388     ));
389 
390     let mut stream = response.into_inner();
391 
392     let trailers = stream.trailers().await.unwrap().unwrap();
393 
394     assertions.push(test_assert!(
395         "metadata bin must match in unary",
396         trailers.get_bin(key2) == Some(&value2),
397         format!("result={:?}", trailers.get_bin(key1))
398     ));
399 }
400 
401 fn make_ping_pong_request(idx: usize) -> StreamingOutputCallRequest {
402     let req_len = REQUEST_LENGTHS[idx];
403     let resp_len = RESPONSE_LENGTHS[idx];
404     StreamingOutputCallRequest {
405         response_parameters: vec![ResponseParameters::with_size(resp_len)],
406         payload: Some(crate::client_payload(req_len as usize)),
407         ..Default::default()
408     }
409 }
410