1 use crate::{
2 pb::test_service_client::*, pb::unimplemented_service_client::*, pb::*, test_assert,
3 TestAssertion,
4 };
5 use tokio::sync::mpsc;
6 use tokio_stream::StreamExt;
7 use tonic::transport::Channel;
8 use tonic::{metadata::MetadataValue, Code, Request, Response, Status};
9
10 pub type TestClient = TestServiceClient<Channel>;
11 pub type UnimplementedClient = UnimplementedServiceClient<Channel>;
12
13 const LARGE_REQ_SIZE: usize = 271_828;
14 const LARGE_RSP_SIZE: i32 = 314_159;
15 const REQUEST_LENGTHS: &[i32] = &[27182, 8, 1828, 45904];
16 const RESPONSE_LENGTHS: &[i32] = &[31415, 9, 2653, 58979];
17 const TEST_STATUS_MESSAGE: &str = "test status message";
18 const SPECIAL_TEST_STATUS_MESSAGE: &str =
19 "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP \t\n";
20
empty_unary(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)21 pub async fn empty_unary(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
22 let result = client.empty_call(Request::new(Empty {})).await;
23
24 assertions.push(test_assert!(
25 "call must be successful",
26 result.is_ok(),
27 format!("result={:?}", result)
28 ));
29
30 if let Ok(response) = result {
31 let body = response.into_inner();
32 assertions.push(test_assert!(
33 "body must not be null",
34 body == Empty {},
35 format!("body={:?}", body)
36 ));
37 }
38 }
39
large_unary(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)40 pub async fn large_unary(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
41 use std::mem;
42 let payload = crate::client_payload(LARGE_REQ_SIZE);
43 let req = SimpleRequest {
44 response_type: PayloadType::Compressable as i32,
45 response_size: LARGE_RSP_SIZE,
46 payload: Some(payload),
47 ..Default::default()
48 };
49
50 let result = client.unary_call(Request::new(req)).await;
51
52 assertions.push(test_assert!(
53 "call must be successful",
54 result.is_ok(),
55 format!("result={:?}", result)
56 ));
57
58 if let Ok(response) = result {
59 let body = response.into_inner();
60 let payload_len = body.payload.as_ref().map(|p| p.body.len()).unwrap_or(0);
61
62 assertions.push(test_assert!(
63 "body must be 314159 bytes",
64 payload_len == LARGE_RSP_SIZE as usize,
65 format!("mem::size_of_val(&body)={:?}", mem::size_of_val(&body))
66 ));
67 }
68 }
69
70 // pub async fn cachable_unary(client: &mut Client, assertions: &mut Vec<TestAssertion>) {
71 // let payload = Payload {
72 // r#type: PayloadType::Compressable as i32,
73 // body: format!("{:?}", std::time::Instant::now()).into_bytes(),
74 // };
75 // let req = SimpleRequest {
76 // response_type: PayloadType::Compressable as i32,
77 // payload: Some(payload),
78 // ..Default::default()
79 // };
80
81 // client.
82 // }
83
client_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)84 pub async fn client_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
85 let requests = REQUEST_LENGTHS.iter().map(|len| StreamingInputCallRequest {
86 payload: Some(crate::client_payload(*len as usize)),
87 ..Default::default()
88 });
89
90 let stream = tokio_stream::iter(requests);
91
92 let result = client.streaming_input_call(Request::new(stream)).await;
93
94 assertions.push(test_assert!(
95 "call must be successful",
96 result.is_ok(),
97 format!("result={:?}", result)
98 ));
99
100 if let Ok(response) = result {
101 let body = response.into_inner();
102
103 assertions.push(test_assert!(
104 "aggregated payload size must be 74922 bytes",
105 body.aggregated_payload_size == 74922,
106 format!("aggregated_payload_size={:?}", body.aggregated_payload_size)
107 ));
108 }
109 }
110
server_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)111 pub async fn server_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
112 let req = StreamingOutputCallRequest {
113 response_parameters: RESPONSE_LENGTHS
114 .iter()
115 .map(|len| ResponseParameters::with_size(*len))
116 .collect(),
117 ..Default::default()
118 };
119 let req = Request::new(req);
120
121 let result = client.streaming_output_call(req).await;
122
123 assertions.push(test_assert!(
124 "call must be successful",
125 result.is_ok(),
126 format!("result={:?}", result)
127 ));
128
129 if let Ok(response) = result {
130 let responses = response
131 .into_inner()
132 .filter_map(|m| m.ok())
133 .collect::<Vec<_>>()
134 .await;
135 let actual_response_lengths = crate::response_lengths(&responses);
136 let asserts = vec![
137 test_assert!(
138 "there should be four responses",
139 responses.len() == 4,
140 format!("responses.len()={:?}", responses.len())
141 ),
142 test_assert!(
143 "the response payload sizes should match input",
144 RESPONSE_LENGTHS == actual_response_lengths.as_slice(),
145 format!("{:?}={:?}", RESPONSE_LENGTHS, actual_response_lengths)
146 ),
147 ];
148
149 assertions.extend(asserts);
150 }
151 }
152
ping_pong(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)153 pub async fn ping_pong(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
154 let (tx, rx) = mpsc::unbounded_channel();
155 tx.send(make_ping_pong_request(0)).unwrap();
156
157 let result = client
158 .full_duplex_call(Request::new(
159 tokio_stream::wrappers::UnboundedReceiverStream::new(rx),
160 ))
161 .await;
162
163 assertions.push(test_assert!(
164 "call must be successful",
165 result.is_ok(),
166 format!("result={:?}", result)
167 ));
168
169 if let Ok(mut response) = result.map(Response::into_inner) {
170 let mut responses = Vec::new();
171
172 loop {
173 match response.next().await {
174 Some(result) => {
175 responses.push(result.unwrap());
176 if responses.len() == REQUEST_LENGTHS.len() {
177 drop(tx);
178 break;
179 } else {
180 tx.send(make_ping_pong_request(responses.len())).unwrap();
181 }
182 }
183 None => {
184 assertions.push(TestAssertion::Failed {
185 description:
186 "server should keep the stream open until the client closes it",
187 expression: "Stream terminated unexpectedly early",
188 why: None,
189 });
190 break;
191 }
192 }
193 }
194
195 let actual_response_lengths = crate::response_lengths(&responses);
196 assertions.push(test_assert!(
197 "there should be four responses",
198 responses.len() == RESPONSE_LENGTHS.len(),
199 format!("{:?}={:?}", responses.len(), RESPONSE_LENGTHS.len())
200 ));
201 assertions.push(test_assert!(
202 "the response payload sizes should match input",
203 RESPONSE_LENGTHS == actual_response_lengths.as_slice(),
204 format!("{:?}={:?}", RESPONSE_LENGTHS, actual_response_lengths)
205 ));
206 }
207 }
208
empty_stream(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)209 pub async fn empty_stream(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
210 let stream = tokio_stream::empty();
211 let result = client.full_duplex_call(Request::new(stream)).await;
212
213 assertions.push(test_assert!(
214 "call must be successful",
215 result.is_ok(),
216 format!("result={:?}", result)
217 ));
218
219 if let Ok(response) = result.map(Response::into_inner) {
220 let responses = response.collect::<Vec<_>>().await;
221
222 assertions.push(test_assert!(
223 "there should be no responses",
224 responses.is_empty(),
225 format!("responses.len()={:?}", responses.len())
226 ));
227 }
228 }
229
status_code_and_message(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)230 pub async fn status_code_and_message(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
231 fn validate_response<T>(result: Result<T, Status>, assertions: &mut Vec<TestAssertion>)
232 where
233 T: std::fmt::Debug,
234 {
235 assertions.push(test_assert!(
236 "call must fail with unknown status code",
237 match &result {
238 Err(status) => status.code() == Code::Unknown,
239 _ => false,
240 },
241 format!("result={:?}", result)
242 ));
243
244 assertions.push(test_assert!(
245 "call must respsond with expected status message",
246 match &result {
247 Err(status) => status.message() == TEST_STATUS_MESSAGE,
248 _ => false,
249 },
250 format!("result={:?}", result)
251 ));
252 }
253
254 let simple_req = SimpleRequest {
255 response_status: Some(EchoStatus {
256 code: 2,
257 message: TEST_STATUS_MESSAGE.to_string(),
258 }),
259 ..Default::default()
260 };
261
262 let duplex_req = StreamingOutputCallRequest {
263 response_status: Some(EchoStatus {
264 code: 2,
265 message: TEST_STATUS_MESSAGE.to_string(),
266 }),
267 ..Default::default()
268 };
269
270 let result = client.unary_call(Request::new(simple_req)).await;
271 validate_response(result, assertions);
272
273 let stream = tokio_stream::once(duplex_req);
274 let result = match client.full_duplex_call(Request::new(stream)).await {
275 Ok(response) => {
276 let stream = response.into_inner();
277 let responses = stream.collect::<Vec<_>>().await;
278 Ok(responses)
279 }
280 Err(e) => Err(e),
281 };
282
283 validate_response(result, assertions);
284 }
285
special_status_message(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)286 pub async fn special_status_message(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
287 let req = SimpleRequest {
288 response_status: Some(EchoStatus {
289 code: 2,
290 message: SPECIAL_TEST_STATUS_MESSAGE.to_string(),
291 }),
292 ..Default::default()
293 };
294
295 let result = client.unary_call(Request::new(req)).await;
296
297 assertions.push(test_assert!(
298 "call must fail with unknown status code",
299 match &result {
300 Err(status) => status.code() == Code::Unknown,
301 _ => false,
302 },
303 format!("result={:?}", result)
304 ));
305
306 assertions.push(test_assert!(
307 "call must respsond with expected status message",
308 match &result {
309 Err(status) => status.message() == SPECIAL_TEST_STATUS_MESSAGE,
310 _ => false,
311 },
312 format!("result={:?}", result)
313 ));
314 }
315
unimplemented_method(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)316 pub async fn unimplemented_method(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
317 let result = client.unimplemented_call(Request::new(Empty {})).await;
318 assertions.push(test_assert!(
319 "call must fail with unimplemented status code",
320 match &result {
321 Err(status) => status.code() == Code::Unimplemented,
322 _ => false,
323 },
324 format!("result={:?}", result)
325 ));
326 }
327
unimplemented_service( client: &mut UnimplementedClient, assertions: &mut Vec<TestAssertion>, )328 pub async fn unimplemented_service(
329 client: &mut UnimplementedClient,
330 assertions: &mut Vec<TestAssertion>,
331 ) {
332 let result = client.unimplemented_call(Request::new(Empty {})).await;
333 assertions.push(test_assert!(
334 "call must fail with unimplemented status code",
335 match &result {
336 Err(status) => status.code() == Code::Unimplemented,
337 _ => false,
338 },
339 format!("result={:?}", result)
340 ));
341 }
342
custom_metadata(client: &mut TestClient, assertions: &mut Vec<TestAssertion>)343 pub async fn custom_metadata(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
344 let key1 = "x-grpc-test-echo-initial";
345 let value1: MetadataValue<_> = "test_initial_metadata_value".parse().unwrap();
346 let key2 = "x-grpc-test-echo-trailing-bin";
347 let value2 = MetadataValue::from_bytes(&[0xab, 0xab, 0xab]);
348
349 let req = SimpleRequest {
350 response_type: PayloadType::Compressable as i32,
351 response_size: LARGE_RSP_SIZE,
352 payload: Some(crate::client_payload(LARGE_REQ_SIZE)),
353 ..Default::default()
354 };
355 let mut req_unary = Request::new(req);
356 req_unary.metadata_mut().insert(key1, value1.clone());
357 req_unary.metadata_mut().insert_bin(key2, value2.clone());
358
359 let stream = tokio_stream::once(make_ping_pong_request(0));
360 let mut req_stream = Request::new(stream);
361 req_stream.metadata_mut().insert(key1, value1.clone());
362 req_stream.metadata_mut().insert_bin(key2, value2.clone());
363
364 let response = client
365 .unary_call(req_unary)
366 .await
367 .expect("call should pass.");
368
369 assertions.push(test_assert!(
370 "metadata string must match in unary",
371 response.metadata().get(key1) == Some(&value1),
372 format!("result={:?}", response.metadata().get(key1))
373 ));
374 assertions.push(test_assert!(
375 "metadata bin must match in unary",
376 response.metadata().get_bin(key2) == Some(&value2),
377 format!("result={:?}", response.metadata().get_bin(key1))
378 ));
379
380 let response = client
381 .full_duplex_call(req_stream)
382 .await
383 .expect("call should pass.");
384
385 assertions.push(test_assert!(
386 "metadata string must match in unary",
387 response.metadata().get(key1) == Some(&value1),
388 format!("result={:?}", response.metadata().get(key1))
389 ));
390
391 let mut stream = response.into_inner();
392
393 let trailers = stream.trailers().await.unwrap().unwrap();
394
395 assertions.push(test_assert!(
396 "metadata bin must match in unary",
397 trailers.get_bin(key2) == Some(&value2),
398 format!("result={:?}", trailers.get_bin(key1))
399 ));
400 }
401
make_ping_pong_request(idx: usize) -> StreamingOutputCallRequest402 fn make_ping_pong_request(idx: usize) -> StreamingOutputCallRequest {
403 let req_len = REQUEST_LENGTHS[idx];
404 let resp_len = RESPONSE_LENGTHS[idx];
405 StreamingOutputCallRequest {
406 response_parameters: vec![ResponseParameters::with_size(resp_len)],
407 payload: Some(crate::client_payload(req_len as usize)),
408 ..Default::default()
409 }
410 }
411