xref: /tonic/interop/src/client.rs (revision f089e7a0)
1 use crate::{
2     pb::test_service_client::*, pb::unimplemented_service_client::*, pb::*, test_assert,
3     TestAssertion,
4 };
5 use tokio::sync::mpsc;
6 use tokio_stream::StreamExt;
7 use tonic::transport::Channel;
8 use tonic::{metadata::MetadataValue, Code, Request, Response, Status};
9 
/// Generated `TestServiceClient` specialised to a tonic transport [`Channel`]; this is the
/// client handle taken by most test cases in this module.
pub type TestClient = TestServiceClient<Channel>;
/// Generated `UnimplementedServiceClient` specialised to a tonic transport [`Channel`]; used by
/// the `unimplemented_service` test case.
pub type UnimplementedClient = UnimplementedServiceClient<Channel>;
12 
/// Size in bytes of the request payload sent by the large unary and custom metadata cases.
const LARGE_REQ_SIZE: usize = 271_828;
/// Response payload size in bytes requested by the large unary and custom metadata cases.
const LARGE_RSP_SIZE: i32 = 314_159;
/// Request payload sizes, one per streamed request. They add up to 74922, the aggregate size
/// asserted by the client-streaming case.
const REQUEST_LENGTHS: &[i32] = &[27182, 8, 1828, 45904];
/// Response payload sizes requested from the server, index-aligned with [`REQUEST_LENGTHS`].
const RESPONSE_LENGTHS: &[i32] = &[31415, 9, 2653, 58979];
/// Status message the server is asked to echo back in the status-code-and-message case.
const TEST_STATUS_MESSAGE: &str = "test status message";
/// Status message containing whitespace, a BMP character and a non-BMP character.
///
/// The non-BMP character (U+1F608) is written as an escape so that it cannot be damaged by a
/// lossy re-encoding of this file.
const SPECIAL_TEST_STATUS_MESSAGE: &str =
    "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP \u{1F608}\t\n";
20 
empty_unary(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)21 pub async fn empty_unary(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
22     let result = client.empty_call(Request::new(Empty {})).await;
23 
24     assertions.push(test_assert!(
25         "call must be successful",
26         result.is_ok(),
27         format!("result={:?}", result)
28     ));
29 
30     if let Ok(response) = result {
31         let body = response.into_inner();
32         assertions.push(test_assert!(
33             "body must not be null",
34             body == Empty {},
35             format!("body={:?}", body)
36         ));
37     }
38 }
39 
large_unary(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)40 pub async fn large_unary(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
41     use std::mem;
42     let payload = crate::client_payload(LARGE_REQ_SIZE);
43     let req = SimpleRequest {
44         response_type: PayloadType::Compressable as i32,
45         response_size: LARGE_RSP_SIZE,
46         payload: Some(payload),
47         ..Default::default()
48     };
49 
50     let result = client.unary_call(Request::new(req)).await;
51 
52     assertions.push(test_assert!(
53         "call must be successful",
54         result.is_ok(),
55         format!("result={:?}", result)
56     ));
57 
58     if let Ok(response) = result {
59         let body = response.into_inner();
60         let payload_len = body.payload.as_ref().map(|p| p.body.len()).unwrap_or(0);
61 
62         assertions.push(test_assert!(
63             "body must be 314159 bytes",
64             payload_len == LARGE_RSP_SIZE as usize,
65             format!("mem::size_of_val(&body)={:?}", mem::size_of_val(&body))
66         ));
67     }
68 }
69 
70 // pub async fn cachable_unary(client: &mut Client, assertions: &mut Vec<TestAssertion>) {
71 //     let payload = Payload {
72 //         r#type: PayloadType::Compressable as i32,
73 //         body: format!("{:?}", std::time::Instant::now()).into_bytes(),
74 //     };
75 //     let req = SimpleRequest {
76 //         response_type: PayloadType::Compressable as i32,
77 //         payload: Some(payload),
78 //         ..Default::default()
79 //     };
80 
81 //     client.
82 // }
83 
client_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)84 pub async fn client_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
85     let requests = REQUEST_LENGTHS.iter().map(|len| StreamingInputCallRequest {
86         payload: Some(crate::client_payload(*len as usize)),
87         ..Default::default()
88     });
89 
90     let stream = tokio_stream::iter(requests);
91 
92     let result = client.streaming_input_call(Request::new(stream)).await;
93 
94     assertions.push(test_assert!(
95         "call must be successful",
96         result.is_ok(),
97         format!("result={:?}", result)
98     ));
99 
100     if let Ok(response) = result {
101         let body = response.into_inner();
102 
103         assertions.push(test_assert!(
104             "aggregated payload size must be 74922 bytes",
105             body.aggregated_payload_size == 74922,
106             format!("aggregated_payload_size={:?}", body.aggregated_payload_size)
107         ));
108     }
109 }
110 
server_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)111 pub async fn server_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
112     let req = StreamingOutputCallRequest {
113         response_parameters: RESPONSE_LENGTHS
114             .iter()
115             .map(|len| ResponseParameters::with_size(*len))
116             .collect(),
117         ..Default::default()
118     };
119     let req = Request::new(req);
120 
121     let result = client.streaming_output_call(req).await;
122 
123     assertions.push(test_assert!(
124         "call must be successful",
125         result.is_ok(),
126         format!("result={:?}", result)
127     ));
128 
129     if let Ok(response) = result {
130         let responses = response
131             .into_inner()
132             .filter_map(|m| m.ok())
133             .collect::<Vec<_>>()
134             .await;
135         let actual_response_lengths = crate::response_lengths(&responses);
136         let asserts = vec![
137             test_assert!(
138                 "there should be four responses",
139                 responses.len() == 4,
140                 format!("responses.len()={:?}", responses.len())
141             ),
142             test_assert!(
143                 "the response payload sizes should match input",
144                 RESPONSE_LENGTHS == actual_response_lengths.as_slice(),
145                 format!("{:?}={:?}", RESPONSE_LENGTHS, actual_response_lengths)
146             ),
147         ];
148 
149         assertions.extend(asserts);
150     }
151 }
152 
ping_pong(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)153 pub async fn ping_pong(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
154     let (tx, rx) = mpsc::unbounded_channel();
155     tx.send(make_ping_pong_request(0)).unwrap();
156 
157     let result = client
158         .full_duplex_call(Request::new(
159             tokio_stream::wrappers::UnboundedReceiverStream::new(rx),
160         ))
161         .await;
162 
163     assertions.push(test_assert!(
164         "call must be successful",
165         result.is_ok(),
166         format!("result={:?}", result)
167     ));
168 
169     if let Ok(mut response) = result.map(Response::into_inner) {
170         let mut responses = Vec::new();
171 
172         loop {
173             match response.next().await {
174                 Some(result) => {
175                     responses.push(result.unwrap());
176                     if responses.len() == REQUEST_LENGTHS.len() {
177                         drop(tx);
178                         break;
179                     } else {
180                         tx.send(make_ping_pong_request(responses.len())).unwrap();
181                     }
182                 }
183                 None => {
184                     assertions.push(TestAssertion::Failed {
185                         description:
186                             "server should keep the stream open until the client closes it",
187                         expression: "Stream terminated unexpectedly early",
188                         why: None,
189                     });
190                     break;
191                 }
192             }
193         }
194 
195         let actual_response_lengths = crate::response_lengths(&responses);
196         assertions.push(test_assert!(
197             "there should be four responses",
198             responses.len() == RESPONSE_LENGTHS.len(),
199             format!("{:?}={:?}", responses.len(), RESPONSE_LENGTHS.len())
200         ));
201         assertions.push(test_assert!(
202             "the response payload sizes should match input",
203             RESPONSE_LENGTHS == actual_response_lengths.as_slice(),
204             format!("{:?}={:?}", RESPONSE_LENGTHS, actual_response_lengths)
205         ));
206     }
207 }
208 
empty_stream(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)209 pub async fn empty_stream(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
210     let stream = tokio_stream::empty();
211     let result = client.full_duplex_call(Request::new(stream)).await;
212 
213     assertions.push(test_assert!(
214         "call must be successful",
215         result.is_ok(),
216         format!("result={:?}", result)
217     ));
218 
219     if let Ok(response) = result.map(Response::into_inner) {
220         let responses = response.collect::<Vec<_>>().await;
221 
222         assertions.push(test_assert!(
223             "there should be no responses",
224             responses.is_empty(),
225             format!("responses.len()={:?}", responses.len())
226         ));
227     }
228 }
229 
status_code_and_message(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)230 pub async fn status_code_and_message(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
231     fn validate_response<T>(result: Result<T, Status>, assertions: &mut Vec<TestAssertion>)
232     where
233         T: std::fmt::Debug,
234     {
235         assertions.push(test_assert!(
236             "call must fail with unknown status code",
237             match &result {
238                 Err(status) => status.code() == Code::Unknown,
239                 _ => false,
240             },
241             format!("result={:?}", result)
242         ));
243 
244         assertions.push(test_assert!(
245             "call must respsond with expected status message",
246             match &result {
247                 Err(status) => status.message() == TEST_STATUS_MESSAGE,
248                 _ => false,
249             },
250             format!("result={:?}", result)
251         ));
252     }
253 
254     let simple_req = SimpleRequest {
255         response_status: Some(EchoStatus {
256             code: 2,
257             message: TEST_STATUS_MESSAGE.to_string(),
258         }),
259         ..Default::default()
260     };
261 
262     let duplex_req = StreamingOutputCallRequest {
263         response_status: Some(EchoStatus {
264             code: 2,
265             message: TEST_STATUS_MESSAGE.to_string(),
266         }),
267         ..Default::default()
268     };
269 
270     let result = client.unary_call(Request::new(simple_req)).await;
271     validate_response(result, assertions);
272 
273     let stream = tokio_stream::once(duplex_req);
274     let result = match client.full_duplex_call(Request::new(stream)).await {
275         Ok(response) => {
276             let stream = response.into_inner();
277             let responses = stream.collect::<Vec<_>>().await;
278             Ok(responses)
279         }
280         Err(e) => Err(e),
281     };
282 
283     validate_response(result, assertions);
284 }
285 
special_status_message(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)286 pub async fn special_status_message(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
287     let req = SimpleRequest {
288         response_status: Some(EchoStatus {
289             code: 2,
290             message: SPECIAL_TEST_STATUS_MESSAGE.to_string(),
291         }),
292         ..Default::default()
293     };
294 
295     let result = client.unary_call(Request::new(req)).await;
296 
297     assertions.push(test_assert!(
298         "call must fail with unknown status code",
299         match &result {
300             Err(status) => status.code() == Code::Unknown,
301             _ => false,
302         },
303         format!("result={:?}", result)
304     ));
305 
306     assertions.push(test_assert!(
307         "call must respsond with expected status message",
308         match &result {
309             Err(status) => status.message() == SPECIAL_TEST_STATUS_MESSAGE,
310             _ => false,
311         },
312         format!("result={:?}", result)
313     ));
314 }
315 
unimplemented_method(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)316 pub async fn unimplemented_method(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
317     let result = client.unimplemented_call(Request::new(Empty {})).await;
318     assertions.push(test_assert!(
319         "call must fail with unimplemented status code",
320         match &result {
321             Err(status) => status.code() == Code::Unimplemented,
322             _ => false,
323         },
324         format!("result={:?}", result)
325     ));
326 }
327 
unimplemented_service( client: &mut UnimplementedClient, assertions: &mut Vec<TestAssertion>, )328 pub async fn unimplemented_service(
329     client: &mut UnimplementedClient,
330     assertions: &mut Vec<TestAssertion>,
331 ) {
332     let result = client.unimplemented_call(Request::new(Empty {})).await;
333     assertions.push(test_assert!(
334         "call must fail with unimplemented status code",
335         match &result {
336             Err(status) => status.code() == Code::Unimplemented,
337             _ => false,
338         },
339         format!("result={:?}", result)
340     ));
341 }
342 
custom_metadata(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)343 pub async fn custom_metadata(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
344     let key1 = "x-grpc-test-echo-initial";
345     let value1: MetadataValue<_> = "test_initial_metadata_value".parse().unwrap();
346     let key2 = "x-grpc-test-echo-trailing-bin";
347     let value2 = MetadataValue::from_bytes(&[0xab, 0xab, 0xab]);
348 
349     let req = SimpleRequest {
350         response_type: PayloadType::Compressable as i32,
351         response_size: LARGE_RSP_SIZE,
352         payload: Some(crate::client_payload(LARGE_REQ_SIZE)),
353         ..Default::default()
354     };
355     let mut req_unary = Request::new(req);
356     req_unary.metadata_mut().insert(key1, value1.clone());
357     req_unary.metadata_mut().insert_bin(key2, value2.clone());
358 
359     let stream = tokio_stream::once(make_ping_pong_request(0));
360     let mut req_stream = Request::new(stream);
361     req_stream.metadata_mut().insert(key1, value1.clone());
362     req_stream.metadata_mut().insert_bin(key2, value2.clone());
363 
364     let response = client
365         .unary_call(req_unary)
366         .await
367         .expect("call should pass.");
368 
369     assertions.push(test_assert!(
370         "metadata string must match in unary",
371         response.metadata().get(key1) == Some(&value1),
372         format!("result={:?}", response.metadata().get(key1))
373     ));
374     assertions.push(test_assert!(
375         "metadata bin must match in unary",
376         response.metadata().get_bin(key2) == Some(&value2),
377         format!("result={:?}", response.metadata().get_bin(key1))
378     ));
379 
380     let response = client
381         .full_duplex_call(req_stream)
382         .await
383         .expect("call should pass.");
384 
385     assertions.push(test_assert!(
386         "metadata string must match in unary",
387         response.metadata().get(key1) == Some(&value1),
388         format!("result={:?}", response.metadata().get(key1))
389     ));
390 
391     let mut stream = response.into_inner();
392 
393     let trailers = stream.trailers().await.unwrap().unwrap();
394 
395     assertions.push(test_assert!(
396         "metadata bin must match in unary",
397         trailers.get_bin(key2) == Some(&value2),
398         format!("result={:?}", trailers.get_bin(key1))
399     ));
400 }
401 
make_ping_pong_request(idx: usize) -> StreamingOutputCallRequest402 fn make_ping_pong_request(idx: usize) -> StreamingOutputCallRequest {
403     let req_len = REQUEST_LENGTHS[idx];
404     let resp_len = RESPONSE_LENGTHS[idx];
405     StreamingOutputCallRequest {
406         response_parameters: vec![ResponseParameters::with_size(resp_len)],
407         payload: Some(crate::client_payload(req_len as usize)),
408         ..Default::default()
409     }
410 }
411