xref: /tonic/examples/src/streaming/server.rs (revision bbcacd06)
/// Items brought in by `tonic::include_proto!` for the `grpc.examples.echo`
/// package. This file uses `EchoRequest`, `EchoResponse` and the
/// `echo_server` module from here.
// NOTE(review): presumably the included code is generated from the echo
// `.proto` at build time — confirm against the crate's build script.
pub mod pb {
    tonic::include_proto!("grpc.examples.echo");
}
4 
5 use futures::Stream;
6 use std::net::ToSocketAddrs;
7 use std::pin::Pin;
8 use std::task::{Context, Poll};
9 use tokio::sync::oneshot;
10 use tonic::{transport::Server, Request, Response, Status, Streaming};
11 
12 use pb::{EchoRequest, EchoResponse};
13 
/// Return type shared by every RPC handler in this file: a tonic `Response`
/// wrapping `T` on success, or a gRPC `Status` on failure.
type EchoResult<T> = Result<Response<T>, Status>;
/// Pinned, boxed, type-erased stream of `Result<EchoResponse, Status>` items.
/// Used as the associated stream type for both streaming RPCs so that any
/// concrete `Stream` implementation can be returned behind one type.
type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send + Sync>>;
16 
/// Stateless handler type on which the `pb::echo_server::Echo` service is
/// implemented. It has no fields; one instance is created in `main` and
/// handed to the generated server wrapper.
#[derive(Debug)]
pub struct EchoServer {}
19 
20 #[tonic::async_trait]
21 impl pb::echo_server::Echo for EchoServer {
22     async fn unary_echo(&self, _: Request<EchoRequest>) -> EchoResult<EchoResponse> {
23         Err(Status::unimplemented("not implemented"))
24     }
25 
26     type ServerStreamingEchoStream = ResponseStream;
27 
28     async fn server_streaming_echo(
29         &self,
30         req: Request<EchoRequest>,
31     ) -> EchoResult<Self::ServerStreamingEchoStream> {
32         println!("Client connected from: {:?}", req.remote_addr());
33 
34         let (tx, rx) = oneshot::channel::<()>();
35 
36         tokio::spawn(async move {
37             let _ = rx.await;
38             println!("The rx resolved therefore the client disconnected!");
39         });
40 
41         struct ClientDisconnect(oneshot::Sender<()>);
42 
43         impl Stream for ClientDisconnect {
44             type Item = Result<EchoResponse, Status>;
45 
46             fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47                 // A stream that never resovlves to anything....
48                 Poll::Pending
49             }
50         }
51 
52         Ok(Response::new(
53             Box::pin(ClientDisconnect(tx)) as Self::ServerStreamingEchoStream
54         ))
55     }
56 
57     async fn client_streaming_echo(
58         &self,
59         _: Request<Streaming<EchoRequest>>,
60     ) -> EchoResult<EchoResponse> {
61         Err(Status::unimplemented("not implemented"))
62     }
63 
64     type BidirectionalStreamingEchoStream = ResponseStream;
65 
66     async fn bidirectional_streaming_echo(
67         &self,
68         _: Request<Streaming<EchoRequest>>,
69     ) -> EchoResult<Self::BidirectionalStreamingEchoStream> {
70         Err(Status::unimplemented("not implemented"))
71     }
72 }
73 
74 #[tokio::main]
75 async fn main() -> Result<(), Box<dyn std::error::Error>> {
76     let server = EchoServer {};
77     Server::builder()
78         .add_service(pb::echo_server::EchoServer::new(server))
79         .serve("[::1]:50051".to_socket_addrs().unwrap().next().unwrap())
80         .await
81         .unwrap();
82 
83     Ok(())
84 }
85