//! Provides utilities useful for dispatching incoming HTTP requests to
//! `wasi:http/incoming-handler@0.2.x` and `wasi:http/handler@0.3.x` guest
//! instances.
3 
4 #[cfg(feature = "p3")]
5 use crate::p3;
6 use futures::stream::{FuturesUnordered, StreamExt};
7 use std::collections::VecDeque;
8 use std::collections::btree_map::{BTreeMap, Entry};
9 use std::future;
10 use std::pin::{Pin, pin};
11 use std::sync::{
12     Arc, Mutex,
13     atomic::{
14         AtomicBool, AtomicU64, AtomicUsize,
15         Ordering::{Relaxed, SeqCst},
16     },
17 };
18 use std::task::Poll;
19 use std::time::{Duration, Instant};
20 use tokio::sync::Notify;
21 use wasmtime::AsContextMut;
22 use wasmtime::component::Accessor;
23 use wasmtime::{Result, Store, StoreContextMut, format_err};
24 
/// Alternative p2 bindings generated with `exports: { default: async | store }`
/// so we can use `TypedFunc::call_concurrent` with both p2 and p3 instances.
///
/// The `with` mappings below reuse the `wasi:http` bindings from this crate
/// and the upstream `wasi:io` bindings rather than generating fresh copies.
pub mod p2 {
    #[expect(missing_docs, reason = "bindgen-generated code")]
    pub mod bindings {
        wasmtime::component::bindgen!({
            path: "wit",
            world: "wasi:http/proxy",
            imports: { default: tracing },
            exports: { default: async | store },
            require_store_data_send: true,
            with: {
                // http is in this crate
                "wasi:http": crate::bindings::http,
                // Upstream package dependencies
                "wasi:io": wasmtime_wasi::p2::bindings::io,
            }
        });

        // Re-export the contents of the generated `wasi` module at this level.
        pub use wasi::*;
    }
}
47 
/// Represents either a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` pre-instance.
///
/// Each worker instantiates this once in the [`Store`] it creates, producing
/// the [`Proxy`] variant that matches the pre-instance variant.
pub enum ProxyPre<T: 'static> {
    /// A `wasi:http/incoming-handler@0.2.x` pre-instance.
    P2(p2::bindings::ProxyPre<T>),
    /// A `wasi:http/handler@0.3.x` pre-instance (only available with the `p3`
    /// feature).
    #[cfg(feature = "p3")]
    P3(p3::bindings::ServicePre<T>),
}
57 
58 impl<T: 'static> ProxyPre<T> {
59     async fn instantiate_async(&self, store: impl AsContextMut<Data = T>) -> Result<Proxy>
60     where
61         T: Send,
62     {
63         Ok(match self {
64             Self::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?),
65             #[cfg(feature = "p3")]
66             Self::P3(pre) => Proxy::P3(pre.instantiate_async(store).await?),
67         })
68     }
69 }
70 
/// Represents either a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` instance.
///
/// A worker passes a reference to its instance to every [`TaskFn`] it runs.
pub enum Proxy {
    /// A `wasi:http/incoming-handler@0.2.x` instance.
    P2(p2::bindings::Proxy),
    /// A `wasi:http/handler@0.3.x` instance (only available with the `p3`
    /// feature).
    #[cfg(feature = "p3")]
    P3(p3::bindings::Service),
}
80 
/// Represents a task to run using a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` instance.
///
/// The worker that accepts the task calls it exactly once with the store's
/// [`Accessor`] and the worker's [`Proxy`].  The returned future is run under
/// a timeout of `HandlerState::request_timeout`.  If it does not finish in
/// time, the worker stops accepting new tasks and reports a timeout error.
pub type TaskFn<T> = Box<
    dyn for<'a> FnOnce(&'a Accessor<T>, &'a Proxy) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
        + Send,
>;
87 
/// Async MPMC channel where each item is delivered to at most one consumer.
struct Queue<T> {
    // Pending items in FIFO order.
    queue: Mutex<VecDeque<T>>,
    // Signalled once per `push` so that one waiting `pop` can wake up.
    notify: Notify,
}
93 
94 impl<T> Default for Queue<T> {
95     fn default() -> Self {
96         Self {
97             queue: Default::default(),
98             notify: Default::default(),
99         }
100     }
101 }
102 
103 impl<T> Queue<T> {
104     fn is_empty(&self) -> bool {
105         self.queue.lock().unwrap().is_empty()
106     }
107 
108     fn push(&self, item: T) {
109         self.queue.lock().unwrap().push_back(item);
110         self.notify.notify_one();
111     }
112 
113     fn try_pop(&self) -> Option<T> {
114         self.queue.lock().unwrap().pop_front()
115     }
116 
117     async fn pop(&self) -> T {
118         // This code comes from the Unbound MPMC Channel example in [the
119         // `tokio::sync::Notify`
120         // docs](https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html).
121 
122         let mut notified = pin!(self.notify.notified());
123 
124         loop {
125             notified.as_mut().enable();
126             if let Some(item) = self.try_pop() {
127                 return item;
128             }
129             notified.as_mut().await;
130             notified.set(self.notify.notified());
131         }
132     }
133 }
134 
/// Bundles a [`Store`] with a callback to write a profile (if configured).
///
/// Returned by `HandlerState::new_store`; each worker creates exactly one of
/// these and uses its `Store` for every task it handles.
pub struct StoreBundle<T: 'static> {
    /// The [`Store`] to use to handle requests.
    pub store: Store<T>,
    /// Callback to write a profile (if enabled) once all requests have been
    /// handled.  The worker calls it once, after it has stopped accepting and
    /// running tasks.
    pub write_profile: Box<dyn FnOnce(StoreContextMut<T>) + Send>,
}
143 
/// Represents the application-specific state of a web server.
///
/// A single value of this type is shared by every worker started by a
/// [`ProxyHandler`].
pub trait HandlerState: 'static + Sync + Send {
    /// The type of the associated data for [`Store`]s created using
    /// [`Self::new_store`].
    type StoreData: Send;

    /// Create a new [`Store`] for handling one or more requests.
    ///
    /// The `req_id` parameter is the value passed in the call to
    /// [`ProxyHandler::spawn`] that created the worker to which the new `Store`
    /// will belong.  See that function's documentation for details.
    ///
    /// Returning an error makes the worker exit before running any task,
    /// including a task that was handed directly to it.  The error is then
    /// passed to [`Self::handle_worker_error`].
    fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Self::StoreData>>;

    /// Maximum time allowed to handle a request.
    ///
    /// In practice, a guest may be allowed to run up to 2x this time in the
    /// case of instance reuse to avoid penalizing concurrent requests being
    /// handled by the same instance.
    fn request_timeout(&self) -> Duration;

    /// Maximum time to keep an idle instance around before dropping it.
    ///
    /// This applies only while the worker has no tasks in flight.
    fn idle_instance_timeout(&self) -> Duration;

    /// Maximum number of requests to handle using a single instance before
    /// dropping it.
    ///
    /// Must be at least 1: [`ProxyHandler::spawn`] panics if this returns 0.
    fn max_instance_reuse_count(&self) -> usize;

    /// Maximum number of requests to handle concurrently using a single
    /// instance.
    ///
    /// A worker only asks for another task while it has fewer than this many
    /// tasks in flight.
    fn max_instance_concurrent_reuse_count(&self) -> usize;

    /// Called when a worker exits with an error.  Examples include failing to
    /// create its `Store`, failing to instantiate the guest, or a guest timing
    /// out.
    fn handle_worker_error(&self, error: wasmtime::Error);
}
178 
/// Shared state behind a [`ProxyHandler`] and all of its workers.
struct ProxyHandlerInner<S: HandlerState> {
    // Application state supplied to `ProxyHandler::new`.
    state: S,
    // Pre-instance each worker instantiates in its own `Store`.
    instance_pre: ProxyPre<S::StoreData>,
    // Counter backing `ProxyHandler::next_req_id`.
    next_id: AtomicU64,
    // Tasks waiting to be accepted by an available worker.
    task_queue: Queue<TaskFn<S::StoreData>>,
    // Number of workers currently advertising capacity for another task (kept
    // in sync by `Worker::set_available`).  This is not the total number of
    // running workers.
    worker_count: AtomicUsize,
}
186 
/// A multiset of instants, one per task a worker has recorded, from which the
/// earliest instant can be retrieved.
///
/// This lets a worker enforce timeouts even when the
/// `StoreContextMut::run_concurrent` event loop is unable to make progress.
/// That happens when the guest is busy looping or is blocked on a synchronous
/// call to a host function which has exclusive access to the `Store`.
///
/// Duplicate instants are counted, so every `add` must be balanced by exactly
/// one `remove` of the same instant.
#[derive(Default)]
struct StartTimes(BTreeMap<Instant, usize>);

impl StartTimes {
    /// Record one occurrence of `time`.
    fn add(&mut self, time: Instant) {
        self.0.entry(time).and_modify(|count| *count += 1).or_insert(1);
    }

    /// Forget one occurrence of `time`.
    ///
    /// # Panics
    ///
    /// Panics if `time` is not currently recorded.
    fn remove(&mut self, time: Instant) {
        match self.0.entry(time) {
            Entry::Occupied(slot) if *slot.get() == 1 => {
                slot.remove();
            }
            Entry::Occupied(mut slot) if *slot.get() > 1 => {
                *slot.get_mut() -= 1;
            }
            _ => unreachable!(),
        }
    }

    /// The smallest recorded instant, if any.
    fn earliest(&self) -> Option<Instant> {
        self.0.keys().next().copied()
    }
}
220 
/// A background worker which creates one [`Store`] and guest instance and uses
/// them to run one or more tasks.
struct Worker<S>
where
    S: HandlerState,
{
    // Handle to the shared handler state.
    handler: ProxyHandler<S>,
    // Whether this worker is currently counted in `worker_count`; only changed
    // via `set_available`.
    available: bool,
}
228 
229 impl<S> Worker<S>
230 where
231     S: HandlerState,
232 {
233     fn set_available(&mut self, available: bool) {
234         if available != self.available {
235             self.available = available;
236             if available {
237                 self.handler.0.worker_count.fetch_add(1, Relaxed);
238             } else {
239                 // Here we use `SeqCst` to ensure the load/store is ordered
240                 // correctly with respect to the `Queue::is_empty` check we do
241                 // below.
242                 let count = self.handler.0.worker_count.fetch_sub(1, SeqCst);
243                 // This addresses what would otherwise be a race condition in
244                 // `ProxyHandler::spawn` where it only starts a worker if the
245                 // available worker count is zero.  If we decrement the count to
246                 // zero right after `ProxyHandler::spawn` checks it, then no
247                 // worker will be started; thus it becomes our responsibility to
248                 // start a worker here instead.
249                 if count == 1 && !self.handler.0.task_queue.is_empty() {
250                     self.handler.start_worker(None, None);
251                 }
252             }
253         }
254     }
255 
256     async fn run(mut self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
257         if let Err(error) = self.run_(task, req_id).await {
258             self.handler.0.state.handle_worker_error(error);
259         }
260     }
261 
262     async fn run_(
263         &mut self,
264         task: Option<TaskFn<S::StoreData>>,
265         req_id: Option<u64>,
266     ) -> Result<()> {
267         // NB: The code the follows is rather subtle in that it is structured
268         // carefully to provide a few key invariants related to how instance
269         // reuse and request timeouts interact:
270         //
271         // - A task must never be allowed to run for more than 2x the request
272         // timeout, if any.
273         //
274         // - Every task we accept here must be allowed to run for at least 1x
275         // the request timeout, if any.
276         //
277         // - When more than one task is run concurrently in the same instance,
278         // we must stop accepting new tasks as soon as any existing task reaches
279         // the request timeout.  This serves to cap the amount of time we need
280         // to keep the instance alive before _all_ tasks have either completed
281         // or timed out.
282         //
283         // As of this writing, there's an additional wrinkle that makes
284         // guaranteeing those invariants particularly tricky: per #11869 and
285         // #11870, busy guest loops, epoch interruption, and host functions
286         // registered using `Linker::func_{wrap,new}_async` all require
287         // blocking, exclusive access to the `Store`, which effectively prevents
288         // the `StoreContextMut::run_concurrent` event loop from making
289         // progress.  That, in turn, prevents any concurrent tasks from
290         // executing, and also prevents the `AsyncFnOnce` passed to
291         // `run_concurrent` from being polled.  Consequently, we must rely on a
292         // "second line of defense" to ensure tasks are timed out promptly,
293         // which is to check for timeouts _outside_ the `run_concurrent` future.
294         // Once the aforementioned issues have been addressed, we'll be able to
295         // remove that check and its associated baggage.
296 
297         let handler = &self.handler.0;
298 
299         let StoreBundle {
300             mut store,
301             write_profile,
302         } = handler.state.new_store(req_id)?;
303 
304         let request_timeout = handler.state.request_timeout();
305         let idle_instance_timeout = handler.state.idle_instance_timeout();
306         let max_instance_reuse_count = handler.state.max_instance_reuse_count();
307         let max_instance_concurrent_reuse_count =
308             handler.state.max_instance_concurrent_reuse_count();
309 
310         let proxy = &handler.instance_pre.instantiate_async(&mut store).await?;
311         let accept_concurrent = AtomicBool::new(true);
312         let task_start_times = Mutex::new(StartTimes::default());
313 
314         let mut future = pin!(store.run_concurrent(async |accessor| {
315             let mut reuse_count = 0;
316             let mut timed_out = false;
317             let mut futures = FuturesUnordered::new();
318 
319             let accept_task = |task: TaskFn<S::StoreData>,
320                                futures: &mut FuturesUnordered<_>,
321                                reuse_count: &mut usize| {
322                 // Set `accept_concurrent` to false, conservatively assuming
323                 // that the new task will be CPU-bound, at least to begin with.
324                 // Only once the `StoreContextMut::run_concurrent` event loop
325                 // returns `Pending` will we set `accept_concurrent` back to
326                 // true and consider accepting more tasks.
327                 //
328                 // This approach avoids taking on more than one CPU-bound task
329                 // at a time, which would hurt throughput vs. leaving the
330                 // additional tasks for other workers to handle.
331                 accept_concurrent.store(false, Relaxed);
332                 *reuse_count += 1;
333 
334                 let start_time = Instant::now().checked_add(request_timeout);
335                 if let Some(start_time) = start_time {
336                     task_start_times.lock().unwrap().add(start_time);
337                 }
338 
339                 futures.push(tokio::time::timeout(request_timeout, async move {
340                     (task)(accessor, proxy).await;
341                     start_time
342                 }));
343             };
344 
345             if let Some(task) = task {
346                 accept_task(task, &mut futures, &mut reuse_count);
347             }
348 
349             let handler = self.handler.clone();
350             while !(futures.is_empty() && reuse_count >= max_instance_reuse_count) {
351                 let new_task = {
352                     let future_count = futures.len();
353                     let mut next_future = pin!(async {
354                         if futures.is_empty() {
355                             future::pending().await
356                         } else {
357                             futures.next().await.unwrap()
358                         }
359                     });
360                     let mut next_task = pin!(tokio::time::timeout(
361                         if future_count == 0 {
362                             idle_instance_timeout
363                         } else {
364                             Duration::MAX
365                         },
366                         handler.0.task_queue.pop()
367                     ));
368                     // Poll any existing tasks, and if they're all `Pending`
369                     // _and_ we haven't reached any reuse limits yet, poll for a
370                     // new task from the queue.
371                     //
372                     // Note the the order of operations here is important.  By
373                     // polling `next_future` first, we'll disover any tasks that
374                     // may have timed out, at which point we'll stop accepting
375                     // new tasks altogether (see below for details).  This is
376                     // especially imporant in the case where the task was
377                     // blocked on a synchronous call to a host function which
378                     // has exclusive access to the `Store`; once that call
379                     // finishes, the first think we need to do is time out the
380                     // task.  If we were to poll for a new task first, then we'd
381                     // have to wait for _that_ task to finish or time out before
382                     // we could kill the instance.
383                     future::poll_fn(|cx| match next_future.as_mut().poll(cx) {
384                         Poll::Pending => {
385                             // Note that `Pending` here doesn't necessarily mean
386                             // all tasks are blocked on I/O.  They might simply
387                             // be waiting for some deferred work to be done by
388                             // the next turn of the
389                             // `StoreContextMut::run_concurrent` event loop.
390                             // Therefore, we check `accept_concurrent` here and
391                             // only advertise we have capacity for another task
392                             // if either we have no tasks at all or all our
393                             // tasks really are blocked on I/O.
394                             self.set_available(
395                                 reuse_count < max_instance_reuse_count
396                                     && future_count < max_instance_concurrent_reuse_count
397                                     && (future_count == 0 || accept_concurrent.load(Relaxed)),
398                             );
399 
400                             if self.available {
401                                 next_task.as_mut().poll(cx).map(Some)
402                             } else {
403                                 Poll::Pending
404                             }
405                         }
406                         Poll::Ready(Ok(start_time)) => {
407                             // Task completed; carry on!
408                             if let Some(start_time) = start_time {
409                                 task_start_times.lock().unwrap().remove(start_time);
410                             }
411                             Poll::Ready(None)
412                         }
413                         Poll::Ready(Err(_)) => {
414                             // Task timed out; stop accepting new tasks, but
415                             // continue polling until any other, in-progress
416                             // tasks until they have either finished or timed
417                             // out.  This effectively kicks off a "graceful
418                             // shutdown" of the worker, allowing any other
419                             // concurrent tasks time to finish before we drop
420                             // the instance.
421                             //
422                             // TODO: We should also send a cancel request to the
423                             // timed-out task to give it a chance to shut down
424                             // gracefully (and delay dropping the instance for a
425                             // reasonable amount of time), but as of this
426                             // writing Wasmtime does not yet provide an API for
427                             // doing that.  See issue #11833.
428                             timed_out = true;
429                             reuse_count = max_instance_reuse_count;
430                             Poll::Ready(None)
431                         }
432                     })
433                     .await
434                 };
435 
436                 match new_task {
437                     Some(Ok(task)) => {
438                         accept_task(task, &mut futures, &mut reuse_count);
439                     }
440                     Some(Err(_)) => break,
441                     None => {}
442                 }
443             }
444 
445             accessor.with(|mut access| write_profile(access.as_context_mut()));
446 
447             if timed_out {
448                 Err(format_err!("guest timed out"))
449             } else {
450                 wasmtime::error::Ok(())
451             }
452         }));
453 
454         let mut sleep = pin!(tokio::time::sleep(Duration::MAX));
455 
456         future::poll_fn(|cx| {
457             let poll = future.as_mut().poll(cx);
458             if poll.is_pending() {
459                 // If the future returns `Pending`, that's either because it's
460                 // idle (in which case it can definitely accept a new task) or
461                 // because all its tasks are awaiting I/O, in which case it may
462                 // have capacity for additional tasks to run concurrently.
463                 //
464                 // However, if one of the tasks is blocked on a sync call to a
465                 // host function which has exclusive access to the `Store`, the
466                 // `StoreContextMut::run_concurrent` event loop will be unable
467                 // to make progress until that call finishes.  Similarly, if the
468                 // task loops indefinitely, subject only to epoch interruption,
469                 // the event loop will also be stuck.  Either way, any task
470                 // timeouts created inside the `AsyncFnOnce` we passed to
471                 // `run_concurrent` won't have a chance to trigger.
472                 // Consequently, we need to _also_ enforce timeouts here,
473                 // outside the event loop.
474                 //
475                 // Therefore, we check if the oldest outstanding task has been
476                 // running for at least `request_timeout*2`, which is the
477                 // maximum time needed for any other concurrent tasks to
478                 // complete or time out, at which point we can safely discard
479                 // the instance.  If that deadline has not yet arrived, we
480                 // schedule a wakeup to occur when it does.
481                 //
482                 // We uphold the "never kill an instance with a task which has
483                 // been running for less than the request timeout" invariant
484                 // here by noting that this timeout will only trigger if the
485                 // `AsyncFnOnce` we passed to `run_concurrent` has been unable
486                 // to run for at least the past `request_timeout` amount of
487                 // time, meaning it can't possibly have accepted a task newer
488                 // than that.
489                 if let Some(deadline) = task_start_times
490                     .lock()
491                     .unwrap()
492                     .earliest()
493                     .and_then(|v| v.checked_add(request_timeout.saturating_mul(2)))
494                 {
495                     sleep.as_mut().reset(deadline.into());
496                     // Note that this will schedule a wakeup for later if the
497                     // deadline has not yet arrived:
498                     if sleep.as_mut().poll(cx).is_ready() {
499                         // Deadline has been reached; kill the instance with an
500                         // error.
501                         return Poll::Ready(Err(format_err!("guest timed out")));
502                     }
503                 }
504 
505                 // Otherwise, if no timeouts have elapsed, we set
506                 // `accept_concurrent` to true and, if it wasn't already true
507                 // before, poll the future one more time so it can ask for
508                 // another task if appropriate.
509                 if !accept_concurrent.swap(true, Relaxed) {
510                     return future.as_mut().poll(cx);
511                 }
512             }
513 
514             poll
515         })
516         .await?
517     }
518 }
519 
520 impl<S> Drop for Worker<S>
521 where
522     S: HandlerState,
523 {
524     fn drop(&mut self) {
525         self.set_available(false);
526     }
527 }
528 
/// Represents the state of a web server.
///
/// Note that this supports optional instance reuse, enabled when
/// `S::max_instance_reuse_count()` returns a number greater than one.  See
/// [`Self::spawn`] for details.
///
/// Cloning is cheap: clones share the same underlying state through an `Arc`.
/// Workers are started in the background with `tokio::spawn`, so methods that
/// start workers must be called from within a Tokio runtime.
pub struct ProxyHandler<S: HandlerState>(Arc<ProxyHandlerInner<S>>);
535 
536 impl<S: HandlerState> Clone for ProxyHandler<S> {
537     fn clone(&self) -> Self {
538         Self(self.0.clone())
539     }
540 }
541 
542 impl<S> ProxyHandler<S>
543 where
544     S: HandlerState,
545 {
546     /// Create a new `ProxyHandler` with the specified application state and
547     /// pre-instance.
548     pub fn new(state: S, instance_pre: ProxyPre<S::StoreData>) -> Self {
549         Self(Arc::new(ProxyHandlerInner {
550             state,
551             instance_pre,
552             next_id: AtomicU64::from(0),
553             task_queue: Default::default(),
554             worker_count: AtomicUsize::from(0),
555         }))
556     }
557 
558     /// Push a task to the task queue for this handler.
559     ///
560     /// This will either spawn a new background worker to run the task or
561     /// deliver it to an already-running worker.
562     ///
563     /// The `req_id` will be passed to `<S as HandlerState>::new_store` _if_ a
564     /// new worker is started for this task.  It is intended to be used as a
565     /// "request identifier" corresponding to that task and can be used e.g. to
566     /// prefix all logging from the `Store` with that identifier.  Note that a
567     /// non-`None` value only makes sense when `<S as
568     /// HandlerState>::max_instance_reuse_count == 1`; otherwise the identifier
569     /// will not match subsequent tasks handled by the worker.
570     pub fn spawn(&self, req_id: Option<u64>, task: TaskFn<S::StoreData>) {
571         match self.0.state.max_instance_reuse_count() {
572             0 => panic!("`max_instance_reuse_count` must be at least 1"),
573             _ => {
574                 if self.0.worker_count.load(Relaxed) == 0 {
575                     // There are no available workers; skip the queue and pass
576                     // the task directly to the worker, which improves
577                     // performance as measured by `wasmtime-server-rps.sh` by
578                     // about 15%.
579                     self.start_worker(Some(task), req_id);
580                 } else {
581                     self.0.task_queue.push(task);
582                     // Start a new worker to handle the task if the last worker
583                     // just went unavailable.  See also `Worker::set_available`
584                     // for what happens if the available worker count goes to
585                     // zero right after we check it here, and note that we only
586                     // check the count _after_ we've pushed the task to the
587                     // queue.  We use `SeqCst` here to ensure that we get an
588                     // updated view of `worker_count` as it exists after the
589                     // `Queue::push` above.
590                     //
591                     // The upshot is that at least one (or more) of the
592                     // following will happen:
593                     //
594                     // - An existing worker will accept the task
595                     // - We'll start a new worker here to accept the task
596                     // - `Worker::set_available` will start a new worker to accept the task
597                     //
598                     // I.e. it should not be possible for the task to be
599                     // orphaned indefinitely in the queue without being
600                     // accepted.
601                     if self.0.worker_count.load(SeqCst) == 0 {
602                         self.start_worker(None, None);
603                     }
604                 }
605             }
606         }
607     }
608 
609     /// Generate a unique request ID.
610     pub fn next_req_id(&self) -> u64 {
611         self.0.next_id.fetch_add(1, Relaxed)
612     }
613 
614     /// Return a reference to the application state.
615     pub fn state(&self) -> &S {
616         &self.0.state
617     }
618 
619     /// Return a reference to the pre-instance.
620     pub fn instance_pre(&self) -> &ProxyPre<S::StoreData> {
621         &self.0.instance_pre
622     }
623 
624     fn start_worker(&self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
625         tokio::spawn(
626             Worker {
627                 handler: self.clone(),
628                 available: false,
629             }
630             .run(task, req_id),
631         );
632     }
633 }
634