1 use std::pin::Pin; 2 3 use tokio_stream::{self as stream, Stream, StreamExt}; 4 use tonic::{Request, Response, Status, Streaming}; 5 6 use pb::{test_server::Test, Input, Output}; 7 8 pub mod pb { 9 tonic::include_proto!("test"); 10 } 11 12 type BoxStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + 'static>>; 13 14 pub struct Svc; 15 16 #[tonic::async_trait] 17 impl Test for Svc { unary_call(&self, req: Request<Input>) -> Result<Response<Output>, Status>18 async fn unary_call(&self, req: Request<Input>) -> Result<Response<Output>, Status> { 19 let req = req.into_inner(); 20 21 if &req.desc == "boom" { 22 Err(Status::invalid_argument("invalid boom")) 23 } else { 24 Ok(Response::new(Output { 25 id: req.id, 26 desc: req.desc, 27 })) 28 } 29 } 30 31 type ServerStreamStream = BoxStream<Output>; 32 server_stream( &self, req: Request<Input>, ) -> Result<Response<Self::ServerStreamStream>, Status>33 async fn server_stream( 34 &self, 35 req: Request<Input>, 36 ) -> Result<Response<Self::ServerStreamStream>, Status> { 37 let req = req.into_inner(); 38 39 Ok(Response::new(Box::pin(stream::iter(vec![1, 2]).map( 40 move |n| { 41 Ok(Output { 42 id: req.id, 43 desc: format!("{}-{}", n, req.desc), 44 }) 45 }, 46 )))) 47 } 48 client_stream( &self, req: Request<Streaming<Input>>, ) -> Result<Response<Output>, Status>49 async fn client_stream( 50 &self, 51 req: Request<Streaming<Input>>, 52 ) -> Result<Response<Output>, Status> { 53 let out = Output { 54 id: 0, 55 desc: "".into(), 56 }; 57 58 Ok(Response::new( 59 req.into_inner() 60 .fold(out, |mut acc, input| { 61 let input = input.unwrap(); 62 acc.id += input.id; 63 acc.desc += &input.desc; 64 acc 65 }) 66 .await, 67 )) 68 } 69 } 70 71 pub mod util { 72 pub mod base64 { 73 use base64::{ 74 alphabet, 75 engine::{ 76 general_purpose::{GeneralPurpose, GeneralPurposeConfig}, 77 DecodePaddingMode, 78 }, 79 }; 80 81 pub const STANDARD: GeneralPurpose = GeneralPurpose::new( 82 &alphabet::STANDARD, 83 GeneralPurposeConfig::new() 84 .with_encode_padding(true) 85 .with_decode_padding_mode(DecodePaddingMode::Indifferent), 86 ); 87 } 88 } 89