1 pub mod pb { 2 tonic::include_proto!("grpc.examples.echo"); 3 } 4 5 use futures::Stream; 6 use std::net::ToSocketAddrs; 7 use std::pin::Pin; 8 use std::task::{Context, Poll}; 9 use tokio::sync::oneshot; 10 use tonic::{transport::Server, Request, Response, Status, Streaming}; 11 12 use pb::{EchoRequest, EchoResponse}; 13 14 type EchoResult<T> = Result<Response<T>, Status>; 15 type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send + Sync>>; 16 17 #[derive(Debug)] 18 pub struct EchoServer {} 19 20 #[tonic::async_trait] 21 impl pb::echo_server::Echo for EchoServer { 22 async fn unary_echo(&self, _: Request<EchoRequest>) -> EchoResult<EchoResponse> { 23 Err(Status::unimplemented("not implemented")) 24 } 25 26 type ServerStreamingEchoStream = ResponseStream; 27 28 async fn server_streaming_echo( 29 &self, 30 req: Request<EchoRequest>, 31 ) -> EchoResult<Self::ServerStreamingEchoStream> { 32 println!("Client connected from: {:?}", req.remote_addr()); 33 34 let (tx, rx) = oneshot::channel::<()>(); 35 36 tokio::spawn(async move { 37 let _ = rx.await; 38 println!("The rx resolved therefore the client disconnected!"); 39 }); 40 41 struct ClientDisconnect(oneshot::Sender<()>); 42 43 impl Stream for ClientDisconnect { 44 type Item = Result<EchoResponse, Status>; 45 46 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { 47 // A stream that never resovlves to anything.... 48 Poll::Pending 49 } 50 } 51 52 Ok(Response::new( 53 Box::pin(ClientDisconnect(tx)) as Self::ServerStreamingEchoStream 54 )) 55 } 56 57 async fn client_streaming_echo( 58 &self, 59 _: Request<Streaming<EchoRequest>>, 60 ) -> EchoResult<EchoResponse> { 61 Err(Status::unimplemented("not implemented")) 62 } 63 64 type BidirectionalStreamingEchoStream = ResponseStream; 65 66 async fn bidirectional_streaming_echo( 67 &self, 68 _: Request<Streaming<EchoRequest>>, 69 ) -> EchoResult<Self::BidirectionalStreamingEchoStream> { 70 Err(Status::unimplemented("not implemented")) 71 } 72 } 73 74 #[tokio::main] 75 async fn main() -> Result<(), Box<dyn std::error::Error>> { 76 let server = EchoServer {}; 77 Server::builder() 78 .add_service(pb::echo_server::EchoServer::new(server)) 79 .serve("[::1]:50051".to_socket_addrs().unwrap().next().unwrap()) 80 .await 81 .unwrap(); 82 83 Ok(()) 84 } 85