146040429SLucio Franco pub mod pb {
246040429SLucio Franco tonic::include_proto!("grpc.examples.echo");
346040429SLucio Franco }
446040429SLucio Franco
501e5be50SSylwester Rąpała use std::{error::Error, io::ErrorKind, net::ToSocketAddrs, pin::Pin, time::Duration};
601e5be50SSylwester Rąpała use tokio::sync::mpsc;
7c9c4acbcStottoto use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
846040429SLucio Franco use tonic::{transport::Server, Request, Response, Status, Streaming};
946040429SLucio Franco
1046040429SLucio Franco use pb::{EchoRequest, EchoResponse};
1146040429SLucio Franco
1246040429SLucio Franco type EchoResult<T> = Result<Response<T>, Status>;
1323c1392fSLucio Franco type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send>>;
1446040429SLucio Franco
match_for_io_error(err_status: &Status) -> Option<&std::io::Error>1501e5be50SSylwester Rąpała fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
1601e5be50SSylwester Rąpała let mut err: &(dyn Error + 'static) = err_status;
1701e5be50SSylwester Rąpała
1801e5be50SSylwester Rąpała loop {
1901e5be50SSylwester Rąpała if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
2001e5be50SSylwester Rąpała return Some(io_err);
2101e5be50SSylwester Rąpała }
2201e5be50SSylwester Rąpała
2301e5be50SSylwester Rąpała // h2::Error do not expose std::io::Error with `source()`
2401e5be50SSylwester Rąpała // https://github.com/hyperium/h2/pull/462
2501e5be50SSylwester Rąpała if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
2601e5be50SSylwester Rąpała if let Some(io_err) = h2_err.get_io() {
2701e5be50SSylwester Rąpała return Some(io_err);
2801e5be50SSylwester Rąpała }
2901e5be50SSylwester Rąpała }
3001e5be50SSylwester Rąpała
31*3c0a00d5Stottoto err = err.source()?;
3201e5be50SSylwester Rąpała }
3301e5be50SSylwester Rąpała }
3401e5be50SSylwester Rąpała
/// Stateless implementation of the `Echo` gRPC service.
///
/// The type has no fields; all per-call state lives in the handlers.
#[derive(Debug)]
pub struct EchoServer {}
3746040429SLucio Franco
3846040429SLucio Franco #[tonic::async_trait]
3946040429SLucio Franco impl pb::echo_server::Echo for EchoServer {
unary_echo(&self, _: Request<EchoRequest>) -> EchoResult<EchoResponse>4046040429SLucio Franco async fn unary_echo(&self, _: Request<EchoRequest>) -> EchoResult<EchoResponse> {
4146040429SLucio Franco Err(Status::unimplemented("not implemented"))
4246040429SLucio Franco }
4346040429SLucio Franco
4446040429SLucio Franco type ServerStreamingEchoStream = ResponseStream;
4546040429SLucio Franco
server_streaming_echo( &self, req: Request<EchoRequest>, ) -> EchoResult<Self::ServerStreamingEchoStream>4646040429SLucio Franco async fn server_streaming_echo(
4746040429SLucio Franco &self,
4846040429SLucio Franco req: Request<EchoRequest>,
4946040429SLucio Franco ) -> EchoResult<Self::ServerStreamingEchoStream> {
5001e5be50SSylwester Rąpała println!("EchoServer::server_streaming_echo");
5101e5be50SSylwester Rąpała println!("\tclient connected from: {:?}", req.remote_addr());
5246040429SLucio Franco
5301e5be50SSylwester Rąpała // creating infinite stream with requested message
5401e5be50SSylwester Rąpała let repeat = std::iter::repeat(EchoResponse {
5501e5be50SSylwester Rąpała message: req.into_inner().message,
5601e5be50SSylwester Rąpała });
5701e5be50SSylwester Rąpała let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200)));
5846040429SLucio Franco
5901e5be50SSylwester Rąpała // spawn and channel are required if you want handle "disconnect" functionality
6001e5be50SSylwester Rąpała // the `out_stream` will not be polled after client disconnect
6101e5be50SSylwester Rąpała let (tx, rx) = mpsc::channel(128);
6246040429SLucio Franco tokio::spawn(async move {
6301e5be50SSylwester Rąpała while let Some(item) = stream.next().await {
6401e5be50SSylwester Rąpała match tx.send(Result::<_, Status>::Ok(item)).await {
6501e5be50SSylwester Rąpała Ok(_) => {
6601e5be50SSylwester Rąpała // item (server response) was queued to be send to client
6701e5be50SSylwester Rąpała }
6801e5be50SSylwester Rąpała Err(_item) => {
6901e5be50SSylwester Rąpała // output_stream was build from rx and both are dropped
7001e5be50SSylwester Rąpała break;
7101e5be50SSylwester Rąpała }
7201e5be50SSylwester Rąpała }
7301e5be50SSylwester Rąpała }
7401e5be50SSylwester Rąpała println!("\tclient disconnected");
7546040429SLucio Franco });
7646040429SLucio Franco
7701e5be50SSylwester Rąpała let output_stream = ReceiverStream::new(rx);
7846040429SLucio Franco Ok(Response::new(
7901e5be50SSylwester Rąpała Box::pin(output_stream) as Self::ServerStreamingEchoStream
8046040429SLucio Franco ))
8146040429SLucio Franco }
8246040429SLucio Franco
client_streaming_echo( &self, _: Request<Streaming<EchoRequest>>, ) -> EchoResult<EchoResponse>8346040429SLucio Franco async fn client_streaming_echo(
8446040429SLucio Franco &self,
8546040429SLucio Franco _: Request<Streaming<EchoRequest>>,
8646040429SLucio Franco ) -> EchoResult<EchoResponse> {
8746040429SLucio Franco Err(Status::unimplemented("not implemented"))
8846040429SLucio Franco }
8946040429SLucio Franco
9046040429SLucio Franco type BidirectionalStreamingEchoStream = ResponseStream;
9146040429SLucio Franco
bidirectional_streaming_echo( &self, req: Request<Streaming<EchoRequest>>, ) -> EchoResult<Self::BidirectionalStreamingEchoStream>9246040429SLucio Franco async fn bidirectional_streaming_echo(
9346040429SLucio Franco &self,
9401e5be50SSylwester Rąpała req: Request<Streaming<EchoRequest>>,
9546040429SLucio Franco ) -> EchoResult<Self::BidirectionalStreamingEchoStream> {
9601e5be50SSylwester Rąpała println!("EchoServer::bidirectional_streaming_echo");
9701e5be50SSylwester Rąpała
9801e5be50SSylwester Rąpała let mut in_stream = req.into_inner();
9901e5be50SSylwester Rąpała let (tx, rx) = mpsc::channel(128);
10001e5be50SSylwester Rąpała
10101e5be50SSylwester Rąpała // this spawn here is required if you want to handle connection error.
10201e5be50SSylwester Rąpała // If we just map `in_stream` and write it back as `out_stream` the `out_stream`
103d1dbd031Smwtian // will be dropped when connection error occurs and error will never be propagated
10401e5be50SSylwester Rąpała // to mapped version of `in_stream`.
10501e5be50SSylwester Rąpała tokio::spawn(async move {
10601e5be50SSylwester Rąpała while let Some(result) = in_stream.next().await {
10701e5be50SSylwester Rąpała match result {
10801e5be50SSylwester Rąpała Ok(v) => tx
10901e5be50SSylwester Rąpała .send(Ok(EchoResponse { message: v.message }))
11001e5be50SSylwester Rąpała .await
11101e5be50SSylwester Rąpała .expect("working rx"),
11201e5be50SSylwester Rąpała Err(err) => {
11301e5be50SSylwester Rąpała if let Some(io_err) = match_for_io_error(&err) {
11401e5be50SSylwester Rąpała if io_err.kind() == ErrorKind::BrokenPipe {
11501e5be50SSylwester Rąpała // here you can handle special case when client
11601e5be50SSylwester Rąpała // disconnected in unexpected way
11701e5be50SSylwester Rąpała eprintln!("\tclient disconnected: broken pipe");
11801e5be50SSylwester Rąpała break;
11901e5be50SSylwester Rąpała }
12001e5be50SSylwester Rąpała }
12101e5be50SSylwester Rąpała
12201e5be50SSylwester Rąpała match tx.send(Err(err)).await {
12301e5be50SSylwester Rąpała Ok(_) => (),
124f7090c24Srenshuncui Err(_err) => break, // response was dropped
12501e5be50SSylwester Rąpała }
12601e5be50SSylwester Rąpała }
12701e5be50SSylwester Rąpała }
12801e5be50SSylwester Rąpała }
12901e5be50SSylwester Rąpała println!("\tstream ended");
13001e5be50SSylwester Rąpała });
13101e5be50SSylwester Rąpała
13201e5be50SSylwester Rąpała // echo just write the same data that was received
13301e5be50SSylwester Rąpała let out_stream = ReceiverStream::new(rx);
13401e5be50SSylwester Rąpała
13501e5be50SSylwester Rąpała Ok(Response::new(
13601e5be50SSylwester Rąpała Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream
13701e5be50SSylwester Rąpała ))
13846040429SLucio Franco }
13946040429SLucio Franco }
14046040429SLucio Franco
14146040429SLucio Franco #[tokio::main]
main() -> Result<(), Box<dyn std::error::Error>>14246040429SLucio Franco async fn main() -> Result<(), Box<dyn std::error::Error>> {
14346040429SLucio Franco let server = EchoServer {};
14446040429SLucio Franco Server::builder()
14546040429SLucio Franco .add_service(pb::echo_server::EchoServer::new(server))
14646040429SLucio Franco .serve("[::1]:50051".to_socket_addrs().unwrap().next().unwrap())
14746040429SLucio Franco .await
14846040429SLucio Franco .unwrap();
14946040429SLucio Franco
15046040429SLucio Franco Ok(())
15146040429SLucio Franco }
152