1 //! Server implementation and builder.
2
3 mod conn;
4 mod incoming;
5 mod io_stream;
6 mod service;
7 #[cfg(feature = "_tls-any")]
8 mod tls;
9 #[cfg(unix)]
10 mod unix;
11
12 use tokio_stream::StreamExt as _;
13 use tracing::{debug, trace};
14
15 #[cfg(feature = "router")]
16 use crate::{server::NamedService, service::Routes};
17
18 #[cfg(feature = "router")]
19 use std::convert::Infallible;
20
21 pub use conn::{Connected, TcpConnectInfo};
22 use hyper_util::{
23 rt::{TokioExecutor, TokioIo, TokioTimer},
24 server::conn::auto::{Builder as ConnectionBuilder, HttpServerConnExec},
25 service::TowerToHyperService,
26 };
27 #[cfg(feature = "_tls-any")]
28 pub use tls::ServerTlsConfig;
29
30 #[cfg(feature = "_tls-any")]
31 pub use conn::TlsConnectInfo;
32
33 #[cfg(feature = "_tls-any")]
34 use self::service::TlsAcceptor;
35
36 #[cfg(unix)]
37 pub use unix::UdsConnectInfo;
38
39 pub use incoming::TcpIncoming;
40
41 #[cfg(feature = "_tls-any")]
42 use crate::transport::Error;
43
44 use self::service::{ConnectInfoLayer, ServerIo};
45 use super::service::GrpcTimeout;
46 use crate::body::Body;
47 use crate::service::RecoverErrorLayer;
48 use bytes::Bytes;
49 use http::{Request, Response};
50 use http_body_util::BodyExt;
51 use hyper::{body::Incoming, service::Service as HyperService};
52 use pin_project::pin_project;
53 use std::future::pending;
54 use std::{
55 fmt,
56 future::{self, poll_fn, Future},
57 marker::PhantomData,
58 net::SocketAddr,
59 pin::{pin, Pin},
60 sync::Arc,
61 task::{ready, Context, Poll},
62 time::Duration,
63 };
64 use tokio::io::{AsyncRead, AsyncWrite};
65 use tokio::time::sleep;
66 use tokio_stream::Stream;
67 use tower::{
68 layer::util::{Identity, Stack},
69 layer::Layer,
70 limit::concurrency::ConcurrencyLimitLayer,
71 util::BoxCloneService,
72 Service, ServiceBuilder, ServiceExt,
73 };
74
75 type BoxService = tower::util::BoxCloneService<Request<Body>, Response<Body>, crate::BoxError>;
76 type TraceInterceptor = Arc<dyn Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static>;
77
78 const DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS: u64 = 20;
79
80 /// A default batteries included `transport` server.
81 ///
82 /// This provides an easy builder pattern style builder [`Server`] on top of
83 /// `hyper` connections. This builder exposes easy configuration parameters
84 /// for providing a fully featured http2 based gRPC server. This should provide
85 /// a very good out of the box http2 server for use with tonic but is also a
86 /// reference implementation that should be a good starting point for anyone
87 /// wanting to create a more complex and/or specific implementation.
88 #[derive(Clone)]
89 pub struct Server<L = Identity> {
90 trace_interceptor: Option<TraceInterceptor>,
91 concurrency_limit: Option<usize>,
92 timeout: Option<Duration>,
93 #[cfg(feature = "_tls-any")]
94 tls: Option<TlsAcceptor>,
95 init_stream_window_size: Option<u32>,
96 init_connection_window_size: Option<u32>,
97 max_concurrent_streams: Option<u32>,
98 tcp_keepalive: Option<Duration>,
99 tcp_nodelay: bool,
100 http2_keepalive_interval: Option<Duration>,
101 http2_keepalive_timeout: Option<Duration>,
102 http2_adaptive_window: Option<bool>,
103 http2_max_pending_accept_reset_streams: Option<usize>,
104 http2_max_header_list_size: Option<u32>,
105 max_frame_size: Option<u32>,
106 accept_http1: bool,
107 service_builder: ServiceBuilder<L>,
108 max_connection_age: Option<Duration>,
109 }
110
111 impl Default for Server<Identity> {
default() -> Self112 fn default() -> Self {
113 Self {
114 trace_interceptor: None,
115 concurrency_limit: None,
116 timeout: None,
117 #[cfg(feature = "_tls-any")]
118 tls: None,
119 init_stream_window_size: None,
120 init_connection_window_size: None,
121 max_concurrent_streams: None,
122 tcp_keepalive: None,
123 tcp_nodelay: false,
124 http2_keepalive_interval: None,
125 http2_keepalive_timeout: None,
126 http2_adaptive_window: None,
127 http2_max_pending_accept_reset_streams: None,
128 http2_max_header_list_size: None,
129 max_frame_size: None,
130 accept_http1: false,
131 service_builder: Default::default(),
132 max_connection_age: None,
133 }
134 }
135 }
136
137 /// A stack based [`Service`] router.
138 #[cfg(feature = "router")]
139 #[derive(Debug)]
140 pub struct Router<L = Identity> {
141 server: Server<L>,
142 routes: Routes,
143 }
144
145 impl Server {
146 /// Create a new server builder that can configure a [`Server`].
builder() -> Self147 pub fn builder() -> Self {
148 Server {
149 tcp_nodelay: true,
150 accept_http1: false,
151 ..Default::default()
152 }
153 }
154 }
155
156 impl<L> Server<L> {
157 /// Configure TLS for this server.
158 #[cfg(feature = "_tls-any")]
tls_config(self, tls_config: ServerTlsConfig) -> Result<Self, Error>159 pub fn tls_config(self, tls_config: ServerTlsConfig) -> Result<Self, Error> {
160 Ok(Server {
161 tls: Some(tls_config.tls_acceptor().map_err(Error::from_source)?),
162 ..self
163 })
164 }
165
166 /// Set the concurrency limit applied to on requests inbound per connection.
167 ///
168 /// # Example
169 ///
170 /// ```
171 /// # use tonic::transport::Server;
172 /// # use tower_service::Service;
173 /// # let builder = Server::builder();
174 /// builder.concurrency_limit_per_connection(32);
175 /// ```
176 #[must_use]
concurrency_limit_per_connection(self, limit: usize) -> Self177 pub fn concurrency_limit_per_connection(self, limit: usize) -> Self {
178 Server {
179 concurrency_limit: Some(limit),
180 ..self
181 }
182 }
183
184 /// Set a timeout on for all request handlers.
185 ///
186 /// # Example
187 ///
188 /// ```
189 /// # use tonic::transport::Server;
190 /// # use tower_service::Service;
191 /// # use std::time::Duration;
192 /// # let builder = Server::builder();
193 /// builder.timeout(Duration::from_secs(30));
194 /// ```
195 #[must_use]
timeout(self, timeout: Duration) -> Self196 pub fn timeout(self, timeout: Duration) -> Self {
197 Server {
198 timeout: Some(timeout),
199 ..self
200 }
201 }
202
203 /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
204 /// stream-level flow control.
205 ///
206 /// Default is 65,535
207 ///
208 /// [spec]: https://httpwg.org/specs/rfc9113.html#InitialWindowSize
209 #[must_use]
initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self210 pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
211 Server {
212 init_stream_window_size: sz.into(),
213 ..self
214 }
215 }
216
217 /// Sets the max connection-level flow control for HTTP2
218 ///
219 /// Default is 65,535
220 #[must_use]
initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self221 pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
222 Server {
223 init_connection_window_size: sz.into(),
224 ..self
225 }
226 }
227
228 /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
229 /// connections.
230 ///
231 /// Default is no limit (`None`).
232 ///
233 /// [spec]: https://httpwg.org/specs/rfc9113.html#n-stream-concurrency
234 #[must_use]
max_concurrent_streams(self, max: impl Into<Option<u32>>) -> Self235 pub fn max_concurrent_streams(self, max: impl Into<Option<u32>>) -> Self {
236 Server {
237 max_concurrent_streams: max.into(),
238 ..self
239 }
240 }
241
242 /// Sets the maximum time option in milliseconds that a connection may exist
243 ///
244 /// Default is no limit (`None`).
245 ///
246 /// # Example
247 ///
248 /// ```
249 /// # use tonic::transport::Server;
250 /// # use tower_service::Service;
251 /// # use std::time::Duration;
252 /// # let builder = Server::builder();
253 /// builder.max_connection_age(Duration::from_secs(60));
254 /// ```
255 #[must_use]
max_connection_age(self, max_connection_age: Duration) -> Self256 pub fn max_connection_age(self, max_connection_age: Duration) -> Self {
257 Server {
258 max_connection_age: Some(max_connection_age),
259 ..self
260 }
261 }
262
263 /// Set whether HTTP2 Ping frames are enabled on accepted connections.
264 ///
265 /// If `None` is specified, HTTP2 keepalive is disabled, otherwise the duration
266 /// specified will be the time interval between HTTP2 Ping frames.
267 /// The timeout for receiving an acknowledgement of the keepalive ping
268 /// can be set with [`Server::http2_keepalive_timeout`].
269 ///
270 /// Default is no HTTP2 keepalive (`None`)
271 ///
272 #[must_use]
http2_keepalive_interval(self, http2_keepalive_interval: Option<Duration>) -> Self273 pub fn http2_keepalive_interval(self, http2_keepalive_interval: Option<Duration>) -> Self {
274 Server {
275 http2_keepalive_interval,
276 ..self
277 }
278 }
279
280 /// Sets a timeout for receiving an acknowledgement of the keepalive ping.
281 ///
282 /// If the ping is not acknowledged within the timeout, the connection will be closed.
283 /// Does nothing if http2_keep_alive_interval is disabled.
284 ///
285 /// Default is 20 seconds.
286 ///
287 #[must_use]
http2_keepalive_timeout(self, http2_keepalive_timeout: Option<Duration>) -> Self288 pub fn http2_keepalive_timeout(self, http2_keepalive_timeout: Option<Duration>) -> Self {
289 Server {
290 http2_keepalive_timeout,
291 ..self
292 }
293 }
294
295 /// Sets whether to use an adaptive flow control. Defaults to false.
296 /// Enabling this will override the limits set in http2_initial_stream_window_size and
297 /// http2_initial_connection_window_size.
298 #[must_use]
http2_adaptive_window(self, enabled: Option<bool>) -> Self299 pub fn http2_adaptive_window(self, enabled: Option<bool>) -> Self {
300 Server {
301 http2_adaptive_window: enabled,
302 ..self
303 }
304 }
305
306 /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
307 ///
308 /// This will default to whatever the default in h2 is. As of v0.3.17, it is 20.
309 ///
310 /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
311 #[must_use]
http2_max_pending_accept_reset_streams(self, max: Option<usize>) -> Self312 pub fn http2_max_pending_accept_reset_streams(self, max: Option<usize>) -> Self {
313 Server {
314 http2_max_pending_accept_reset_streams: max,
315 ..self
316 }
317 }
318
319 /// Set whether TCP keepalive messages are enabled on accepted connections.
320 ///
321 /// If `None` is specified, keepalive is disabled, otherwise the duration
322 /// specified will be the time to remain idle before sending TCP keepalive
323 /// probes.
324 ///
325 /// Default is no keepalive (`None`)
326 ///
327 #[must_use]
tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self328 pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
329 Server {
330 tcp_keepalive,
331 ..self
332 }
333 }
334
335 /// Set the value of `TCP_NODELAY` option for accepted connections. Enabled by default.
336 #[must_use]
tcp_nodelay(self, enabled: bool) -> Self337 pub fn tcp_nodelay(self, enabled: bool) -> Self {
338 Server {
339 tcp_nodelay: enabled,
340 ..self
341 }
342 }
343
344 /// Sets the max size of received header frames.
345 ///
346 /// This will default to whatever the default in hyper is. As of v1.4.1, it is 16 KiB.
347 #[must_use]
http2_max_header_list_size(self, max: impl Into<Option<u32>>) -> Self348 pub fn http2_max_header_list_size(self, max: impl Into<Option<u32>>) -> Self {
349 Server {
350 http2_max_header_list_size: max.into(),
351 ..self
352 }
353 }
354
355 /// Sets the maximum frame size to use for HTTP2.
356 ///
357 /// Passing `None` will do nothing.
358 ///
359 /// If not set, will default from underlying transport.
360 #[must_use]
max_frame_size(self, frame_size: impl Into<Option<u32>>) -> Self361 pub fn max_frame_size(self, frame_size: impl Into<Option<u32>>) -> Self {
362 Server {
363 max_frame_size: frame_size.into(),
364 ..self
365 }
366 }
367
368 /// Allow this server to accept http1 requests.
369 ///
370 /// Accepting http1 requests is only useful when developing `grpc-web`
371 /// enabled services. If this setting is set to `true` but services are
372 /// not correctly configured to handle grpc-web requests, your server may
373 /// return confusing (but correct) protocol errors.
374 ///
375 /// Default is `false`.
376 #[must_use]
accept_http1(self, accept_http1: bool) -> Self377 pub fn accept_http1(self, accept_http1: bool) -> Self {
378 Server {
379 accept_http1,
380 ..self
381 }
382 }
383
384 /// Intercept inbound headers and add a [`tracing::Span`] to each response future.
385 #[must_use]
trace_fn<F>(self, f: F) -> Self where F: Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static,386 pub fn trace_fn<F>(self, f: F) -> Self
387 where
388 F: Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static,
389 {
390 Server {
391 trace_interceptor: Some(Arc::new(f)),
392 ..self
393 }
394 }
395
396 /// Create a router with the `S` typed service as the first service.
397 ///
398 /// This will clone the `Server` builder and create a router that will
399 /// route around different services.
400 #[cfg(feature = "router")]
add_service<S>(&mut self, svc: S) -> Router<L> where S: Service<Request<Body>, Error = Infallible> + NamedService + Clone + Send + Sync + 'static, S::Response: axum::response::IntoResponse, S::Future: Send + 'static, L: Clone,401 pub fn add_service<S>(&mut self, svc: S) -> Router<L>
402 where
403 S: Service<Request<Body>, Error = Infallible>
404 + NamedService
405 + Clone
406 + Send
407 + Sync
408 + 'static,
409 S::Response: axum::response::IntoResponse,
410 S::Future: Send + 'static,
411 L: Clone,
412 {
413 Router::new(self.clone(), Routes::new(svc))
414 }
415
416 /// Create a router with the optional `S` typed service as the first service.
417 ///
418 /// This will clone the `Server` builder and create a router that will
419 /// route around different services.
420 ///
421 /// # Note
422 /// Even when the argument given is `None` this will capture *all* requests to this service name.
423 /// As a result, one cannot use this to toggle between two identically named implementations.
424 #[cfg(feature = "router")]
add_optional_service<S>(&mut self, svc: Option<S>) -> Router<L> where S: Service<Request<Body>, Error = Infallible> + NamedService + Clone + Send + Sync + 'static, S::Response: axum::response::IntoResponse, S::Future: Send + 'static, L: Clone,425 pub fn add_optional_service<S>(&mut self, svc: Option<S>) -> Router<L>
426 where
427 S: Service<Request<Body>, Error = Infallible>
428 + NamedService
429 + Clone
430 + Send
431 + Sync
432 + 'static,
433 S::Response: axum::response::IntoResponse,
434 S::Future: Send + 'static,
435 L: Clone,
436 {
437 let routes = svc.map(Routes::new).unwrap_or_default();
438 Router::new(self.clone(), routes)
439 }
440
441 /// Create a router with given [`Routes`].
442 ///
443 /// This will clone the `Server` builder and create a router that will
444 /// route around different services that were already added to the provided `routes`.
445 #[cfg(feature = "router")]
add_routes(&mut self, routes: Routes) -> Router<L> where L: Clone,446 pub fn add_routes(&mut self, routes: Routes) -> Router<L>
447 where
448 L: Clone,
449 {
450 Router::new(self.clone(), routes)
451 }
452
453 /// Set the [Tower] [`Layer`] all services will be wrapped in.
454 ///
455 /// This enables using middleware from the [Tower ecosystem][eco].
456 ///
457 /// # Example
458 ///
459 /// ```
460 /// # use tonic::transport::Server;
461 /// # use tower_service::Service;
462 /// use tower::timeout::TimeoutLayer;
463 /// use std::time::Duration;
464 ///
465 /// # let mut builder = Server::builder();
466 /// builder.layer(TimeoutLayer::new(Duration::from_secs(30)));
467 /// ```
468 ///
469 /// Note that timeouts should be set using [`Server::timeout`]. `TimeoutLayer` is only used
470 /// here as an example.
471 ///
472 /// You can build more complex layers using [`ServiceBuilder`]. Those layers can include
473 /// [interceptors]:
474 ///
475 /// ```
476 /// # use tonic::transport::Server;
477 /// # use tower_service::Service;
478 /// use tower::ServiceBuilder;
479 /// use std::time::Duration;
480 /// use tonic::{Request, Status, service::InterceptorLayer};
481 ///
482 /// fn auth_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
483 /// if valid_credentials(&request) {
484 /// Ok(request)
485 /// } else {
486 /// Err(Status::unauthenticated("invalid credentials"))
487 /// }
488 /// }
489 ///
490 /// fn valid_credentials(request: &Request<()>) -> bool {
491 /// // ...
492 /// # true
493 /// }
494 ///
495 /// fn some_other_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
496 /// Ok(request)
497 /// }
498 ///
499 /// let layer = ServiceBuilder::new()
500 /// .load_shed()
501 /// .timeout(Duration::from_secs(30))
502 /// .layer(InterceptorLayer::new(auth_interceptor))
503 /// .layer(InterceptorLayer::new(some_other_interceptor))
504 /// .into_inner();
505 ///
506 /// Server::builder().layer(layer);
507 /// ```
508 ///
509 /// [Tower]: https://github.com/tower-rs/tower
510 /// [`Layer`]: tower::layer::Layer
511 /// [eco]: https://github.com/tower-rs
512 /// [`ServiceBuilder`]: tower::ServiceBuilder
513 /// [interceptors]: crate::service::Interceptor
layer<NewLayer>(self, new_layer: NewLayer) -> Server<Stack<NewLayer, L>>514 pub fn layer<NewLayer>(self, new_layer: NewLayer) -> Server<Stack<NewLayer, L>> {
515 Server {
516 service_builder: self.service_builder.layer(new_layer),
517 trace_interceptor: self.trace_interceptor,
518 concurrency_limit: self.concurrency_limit,
519 timeout: self.timeout,
520 #[cfg(feature = "_tls-any")]
521 tls: self.tls,
522 init_stream_window_size: self.init_stream_window_size,
523 init_connection_window_size: self.init_connection_window_size,
524 max_concurrent_streams: self.max_concurrent_streams,
525 tcp_keepalive: self.tcp_keepalive,
526 tcp_nodelay: self.tcp_nodelay,
527 http2_keepalive_interval: self.http2_keepalive_interval,
528 http2_keepalive_timeout: self.http2_keepalive_timeout,
529 http2_adaptive_window: self.http2_adaptive_window,
530 http2_max_pending_accept_reset_streams: self.http2_max_pending_accept_reset_streams,
531 http2_max_header_list_size: self.http2_max_header_list_size,
532 max_frame_size: self.max_frame_size,
533 accept_http1: self.accept_http1,
534 max_connection_age: self.max_connection_age,
535 }
536 }
537
bind_incoming(&self, addr: SocketAddr) -> Result<TcpIncoming, super::Error>538 fn bind_incoming(&self, addr: SocketAddr) -> Result<TcpIncoming, super::Error> {
539 Ok(TcpIncoming::bind(addr)
540 .map_err(super::Error::from_source)?
541 .with_nodelay(Some(self.tcp_nodelay))
542 .with_keepalive(self.tcp_keepalive))
543 }
544
545 /// Serve the service.
serve<S, ResBody>(self, addr: SocketAddr, svc: S) -> Result<(), super::Error> where L: Layer<S>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send + 'static, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,546 pub async fn serve<S, ResBody>(self, addr: SocketAddr, svc: S) -> Result<(), super::Error>
547 where
548 L: Layer<S>,
549 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
550 <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
551 <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
552 Into<crate::BoxError> + Send + 'static,
553 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
554 ResBody::Error: Into<crate::BoxError>,
555 {
556 let incoming = self.bind_incoming(addr)?;
557 self.serve_with_incoming(svc, incoming).await
558 }
559
560 /// Serve the service with the shutdown signal.
serve_with_shutdown<S, F, ResBody>( self, addr: SocketAddr, svc: S, signal: F, ) -> Result<(), super::Error> where L: Layer<S>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send + 'static, F: Future<Output = ()>, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,561 pub async fn serve_with_shutdown<S, F, ResBody>(
562 self,
563 addr: SocketAddr,
564 svc: S,
565 signal: F,
566 ) -> Result<(), super::Error>
567 where
568 L: Layer<S>,
569 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
570 <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
571 <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
572 Into<crate::BoxError> + Send + 'static,
573 F: Future<Output = ()>,
574 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
575 ResBody::Error: Into<crate::BoxError>,
576 {
577 let incoming = self.bind_incoming(addr)?;
578 self.serve_with_incoming_shutdown(svc, incoming, signal)
579 .await
580 }
581
582 /// Serve the service on the provided incoming stream.
serve_with_incoming<S, I, IO, IE, ResBody>( self, svc: S, incoming: I, ) -> Result<(), super::Error> where L: Layer<S>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send + 'static, I: Stream<Item = Result<IO, IE>>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IE: Into<crate::BoxError>, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,583 pub async fn serve_with_incoming<S, I, IO, IE, ResBody>(
584 self,
585 svc: S,
586 incoming: I,
587 ) -> Result<(), super::Error>
588 where
589 L: Layer<S>,
590 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
591 <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
592 <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
593 Into<crate::BoxError> + Send + 'static,
594 I: Stream<Item = Result<IO, IE>>,
595 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
596 IE: Into<crate::BoxError>,
597 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
598 ResBody::Error: Into<crate::BoxError>,
599 {
600 self.serve_internal(svc, incoming, Option::<future::Ready<()>>::None)
601 .await
602 }
603
604 /// Serve the service with the signal on the provided incoming stream.
serve_with_incoming_shutdown<S, I, F, IO, IE, ResBody>( self, svc: S, incoming: I, signal: F, ) -> Result<(), super::Error> where L: Layer<S>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send + 'static, I: Stream<Item = Result<IO, IE>>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IE: Into<crate::BoxError>, F: Future<Output = ()>, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,605 pub async fn serve_with_incoming_shutdown<S, I, F, IO, IE, ResBody>(
606 self,
607 svc: S,
608 incoming: I,
609 signal: F,
610 ) -> Result<(), super::Error>
611 where
612 L: Layer<S>,
613 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
614 <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
615 <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
616 Into<crate::BoxError> + Send + 'static,
617 I: Stream<Item = Result<IO, IE>>,
618 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
619 IE: Into<crate::BoxError>,
620 F: Future<Output = ()>,
621 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
622 ResBody::Error: Into<crate::BoxError>,
623 {
624 self.serve_internal(svc, incoming, Some(signal)).await
625 }
626
serve_internal<S, I, F, IO, IE, ResBody>( self, svc: S, incoming: I, signal: Option<F>, ) -> Result<(), super::Error> where L: Layer<S>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send + 'static, I: Stream<Item = Result<IO, IE>>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IE: Into<crate::BoxError>, F: Future<Output = ()>, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,627 async fn serve_internal<S, I, F, IO, IE, ResBody>(
628 self,
629 svc: S,
630 incoming: I,
631 signal: Option<F>,
632 ) -> Result<(), super::Error>
633 where
634 L: Layer<S>,
635 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
636 <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
637 <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
638 Into<crate::BoxError> + Send + 'static,
639 I: Stream<Item = Result<IO, IE>>,
640 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
641 IE: Into<crate::BoxError>,
642 F: Future<Output = ()>,
643 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
644 ResBody::Error: Into<crate::BoxError>,
645 {
646 let trace_interceptor = self.trace_interceptor.clone();
647 let concurrency_limit = self.concurrency_limit;
648 let init_connection_window_size = self.init_connection_window_size;
649 let init_stream_window_size = self.init_stream_window_size;
650 let max_concurrent_streams = self.max_concurrent_streams;
651 let timeout = self.timeout;
652 let max_header_list_size = self.http2_max_header_list_size;
653 let max_frame_size = self.max_frame_size;
654 let http2_only = !self.accept_http1;
655
656 let http2_keepalive_interval = self.http2_keepalive_interval;
657 let http2_keepalive_timeout = self
658 .http2_keepalive_timeout
659 .unwrap_or_else(|| Duration::new(DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS, 0));
660 let http2_adaptive_window = self.http2_adaptive_window;
661 let http2_max_pending_accept_reset_streams = self.http2_max_pending_accept_reset_streams;
662 let max_connection_age = self.max_connection_age;
663
664 let svc = self.service_builder.service(svc);
665
666 let incoming = io_stream::ServerIoStream::new(
667 incoming,
668 #[cfg(feature = "_tls-any")]
669 self.tls,
670 );
671 let mut svc = MakeSvc {
672 inner: svc,
673 concurrency_limit,
674 timeout,
675 trace_interceptor,
676 _io: PhantomData,
677 };
678
679 let server = {
680 let mut builder = ConnectionBuilder::new(TokioExecutor::new());
681
682 if http2_only {
683 builder = builder.http2_only();
684 }
685
686 builder
687 .http2()
688 .timer(TokioTimer::new())
689 .initial_connection_window_size(init_connection_window_size)
690 .initial_stream_window_size(init_stream_window_size)
691 .max_concurrent_streams(max_concurrent_streams)
692 .keep_alive_interval(http2_keepalive_interval)
693 .keep_alive_timeout(http2_keepalive_timeout)
694 .adaptive_window(http2_adaptive_window.unwrap_or_default())
695 .max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams)
696 .max_frame_size(max_frame_size);
697
698 if let Some(max_header_list_size) = max_header_list_size {
699 builder.http2().max_header_list_size(max_header_list_size);
700 }
701
702 builder
703 };
704
705 let (signal_tx, signal_rx) = tokio::sync::watch::channel(());
706 let signal_tx = Arc::new(signal_tx);
707
708 let graceful = signal.is_some();
709 let mut sig = pin!(Fuse { inner: signal });
710 let mut incoming = pin!(incoming);
711
712 loop {
713 tokio::select! {
714 _ = &mut sig => {
715 trace!("signal received, shutting down");
716 break;
717 },
718 io = incoming.next() => {
719 let io = match io {
720 Some(Ok(io)) => io,
721 Some(Err(e)) => {
722 trace!("error accepting connection: {:#}", e);
723 continue;
724 },
725 None => {
726 break
727 },
728 };
729
730 trace!("connection accepted");
731
732 poll_fn(|cx| svc.poll_ready(cx))
733 .await
734 .map_err(super::Error::from_source)?;
735
736 let req_svc = svc
737 .call(&io)
738 .await
739 .map_err(super::Error::from_source)?;
740
741 let hyper_io = TokioIo::new(io);
742 let hyper_svc = TowerToHyperService::new(req_svc.map_request(|req: Request<Incoming>| req.map(Body::new)));
743
744 serve_connection(hyper_io, hyper_svc, server.clone(), graceful.then(|| signal_rx.clone()), max_connection_age);
745 }
746 }
747 }
748
749 if graceful {
750 let _ = signal_tx.send(());
751 drop(signal_rx);
752 trace!(
753 "waiting for {} connections to close",
754 signal_tx.receiver_count()
755 );
756
757 // Wait for all connections to close
758 signal_tx.closed().await;
759 }
760
761 Ok(())
762 }
763 }
764
765 // This is moved to its own function as a way to get around
766 // https://github.com/rust-lang/rust/issues/102211
serve_connection<B, IO, S, E>( hyper_io: IO, hyper_svc: S, builder: ConnectionBuilder<E>, mut watcher: Option<tokio::sync::watch::Receiver<()>>, max_connection_age: Option<Duration>, ) where B: http_body::Body + Send + 'static, B::Data: Send, B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync, IO: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static, S: HyperService<Request<Incoming>, Response = Response<B>> + Clone + Send + 'static, S::Future: Send + 'static, S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send, E: HttpServerConnExec<S::Future, B> + Send + Sync + 'static,767 fn serve_connection<B, IO, S, E>(
768 hyper_io: IO,
769 hyper_svc: S,
770 builder: ConnectionBuilder<E>,
771 mut watcher: Option<tokio::sync::watch::Receiver<()>>,
772 max_connection_age: Option<Duration>,
773 ) where
774 B: http_body::Body + Send + 'static,
775 B::Data: Send,
776 B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
777 IO: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
778 S: HyperService<Request<Incoming>, Response = Response<B>> + Clone + Send + 'static,
779 S::Future: Send + 'static,
780 S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
781 E: HttpServerConnExec<S::Future, B> + Send + Sync + 'static,
782 {
783 tokio::spawn(async move {
784 {
785 let mut sig = pin!(Fuse {
786 inner: watcher.as_mut().map(|w| w.changed()),
787 });
788
789 let mut conn = pin!(builder.serve_connection(hyper_io, hyper_svc));
790
791 let sleep = sleep_or_pending(max_connection_age);
792 tokio::pin!(sleep);
793
794 loop {
795 tokio::select! {
796 rv = &mut conn => {
797 if let Err(err) = rv {
798 debug!("failed serving connection: {:#}", err);
799 }
800 break;
801 },
802 _ = &mut sleep => {
803 conn.as_mut().graceful_shutdown();
804 sleep.set(sleep_or_pending(None));
805 },
806 _ = &mut sig => {
807 conn.as_mut().graceful_shutdown();
808 }
809 }
810 }
811 }
812
813 drop(watcher);
814 trace!("connection closed");
815 });
816 }
817
sleep_or_pending(wait_for: Option<Duration>)818 async fn sleep_or_pending(wait_for: Option<Duration>) {
819 match wait_for {
820 Some(wait) => sleep(wait).await,
821 None => pending().await,
822 };
823 }
824
825 #[cfg(feature = "router")]
826 impl<L> Router<L> {
new(server: Server<L>, routes: Routes) -> Self827 pub(crate) fn new(server: Server<L>, routes: Routes) -> Self {
828 Self { server, routes }
829 }
830 }
831
832 #[cfg(feature = "router")]
833 impl<L> Router<L> {
834 /// Add a new service to this router.
add_service<S>(mut self, svc: S) -> Self where S: Service<Request<Body>, Error = Infallible> + NamedService + Clone + Send + Sync + 'static, S::Response: axum::response::IntoResponse, S::Future: Send + 'static,835 pub fn add_service<S>(mut self, svc: S) -> Self
836 where
837 S: Service<Request<Body>, Error = Infallible>
838 + NamedService
839 + Clone
840 + Send
841 + Sync
842 + 'static,
843 S::Response: axum::response::IntoResponse,
844 S::Future: Send + 'static,
845 {
846 self.routes = self.routes.add_service(svc);
847 self
848 }
849
850 /// Add a new optional service to this router.
851 ///
852 /// # Note
853 /// Even when the argument given is `None` this will capture *all* requests to this service name.
854 /// As a result, one cannot use this to toggle between two identically named implementations.
add_optional_service<S>(mut self, svc: Option<S>) -> Self where S: Service<Request<Body>, Error = Infallible> + NamedService + Clone + Send + Sync + 'static, S::Response: axum::response::IntoResponse, S::Future: Send + 'static,855 pub fn add_optional_service<S>(mut self, svc: Option<S>) -> Self
856 where
857 S: Service<Request<Body>, Error = Infallible>
858 + NamedService
859 + Clone
860 + Send
861 + Sync
862 + 'static,
863 S::Response: axum::response::IntoResponse,
864 S::Future: Send + 'static,
865 {
866 if let Some(svc) = svc {
867 self.routes = self.routes.add_service(svc);
868 }
869 self
870 }
871
872 /// Consume this [`Server`] creating a future that will execute the server
873 /// on [tokio]'s default executor.
874 ///
875 /// [`Server`]: struct.Server.html
876 /// [tokio]: https://docs.rs/tokio
serve<ResBody>(self, addr: SocketAddr) -> Result<(), super::Error> where L: Layer<Routes> + Clone, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,877 pub async fn serve<ResBody>(self, addr: SocketAddr) -> Result<(), super::Error>
878 where
879 L: Layer<Routes> + Clone,
880 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
881 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
882 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
883 Into<crate::BoxError> + Send,
884 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
885 ResBody::Error: Into<crate::BoxError>,
886 {
887 self.server.serve(addr, self.routes.prepare()).await
888 }
889
890 /// Consume this [`Server`] creating a future that will execute the server
891 /// on [tokio]'s default executor. And shutdown when the provided signal
892 /// is received.
893 ///
894 /// [`Server`]: struct.Server.html
895 /// [tokio]: https://docs.rs/tokio
serve_with_shutdown<F: Future<Output = ()>, ResBody>( self, addr: SocketAddr, signal: F, ) -> Result<(), super::Error> where L: Layer<Routes>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,896 pub async fn serve_with_shutdown<F: Future<Output = ()>, ResBody>(
897 self,
898 addr: SocketAddr,
899 signal: F,
900 ) -> Result<(), super::Error>
901 where
902 L: Layer<Routes>,
903 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
904 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
905 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
906 Into<crate::BoxError> + Send,
907 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
908 ResBody::Error: Into<crate::BoxError>,
909 {
910 self.server
911 .serve_with_shutdown(addr, self.routes.prepare(), signal)
912 .await
913 }
914
915 /// Consume this [`Server`] creating a future that will execute the server
916 /// on the provided incoming stream of `AsyncRead + AsyncWrite`.
917 ///
918 /// This method discards any provided [`Server`] TCP configuration.
919 ///
920 /// [`Server`]: struct.Server.html
serve_with_incoming<I, IO, IE, ResBody>( self, incoming: I, ) -> Result<(), super::Error> where I: Stream<Item = Result<IO, IE>>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IE: Into<crate::BoxError>, L: Layer<Routes>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,921 pub async fn serve_with_incoming<I, IO, IE, ResBody>(
922 self,
923 incoming: I,
924 ) -> Result<(), super::Error>
925 where
926 I: Stream<Item = Result<IO, IE>>,
927 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
928 IE: Into<crate::BoxError>,
929 L: Layer<Routes>,
930
931 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
932 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
933 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
934 Into<crate::BoxError> + Send,
935 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
936 ResBody::Error: Into<crate::BoxError>,
937 {
938 self.server
939 .serve_with_incoming(self.routes.prepare(), incoming)
940 .await
941 }
942
943 /// Consume this [`Server`] creating a future that will execute the server
944 /// on the provided incoming stream of `AsyncRead + AsyncWrite`. Similar to
945 /// `serve_with_shutdown` this method will also take a signal future to
946 /// gracefully shutdown the server.
947 ///
948 /// This method discards any provided [`Server`] TCP configuration.
949 ///
950 /// [`Server`]: struct.Server.html
serve_with_incoming_shutdown<I, IO, IE, F, ResBody>( self, incoming: I, signal: F, ) -> Result<(), super::Error> where I: Stream<Item = Result<IO, IE>>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IE: Into<crate::BoxError>, F: Future<Output = ()>, L: Layer<Routes>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::BoxError> + Send, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::BoxError>,951 pub async fn serve_with_incoming_shutdown<I, IO, IE, F, ResBody>(
952 self,
953 incoming: I,
954 signal: F,
955 ) -> Result<(), super::Error>
956 where
957 I: Stream<Item = Result<IO, IE>>,
958 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
959 IE: Into<crate::BoxError>,
960 F: Future<Output = ()>,
961 L: Layer<Routes>,
962 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
963 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
964 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
965 Into<crate::BoxError> + Send,
966 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
967 ResBody::Error: Into<crate::BoxError>,
968 {
969 self.server
970 .serve_with_incoming_shutdown(self.routes.prepare(), incoming, signal)
971 .await
972 }
973 }
974
975 impl<L> fmt::Debug for Server<L> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result976 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
977 f.debug_struct("Builder").finish()
978 }
979 }
980
981 #[derive(Clone)]
982 struct Svc<S> {
983 inner: S,
984 trace_interceptor: Option<TraceInterceptor>,
985 }
986
987 impl<S, ResBody> Service<Request<Body>> for Svc<S>
988 where
989 S: Service<Request<Body>, Response = Response<ResBody>>,
990 S::Error: Into<crate::BoxError>,
991 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
992 ResBody::Error: Into<crate::BoxError>,
993 {
994 type Response = Response<Body>;
995 type Error = crate::BoxError;
996 type Future = SvcFuture<S::Future>;
997
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>998 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
999 self.inner.poll_ready(cx).map_err(Into::into)
1000 }
1001
call(&mut self, mut req: Request<Body>) -> Self::Future1002 fn call(&mut self, mut req: Request<Body>) -> Self::Future {
1003 let span = if let Some(trace_interceptor) = &self.trace_interceptor {
1004 let (parts, body) = req.into_parts();
1005 let bodyless_request = Request::from_parts(parts, ());
1006
1007 let span = trace_interceptor(&bodyless_request);
1008
1009 let (parts, _) = bodyless_request.into_parts();
1010 req = Request::from_parts(parts, body);
1011
1012 span
1013 } else {
1014 tracing::Span::none()
1015 };
1016
1017 SvcFuture {
1018 inner: self.inner.call(req),
1019 span,
1020 }
1021 }
1022 }
1023
1024 #[pin_project]
1025 struct SvcFuture<F> {
1026 #[pin]
1027 inner: F,
1028 span: tracing::Span,
1029 }
1030
1031 impl<F, E, ResBody> Future for SvcFuture<F>
1032 where
1033 F: Future<Output = Result<Response<ResBody>, E>>,
1034 E: Into<crate::BoxError>,
1035 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1036 ResBody::Error: Into<crate::BoxError>,
1037 {
1038 type Output = Result<Response<Body>, crate::BoxError>;
1039
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1040 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1041 let this = self.project();
1042 let _guard = this.span.enter();
1043
1044 let response: Response<ResBody> = ready!(this.inner.poll(cx)).map_err(Into::into)?;
1045 let response = response.map(|body| Body::new(body.map_err(Into::into)));
1046 Poll::Ready(Ok(response))
1047 }
1048 }
1049
1050 impl<S> fmt::Debug for Svc<S> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1051 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1052 f.debug_struct("Svc").finish()
1053 }
1054 }
1055
1056 #[derive(Clone)]
1057 struct MakeSvc<S, IO> {
1058 concurrency_limit: Option<usize>,
1059 timeout: Option<Duration>,
1060 inner: S,
1061 trace_interceptor: Option<TraceInterceptor>,
1062 _io: PhantomData<fn() -> IO>,
1063 }
1064
1065 impl<S, ResBody, IO> Service<&ServerIo<IO>> for MakeSvc<S, IO>
1066 where
1067 IO: Connected + 'static,
1068 S: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
1069 S::Future: Send,
1070 S::Error: Into<crate::BoxError> + Send,
1071 ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1072 ResBody::Error: Into<crate::BoxError>,
1073 {
1074 type Response = BoxService;
1075 type Error = crate::BoxError;
1076 type Future = future::Ready<Result<Self::Response, Self::Error>>;
1077
poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>1078 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1079 Ok(()).into()
1080 }
1081
call(&mut self, io: &ServerIo<IO>) -> Self::Future1082 fn call(&mut self, io: &ServerIo<IO>) -> Self::Future {
1083 let conn_info = io.connect_info();
1084
1085 let svc = self.inner.clone();
1086 let concurrency_limit = self.concurrency_limit;
1087 let timeout = self.timeout;
1088 let trace_interceptor = self.trace_interceptor.clone();
1089
1090 let svc = ServiceBuilder::new()
1091 .layer(RecoverErrorLayer::new())
1092 .option_layer(concurrency_limit.map(ConcurrencyLimitLayer::new))
1093 .layer_fn(|s| GrpcTimeout::new(s, timeout))
1094 .service(svc);
1095
1096 let svc = ServiceBuilder::new()
1097 .layer(BoxCloneService::layer())
1098 .layer(ConnectInfoLayer::new(conn_info.clone()))
1099 .service(Svc {
1100 inner: svc,
1101 trace_interceptor,
1102 });
1103
1104 future::ready(Ok(svc))
1105 }
1106 }
1107
1108 // From `futures-util` crate, borrowed since this is the only dependency tonic requires.
1109 // LICENSE: MIT or Apache-2.0
1110 // A future which only yields `Poll::Ready` once, and thereafter yields `Poll::Pending`.
1111 #[pin_project]
1112 struct Fuse<F> {
1113 #[pin]
1114 inner: Option<F>,
1115 }
1116
1117 impl<F> Future for Fuse<F>
1118 where
1119 F: Future,
1120 {
1121 type Output = F::Output;
1122
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>1123 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1124 match self.as_mut().project().inner.as_pin_mut() {
1125 Some(fut) => fut.poll(cx).map(|output| {
1126 self.project().inner.set(None);
1127 output
1128 }),
1129 None => Poll::Pending,
1130 }
1131 }
1132 }
1133