1 use crate::{ 2 pb::test_service_client::*, pb::unimplemented_service_client::*, pb::*, test_assert, 3 TestAssertion, 4 }; 5 use tokio::sync::mpsc; 6 use tokio_stream::StreamExt; 7 use tonic::transport::Channel; 8 use tonic::{metadata::MetadataValue, Code, Request, Response, Status}; 9 10 pub type TestClient = TestServiceClient<Channel>; 11 pub type UnimplementedClient = UnimplementedServiceClient<Channel>; 12 13 const LARGE_REQ_SIZE: usize = 271_828; 14 const LARGE_RSP_SIZE: i32 = 314_159; 15 const REQUEST_LENGTHS: &[i32] = &[27182, 8, 1828, 45904]; 16 const RESPONSE_LENGTHS: &[i32] = &[31415, 9, 2653, 58979]; 17 const TEST_STATUS_MESSAGE: &str = "test status message"; 18 const SPECIAL_TEST_STATUS_MESSAGE: &str = 19 "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP \t\n"; 20 21 pub async fn empty_unary(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 22 let result = client.empty_call(Request::new(Empty {})).await; 23 24 assertions.push(test_assert!( 25 "call must be successful", 26 result.is_ok(), 27 format!("result={:?}", result) 28 )); 29 30 if let Ok(response) = result { 31 let body = response.into_inner(); 32 assertions.push(test_assert!( 33 "body must not be null", 34 body == Empty {}, 35 format!("body={:?}", body) 36 )); 37 } 38 } 39 40 pub async fn large_unary(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 41 use std::mem; 42 let payload = crate::client_payload(LARGE_REQ_SIZE); 43 let req = SimpleRequest { 44 response_type: PayloadType::Compressable as i32, 45 response_size: LARGE_RSP_SIZE, 46 payload: Some(payload), 47 ..Default::default() 48 }; 49 50 let result = client.unary_call(Request::new(req)).await; 51 52 assertions.push(test_assert!( 53 "call must be successful", 54 result.is_ok(), 55 format!("result={:?}", result) 56 )); 57 58 if let Ok(response) = result { 59 let body = response.into_inner(); 60 let payload_len = body.payload.as_ref().map(|p| p.body.len()).unwrap_or(0); 61 62 assertions.push(test_assert!( 63 "body must be 314159 bytes", 64 payload_len == LARGE_RSP_SIZE as usize, 65 format!("mem::size_of_val(&body)={:?}", mem::size_of_val(&body)) 66 )); 67 } 68 } 69 70 // pub async fn cachable_unary(client: &mut Client, assertions: &mut Vec<TestAssertion>) { 71 // let payload = Payload { 72 // r#type: PayloadType::Compressable as i32, 73 // body: format!("{:?}", std::time::Instant::now()).into_bytes(), 74 // }; 75 // let req = SimpleRequest { 76 // response_type: PayloadType::Compressable as i32, 77 // payload: Some(payload), 78 // ..Default::default() 79 // }; 80 81 // client. 82 // } 83 84 pub async fn client_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 85 let requests = REQUEST_LENGTHS.iter().map(|len| StreamingInputCallRequest { 86 payload: Some(crate::client_payload(*len as usize)), 87 ..Default::default() 88 }); 89 90 let stream = tokio_stream::iter(requests); 91 92 let result = client.streaming_input_call(Request::new(stream)).await; 93 94 assertions.push(test_assert!( 95 "call must be successful", 96 result.is_ok(), 97 format!("result={:?}", result) 98 )); 99 100 if let Ok(response) = result { 101 let body = response.into_inner(); 102 103 assertions.push(test_assert!( 104 "aggregated payload size must be 74922 bytes", 105 body.aggregated_payload_size == 74922, 106 format!("aggregated_payload_size={:?}", body.aggregated_payload_size) 107 )); 108 } 109 } 110 111 pub async fn server_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 112 let req = StreamingOutputCallRequest { 113 response_parameters: RESPONSE_LENGTHS 114 .iter() 115 .map(|len| ResponseParameters::with_size(*len)) 116 .collect(), 117 ..Default::default() 118 }; 119 let req = Request::new(req); 120 121 let result = client.streaming_output_call(req).await; 122 123 assertions.push(test_assert!( 124 "call must be successful", 125 result.is_ok(), 126 format!("result={:?}", result) 127 )); 128 129 if let Ok(response) = result { 130 let responses = response 131 .into_inner() 132 .filter_map(|m| m.ok()) 133 .collect::<Vec<_>>() 134 .await; 135 let actual_response_lengths = crate::response_lengths(&responses); 136 let asserts = vec![ 137 test_assert!( 138 "there should be four responses", 139 responses.len() == 4, 140 format!("responses.len()={:?}", responses.len()) 141 ), 142 test_assert!( 143 "the response payload sizes should match input", 144 RESPONSE_LENGTHS == actual_response_lengths.as_slice(), 145 format!("{:?}={:?}", RESPONSE_LENGTHS, actual_response_lengths) 146 ), 147 ]; 148 149 assertions.extend(asserts); 150 } 151 } 152 153 pub async fn ping_pong(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 154 let (tx, rx) = mpsc::unbounded_channel(); 155 tx.send(make_ping_pong_request(0)).unwrap(); 156 157 let result = client 158 .full_duplex_call(Request::new( 159 tokio_stream::wrappers::UnboundedReceiverStream::new(rx), 160 )) 161 .await; 162 163 assertions.push(test_assert!( 164 "call must be successful", 165 result.is_ok(), 166 format!("result={:?}", result) 167 )); 168 169 if let Ok(mut response) = result.map(Response::into_inner) { 170 let mut responses = Vec::new(); 171 172 loop { 173 match response.next().await { 174 Some(result) => { 175 responses.push(result.unwrap()); 176 if responses.len() == REQUEST_LENGTHS.len() { 177 drop(tx); 178 break; 179 } else { 180 tx.send(make_ping_pong_request(responses.len())).unwrap(); 181 } 182 } 183 None => { 184 assertions.push(TestAssertion::Failed { 185 description: 186 "server should keep the stream open until the client closes it", 187 expression: "Stream terminated unexpectedly early", 188 why: None, 189 }); 190 break; 191 } 192 } 193 } 194 195 let actual_response_lengths = crate::response_lengths(&responses); 196 assertions.push(test_assert!( 197 "there should be four responses", 198 responses.len() == RESPONSE_LENGTHS.len(), 199 format!("{:?}={:?}", responses.len(), RESPONSE_LENGTHS.len()) 200 )); 201 assertions.push(test_assert!( 202 "the response payload sizes should match input", 203 RESPONSE_LENGTHS == actual_response_lengths.as_slice(), 204 format!("{:?}={:?}", RESPONSE_LENGTHS, actual_response_lengths) 205 )); 206 } 207 } 208 209 pub async fn empty_stream(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 210 let stream = tokio_stream::empty(); 211 let result = client.full_duplex_call(Request::new(stream)).await; 212 213 assertions.push(test_assert!( 214 "call must be successful", 215 result.is_ok(), 216 format!("result={:?}", result) 217 )); 218 219 if let Ok(response) = result.map(Response::into_inner) { 220 let responses = response.collect::<Vec<_>>().await; 221 222 assertions.push(test_assert!( 223 "there should be no responses", 224 responses.is_empty(), 225 format!("responses.len()={:?}", responses.len()) 226 )); 227 } 228 } 229 230 pub async fn status_code_and_message(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 231 fn validate_response<T>(result: Result<T, Status>, assertions: &mut Vec<TestAssertion>) 232 where 233 T: std::fmt::Debug, 234 { 235 assertions.push(test_assert!( 236 "call must fail with unknown status code", 237 match &result { 238 Err(status) => status.code() == Code::Unknown, 239 _ => false, 240 }, 241 format!("result={:?}", result) 242 )); 243 244 assertions.push(test_assert!( 245 "call must respsond with expected status message", 246 match &result { 247 Err(status) => status.message() == TEST_STATUS_MESSAGE, 248 _ => false, 249 }, 250 format!("result={:?}", result) 251 )); 252 } 253 254 let simple_req = SimpleRequest { 255 response_status: Some(EchoStatus { 256 code: 2, 257 message: TEST_STATUS_MESSAGE.to_string(), 258 }), 259 ..Default::default() 260 }; 261 262 let duplex_req = StreamingOutputCallRequest { 263 response_status: Some(EchoStatus { 264 code: 2, 265 message: TEST_STATUS_MESSAGE.to_string(), 266 }), 267 ..Default::default() 268 }; 269 270 let result = client.unary_call(Request::new(simple_req)).await; 271 validate_response(result, assertions); 272 273 let stream = tokio_stream::once(duplex_req); 274 let result = match client.full_duplex_call(Request::new(stream)).await { 275 Ok(response) => { 276 let stream = response.into_inner(); 277 let responses = stream.collect::<Vec<_>>().await; 278 Ok(responses) 279 } 280 Err(e) => Err(e), 281 }; 282 283 validate_response(result, assertions); 284 } 285 286 pub async fn special_status_message(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 287 let req = SimpleRequest { 288 response_status: Some(EchoStatus { 289 code: 2, 290 message: SPECIAL_TEST_STATUS_MESSAGE.to_string(), 291 }), 292 ..Default::default() 293 }; 294 295 let result = client.unary_call(Request::new(req)).await; 296 297 assertions.push(test_assert!( 298 "call must fail with unknown status code", 299 match &result { 300 Err(status) => status.code() == Code::Unknown, 301 _ => false, 302 }, 303 format!("result={:?}", result) 304 )); 305 306 assertions.push(test_assert!( 307 "call must respsond with expected status message", 308 match &result { 309 Err(status) => status.message() == SPECIAL_TEST_STATUS_MESSAGE, 310 _ => false, 311 }, 312 format!("result={:?}", result) 313 )); 314 } 315 316 pub async fn unimplemented_method(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 317 let result = client.unimplemented_call(Request::new(Empty {})).await; 318 assertions.push(test_assert!( 319 "call must fail with unimplemented status code", 320 match &result { 321 Err(status) => status.code() == Code::Unimplemented, 322 _ => false, 323 }, 324 format!("result={:?}", result) 325 )); 326 } 327 328 pub async fn unimplemented_service( 329 client: &mut UnimplementedClient, 330 assertions: &mut Vec<TestAssertion>, 331 ) { 332 let result = client.unimplemented_call(Request::new(Empty {})).await; 333 assertions.push(test_assert!( 334 "call must fail with unimplemented status code", 335 match &result { 336 Err(status) => status.code() == Code::Unimplemented, 337 _ => false, 338 }, 339 format!("result={:?}", result) 340 )); 341 } 342 343 pub async fn custom_metadata(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 344 let key1 = "x-grpc-test-echo-initial"; 345 let value1: MetadataValue<_> = "test_initial_metadata_value".parse().unwrap(); 346 let key2 = "x-grpc-test-echo-trailing-bin"; 347 let value2 = MetadataValue::from_bytes(&[0xab, 0xab, 0xab]); 348 349 let req = SimpleRequest { 350 response_type: PayloadType::Compressable as i32, 351 response_size: LARGE_RSP_SIZE, 352 payload: Some(crate::client_payload(LARGE_REQ_SIZE)), 353 ..Default::default() 354 }; 355 let mut req_unary = Request::new(req); 356 req_unary.metadata_mut().insert(key1, value1.clone()); 357 req_unary.metadata_mut().insert_bin(key2, value2.clone()); 358 359 let stream = tokio_stream::once(make_ping_pong_request(0)); 360 let mut req_stream = Request::new(stream); 361 req_stream.metadata_mut().insert(key1, value1.clone()); 362 req_stream.metadata_mut().insert_bin(key2, value2.clone()); 363 364 let response = client 365 .unary_call(req_unary) 366 .await 367 .expect("call should pass."); 368 369 assertions.push(test_assert!( 370 "metadata string must match in unary", 371 response.metadata().get(key1) == Some(&value1), 372 format!("result={:?}", response.metadata().get(key1)) 373 )); 374 assertions.push(test_assert!( 375 "metadata bin must match in unary", 376 response.metadata().get_bin(key2) == Some(&value2), 377 format!("result={:?}", response.metadata().get_bin(key1)) 378 )); 379 380 let response = client 381 .full_duplex_call(req_stream) 382 .await 383 .expect("call should pass."); 384 385 assertions.push(test_assert!( 386 "metadata string must match in unary", 387 response.metadata().get(key1) == Some(&value1), 388 format!("result={:?}", response.metadata().get(key1)) 389 )); 390 391 let mut stream = response.into_inner(); 392 393 let trailers = stream.trailers().await.unwrap().unwrap(); 394 395 assertions.push(test_assert!( 396 "metadata bin must match in unary", 397 trailers.get_bin(key2) == Some(&value2), 398 format!("result={:?}", trailers.get_bin(key1)) 399 )); 400 } 401 402 fn make_ping_pong_request(idx: usize) -> StreamingOutputCallRequest { 403 let req_len = REQUEST_LENGTHS[idx]; 404 let resp_len = RESPONSE_LENGTHS[idx]; 405 StreamingOutputCallRequest { 406 response_parameters: vec![ResponseParameters::with_size(resp_len)], 407 payload: Some(crate::client_payload(req_len as usize)), 408 ..Default::default() 409 } 410 } 411