xref: /tonic/examples/src/cancellation/server.rs (revision 8cba85e2)
1 use std::future::Future;
2 
3 use tokio_util::sync::CancellationToken;
4 use tonic::{transport::Server, Request, Response, Status};
5 
6 use hello_world::greeter_server::{Greeter, GreeterServer};
7 use hello_world::{HelloReply, HelloRequest};
8 
9 use tokio::select;
10 use tokio::time::sleep;
11 use tokio::time::Duration;
12 
/// Code generated by tonic for the `helloworld` protobuf package.
///
/// The `Greeter` trait, `GreeterServer`, `HelloRequest`, and `HelloReply` used by this
/// file are imported from this module.
13 pub mod hello_world {
14     tonic::include_proto!("helloworld");
15 }
16 
/// Greeter service implementation used by this example.
///
/// It has no fields; `Default` is derived so `main` can build it with
/// `MyGreeter::default()`.
17 #[derive(Default)]
18 pub struct MyGreeter {}
19 
20 #[tonic::async_trait]
21 impl Greeter for MyGreeter {
say_hello( &self, request: Request<HelloRequest>, ) -> Result<Response<HelloReply>, Status>22     async fn say_hello(
23         &self,
24         request: Request<HelloRequest>,
25     ) -> Result<Response<HelloReply>, Status> {
26         let remote_addr = request.remote_addr();
27         let request_future = async move {
28             println!("Got a request from {:?}", request.remote_addr());
29 
30             // Take a long time to complete request for the client to cancel early
31             sleep(Duration::from_secs(10)).await;
32 
33             let reply = hello_world::HelloReply {
34                 message: format!("Hello {}!", request.into_inner().name),
35             };
36 
37             Ok(Response::new(reply))
38         };
39         let cancellation_future = async move {
40             println!("Request from {:?} cancelled by client", remote_addr);
41             // If this future is executed it means the request future was dropped,
42             // so it doesn't actually matter what is returned here
43             Err(Status::cancelled("Request cancelled by client"))
44         };
45         with_cancellation_handler(request_future, cancellation_future).await
46     }
47 }
48 
with_cancellation_handler<FRequest, FCancellation>( request_future: FRequest, cancellation_future: FCancellation, ) -> Result<Response<HelloReply>, Status> where FRequest: Future<Output = Result<Response<HelloReply>, Status>> + Send + 'static, FCancellation: Future<Output = Result<Response<HelloReply>, Status>> + Send + 'static,49 async fn with_cancellation_handler<FRequest, FCancellation>(
50     request_future: FRequest,
51     cancellation_future: FCancellation,
52 ) -> Result<Response<HelloReply>, Status>
53 where
54     FRequest: Future<Output = Result<Response<HelloReply>, Status>> + Send + 'static,
55     FCancellation: Future<Output = Result<Response<HelloReply>, Status>> + Send + 'static,
56 {
57     let token = CancellationToken::new();
58     // Will call token.cancel() when the future is dropped, such as when the client cancels the request
59     let _drop_guard = token.clone().drop_guard();
60     let select_task = tokio::spawn(async move {
61         // Can select on token cancellation on any cancellable future while handling the request,
62         // allowing for custom cleanup code or monitoring
63         select! {
64             res = request_future => res,
65             _ = token.cancelled() => cancellation_future.await,
66         }
67     });
68 
69     select_task.await.unwrap()
70 }
71 
72 #[tokio::main]
main() -> Result<(), Box<dyn std::error::Error>>73 async fn main() -> Result<(), Box<dyn std::error::Error>> {
74     let addr = "[::1]:50051".parse().unwrap();
75     let greeter = MyGreeter::default();
76 
77     println!("GreeterServer listening on {}", addr);
78 
79     Server::builder()
80         .add_service(GreeterServer::new(greeter))
81         .serve(addr)
82         .await?;
83 
84     Ok(())
85 }
86