xref: /tonic/examples/src/uds/server.rs (revision 6c898a23)
1 #![cfg_attr(not(unix), allow(unused_imports))]
2 
3 use futures::TryFutureExt;
4 use std::path::Path;
5 #[cfg(unix)]
6 use tokio::net::UnixListener;
7 use tonic::{transport::Server, Request, Response, Status};
8 
/// Code for the `helloworld` package, pulled in through `tonic::include_proto!`.
///
/// The `greeter_server` module and the `HelloReply` / `HelloRequest` messages used by this
/// example are imported from here.
pub mod hello_world {
    tonic::include_proto!("helloworld");
}
12 
13 use hello_world::{
14     greeter_server::{Greeter, GreeterServer},
15     HelloReply, HelloRequest,
16 };
17 
18 #[derive(Default)]
19 pub struct MyGreeter {}
20 
21 #[tonic::async_trait]
22 impl Greeter for MyGreeter {
23     async fn say_hello(
24         &self,
25         request: Request<HelloRequest>,
26     ) -> Result<Response<HelloReply>, Status> {
27         println!("Got a request: {:?}", request);
28 
29         let reply = hello_world::HelloReply {
30             message: format!("Hello {}!", request.into_inner().name),
31         };
32         Ok(Response::new(reply))
33     }
34 }
35 
36 #[cfg(unix)]
37 #[tokio::main]
38 async fn main() -> Result<(), Box<dyn std::error::Error>> {
39     let path = "/tmp/tonic/helloworld";
40 
41     tokio::fs::create_dir_all(Path::new(path).parent().unwrap()).await?;
42 
43     let greeter = MyGreeter::default();
44 
45     let incoming = {
46         let uds = UnixListener::bind(path)?;
47 
48         async_stream::stream! {
49             while let item = uds.accept().map_ok(|(st, _)| unix::UnixStream(st)).await {
50                 yield item;
51             }
52         }
53     };
54 
55     Server::builder()
56         .add_service(GreeterServer::new(greeter))
57         .serve_with_incoming(incoming)
58         .await?;
59 
60     Ok(())
61 }
62 
63 #[cfg(unix)]
64 mod unix {
65     use std::{
66         pin::Pin,
67         task::{Context, Poll},
68     };
69 
70     use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
71     use tonic::transport::server::Connected;
72 
73     #[derive(Debug)]
74     pub struct UnixStream(pub tokio::net::UnixStream);
75 
76     impl Connected for UnixStream {}
77 
78     impl AsyncRead for UnixStream {
79         fn poll_read(
80             mut self: Pin<&mut Self>,
81             cx: &mut Context<'_>,
82             buf: &mut ReadBuf<'_>,
83         ) -> Poll<std::io::Result<()>> {
84             Pin::new(&mut self.0).poll_read(cx, buf)
85         }
86     }
87 
88     impl AsyncWrite for UnixStream {
89         fn poll_write(
90             mut self: Pin<&mut Self>,
91             cx: &mut Context<'_>,
92             buf: &[u8],
93         ) -> Poll<std::io::Result<usize>> {
94             Pin::new(&mut self.0).poll_write(cx, buf)
95         }
96 
97         fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
98             Pin::new(&mut self.0).poll_flush(cx)
99         }
100 
101         fn poll_shutdown(
102             mut self: Pin<&mut Self>,
103             cx: &mut Context<'_>,
104         ) -> Poll<std::io::Result<()>> {
105             Pin::new(&mut self.0).poll_shutdown(cx)
106         }
107     }
108 }
109 
/// Fallback entry point, compiled only on non-unix targets.
///
/// It panics immediately with a message stating that the example requires a unix system.
#[cfg(not(unix))]
fn main() {
    panic!("The `uds` example only works on unix systems!");
}
114