1 pub mod pb { 2 tonic::include_proto!("grpc.examples.echo"); 3 } 4 5 use futures::Stream; 6 use std::net::SocketAddr; 7 use std::pin::Pin; 8 use tokio::sync::mpsc; 9 use tonic::{transport::Server, Request, Response, Status, Streaming}; 10 11 use pb::{EchoRequest, EchoResponse}; 12 13 type EchoResult<T> = Result<Response<T>, Status>; 14 type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send + Sync>>; 15 16 #[derive(Debug)] 17 pub struct EchoServer { 18 addr: SocketAddr, 19 } 20 21 #[tonic::async_trait] 22 impl pb::echo_server::Echo for EchoServer { 23 async fn unary_echo(&self, request: Request<EchoRequest>) -> EchoResult<EchoResponse> { 24 let message = format!("{} (from {})", request.into_inner().message, self.addr); 25 26 Ok(Response::new(EchoResponse { message })) 27 } 28 29 type ServerStreamingEchoStream = ResponseStream; 30 31 async fn server_streaming_echo( 32 &self, 33 _: Request<EchoRequest>, 34 ) -> EchoResult<Self::ServerStreamingEchoStream> { 35 Err(Status::unimplemented("not implemented")) 36 } 37 38 async fn client_streaming_echo( 39 &self, 40 _: Request<Streaming<EchoRequest>>, 41 ) -> EchoResult<EchoResponse> { 42 Err(Status::unimplemented("not implemented")) 43 } 44 45 type BidirectionalStreamingEchoStream = ResponseStream; 46 47 async fn bidirectional_streaming_echo( 48 &self, 49 _: Request<Streaming<EchoRequest>>, 50 ) -> EchoResult<Self::BidirectionalStreamingEchoStream> { 51 Err(Status::unimplemented("not implemented")) 52 } 53 } 54 55 #[tokio::main] 56 async fn main() -> Result<(), Box<dyn std::error::Error>> { 57 let addrs = ["[::1]:50051", "[::1]:50052"]; 58 59 let (tx, mut rx) = mpsc::unbounded_channel(); 60 61 for addr in &addrs { 62 let addr = addr.parse()?; 63 let tx = tx.clone(); 64 65 let server = EchoServer { addr }; 66 let serve = Server::builder() 67 .add_service(pb::echo_server::EchoServer::new(server)) 68 .serve(addr); 69 70 tokio::spawn(async move { 71 if let Err(e) = serve.await { 72 eprintln!("Error = {:?}", e); 73 } 74 75 tx.send(()).unwrap(); 76 }); 77 } 78 79 rx.recv().await; 80 81 Ok(()) 82 } 83