xref: /tonic/examples/src/load_balance/server.rs (revision da92dbf8)
1 pub mod pb {
2     tonic::include_proto!("grpc.examples.echo");
3 }
4 
5 use futures::Stream;
6 use std::net::SocketAddr;
7 use std::pin::Pin;
8 use tokio::sync::mpsc;
9 use tonic::{transport::Server, Request, Response, Status, Streaming};
10 
11 use pb::{EchoRequest, EchoResponse};
12 
13 type EchoResult<T> = Result<Response<T>, Status>;
14 type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send + Sync>>;
15 
16 #[derive(Debug)]
17 pub struct EchoServer {
18     addr: SocketAddr,
19 }
20 
21 #[tonic::async_trait]
22 impl pb::echo_server::Echo for EchoServer {
23     async fn unary_echo(&self, request: Request<EchoRequest>) -> EchoResult<EchoResponse> {
24         let message = format!("{} (from {})", request.into_inner().message, self.addr);
25 
26         Ok(Response::new(EchoResponse { message }))
27     }
28 
29     type ServerStreamingEchoStream = ResponseStream;
30 
31     async fn server_streaming_echo(
32         &self,
33         _: Request<EchoRequest>,
34     ) -> EchoResult<Self::ServerStreamingEchoStream> {
35         Err(Status::unimplemented("not implemented"))
36     }
37 
38     async fn client_streaming_echo(
39         &self,
40         _: Request<Streaming<EchoRequest>>,
41     ) -> EchoResult<EchoResponse> {
42         Err(Status::unimplemented("not implemented"))
43     }
44 
45     type BidirectionalStreamingEchoStream = ResponseStream;
46 
47     async fn bidirectional_streaming_echo(
48         &self,
49         _: Request<Streaming<EchoRequest>>,
50     ) -> EchoResult<Self::BidirectionalStreamingEchoStream> {
51         Err(Status::unimplemented("not implemented"))
52     }
53 }
54 
55 #[tokio::main]
56 async fn main() -> Result<(), Box<dyn std::error::Error>> {
57     let addrs = ["[::1]:50051", "[::1]:50052"];
58 
59     let (tx, mut rx) = mpsc::unbounded_channel();
60 
61     for addr in &addrs {
62         let addr = addr.parse()?;
63         let tx = tx.clone();
64 
65         let server = EchoServer { addr };
66         let serve = Server::builder()
67             .add_service(pb::echo_server::EchoServer::new(server))
68             .serve(addr);
69 
70         tokio::spawn(async move {
71             if let Err(e) = serve.await {
72                 eprintln!("Error = {:?}", e);
73             }
74 
75             tx.send(()).unwrap();
76         });
77     }
78 
79     rx.recv().await;
80 
81     Ok(())
82 }
83