1 use std::future::Future;
2
3 use tokio_util::sync::CancellationToken;
4 use tonic::{transport::Server, Request, Response, Status};
5
6 use hello_world::greeter_server::{Greeter, GreeterServer};
7 use hello_world::{HelloReply, HelloRequest};
8
9 use tokio::select;
10 use tokio::time::sleep;
11 use tokio::time::Duration;
12
13 pub mod hello_world {
14 tonic::include_proto!("helloworld");
15 }
16
17 #[derive(Default)]
18 pub struct MyGreeter {}
19
20 #[tonic::async_trait]
21 impl Greeter for MyGreeter {
say_hello( &self, request: Request<HelloRequest>, ) -> Result<Response<HelloReply>, Status>22 async fn say_hello(
23 &self,
24 request: Request<HelloRequest>,
25 ) -> Result<Response<HelloReply>, Status> {
26 let remote_addr = request.remote_addr();
27 let request_future = async move {
28 println!("Got a request from {:?}", request.remote_addr());
29
30 // Take a long time to complete request for the client to cancel early
31 sleep(Duration::from_secs(10)).await;
32
33 let reply = hello_world::HelloReply {
34 message: format!("Hello {}!", request.into_inner().name),
35 };
36
37 Ok(Response::new(reply))
38 };
39 let cancellation_future = async move {
40 println!("Request from {:?} cancelled by client", remote_addr);
41 // If this future is executed it means the request future was dropped,
42 // so it doesn't actually matter what is returned here
43 Err(Status::cancelled("Request cancelled by client"))
44 };
45 with_cancellation_handler(request_future, cancellation_future).await
46 }
47 }
48
with_cancellation_handler<FRequest, FCancellation>( request_future: FRequest, cancellation_future: FCancellation, ) -> Result<Response<HelloReply>, Status> where FRequest: Future<Output = Result<Response<HelloReply>, Status>> + Send + 'static, FCancellation: Future<Output = Result<Response<HelloReply>, Status>> + Send + 'static,49 async fn with_cancellation_handler<FRequest, FCancellation>(
50 request_future: FRequest,
51 cancellation_future: FCancellation,
52 ) -> Result<Response<HelloReply>, Status>
53 where
54 FRequest: Future<Output = Result<Response<HelloReply>, Status>> + Send + 'static,
55 FCancellation: Future<Output = Result<Response<HelloReply>, Status>> + Send + 'static,
56 {
57 let token = CancellationToken::new();
58 // Will call token.cancel() when the future is dropped, such as when the client cancels the request
59 let _drop_guard = token.clone().drop_guard();
60 let select_task = tokio::spawn(async move {
61 // Can select on token cancellation on any cancellable future while handling the request,
62 // allowing for custom cleanup code or monitoring
63 select! {
64 res = request_future => res,
65 _ = token.cancelled() => cancellation_future.await,
66 }
67 });
68
69 select_task.await.unwrap()
70 }
71
72 #[tokio::main]
main() -> Result<(), Box<dyn std::error::Error>>73 async fn main() -> Result<(), Box<dyn std::error::Error>> {
74 let addr = "[::1]:50051".parse().unwrap();
75 let greeter = MyGreeter::default();
76
77 println!("GreeterServer listening on {}", addr);
78
79 Server::builder()
80 .add_service(GreeterServer::new(greeter))
81 .serve(addr)
82 .await?;
83
84 Ok(())
85 }
86