1 use crate::{ 2 pb::test_service_client::*, pb::unimplemented_service_client::*, pb::*, test_assert, 3 TestAssertion, 4 }; 5 use futures_util::{future, stream, StreamExt}; 6 use tokio::sync::mpsc; 7 use tonic::transport::Channel; 8 use tonic::{metadata::MetadataValue, Code, Request, Response, Status}; 9 10 pub type TestClient = TestServiceClient<Channel>; 11 pub type UnimplementedClient = UnimplementedServiceClient<Channel>; 12 13 const LARGE_REQ_SIZE: usize = 271_828; 14 const LARGE_RSP_SIZE: i32 = 314_159; 15 const REQUEST_LENGTHS: &[i32] = &[27182, 8, 1828, 45904]; 16 const RESPONSE_LENGTHS: &[i32] = &[31415, 9, 2653, 58979]; 17 const TEST_STATUS_MESSAGE: &str = "test status message"; 18 const SPECIAL_TEST_STATUS_MESSAGE: &str = 19 "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP \t\n"; 20 21 pub async fn empty_unary(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 22 let result = client.empty_call(Request::new(Empty {})).await; 23 24 assertions.push(test_assert!( 25 "call must be successful", 26 result.is_ok(), 27 format!("result={:?}", result) 28 )); 29 30 if let Ok(response) = result { 31 let body = response.into_inner(); 32 assertions.push(test_assert!( 33 "body must not be null", 34 body == Empty {}, 35 format!("body={:?}", body) 36 )); 37 } 38 } 39 40 pub async fn large_unary(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 41 use std::mem; 42 let payload = crate::client_payload(LARGE_REQ_SIZE); 43 let req = SimpleRequest { 44 response_type: PayloadType::Compressable as i32, 45 response_size: LARGE_RSP_SIZE, 46 payload: Some(payload), 47 ..Default::default() 48 }; 49 50 let result = client.unary_call(Request::new(req)).await; 51 52 assertions.push(test_assert!( 53 "call must be successful", 54 result.is_ok(), 55 format!("result={:?}", result) 56 )); 57 58 if let Ok(response) = result { 59 let body = response.into_inner(); 60 let payload_len = body.payload.as_ref().map(|p| p.body.len()).unwrap_or(0); 61 62 assertions.push(test_assert!( 63 "body must be 314159 bytes", 64 payload_len == LARGE_RSP_SIZE as usize, 65 format!("mem::size_of_val(&body)={:?}", mem::size_of_val(&body)) 66 )); 67 } 68 } 69 70 // pub async fn cachable_unary(client: &mut Client, assertions: &mut Vec<TestAssertion>) { 71 // let payload = Payload { 72 // r#type: PayloadType::Compressable as i32, 73 // body: format!("{:?}", std::time::Instant::now()).into_bytes(), 74 // }; 75 // let req = SimpleRequest { 76 // response_type: PayloadType::Compressable as i32, 77 // payload: Some(payload), 78 // ..Default::default() 79 // }; 80 81 // client. 82 // } 83 84 pub async fn client_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 85 let requests = REQUEST_LENGTHS.iter().map(|len| StreamingInputCallRequest { 86 payload: Some(crate::client_payload(*len as usize)), 87 ..Default::default() 88 }); 89 90 let stream = stream::iter(requests); 91 92 let result = client.streaming_input_call(Request::new(stream)).await; 93 94 assertions.push(test_assert!( 95 "call must be successful", 96 result.is_ok(), 97 format!("result={:?}", result) 98 )); 99 100 if let Ok(response) = result { 101 let body = response.into_inner(); 102 103 assertions.push(test_assert!( 104 "aggregated payload size must be 74922 bytes", 105 body.aggregated_payload_size == 74922, 106 format!("aggregated_payload_size={:?}", body.aggregated_payload_size) 107 )); 108 } 109 } 110 111 pub async fn server_streaming(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 112 let req = StreamingOutputCallRequest { 113 response_parameters: RESPONSE_LENGTHS 114 .iter() 115 .map(|len| ResponseParameters::with_size(*len)) 116 .collect(), 117 ..Default::default() 118 }; 119 let req = Request::new(req); 120 121 let result = client.streaming_output_call(req).await; 122 123 assertions.push(test_assert!( 124 "call must be successful", 125 result.is_ok(), 126 format!("result={:?}", result) 127 )); 128 129 if let Ok(response) = result { 130 let responses = response 131 .into_inner() 132 .filter_map(|m| future::ready(m.ok())) 133 .collect::<Vec<_>>() 134 .await; 135 let actual_response_lengths = crate::response_lengths(&responses); 136 let asserts = vec![ 137 test_assert!( 138 "there should be four responses", 139 responses.len() == 4, 140 format!("responses.len()={:?}", responses.len()) 141 ), 142 test_assert!( 143 "the response payload sizes should match input", 144 RESPONSE_LENGTHS == actual_response_lengths.as_slice(), 145 format!("{:?}={:?}", RESPONSE_LENGTHS, actual_response_lengths) 146 ), 147 ]; 148 149 assertions.extend(asserts); 150 } 151 } 152 153 pub async fn ping_pong(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 154 let (tx, rx) = mpsc::unbounded_channel(); 155 tx.send(make_ping_pong_request(0)).unwrap(); 156 157 let result = client.full_duplex_call(Request::new(rx)).await; 158 159 assertions.push(test_assert!( 160 "call must be successful", 161 result.is_ok(), 162 format!("result={:?}", result) 163 )); 164 165 if let Ok(mut response) = result.map(Response::into_inner) { 166 let mut responses = Vec::new(); 167 168 loop { 169 match response.next().await { 170 Some(result) => { 171 responses.push(result.unwrap()); 172 if responses.len() == REQUEST_LENGTHS.len() { 173 drop(tx); 174 break; 175 } else { 176 tx.send(make_ping_pong_request(responses.len())).unwrap(); 177 } 178 } 179 None => { 180 assertions.push(TestAssertion::Failed { 181 description: 182 "server should keep the stream open until the client closes it", 183 expression: "Stream terminated unexpectedly early", 184 why: None, 185 }); 186 break; 187 } 188 } 189 } 190 191 let actual_response_lengths = crate::response_lengths(&responses); 192 assertions.push(test_assert!( 193 "there should be four responses", 194 responses.len() == RESPONSE_LENGTHS.len(), 195 format!("{:?}={:?}", responses.len(), RESPONSE_LENGTHS.len()) 196 )); 197 assertions.push(test_assert!( 198 "the response payload sizes should match input", 199 RESPONSE_LENGTHS == actual_response_lengths.as_slice(), 200 format!("{:?}={:?}", RESPONSE_LENGTHS, actual_response_lengths) 201 )); 202 } 203 } 204 205 pub async fn empty_stream(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 206 let stream = stream::iter(Vec::new()); 207 let result = client.full_duplex_call(Request::new(stream)).await; 208 209 assertions.push(test_assert!( 210 "call must be successful", 211 result.is_ok(), 212 format!("result={:?}", result) 213 )); 214 215 if let Ok(response) = result.map(Response::into_inner) { 216 let responses = response.collect::<Vec<_>>().await; 217 218 assertions.push(test_assert!( 219 "there should be no responses", 220 responses.is_empty(), 221 format!("responses.len()={:?}", responses.len()) 222 )); 223 } 224 } 225 226 pub async fn status_code_and_message(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 227 fn validate_response<T>(result: Result<T, Status>, assertions: &mut Vec<TestAssertion>) 228 where 229 T: std::fmt::Debug, 230 { 231 assertions.push(test_assert!( 232 "call must fail with unknown status code", 233 match &result { 234 Err(status) => status.code() == Code::Unknown, 235 _ => false, 236 }, 237 format!("result={:?}", result) 238 )); 239 240 assertions.push(test_assert!( 241 "call must respsond with expected status message", 242 match &result { 243 Err(status) => status.message() == TEST_STATUS_MESSAGE, 244 _ => false, 245 }, 246 format!("result={:?}", result) 247 )); 248 } 249 250 let simple_req = SimpleRequest { 251 response_status: Some(EchoStatus { 252 code: 2, 253 message: TEST_STATUS_MESSAGE.to_string(), 254 ..Default::default() 255 }), 256 ..Default::default() 257 }; 258 259 let duplex_req = StreamingOutputCallRequest { 260 response_status: Some(EchoStatus { 261 code: 2, 262 message: TEST_STATUS_MESSAGE.to_string(), 263 ..Default::default() 264 }), 265 ..Default::default() 266 }; 267 268 let result = client.unary_call(Request::new(simple_req)).await; 269 validate_response(result, assertions); 270 271 let stream = stream::iter(vec![duplex_req]); 272 let result = match client.full_duplex_call(Request::new(stream)).await { 273 Ok(response) => { 274 let stream = response.into_inner(); 275 let responses = stream.collect::<Vec<_>>().await; 276 Ok(responses) 277 } 278 Err(e) => Err(e), 279 }; 280 281 validate_response(result, assertions); 282 } 283 284 pub async fn special_status_message(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 285 let req = SimpleRequest { 286 response_status: Some(EchoStatus { 287 code: 2, 288 message: SPECIAL_TEST_STATUS_MESSAGE.to_string(), 289 ..Default::default() 290 }), 291 ..Default::default() 292 }; 293 294 let result = client.unary_call(Request::new(req)).await; 295 296 assertions.push(test_assert!( 297 "call must fail with unknown status code", 298 match &result { 299 Err(status) => status.code() == Code::Unknown, 300 _ => false, 301 }, 302 format!("result={:?}", result) 303 )); 304 305 assertions.push(test_assert!( 306 "call must respsond with expected status message", 307 match &result { 308 Err(status) => status.message() == SPECIAL_TEST_STATUS_MESSAGE, 309 _ => false, 310 }, 311 format!("result={:?}", result) 312 )); 313 } 314 315 pub async fn unimplemented_method(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 316 let result = client.unimplemented_call(Request::new(Empty {})).await; 317 assertions.push(test_assert!( 318 "call must fail with unimplemented status code", 319 match &result { 320 Err(status) => status.code() == Code::Unimplemented, 321 _ => false, 322 }, 323 format!("result={:?}", result) 324 )); 325 } 326 327 pub async fn unimplemented_service( 328 client: &mut UnimplementedClient, 329 assertions: &mut Vec<TestAssertion>, 330 ) { 331 let result = client.unimplemented_call(Request::new(Empty {})).await; 332 assertions.push(test_assert!( 333 "call must fail with unimplemented status code", 334 match &result { 335 Err(status) => status.code() == Code::Unimplemented, 336 _ => false, 337 }, 338 format!("result={:?}", result) 339 )); 340 } 341 342 pub async fn custom_metadata(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) { 343 let key1 = "x-grpc-test-echo-initial"; 344 let value1 = MetadataValue::from_str("test_initial_metadata_value").unwrap(); 345 let key2 = "x-grpc-test-echo-trailing-bin"; 346 let value2 = MetadataValue::from_bytes(&[0xab, 0xab, 0xab]); 347 348 let req = SimpleRequest { 349 response_type: PayloadType::Compressable as i32, 350 response_size: LARGE_RSP_SIZE, 351 payload: Some(crate::client_payload(LARGE_REQ_SIZE)), 352 ..Default::default() 353 }; 354 let mut req_unary = Request::new(req); 355 req_unary.metadata_mut().insert(key1, value1.clone()); 356 req_unary.metadata_mut().insert_bin(key2, value2.clone()); 357 358 let stream = stream::iter(vec![make_ping_pong_request(0)]); 359 let mut req_stream = Request::new(stream); 360 req_stream.metadata_mut().insert(key1, value1.clone()); 361 req_stream.metadata_mut().insert_bin(key2, value2.clone()); 362 363 let response = client 364 .unary_call(req_unary) 365 .await 366 .expect("call should pass."); 367 368 assertions.push(test_assert!( 369 "metadata string must match in unary", 370 response.metadata().get(key1) == Some(&value1), 371 format!("result={:?}", response.metadata().get(key1)) 372 )); 373 assertions.push(test_assert!( 374 "metadata bin must match in unary", 375 response.metadata().get_bin(key2) == Some(&value2), 376 format!("result={:?}", response.metadata().get_bin(key1)) 377 )); 378 379 let response = client 380 .full_duplex_call(req_stream) 381 .await 382 .expect("call should pass."); 383 384 assertions.push(test_assert!( 385 "metadata string must match in unary", 386 response.metadata().get(key1) == Some(&value1), 387 format!("result={:?}", response.metadata().get(key1)) 388 )); 389 390 let mut stream = response.into_inner(); 391 392 let trailers = stream.trailers().await.unwrap().unwrap(); 393 394 assertions.push(test_assert!( 395 "metadata bin must match in unary", 396 trailers.get_bin(key2) == Some(&value2), 397 format!("result={:?}", trailers.get_bin(key1)) 398 )); 399 } 400 401 fn make_ping_pong_request(idx: usize) -> StreamingOutputCallRequest { 402 let req_len = REQUEST_LENGTHS[idx]; 403 let resp_len = RESPONSE_LENGTHS[idx]; 404 StreamingOutputCallRequest { 405 response_parameters: vec![ResponseParameters::with_size(resp_len)], 406 payload: Some(crate::client_payload(req_len as usize)), 407 ..Default::default() 408 } 409 } 410