//! Provides utilities useful for dispatching incoming HTTP requests to
//! `wasi:http/handler` guest instances.
3 
4 #[cfg(feature = "p3")]
5 use crate::p3;
6 use anyhow::{Result, anyhow};
7 use futures::stream::{FuturesUnordered, StreamExt};
8 use std::collections::VecDeque;
9 use std::collections::btree_map::{BTreeMap, Entry};
10 use std::future;
11 use std::pin::{Pin, pin};
12 use std::sync::{
13     Arc, Mutex,
14     atomic::{
15         AtomicBool, AtomicU64, AtomicUsize,
16         Ordering::{Relaxed, SeqCst},
17     },
18 };
19 use std::task::Poll;
20 use std::time::{Duration, Instant};
21 use tokio::sync::Notify;
22 use wasmtime::AsContextMut;
23 use wasmtime::component::Accessor;
24 use wasmtime::{Store, StoreContextMut};
25 
/// Alternative p2 bindings generated with `exports: { default: async | store }`
/// so we can use `TypedFunc::call_concurrent` with both p2 and p3 instances.
pub mod p2 {
    // Bindings for the `wasi:http/proxy` world.  The `with` mappings below
    // reuse this crate's `wasi:http` bindings and `wasmtime_wasi`'s `wasi:io`
    // bindings instead of generating fresh copies of those interfaces.
    #[expect(missing_docs, reason = "bindgen-generated code")]
    pub mod bindings {
        wasmtime::component::bindgen!({
            path: "wit",
            world: "wasi:http/proxy",
            imports: { default: tracing },
            exports: { default: async | store },
            require_store_data_send: true,
            with: {
                // http is in this crate
                "wasi:http": crate::bindings::http,
                // Upstream package dependencies
                "wasi:io": wasmtime_wasi::p2::bindings::io,
            }
        });

        // Re-export the contents of the generated `wasi` module at this level.
        pub use wasi::*;
    }
}
48 
/// Represents either a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` pre-instance.
///
/// Passed to [`ProxyHandler::new`]; each worker instantiates it once to obtain
/// the [`Proxy`] it uses for all of the tasks it runs.
pub enum ProxyPre<T: 'static> {
    /// A `wasi:http/incoming-handler@0.2.x` pre-instance.
    P2(p2::bindings::ProxyPre<T>),
    /// A `wasi:http/handler@0.3.x` pre-instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::ProxyPre<T>),
}
58 
59 impl<T: 'static> ProxyPre<T> {
60     async fn instantiate_async(&self, store: impl AsContextMut<Data = T>) -> Result<Proxy>
61     where
62         T: Send,
63     {
64         Ok(match self {
65             Self::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?),
66             #[cfg(feature = "p3")]
67             Self::P3(pre) => Proxy::P3(pre.instantiate_async(store).await?),
68         })
69     }
70 }
71 
/// Represents either a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` instance.
///
/// Obtained by instantiating a [`ProxyPre`]; a worker passes a reference to
/// its instance to every [`TaskFn`] it runs.
pub enum Proxy {
    /// A `wasi:http/incoming-handler@0.2.x` instance.
    P2(p2::bindings::Proxy),
    /// A `wasi:http/handler@0.3.x` instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::Proxy),
}
81 
/// Represents a task to run using a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` instance.
///
/// When a worker accepts the task it calls the closure (once) with an
/// [`Accessor`] for its store and the [`Proxy`] it instantiated, then drives
/// the returned future to completion, subject to
/// [`HandlerState::request_timeout`].
pub type TaskFn<T> = Box<
    dyn for<'a> FnOnce(&'a Accessor<T>, &'a Proxy) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
        + Send,
>;
88 
/// Async MPMC channel where each item is delivered to at most one consumer.
///
/// Items are handed out in FIFO order.  This backs the shared task queue from
/// which available workers pull tasks.
struct Queue<T> {
    /// Pending items, oldest at the front.
    queue: Mutex<VecDeque<T>>,
    /// Signalled on every `push` so that a consumer waiting in `pop` can wake.
    notify: Notify,
}
94 
95 impl<T> Default for Queue<T> {
96     fn default() -> Self {
97         Self {
98             queue: Default::default(),
99             notify: Default::default(),
100         }
101     }
102 }
103 
impl<T> Queue<T> {
    /// Returns `true` if no items are currently queued.
    fn is_empty(&self) -> bool {
        self.queue.lock().unwrap().is_empty()
    }

    /// Appends `item` to the back of the queue, then calls
    /// `Notify::notify_one` so that one consumer waiting in [`Self::pop`] can
    /// pick it up.
    fn push(&self, item: T) {
        self.queue.lock().unwrap().push_back(item);
        self.notify.notify_one();
    }

    /// Removes and returns the front item, if any, without waiting.
    fn try_pop(&self) -> Option<T> {
        self.queue.lock().unwrap().pop_front()
    }

    /// Waits until an item is available, then removes and returns it.
    async fn pop(&self) -> T {
        // This code comes from the Unbound MPMC Channel example in [the
        // `tokio::sync::Notify`
        // docs](https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html).
        //
        // The `Notified` future is enabled *before* the queue is checked, so a
        // `notify_one` issued between the check and the `await` below is still
        // delivered to this waiter rather than being missed.

        let mut notified = pin!(self.notify.notified());

        loop {
            notified.as_mut().enable();
            if let Some(item) = self.try_pop() {
                return item;
            }
            notified.as_mut().await;
            // A completed `Notified` cannot be reused; arm a fresh one.
            notified.set(self.notify.notified());
        }
    }
}
135 
/// Bundles a [`Store`] with a callback to write a profile (if configured).
///
/// Returned by [`HandlerState::new_store`].  Each worker creates one bundle
/// and uses its store for every task it handles.
pub struct StoreBundle<T: 'static> {
    /// The [`Store`] to use to handle requests.
    pub store: Store<T>,
    /// Callback to write a profile (if enabled) once all requests have been
    /// handled.
    ///
    /// The worker invokes this with the store's context after it has stopped
    /// accepting tasks and has no tasks left in flight.  If the worker is
    /// instead aborted by its fallback timeout, this callback is not invoked.
    pub write_profile: Box<dyn FnOnce(StoreContextMut<T>) + Send>,
}
144 
/// Represents the application-specific state of a web server.
///
/// Implementations supply the [`Store`]s used by workers, plus the timeouts
/// and reuse limits that determine how long each worker (and the guest
/// instance it owns) is kept alive.
pub trait HandlerState: 'static + Sync + Send {
    /// The type of the associated data for [`Store`]s created using
    /// [`Self::new_store`].
    type StoreData: Send;

    /// Create a new [`Store`] for handling one or more requests.
    ///
    /// The `req_id` parameter is the value passed in the call to
    /// [`ProxyHandler::spawn`] that created the worker to which the new `Store`
    /// will belong.  See that function's documentation for details.  It is
    /// `None` for workers started only to drain the shared task queue.
    ///
    /// # Errors
    ///
    /// An error returned here causes the worker to exit without running any
    /// task; the error is reported via [`Self::handle_worker_error`].
    fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Self::StoreData>>;

    /// Maximum time allowed to handle a request.
    ///
    /// In practice, a guest may be allowed to run up to 2x this time in the
    /// case of instance reuse to avoid penalizing concurrent requests being
    /// handled by the same instance.
    fn request_timeout(&self) -> Duration;

    /// Maximum time to keep an idle instance (one with no tasks in flight)
    /// around before dropping it.
    fn idle_instance_timeout(&self) -> Duration;

    /// Maximum number of requests to handle using a single instance before
    /// dropping it.
    ///
    /// Must be at least 1; [`ProxyHandler::spawn`] panics otherwise.
    fn max_instance_reuse_count(&self) -> usize;

    /// Maximum number of requests to handle concurrently using a single
    /// instance.
    fn max_instance_concurrent_reuse_count(&self) -> usize;

    /// Called when a worker exits with an error, e.g. because store creation
    /// or instantiation failed, or because a guest task timed out.
    fn handle_worker_error(&self, error: anyhow::Error);
}
179 
/// Shared state behind a [`ProxyHandler`], reference-counted across the
/// handler's clones and every worker it starts.
struct ProxyHandlerInner<S: HandlerState> {
    /// Application-specific state supplied to `ProxyHandler::new`.
    state: S,
    /// Pre-instance each worker instantiates to obtain its guest instance.
    instance_pre: ProxyPre<S::StoreData>,
    /// Counter backing `ProxyHandler::next_req_id`.
    next_id: AtomicU64,
    /// Tasks waiting to be picked up by an available worker.
    task_queue: Queue<TaskFn<S::StoreData>>,
    /// Number of workers currently advertising capacity for another task
    /// (maintained by `Worker::set_available`).
    worker_count: AtomicUsize,
}
187 
/// Helper utility to track the start times of tasks accepted by a worker.
///
/// Internally this is a multiset of [`Instant`]s: each key maps to how many
/// outstanding entries were recorded at exactly that instant, so identical
/// timestamps are counted rather than collapsed.
///
/// This is used to ensure that timeouts are enforced even when the
/// `StoreContextMut::run_concurrent` event loop is unable to make progress due
/// to the guest either busy looping or being blocked on a synchronous call to a
/// host function which has exclusive access to the `Store`.
#[derive(Default)]
struct StartTimes(BTreeMap<Instant, usize>);

impl StartTimes {
    /// Records one more occurrence of `time`.
    fn add(&mut self, time: Instant) {
        self.0
            .entry(time)
            .and_modify(|count| *count += 1)
            .or_insert(1);
    }

    /// Forgets one previously recorded occurrence of `time`.
    ///
    /// # Panics
    ///
    /// Panics if `time` was never recorded with [`Self::add`] (or has already
    /// been removed as many times as it was added).
    fn remove(&mut self, time: Instant) {
        match self.0.get_mut(&time) {
            None | Some(0) => unreachable!(),
            Some(1) => {
                // Last occurrence: drop the key entirely so `earliest` skips it.
                self.0.remove(&time);
            }
            Some(count) => *count -= 1,
        }
    }

    /// Returns the smallest recorded instant, or `None` if nothing is recorded.
    fn earliest(&self) -> Option<Instant> {
        self.0.keys().next().copied()
    }
}
221 
/// A background task that owns one guest instance and runs tasks on it,
/// subject to the timeouts and reuse limits reported by [`HandlerState`].
struct Worker<S>
where
    S: HandlerState,
{
    /// The handler this worker belongs to (shared state, task queue, and
    /// available-worker count).
    handler: ProxyHandler<S>,
    /// Whether this worker is currently counted in the handler's
    /// `worker_count`, i.e. advertising capacity for another queued task.
    available: bool,
}
229 
230 impl<S> Worker<S>
231 where
232     S: HandlerState,
233 {
234     fn set_available(&mut self, available: bool) {
235         if available != self.available {
236             self.available = available;
237             if available {
238                 self.handler.0.worker_count.fetch_add(1, Relaxed);
239             } else {
240                 // Here we use `SeqCst` to ensure the load/store is ordered
241                 // correctly with respect to the `Queue::is_empty` check we do
242                 // below.
243                 let count = self.handler.0.worker_count.fetch_sub(1, SeqCst);
244                 // This addresses what would otherwise be a race condition in
245                 // `ProxyHandler::spawn` where it only starts a worker if the
246                 // available worker count is zero.  If we decrement the count to
247                 // zero right after `ProxyHandler::spawn` checks it, then no
248                 // worker will be started; thus it becomes our responsibility to
249                 // start a worker here instead.
250                 if count == 1 && !self.handler.0.task_queue.is_empty() {
251                     self.handler.start_worker(None, None);
252                 }
253             }
254         }
255     }
256 
257     async fn run(mut self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
258         if let Err(error) = self.run_(task, req_id).await {
259             self.handler.0.state.handle_worker_error(error);
260         }
261     }
262 
263     async fn run_(
264         &mut self,
265         task: Option<TaskFn<S::StoreData>>,
266         req_id: Option<u64>,
267     ) -> Result<()> {
268         // NB: The code the follows is rather subtle in that it is structured
269         // carefully to provide a few key invariants related to how instance
270         // reuse and request timeouts interact:
271         //
272         // - A task must never be allowed to run for more than 2x the request
273         // timeout, if any.
274         //
275         // - Every task we accept here must be allowed to run for at least 1x
276         // the request timeout, if any.
277         //
278         // - When more than one task is run concurrently in the same instance,
279         // we must stop accepting new tasks as soon as any existing task reaches
280         // the request timeout.  This serves to cap the amount of time we need
281         // to keep the instance alive before _all_ tasks have either completed
282         // or timed out.
283         //
284         // As of this writing, there's an additional wrinkle that makes
285         // guaranteeing those invariants particularly tricky: per #11869 and
286         // #11870, busy guest loops, epoch interruption, and host functions
287         // registered using `Linker::func_{wrap,new}_async` all require
288         // blocking, exclusive access to the `Store`, which effectively prevents
289         // the `StoreContextMut::run_concurrent` event loop from making
290         // progress.  That, in turn, prevents any concurrent tasks from
291         // executing, and also prevents the `AsyncFnOnce` passed to
292         // `run_concurrent` from being polled.  Consequently, we must rely on a
293         // "second line of defense" to ensure tasks are timed out promptly,
294         // which is to check for timeouts _outside_ the `run_concurrent` future.
295         // Once the aforementioned issues have been addressed, we'll be able to
296         // remove that check and its associated baggage.
297 
298         let handler = &self.handler.0;
299 
300         let StoreBundle {
301             mut store,
302             write_profile,
303         } = handler.state.new_store(req_id)?;
304 
305         let request_timeout = handler.state.request_timeout();
306         let idle_instance_timeout = handler.state.idle_instance_timeout();
307         let max_instance_reuse_count = handler.state.max_instance_reuse_count();
308         let max_instance_concurrent_reuse_count =
309             handler.state.max_instance_concurrent_reuse_count();
310 
311         let proxy = &handler.instance_pre.instantiate_async(&mut store).await?;
312         let accept_concurrent = AtomicBool::new(true);
313         let task_start_times = Mutex::new(StartTimes::default());
314 
315         let mut future = pin!(store.run_concurrent(async |accessor| {
316             let mut reuse_count = 0;
317             let mut timed_out = false;
318             let mut futures = FuturesUnordered::new();
319 
320             let accept_task = |task: TaskFn<S::StoreData>,
321                                futures: &mut FuturesUnordered<_>,
322                                reuse_count: &mut usize| {
323                 // Set `accept_concurrent` to false, conservatively assuming
324                 // that the new task will be CPU-bound, at least to begin with.
325                 // Only once the `StoreContextMut::run_concurrent` event loop
326                 // returns `Pending` will we set `accept_concurrent` back to
327                 // true and consider accepting more tasks.
328                 //
329                 // This approach avoids taking on more than one CPU-bound task
330                 // at a time, which would hurt throughput vs. leaving the
331                 // additional tasks for other workers to handle.
332                 accept_concurrent.store(false, Relaxed);
333                 *reuse_count += 1;
334 
335                 let start_time = Instant::now().checked_add(request_timeout);
336                 if let Some(start_time) = start_time {
337                     task_start_times.lock().unwrap().add(start_time);
338                 }
339 
340                 futures.push(tokio::time::timeout(request_timeout, async move {
341                     (task)(accessor, proxy).await;
342                     start_time
343                 }));
344             };
345 
346             if let Some(task) = task {
347                 accept_task(task, &mut futures, &mut reuse_count);
348             }
349 
350             let handler = self.handler.clone();
351             while !(futures.is_empty() && reuse_count >= max_instance_reuse_count) {
352                 let new_task = {
353                     let future_count = futures.len();
354                     let mut next_future = pin!(async {
355                         if futures.is_empty() {
356                             future::pending().await
357                         } else {
358                             futures.next().await.unwrap()
359                         }
360                     });
361                     let mut next_task = pin!(tokio::time::timeout(
362                         if future_count == 0 {
363                             idle_instance_timeout
364                         } else {
365                             Duration::MAX
366                         },
367                         handler.0.task_queue.pop()
368                     ));
369                     // Poll any existing tasks, and if they're all `Pending`
370                     // _and_ we haven't reached any reuse limits yet, poll for a
371                     // new task from the queue.
372                     //
373                     // Note the the order of operations here is important.  By
374                     // polling `next_future` first, we'll disover any tasks that
375                     // may have timed out, at which point we'll stop accepting
376                     // new tasks altogether (see below for details).  This is
377                     // especially imporant in the case where the task was
378                     // blocked on a synchronous call to a host function which
379                     // has exclusive access to the `Store`; once that call
380                     // finishes, the first think we need to do is time out the
381                     // task.  If we were to poll for a new task first, then we'd
382                     // have to wait for _that_ task to finish or time out before
383                     // we could kill the instance.
384                     future::poll_fn(|cx| match next_future.as_mut().poll(cx) {
385                         Poll::Pending => {
386                             // Note that `Pending` here doesn't necessarily mean
387                             // all tasks are blocked on I/O.  They might simply
388                             // be waiting for some deferred work to be done by
389                             // the next turn of the
390                             // `StoreContextMut::run_concurrent` event loop.
391                             // Therefore, we check `accept_concurrent` here and
392                             // only advertise we have capacity for another task
393                             // if either we have no tasks at all or all our
394                             // tasks really are blocked on I/O.
395                             self.set_available(
396                                 reuse_count < max_instance_reuse_count
397                                     && future_count < max_instance_concurrent_reuse_count
398                                     && (future_count == 0 || accept_concurrent.load(Relaxed)),
399                             );
400 
401                             if self.available {
402                                 next_task.as_mut().poll(cx).map(Some)
403                             } else {
404                                 Poll::Pending
405                             }
406                         }
407                         Poll::Ready(Ok(start_time)) => {
408                             // Task completed; carry on!
409                             if let Some(start_time) = start_time {
410                                 task_start_times.lock().unwrap().remove(start_time);
411                             }
412                             Poll::Ready(None)
413                         }
414                         Poll::Ready(Err(_)) => {
415                             // Task timed out; stop accepting new tasks, but
416                             // continue polling until any other, in-progress
417                             // tasks until they have either finished or timed
418                             // out.  This effectively kicks off a "graceful
419                             // shutdown" of the worker, allowing any other
420                             // concurrent tasks time to finish before we drop
421                             // the instance.
422                             //
423                             // TODO: We should also send a cancel request to the
424                             // timed-out task to give it a chance to shut down
425                             // gracefully (and delay dropping the instance for a
426                             // reasonable amount of time), but as of this
427                             // writing Wasmtime does not yet provide an API for
428                             // doing that.  See issue #11833.
429                             timed_out = true;
430                             reuse_count = max_instance_reuse_count;
431                             Poll::Ready(None)
432                         }
433                     })
434                     .await
435                 };
436 
437                 match new_task {
438                     Some(Ok(task)) => {
439                         accept_task(task, &mut futures, &mut reuse_count);
440                     }
441                     Some(Err(_)) => break,
442                     None => {}
443                 }
444             }
445 
446             accessor.with(|mut access| write_profile(access.as_context_mut()));
447 
448             if timed_out {
449                 Err(anyhow!("guest timed out"))
450             } else {
451                 anyhow::Ok(())
452             }
453         }));
454 
455         let mut sleep = pin!(tokio::time::sleep(Duration::MAX));
456 
457         future::poll_fn(|cx| {
458             let poll = future.as_mut().poll(cx);
459             if poll.is_pending() {
460                 // If the future returns `Pending`, that's either because it's
461                 // idle (in which case it can definitely accept a new task) or
462                 // because all its tasks are awaiting I/O, in which case it may
463                 // have capacity for additional tasks to run concurrently.
464                 //
465                 // However, if one of the tasks is blocked on a sync call to a
466                 // host function which has exclusive access to the `Store`, the
467                 // `StoreContextMut::run_concurrent` event loop will be unable
468                 // to make progress until that call finishes.  Similarly, if the
469                 // task loops indefinitely, subject only to epoch interruption,
470                 // the event loop will also be stuck.  Either way, any task
471                 // timeouts created inside the `AsyncFnOnce` we passed to
472                 // `run_concurrent` won't have a chance to trigger.
473                 // Consequently, we need to _also_ enforce timeouts here,
474                 // outside the event loop.
475                 //
476                 // Therefore, we check if the oldest outstanding task has been
477                 // running for at least `request_timeout*2`, which is the
478                 // maximum time needed for any other concurrent tasks to
479                 // complete or time out, at which point we can safely discard
480                 // the instance.  If that deadline has not yet arrived, we
481                 // schedule a wakeup to occur when it does.
482                 //
483                 // We uphold the "never kill an instance with a task which has
484                 // been running for less than the request timeout" invariant
485                 // here by noting that this timeout will only trigger if the
486                 // `AsyncFnOnce` we passed to `run_concurrent` has been unable
487                 // to run for at least the past `request_timeout` amount of
488                 // time, meaning it can't possibly have accepted a task newer
489                 // than that.
490                 if let Some(deadline) = task_start_times
491                     .lock()
492                     .unwrap()
493                     .earliest()
494                     .and_then(|v| v.checked_add(request_timeout.saturating_mul(2)))
495                 {
496                     sleep.as_mut().reset(deadline.into());
497                     // Note that this will schedule a wakeup for later if the
498                     // deadline has not yet arrived:
499                     if sleep.as_mut().poll(cx).is_ready() {
500                         // Deadline has been reached; kill the instance with an
501                         // error.
502                         return Poll::Ready(Err(anyhow!("guest timed out")));
503                     }
504                 }
505 
506                 // Otherwise, if no timeouts have elapsed, we set
507                 // `accept_concurrent` to true and, if it wasn't already true
508                 // before, poll the future one more time so it can ask for
509                 // another task if appropriate.
510                 if !accept_concurrent.swap(true, Relaxed) {
511                     return future.as_mut().poll(cx);
512                 }
513             }
514 
515             poll
516         })
517         .await?
518     }
519 }
520 
521 impl<S> Drop for Worker<S>
522 where
523     S: HandlerState,
524 {
525     fn drop(&mut self) {
526         self.set_available(false);
527     }
528 }
529 
/// Represents the state of a web server.
///
/// This is a cheaply clonable handle around shared state; all clones share
/// the same application state, task queue, and workers.
///
/// Note that this supports optional instance reuse, enabled when
/// `S::max_instance_reuse_count()` returns a number greater than one.  See
/// [`Self::spawn`] for details.
pub struct ProxyHandler<S: HandlerState>(Arc<ProxyHandlerInner<S>>);
536 
537 impl<S: HandlerState> Clone for ProxyHandler<S> {
538     fn clone(&self) -> Self {
539         Self(self.0.clone())
540     }
541 }
542 
543 impl<S> ProxyHandler<S>
544 where
545     S: HandlerState,
546 {
547     /// Create a new `ProxyHandler` with the specified application state and
548     /// pre-instance.
549     pub fn new(state: S, instance_pre: ProxyPre<S::StoreData>) -> Self {
550         Self(Arc::new(ProxyHandlerInner {
551             state,
552             instance_pre,
553             next_id: AtomicU64::from(0),
554             task_queue: Default::default(),
555             worker_count: AtomicUsize::from(0),
556         }))
557     }
558 
559     /// Push a task to the task queue for this handler.
560     ///
561     /// This will either spawn a new background worker to run the task or
562     /// deliver it to an already-running worker.
563     ///
564     /// The `req_id` will be passed to `<S as HandlerState>::new_store` _if_ a
565     /// new worker is started for this task.  It is intended to be used as a
566     /// "request identifier" corresponding to that task and can be used e.g. to
567     /// prefix all logging from the `Store` with that identifier.  Note that a
568     /// non-`None` value only makes sense when `<S as
569     /// HandlerState>::max_instance_reuse_count == 1`; otherwise the identifier
570     /// will not match subsequent tasks handled by the worker.
571     pub fn spawn(&self, req_id: Option<u64>, task: TaskFn<S::StoreData>) {
572         match self.0.state.max_instance_reuse_count() {
573             0 => panic!("`max_instance_reuse_count` must be at least 1"),
574             _ => {
575                 if self.0.worker_count.load(Relaxed) == 0 {
576                     // There are no available workers; skip the queue and pass
577                     // the task directly to the worker, which improves
578                     // performance as measured by `wasmtime-server-rps.sh` by
579                     // about 15%.
580                     self.start_worker(Some(task), req_id);
581                 } else {
582                     self.0.task_queue.push(task);
583                     // Start a new worker to handle the task if the last worker
584                     // just went unavailable.  See also `Worker::set_available`
585                     // for what happens if the available worker count goes to
586                     // zero right after we check it here, and note that we only
587                     // check the count _after_ we've pushed the task to the
588                     // queue.  We use `SeqCst` here to ensure that we get an
589                     // updated view of `worker_count` as it exists after the
590                     // `Queue::push` above.
591                     //
592                     // The upshot is that at least one (or more) of the
593                     // following will happen:
594                     //
595                     // - An existing worker will accept the task
596                     // - We'll start a new worker here to accept the task
597                     // - `Worker::set_available` will start a new worker to accept the task
598                     //
599                     // I.e. it should not be possible for the task to be
600                     // orphaned indefinitely in the queue without being
601                     // accepted.
602                     if self.0.worker_count.load(SeqCst) == 0 {
603                         self.start_worker(None, None);
604                     }
605                 }
606             }
607         }
608     }
609 
610     /// Generate a unique request ID.
611     pub fn next_req_id(&self) -> u64 {
612         self.0.next_id.fetch_add(1, Relaxed)
613     }
614 
615     /// Return a reference to the application state.
616     pub fn state(&self) -> &S {
617         &self.0.state
618     }
619 
620     /// Return a reference to the pre-instance.
621     pub fn instance_pre(&self) -> &ProxyPre<S::StoreData> {
622         &self.0.instance_pre
623     }
624 
625     fn start_worker(&self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
626         tokio::spawn(
627             Worker {
628                 handler: self.clone(),
629                 available: false,
630             }
631             .run(task, req_id),
632         );
633     }
634 }
635