pub mod pb { tonic::include_proto!("grpc.examples.echo"); } use std::{error::Error, io::ErrorKind, net::ToSocketAddrs, pin::Pin, time::Duration}; use tokio::sync::mpsc; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{transport::Server, Request, Response, Status, Streaming}; use pb::{EchoRequest, EchoResponse}; type EchoResult = Result, Status>; type ResponseStream = Pin> + Send>>; fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> { let mut err: &(dyn Error + 'static) = err_status; loop { if let Some(io_err) = err.downcast_ref::() { return Some(io_err); } // h2::Error do not expose std::io::Error with `source()` // https://github.com/hyperium/h2/pull/462 if let Some(h2_err) = err.downcast_ref::() { if let Some(io_err) = h2_err.get_io() { return Some(io_err); } } err = err.source()?; } } #[derive(Debug)] pub struct EchoServer {} #[tonic::async_trait] impl pb::echo_server::Echo for EchoServer { async fn unary_echo(&self, _: Request) -> EchoResult { Err(Status::unimplemented("not implemented")) } type ServerStreamingEchoStream = ResponseStream; async fn server_streaming_echo( &self, req: Request, ) -> EchoResult { println!("EchoServer::server_streaming_echo"); println!("\tclient connected from: {:?}", req.remote_addr()); // creating infinite stream with requested message let repeat = std::iter::repeat(EchoResponse { message: req.into_inner().message, }); let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200))); // spawn and channel are required if you want handle "disconnect" functionality // the `out_stream` will not be polled after client disconnect let (tx, rx) = mpsc::channel(128); tokio::spawn(async move { while let Some(item) = stream.next().await { match tx.send(Result::<_, Status>::Ok(item)).await { Ok(_) => { // item (server response) was queued to be send to client } Err(_item) => { // output_stream was build from rx and both are dropped break; } } } println!("\tclient disconnected"); }); let output_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(output_stream) as Self::ServerStreamingEchoStream )) } async fn client_streaming_echo( &self, _: Request>, ) -> EchoResult { Err(Status::unimplemented("not implemented")) } type BidirectionalStreamingEchoStream = ResponseStream; async fn bidirectional_streaming_echo( &self, req: Request>, ) -> EchoResult { println!("EchoServer::bidirectional_streaming_echo"); let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(128); // this spawn here is required if you want to handle connection error. // If we just map `in_stream` and write it back as `out_stream` the `out_stream` // will be dropped when connection error occurs and error will never be propagated // to mapped version of `in_stream`. tokio::spawn(async move { while let Some(result) = in_stream.next().await { match result { Ok(v) => tx .send(Ok(EchoResponse { message: v.message })) .await .expect("working rx"), Err(err) => { if let Some(io_err) = match_for_io_error(&err) { if io_err.kind() == ErrorKind::BrokenPipe { // here you can handle special case when client // disconnected in unexpected way eprintln!("\tclient disconnected: broken pipe"); break; } } match tx.send(Err(err)).await { Ok(_) => (), Err(_err) => break, // response was dropped } } } } println!("\tstream ended"); }); // echo just write the same data that was received let out_stream = ReceiverStream::new(rx); Ok(Response::new( Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream )) } } #[tokio::main] async fn main() -> Result<(), Box> { let server = EchoServer {}; Server::builder() .add_service(pb::echo_server::EchoServer::new(server)) .serve("[::1]:50051".to_socket_addrs().unwrap().next().unwrap()) .await .unwrap(); Ok(()) }