xref: /tonic/examples/src/routeguide/server.rs (revision 61555ff2)
1 use std::collections::HashMap;
2 use std::pin::Pin;
3 use std::sync::Arc;
4 use std::time::Instant;
5 
6 use futures::{Stream, StreamExt};
7 use tokio::sync::mpsc;
8 use tonic::transport::Server;
9 use tonic::{Request, Response, Status};
10 
11 use routeguide::route_guide_server::{RouteGuide, RouteGuideServer};
12 use routeguide::{Feature, Point, Rectangle, RouteNote, RouteSummary};
13 
14 pub mod routeguide {
15     tonic::include_proto!("routeguide");
16 }
17 
18 mod data;
19 
20 #[derive(Debug)]
21 pub struct RouteGuideService {
22     features: Arc<Vec<Feature>>,
23 }
24 
25 #[tonic::async_trait]
26 impl RouteGuide for RouteGuideService {
27     async fn get_feature(&self, request: Request<Point>) -> Result<Response<Feature>, Status> {
28         println!("GetFeature = {:?}", request);
29 
30         for feature in &self.features[..] {
31             if feature.location.as_ref() == Some(request.get_ref()) {
32                 return Ok(Response::new(feature.clone()));
33             }
34         }
35 
36         Ok(Response::new(Feature::default()))
37     }
38 
39     type ListFeaturesStream =
40         Pin<Box<dyn Stream<Item = Result<Feature, Status>> + Send + Sync + 'static>>;
41 
42     async fn list_features(
43         &self,
44         request: Request<Rectangle>,
45     ) -> Result<Response<Self::ListFeaturesStream>, Status> {
46         println!("ListFeatures = {:?}", request);
47 
48         let (tx, rx) = mpsc::channel(4);
49         let features = self.features.clone();
50 
51         tokio::spawn(async move {
52             for feature in &features[..] {
53                 if in_range(feature.location.as_ref().unwrap(), request.get_ref()) {
54                     println!("  => send {:?}", feature);
55                     tx.send(Ok(feature.clone())).await.unwrap();
56                 }
57             }
58 
59             println!(" /// done sending");
60         });
61 
62         Ok(Response::new(Box::pin(
63             tokio_stream::wrappers::ReceiverStream::new(rx),
64         )))
65     }
66 
67     async fn record_route(
68         &self,
69         request: Request<tonic::Streaming<Point>>,
70     ) -> Result<Response<RouteSummary>, Status> {
71         println!("RecordRoute");
72 
73         let mut stream = request.into_inner();
74 
75         let mut summary = RouteSummary::default();
76         let mut last_point = None;
77         let now = Instant::now();
78 
79         while let Some(point) = stream.next().await {
80             let point = point?;
81 
82             println!("  ==> Point = {:?}", point);
83 
84             // Increment the point count
85             summary.point_count += 1;
86 
87             // Find features
88             for feature in &self.features[..] {
89                 if feature.location.as_ref() == Some(&point) {
90                     summary.feature_count += 1;
91                 }
92             }
93 
94             // Calculate the distance
95             if let Some(ref last_point) = last_point {
96                 summary.distance += calc_distance(last_point, &point);
97             }
98 
99             last_point = Some(point);
100         }
101 
102         summary.elapsed_time = now.elapsed().as_secs() as i32;
103 
104         Ok(Response::new(summary))
105     }
106 
107     type RouteChatStream =
108         Pin<Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + Sync + 'static>>;
109 
110     async fn route_chat(
111         &self,
112         request: Request<tonic::Streaming<RouteNote>>,
113     ) -> Result<Response<Self::RouteChatStream>, Status> {
114         println!("RouteChat");
115 
116         let mut notes = HashMap::new();
117         let mut stream = request.into_inner();
118 
119         let output = async_stream::try_stream! {
120             while let Some(note) = stream.next().await {
121                 let note = note?;
122 
123                 let location = note.location.clone().unwrap();
124 
125                 let location_notes = notes.entry(location).or_insert(vec![]);
126                 location_notes.push(note);
127 
128                 for note in location_notes {
129                     yield note.clone();
130                 }
131             }
132         };
133 
134         Ok(Response::new(Box::pin(output) as Self::RouteChatStream))
135     }
136 }
137 
138 #[tokio::main]
139 async fn main() -> Result<(), Box<dyn std::error::Error>> {
140     let addr = "[::1]:10000".parse().unwrap();
141 
142     println!("RouteGuideServer listening on: {}", addr);
143 
144     let route_guide = RouteGuideService {
145         features: Arc::new(data::load()),
146     };
147 
148     let svc = RouteGuideServer::new(route_guide);
149 
150     Server::builder().add_service(svc).serve(addr).await?;
151 
152     Ok(())
153 }
154 
155 impl Eq for Point {}
156 
157 fn in_range(point: &Point, rect: &Rectangle) -> bool {
158     use std::cmp;
159 
160     let lo = rect.lo.as_ref().unwrap();
161     let hi = rect.hi.as_ref().unwrap();
162 
163     let left = cmp::min(lo.longitude, hi.longitude);
164     let right = cmp::max(lo.longitude, hi.longitude);
165     let top = cmp::max(lo.latitude, hi.latitude);
166     let bottom = cmp::min(lo.latitude, hi.latitude);
167 
168     point.longitude >= left
169         && point.longitude <= right
170         && point.latitude >= bottom
171         && point.latitude <= top
172 }
173 
174 /// Calculates the distance between two points using the "haversine" formula.
175 /// This code was taken from http://www.movable-type.co.uk/scripts/latlong.html.
176 fn calc_distance(p1: &Point, p2: &Point) -> i32 {
177     const CORD_FACTOR: f64 = 1e7;
178     const R: f64 = 6_371_000.0; // meters
179 
180     let lat1 = p1.latitude as f64 / CORD_FACTOR;
181     let lat2 = p2.latitude as f64 / CORD_FACTOR;
182     let lng1 = p1.longitude as f64 / CORD_FACTOR;
183     let lng2 = p2.longitude as f64 / CORD_FACTOR;
184 
185     let lat_rad1 = lat1.to_radians();
186     let lat_rad2 = lat2.to_radians();
187 
188     let delta_lat = (lat2 - lat1).to_radians();
189     let delta_lng = (lng2 - lng1).to_radians();
190 
191     let a = (delta_lat / 2f64).sin() * (delta_lat / 2f64).sin()
192         + (lat_rad1).cos() * (lat_rad2).cos() * (delta_lng / 2f64).sin() * (delta_lng / 2f64).sin();
193 
194     let c = 2f64 * a.sqrt().atan2((1f64 - a).sqrt());
195 
196     (R * c) as i32
197 }
198