xref: /tonic/tonic/src/transport/server/mod.rs (revision 47ed9d3d)
1 //! Server implementation and builder.
2 
3 mod conn;
4 mod incoming;
5 mod io_stream;
6 mod service;
7 #[cfg(feature = "_tls-any")]
8 mod tls;
9 #[cfg(unix)]
10 mod unix;
11 
12 use tokio_stream::StreamExt as _;
13 use tracing::{debug, trace};
14 
15 #[cfg(feature = "router")]
16 use crate::{server::NamedService, service::Routes};
17 
18 #[cfg(feature = "router")]
19 use std::convert::Infallible;
20 
21 pub use conn::{Connected, TcpConnectInfo};
22 use hyper_util::{
23     rt::{TokioExecutor, TokioIo, TokioTimer},
24     server::conn::auto::{Builder as ConnectionBuilder, HttpServerConnExec},
25     service::TowerToHyperService,
26 };
27 #[cfg(feature = "_tls-any")]
28 pub use tls::ServerTlsConfig;
29 
30 #[cfg(feature = "_tls-any")]
31 pub use conn::TlsConnectInfo;
32 
33 #[cfg(feature = "_tls-any")]
34 use self::service::TlsAcceptor;
35 
36 #[cfg(unix)]
37 pub use unix::UdsConnectInfo;
38 
39 pub use incoming::TcpIncoming;
40 
41 #[cfg(feature = "_tls-any")]
42 use crate::transport::Error;
43 
44 use self::service::{ConnectInfoLayer, ServerIo};
45 use super::service::GrpcTimeout;
46 use crate::body::Body;
47 use crate::service::RecoverErrorLayer;
48 use bytes::Bytes;
49 use http::{Request, Response};
50 use http_body_util::BodyExt;
51 use hyper::{body::Incoming, service::Service as HyperService};
52 use pin_project::pin_project;
53 use std::future::pending;
54 use std::{
55     fmt,
56     future::{self, poll_fn, Future},
57     marker::PhantomData,
58     net::SocketAddr,
59     pin::{pin, Pin},
60     sync::Arc,
61     task::{ready, Context, Poll},
62     time::Duration,
63 };
64 use tokio::io::{AsyncRead, AsyncWrite};
65 use tokio::time::sleep;
66 use tokio_stream::Stream;
67 use tower::{
68     layer::util::{Identity, Stack},
69     layer::Layer,
70     limit::concurrency::ConcurrencyLimitLayer,
71     util::BoxCloneService,
72     Service, ServiceBuilder, ServiceExt,
73 };
74 
75 type BoxService = tower::util::BoxCloneService<Request<Body>, Response<Body>, crate::BoxError>;
76 type TraceInterceptor = Arc<dyn Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static>;
77 
78 const DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS: u64 = 20;
79 
80 /// A default batteries included `transport` server.
81 ///
82 /// This provides an easy builder pattern style builder [`Server`] on top of
83 /// `hyper` connections. This builder exposes easy configuration parameters
84 /// for providing a fully featured http2 based gRPC server. This should provide
85 /// a very good out of the box http2 server for use with tonic but is also a
86 /// reference implementation that should be a good starting point for anyone
87 /// wanting to create a more complex and/or specific implementation.
88 #[derive(Clone)]
89 pub struct Server<L = Identity> {
90     trace_interceptor: Option<TraceInterceptor>,
91     concurrency_limit: Option<usize>,
92     timeout: Option<Duration>,
93     #[cfg(feature = "_tls-any")]
94     tls: Option<TlsAcceptor>,
95     init_stream_window_size: Option<u32>,
96     init_connection_window_size: Option<u32>,
97     max_concurrent_streams: Option<u32>,
98     tcp_keepalive: Option<Duration>,
99     tcp_nodelay: bool,
100     http2_keepalive_interval: Option<Duration>,
101     http2_keepalive_timeout: Option<Duration>,
102     http2_adaptive_window: Option<bool>,
103     http2_max_pending_accept_reset_streams: Option<usize>,
104     http2_max_header_list_size: Option<u32>,
105     max_frame_size: Option<u32>,
106     accept_http1: bool,
107     service_builder: ServiceBuilder<L>,
108     max_connection_age: Option<Duration>,
109 }
110 
111 impl Default for Server<Identity> {
default() -> Self112     fn default() -> Self {
113         Self {
114             trace_interceptor: None,
115             concurrency_limit: None,
116             timeout: None,
117             #[cfg(feature = "_tls-any")]
118             tls: None,
119             init_stream_window_size: None,
120             init_connection_window_size: None,
121             max_concurrent_streams: None,
122             tcp_keepalive: None,
123             tcp_nodelay: false,
124             http2_keepalive_interval: None,
125             http2_keepalive_timeout: None,
126             http2_adaptive_window: None,
127             http2_max_pending_accept_reset_streams: None,
128             http2_max_header_list_size: None,
129             max_frame_size: None,
130             accept_http1: false,
131             service_builder: Default::default(),
132             max_connection_age: None,
133         }
134     }
135 }
136 
137 /// A stack based [`Service`] router.
138 #[cfg(feature = "router")]
139 #[derive(Debug)]
140 pub struct Router<L = Identity> {
141     server: Server<L>,
142     routes: Routes,
143 }
144 
145 impl Server {
146     /// Create a new server builder that can configure a [`Server`].
builder() -> Self147     pub fn builder() -> Self {
148         Server {
149             tcp_nodelay: true,
150             accept_http1: false,
151             ..Default::default()
152         }
153     }
154 }
155 
156 impl<L> Server<L> {
157     /// Configure TLS for this server.
158     #[cfg(feature = "_tls-any")]
tls_config(self, tls_config: ServerTlsConfig) -> Result<Self, Error>159     pub fn tls_config(self, tls_config: ServerTlsConfig) -> Result<Self, Error> {
160         Ok(Server {
161             tls: Some(tls_config.tls_acceptor().map_err(Error::from_source)?),
162             ..self
163         })
164     }
165 
166     /// Set the concurrency limit applied to on requests inbound per connection.
167     ///
168     /// # Example
169     ///
170     /// ```
171     /// # use tonic::transport::Server;
172     /// # use tower_service::Service;
173     /// # let builder = Server::builder();
174     /// builder.concurrency_limit_per_connection(32);
175     /// ```
176     #[must_use]
concurrency_limit_per_connection(self, limit: usize) -> Self177     pub fn concurrency_limit_per_connection(self, limit: usize) -> Self {
178         Server {
179             concurrency_limit: Some(limit),
180             ..self
181         }
182     }
183 
184     /// Set a timeout on for all request handlers.
185     ///
186     /// # Example
187     ///
188     /// ```
189     /// # use tonic::transport::Server;
190     /// # use tower_service::Service;
191     /// # use std::time::Duration;
192     /// # let builder = Server::builder();
193     /// builder.timeout(Duration::from_secs(30));
194     /// ```
195     #[must_use]
timeout(self, timeout: Duration) -> Self196     pub fn timeout(self, timeout: Duration) -> Self {
197         Server {
198             timeout: Some(timeout),
199             ..self
200         }
201     }
202 
203     /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
204     /// stream-level flow control.
205     ///
206     /// Default is 65,535
207     ///
208     /// [spec]: https://httpwg.org/specs/rfc9113.html#InitialWindowSize
209     #[must_use]
initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self210     pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
211         Server {
212             init_stream_window_size: sz.into(),
213             ..self
214         }
215     }
216 
217     /// Sets the max connection-level flow control for HTTP2
218     ///
219     /// Default is 65,535
220     #[must_use]
initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self221     pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
222         Server {
223             init_connection_window_size: sz.into(),
224             ..self
225         }
226     }
227 
228     /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
229     /// connections.
230     ///
231     /// Default is no limit (`None`).
232     ///
233     /// [spec]: https://httpwg.org/specs/rfc9113.html#n-stream-concurrency
234     #[must_use]
max_concurrent_streams(self, max: impl Into<Option<u32>>) -> Self235     pub fn max_concurrent_streams(self, max: impl Into<Option<u32>>) -> Self {
236         Server {
237             max_concurrent_streams: max.into(),
238             ..self
239         }
240     }
241 
242     /// Sets the maximum time option in milliseconds that a connection may exist
243     ///
244     /// Default is no limit (`None`).
245     ///
246     /// # Example
247     ///
248     /// ```
249     /// # use tonic::transport::Server;
250     /// # use tower_service::Service;
251     /// # use std::time::Duration;
252     /// # let builder = Server::builder();
253     /// builder.max_connection_age(Duration::from_secs(60));
254     /// ```
255     #[must_use]
max_connection_age(self, max_connection_age: Duration) -> Self256     pub fn max_connection_age(self, max_connection_age: Duration) -> Self {
257         Server {
258             max_connection_age: Some(max_connection_age),
259             ..self
260         }
261     }
262 
263     /// Set whether HTTP2 Ping frames are enabled on accepted connections.
264     ///
265     /// If `None` is specified, HTTP2 keepalive is disabled, otherwise the duration
266     /// specified will be the time interval between HTTP2 Ping frames.
267     /// The timeout for receiving an acknowledgement of the keepalive ping
268     /// can be set with [`Server::http2_keepalive_timeout`].
269     ///
270     /// Default is no HTTP2 keepalive (`None`)
271     ///
272     #[must_use]
http2_keepalive_interval(self, http2_keepalive_interval: Option<Duration>) -> Self273     pub fn http2_keepalive_interval(self, http2_keepalive_interval: Option<Duration>) -> Self {
274         Server {
275             http2_keepalive_interval,
276             ..self
277         }
278     }
279 
280     /// Sets a timeout for receiving an acknowledgement of the keepalive ping.
281     ///
282     /// If the ping is not acknowledged within the timeout, the connection will be closed.
283     /// Does nothing if http2_keep_alive_interval is disabled.
284     ///
285     /// Default is 20 seconds.
286     ///
287     #[must_use]
http2_keepalive_timeout(self, http2_keepalive_timeout: Option<Duration>) -> Self288     pub fn http2_keepalive_timeout(self, http2_keepalive_timeout: Option<Duration>) -> Self {
289         Server {
290             http2_keepalive_timeout,
291             ..self
292         }
293     }
294 
295     /// Sets whether to use an adaptive flow control. Defaults to false.
296     /// Enabling this will override the limits set in http2_initial_stream_window_size and
297     /// http2_initial_connection_window_size.
298     #[must_use]
http2_adaptive_window(self, enabled: Option<bool>) -> Self299     pub fn http2_adaptive_window(self, enabled: Option<bool>) -> Self {
300         Server {
301             http2_adaptive_window: enabled,
302             ..self
303         }
304     }
305 
306     /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
307     ///
308     /// This will default to whatever the default in h2 is. As of v0.3.17, it is 20.
309     ///
310     /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
311     #[must_use]
http2_max_pending_accept_reset_streams(self, max: Option<usize>) -> Self312     pub fn http2_max_pending_accept_reset_streams(self, max: Option<usize>) -> Self {
313         Server {
314             http2_max_pending_accept_reset_streams: max,
315             ..self
316         }
317     }
318 
319     /// Set whether TCP keepalive messages are enabled on accepted connections.
320     ///
321     /// If `None` is specified, keepalive is disabled, otherwise the duration
322     /// specified will be the time to remain idle before sending TCP keepalive
323     /// probes.
324     ///
325     /// Default is no keepalive (`None`)
326     ///
327     #[must_use]
tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self328     pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
329         Server {
330             tcp_keepalive,
331             ..self
332         }
333     }
334 
335     /// Set the value of `TCP_NODELAY` option for accepted connections. Enabled by default.
336     #[must_use]
tcp_nodelay(self, enabled: bool) -> Self337     pub fn tcp_nodelay(self, enabled: bool) -> Self {
338         Server {
339             tcp_nodelay: enabled,
340             ..self
341         }
342     }
343 
344     /// Sets the max size of received header frames.
345     ///
346     /// This will default to whatever the default in hyper is. As of v1.4.1, it is 16 KiB.
347     #[must_use]
http2_max_header_list_size(self, max: impl Into<Option<u32>>) -> Self348     pub fn http2_max_header_list_size(self, max: impl Into<Option<u32>>) -> Self {
349         Server {
350             http2_max_header_list_size: max.into(),
351             ..self
352         }
353     }
354 
355     /// Sets the maximum frame size to use for HTTP2.
356     ///
357     /// Passing `None` will do nothing.
358     ///
359     /// If not set, will default from underlying transport.
360     #[must_use]
max_frame_size(self, frame_size: impl Into<Option<u32>>) -> Self361     pub fn max_frame_size(self, frame_size: impl Into<Option<u32>>) -> Self {
362         Server {
363             max_frame_size: frame_size.into(),
364             ..self
365         }
366     }
367 
368     /// Allow this server to accept http1 requests.
369     ///
370     /// Accepting http1 requests is only useful when developing `grpc-web`
371     /// enabled services. If this setting is set to `true` but services are
372     /// not correctly configured to handle grpc-web requests, your server may
373     /// return confusing (but correct) protocol errors.
374     ///
375     /// Default is `false`.
376     #[must_use]
accept_http1(self, accept_http1: bool) -> Self377     pub fn accept_http1(self, accept_http1: bool) -> Self {
378         Server {
379             accept_http1,
380             ..self
381         }
382     }
383 
384     /// Intercept inbound headers and add a [`tracing::Span`] to each response future.
385     #[must_use]
trace_fn<F>(self, f: F) -> Self where F: Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static,386     pub fn trace_fn<F>(self, f: F) -> Self
387     where
388         F: Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static,
389     {
390         Server {
391             trace_interceptor: Some(Arc::new(f)),
392             ..self
393         }
394     }
395 
396     /// Create a router with the `S` typed service as the first service.
397     ///
398     /// This will clone the `Server` builder and create a router that will
399     /// route around different services.
400     #[cfg(feature = "router")]
add_service<S>(&mut self, svc: S) -> Router<L> where S: Service<Request<Body>, Error = Infallible> + NamedService + Clone + Send + Sync + 'static, S::Response: axum::response::IntoResponse, S::Future: Send + 'static, L: Clone,401     pub fn add_service<S>(&mut self, svc: S) -> Router<L>
402     where
403         S: Service<Request<Body>, Error = Infallible>
404             + NamedService
405             + Clone
406             + Send
407             + Sync
408             + 'static,
409         S::Response: axum::response::IntoResponse,
410         S::Future: Send + 'static,
411         L: Clone,
412     {
413         Router::new(self.clone(), Routes::new(svc))
414     }
415 
416     /// Create a router with the optional `S` typed service as the first service.
417     ///
418     /// This will clone the `Server` builder and create a router that will
419     /// route around different services.
420     ///
421     /// # Note
422     /// Even when the argument given is `None` this will capture *all* requests to this service name.
423     /// As a result, one cannot use this to toggle between two identically named implementations.
424     #[cfg(feature = "router")]
add_optional_service<S>(&mut self, svc: Option<S>) -> Router<L> where S: Service<Request<Body>, Error = Infallible> + NamedService + Clone + Send + Sync + 'static, S::Response: axum::response::IntoResponse, S::Future: Send + 'static, L: Clone,425     pub fn add_optional_service<S>(&mut self, svc: Option<S>) -> Router<L>
426     where
427         S: Service<Request<Body>, Error = Infallible>
428             + NamedService
429             + Clone
430             + Send
431             + Sync
432             + 'static,
433         S::Response: axum::response::IntoResponse,
434         S::Future: Send + 'static,
435         L: Clone,
436     {
437         let routes = svc.map(Routes::new).unwrap_or_default();
438         Router::new(self.clone(), routes)
439     }
440 
441     /// Create a router with given [`Routes`].
442     ///
443     /// This will clone the `Server` builder and create a router that will
444     /// route around different services that were already added to the provided `routes`.
445     #[cfg(feature = "router")]
add_routes(&mut self, routes: Routes) -> Router<L> where L: Clone,446     pub fn add_routes(&mut self, routes: Routes) -> Router<L>
447     where
448         L: Clone,
449     {
450         Router::new(self.clone(), routes)
451     }
452 
453     /// Set the [Tower] [`Layer`] all services will be wrapped in.
454     ///
455     /// This enables using middleware from the [Tower ecosystem][eco].
456     ///
457     /// # Example
458     ///
459     /// ```
460     /// # use tonic::transport::Server;
461     /// # use tower_service::Service;
462     /// use tower::timeout::TimeoutLayer;
463     /// use std::time::Duration;
464     ///
465     /// # let mut builder = Server::builder();
466     /// builder.layer(TimeoutLayer::new(Duration::from_secs(30)));
467     /// ```
468     ///
469     /// Note that timeouts should be set using [`Server::timeout`]. `TimeoutLayer` is only used
470     /// here as an example.
471     ///
472     /// You can build more complex layers using [`ServiceBuilder`]. Those layers can include
473     /// [interceptors]:
474     ///
475     /// ```
476     /// # use tonic::transport::Server;
477     /// # use tower_service::Service;
478     /// use tower::ServiceBuilder;
479     /// use std::time::Duration;
480     /// use tonic::{Request, Status, service::InterceptorLayer};
481     ///
482     /// fn auth_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
483     ///     if valid_credentials(&request) {
484     ///         Ok(request)
485     ///     } else {
486     ///         Err(Status::unauthenticated("invalid credentials"))
487     ///     }
488     /// }
489     ///
490     /// fn valid_credentials(request: &Request<()>) -> bool {
491     ///     // ...
492     ///     # true
493     /// }
494     ///
495     /// fn some_other_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
496     ///     Ok(request)
497     /// }
498     ///
499     /// let layer = ServiceBuilder::new()
500     ///     .load_shed()
501     ///     .timeout(Duration::from_secs(30))
502     ///     .layer(InterceptorLayer::new(auth_interceptor))
503     ///     .layer(InterceptorLayer::new(some_other_interceptor))
504     ///     .into_inner();
505     ///
506     /// Server::builder().layer(layer);
507     /// ```
508     ///
509     /// [Tower]: https://github.com/tower-rs/tower
510     /// [`Layer`]: tower::layer::Layer
511     /// [eco]: https://github.com/tower-rs
512     /// [`ServiceBuilder`]: tower::ServiceBuilder
513     /// [interceptors]: crate::service::Interceptor
layer<NewLayer>(self, new_layer: NewLayer) -> Server<Stack<NewLayer, L>>514     pub fn layer<NewLayer>(self, new_layer: NewLayer) -> Server<Stack<NewLayer, L>> {
515         Server {
516             service_builder: self.service_builder.layer(new_layer),
517             trace_interceptor: self.trace_interceptor,
518             concurrency_limit: self.concurrency_limit,
519             timeout: self.timeout,
520             #[cfg(feature = "_tls-any")]
521             tls: self.tls,
522             init_stream_window_size: self.init_stream_window_size,
523             init_connection_window_size: self.init_connection_window_size,
524             max_concurrent_streams: self.max_concurrent_streams,
525             tcp_keepalive: self.tcp_keepalive,
526             tcp_nodelay: self.tcp_nodelay,
527             http2_keepalive_interval: self.http2_keepalive_interval,
528             http2_keepalive_timeout: self.http2_keepalive_timeout,
529             http2_adaptive_window: self.http2_adaptive_window,
530             http2_max_pending_accept_reset_streams: self.http2_max_pending_accept_reset_streams,
531             http2_max_header_list_size: self.http2_max_header_list_size,
532             max_frame_size: self.max_frame_size,
533             accept_http1: self.accept_http1,
534             max_connection_age: self.max_connection_age,
535         }
536     }
537 
bind_incoming(&self, addr: SocketAddr) -> Result<TcpIncoming, super::Error>538     fn bind_incoming(&self, addr: SocketAddr) -> Result<TcpIncoming, super::Error> {
539         Ok(TcpIncoming::bind(addr)
540             .map_err(super::Error::from_source)?
541             .with_nodelay(Some(self.tcp_nodelay))
542             .with_keepalive(self.tcp_keepalive))
543     }
544 
545     /// Serve the service.
serve<S, ResBody>(self, addr: SocketAddr, svc: S) -> Result<(), super::Error> where L: Layer<S>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send + 'static, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,546     pub async fn serve<S, ResBody>(self, addr: SocketAddr, svc: S) -> Result<(), super::Error>
547     where
548         L: Layer<S>,
549         L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
550         <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
551         <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
552             Into<crate::BoxError> + Send + 'static,
553         ResBody: http_body::Body<Data = Bytes> + Send + 'static,
554         ResBody::Error: Into<crate::BoxError>,
555     {
556         let incoming = self.bind_incoming(addr)?;
557         self.serve_with_incoming(svc, incoming).await
558     }
559 
560     /// Serve the service with the shutdown signal.
serve_with_shutdown<S, F, ResBody>( self, addr: SocketAddr, svc: S, signal: F, ) -> Result<(), super::Error> where L: Layer<S>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send + 'static, F: Future<Output = ()>, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,561     pub async fn serve_with_shutdown<S, F, ResBody>(
562         self,
563         addr: SocketAddr,
564         svc: S,
565         signal: F,
566     ) -> Result<(), super::Error>
567     where
568         L: Layer<S>,
569         L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
570         <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
571         <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
572             Into<crate::BoxError> + Send + 'static,
573         F: Future<Output = ()>,
574         ResBody: http_body::Body<Data = Bytes> + Send + 'static,
575         ResBody::Error: Into<crate::BoxError>,
576     {
577         let incoming = self.bind_incoming(addr)?;
578         self.serve_with_incoming_shutdown(svc, incoming, signal)
579             .await
580     }
581 
582     /// Serve the service on the provided incoming stream.
serve_with_incoming<S, I, IO, IE, ResBody>( self, svc: S, incoming: I, ) -> Result<(), super::Error> where L: Layer<S>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send + 'static, I: Stream<Item = Result<IO, IE>>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IE: Into<crate::BoxError>, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,583     pub async fn serve_with_incoming<S, I, IO, IE, ResBody>(
584         self,
585         svc: S,
586         incoming: I,
587     ) -> Result<(), super::Error>
588     where
589         L: Layer<S>,
590         L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
591         <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
592         <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
593             Into<crate::BoxError> + Send + 'static,
594         I: Stream<Item = Result<IO, IE>>,
595         IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
596         IE: Into<crate::BoxError>,
597         ResBody: http_body::Body<Data = Bytes> + Send + 'static,
598         ResBody::Error: Into<crate::BoxError>,
599     {
600         self.serve_internal(svc, incoming, Option::<future::Ready<()>>::None)
601             .await
602     }
603 
604     /// Serve the service with the signal on the provided incoming stream.
serve_with_incoming_shutdown<S, I, F, IO, IE, ResBody>( self, svc: S, incoming: I, signal: F, ) -> Result<(), super::Error> where L: Layer<S>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send + 'static, I: Stream<Item = Result<IO, IE>>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IE: Into<crate::BoxError>, F: Future<Output = ()>, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,605     pub async fn serve_with_incoming_shutdown<S, I, F, IO, IE, ResBody>(
606         self,
607         svc: S,
608         incoming: I,
609         signal: F,
610     ) -> Result<(), super::Error>
611     where
612         L: Layer<S>,
613         L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
614         <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
615         <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
616             Into<crate::BoxError> + Send + 'static,
617         I: Stream<Item = Result<IO, IE>>,
618         IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
619         IE: Into<crate::BoxError>,
620         F: Future<Output = ()>,
621         ResBody: http_body::Body<Data = Bytes> + Send + 'static,
622         ResBody::Error: Into<crate::BoxError>,
623     {
624         self.serve_internal(svc, incoming, Some(signal)).await
625     }
626 
serve_internal<S, I, F, IO, IE, ResBody>( self, svc: S, incoming: I, signal: Option<F>, ) -> Result<(), super::Error> where L: Layer<S>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send + 'static, I: Stream<Item = Result<IO, IE>>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IE: Into<crate::BoxError>, F: Future<Output = ()>, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,627     async fn serve_internal<S, I, F, IO, IE, ResBody>(
628         self,
629         svc: S,
630         incoming: I,
631         signal: Option<F>,
632     ) -> Result<(), super::Error>
633     where
634         L: Layer<S>,
635         L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
636         <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
637         <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
638             Into<crate::BoxError> + Send + 'static,
639         I: Stream<Item = Result<IO, IE>>,
640         IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
641         IE: Into<crate::BoxError>,
642         F: Future<Output = ()>,
643         ResBody: http_body::Body<Data = Bytes> + Send + 'static,
644         ResBody::Error: Into<crate::BoxError>,
645     {
646         let trace_interceptor = self.trace_interceptor.clone();
647         let concurrency_limit = self.concurrency_limit;
648         let init_connection_window_size = self.init_connection_window_size;
649         let init_stream_window_size = self.init_stream_window_size;
650         let max_concurrent_streams = self.max_concurrent_streams;
651         let timeout = self.timeout;
652         let max_header_list_size = self.http2_max_header_list_size;
653         let max_frame_size = self.max_frame_size;
654         let http2_only = !self.accept_http1;
655 
656         let http2_keepalive_interval = self.http2_keepalive_interval;
657         let http2_keepalive_timeout = self
658             .http2_keepalive_timeout
659             .unwrap_or_else(|| Duration::new(DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS, 0));
660         let http2_adaptive_window = self.http2_adaptive_window;
661         let http2_max_pending_accept_reset_streams = self.http2_max_pending_accept_reset_streams;
662         let max_connection_age = self.max_connection_age;
663 
664         let svc = self.service_builder.service(svc);
665 
666         let incoming = io_stream::ServerIoStream::new(
667             incoming,
668             #[cfg(feature = "_tls-any")]
669             self.tls,
670         );
671         let mut svc = MakeSvc {
672             inner: svc,
673             concurrency_limit,
674             timeout,
675             trace_interceptor,
676             _io: PhantomData,
677         };
678 
679         let server = {
680             let mut builder = ConnectionBuilder::new(TokioExecutor::new());
681 
682             if http2_only {
683                 builder = builder.http2_only();
684             }
685 
686             builder
687                 .http2()
688                 .timer(TokioTimer::new())
689                 .initial_connection_window_size(init_connection_window_size)
690                 .initial_stream_window_size(init_stream_window_size)
691                 .max_concurrent_streams(max_concurrent_streams)
692                 .keep_alive_interval(http2_keepalive_interval)
693                 .keep_alive_timeout(http2_keepalive_timeout)
694                 .adaptive_window(http2_adaptive_window.unwrap_or_default())
695                 .max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams)
696                 .max_frame_size(max_frame_size);
697 
698             if let Some(max_header_list_size) = max_header_list_size {
699                 builder.http2().max_header_list_size(max_header_list_size);
700             }
701 
702             builder
703         };
704 
705         let (signal_tx, signal_rx) = tokio::sync::watch::channel(());
706         let signal_tx = Arc::new(signal_tx);
707 
708         let graceful = signal.is_some();
709         let mut sig = pin!(Fuse { inner: signal });
710         let mut incoming = pin!(incoming);
711 
712         loop {
713             tokio::select! {
714                 _ = &mut sig => {
715                     trace!("signal received, shutting down");
716                     break;
717                 },
718                 io = incoming.next() => {
719                     let io = match io {
720                         Some(Ok(io)) => io,
721                         Some(Err(e)) => {
722                             trace!("error accepting connection: {:#}", e);
723                             continue;
724                         },
725                         None => {
726                             break
727                         },
728                     };
729 
730                     trace!("connection accepted");
731 
732                     poll_fn(|cx| svc.poll_ready(cx))
733                         .await
734                         .map_err(super::Error::from_source)?;
735 
736                     let req_svc = svc
737                         .call(&io)
738                         .await
739                         .map_err(super::Error::from_source)?;
740 
741                     let hyper_io = TokioIo::new(io);
742                     let hyper_svc = TowerToHyperService::new(req_svc.map_request(|req: Request<Incoming>| req.map(Body::new)));
743 
744                     serve_connection(hyper_io, hyper_svc, server.clone(), graceful.then(|| signal_rx.clone()), max_connection_age);
745                 }
746             }
747         }
748 
749         if graceful {
750             let _ = signal_tx.send(());
751             drop(signal_rx);
752             trace!(
753                 "waiting for {} connections to close",
754                 signal_tx.receiver_count()
755             );
756 
757             // Wait for all connections to close
758             signal_tx.closed().await;
759         }
760 
761         Ok(())
762     }
763 }
764 
765 // This is moved to its own function as a way to get around
766 // https://github.com/rust-lang/rust/issues/102211
serve_connection<B, IO, S, E>( hyper_io: IO, hyper_svc: S, builder: ConnectionBuilder<E>, mut watcher: Option<tokio::sync::watch::Receiver<()>>, max_connection_age: Option<Duration>, ) where B: http_body::Body + Send + 'static, B::Data: Send, B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync, IO: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static, S: HyperService<Request<Incoming>, Response = Response<B>> + Clone + Send + 'static, S::Future: Send + 'static, S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send, E: HttpServerConnExec<S::Future, B> + Send + Sync + 'static,767 fn serve_connection<B, IO, S, E>(
768     hyper_io: IO,
769     hyper_svc: S,
770     builder: ConnectionBuilder<E>,
771     mut watcher: Option<tokio::sync::watch::Receiver<()>>,
772     max_connection_age: Option<Duration>,
773 ) where
774     B: http_body::Body + Send + 'static,
775     B::Data: Send,
776     B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
777     IO: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
778     S: HyperService<Request<Incoming>, Response = Response<B>> + Clone + Send + 'static,
779     S::Future: Send + 'static,
780     S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
781     E: HttpServerConnExec<S::Future, B> + Send + Sync + 'static,
782 {
783     tokio::spawn(async move {
784         {
785             let mut sig = pin!(Fuse {
786                 inner: watcher.as_mut().map(|w| w.changed()),
787             });
788 
789             let mut conn = pin!(builder.serve_connection(hyper_io, hyper_svc));
790 
791             let sleep = sleep_or_pending(max_connection_age);
792             tokio::pin!(sleep);
793 
794             loop {
795                 tokio::select! {
796                     rv = &mut conn => {
797                         if let Err(err) = rv {
798                             debug!("failed serving connection: {:#}", err);
799                         }
800                         break;
801                     },
802                     _ = &mut sleep  => {
803                         conn.as_mut().graceful_shutdown();
804                         sleep.set(sleep_or_pending(None));
805                     },
806                     _ = &mut sig => {
807                         conn.as_mut().graceful_shutdown();
808                     }
809                 }
810             }
811         }
812 
813         drop(watcher);
814         trace!("connection closed");
815     });
816 }
817 
sleep_or_pending(wait_for: Option<Duration>)818 async fn sleep_or_pending(wait_for: Option<Duration>) {
819     match wait_for {
820         Some(wait) => sleep(wait).await,
821         None => pending().await,
822     };
823 }
824 
825 #[cfg(feature = "router")]
826 impl<L> Router<L> {
new(server: Server<L>, routes: Routes) -> Self827     pub(crate) fn new(server: Server<L>, routes: Routes) -> Self {
828         Self { server, routes }
829     }
830 }
831 
832 #[cfg(feature = "router")]
833 impl<L> Router<L> {
834     /// Add a new service to this router.
add_service<S>(mut self, svc: S) -> Self where S: Service<Request<Body>, Error = Infallible> + NamedService + Clone + Send + Sync + 'static, S::Response: axum::response::IntoResponse, S::Future: Send + 'static,835     pub fn add_service<S>(mut self, svc: S) -> Self
836     where
837         S: Service<Request<Body>, Error = Infallible>
838             + NamedService
839             + Clone
840             + Send
841             + Sync
842             + 'static,
843         S::Response: axum::response::IntoResponse,
844         S::Future: Send + 'static,
845     {
846         self.routes = self.routes.add_service(svc);
847         self
848     }
849 
850     /// Add a new optional service to this router.
851     ///
852     /// # Note
853     /// Even when the argument given is `None` this will capture *all* requests to this service name.
854     /// As a result, one cannot use this to toggle between two identically named implementations.
add_optional_service<S>(mut self, svc: Option<S>) -> Self where S: Service<Request<Body>, Error = Infallible> + NamedService + Clone + Send + Sync + 'static, S::Response: axum::response::IntoResponse, S::Future: Send + 'static,855     pub fn add_optional_service<S>(mut self, svc: Option<S>) -> Self
856     where
857         S: Service<Request<Body>, Error = Infallible>
858             + NamedService
859             + Clone
860             + Send
861             + Sync
862             + 'static,
863         S::Response: axum::response::IntoResponse,
864         S::Future: Send + 'static,
865     {
866         if let Some(svc) = svc {
867             self.routes = self.routes.add_service(svc);
868         }
869         self
870     }
871 
872     /// Consume this [`Server`] creating a future that will execute the server
873     /// on [tokio]'s default executor.
874     ///
875     /// [`Server`]: struct.Server.html
876     /// [tokio]: https://docs.rs/tokio
serve<ResBody>(self, addr: SocketAddr) -> Result<(), super::Error> where L: Layer<Routes> + Clone, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,877     pub async fn serve<ResBody>(self, addr: SocketAddr) -> Result<(), super::Error>
878     where
879         L: Layer<Routes> + Clone,
880         L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
881         <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
882         <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
883             Into<crate::BoxError> + Send,
884         ResBody: http_body::Body<Data = Bytes> + Send + 'static,
885         ResBody::Error: Into<crate::BoxError>,
886     {
887         self.server.serve(addr, self.routes.prepare()).await
888     }
889 
890     /// Consume this [`Server`] creating a future that will execute the server
891     /// on [tokio]'s default executor. And shutdown when the provided signal
892     /// is received.
893     ///
894     /// [`Server`]: struct.Server.html
895     /// [tokio]: https://docs.rs/tokio
serve_with_shutdown<F: Future<Output = ()>, ResBody>( self, addr: SocketAddr, signal: F, ) -> Result<(), super::Error> where L: Layer<Routes>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,896     pub async fn serve_with_shutdown<F: Future<Output = ()>, ResBody>(
897         self,
898         addr: SocketAddr,
899         signal: F,
900     ) -> Result<(), super::Error>
901     where
902         L: Layer<Routes>,
903         L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
904         <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
905         <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
906             Into<crate::BoxError> + Send,
907         ResBody: http_body::Body<Data = Bytes> + Send + 'static,
908         ResBody::Error: Into<crate::BoxError>,
909     {
910         self.server
911             .serve_with_shutdown(addr, self.routes.prepare(), signal)
912             .await
913     }
914 
915     /// Consume this [`Server`] creating a future that will execute the server
916     /// on the provided incoming stream of `AsyncRead + AsyncWrite`.
917     ///
918     /// This method discards any provided [`Server`] TCP configuration.
919     ///
920     /// [`Server`]: struct.Server.html
serve_with_incoming<I, IO, IE, ResBody>( self, incoming: I, ) -> Result<(), super::Error> where I: Stream<Item = Result<IO, IE>>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IE: Into<crate::BoxError>, L: Layer<Routes>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,921     pub async fn serve_with_incoming<I, IO, IE, ResBody>(
922         self,
923         incoming: I,
924     ) -> Result<(), super::Error>
925     where
926         I: Stream<Item = Result<IO, IE>>,
927         IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
928         IE: Into<crate::BoxError>,
929         L: Layer<Routes>,
930 
931         L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
932         <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
933         <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
934             Into<crate::BoxError> + Send,
935         ResBody: http_body::Body<Data = Bytes> + Send + 'static,
936         ResBody::Error: Into<crate::BoxError>,
937     {
938         self.server
939             .serve_with_incoming(self.routes.prepare(), incoming)
940             .await
941     }
942 
943     /// Consume this [`Server`] creating a future that will execute the server
944     /// on the provided incoming stream of `AsyncRead + AsyncWrite`. Similar to
945     /// `serve_with_shutdown` this method will also take a signal future to
946     /// gracefully shutdown the server.
947     ///
948     /// This method discards any provided [`Server`] TCP configuration.
949     ///
950     /// [`Server`]: struct.Server.html
serve_with_incoming_shutdown<I, IO, IE, F, ResBody>( self, incoming: I, signal: F, ) -> Result<(), super::Error> where I: Stream<Item = Result<IO, IE>>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IE: Into<crate::BoxError>, F: Future<Output = ()>, L: Layer<Routes>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,951     pub async fn serve_with_incoming_shutdown<I, IO, IE, F, ResBody>(
952         self,
953         incoming: I,
954         signal: F,
955     ) -> Result<(), super::Error>
956     where
957         I: Stream<Item = Result<IO, IE>>,
958         IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
959         IE: Into<crate::BoxError>,
960         F: Future<Output = ()>,
961         L: Layer<Routes>,
962         L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
963         <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
964         <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
965             Into<crate::BoxError> + Send,
966         ResBody: http_body::Body<Data = Bytes> + Send + 'static,
967         ResBody::Error: Into<crate::BoxError>,
968     {
969         self.server
970             .serve_with_incoming_shutdown(self.routes.prepare(), incoming, signal)
971             .await
972     }
973 }
974 
975 impl<L> fmt::Debug for Server<L> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result976     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
977         f.debug_struct("Builder").finish()
978     }
979 }
980 
981 #[derive(Clone)]
982 struct Svc<S> {
983     inner: S,
984     trace_interceptor: Option<TraceInterceptor>,
985 }
986 
987 impl<S, ResBody> Service<Request<Body>> for Svc<S>
988 where
989     S: Service<Request<Body>, Response = Response<ResBody>>,
990     S::Error: Into<crate::BoxError>,
991     ResBody: http_body::Body<Data = Bytes> + Send + 'static,
992     ResBody::Error: Into<crate::BoxError>,
993 {
994     type Response = Response<Body>;
995     type Error = crate::BoxError;
996     type Future = SvcFuture<S::Future>;
997 
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>998     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
999         self.inner.poll_ready(cx).map_err(Into::into)
1000     }
1001 
call(&mut self, mut req: Request<Body>) -> Self::Future1002     fn call(&mut self, mut req: Request<Body>) -> Self::Future {
1003         let span = if let Some(trace_interceptor) = &self.trace_interceptor {
1004             let (parts, body) = req.into_parts();
1005             let bodyless_request = Request::from_parts(parts, ());
1006 
1007             let span = trace_interceptor(&bodyless_request);
1008 
1009             let (parts, _) = bodyless_request.into_parts();
1010             req = Request::from_parts(parts, body);
1011 
1012             span
1013         } else {
1014             tracing::Span::none()
1015         };
1016 
1017         SvcFuture {
1018             inner: self.inner.call(req),
1019             span,
1020         }
1021     }
1022 }
1023 
1024 #[pin_project]
1025 struct SvcFuture<F> {
1026     #[pin]
1027     inner: F,
1028     span: tracing::Span,
1029 }
1030 
1031 impl<F, E, ResBody> Future for SvcFuture<F>
1032 where
1033     F: Future<Output = Result<Response<ResBody>, E>>,
1034     E: Into<crate::BoxError>,
1035     ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1036     ResBody::Error: Into<crate::BoxError>,
1037 {
1038     type Output = Result<Response<Body>, crate::BoxError>;
1039 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1040     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1041         let this = self.project();
1042         let _guard = this.span.enter();
1043 
1044         let response: Response<ResBody> = ready!(this.inner.poll(cx)).map_err(Into::into)?;
1045         let response = response.map(|body| Body::new(body.map_err(Into::into)));
1046         Poll::Ready(Ok(response))
1047     }
1048 }
1049 
1050 impl<S> fmt::Debug for Svc<S> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1051     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1052         f.debug_struct("Svc").finish()
1053     }
1054 }
1055 
1056 #[derive(Clone)]
1057 struct MakeSvc<S, IO> {
1058     concurrency_limit: Option<usize>,
1059     timeout: Option<Duration>,
1060     inner: S,
1061     trace_interceptor: Option<TraceInterceptor>,
1062     _io: PhantomData<fn() -> IO>,
1063 }
1064 
1065 impl<S, ResBody, IO> Service<&ServerIo<IO>> for MakeSvc<S, IO>
1066 where
1067     IO: Connected + 'static,
1068     S: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
1069     S::Future: Send,
1070     S::Error: Into<crate::BoxError> + Send,
1071     ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1072     ResBody::Error: Into<crate::BoxError>,
1073 {
1074     type Response = BoxService;
1075     type Error = crate::BoxError;
1076     type Future = future::Ready<Result<Self::Response, Self::Error>>;
1077 
poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>1078     fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1079         Ok(()).into()
1080     }
1081 
call(&mut self, io: &ServerIo<IO>) -> Self::Future1082     fn call(&mut self, io: &ServerIo<IO>) -> Self::Future {
1083         let conn_info = io.connect_info();
1084 
1085         let svc = self.inner.clone();
1086         let concurrency_limit = self.concurrency_limit;
1087         let timeout = self.timeout;
1088         let trace_interceptor = self.trace_interceptor.clone();
1089 
1090         let svc = ServiceBuilder::new()
1091             .layer(RecoverErrorLayer::new())
1092             .option_layer(concurrency_limit.map(ConcurrencyLimitLayer::new))
1093             .layer_fn(|s| GrpcTimeout::new(s, timeout))
1094             .service(svc);
1095 
1096         let svc = ServiceBuilder::new()
1097             .layer(BoxCloneService::layer())
1098             .layer(ConnectInfoLayer::new(conn_info.clone()))
1099             .service(Svc {
1100                 inner: svc,
1101                 trace_interceptor,
1102             });
1103 
1104         future::ready(Ok(svc))
1105     }
1106 }
1107 
1108 // From `futures-util` crate, borrowed since this is the only dependency tonic requires.
1109 // LICENSE: MIT or Apache-2.0
1110 // A future which only yields `Poll::Ready` once, and thereafter yields `Poll::Pending`.
1111 #[pin_project]
1112 struct Fuse<F> {
1113     #[pin]
1114     inner: Option<F>,
1115 }
1116 
1117 impl<F> Future for Fuse<F>
1118 where
1119     F: Future,
1120 {
1121     type Output = F::Output;
1122 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1123     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1124         match self.as_mut().project().inner.as_pin_mut() {
1125             Some(fut) => fut.poll(cx).map(|output| {
1126                 self.project().inner.set(None);
1127                 output
1128             }),
1129             None => Poll::Pending,
1130         }
1131     }
1132 }
1133