1 //! Runtime support for the Component Model Async ABI.
2 //!
3 //! This module and its submodules provide host runtime support for Component
4 //! Model Async features such as async-lifted exports, async-lowered imports,
5 //! streams, futures, and related intrinsics.  See [the Async
6 //! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7 //! for a high-level overview.
8 //!
9 //! At the core of this support is an event loop which schedules and switches
10 //! between guest tasks and any host tasks they create.  Each
11 //! `Store` will have at most one event loop running at any given
12 //! time, and that loop may be suspended and resumed by the host embedder using
13 //! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14 //! function contains the loop itself, while the
15 //! `StoreOpaque::concurrent_state` field holds its state.
16 //!
17 //! # Public API Overview
18 //!
19 //! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20 //!
21 //! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
22 //! async-lifted or sync-lifted import, creating a guest task.
23 //!
24 //! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25 //! instance, allowing any and all tasks belonging to that instance to make
26 //! progress.
27 //!
28 //! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29 //! for the specified instance.
30 //!
31 //! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32 //! `stream` which may be passed to the guest.  This takes a
33 //! `{Future,Stream}Producer` implementation which will be polled for items when
34 //! the consumer requests them.
35 //!
36 //! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37 //! connecting it to a `{Future,Stream}Consumer` which will consume any items
38 //! produced by the write end.
39 //!
40 //! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41 //!
42 //! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43 //! function with the linker.  That function will take an `Accessor` as its
44 //! first parameter, which provides access to the store between (but not across)
45 //! await points.
46 //!
47 //! - `Accessor::with`: Access the store and its associated data.
48 //!
49 //! - `Accessor::spawn`: Run a background task as part of the event loop for the
50 //! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51 //! in host functions.
52 
53 use crate::component::func::{self, Func, call_post_return};
54 use crate::component::{
55     HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
56 };
57 use crate::fiber::{self, StoreFiber, StoreFiberYield};
58 use crate::prelude::*;
59 use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60 use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
61 use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
62 use crate::{
63     AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
64     bail, error::format_err,
65 };
66 use error_contexts::GlobalErrorContextRefCount;
67 use futures::channel::oneshot;
68 use futures::future::{self, FutureExt};
69 use futures::stream::{FuturesUnordered, StreamExt};
70 use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71 use std::any::Any;
72 use std::borrow::ToOwned;
73 use std::boxed::Box;
74 use std::cell::UnsafeCell;
75 use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76 use std::fmt;
77 use std::future::Future;
78 use std::marker::PhantomData;
79 use std::mem::{self, ManuallyDrop, MaybeUninit};
80 use std::ops::DerefMut;
81 use std::pin::{Pin, pin};
82 use std::ptr::{self, NonNull};
83 use std::sync::Arc;
84 use std::task::{Context, Poll, Waker};
85 use std::vec::Vec;
86 use table::{TableDebug, TableId};
87 use wasmtime_environ::Trap;
88 use wasmtime_environ::component::{
89     CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
90     MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
91     RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
92     TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
93     TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
94 };
95 use wasmtime_environ::packed_option::ReservedValue;
96 
97 pub use abort::JoinHandle;
98 pub use future_stream_any::{FutureAny, StreamAny};
99 pub use futures_and_streams::{
100     Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
101     FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
102     StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
103 };
104 pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
105 
106 mod abort;
107 mod error_contexts;
108 mod future_stream_any;
109 mod futures_and_streams;
110 pub(crate) mod table;
111 pub(crate) mod tls;
112 
113 /// Constant defined in the Component Model spec to indicate that the async
114 /// intrinsic (e.g. `future.write`) has not yet completed.
115 const BLOCKED: u32 = 0xffff_ffff;
116 
117 /// Corresponds to `CallState` in the upstream spec.
118 #[derive(Clone, Copy, Eq, PartialEq, Debug)]
119 pub enum Status {
120     Starting = 0,
121     Started = 1,
122     Returned = 2,
123     StartCancelled = 3,
124     ReturnCancelled = 4,
125 }
126 
127 impl Status {
128     /// Packs this status and the optional `waitable` provided into a 32-bit
129     /// result that the canonical ABI requires.
130     ///
131     /// The low 4 bits are reserved for the status while the upper 28 bits are
132     /// the waitable, if present.
133     pub fn pack(self, waitable: Option<u32>) -> u32 {
134         assert!(matches!(self, Status::Returned) == waitable.is_none());
135         let waitable = waitable.unwrap_or(0);
136         assert!(waitable < (1 << 28));
137         (waitable << 4) | (self as u32)
138     }
139 }
140 
141 /// Corresponds to `EventCode` in the Component Model spec, plus related payload
142 /// data.
143 #[derive(Clone, Copy, Debug)]
144 enum Event {
145     None,
146     Cancelled,
147     Subtask {
148         status: Status,
149     },
150     StreamRead {
151         code: ReturnCode,
152         pending: Option<(TypeStreamTableIndex, u32)>,
153     },
154     StreamWrite {
155         code: ReturnCode,
156         pending: Option<(TypeStreamTableIndex, u32)>,
157     },
158     FutureRead {
159         code: ReturnCode,
160         pending: Option<(TypeFutureTableIndex, u32)>,
161     },
162     FutureWrite {
163         code: ReturnCode,
164         pending: Option<(TypeFutureTableIndex, u32)>,
165     },
166 }
167 
168 impl Event {
169     /// Lower this event to core Wasm integers for delivery to the guest.
170     ///
171     /// Note that the waitable handle, if any, is assumed to be lowered
172     /// separately.
173     fn parts(self) -> (u32, u32) {
174         const EVENT_NONE: u32 = 0;
175         const EVENT_SUBTASK: u32 = 1;
176         const EVENT_STREAM_READ: u32 = 2;
177         const EVENT_STREAM_WRITE: u32 = 3;
178         const EVENT_FUTURE_READ: u32 = 4;
179         const EVENT_FUTURE_WRITE: u32 = 5;
180         const EVENT_CANCELLED: u32 = 6;
181         match self {
182             Event::None => (EVENT_NONE, 0),
183             Event::Cancelled => (EVENT_CANCELLED, 0),
184             Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
185             Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
186             Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
187             Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
188             Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
189         }
190     }
191 }
192 
193 /// Corresponds to `CallbackCode` in the spec.
194 mod callback_code {
195     pub const EXIT: u32 = 0;
196     pub const YIELD: u32 = 1;
197     pub const WAIT: u32 = 2;
198 }
199 
200 /// A flag indicating that the callee is an async-lowered export.
201 ///
202 /// This may be passed to the `async-start` intrinsic from a fused adapter.
203 const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
204 
205 /// Provides access to either store data (via the `get` method) or the store
206 /// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
207 /// instance to which the current host task belongs.
208 ///
209 /// See [`Accessor::with`] for details.
210 pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
211     store: StoreContextMut<'a, T>,
212     get_data: fn(&mut T) -> D::Data<'_>,
213 }
214 
215 impl<'a, T, D> Access<'a, T, D>
216 where
217     D: HasData + ?Sized,
218     T: 'static,
219 {
220     /// Creates a new [`Access`] from its component parts.
221     pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
222         Self { store, get_data }
223     }
224 
225     /// Get mutable access to the store data.
226     pub fn data_mut(&mut self) -> &mut T {
227         self.store.data_mut()
228     }
229 
230     /// Get mutable access to the store data.
231     pub fn get(&mut self) -> D::Data<'_> {
232         (self.get_data)(self.data_mut())
233     }
234 
235     /// Spawn a background task.
236     ///
237     /// See [`Accessor::spawn`] for details.
238     pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
239     where
240         T: 'static,
241     {
242         let accessor = Accessor {
243             get_data: self.get_data,
244             token: StoreToken::new(self.store.as_context_mut()),
245         };
246         self.store
247             .as_context_mut()
248             .spawn_with_accessor(accessor, task)
249     }
250 
251     /// Returns the getter this accessor is using to project from `T` into
252     /// `D::Data`.
253     pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
254         self.get_data
255     }
256 }
257 
258 impl<'a, T, D> AsContext for Access<'a, T, D>
259 where
260     D: HasData + ?Sized,
261     T: 'static,
262 {
263     type Data = T;
264 
265     fn as_context(&self) -> StoreContext<'_, T> {
266         self.store.as_context()
267     }
268 }
269 
270 impl<'a, T, D> AsContextMut for Access<'a, T, D>
271 where
272     D: HasData + ?Sized,
273     T: 'static,
274 {
275     fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
276         self.store.as_context_mut()
277     }
278 }
279 
280 /// Provides scoped mutable access to store data in the context of a concurrent
281 /// host task future.
282 ///
283 /// This allows multiple host task futures to execute concurrently and access
284 /// the store between (but not across) `await` points.
285 ///
286 /// # Rationale
287 ///
288 /// This structure is sort of like `&mut T` plus a projection from `&mut T` to
289 /// `D::Data<'_>`. The problem this is solving, however, is that it does not
290 /// literally store these values. The basic problem is that when a concurrent
291 /// host future is being polled it has access to `&mut T` (and the whole
292 /// `Store`) but when it's not being polled it does not have access to these
293 /// values. This reflects how the store is only ever polling one future at a
294 /// time so the store is effectively being passed between futures.
295 ///
296 /// Rust's `Future` trait, however, has no means of passing a `Store`
297 /// temporarily between futures. The [`Context`](std::task::Context) type does
298 /// not have the ability to attach arbitrary information to it at this time.
299 /// This type, [`Accessor`], is used to bridge this expressivity gap.
300 ///
301 /// The [`Accessor`] type here represents the ability to acquire, temporarily in
302 /// a synchronous manner, the current store. The [`Accessor::with`] function
303 /// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
304 /// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
305 /// not take an `async` closure as its argument, instead it's a synchronous
306 /// closure which must complete during on run of `Future::poll`. This reflects
307 /// how the store is temporarily made available while a host future is being
308 /// polled.
309 ///
310 /// # Implementation
311 ///
312 /// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
313 /// this type additionally doesn't even have a lifetime parameter. This is
314 /// instead a representation of proof of the ability to acquire these while a
315 /// future is being polled. Wasmtime will, when it polls a host future,
316 /// configure ambient state such that the `Accessor` that a future closes over
317 /// will work and be able to access the store.
318 ///
319 /// This has a number of implications for users such as:
320 ///
321 /// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
322 ///   the lifetime of a single future.
323 /// * A future is expected to, however, close over an `Accessor` and keep it
324 ///   alive probably for the duration of the entire future.
325 /// * Different host futures will be given different `Accessor`s, and that's
326 ///   intentional.
327 /// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
328 ///   alleviates some otherwise required bounds to be written down.
329 ///
330 /// # Using `Accessor` in `Drop`
331 ///
332 /// The methods on `Accessor` are only expected to work in the context of
333 /// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
334 /// host future can be dropped at any time throughout the system and Wasmtime
335 /// store context is not necessarily available at that time. It's recommended to
336 /// not use `Accessor` methods in anything connected to a `Drop` implementation
337 /// as they will panic and have unintended results. If you run into this though
338 /// feel free to file an issue on the Wasmtime repository.
339 pub struct Accessor<T: 'static, D = HasSelf<T>>
340 where
341     D: HasData + ?Sized,
342 {
343     token: StoreToken<T>,
344     get_data: fn(&mut T) -> D::Data<'_>,
345 }
346 
347 /// A helper trait to take any type of accessor-with-data in functions.
348 ///
349 /// This trait is similar to [`AsContextMut`] except that it's used when
350 /// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
351 /// [`Accessor`] is the main type used in concurrent settings and is passed to
352 /// functions such as [`Func::call_concurrent`].
353 ///
354 /// This trait is implemented for [`Accessor`] and `&T` where `T` implements
355 /// this trait. This effectively means that regardless of the `D` in
356 /// `Accessor<T, D>` it can still be passed to a function which just needs a
357 /// store accessor.
358 ///
359 /// Acquiring an [`Accessor`] can be done through
360 /// [`StoreContextMut::run_concurrent`] for example or in a host function
361 /// through
362 /// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
363 pub trait AsAccessor {
364     /// The `T` in `Store<T>` that this accessor refers to.
365     type Data: 'static;
366 
367     /// The `D` in `Accessor<T, D>`, or the projection out of
368     /// `Self::Data`.
369     type AccessorData: HasData + ?Sized;
370 
371     /// Returns the accessor that this is referring to.
372     fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
373 }
374 
375 impl<T: AsAccessor + ?Sized> AsAccessor for &T {
376     type Data = T::Data;
377     type AccessorData = T::AccessorData;
378 
379     fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
380         T::as_accessor(self)
381     }
382 }
383 
384 impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
385     type Data = T;
386     type AccessorData = D;
387 
388     fn as_accessor(&self) -> &Accessor<T, D> {
389         self
390     }
391 }
392 
393 // Note that it is intentional at this time that `Accessor` does not actually
394 // store `&mut T` or anything similar. This distinctly enables the `Accessor`
395 // structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
396 // that matter). This is used to ergonomically simplify bindings where the
397 // majority of the time `Accessor` is closed over in a future which then needs
398 // to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
399 // you already have to write `T: 'static`...) it helps to avoid this.
400 //
401 // Note as well that `Accessor` doesn't actually store its data at all. Instead
402 // it's more of a "proof" of what can be accessed from TLS. API design around
403 // `Accessor` and functions like `Linker::func_wrap_concurrent` are
404 // intentionally made to ensure that `Accessor` is ideally only used in the
405 // context that TLS variables are actually set. For example host functions are
406 // given `&Accessor`, not `Accessor`, and this prevents them from persisting
407 // the value outside of a future. Within the future the TLS variables are all
408 // guaranteed to be set while the future is being polled.
409 //
410 // Finally though this is not an ironclad guarantee, but nor does it need to be.
411 // The TLS APIs are designed to panic or otherwise model usage where they're
412 // called recursively or similar. It's hoped that code cannot be constructed to
413 // actually hit this at runtime but this is not a safety requirement at this
414 // time.
415 const _: () = {
416     const fn assert<T: Send + Sync>() {}
417     assert::<Accessor<UnsafeCell<u32>>>();
418 };
419 
420 impl<T> Accessor<T> {
421     /// Creates a new `Accessor` backed by the specified functions.
422     ///
423     /// - `get`: used to retrieve the store
424     ///
425     /// - `get_data`: used to "project" from the store's associated data to
426     /// another type (e.g. a field of that data or a wrapper around it).
427     ///
428     /// - `spawn`: used to queue spawned background tasks to be run later
429     pub(crate) fn new(token: StoreToken<T>) -> Self {
430         Self {
431             token,
432             get_data: |x| x,
433         }
434     }
435 }
436 
437 impl<T, D> Accessor<T, D>
438 where
439     D: HasData + ?Sized,
440 {
441     /// Run the specified closure, passing it mutable access to the store.
442     ///
443     /// This function is one of the main building blocks of the [`Accessor`]
444     /// type. This yields synchronous, blocking, access to the store via an
445     /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
446     /// providing the ability to access `D` via [`Access::get`]. Note that the
447     /// `fun` here is given only temporary access to the store and `T`/`D`
448     /// meaning that the return value `R` here is not allowed to capture borrows
449     /// into the two. If access is needed to data within `T` or `D` outside of
450     /// this closure then it must be `clone`d out, for example.
451     ///
452     /// # Panics
453     ///
454     /// This function will panic if it is call recursively with any other
455     /// accessor already in scope. For example if `with` is called within `fun`,
456     /// then this function will panic. It is up to the embedder to ensure that
457     /// this does not happen.
458     pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
459         tls::get(|vmstore| {
460             fun(Access {
461                 store: self.token.as_context_mut(vmstore),
462                 get_data: self.get_data,
463             })
464         })
465     }
466 
467     /// Returns the getter this accessor is using to project from `T` into
468     /// `D::Data`.
469     pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
470         self.get_data
471     }
472 
473     /// Changes this accessor to access `D2` instead of the current type
474     /// parameter `D`.
475     ///
476     /// This changes the underlying data access from `T` to `D2::Data<'_>`.
477     ///
478     /// # Panics
479     ///
480     /// When using this API the returned value is disconnected from `&self` and
481     /// the lifetime binding the `self` argument. An `Accessor` only works
482     /// within the context of the closure or async closure that it was
483     /// originally given to, however. This means that due to the fact that the
484     /// returned value has no lifetime connection it's possible to use the
485     /// accessor outside of `&self`, the original accessor, and panic.
486     ///
487     /// The returned value should only be used within the scope of the original
488     /// `Accessor` that `self` refers to.
489     pub fn with_getter<D2: HasData>(
490         &self,
491         get_data: fn(&mut T) -> D2::Data<'_>,
492     ) -> Accessor<T, D2> {
493         Accessor {
494             token: self.token,
495             get_data,
496         }
497     }
498 
499     /// Spawn a background task which will receive an `&Accessor<T, D>` and
500     /// run concurrently with any other tasks in progress for the current
501     /// store.
502     ///
503     /// This is particularly useful for host functions which return a `stream`
504     /// or `future` such that the code to write to the write end of that
505     /// `stream` or `future` must run after the function returns.
506     ///
507     /// The returned [`JoinHandle`] may be used to cancel the task.
508     ///
509     /// # Panics
510     ///
511     /// Panics if called within a closure provided to the [`Accessor::with`]
512     /// function. This can only be called outside an active invocation of
513     /// [`Accessor::with`].
514     pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
515     where
516         T: 'static,
517     {
518         let accessor = self.clone_for_spawn();
519         self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
520     }
521 
522     fn clone_for_spawn(&self) -> Self {
523         Self {
524             token: self.token,
525             get_data: self.get_data,
526         }
527     }
528 }
529 
530 /// Represents a task which may be provided to `Accessor::spawn`,
531 /// `Accessor::forward`, or `StorecContextMut::spawn`.
532 // TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
533 // option.
534 //
535 // As of this writing, it's not possible to specify e.g. `Send` and `Sync`
536 // bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
537 // Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
538 // Send + Sync + 'static` fails with a type mismatch error when we try to pass
539 // it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
540 // best we can do for the time being.
541 pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
542 where
543     D: HasData + ?Sized,
544 {
545     /// Run the task.
546     fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
547 }
548 
549 /// Represents parameter and result metadata for the caller side of a
550 /// guest->guest call orchestrated by a fused adapter.
551 enum CallerInfo {
552     /// Metadata for a call to an async-lowered import
553     Async {
554         params: Vec<ValRaw>,
555         has_result: bool,
556     },
557     /// Metadata for a call to an sync-lowered import
558     Sync {
559         params: Vec<ValRaw>,
560         result_count: u32,
561     },
562 }
563 
564 /// Indicates how a guest task is waiting on a waitable set.
565 enum WaitMode {
566     /// The guest task is waiting using `task.wait`
567     Fiber(StoreFiber<'static>),
568     /// The guest task is waiting via a callback declared as part of an
569     /// async-lifted export.
570     Callback(Instance),
571 }
572 
573 /// Represents the reason a fiber is suspending itself.
574 #[derive(Debug)]
575 enum SuspendReason {
576     /// The fiber is waiting for an event to be delivered to the specified
577     /// waitable set or task.
578     Waiting {
579         set: TableId<WaitableSet>,
580         thread: QualifiedThreadId,
581         skip_may_block_check: bool,
582     },
583     /// The fiber has finished handling its most recent work item and is waiting
584     /// for another (or to be dropped if it is no longer needed).
585     NeedWork,
586     /// The fiber is yielding and should be resumed once other tasks have had a
587     /// chance to run.
588     Yielding {
589         thread: QualifiedThreadId,
590         skip_may_block_check: bool,
591     },
592     /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
593     ExplicitlySuspending {
594         thread: QualifiedThreadId,
595         skip_may_block_check: bool,
596     },
597 }
598 
599 /// Represents a pending call into guest code for a given guest task.
600 enum GuestCallKind {
601     /// Indicates there's an event to deliver to the task, possibly related to a
602     /// waitable set the task has been waiting on or polling.
603     DeliverEvent {
604         /// The instance to which the task belongs.
605         instance: Instance,
606         /// The waitable set the event belongs to, if any.
607         ///
608         /// If this is `None` the event will be waiting in the
609         /// `GuestTask::event` field for the task.
610         set: Option<TableId<WaitableSet>>,
611     },
612     /// Indicates that a new guest task call is pending and may be executed
613     /// using the specified closure.
614     ///
615     /// If the closure returns `Ok(Some(call))`, the `call` should be run
616     /// immediately using `handle_guest_call`.
617     StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
618     StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
619 }
620 
621 impl fmt::Debug for GuestCallKind {
622     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
623         match self {
624             Self::DeliverEvent { instance, set } => f
625                 .debug_struct("DeliverEvent")
626                 .field("instance", instance)
627                 .field("set", set)
628                 .finish(),
629             Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
630             Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
631         }
632     }
633 }
634 
635 /// The target of a suspension intrinsic.
636 #[derive(Copy, Clone, Debug)]
637 pub enum SuspensionTarget {
638     SomeSuspended(u32),
639     Some(u32),
640     None,
641 }
642 
643 impl SuspensionTarget {
644     fn is_none(&self) -> bool {
645         matches!(self, SuspensionTarget::None)
646     }
647     fn is_some(&self) -> bool {
648         !self.is_none()
649     }
650 }
651 
652 /// Represents a pending call into guest code for a given guest thread.
653 #[derive(Debug)]
654 struct GuestCall {
655     thread: QualifiedThreadId,
656     kind: GuestCallKind,
657 }
658 
659 impl GuestCall {
660     /// Returns whether or not the call is ready to run.
661     ///
662     /// A call will not be ready to run if either:
663     ///
664     /// - the (sub-)component instance to be called has already been entered and
665     /// cannot be reentered until an in-progress call completes
666     ///
667     /// - the call is for a not-yet started task and the (sub-)component
668     /// instance to be called has backpressure enabled
669     fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
670         let instance = store
671             .concurrent_state_mut()
672             .get_mut(self.thread.task)?
673             .instance;
674         let state = store.instance_state(instance).concurrent_state();
675 
676         let ready = match &self.kind {
677             GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
678             GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
679             GuestCallKind::StartExplicit(_) => true,
680         };
681         log::trace!(
682             "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
683             state.do_not_enter,
684             state.backpressure
685         );
686         Ok(ready)
687     }
688 }
689 
690 /// Job to be run on a worker fiber.
691 enum WorkerItem {
692     GuestCall(GuestCall),
693     Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
694 }
695 
696 /// Represents a pending work item to be handled by the event loop for a given
697 /// component instance.
698 enum WorkItem {
699     /// A host task to be pushed to `ConcurrentState::futures`.
700     PushFuture(AlwaysMut<HostTaskFuture>),
701     /// A fiber to resume.
702     ResumeFiber(StoreFiber<'static>),
703     /// A thread to resume.
704     ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
705     /// A pending call into guest code for a given guest task.
706     GuestCall(RuntimeComponentInstanceIndex, GuestCall),
707     /// A job to run on a worker fiber.
708     WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
709 }
710 
711 impl fmt::Debug for WorkItem {
712     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
713         match self {
714             Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
715             Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
716             Self::ResumeThread(instance, thread) => f
717                 .debug_tuple("ResumeThread")
718                 .field(instance)
719                 .field(thread)
720                 .finish(),
721             Self::GuestCall(instance, call) => f
722                 .debug_tuple("GuestCall")
723                 .field(instance)
724                 .field(call)
725                 .finish(),
726             Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
727         }
728     }
729 }
730 
731 /// Whether a suspension intrinsic was cancelled or completed
732 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
733 pub(crate) enum WaitResult {
734     Cancelled,
735     Completed,
736 }
737 
738 /// Poll the specified future until it completes on behalf of a guest->host call
739 /// using a sync-lowered import.
740 ///
741 /// This is similar to `Instance::first_poll` except it's for sync-lowered
742 /// imports, meaning we don't need to handle cancellation and we can block the
743 /// caller until the task completes, at which point the caller can handle
744 /// lowering the result to the guest's stack and linear memory.
745 pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
746     store: &mut dyn VMStore,
747     future: impl Future<Output = Result<R>> + Send + 'static,
748 ) -> Result<R> {
749     let state = store.concurrent_state_mut();
750     let task = state.unwrap_current_host_thread();
751 
752     // Wrap the future in a closure which will take care of stashing the result
753     // in `GuestTask::result` and resuming this fiber when the host task
754     // completes.
755     let mut future = Box::pin(async move {
756         let result = future.await?;
757         tls::get(move |store| {
758             let state = store.concurrent_state_mut();
759             state.get_mut(task)?.result = Some(Box::new(result) as _);
760 
761             Waitable::Host(task).set_event(
762                 state,
763                 Some(Event::Subtask {
764                     status: Status::Returned,
765                 }),
766             )?;
767 
768             Ok(())
769         })
770     }) as HostTaskFuture;
771 
772     // Finally, poll the future.  We can use a dummy `Waker` here because we'll
773     // add the future to `ConcurrentState::futures` and poll it automatically
774     // from the event loop if it doesn't complete immediately here.
775     let poll = tls::set(store, || {
776         future
777             .as_mut()
778             .poll(&mut Context::from_waker(&Waker::noop()))
779     });
780 
781     match poll {
782         // It completed immediately; check the result and delete the task.
783         Poll::Ready(result) => result?,
784 
785         // It did not complete immediately; add it to
786         // `ConcurrentState::futures` so it will be polled via the event loop;
787         // then use `GuestTask::sync_call_set` to wait for the task to
788         // complete, suspending the current fiber until it does so.
789         Poll::Pending => {
790             let state = store.concurrent_state_mut();
791             state.push_future(future);
792 
793             let caller = state.get_mut(task)?.caller;
794             let set = state.get_mut(caller.task)?.sync_call_set;
795             Waitable::Host(task).join(state, Some(set))?;
796 
797             store.suspend(SuspendReason::Waiting {
798                 set,
799                 thread: caller,
800                 skip_may_block_check: false,
801             })?;
802 
803             // Remove the `task` from the `sync_call_set` to ensure that when
804             // this function returns and the task is deleted that there are no
805             // more lingering references to this host task.
806             Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
807         }
808     }
809 
810     // Retrieve and return the result.
811     Ok(*store
812         .concurrent_state_mut()
813         .get_mut(task)?
814         .result
815         .take()
816         .unwrap()
817         .downcast()
818         .unwrap())
819 }
820 
821 /// Execute the specified guest call.
822 fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
823     let mut next = Some(call);
824     while let Some(call) = next.take() {
825         match call.kind {
826             GuestCallKind::DeliverEvent { instance, set } => {
827                 let (event, waitable) = instance
828                     .get_event(store, call.thread.task, set, true)?
829                     .unwrap();
830                 let state = store.concurrent_state_mut();
831                 let task = state.get_mut(call.thread.task)?;
832                 let runtime_instance = task.instance;
833                 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
834 
835                 log::trace!(
836                     "use callback to deliver event {event:?} to {:?} for {waitable:?}",
837                     call.thread,
838                 );
839 
840                 let old_thread = store.set_thread(call.thread);
841                 log::trace!(
842                     "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
843                     call.thread
844                 );
845 
846                 store.enter_instance(runtime_instance);
847 
848                 let callback = store
849                     .concurrent_state_mut()
850                     .get_mut(call.thread.task)?
851                     .callback
852                     .take()
853                     .unwrap();
854 
855                 let code = callback(store, event, handle)?;
856 
857                 store
858                     .concurrent_state_mut()
859                     .get_mut(call.thread.task)?
860                     .callback = Some(callback);
861 
862                 store.exit_instance(runtime_instance)?;
863 
864                 store.set_thread(old_thread);
865 
866                 next = instance.handle_callback_code(
867                     store,
868                     call.thread,
869                     runtime_instance.index,
870                     code,
871                 )?;
872 
873                 log::trace!(
874                     "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
875                 );
876             }
877             GuestCallKind::StartImplicit(fun) => {
878                 next = fun(store)?;
879             }
880             GuestCallKind::StartExplicit(fun) => {
881                 fun(store)?;
882             }
883         }
884     }
885 
886     Ok(())
887 }
888 
889 impl<T> Store<T> {
890     /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
891     pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
892     where
893         T: Send + 'static,
894     {
895         ensure!(
896             self.as_context().0.concurrency_support(),
897             "cannot use `run_concurrent` when Config::concurrency_support disabled",
898         );
899         self.as_context_mut().run_concurrent(fun).await
900     }
901 
902     #[doc(hidden)]
903     pub fn assert_concurrent_state_empty(&mut self) {
904         self.as_context_mut().assert_concurrent_state_empty();
905     }
906 
907     #[doc(hidden)]
908     pub fn concurrent_state_table_size(&mut self) -> usize {
909         self.as_context_mut().concurrent_state_table_size()
910     }
911 
912     /// Convenience wrapper for [`StoreContextMut::spawn`].
913     pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
914     where
915         T: 'static,
916     {
917         self.as_context_mut().spawn(task)
918     }
919 }
920 
921 impl<T> StoreContextMut<'_, T> {
922     /// Assert that all the relevant tables and queues in the concurrent state
923     /// for this store are empty.
924     ///
925     /// This is for sanity checking in integration tests
926     /// (e.g. `component-async-tests`) that the relevant state has been cleared
927     /// after each test concludes.  This should help us catch leaks, e.g. guest
928     /// tasks which haven't been deleted despite having completed and having
929     /// been dropped by their supertasks.
930     ///
931     /// Only intended for use in Wasmtime's own testing.
932     #[doc(hidden)]
933     pub fn assert_concurrent_state_empty(self) {
934         let store = self.0;
935         store
936             .store_data_mut()
937             .components
938             .assert_instance_states_empty();
939         let state = store.concurrent_state_mut();
940         assert!(
941             state.table.get_mut().is_empty(),
942             "non-empty table: {:?}",
943             state.table.get_mut()
944         );
945         assert!(state.high_priority.is_empty());
946         assert!(state.low_priority.is_empty());
947         assert!(state.current_thread.is_none());
948         assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
949         assert!(state.global_error_context_ref_counts.is_empty());
950     }
951 
952     /// Helper function to perform tests over the size of the concurrent state
953     /// table which can be useful for detecting leaks.
954     ///
955     /// Only intended for use in Wasmtime's own testing.
956     #[doc(hidden)]
957     pub fn concurrent_state_table_size(&mut self) -> usize {
958         self.0
959             .concurrent_state_mut()
960             .table
961             .get_mut()
962             .iter_mut()
963             .count()
964     }
965 
966     /// Spawn a background task to run as part of this instance's event loop.
967     ///
968     /// The task will receive an `&Accessor<U>` and run concurrently with
969     /// any other tasks in progress for the instance.
970     ///
971     /// Note that the task will only make progress if and when the event loop
972     /// for this instance is run.
973     ///
974     /// The returned [`JoinHandle`] may be used to cancel the task.
975     pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
976     where
977         T: 'static,
978     {
979         let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
980         self.spawn_with_accessor(accessor, task)
981     }
982 
983     /// Internal implementation of `spawn` functions where a `store` is
984     /// available along with an `Accessor`.
985     fn spawn_with_accessor<D>(
986         self,
987         accessor: Accessor<T, D>,
988         task: impl AccessorTask<T, D>,
989     ) -> JoinHandle
990     where
991         T: 'static,
992         D: HasData + ?Sized,
993     {
994         // Create an "abortable future" here where internally the future will
995         // hook calls to poll and possibly spawn more background tasks on each
996         // iteration.
997         let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
998         self.0
999             .concurrent_state_mut()
1000             .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1001         handle
1002     }
1003 
1004     /// Run the specified closure `fun` to completion as part of this store's
1005     /// event loop.
1006     ///
1007     /// This will run `fun` as part of this store's event loop until it
1008     /// yields a result.  `fun` is provided an [`Accessor`], which provides
1009     /// controlled access to the store and its data.
1010     ///
1011     /// This function can be used to invoke [`Func::call_concurrent`] for
1012     /// example within the async closure provided here.
1013     ///
1014     /// This function will unconditionally return an error if
1015     /// [`Config::concurrency_support`] is disabled.
1016     ///
1017     /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1018     ///
1019     /// # Store-blocking behavior
1020     ///
1021     /// At this time there are certain situations in which the `Future` returned
1022     /// by the `AsyncFnOnce` passed to this function will not be polled for an
1023     /// extended period of time, despite one or more `Waker::wake` events having
1024     /// occurred for the task to which it belongs.  This can manifest as the
1025     /// `Future` seeming to be "blocked" or "locked up", but is actually due to
1026     /// the `Store` being held by e.g. a blocking host function, preventing the
1027     /// `Future` from being polled. A canonical example of this is when the
1028     /// `fun` provided to this function attempts to set a timeout for an
1029     /// invocation of a wasm function. In this situation the async closure is
1030     /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1031     /// to elapse. At this time this setup will not always work and the timeout
1032     /// may not reliably fire.
1033     ///
1034     /// This function will not block the current thread and as such is always
1035     /// suitable to run in an `async` context, but the current implementation of
1036     /// Wasmtime can lead to situations where a certain wasm computation is
1037     /// required to make progress the closure to make progress. This is an
1038     /// artifact of Wasmtime's historical implementation of `async` functions
1039     /// and is the topic of [#11869] and [#11870]. In the timeout example from
1040     /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1041     /// progress for a readiness notification of (b) to get delivered.
1042     ///
1043     /// This effectively means that it's not possible to reliably perform a
1044     /// "select" operation within the `fun` closure, which timeouts for example
1045     /// are based on. Fixing this requires some relatively major refactoring
1046     /// work within Wasmtime itself. This is a known pitfall otherwise and one
1047     /// that is intended to be fixed one day. In the meantime it's recommended
1048     /// to apply timeouts or such to the entire `run_concurrent` call itself
1049     /// rather than internally.
1050     ///
1051     /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1052     /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1053     ///
1054     /// # Example
1055     ///
1056     /// ```
1057     /// # use {
1058     /// #   wasmtime::{
1059     /// #     error::{Result},
1060     /// #     component::{ Component, Linker, Resource, ResourceTable},
1061     /// #     Config, Engine, Store
1062     /// #   },
1063     /// # };
1064     /// #
1065     /// # struct MyResource(u32);
1066     /// # struct Ctx { table: ResourceTable }
1067     /// #
1068     /// # async fn foo() -> Result<()> {
1069     /// # let mut config = Config::new();
1070     /// # let engine = Engine::new(&config)?;
1071     /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1072     /// # let mut linker = Linker::new(&engine);
1073     /// # let component = Component::new(&engine, "")?;
1074     /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1075     /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1076     /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1077     /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1078     ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1079     ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
1080     ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1081     ///    bar.call_concurrent(accessor, (value.0,)).await?;
1082     ///    Ok(())
1083     /// }).await??;
1084     /// # Ok(())
1085     /// # }
1086     /// ```
1087     pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1088     where
1089         T: Send + 'static,
1090     {
1091         ensure!(
1092             self.0.concurrency_support(),
1093             "cannot use `run_concurrent` when Config::concurrency_support disabled",
1094         );
1095         self.do_run_concurrent(fun, false).await
1096     }
1097 
1098     pub(super) async fn run_concurrent_trap_on_idle<R>(
1099         self,
1100         fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1101     ) -> Result<R>
1102     where
1103         T: Send + 'static,
1104     {
1105         self.do_run_concurrent(fun, true).await
1106     }
1107 
1108     async fn do_run_concurrent<R>(
1109         mut self,
1110         fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1111         trap_on_idle: bool,
1112     ) -> Result<R>
1113     where
1114         T: Send + 'static,
1115     {
1116         debug_assert!(self.0.concurrency_support());
1117         check_recursive_run();
1118         let token = StoreToken::new(self.as_context_mut());
1119 
1120         struct Dropper<'a, T: 'static, V> {
1121             store: StoreContextMut<'a, T>,
1122             value: ManuallyDrop<V>,
1123         }
1124 
1125         impl<'a, T, V> Drop for Dropper<'a, T, V> {
1126             fn drop(&mut self) {
1127                 tls::set(self.store.0, || {
1128                     // SAFETY: Here we drop the value without moving it for the
1129                     // first and only time -- per the contract for `Drop::drop`,
1130                     // this code won't run again, and the `value` field will no
1131                     // longer be accessible.
1132                     unsafe { ManuallyDrop::drop(&mut self.value) }
1133                 });
1134             }
1135         }
1136 
1137         let accessor = &Accessor::new(token);
1138         let dropper = &mut Dropper {
1139             store: self,
1140             value: ManuallyDrop::new(fun(accessor)),
1141         };
1142         // SAFETY: We never move `dropper` nor its `value` field.
1143         let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1144 
1145         dropper
1146             .store
1147             .as_context_mut()
1148             .poll_until(future, trap_on_idle)
1149             .await
1150     }
1151 
1152     /// Run this store's event loop.
1153     ///
1154     /// The returned future will resolve when the specified future completes or,
1155     /// if `trap_on_idle` is true, when the event loop can't make further
1156     /// progress.
1157     async fn poll_until<R>(
1158         mut self,
1159         mut future: Pin<&mut impl Future<Output = R>>,
1160         trap_on_idle: bool,
1161     ) -> Result<R>
1162     where
1163         T: Send + 'static,
1164     {
1165         struct Reset<'a, T: 'static> {
1166             store: StoreContextMut<'a, T>,
1167             futures: Option<FuturesUnordered<HostTaskFuture>>,
1168         }
1169 
1170         impl<'a, T> Drop for Reset<'a, T> {
1171             fn drop(&mut self) {
1172                 if let Some(futures) = self.futures.take() {
1173                     *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1174                 }
1175             }
1176         }
1177 
1178         loop {
1179             // Take `ConcurrentState::futures` out of the store so we can poll
1180             // it while also safely giving any of the futures inside access to
1181             // `self`.
1182             let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1183             let mut reset = Reset {
1184                 store: self.as_context_mut(),
1185                 futures,
1186             };
1187             let mut next = pin!(reset.futures.as_mut().unwrap().next());
1188 
1189             enum PollResult<R> {
1190                 Complete(R),
1191                 ProcessWork(Vec<WorkItem>),
1192             }
1193             let result = future::poll_fn(|cx| {
1194                 // First, poll the future we were passed as an argument and
1195                 // return immediately if it's ready.
1196                 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1197                     return Poll::Ready(Ok(PollResult::Complete(value)));
1198                 }
1199 
1200                 // Next, poll `ConcurrentState::futures` (which includes any
1201                 // pending host tasks and/or background tasks), returning
1202                 // immediately if one of them fails.
1203                 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1204                     Poll::Ready(Some(output)) => {
1205                         match output {
1206                             Err(e) => return Poll::Ready(Err(e)),
1207                             Ok(()) => {}
1208                         }
1209                         Poll::Ready(true)
1210                     }
1211                     Poll::Ready(None) => Poll::Ready(false),
1212                     Poll::Pending => Poll::Pending,
1213                 };
1214 
1215                 // Next, collect the next batch of work items to process, if any.
1216                 // This will be either all of the high-priority work items, or if
1217                 // there are none, a single low-priority work item.
1218                 let state = reset.store.0.concurrent_state_mut();
1219                 let ready = state.collect_work_items_to_run();
1220                 if !ready.is_empty() {
1221                     return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
1222                 }
1223 
1224                 // Finally, if we have nothing else to do right now, determine what to do
1225                 // based on whether there are any pending futures in
1226                 // `ConcurrentState::futures`.
1227                 return match next {
1228                     Poll::Ready(true) => {
1229                         // In this case, one of the futures in
1230                         // `ConcurrentState::futures` completed
1231                         // successfully, so we return now and continue
1232                         // the outer loop in case there is another one
1233                         // ready to complete.
1234                         Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
1235                     }
1236                     Poll::Ready(false) => {
1237                         // Poll the future we were passed one last time
1238                         // in case one of `ConcurrentState::futures` had
1239                         // the side effect of unblocking it.
1240                         if let Poll::Ready(value) =
1241                             tls::set(reset.store.0, || future.as_mut().poll(cx))
1242                         {
1243                             Poll::Ready(Ok(PollResult::Complete(value)))
1244                         } else {
1245                             // In this case, there are no more pending
1246                             // futures in `ConcurrentState::futures`,
1247                             // there are no remaining work items, _and_
1248                             // the future we were passed as an argument
1249                             // still hasn't completed.
1250                             if trap_on_idle {
1251                                 // `trap_on_idle` is true, so we exit
1252                                 // immediately.
1253                                 Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock)))
1254                             } else {
1255                                 // `trap_on_idle` is false, so we assume
1256                                 // that future will wake up and give us
1257                                 // more work to do when it's ready to.
1258                                 Poll::Pending
1259                             }
1260                         }
1261                     }
1262                     // There is at least one pending future in
1263                     // `ConcurrentState::futures` and we have nothing
1264                     // else to do but wait for now, so we return
1265                     // `Pending`.
1266                     Poll::Pending => Poll::Pending,
1267                 };
1268             })
1269             .await;
1270 
1271             // Put the `ConcurrentState::futures` back into the store before we
1272             // return or handle any work items since one or more of those items
1273             // might append more futures.
1274             drop(reset);
1275 
1276             match result? {
1277                 // The future we were passed as an argument completed, so we
1278                 // return the result.
1279                 PollResult::Complete(value) => break Ok(value),
1280                 // The future we were passed has not yet completed, so handle
1281                 // any work items and then loop again.
1282                 PollResult::ProcessWork(ready) => {
1283                     struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1284                         store: StoreContextMut<'a, T>,
1285                         ready: I,
1286                     }
1287 
1288                     impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1289                         fn drop(&mut self) {
1290                             while let Some(item) = self.ready.next() {
1291                                 match item {
1292                                     WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1293                                     WorkItem::PushFuture(future) => {
1294                                         tls::set(self.store.0, move || drop(future))
1295                                     }
1296                                     _ => {}
1297                                 }
1298                             }
1299                         }
1300                     }
1301 
1302                     let mut dispose = Dispose {
1303                         store: self.as_context_mut(),
1304                         ready: ready.into_iter(),
1305                     };
1306 
1307                     while let Some(item) = dispose.ready.next() {
1308                         dispose
1309                             .store
1310                             .as_context_mut()
1311                             .handle_work_item(item)
1312                             .await?;
1313                     }
1314                 }
1315             }
1316         }
1317     }
1318 
1319     /// Handle the specified work item, possibly resuming a fiber if applicable.
1320     async fn handle_work_item(self, item: WorkItem) -> Result<()>
1321     where
1322         T: Send,
1323     {
1324         log::trace!("handle work item {item:?}");
1325         match item {
1326             WorkItem::PushFuture(future) => {
1327                 self.0
1328                     .concurrent_state_mut()
1329                     .futures
1330                     .get_mut()
1331                     .as_mut()
1332                     .unwrap()
1333                     .push(future.into_inner());
1334             }
1335             WorkItem::ResumeFiber(fiber) => {
1336                 self.0.resume_fiber(fiber).await?;
1337             }
1338             WorkItem::ResumeThread(_, thread) => {
1339                 if let GuestThreadState::Ready(fiber) = mem::replace(
1340                     &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1341                     GuestThreadState::Running,
1342                 ) {
1343                     self.0.resume_fiber(fiber).await?;
1344                 } else {
1345                     bail!("cannot resume non-pending thread {thread:?}");
1346                 }
1347             }
1348             WorkItem::GuestCall(_, call) => {
1349                 if call.is_ready(self.0)? {
1350                     self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1351                 } else {
1352                     let state = self.0.concurrent_state_mut();
1353                     let task = state.get_mut(call.thread.task)?;
1354                     if !task.starting_sent {
1355                         task.starting_sent = true;
1356                         if let GuestCallKind::StartImplicit(_) = &call.kind {
1357                             Waitable::Guest(call.thread.task).set_event(
1358                                 state,
1359                                 Some(Event::Subtask {
1360                                     status: Status::Starting,
1361                                 }),
1362                             )?;
1363                         }
1364                     }
1365 
1366                     let instance = state.get_mut(call.thread.task)?.instance;
1367                     self.0
1368                         .instance_state(instance)
1369                         .concurrent_state()
1370                         .pending
1371                         .insert(call.thread, call.kind);
1372                 }
1373             }
1374             WorkItem::WorkerFunction(fun) => {
1375                 self.run_on_worker(WorkerItem::Function(fun)).await?;
1376             }
1377         }
1378 
1379         Ok(())
1380     }
1381 
1382     /// Execute the specified guest call on a worker fiber.
1383     async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1384     where
1385         T: Send,
1386     {
1387         let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1388             fiber
1389         } else {
1390             fiber::make_fiber(self.0, move |store| {
1391                 loop {
1392                     match store.concurrent_state_mut().worker_item.take().unwrap() {
1393                         WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1394                         WorkerItem::Function(fun) => fun.into_inner()(store)?,
1395                     }
1396 
1397                     store.suspend(SuspendReason::NeedWork)?;
1398                 }
1399             })?
1400         };
1401 
1402         let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1403         assert!(worker_item.is_none());
1404         *worker_item = Some(item);
1405 
1406         self.0.resume_fiber(worker).await
1407     }
1408 
1409     /// Wrap the specified host function in a future which will call it, passing
1410     /// it an `&Accessor<T>`.
1411     ///
1412     /// See the `Accessor` documentation for details.
1413     pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1414     where
1415         T: 'static,
1416         F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1417             + Send
1418             + Sync
1419             + 'static,
1420         R: Send + Sync + 'static,
1421     {
1422         let token = StoreToken::new(self);
1423         async move {
1424             let mut accessor = Accessor::new(token);
1425             closure(&mut accessor).await
1426         }
1427     }
1428 }
1429 
1430 impl StoreOpaque {
1431     /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
1432     /// guest-to-guest call or a sync host-to-guest call.
1433     ///
1434     /// This task will only be used for the purpose of handling calls to
1435     /// intrinsic functions; both parameter lowering and result lifting are
1436     /// assumed to be taken care of elsewhere.
1437     pub(crate) fn enter_guest_sync_call(
1438         &mut self,
1439         guest_caller: Option<RuntimeInstance>,
1440         callee_async: bool,
1441         callee: RuntimeInstance,
1442     ) -> Result<()> {
1443         log::trace!("enter sync call {callee:?}");
1444         if !self.concurrency_support() {
1445             return Ok(self.enter_call_not_concurrent());
1446         }
1447 
1448         let state = self.concurrent_state_mut();
1449         let thread = state.current_thread;
1450         let instance = if let Some(thread) = thread.guest() {
1451             Some(state.get_mut(thread.task)?.instance)
1452         } else {
1453             None
1454         };
1455         let task = GuestTask::new(
1456             state,
1457             Box::new(move |_, _| unreachable!()),
1458             LiftResult {
1459                 lift: Box::new(move |_, _| unreachable!()),
1460                 ty: TypeTupleIndex::reserved_value(),
1461                 memory: None,
1462                 string_encoding: StringEncoding::Utf8,
1463             },
1464             if let Some(caller) = guest_caller {
1465                 assert_eq!(caller, instance.unwrap());
1466                 Caller::Guest {
1467                     thread: *thread.guest().unwrap(),
1468                 }
1469             } else {
1470                 Caller::Host {
1471                     tx: None,
1472                     exit_tx: Arc::new(oneshot::channel().0),
1473                     host_future_present: false,
1474                     caller: thread,
1475                 }
1476             },
1477             None,
1478             callee,
1479             callee_async,
1480         )?;
1481 
1482         let guest_task = state.push(task)?;
1483         let new_thread = GuestThread::new_implicit(guest_task);
1484         let guest_thread = state.push(new_thread)?;
1485         Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1486             guest_thread,
1487             self,
1488             callee.index,
1489         )?;
1490 
1491         let state = self.concurrent_state_mut();
1492         state.get_mut(guest_task)?.threads.insert(guest_thread);
1493         if guest_caller.is_some() {
1494             let thread = thread.guest().unwrap();
1495             state.get_mut(thread.task)?.subtasks.insert(guest_task);
1496         }
1497 
1498         self.set_thread(QualifiedThreadId {
1499             task: guest_task,
1500             thread: guest_thread,
1501         });
1502 
1503         Ok(())
1504     }
1505 
1506     /// Pop a `GuestTask` previously pushed using `enter_sync_call`.
1507     pub(crate) fn exit_guest_sync_call(&mut self, guest_caller: bool) -> Result<()> {
1508         if !self.concurrency_support() {
1509             return Ok(self.exit_call_not_concurrent());
1510         }
1511         let thread = *self.set_thread(CurrentThread::None).guest().unwrap();
1512         let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
1513         log::trace!("exit sync call {instance:?}");
1514         Instance::from_wasmtime(self, instance.instance).cleanup_thread(
1515             self,
1516             thread,
1517             instance.index,
1518         )?;
1519 
1520         let state = self.concurrent_state_mut();
1521         let task = state.get_mut(thread.task)?;
1522         let caller = match &task.caller {
1523             &Caller::Guest { thread } => {
1524                 assert!(guest_caller);
1525                 thread.into()
1526             }
1527             &Caller::Host { caller, .. } => {
1528                 assert!(!guest_caller);
1529                 caller
1530             }
1531         };
1532         self.set_thread(caller);
1533 
1534         let state = self.concurrent_state_mut();
1535         let task = state.get_mut(thread.task)?;
1536         if task.ready_to_delete() {
1537             state.delete(thread.task)?.dispose(state, thread.task)?;
1538         }
1539 
1540         Ok(())
1541     }
1542 
1543     /// Similar to `enter_guest_sync_call` except for when the guest makes a
1544     /// transition to the host.
1545     ///
1546     /// FIXME: this is called for all guest->host transitions and performs some
1547     /// relatively expensive table manipulations. This would ideally be
1548     /// optimized to avoid the full allocation of a `HostTask` in at least some
1549     /// situations.
1550     pub fn enter_host_call(&mut self) -> Result<()> {
1551         let state = self.concurrent_state_mut();
1552         let caller = state.unwrap_current_guest_thread();
1553         let task = state.push(HostTask::new(caller))?;
1554         log::trace!("new host task {task:?}");
1555         self.set_thread(task);
1556         Ok(())
1557     }
1558 
1559     /// Dual of `enter_host_call` and signifies that the host has finished and
1560     /// will be cleaned up.
1561     ///
1562     /// Note that this isn't invoked when the host is invoked asynchronously and
1563     /// the host isn't complete yet. In that situation the host task persists
1564     /// and will be cleaned up separately.
1565     pub fn exit_host_call(&mut self) -> Result<()> {
1566         let task = self.concurrent_state_mut().unwrap_current_host_thread();
1567         log::trace!("delete host task {task:?}");
1568         let task = self.concurrent_state_mut().delete(task)?;
1569         self.set_thread(task.caller);
1570         Ok(())
1571     }
1572 
1573     /// Determine whether the specified instance may be entered from the host.
1574     ///
1575     /// We return `true` here only if all of the following hold:
1576     ///
1577     /// - The top-level instance is not already on the current task's call stack.
1578     /// - The instance is not in need of a post-return function call.
1579     /// - `self` has not been poisoned due to a trap.
1580     pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
1581         if self.trapped() {
1582             return false;
1583         }
1584         if !self.concurrency_support() {
1585             return true;
1586         }
1587         let state = self.concurrent_state_mut();
1588         let mut cur = state.current_thread;
1589         loop {
1590             match cur {
1591                 CurrentThread::None => break true,
1592                 CurrentThread::Guest(thread) => {
1593                     let task = state.get_mut(thread.task).unwrap();
1594 
1595                     // Note that we only compare top-level instance IDs here.
1596                     // The idea is that the host is not allowed to recursively
1597                     // enter a top-level instance even if the specific leaf
1598                     // instance is not on the stack. This the behavior defined
1599                     // in the spec, and it allows us to elide runtime checks in
1600                     // guest-to-guest adapters.
1601                     if task.instance.instance == instance.instance {
1602                         break false;
1603                     }
1604                     cur = match task.caller {
1605                         Caller::Host { caller, .. } => caller,
1606                         Caller::Guest { thread } => thread.into(),
1607                     };
1608                 }
1609                 CurrentThread::Host(id) => {
1610                     cur = state.get_mut(id).unwrap().caller.into();
1611                 }
1612             }
1613         }
1614     }
1615 
1616     /// Helper function to retrieve the `InstanceState` for the
1617     /// specified instance.
1618     fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1619         self.component_instance_mut(instance.instance)
1620             .instance_state(instance.index)
1621     }
1622 
1623     fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> CurrentThread {
1624         // Each time we switch threads, we conservatively set `task_may_block`
1625         // to `false` for the component instance we're switching away from (if
1626         // any), meaning it will be `false` for any new thread created for that
1627         // instance unless explicitly set otherwise.
1628         let state = self.concurrent_state_mut();
1629         let old_thread = mem::replace(&mut state.current_thread, thread.into());
1630         if let Some(old_thread) = old_thread.guest() {
1631             let instance = state.get_mut(old_thread.task).unwrap().instance.instance;
1632             self.component_instance_mut(instance)
1633                 .set_task_may_block(false)
1634         }
1635 
1636         // If we're switching to a new thread, set its component instance's
1637         // `task_may_block` according to where it left off.
1638         if self.concurrent_state_mut().current_thread.guest().is_some() {
1639             self.set_task_may_block();
1640         }
1641 
1642         old_thread
1643     }
1644 
1645     /// Set the global variable representing whether the current task may block
1646     /// prior to entering Wasm code.
1647     fn set_task_may_block(&mut self) {
1648         let state = self.concurrent_state_mut();
1649         let guest_thread = state.unwrap_current_guest_thread();
1650         let instance = state.get_mut(guest_thread.task).unwrap().instance.instance;
1651         let may_block = self.concurrent_state_mut().may_block(guest_thread.task);
1652         self.component_instance_mut(instance)
1653             .set_task_may_block(may_block)
1654     }
1655 
1656     pub(crate) fn check_blocking(&mut self) -> Result<()> {
1657         if !self.concurrency_support() {
1658             return Ok(());
1659         }
1660         let state = self.concurrent_state_mut();
1661         let task = state.unwrap_current_guest_thread().task;
1662         let instance = state.get_mut(task).unwrap().instance.instance;
1663         let task_may_block = self.component_instance(instance).get_task_may_block();
1664 
1665         if task_may_block {
1666             Ok(())
1667         } else {
1668             Err(Trap::CannotBlockSyncTask.into())
1669         }
1670     }
1671 
1672     /// Record that we're about to enter a (sub-)component instance which does
1673     /// not support more than one concurrent, stackful activation, meaning it
1674     /// cannot be entered again until the next call returns.
1675     fn enter_instance(&mut self, instance: RuntimeInstance) {
1676         log::trace!("enter {instance:?}");
1677         self.instance_state(instance)
1678             .concurrent_state()
1679             .do_not_enter = true;
1680     }
1681 
1682     /// Record that we've exited a (sub-)component instance previously entered
1683     /// with `Self::enter_instance` and then calls `Self::partition_pending`.
1684     /// See the documentation for the latter for details.
1685     fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1686         log::trace!("exit {instance:?}");
1687         self.instance_state(instance)
1688             .concurrent_state()
1689             .do_not_enter = false;
1690         self.partition_pending(instance)
1691     }
1692 
1693     /// Iterate over `InstanceState::pending`, moving any ready items into the
1694     /// "high priority" work item queue.
1695     ///
1696     /// See `GuestCall::is_ready` for details.
1697     fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1698         for (thread, kind) in
1699             mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1700         {
1701             let call = GuestCall { thread, kind };
1702             if call.is_ready(self)? {
1703                 self.concurrent_state_mut()
1704                     .push_high_priority(WorkItem::GuestCall(instance.index, call));
1705             } else {
1706                 self.instance_state(instance)
1707                     .concurrent_state()
1708                     .pending
1709                     .insert(call.thread, call.kind);
1710             }
1711         }
1712 
1713         Ok(())
1714     }
1715 
1716     /// Implements the `backpressure.{inc,dec}` intrinsics.
1717     pub(crate) fn backpressure_modify(
1718         &mut self,
1719         caller_instance: RuntimeInstance,
1720         modify: impl FnOnce(u16) -> Option<u16>,
1721     ) -> Result<()> {
1722         let state = self.instance_state(caller_instance).concurrent_state();
1723         let old = state.backpressure;
1724         let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
1725         state.backpressure = new;
1726 
1727         if old > 0 && new == 0 {
1728             // Backpressure was previously enabled and is now disabled; move any
1729             // newly-eligible guest calls to the "high priority" queue.
1730             self.partition_pending(caller_instance)?;
1731         }
1732 
1733         Ok(())
1734     }
1735 
1736     /// Resume the specified fiber, giving it exclusive access to the specified
1737     /// store.
1738     async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1739         let old_thread = self.concurrent_state_mut().current_thread;
1740         log::trace!("resume_fiber: save current thread {old_thread:?}");
1741 
1742         let fiber = fiber::resolve_or_release(self, fiber).await?;
1743 
1744         self.set_thread(old_thread);
1745 
1746         let state = self.concurrent_state_mut();
1747 
1748         if let Some(ot) = old_thread.guest() {
1749             state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1750         }
1751         log::trace!("resume_fiber: restore current thread {old_thread:?}");
1752 
1753         if let Some(mut fiber) = fiber {
1754             log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1755             // See the `SuspendReason` documentation for what each case means.
1756             match state.suspend_reason.take().unwrap() {
1757                 SuspendReason::NeedWork => {
1758                     if state.worker.is_none() {
1759                         state.worker = Some(fiber);
1760                     } else {
1761                         fiber.dispose(self);
1762                     }
1763                 }
1764                 SuspendReason::Yielding { thread, .. } => {
1765                     state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber);
1766                     let instance = state.get_mut(thread.task)?.instance.index;
1767                     state.push_low_priority(WorkItem::ResumeThread(instance, thread));
1768                 }
1769                 SuspendReason::ExplicitlySuspending { thread, .. } => {
1770                     state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1771                 }
1772                 SuspendReason::Waiting { set, thread, .. } => {
1773                     let old = state
1774                         .get_mut(set)?
1775                         .waiting
1776                         .insert(thread, WaitMode::Fiber(fiber));
1777                     assert!(old.is_none());
1778                 }
1779             };
1780         } else {
1781             log::trace!("resume_fiber: fiber has exited");
1782         }
1783 
1784         Ok(())
1785     }
1786 
1787     /// Suspend the current fiber, storing the reason in
1788     /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1789     /// it should be resumed.
1790     ///
1791     /// See the `SuspendReason` documentation for details.
1792     fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1793         log::trace!("suspend fiber: {reason:?}");
1794 
1795         // If we're yielding or waiting on behalf of a guest thread, we'll need to
1796         // pop the call context which manages resource borrows before suspending
1797         // and then push it again once we've resumed.
1798         let task = match &reason {
1799             SuspendReason::Yielding { thread, .. }
1800             | SuspendReason::Waiting { thread, .. }
1801             | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1802             SuspendReason::NeedWork => None,
1803         };
1804 
1805         let old_guest_thread = if task.is_some() {
1806             self.concurrent_state_mut().current_thread
1807         } else {
1808             CurrentThread::None
1809         };
1810 
1811         // We should not have reached here unless either there's no current
1812         // task, or the current task is permitted to block.  In addition, we
1813         // special-case `thread.switch-to` and waiting for a subtask to go from
1814         // `starting` to `started`, both of which we consider non-blocking
1815         // operations despite requiring a suspend.
1816         assert!(
1817             matches!(
1818                 reason,
1819                 SuspendReason::ExplicitlySuspending {
1820                     skip_may_block_check: true,
1821                     ..
1822                 } | SuspendReason::Waiting {
1823                     skip_may_block_check: true,
1824                     ..
1825                 } | SuspendReason::Yielding {
1826                     skip_may_block_check: true,
1827                     ..
1828                 }
1829             ) || old_guest_thread
1830                 .guest()
1831                 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1832                 .unwrap_or(true)
1833         );
1834 
1835         let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1836         assert!(suspend_reason.is_none());
1837         *suspend_reason = Some(reason);
1838 
1839         self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1840 
1841         if task.is_some() {
1842             self.set_thread(old_guest_thread);
1843         }
1844 
1845         Ok(())
1846     }
1847 
1848     fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1849         let state = self.concurrent_state_mut();
1850         let caller = state.unwrap_current_guest_thread();
1851         let old_set = waitable.common(state)?.set;
1852         let set = state.get_mut(caller.task)?.sync_call_set;
1853         waitable.join(state, Some(set))?;
1854         self.suspend(SuspendReason::Waiting {
1855             set,
1856             thread: caller,
1857             skip_may_block_check: false,
1858         })?;
1859         let state = self.concurrent_state_mut();
1860         waitable.join(state, old_set)
1861     }
1862 }
1863 
1864 impl Instance {
1865     /// Get the next pending event for the specified task and (optional)
1866     /// waitable set, along with the waitable handle if applicable.
1867     fn get_event(
1868         self,
1869         store: &mut StoreOpaque,
1870         guest_task: TableId<GuestTask>,
1871         set: Option<TableId<WaitableSet>>,
1872         cancellable: bool,
1873     ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1874         let state = store.concurrent_state_mut();
1875 
1876         if let Some(event) = state.get_mut(guest_task)?.event.take() {
1877             log::trace!("deliver event {event:?} to {guest_task:?}");
1878 
1879             if cancellable || !matches!(event, Event::Cancelled) {
1880                 return Ok(Some((event, None)));
1881             } else {
1882                 state.get_mut(guest_task)?.event = Some(event);
1883             }
1884         }
1885 
1886         Ok(
1887             if let Some((set, waitable)) = set
1888                 .and_then(|set| {
1889                     state
1890                         .get_mut(set)
1891                         .map(|v| v.ready.pop_first().map(|v| (set, v)))
1892                         .transpose()
1893                 })
1894                 .transpose()?
1895             {
1896                 let common = waitable.common(state)?;
1897                 let handle = common.handle.unwrap();
1898                 let event = common.event.take().unwrap();
1899 
1900                 log::trace!(
1901                     "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1902                 );
1903 
1904                 waitable.on_delivery(store, self, event);
1905 
1906                 Some((event, Some((waitable, handle))))
1907             } else {
1908                 None
1909             },
1910         )
1911     }
1912 
1913     /// Handle the `CallbackCode` returned from an async-lifted export or its
1914     /// callback.
1915     ///
1916     /// If this returns `Ok(Some(call))`, then `call` should be run immediately
1917     /// using `handle_guest_call`.
1918     fn handle_callback_code(
1919         self,
1920         store: &mut StoreOpaque,
1921         guest_thread: QualifiedThreadId,
1922         runtime_instance: RuntimeComponentInstanceIndex,
1923         code: u32,
1924     ) -> Result<Option<GuestCall>> {
1925         let (code, set) = unpack_callback_code(code);
1926 
1927         log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1928 
1929         let state = store.concurrent_state_mut();
1930 
1931         let get_set = |store: &mut StoreOpaque, handle| {
1932             if handle == 0 {
1933                 bail!("invalid waitable-set handle");
1934             }
1935 
1936             let set = store
1937                 .instance_state(RuntimeInstance {
1938                     instance: self.id().instance(),
1939                     index: runtime_instance,
1940                 })
1941                 .handle_table()
1942                 .waitable_set_rep(handle)?;
1943 
1944             Ok(TableId::<WaitableSet>::new(set))
1945         };
1946 
1947         Ok(match code {
1948             callback_code::EXIT => {
1949                 log::trace!("implicit thread {guest_thread:?} completed");
1950                 self.cleanup_thread(store, guest_thread, runtime_instance)?;
1951                 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1952                 if task.threads.is_empty() && !task.returned_or_cancelled() {
1953                     bail!(Trap::NoAsyncResult);
1954                 }
1955                 match &task.caller {
1956                     Caller::Host { .. } => {
1957                         if task.ready_to_delete() {
1958                             Waitable::Guest(guest_thread.task)
1959                                 .delete_from(store.concurrent_state_mut())?;
1960                         }
1961                     }
1962                     Caller::Guest { .. } => {
1963                         task.exited = true;
1964                         task.callback = None;
1965                     }
1966                 }
1967                 None
1968             }
1969             callback_code::YIELD => {
1970                 let task = state.get_mut(guest_thread.task)?;
1971                 // If an `Event::Cancelled` is pending, we'll deliver that;
1972                 // otherwise, we'll deliver `Event::None`.  Note that
1973                 // `GuestTask::event` is only ever set to one of those two
1974                 // `Event` variants.
1975                 if let Some(event) = task.event {
1976                     assert!(matches!(event, Event::None | Event::Cancelled));
1977                 } else {
1978                     task.event = Some(Event::None);
1979                 }
1980                 let call = GuestCall {
1981                     thread: guest_thread,
1982                     kind: GuestCallKind::DeliverEvent {
1983                         instance: self,
1984                         set: None,
1985                     },
1986                 };
1987                 if state.may_block(guest_thread.task) {
1988                     // Push this thread onto the "low priority" queue so it runs
1989                     // after any other threads have had a chance to run.
1990                     state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
1991                     None
1992                 } else {
1993                     // Yielding in a non-blocking context is defined as a no-op
1994                     // according to the spec, so we must run this thread
1995                     // immediately without allowing any others to run.
1996                     Some(call)
1997                 }
1998             }
1999             callback_code::WAIT => {
2000                 // The task may only return `WAIT` if it was created for a call
2001                 // to an async export).  Otherwise, we'll trap.
2002                 state.check_blocking_for(guest_thread.task)?;
2003 
2004                 let set = get_set(store, set)?;
2005                 let state = store.concurrent_state_mut();
2006 
2007                 if state.get_mut(guest_thread.task)?.event.is_some()
2008                     || !state.get_mut(set)?.ready.is_empty()
2009                 {
2010                     // An event is immediately available; deliver it ASAP.
2011                     state.push_high_priority(WorkItem::GuestCall(
2012                         runtime_instance,
2013                         GuestCall {
2014                             thread: guest_thread,
2015                             kind: GuestCallKind::DeliverEvent {
2016                                 instance: self,
2017                                 set: Some(set),
2018                             },
2019                         },
2020                     ));
2021                 } else {
2022                     // No event is immediately available.
2023                     //
2024                     // We're waiting, so register to be woken up when an event
2025                     // is published for this waitable set.
2026                     //
2027                     // Here we also set `GuestTask::wake_on_cancel` which allows
2028                     // `subtask.cancel` to interrupt the wait.
2029                     let old = state
2030                         .get_mut(guest_thread.thread)?
2031                         .wake_on_cancel
2032                         .replace(set);
2033                     assert!(old.is_none());
2034                     let old = state
2035                         .get_mut(set)?
2036                         .waiting
2037                         .insert(guest_thread, WaitMode::Callback(self));
2038                     assert!(old.is_none());
2039                 }
2040                 None
2041             }
2042             _ => bail!("unsupported callback code: {code}"),
2043         })
2044     }
2045 
2046     fn cleanup_thread(
2047         self,
2048         store: &mut StoreOpaque,
2049         guest_thread: QualifiedThreadId,
2050         runtime_instance: RuntimeComponentInstanceIndex,
2051     ) -> Result<()> {
2052         let guest_id = store
2053             .concurrent_state_mut()
2054             .get_mut(guest_thread.thread)?
2055             .instance_rep;
2056         store
2057             .instance_state(RuntimeInstance {
2058                 instance: self.id().instance(),
2059                 index: runtime_instance,
2060             })
2061             .thread_handle_table()
2062             .guest_thread_remove(guest_id.unwrap())?;
2063 
2064         store.concurrent_state_mut().delete(guest_thread.thread)?;
2065         let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2066         task.threads.remove(&guest_thread.thread);
2067         Ok(())
2068     }
2069 
2070     /// Add the specified guest call to the "high priority" work item queue, to
2071     /// be started as soon as backpressure and/or reentrance rules allow.
2072     ///
2073     /// SAFETY: The raw pointer arguments must be valid references to guest
2074     /// functions (with the appropriate signatures) when the closures queued by
2075     /// this function are called.
2076     unsafe fn queue_call<T: 'static>(
2077         self,
2078         mut store: StoreContextMut<T>,
2079         guest_thread: QualifiedThreadId,
2080         callee: SendSyncPtr<VMFuncRef>,
2081         param_count: usize,
2082         result_count: usize,
2083         async_: bool,
2084         callback: Option<SendSyncPtr<VMFuncRef>>,
2085         post_return: Option<SendSyncPtr<VMFuncRef>>,
2086     ) -> Result<()> {
2087         /// Return a closure which will call the specified function in the scope
2088         /// of the specified task.
2089         ///
2090         /// This will use `GuestTask::lower_params` to lower the parameters, but
2091         /// will not lift the result; instead, it returns a
2092         /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
2093         /// any, may be lifted.  Note that an async-lifted export will have
2094         /// returned its result using the `task.return` intrinsic (or not
2095         /// returned a result at all, in the case of `task.cancel`), in which
2096         /// case the "result" of this call will either be a callback code or
2097         /// nothing.
2098         ///
2099         /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
2100         /// the returned closure is called.
2101         unsafe fn make_call<T: 'static>(
2102             store: StoreContextMut<T>,
2103             guest_thread: QualifiedThreadId,
2104             callee: SendSyncPtr<VMFuncRef>,
2105             param_count: usize,
2106             result_count: usize,
2107         ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2108         + Send
2109         + Sync
2110         + 'static
2111         + use<T> {
2112             let token = StoreToken::new(store);
2113             move |store: &mut dyn VMStore| {
2114                 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2115 
2116                 store
2117                     .concurrent_state_mut()
2118                     .get_mut(guest_thread.thread)?
2119                     .state = GuestThreadState::Running;
2120                 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2121                 let lower = task.lower_params.take().unwrap();
2122 
2123                 lower(store, &mut storage[..param_count])?;
2124 
2125                 let mut store = token.as_context_mut(store);
2126 
2127                 // SAFETY: Per the contract documented in `make_call's`
2128                 // documentation, `callee` must be a valid pointer.
2129                 unsafe {
2130                     crate::Func::call_unchecked_raw(
2131                         &mut store,
2132                         callee.as_non_null(),
2133                         NonNull::new(
2134                             &mut storage[..param_count.max(result_count)]
2135                                 as *mut [MaybeUninit<ValRaw>] as _,
2136                         )
2137                         .unwrap(),
2138                     )?;
2139                 }
2140 
2141                 Ok(storage)
2142             }
2143         }
2144 
2145         // SAFETY: Per the contract described in this function documentation,
2146         // the `callee` pointer which `call` closes over must be valid when
2147         // called by the closure we queue below.
2148         let call = unsafe {
2149             make_call(
2150                 store.as_context_mut(),
2151                 guest_thread,
2152                 callee,
2153                 param_count,
2154                 result_count,
2155             )
2156         };
2157 
2158         let callee_instance = store
2159             .0
2160             .concurrent_state_mut()
2161             .get_mut(guest_thread.task)?
2162             .instance;
2163 
2164         let fun = if callback.is_some() {
2165             assert!(async_);
2166 
2167             Box::new(move |store: &mut dyn VMStore| {
2168                 self.add_guest_thread_to_instance_table(
2169                     guest_thread.thread,
2170                     store,
2171                     callee_instance.index,
2172                 )?;
2173                 let old_thread = store.set_thread(guest_thread);
2174                 log::trace!(
2175                     "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2176                 );
2177 
2178                 store.enter_instance(callee_instance);
2179 
2180                 // SAFETY: See the documentation for `make_call` to review the
2181                 // contract we must uphold for `call` here.
2182                 //
2183                 // Per the contract described in the `queue_call`
2184                 // documentation, the `callee` pointer which `call` closes
2185                 // over must be valid.
2186                 let storage = call(store)?;
2187 
2188                 store.exit_instance(callee_instance)?;
2189 
2190                 store.set_thread(old_thread);
2191                 let state = store.concurrent_state_mut();
2192                 old_thread
2193                     .guest()
2194                     .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2195                 log::trace!("stackless call: restored {old_thread:?} as current thread");
2196 
2197                 // SAFETY: `wasmparser` will have validated that the callback
2198                 // function returns a `i32` result.
2199                 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2200 
2201                 self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2202             })
2203                 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2204         } else {
2205             let token = StoreToken::new(store.as_context_mut());
2206             Box::new(move |store: &mut dyn VMStore| {
2207                 self.add_guest_thread_to_instance_table(
2208                     guest_thread.thread,
2209                     store,
2210                     callee_instance.index,
2211                 )?;
2212                 let old_thread = store.set_thread(guest_thread);
2213                 log::trace!(
2214                     "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2215                 );
2216                 let flags = self.id().get(store).instance_flags(callee_instance.index);
2217 
2218                 // Unless this is a callback-less (i.e. stackful)
2219                 // async-lifted export, we need to record that the instance
2220                 // cannot be entered until the call returns.
2221                 if !async_ {
2222                     store.enter_instance(callee_instance);
2223                 }
2224 
2225                 // SAFETY: See the documentation for `make_call` to review the
2226                 // contract we must uphold for `call` here.
2227                 //
2228                 // Per the contract described in the `queue_call`
2229                 // documentation, the `callee` pointer which `call` closes
2230                 // over must be valid.
2231                 let storage = call(store)?;
2232 
2233                 if async_ {
2234                     let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2235                     if task.threads.len() == 1 && !task.returned_or_cancelled() {
2236                         bail!(Trap::NoAsyncResult);
2237                     }
2238                 } else {
2239                     // This is a sync-lifted export, so now is when we lift the
2240                     // result, optionally call the post-return function, if any,
2241                     // and finally notify any current or future waiters that the
2242                     // subtask has returned.
2243 
2244                     let lift = {
2245                         store.exit_instance(callee_instance)?;
2246 
2247                         let state = store.concurrent_state_mut();
2248                         assert!(state.get_mut(guest_thread.task)?.result.is_none());
2249 
2250                         state
2251                             .get_mut(guest_thread.task)?
2252                             .lift_result
2253                             .take()
2254                             .unwrap()
2255                     };
2256 
2257                     // SAFETY: `result_count` represents the number of core Wasm
2258                     // results returned, per `wasmparser`.
2259                     let result = (lift.lift)(store, unsafe {
2260                         mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2261                             &storage[..result_count],
2262                         )
2263                     })?;
2264 
2265                     let post_return_arg = match result_count {
2266                         0 => ValRaw::i32(0),
2267                         // SAFETY: `result_count` represents the number of
2268                         // core Wasm results returned, per `wasmparser`.
2269                         1 => unsafe { storage[0].assume_init() },
2270                         _ => unreachable!(),
2271                     };
2272 
2273                     unsafe {
2274                         call_post_return(
2275                             token.as_context_mut(store),
2276                             post_return.map(|v| v.as_non_null()),
2277                             post_return_arg,
2278                             flags,
2279                         )?;
2280                     }
2281 
2282                     self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2283                 }
2284 
2285                 // This is a callback-less call, so the implicit thread has now completed
2286                 self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2287 
2288                 store.set_thread(old_thread);
2289 
2290                 let state = store.concurrent_state_mut();
2291                 let task = state.get_mut(guest_thread.task)?;
2292 
2293                 match &task.caller {
2294                     Caller::Host { .. } => {
2295                         if task.ready_to_delete() {
2296                             Waitable::Guest(guest_thread.task).delete_from(state)?;
2297                         }
2298                     }
2299                     Caller::Guest { .. } => {
2300                         task.exited = true;
2301                     }
2302                 }
2303 
2304                 Ok(None)
2305             })
2306         };
2307 
2308         store
2309             .0
2310             .concurrent_state_mut()
2311             .push_high_priority(WorkItem::GuestCall(
2312                 callee_instance.index,
2313                 GuestCall {
2314                     thread: guest_thread,
2315                     kind: GuestCallKind::StartImplicit(fun),
2316                 },
2317             ));
2318 
2319         Ok(())
2320     }
2321 
2322     /// Prepare (but do not start) a guest->guest call.
2323     ///
2324     /// This is called from fused adapter code generated in
2325     /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2326     /// are synthesized Wasm functions which move the parameters from the caller
2327     /// to the callee and the result from the callee to the caller,
2328     /// respectively.  The adapter will call `Self::start_call` immediately
2329     /// after calling this function.
2330     ///
2331     /// SAFETY: All the pointer arguments must be valid pointers to guest
2332     /// entities (and with the expected signatures for the function references
2333     /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2334     unsafe fn prepare_call<T: 'static>(
2335         self,
2336         mut store: StoreContextMut<T>,
2337         start: *mut VMFuncRef,
2338         return_: *mut VMFuncRef,
2339         caller_instance: RuntimeComponentInstanceIndex,
2340         callee_instance: RuntimeComponentInstanceIndex,
2341         task_return_type: TypeTupleIndex,
2342         callee_async: bool,
2343         memory: *mut VMMemoryDefinition,
2344         string_encoding: u8,
2345         caller_info: CallerInfo,
2346     ) -> Result<()> {
2347         if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2348             // A task may only call an async-typed function via a sync lower if
2349             // it was created by a call to an async export.  Otherwise, we'll
2350             // trap.
2351             store.0.check_blocking()?;
2352         }
2353 
2354         enum ResultInfo {
2355             Heap { results: u32 },
2356             Stack { result_count: u32 },
2357         }
2358 
2359         let result_info = match &caller_info {
2360             CallerInfo::Async {
2361                 has_result: true,
2362                 params,
2363             } => ResultInfo::Heap {
2364                 results: params.last().unwrap().get_u32(),
2365             },
2366             CallerInfo::Async {
2367                 has_result: false, ..
2368             } => ResultInfo::Stack { result_count: 0 },
2369             CallerInfo::Sync {
2370                 result_count,
2371                 params,
2372             } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2373                 results: params.last().unwrap().get_u32(),
2374             },
2375             CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2376                 result_count: *result_count,
2377             },
2378         };
2379 
2380         let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2381 
2382         // Create a new guest task for the call, closing over the `start` and
2383         // `return_` functions to lift the parameters and lower the result,
2384         // respectively.
2385         let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2386         let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2387         let token = StoreToken::new(store.as_context_mut());
2388         let state = store.0.concurrent_state_mut();
2389         let old_thread = state.unwrap_current_guest_thread();
2390 
2391         assert_eq!(
2392             state.get_mut(old_thread.task)?.instance,
2393             RuntimeInstance {
2394                 instance: self.id().instance(),
2395                 index: caller_instance,
2396             }
2397         );
2398 
2399         let new_task = GuestTask::new(
2400             state,
2401             Box::new(move |store, dst| {
2402                 let mut store = token.as_context_mut(store);
2403                 assert!(dst.len() <= MAX_FLAT_PARAMS);
2404                 // The `+ 1` here accounts for the return pointer, if any:
2405                 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2406                 let count = match caller_info {
2407                     // Async callers, if they have a result, use the last
2408                     // parameter as a return pointer so chop that off if
2409                     // relevant here.
2410                     CallerInfo::Async { params, has_result } => {
2411                         let params = &params[..params.len() - usize::from(has_result)];
2412                         for (param, src) in params.iter().zip(&mut src) {
2413                             src.write(*param);
2414                         }
2415                         params.len()
2416                     }
2417 
2418                     // Sync callers forward everything directly.
2419                     CallerInfo::Sync { params, .. } => {
2420                         for (param, src) in params.iter().zip(&mut src) {
2421                             src.write(*param);
2422                         }
2423                         params.len()
2424                     }
2425                 };
2426                 // SAFETY: `start` is a valid `*mut VMFuncRef` from
2427                 // `wasmtime-cranelift`-generated fused adapter code.  Based on
2428                 // how it was constructed (see
2429                 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2430                 // for details) we know it takes count parameters and returns
2431                 // `dst.len()` results.
2432                 unsafe {
2433                     crate::Func::call_unchecked_raw(
2434                         &mut store,
2435                         start.as_non_null(),
2436                         NonNull::new(
2437                             &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2438                         )
2439                         .unwrap(),
2440                     )?;
2441                 }
2442                 dst.copy_from_slice(&src[..dst.len()]);
2443                 let state = store.0.concurrent_state_mut();
2444                 Waitable::Guest(state.unwrap_current_guest_thread().task).set_event(
2445                     state,
2446                     Some(Event::Subtask {
2447                         status: Status::Started,
2448                     }),
2449                 )?;
2450                 Ok(())
2451             }),
2452             LiftResult {
2453                 lift: Box::new(move |store, src| {
2454                     // SAFETY: See comment in closure passed as `lower_params`
2455                     // parameter above.
2456                     let mut store = token.as_context_mut(store);
2457                     let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2458                     if let ResultInfo::Heap { results } = &result_info {
2459                         my_src.push(ValRaw::u32(*results));
2460                     }
2461                     // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2462                     // `wasmtime-cranelift`-generated fused adapter code.  Based
2463                     // on how it was constructed (see
2464                     // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2465                     // for details) we know it takes `src.len()` parameters and
2466                     // returns up to 1 result.
2467                     unsafe {
2468                         crate::Func::call_unchecked_raw(
2469                             &mut store,
2470                             return_.as_non_null(),
2471                             my_src.as_mut_slice().into(),
2472                         )?;
2473                     }
2474                     let state = store.0.concurrent_state_mut();
2475                     let thread = state.unwrap_current_guest_thread();
2476                     if sync_caller {
2477                         state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2478                             if let ResultInfo::Stack { result_count } = &result_info {
2479                                 match result_count {
2480                                     0 => None,
2481                                     1 => Some(my_src[0]),
2482                                     _ => unreachable!(),
2483                                 }
2484                             } else {
2485                                 None
2486                             },
2487                         );
2488                     }
2489                     Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2490                 }),
2491                 ty: task_return_type,
2492                 memory: NonNull::new(memory).map(SendSyncPtr::new),
2493                 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2494             },
2495             Caller::Guest { thread: old_thread },
2496             None,
2497             RuntimeInstance {
2498                 instance: self.id().instance(),
2499                 index: callee_instance,
2500             },
2501             callee_async,
2502         )?;
2503 
2504         let guest_task = state.push(new_task)?;
2505         let new_thread = GuestThread::new_implicit(guest_task);
2506         let guest_thread = state.push(new_thread)?;
2507         state.get_mut(guest_task)?.threads.insert(guest_thread);
2508 
2509         store
2510             .0
2511             .concurrent_state_mut()
2512             .get_mut(old_thread.task)?
2513             .subtasks
2514             .insert(guest_task);
2515 
2516         // Make the new thread the current one so that `Self::start_call` knows
2517         // which one to start.
2518         store.0.set_thread(QualifiedThreadId {
2519             task: guest_task,
2520             thread: guest_thread,
2521         });
2522         log::trace!(
2523             "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2524         );
2525 
2526         Ok(())
2527     }
2528 
2529     /// Call the specified callback function for an async-lifted export.
2530     ///
2531     /// SAFETY: `function` must be a valid reference to a guest function of the
2532     /// correct signature for a callback.
2533     unsafe fn call_callback<T>(
2534         self,
2535         mut store: StoreContextMut<T>,
2536         function: SendSyncPtr<VMFuncRef>,
2537         event: Event,
2538         handle: u32,
2539     ) -> Result<u32> {
2540         let (ordinal, result) = event.parts();
2541         let params = &mut [
2542             ValRaw::u32(ordinal),
2543             ValRaw::u32(handle),
2544             ValRaw::u32(result),
2545         ];
2546         // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2547         // `wasmtime-cranelift`-generated fused adapter code or
2548         // `component::Options`.  Per `wasmparser` callback signature
2549         // validation, we know it takes three parameters and returns one.
2550         unsafe {
2551             crate::Func::call_unchecked_raw(
2552                 &mut store,
2553                 function.as_non_null(),
2554                 params.as_mut_slice().into(),
2555             )?;
2556         }
2557         Ok(params[0].get_u32())
2558     }
2559 
2560     /// Start a guest->guest call previously prepared using
2561     /// `Self::prepare_call`.
2562     ///
2563     /// This is called from fused adapter code generated in
2564     /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2565     /// this function immediately after calling `Self::prepare_call`.
2566     ///
2567     /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2568     /// functions with the appropriate signatures for the current guest task.
2569     /// If this is a call to an async-lowered import, the actual call may be
2570     /// deferred and run after this function returns, in which case the pointer
2571     /// arguments must also be valid when the call happens.
2572     unsafe fn start_call<T: 'static>(
2573         self,
2574         mut store: StoreContextMut<T>,
2575         callback: *mut VMFuncRef,
2576         post_return: *mut VMFuncRef,
2577         callee: *mut VMFuncRef,
2578         param_count: u32,
2579         result_count: u32,
2580         flags: u32,
2581         storage: Option<&mut [MaybeUninit<ValRaw>]>,
2582     ) -> Result<u32> {
2583         let token = StoreToken::new(store.as_context_mut());
2584         let async_caller = storage.is_none();
2585         let state = store.0.concurrent_state_mut();
2586         let guest_thread = state.unwrap_current_guest_thread();
2587         let callee_async = state.get_mut(guest_thread.task)?.async_function;
2588         let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2589         let param_count = usize::try_from(param_count).unwrap();
2590         assert!(param_count <= MAX_FLAT_PARAMS);
2591         let result_count = usize::try_from(result_count).unwrap();
2592         assert!(result_count <= MAX_FLAT_RESULTS);
2593 
2594         let task = state.get_mut(guest_thread.task)?;
2595         if !callback.is_null() {
2596             // We're calling an async-lifted export with a callback, so store
2597             // the callback and related context as part of the task so we can
2598             // call it later when needed.
2599             let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2600             task.callback = Some(Box::new(move |store, event, handle| {
2601                 let store = token.as_context_mut(store);
2602                 unsafe { self.call_callback::<T>(store, callback, event, handle) }
2603             }));
2604         }
2605 
2606         let Caller::Guest { thread: caller } = &task.caller else {
2607             // As of this writing, `start_call` is only used for guest->guest
2608             // calls.
2609             unreachable!()
2610         };
2611         let caller = *caller;
2612         let caller_instance = state.get_mut(caller.task)?.instance;
2613 
2614         // Queue the call as a "high priority" work item.
2615         unsafe {
2616             self.queue_call(
2617                 store.as_context_mut(),
2618                 guest_thread,
2619                 callee,
2620                 param_count,
2621                 result_count,
2622                 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2623                 NonNull::new(callback).map(SendSyncPtr::new),
2624                 NonNull::new(post_return).map(SendSyncPtr::new),
2625             )?;
2626         }
2627 
2628         let state = store.0.concurrent_state_mut();
2629 
2630         // Use the caller's `GuestTask::sync_call_set` to register interest in
2631         // the subtask...
2632         let guest_waitable = Waitable::Guest(guest_thread.task);
2633         let old_set = guest_waitable.common(state)?.set;
2634         let set = state.get_mut(caller.task)?.sync_call_set;
2635         guest_waitable.join(state, Some(set))?;
2636 
2637         // ... and suspend this fiber temporarily while we wait for it to start.
2638         //
2639         // Note that we _could_ call the callee directly using the current fiber
2640         // rather than suspend this one, but that would make reasoning about the
2641         // event loop more complicated and is probably only worth doing if
2642         // there's a measurable performance benefit.  In addition, it would mean
2643         // blocking the caller if the callee calls a blocking sync-lowered
2644         // import, and as of this writing the spec says we must not do that.
2645         //
2646         // Alternatively, the fused adapter code could be modified to call the
2647         // callee directly without calling a host-provided intrinsic at all (in
2648         // which case it would need to do its own, inline backpressure checks,
2649         // etc.).  Again, we'd want to see a measurable performance benefit
2650         // before committing to such an optimization.  And again, we'd need to
2651         // update the spec to allow that.
2652         let (status, waitable) = loop {
2653             store.0.suspend(SuspendReason::Waiting {
2654                 set,
2655                 thread: caller,
2656                 // Normally, `StoreOpaque::suspend` would assert it's being
2657                 // called from a context where blocking is allowed.  However, if
2658                 // `async_caller` is `true`, we'll only "block" long enough for
2659                 // the callee to start, i.e. we won't repeat this loop, so we
2660                 // tell `suspend` it's okay even if we're not allowed to block.
2661                 // Alternatively, if the callee is not an async function, then
2662                 // we know it won't block anyway.
2663                 skip_may_block_check: async_caller || !callee_async,
2664             })?;
2665 
2666             let state = store.0.concurrent_state_mut();
2667 
2668             log::trace!("taking event for {:?}", guest_thread.task);
2669             let event = guest_waitable.take_event(state)?;
2670             let Some(Event::Subtask { status }) = event else {
2671                 unreachable!();
2672             };
2673 
2674             log::trace!("status {status:?} for {:?}", guest_thread.task);
2675 
2676             if status == Status::Returned {
2677                 // It returned, so we can stop waiting.
2678                 break (status, None);
2679             } else if async_caller {
2680                 // It hasn't returned yet, but the caller is calling via an
2681                 // async-lowered import, so we generate a handle for the task
2682                 // waitable and return the status.
2683                 let handle = store
2684                     .0
2685                     .instance_state(caller_instance)
2686                     .handle_table()
2687                     .subtask_insert_guest(guest_thread.task.rep())?;
2688                 store
2689                     .0
2690                     .concurrent_state_mut()
2691                     .get_mut(guest_thread.task)?
2692                     .common
2693                     .handle = Some(handle);
2694                 break (status, Some(handle));
2695             } else {
2696                 // The callee hasn't returned yet, and the caller is calling via
2697                 // a sync-lowered import, so we loop and keep waiting until the
2698                 // callee returns.
2699             }
2700         };
2701 
2702         guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2703 
2704         // Reset the current thread to point to the caller as it resumes control.
2705         store.0.set_thread(caller);
2706         store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2707         log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2708 
2709         if let Some(storage) = storage {
2710             // The caller used a sync-lowered import to call an async-lifted
2711             // export, in which case the result, if any, has been stashed in
2712             // `GuestTask::sync_result`.
2713             let state = store.0.concurrent_state_mut();
2714             let task = state.get_mut(guest_thread.task)?;
2715             if let Some(result) = task.sync_result.take() {
2716                 if let Some(result) = result {
2717                     storage[0] = MaybeUninit::new(result);
2718                 }
2719 
2720                 if task.exited && task.ready_to_delete() {
2721                     Waitable::Guest(guest_thread.task).delete_from(state)?;
2722                 }
2723             }
2724         }
2725 
2726         Ok(status.pack(waitable))
2727     }
2728 
2729     /// Poll the specified future once on behalf of a guest->host call using an
2730     /// async-lowered import.
2731     ///
2732     /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2733     /// `Pending`, add it to the set of futures to be polled as part of this
2734     /// instance's event loop until it completes, and then return
2735     /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2736     ///
2737     /// Whether the future returns `Ready` immediately or later, the `lower`
2738     /// function will be used to lower the result, if any, into the guest caller's
2739     /// stack and linear memory unless the task has been cancelled.
2740     pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2741         self,
2742         mut store: StoreContextMut<'_, T>,
2743         future: impl Future<Output = Result<R>> + Send + 'static,
2744         lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2745     ) -> Result<Option<u32>> {
2746         let token = StoreToken::new(store.as_context_mut());
2747         let state = store.0.concurrent_state_mut();
2748         let task = state.unwrap_current_host_thread();
2749 
2750         // Create an abortable future which hooks calls to poll and manages call
2751         // context state for the future.
2752         let (join_handle, future) = JoinHandle::run(future);
2753         {
2754             let task = state.get_mut(task)?;
2755             assert!(task.join_handle.is_none());
2756             task.join_handle = Some(join_handle);
2757         }
2758 
2759         let mut future = Box::pin(future);
2760 
2761         // Finally, poll the future.  We can use a dummy `Waker` here because
2762         // we'll add the future to `ConcurrentState::futures` and poll it
2763         // automatically from the event loop if it doesn't complete immediately
2764         // here.
2765         let poll = tls::set(store.0, || {
2766             future
2767                 .as_mut()
2768                 .poll(&mut Context::from_waker(&Waker::noop()))
2769         });
2770 
2771         match poll {
2772             // It finished immediately; lower the result and delete the task.
2773             Poll::Ready(Some(result)) => {
2774                 lower(store.as_context_mut(), result?)?;
2775                 return Ok(None);
2776             }
2777 
2778             // Shouldn't be possible since the future isn't cancelled via the
2779             // `join_handle`.
2780             Poll::Ready(None) => unreachable!(),
2781 
2782             // Future isn't ready yet, so fall through.
2783             Poll::Pending => {}
2784         }
2785 
2786         // It hasn't finished yet; add the future to
2787         // `ConcurrentState::futures` so it will be polled by the event
2788         // loop and allocate a waitable handle to return to the guest.
2789 
2790         // Wrap the future in a closure responsible for lowering the result into
2791         // the guest's stack and memory, as well as notifying any waiters that
2792         // the task returned.
2793         let future = Box::pin(async move {
2794             let result = match future.await {
2795                 Some(result) => result?,
2796                 // Task was cancelled; nothing left to do.
2797                 None => return Ok(()),
2798             };
2799             let on_complete = move |store: &mut dyn VMStore| {
2800                 // Restore the `current_thread` to be the host so `lower` knows
2801                 // how to manipulate borrows and knows which scope of borrows
2802                 // to check.
2803                 let mut store = token.as_context_mut(store);
2804                 let state = store.0.concurrent_state_mut();
2805                 assert!(state.current_thread.is_none());
2806                 store.0.set_thread(task);
2807 
2808                 lower(store.as_context_mut(), result)?;
2809                 let state = store.0.concurrent_state_mut();
2810                 state.get_mut(task)?.join_handle.take();
2811                 Waitable::Host(task).set_event(
2812                     state,
2813                     Some(Event::Subtask {
2814                         status: Status::Returned,
2815                     }),
2816                 )?;
2817 
2818                 // Go back to "no current thread" at the end.
2819                 store.0.set_thread(CurrentThread::None);
2820                 Ok(())
2821             };
2822 
2823             // Here we schedule a task to run on a worker fiber to do the
2824             // lowering since it may involve a call to the guest's realloc
2825             // function. This is necessary because calling the guest while
2826             // there are host embedder frames on the stack is unsound.
2827             tls::get(move |store| {
2828                 store
2829                     .concurrent_state_mut()
2830                     .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2831                         on_complete,
2832                     ))));
2833                 Ok(())
2834             })
2835         });
2836 
2837         // Make this task visible to the guest and then record what it
2838         // was made visible as.
2839         let state = store.0.concurrent_state_mut();
2840         state.push_future(future);
2841         let caller = state.get_mut(task)?.caller;
2842         let instance = state.get_mut(caller.task)?.instance;
2843         let handle = store
2844             .0
2845             .instance_state(instance)
2846             .handle_table()
2847             .subtask_insert_host(task.rep())?;
2848         store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2849         log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
2850 
2851         // Restore the currently running thread to this host task's
2852         // caller. Note that the host task isn't deallocated as it's
2853         // within the store and will get deallocated later.
2854         store.0.set_thread(caller);
2855         Ok(Some(handle))
2856     }
2857 
2858     /// Implements the `task.return` intrinsic, lifting the result for the
2859     /// current guest task.
2860     pub(crate) fn task_return(
2861         self,
2862         store: &mut dyn VMStore,
2863         ty: TypeTupleIndex,
2864         options: OptionsIndex,
2865         storage: &[ValRaw],
2866     ) -> Result<()> {
2867         let state = store.concurrent_state_mut();
2868         let guest_thread = state.unwrap_current_guest_thread();
2869         let lift = state
2870             .get_mut(guest_thread.task)?
2871             .lift_result
2872             .take()
2873             .ok_or_else(|| {
2874                 format_err!("`task.return` or `task.cancel` called more than once for current task")
2875             })?;
2876         assert!(state.get_mut(guest_thread.task)?.result.is_none());
2877 
2878         let CanonicalOptions {
2879             string_encoding,
2880             data_model,
2881             ..
2882         } = &self.id().get(store).component().env_component().options[options];
2883 
2884         let invalid = ty != lift.ty
2885             || string_encoding != &lift.string_encoding
2886             || match data_model {
2887                 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2888                     Some(memory) => {
2889                         let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2890                         let actual = self.id().get(store).runtime_memory(memory);
2891                         expected != actual.as_ptr()
2892                     }
2893                     // Memory not specified, meaning it didn't need to be
2894                     // specified per validation, so not invalid.
2895                     None => false,
2896                 },
2897                 // Always invalid as this isn't supported.
2898                 CanonicalOptionsDataModel::Gc { .. } => true,
2899             };
2900 
2901         if invalid {
2902             bail!("invalid `task.return` signature and/or options for current task");
2903         }
2904 
2905         log::trace!("task.return for {guest_thread:?}");
2906 
2907         let result = (lift.lift)(store, storage)?;
2908         self.task_complete(store, guest_thread.task, result, Status::Returned)
2909     }
2910 
2911     /// Implements the `task.cancel` intrinsic.
2912     pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
2913         let state = store.concurrent_state_mut();
2914         let guest_thread = state.unwrap_current_guest_thread();
2915         let task = state.get_mut(guest_thread.task)?;
2916         if !task.cancel_sent {
2917             bail!("`task.cancel` called by task which has not been cancelled")
2918         }
2919         _ = task.lift_result.take().ok_or_else(|| {
2920             format_err!("`task.return` or `task.cancel` called more than once for current task")
2921         })?;
2922 
2923         assert!(task.result.is_none());
2924 
2925         log::trace!("task.cancel for {guest_thread:?}");
2926 
2927         self.task_complete(
2928             store,
2929             guest_thread.task,
2930             Box::new(DummyResult),
2931             Status::ReturnCancelled,
2932         )
2933     }
2934 
2935     /// Complete the specified guest task (i.e. indicate that it has either
2936     /// returned a (possibly empty) result or cancelled itself).
2937     ///
2938     /// This will return any resource borrows and notify any current or future
2939     /// waiters that the task has completed.
2940     fn task_complete(
2941         self,
2942         store: &mut StoreOpaque,
2943         guest_task: TableId<GuestTask>,
2944         result: Box<dyn Any + Send + Sync>,
2945         status: Status,
2946     ) -> Result<()> {
2947         store
2948             .component_resource_tables(Some(self))
2949             .validate_scope_exit()?;
2950 
2951         let state = store.concurrent_state_mut();
2952         let task = state.get_mut(guest_task)?;
2953 
2954         if let Caller::Host { tx, .. } = &mut task.caller {
2955             if let Some(tx) = tx.take() {
2956                 _ = tx.send(result);
2957             }
2958         } else {
2959             task.result = Some(result);
2960             Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2961         }
2962 
2963         Ok(())
2964     }
2965 
2966     /// Implements the `waitable-set.new` intrinsic.
2967     pub(crate) fn waitable_set_new(
2968         self,
2969         store: &mut StoreOpaque,
2970         caller_instance: RuntimeComponentInstanceIndex,
2971     ) -> Result<u32> {
2972         let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2973         let handle = store
2974             .instance_state(RuntimeInstance {
2975                 instance: self.id().instance(),
2976                 index: caller_instance,
2977             })
2978             .handle_table()
2979             .waitable_set_insert(set.rep())?;
2980         log::trace!("new waitable set {set:?} (handle {handle})");
2981         Ok(handle)
2982     }
2983 
2984     /// Implements the `waitable-set.drop` intrinsic.
2985     pub(crate) fn waitable_set_drop(
2986         self,
2987         store: &mut StoreOpaque,
2988         caller_instance: RuntimeComponentInstanceIndex,
2989         set: u32,
2990     ) -> Result<()> {
2991         let rep = store
2992             .instance_state(RuntimeInstance {
2993                 instance: self.id().instance(),
2994                 index: caller_instance,
2995             })
2996             .handle_table()
2997             .waitable_set_remove(set)?;
2998 
2999         log::trace!("drop waitable set {rep} (handle {set})");
3000 
3001         let set = store
3002             .concurrent_state_mut()
3003             .delete(TableId::<WaitableSet>::new(rep))?;
3004 
3005         if !set.waiting.is_empty() {
3006             bail!("cannot drop waitable set with waiters");
3007         }
3008 
3009         Ok(())
3010     }
3011 
3012     /// Implements the `waitable.join` intrinsic.
3013     pub(crate) fn waitable_join(
3014         self,
3015         store: &mut StoreOpaque,
3016         caller_instance: RuntimeComponentInstanceIndex,
3017         waitable_handle: u32,
3018         set_handle: u32,
3019     ) -> Result<()> {
3020         let mut instance = self.id().get_mut(store);
3021         let waitable =
3022             Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3023 
3024         let set = if set_handle == 0 {
3025             None
3026         } else {
3027             let set = instance.instance_states().0[caller_instance]
3028                 .handle_table()
3029                 .waitable_set_rep(set_handle)?;
3030 
3031             Some(TableId::<WaitableSet>::new(set))
3032         };
3033 
3034         log::trace!(
3035             "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3036         );
3037 
3038         waitable.join(store.concurrent_state_mut(), set)
3039     }
3040 
3041     /// Implements the `subtask.drop` intrinsic.
3042     pub(crate) fn subtask_drop(
3043         self,
3044         store: &mut StoreOpaque,
3045         caller_instance: RuntimeComponentInstanceIndex,
3046         task_id: u32,
3047     ) -> Result<()> {
3048         self.waitable_join(store, caller_instance, task_id, 0)?;
3049 
3050         let (rep, is_host) = store
3051             .instance_state(RuntimeInstance {
3052                 instance: self.id().instance(),
3053                 index: caller_instance,
3054             })
3055             .handle_table()
3056             .subtask_remove(task_id)?;
3057 
3058         let concurrent_state = store.concurrent_state_mut();
3059         let (waitable, expected_caller, delete) = if is_host {
3060             let id = TableId::<HostTask>::new(rep);
3061             let task = concurrent_state.get_mut(id)?;
3062             if task.join_handle.is_some() {
3063                 bail!("cannot drop a subtask which has not yet resolved");
3064             }
3065             (Waitable::Host(id), task.caller, true)
3066         } else {
3067             let id = TableId::<GuestTask>::new(rep);
3068             let task = concurrent_state.get_mut(id)?;
3069             if task.lift_result.is_some() {
3070                 bail!("cannot drop a subtask which has not yet resolved");
3071             }
3072             if let Caller::Guest { thread } = task.caller {
3073                 (
3074                     Waitable::Guest(id),
3075                     thread,
3076                     concurrent_state.get_mut(id)?.exited,
3077                 )
3078             } else {
3079                 unreachable!()
3080             }
3081         };
3082 
3083         waitable.common(concurrent_state)?.handle = None;
3084 
3085         if waitable.take_event(concurrent_state)?.is_some() {
3086             bail!("cannot drop a subtask with an undelivered event");
3087         }
3088 
3089         if delete {
3090             waitable.delete_from(concurrent_state)?;
3091         }
3092 
3093         // Since waitables can neither be passed between instances nor forged,
3094         // this should never fail unless there's a bug in Wasmtime, but we check
3095         // here to be sure:
3096         assert_eq!(
3097             expected_caller,
3098             concurrent_state.unwrap_current_guest_thread(),
3099         );
3100         log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3101         Ok(())
3102     }
3103 
3104     /// Implements the `waitable-set.wait` intrinsic.
3105     pub(crate) fn waitable_set_wait(
3106         self,
3107         store: &mut StoreOpaque,
3108         options: OptionsIndex,
3109         set: u32,
3110         payload: u32,
3111     ) -> Result<u32> {
3112         if !self.options(store, options).async_ {
3113             // The caller may only call `waitable-set.wait` from an async task
3114             // (i.e. a task created via a call to an async export).
3115             // Otherwise, we'll trap.
3116             store.check_blocking()?;
3117         }
3118 
3119         let &CanonicalOptions {
3120             cancellable,
3121             instance: caller_instance,
3122             ..
3123         } = &self.id().get(store).component().env_component().options[options];
3124         let rep = store
3125             .instance_state(RuntimeInstance {
3126                 instance: self.id().instance(),
3127                 index: caller_instance,
3128             })
3129             .handle_table()
3130             .waitable_set_rep(set)?;
3131 
3132         self.waitable_check(
3133             store,
3134             cancellable,
3135             WaitableCheck::Wait,
3136             WaitableCheckParams {
3137                 set: TableId::new(rep),
3138                 options,
3139                 payload,
3140             },
3141         )
3142     }
3143 
3144     /// Implements the `waitable-set.poll` intrinsic.
3145     pub(crate) fn waitable_set_poll(
3146         self,
3147         store: &mut StoreOpaque,
3148         options: OptionsIndex,
3149         set: u32,
3150         payload: u32,
3151     ) -> Result<u32> {
3152         let &CanonicalOptions {
3153             cancellable,
3154             instance: caller_instance,
3155             ..
3156         } = &self.id().get(store).component().env_component().options[options];
3157         let rep = store
3158             .instance_state(RuntimeInstance {
3159                 instance: self.id().instance(),
3160                 index: caller_instance,
3161             })
3162             .handle_table()
3163             .waitable_set_rep(set)?;
3164 
3165         self.waitable_check(
3166             store,
3167             cancellable,
3168             WaitableCheck::Poll,
3169             WaitableCheckParams {
3170                 set: TableId::new(rep),
3171                 options,
3172                 payload,
3173             },
3174         )
3175     }
3176 
3177     /// Implements the `thread.index` intrinsic.
3178     pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3179         let thread_id = store
3180             .concurrent_state_mut()
3181             .unwrap_current_guest_thread()
3182             .thread;
3183         // The unwrap is safe because `instance_rep` must be `Some` by this point
3184         Ok(store
3185             .concurrent_state_mut()
3186             .get_mut(thread_id)?
3187             .instance_rep
3188             .unwrap())
3189     }
3190 
3191     /// Implements the `thread.new-indirect` intrinsic.
3192     pub(crate) fn thread_new_indirect<T: 'static>(
3193         self,
3194         mut store: StoreContextMut<T>,
3195         runtime_instance: RuntimeComponentInstanceIndex,
3196         _func_ty_idx: TypeFuncIndex, // currently unused
3197         start_func_table_idx: RuntimeTableIndex,
3198         start_func_idx: u32,
3199         context: i32,
3200     ) -> Result<u32> {
3201         log::trace!("creating new thread");
3202 
3203         let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3204         let (instance, registry) = self.id().get_mut_and_registry(store.0);
3205         let callee = instance
3206             .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3207             .ok_or_else(|| {
3208                 format_err!("the start function index points to an uninitialized function")
3209             })?;
3210         if callee.type_index(store.0) != start_func_ty.type_index() {
3211             bail!(
3212                 "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3213             );
3214         }
3215 
3216         let token = StoreToken::new(store.as_context_mut());
3217         let start_func = Box::new(
3218             move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3219                 let old_thread = store.set_thread(guest_thread);
3220                 log::trace!(
3221                     "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3222                 );
3223 
3224                 let mut store = token.as_context_mut(store);
3225                 let mut params = [ValRaw::i32(context)];
3226                 // Use call_unchecked rather than call or call_async, as we don't want to run the function
3227                 // on a separate fiber if we're running in an async store.
3228                 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3229 
3230                 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3231                 log::trace!("explicit thread {guest_thread:?} completed");
3232                 let state = store.0.concurrent_state_mut();
3233                 let task = state.get_mut(guest_thread.task)?;
3234                 if task.threads.is_empty() && !task.returned_or_cancelled() {
3235                     bail!(Trap::NoAsyncResult);
3236                 }
3237                 store.0.set_thread(old_thread);
3238                 let state = store.0.concurrent_state_mut();
3239                 old_thread
3240                     .guest()
3241                     .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3242                 if state.get_mut(guest_thread.task)?.ready_to_delete() {
3243                     Waitable::Guest(guest_thread.task).delete_from(state)?;
3244                 }
3245                 log::trace!("thread start: restored {old_thread:?} as current thread");
3246 
3247                 Ok(())
3248             },
3249         );
3250 
3251         let state = store.0.concurrent_state_mut();
3252         let current_thread = state.unwrap_current_guest_thread();
3253         let parent_task = current_thread.task;
3254 
3255         let new_thread = GuestThread::new_explicit(parent_task, start_func);
3256         let thread_id = state.push(new_thread)?;
3257         state.get_mut(parent_task)?.threads.insert(thread_id);
3258 
3259         log::trace!("new thread with id {thread_id:?} created");
3260 
3261         self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3262     }
3263 
3264     pub(crate) fn resume_thread(
3265         self,
3266         store: &mut StoreOpaque,
3267         runtime_instance: RuntimeComponentInstanceIndex,
3268         thread_idx: u32,
3269         high_priority: bool,
3270         allow_ready: bool,
3271     ) -> Result<()> {
3272         let thread_id =
3273             GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3274         let state = store.concurrent_state_mut();
3275         let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3276         let thread = state.get_mut(guest_thread.thread)?;
3277 
3278         match mem::replace(&mut thread.state, GuestThreadState::Running) {
3279             GuestThreadState::NotStartedExplicit(start_func) => {
3280                 log::trace!("starting thread {guest_thread:?}");
3281                 let guest_call = WorkItem::GuestCall(
3282                     runtime_instance,
3283                     GuestCall {
3284                         thread: guest_thread,
3285                         kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3286                             start_func(store, guest_thread)
3287                         })),
3288                     },
3289                 );
3290                 store
3291                     .concurrent_state_mut()
3292                     .push_work_item(guest_call, high_priority);
3293             }
3294             GuestThreadState::Suspended(fiber) => {
3295                 log::trace!("resuming thread {thread_id:?} that was suspended");
3296                 store
3297                     .concurrent_state_mut()
3298                     .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3299             }
3300             GuestThreadState::Ready(fiber) if allow_ready => {
3301                 log::trace!("resuming thread {thread_id:?} that was ready");
3302                 thread.state = GuestThreadState::Ready(fiber);
3303                 store
3304                     .concurrent_state_mut()
3305                     .promote_thread_work_item(guest_thread);
3306             }
3307             other => {
3308                 thread.state = other;
3309                 bail!("cannot resume thread which is not suspended");
3310             }
3311         }
3312         Ok(())
3313     }
3314 
3315     fn add_guest_thread_to_instance_table(
3316         self,
3317         thread_id: TableId<GuestThread>,
3318         store: &mut StoreOpaque,
3319         runtime_instance: RuntimeComponentInstanceIndex,
3320     ) -> Result<u32> {
3321         let guest_id = store
3322             .instance_state(RuntimeInstance {
3323                 instance: self.id().instance(),
3324                 index: runtime_instance,
3325             })
3326             .thread_handle_table()
3327             .guest_thread_insert(thread_id.rep())?;
3328         store
3329             .concurrent_state_mut()
3330             .get_mut(thread_id)?
3331             .instance_rep = Some(guest_id);
3332         Ok(guest_id)
3333     }
3334 
3335     /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`,
3336     /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics.
3337     pub(crate) fn suspension_intrinsic(
3338         self,
3339         store: &mut StoreOpaque,
3340         caller: RuntimeComponentInstanceIndex,
3341         cancellable: bool,
3342         yielding: bool,
3343         to_thread: SuspensionTarget,
3344     ) -> Result<WaitResult> {
3345         let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();
3346         if to_thread.is_none() {
3347             let state = store.concurrent_state_mut();
3348             if yielding {
3349                 // This is a `thread.yield` call
3350                 if !state.may_block(guest_thread.task) {
3351                     // In a non-blocking context, a `thread.yield` may trigger
3352                     // other threads in the same component instance to run.
3353                     if !state.promote_instance_local_thread_work_item(caller) {
3354                         // No other threads are runnable, so just return
3355                         return Ok(WaitResult::Completed);
3356                     }
3357                 }
3358             } else {
3359                 // The caller may only call `thread.suspend` from an async task
3360                 // (i.e. a task created via a call to an async export).
3361                 // Otherwise, we'll trap.
3362                 store.check_blocking()?;
3363             }
3364         }
3365 
3366         // There could be a pending cancellation from a previous uncancellable wait
3367         if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3368             return Ok(WaitResult::Cancelled);
3369         }
3370 
3371         match to_thread {
3372             SuspensionTarget::SomeSuspended(thread) => {
3373                 self.resume_thread(store, caller, thread, true, false)?
3374             }
3375             SuspensionTarget::Some(thread) => {
3376                 self.resume_thread(store, caller, thread, true, true)?
3377             }
3378             SuspensionTarget::None => { /* nothing to do */ }
3379         }
3380 
3381         let reason = if yielding {
3382             SuspendReason::Yielding {
3383                 thread: guest_thread,
3384                 // Tell `StoreOpaque::suspend` it's okay to suspend here since
3385                 // we're handling a `thread.yield-to-suspended` call; otherwise it would
3386                 // panic if we called it in a non-blocking context.
3387                 skip_may_block_check: to_thread.is_some(),
3388             }
3389         } else {
3390             SuspendReason::ExplicitlySuspending {
3391                 thread: guest_thread,
3392                 // Tell `StoreOpaque::suspend` it's okay to suspend here since
3393                 // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would
3394                 // panic if we called it in a non-blocking context.
3395                 skip_may_block_check: to_thread.is_some(),
3396             }
3397         };
3398 
3399         store.suspend(reason)?;
3400 
3401         if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3402             Ok(WaitResult::Cancelled)
3403         } else {
3404             Ok(WaitResult::Completed)
3405         }
3406     }
3407 
3408     /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3409     fn waitable_check(
3410         self,
3411         store: &mut StoreOpaque,
3412         cancellable: bool,
3413         check: WaitableCheck,
3414         params: WaitableCheckParams,
3415     ) -> Result<u32> {
3416         let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();
3417 
3418         log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3419 
3420         let state = store.concurrent_state_mut();
3421         let task = state.get_mut(guest_thread.task)?;
3422 
3423         // If we're waiting, and there are no events immediately available,
3424         // suspend the fiber until that changes.
3425         match &check {
3426             WaitableCheck::Wait => {
3427                 let set = params.set;
3428 
3429                 if (task.event.is_none()
3430                     || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3431                     && state.get_mut(set)?.ready.is_empty()
3432                 {
3433                     if cancellable {
3434                         let old = state
3435                             .get_mut(guest_thread.thread)?
3436                             .wake_on_cancel
3437                             .replace(set);
3438                         assert!(old.is_none());
3439                     }
3440 
3441                     store.suspend(SuspendReason::Waiting {
3442                         set,
3443                         thread: guest_thread,
3444                         skip_may_block_check: false,
3445                     })?;
3446                 }
3447             }
3448             WaitableCheck::Poll => {}
3449         }
3450 
3451         log::trace!(
3452             "waitable check for {guest_thread:?}; set {:?}, part two",
3453             params.set
3454         );
3455 
3456         // Deliver any pending events to the guest and return.
3457         let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3458 
3459         let (ordinal, handle, result) = match &check {
3460             WaitableCheck::Wait => {
3461                 let (event, waitable) = event.unwrap();
3462                 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3463                 let (ordinal, result) = event.parts();
3464                 (ordinal, handle, result)
3465             }
3466             WaitableCheck::Poll => {
3467                 if let Some((event, waitable)) = event {
3468                     let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3469                     let (ordinal, result) = event.parts();
3470                     (ordinal, handle, result)
3471                 } else {
3472                     log::trace!(
3473                         "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3474                         guest_thread.task,
3475                         params.set
3476                     );
3477                     let (ordinal, result) = Event::None.parts();
3478                     (ordinal, 0, result)
3479                 }
3480             }
3481         };
3482         let memory = self.options_memory_mut(store, params.options);
3483         let ptr = func::validate_inbounds_dynamic(
3484             &CanonicalAbiInfo::POINTER_PAIR,
3485             memory,
3486             &ValRaw::u32(params.payload),
3487         )?;
3488         memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3489         memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3490         Ok(ordinal)
3491     }
3492 
3493     /// Implements the `subtask.cancel` intrinsic.
3494     pub(crate) fn subtask_cancel(
3495         self,
3496         store: &mut StoreOpaque,
3497         caller_instance: RuntimeComponentInstanceIndex,
3498         async_: bool,
3499         task_id: u32,
3500     ) -> Result<u32> {
3501         if !async_ {
3502             // The caller may only sync call `subtask.cancel` from an async task
3503             // (i.e. a task created via a call to an async export).  Otherwise,
3504             // we'll trap.
3505             store.check_blocking()?;
3506         }
3507 
3508         let (rep, is_host) = store
3509             .instance_state(RuntimeInstance {
3510                 instance: self.id().instance(),
3511                 index: caller_instance,
3512             })
3513             .handle_table()
3514             .subtask_rep(task_id)?;
3515         let (waitable, expected_caller) = if is_host {
3516             let id = TableId::<HostTask>::new(rep);
3517             (
3518                 Waitable::Host(id),
3519                 store.concurrent_state_mut().get_mut(id)?.caller,
3520             )
3521         } else {
3522             let id = TableId::<GuestTask>::new(rep);
3523             if let Caller::Guest { thread } = store.concurrent_state_mut().get_mut(id)?.caller {
3524                 (Waitable::Guest(id), thread)
3525             } else {
3526                 unreachable!()
3527             }
3528         };
3529         // Since waitables can neither be passed between instances nor forged,
3530         // this should never fail unless there's a bug in Wasmtime, but we check
3531         // here to be sure:
3532         let concurrent_state = store.concurrent_state_mut();
3533         assert_eq!(
3534             expected_caller,
3535             concurrent_state.unwrap_current_guest_thread(),
3536         );
3537 
3538         log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3539 
3540         if let Waitable::Host(host_task) = waitable {
3541             if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
3542                 handle.abort();
3543                 return Ok(Status::ReturnCancelled as u32);
3544             }
3545         } else {
3546             let caller = concurrent_state.unwrap_current_guest_thread();
3547             let guest_task = TableId::<GuestTask>::new(rep);
3548             let task = concurrent_state.get_mut(guest_task)?;
3549             if !task.already_lowered_parameters() {
3550                 // The task is in a `starting` state, meaning it hasn't run at
3551                 // all yet.  Here we update its fields to indicate that it is
3552                 // ready to delete immediately once `subtask.drop` is called.
3553                 task.lower_params = None;
3554                 task.lift_result = None;
3555                 task.exited = true;
3556 
3557                 let instance = task.instance;
3558 
3559                 assert_eq!(1, task.threads.len());
3560                 let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
3561                 let concurrent_state = store.concurrent_state_mut();
3562                 concurrent_state.delete(thread)?;
3563                 assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());
3564 
3565                 // Not yet started; cancel and remove from pending
3566                 let pending = &mut store.instance_state(instance).concurrent_state().pending;
3567                 let pending_count = pending.len();
3568                 pending.retain(|thread, _| thread.task != guest_task);
3569                 // If there were no pending threads for this task, we're in an error state
3570                 if pending.len() == pending_count {
3571                     bail!("`subtask.cancel` called after terminal status delivered");
3572                 }
3573                 return Ok(Status::StartCancelled as u32);
3574             } else if !task.returned_or_cancelled() {
3575                 // Started, but not yet returned or cancelled; send the
3576                 // `CANCELLED` event
3577                 task.cancel_sent = true;
3578                 // Note that this might overwrite an event that was set earlier
3579                 // (e.g. `Event::None` if the task is yielding, or
3580                 // `Event::Cancelled` if it was already cancelled), but that's
3581                 // okay -- this should supersede the previous state.
3582                 task.event = Some(Event::Cancelled);
3583                 let runtime_instance = task.instance.index;
3584                 for thread in task.threads.clone() {
3585                     let thread = QualifiedThreadId {
3586                         task: guest_task,
3587                         thread,
3588                     };
3589                     if let Some(set) = concurrent_state
3590                         .get_mut(thread.thread)
3591                         .unwrap()
3592                         .wake_on_cancel
3593                         .take()
3594                     {
3595                         let item = match concurrent_state
3596                             .get_mut(set)?
3597                             .waiting
3598                             .remove(&thread)
3599                             .unwrap()
3600                         {
3601                             WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3602                             WaitMode::Callback(instance) => WorkItem::GuestCall(
3603                                 runtime_instance,
3604                                 GuestCall {
3605                                     thread,
3606                                     kind: GuestCallKind::DeliverEvent {
3607                                         instance,
3608                                         set: None,
3609                                     },
3610                                 },
3611                             ),
3612                         };
3613                         concurrent_state.push_high_priority(item);
3614 
3615                         store.suspend(SuspendReason::Yielding {
3616                             thread: caller,
3617                             // `subtask.cancel` is not allowed to be called in a
3618                             // sync context, so we cannot skip the may-block check.
3619                             skip_may_block_check: false,
3620                         })?;
3621                         break;
3622                     }
3623                 }
3624 
3625                 let concurrent_state = store.concurrent_state_mut();
3626                 let task = concurrent_state.get_mut(guest_task)?;
3627                 if !task.returned_or_cancelled() {
3628                     if async_ {
3629                         return Ok(BLOCKED);
3630                     } else {
3631                         store.wait_for_event(Waitable::Guest(guest_task))?;
3632                     }
3633                 }
3634             }
3635         }
3636 
3637         let event = waitable.take_event(store.concurrent_state_mut())?;
3638         if let Some(Event::Subtask {
3639             status: status @ (Status::Returned | Status::ReturnCancelled),
3640         }) = event
3641         {
3642             Ok(status as u32)
3643         } else {
3644             bail!("`subtask.cancel` called after terminal status delivered");
3645         }
3646     }
3647 
3648     pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> {
3649         store.concurrent_state_mut().context_get(slot)
3650     }
3651 
3652     pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
3653         store.concurrent_state_mut().context_set(slot, value)
3654     }
3655 }
3656 
3657 /// Trait representing component model ABI async intrinsics and fused adapter
3658 /// helper functions.
3659 ///
3660 /// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3661 /// which must be valid for at least the duration of the call (and possibly for
3662 /// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3663 /// pointers used for async calls).
3664 pub trait VMComponentAsyncStore {
3665     /// A helper function for fused adapter modules involving calls where the
3666     /// one of the caller or callee is async.
3667     ///
3668     /// This helper is not used when the caller and callee both use the sync
3669     /// ABI, only when at least one is async is this used.
3670     unsafe fn prepare_call(
3671         &mut self,
3672         instance: Instance,
3673         memory: *mut VMMemoryDefinition,
3674         start: *mut VMFuncRef,
3675         return_: *mut VMFuncRef,
3676         caller_instance: RuntimeComponentInstanceIndex,
3677         callee_instance: RuntimeComponentInstanceIndex,
3678         task_return_type: TypeTupleIndex,
3679         callee_async: bool,
3680         string_encoding: u8,
3681         result_count: u32,
3682         storage: *mut ValRaw,
3683         storage_len: usize,
3684     ) -> Result<()>;
3685 
3686     /// A helper function for fused adapter modules involving calls where the
3687     /// caller is sync-lowered but the callee is async-lifted.
3688     unsafe fn sync_start(
3689         &mut self,
3690         instance: Instance,
3691         callback: *mut VMFuncRef,
3692         callee: *mut VMFuncRef,
3693         param_count: u32,
3694         storage: *mut MaybeUninit<ValRaw>,
3695         storage_len: usize,
3696     ) -> Result<()>;
3697 
3698     /// A helper function for fused adapter modules involving calls where the
3699     /// caller is async-lowered.
3700     unsafe fn async_start(
3701         &mut self,
3702         instance: Instance,
3703         callback: *mut VMFuncRef,
3704         post_return: *mut VMFuncRef,
3705         callee: *mut VMFuncRef,
3706         param_count: u32,
3707         result_count: u32,
3708         flags: u32,
3709     ) -> Result<u32>;
3710 
3711     /// The `future.write` intrinsic.
3712     fn future_write(
3713         &mut self,
3714         instance: Instance,
3715         caller: RuntimeComponentInstanceIndex,
3716         ty: TypeFutureTableIndex,
3717         options: OptionsIndex,
3718         future: u32,
3719         address: u32,
3720     ) -> Result<u32>;
3721 
3722     /// The `future.read` intrinsic.
3723     fn future_read(
3724         &mut self,
3725         instance: Instance,
3726         caller: RuntimeComponentInstanceIndex,
3727         ty: TypeFutureTableIndex,
3728         options: OptionsIndex,
3729         future: u32,
3730         address: u32,
3731     ) -> Result<u32>;
3732 
3733     /// The `future.drop-writable` intrinsic.
3734     fn future_drop_writable(
3735         &mut self,
3736         instance: Instance,
3737         ty: TypeFutureTableIndex,
3738         writer: u32,
3739     ) -> Result<()>;
3740 
3741     /// The `stream.write` intrinsic.
3742     fn stream_write(
3743         &mut self,
3744         instance: Instance,
3745         caller: RuntimeComponentInstanceIndex,
3746         ty: TypeStreamTableIndex,
3747         options: OptionsIndex,
3748         stream: u32,
3749         address: u32,
3750         count: u32,
3751     ) -> Result<u32>;
3752 
3753     /// The `stream.read` intrinsic.
3754     fn stream_read(
3755         &mut self,
3756         instance: Instance,
3757         caller: RuntimeComponentInstanceIndex,
3758         ty: TypeStreamTableIndex,
3759         options: OptionsIndex,
3760         stream: u32,
3761         address: u32,
3762         count: u32,
3763     ) -> Result<u32>;
3764 
3765     /// The "fast-path" implementation of the `stream.write` intrinsic for
3766     /// "flat" (i.e. memcpy-able) payloads.
3767     fn flat_stream_write(
3768         &mut self,
3769         instance: Instance,
3770         caller: RuntimeComponentInstanceIndex,
3771         ty: TypeStreamTableIndex,
3772         options: OptionsIndex,
3773         payload_size: u32,
3774         payload_align: u32,
3775         stream: u32,
3776         address: u32,
3777         count: u32,
3778     ) -> Result<u32>;
3779 
3780     /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3781     /// (i.e. memcpy-able) payloads.
3782     fn flat_stream_read(
3783         &mut self,
3784         instance: Instance,
3785         caller: RuntimeComponentInstanceIndex,
3786         ty: TypeStreamTableIndex,
3787         options: OptionsIndex,
3788         payload_size: u32,
3789         payload_align: u32,
3790         stream: u32,
3791         address: u32,
3792         count: u32,
3793     ) -> Result<u32>;
3794 
3795     /// The `stream.drop-writable` intrinsic.
3796     fn stream_drop_writable(
3797         &mut self,
3798         instance: Instance,
3799         ty: TypeStreamTableIndex,
3800         writer: u32,
3801     ) -> Result<()>;
3802 
3803     /// The `error-context.debug-message` intrinsic.
3804     fn error_context_debug_message(
3805         &mut self,
3806         instance: Instance,
3807         ty: TypeComponentLocalErrorContextTableIndex,
3808         options: OptionsIndex,
3809         err_ctx_handle: u32,
3810         debug_msg_address: u32,
3811     ) -> Result<()>;
3812 
3813     /// The `thread.new-indirect` intrinsic
3814     fn thread_new_indirect(
3815         &mut self,
3816         instance: Instance,
3817         caller: RuntimeComponentInstanceIndex,
3818         func_ty_idx: TypeFuncIndex,
3819         start_func_table_idx: RuntimeTableIndex,
3820         start_func_idx: u32,
3821         context: i32,
3822     ) -> Result<u32>;
3823 }
3824 
3825 /// SAFETY: See trait docs.
3826 impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3827     unsafe fn prepare_call(
3828         &mut self,
3829         instance: Instance,
3830         memory: *mut VMMemoryDefinition,
3831         start: *mut VMFuncRef,
3832         return_: *mut VMFuncRef,
3833         caller_instance: RuntimeComponentInstanceIndex,
3834         callee_instance: RuntimeComponentInstanceIndex,
3835         task_return_type: TypeTupleIndex,
3836         callee_async: bool,
3837         string_encoding: u8,
3838         result_count_or_max_if_async: u32,
3839         storage: *mut ValRaw,
3840         storage_len: usize,
3841     ) -> Result<()> {
3842         // SAFETY: The `wasmtime_cranelift`-generated code that calls
3843         // this method will have ensured that `storage` is a valid
3844         // pointer containing at least `storage_len` items.
3845         let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3846 
3847         unsafe {
3848             instance.prepare_call(
3849                 StoreContextMut(self),
3850                 start,
3851                 return_,
3852                 caller_instance,
3853                 callee_instance,
3854                 task_return_type,
3855                 callee_async,
3856                 memory,
3857                 string_encoding,
3858                 match result_count_or_max_if_async {
3859                     PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3860                         params,
3861                         has_result: false,
3862                     },
3863                     PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3864                         params,
3865                         has_result: true,
3866                     },
3867                     result_count => CallerInfo::Sync {
3868                         params,
3869                         result_count,
3870                     },
3871                 },
3872             )
3873         }
3874     }
3875 
3876     unsafe fn sync_start(
3877         &mut self,
3878         instance: Instance,
3879         callback: *mut VMFuncRef,
3880         callee: *mut VMFuncRef,
3881         param_count: u32,
3882         storage: *mut MaybeUninit<ValRaw>,
3883         storage_len: usize,
3884     ) -> Result<()> {
3885         unsafe {
3886             instance
3887                 .start_call(
3888                     StoreContextMut(self),
3889                     callback,
3890                     ptr::null_mut(),
3891                     callee,
3892                     param_count,
3893                     1,
3894                     START_FLAG_ASYNC_CALLEE,
3895                     // SAFETY: The `wasmtime_cranelift`-generated code that calls
3896                     // this method will have ensured that `storage` is a valid
3897                     // pointer containing at least `storage_len` items.
3898                     Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3899                 )
3900                 .map(drop)
3901         }
3902     }
3903 
3904     unsafe fn async_start(
3905         &mut self,
3906         instance: Instance,
3907         callback: *mut VMFuncRef,
3908         post_return: *mut VMFuncRef,
3909         callee: *mut VMFuncRef,
3910         param_count: u32,
3911         result_count: u32,
3912         flags: u32,
3913     ) -> Result<u32> {
3914         unsafe {
3915             instance.start_call(
3916                 StoreContextMut(self),
3917                 callback,
3918                 post_return,
3919                 callee,
3920                 param_count,
3921                 result_count,
3922                 flags,
3923                 None,
3924             )
3925         }
3926     }
3927 
3928     fn future_write(
3929         &mut self,
3930         instance: Instance,
3931         caller: RuntimeComponentInstanceIndex,
3932         ty: TypeFutureTableIndex,
3933         options: OptionsIndex,
3934         future: u32,
3935         address: u32,
3936     ) -> Result<u32> {
3937         instance
3938             .guest_write(
3939                 StoreContextMut(self),
3940                 caller,
3941                 TransmitIndex::Future(ty),
3942                 options,
3943                 None,
3944                 future,
3945                 address,
3946                 1,
3947             )
3948             .map(|result| result.encode())
3949     }
3950 
3951     fn future_read(
3952         &mut self,
3953         instance: Instance,
3954         caller: RuntimeComponentInstanceIndex,
3955         ty: TypeFutureTableIndex,
3956         options: OptionsIndex,
3957         future: u32,
3958         address: u32,
3959     ) -> Result<u32> {
3960         instance
3961             .guest_read(
3962                 StoreContextMut(self),
3963                 caller,
3964                 TransmitIndex::Future(ty),
3965                 options,
3966                 None,
3967                 future,
3968                 address,
3969                 1,
3970             )
3971             .map(|result| result.encode())
3972     }
3973 
3974     fn stream_write(
3975         &mut self,
3976         instance: Instance,
3977         caller: RuntimeComponentInstanceIndex,
3978         ty: TypeStreamTableIndex,
3979         options: OptionsIndex,
3980         stream: u32,
3981         address: u32,
3982         count: u32,
3983     ) -> Result<u32> {
3984         instance
3985             .guest_write(
3986                 StoreContextMut(self),
3987                 caller,
3988                 TransmitIndex::Stream(ty),
3989                 options,
3990                 None,
3991                 stream,
3992                 address,
3993                 count,
3994             )
3995             .map(|result| result.encode())
3996     }
3997 
3998     fn stream_read(
3999         &mut self,
4000         instance: Instance,
4001         caller: RuntimeComponentInstanceIndex,
4002         ty: TypeStreamTableIndex,
4003         options: OptionsIndex,
4004         stream: u32,
4005         address: u32,
4006         count: u32,
4007     ) -> Result<u32> {
4008         instance
4009             .guest_read(
4010                 StoreContextMut(self),
4011                 caller,
4012                 TransmitIndex::Stream(ty),
4013                 options,
4014                 None,
4015                 stream,
4016                 address,
4017                 count,
4018             )
4019             .map(|result| result.encode())
4020     }
4021 
4022     fn future_drop_writable(
4023         &mut self,
4024         instance: Instance,
4025         ty: TypeFutureTableIndex,
4026         writer: u32,
4027     ) -> Result<()> {
4028         instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4029     }
4030 
4031     fn flat_stream_write(
4032         &mut self,
4033         instance: Instance,
4034         caller: RuntimeComponentInstanceIndex,
4035         ty: TypeStreamTableIndex,
4036         options: OptionsIndex,
4037         payload_size: u32,
4038         payload_align: u32,
4039         stream: u32,
4040         address: u32,
4041         count: u32,
4042     ) -> Result<u32> {
4043         instance
4044             .guest_write(
4045                 StoreContextMut(self),
4046                 caller,
4047                 TransmitIndex::Stream(ty),
4048                 options,
4049                 Some(FlatAbi {
4050                     size: payload_size,
4051                     align: payload_align,
4052                 }),
4053                 stream,
4054                 address,
4055                 count,
4056             )
4057             .map(|result| result.encode())
4058     }
4059 
4060     fn flat_stream_read(
4061         &mut self,
4062         instance: Instance,
4063         caller: RuntimeComponentInstanceIndex,
4064         ty: TypeStreamTableIndex,
4065         options: OptionsIndex,
4066         payload_size: u32,
4067         payload_align: u32,
4068         stream: u32,
4069         address: u32,
4070         count: u32,
4071     ) -> Result<u32> {
4072         instance
4073             .guest_read(
4074                 StoreContextMut(self),
4075                 caller,
4076                 TransmitIndex::Stream(ty),
4077                 options,
4078                 Some(FlatAbi {
4079                     size: payload_size,
4080                     align: payload_align,
4081                 }),
4082                 stream,
4083                 address,
4084                 count,
4085             )
4086             .map(|result| result.encode())
4087     }
4088 
4089     fn stream_drop_writable(
4090         &mut self,
4091         instance: Instance,
4092         ty: TypeStreamTableIndex,
4093         writer: u32,
4094     ) -> Result<()> {
4095         instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4096     }
4097 
4098     fn error_context_debug_message(
4099         &mut self,
4100         instance: Instance,
4101         ty: TypeComponentLocalErrorContextTableIndex,
4102         options: OptionsIndex,
4103         err_ctx_handle: u32,
4104         debug_msg_address: u32,
4105     ) -> Result<()> {
4106         instance.error_context_debug_message(
4107             StoreContextMut(self),
4108             ty,
4109             options,
4110             err_ctx_handle,
4111             debug_msg_address,
4112         )
4113     }
4114 
4115     fn thread_new_indirect(
4116         &mut self,
4117         instance: Instance,
4118         caller: RuntimeComponentInstanceIndex,
4119         func_ty_idx: TypeFuncIndex,
4120         start_func_table_idx: RuntimeTableIndex,
4121         start_func_idx: u32,
4122         context: i32,
4123     ) -> Result<u32> {
4124         instance.thread_new_indirect(
4125             StoreContextMut(self),
4126             caller,
4127             func_ty_idx,
4128             start_func_table_idx,
4129             start_func_idx,
4130             context,
4131         )
4132     }
4133 }
4134 
4135 type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4136 
4137 /// Represents the state of a pending host task.
4138 ///
4139 /// This is used to represent tasks when the guest calls into the host.
4140 struct HostTask {
4141     common: WaitableCommon,
4142 
4143     /// Guest thread which called the host.
4144     caller: QualifiedThreadId,
4145 
4146     /// State of borrows/etc the host needs to track. Used when the guest passes
4147     /// borrows to the host, for example.
4148     call_context: CallContext,
4149 
4150     /// For host tasks which end up doing some asynchronous work (e.g.
4151     /// async-lowered and didn't complete on the first poll) this handle is used
4152     /// as a signal to cancel the future as it resides in the store's
4153     /// `FuturesUnordered`.
4154     join_handle: Option<JoinHandle>,
4155 
4156     /// Box<Any> of the result of this host task.
4157     result: Option<LiftedResult>,
4158 }
4159 
4160 impl HostTask {
4161     fn new(caller: QualifiedThreadId) -> Self {
4162         Self {
4163             common: WaitableCommon::default(),
4164             call_context: CallContext::default(),
4165             caller,
4166             join_handle: None,
4167             result: None,
4168         }
4169     }
4170 }
4171 
4172 impl TableDebug for HostTask {
4173     fn type_name() -> &'static str {
4174         "HostTask"
4175     }
4176 }
4177 
4178 type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4179 
4180 /// Represents the caller of a given guest task.
4181 enum Caller {
4182     /// The host called the guest task.
4183     Host {
4184         /// If present, may be used to deliver the result.
4185         tx: Option<oneshot::Sender<LiftedResult>>,
4186         /// Channel to notify once all subtasks spawned by this caller have
4187         /// completed.
4188         ///
4189         /// Note that we'll never actually send anything to this channel;
4190         /// dropping it when the refcount goes to zero is sufficient to notify
4191         /// the receiver.
4192         exit_tx: Arc<oneshot::Sender<()>>,
4193         /// If true, there's a host future that must be dropped before the task
4194         /// can be deleted.
4195         host_future_present: bool,
4196         /// Represents the caller of the host function which called back into a
4197         /// guest. Note that this thread could belong to an entirely unrelated
4198         /// top-level component instance than the one the host called into.
4199         caller: CurrentThread,
4200     },
4201     /// Another guest thread called the guest task
4202     Guest {
4203         /// The id of the caller
4204         thread: QualifiedThreadId,
4205     },
4206 }
4207 
4208 /// Represents a closure and related canonical ABI parameters required to
4209 /// validate a `task.return` call at runtime and lift the result.
4210 struct LiftResult {
4211     lift: RawLift,
4212     ty: TypeTupleIndex,
4213     memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4214     string_encoding: StringEncoding,
4215 }
4216 
4217 /// The table ID for a guest thread, qualified by the task to which it belongs.
4218 ///
4219 /// This exists to minimize table lookups and the necessity to pass stores around mutably
4220 /// for the common case of identifying the task to which a thread belongs.
4221 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4222 struct QualifiedThreadId {
4223     task: TableId<GuestTask>,
4224     thread: TableId<GuestThread>,
4225 }
4226 
4227 impl QualifiedThreadId {
4228     fn qualify(
4229         state: &mut ConcurrentState,
4230         thread: TableId<GuestThread>,
4231     ) -> Result<QualifiedThreadId> {
4232         Ok(QualifiedThreadId {
4233             task: state.get_mut(thread)?.parent_task,
4234             thread,
4235         })
4236     }
4237 }
4238 
4239 impl fmt::Debug for QualifiedThreadId {
4240     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4241         f.debug_tuple("QualifiedThreadId")
4242             .field(&self.task.rep())
4243             .field(&self.thread.rep())
4244             .finish()
4245     }
4246 }
4247 
4248 enum GuestThreadState {
4249     NotStartedImplicit,
4250     NotStartedExplicit(
4251         Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4252     ),
4253     Running,
4254     Suspended(StoreFiber<'static>),
4255     Ready(StoreFiber<'static>),
4256     Completed,
4257 }
4258 pub struct GuestThread {
4259     /// Context-local state used to implement the `context.{get,set}`
4260     /// intrinsics.
4261     context: [u32; 2],
4262     /// The owning guest task.
4263     parent_task: TableId<GuestTask>,
4264     /// If present, indicates that the thread is currently waiting on the
4265     /// specified set but may be cancelled and woken immediately.
4266     wake_on_cancel: Option<TableId<WaitableSet>>,
4267     /// The execution state of this guest thread
4268     state: GuestThreadState,
4269     /// The index of this thread in the component instance's handle table.
4270     /// This must always be `Some` after initialization.
4271     instance_rep: Option<u32>,
4272 }
4273 
4274 impl GuestThread {
4275     /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4276     /// handle.
4277     fn from_instance(
4278         state: Pin<&mut ComponentInstance>,
4279         caller_instance: RuntimeComponentInstanceIndex,
4280         guest_thread: u32,
4281     ) -> Result<TableId<Self>> {
4282         let rep = state.instance_states().0[caller_instance]
4283             .thread_handle_table()
4284             .guest_thread_rep(guest_thread)?;
4285         Ok(TableId::new(rep))
4286     }
4287 
4288     fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4289         Self {
4290             context: [0; 2],
4291             parent_task,
4292             wake_on_cancel: None,
4293             state: GuestThreadState::NotStartedImplicit,
4294             instance_rep: None,
4295         }
4296     }
4297 
4298     fn new_explicit(
4299         parent_task: TableId<GuestTask>,
4300         start_func: Box<
4301             dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4302         >,
4303     ) -> Self {
4304         Self {
4305             context: [0; 2],
4306             parent_task,
4307             wake_on_cancel: None,
4308             state: GuestThreadState::NotStartedExplicit(start_func),
4309             instance_rep: None,
4310         }
4311     }
4312 }
4313 
4314 impl TableDebug for GuestThread {
4315     fn type_name() -> &'static str {
4316         "GuestThread"
4317     }
4318 }
4319 
4320 enum SyncResult {
4321     NotProduced,
4322     Produced(Option<ValRaw>),
4323     Taken,
4324 }
4325 
4326 impl SyncResult {
4327     fn take(&mut self) -> Option<Option<ValRaw>> {
4328         match mem::replace(self, SyncResult::Taken) {
4329             SyncResult::NotProduced => None,
4330             SyncResult::Produced(val) => Some(val),
4331             SyncResult::Taken => {
4332                 panic!("attempted to take a synchronous result that was already taken")
4333             }
4334         }
4335     }
4336 }
4337 
4338 #[derive(Debug)]
4339 enum HostFutureState {
4340     NotApplicable,
4341     Live,
4342     Dropped,
4343 }
4344 
4345 /// Represents a pending guest task.
4346 pub(crate) struct GuestTask {
4347     /// See `WaitableCommon`
4348     common: WaitableCommon,
4349     /// Closure to lower the parameters passed to this task.
4350     lower_params: Option<RawLower>,
4351     /// See `LiftResult`
4352     lift_result: Option<LiftResult>,
4353     /// A place to stash the type-erased lifted result if it can't be delivered
4354     /// immediately.
4355     result: Option<LiftedResult>,
4356     /// Closure to call the callback function for an async-lifted export, if
4357     /// provided.
4358     callback: Option<CallbackFn>,
4359     /// See `Caller`
4360     caller: Caller,
4361     /// Borrow state for this task.
4362     ///
4363     /// Keeps track of `borrow<T>` received to this task to ensure that
4364     /// everything is dropped by the time it exits.
4365     call_context: CallContext,
4366     /// A place to stash the lowered result for a sync-to-async call until it
4367     /// can be returned to the caller.
4368     sync_result: SyncResult,
4369     /// Whether or not the task has been cancelled (i.e. whether the task is
4370     /// permitted to call `task.cancel`).
4371     cancel_sent: bool,
4372     /// Whether or not we've sent a `Status::Starting` event to any current or
4373     /// future waiters for this waitable.
4374     starting_sent: bool,
4375     /// Pending guest subtasks created by this task (directly or indirectly).
4376     ///
4377     /// This is used to re-parent subtasks which are still running when their
4378     /// parent task is disposed.
4379     subtasks: HashSet<TableId<GuestTask>>,
4380     /// Scratch waitable set used to watch subtasks during synchronous calls.
4381     sync_call_set: TableId<WaitableSet>,
4382     /// The runtime instance to which the exported function for this guest task
4383     /// belongs.
4384     ///
4385     /// Note that the task may do a sync->sync call via a fused adapter which
4386     /// results in that task executing code in a different instance, and it may
4387     /// call host functions and intrinsics from that other instance.
4388     instance: RuntimeInstance,
4389     /// If present, a pending `Event::None` or `Event::Cancelled` to be
4390     /// delivered to this task.
4391     event: Option<Event>,
4392     /// Whether or not the task has exited.
4393     exited: bool,
4394     /// Threads belonging to this task
4395     threads: HashSet<TableId<GuestThread>>,
4396     /// The state of the host future that represents an async task, which must
4397     /// be dropped before we can delete the task.
4398     host_future_state: HostFutureState,
4399     /// Indicates whether this task was created for a call to an async-lifted
4400     /// export.
4401     async_function: bool,
4402 }
4403 
4404 impl GuestTask {
4405     fn already_lowered_parameters(&self) -> bool {
4406         // We reset `lower_params` after we lower the parameters
4407         self.lower_params.is_none()
4408     }
4409 
4410     fn returned_or_cancelled(&self) -> bool {
4411         // We reset `lift_result` after we return or exit
4412         self.lift_result.is_none()
4413     }
4414 
4415     fn ready_to_delete(&self) -> bool {
4416         let threads_completed = self.threads.is_empty();
4417         let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4418         let pending_completion_event = matches!(
4419             self.common.event,
4420             Some(Event::Subtask {
4421                 status: Status::Returned | Status::ReturnCancelled
4422             })
4423         );
4424         let ready = threads_completed
4425             && !has_sync_result
4426             && !pending_completion_event
4427             && !matches!(self.host_future_state, HostFutureState::Live);
4428         log::trace!(
4429             "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4430             threads_completed,
4431             has_sync_result,
4432             pending_completion_event,
4433             self.host_future_state
4434         );
4435         ready
4436     }
4437 
4438     fn new(
4439         state: &mut ConcurrentState,
4440         lower_params: RawLower,
4441         lift_result: LiftResult,
4442         caller: Caller,
4443         callback: Option<CallbackFn>,
4444         instance: RuntimeInstance,
4445         async_function: bool,
4446     ) -> Result<Self> {
4447         let sync_call_set = state.push(WaitableSet::default())?;
4448         let host_future_state = match &caller {
4449             Caller::Guest { .. } => HostFutureState::NotApplicable,
4450             Caller::Host {
4451                 host_future_present,
4452                 ..
4453             } => {
4454                 if *host_future_present {
4455                     HostFutureState::Live
4456                 } else {
4457                     HostFutureState::NotApplicable
4458                 }
4459             }
4460         };
4461         Ok(Self {
4462             common: WaitableCommon::default(),
4463             lower_params: Some(lower_params),
4464             lift_result: Some(lift_result),
4465             result: None,
4466             callback,
4467             caller,
4468             call_context: CallContext::default(),
4469             sync_result: SyncResult::NotProduced,
4470             cancel_sent: false,
4471             starting_sent: false,
4472             subtasks: HashSet::new(),
4473             sync_call_set,
4474             instance,
4475             event: None,
4476             exited: false,
4477             threads: HashSet::new(),
4478             host_future_state,
4479             async_function,
4480         })
4481     }
4482 
4483     /// Dispose of this guest task, reparenting any pending subtasks to the
4484     /// caller.
4485     fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4486         // If there are not-yet-delivered completion events for subtasks in
4487         // `self.sync_call_set`, recursively dispose of those subtasks as well.
4488         for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4489             if let Some(Event::Subtask {
4490                 status: Status::Returned | Status::ReturnCancelled,
4491             }) = waitable.common(state)?.event
4492             {
4493                 waitable.delete_from(state)?;
4494             }
4495         }
4496 
4497         assert!(self.threads.is_empty());
4498 
4499         state.delete(self.sync_call_set)?;
4500 
4501         // Reparent any pending subtasks to the caller.
4502         match &self.caller {
4503             Caller::Guest { thread } => {
4504                 let task_mut = state.get_mut(thread.task)?;
4505                 let present = task_mut.subtasks.remove(&me);
4506                 assert!(present);
4507 
4508                 for subtask in &self.subtasks {
4509                     task_mut.subtasks.insert(*subtask);
4510                 }
4511 
4512                 for subtask in &self.subtasks {
4513                     state.get_mut(*subtask)?.caller = Caller::Guest { thread: *thread };
4514                 }
4515             }
4516             Caller::Host {
4517                 exit_tx, caller, ..
4518             } => {
4519                 for subtask in &self.subtasks {
4520                     state.get_mut(*subtask)?.caller = Caller::Host {
4521                         tx: None,
4522                         // Clone `exit_tx` to ensure that it is only dropped
4523                         // once all transitive subtasks of the host call have
4524                         // exited:
4525                         exit_tx: exit_tx.clone(),
4526                         host_future_present: false,
4527                         caller: *caller,
4528                     };
4529                 }
4530             }
4531         }
4532 
4533         for subtask in self.subtasks {
4534             let task = state.get_mut(subtask)?;
4535             if task.exited && task.ready_to_delete() {
4536                 Waitable::Guest(subtask).delete_from(state)?;
4537             }
4538         }
4539 
4540         Ok(())
4541     }
4542 }
4543 
4544 impl TableDebug for GuestTask {
4545     fn type_name() -> &'static str {
4546         "GuestTask"
4547     }
4548 }
4549 
4550 /// Represents state common to all kinds of waitables.
4551 #[derive(Default)]
4552 struct WaitableCommon {
4553     /// The currently pending event for this waitable, if any.
4554     event: Option<Event>,
4555     /// The set to which this waitable belongs, if any.
4556     set: Option<TableId<WaitableSet>>,
4557     /// The handle with which the guest refers to this waitable, if any.
4558     handle: Option<u32>,
4559 }
4560 
4561 /// Represents a Component Model Async `waitable`.
4562 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4563 enum Waitable {
4564     /// A host task
4565     Host(TableId<HostTask>),
4566     /// A guest task
4567     Guest(TableId<GuestTask>),
4568     /// The read or write end of a stream or future
4569     Transmit(TableId<TransmitHandle>),
4570 }
4571 
4572 impl Waitable {
4573     /// Retrieve the `Waitable` corresponding to the specified guest-visible
4574     /// handle.
4575     fn from_instance(
4576         state: Pin<&mut ComponentInstance>,
4577         caller_instance: RuntimeComponentInstanceIndex,
4578         waitable: u32,
4579     ) -> Result<Self> {
4580         use crate::runtime::vm::component::Waitable;
4581 
4582         let (waitable, kind) = state.instance_states().0[caller_instance]
4583             .handle_table()
4584             .waitable_rep(waitable)?;
4585 
4586         Ok(match kind {
4587             Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4588             Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4589             Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4590         })
4591     }
4592 
4593     /// Retrieve the host-visible identifier for this `Waitable`.
4594     fn rep(&self) -> u32 {
4595         match self {
4596             Self::Host(id) => id.rep(),
4597             Self::Guest(id) => id.rep(),
4598             Self::Transmit(id) => id.rep(),
4599         }
4600     }
4601 
4602     /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4603     /// remove it from any set it may currently belong to (when `set` is
4604     /// `None`).
4605     fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4606         log::trace!("waitable {self:?} join set {set:?}",);
4607 
4608         let old = mem::replace(&mut self.common(state)?.set, set);
4609 
4610         if let Some(old) = old {
4611             match *self {
4612                 Waitable::Host(id) => state.remove_child(id, old),
4613                 Waitable::Guest(id) => state.remove_child(id, old),
4614                 Waitable::Transmit(id) => state.remove_child(id, old),
4615             }?;
4616 
4617             state.get_mut(old)?.ready.remove(self);
4618         }
4619 
4620         if let Some(set) = set {
4621             match *self {
4622                 Waitable::Host(id) => state.add_child(id, set),
4623                 Waitable::Guest(id) => state.add_child(id, set),
4624                 Waitable::Transmit(id) => state.add_child(id, set),
4625             }?;
4626 
4627             if self.common(state)?.event.is_some() {
4628                 self.mark_ready(state)?;
4629             }
4630         }
4631 
4632         Ok(())
4633     }
4634 
4635     /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4636     fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4637         Ok(match self {
4638             Self::Host(id) => &mut state.get_mut(*id)?.common,
4639             Self::Guest(id) => &mut state.get_mut(*id)?.common,
4640             Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4641         })
4642     }
4643 
4644     /// Set or clear the pending event for this waitable and either deliver it
4645     /// to the first waiter, if any, or mark it as ready to be delivered to the
4646     /// next waiter that arrives.
4647     fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4648         log::trace!("set event for {self:?}: {event:?}");
4649         self.common(state)?.event = event;
4650         self.mark_ready(state)
4651     }
4652 
4653     /// Take the pending event from this waitable, leaving `None` in its place.
4654     fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4655         let common = self.common(state)?;
4656         let event = common.event.take();
4657         if let Some(set) = self.common(state)?.set {
4658             state.get_mut(set)?.ready.remove(self);
4659         }
4660 
4661         Ok(event)
4662     }
4663 
4664     /// Deliver the current event for this waitable to the first waiter, if any,
4665     /// or else mark it as ready to be delivered to the next waiter that
4666     /// arrives.
4667     fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4668         if let Some(set) = self.common(state)?.set {
4669             state.get_mut(set)?.ready.insert(*self);
4670             if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4671                 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4672                 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4673 
4674                 let item = match mode {
4675                     WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4676                     WaitMode::Callback(instance) => WorkItem::GuestCall(
4677                         state.get_mut(thread.task)?.instance.index,
4678                         GuestCall {
4679                             thread,
4680                             kind: GuestCallKind::DeliverEvent {
4681                                 instance,
4682                                 set: Some(set),
4683                             },
4684                         },
4685                     ),
4686                 };
4687                 state.push_high_priority(item);
4688             }
4689         }
4690         Ok(())
4691     }
4692 
4693     /// Remove this waitable from the instance's rep table.
4694     fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4695         match self {
4696             Self::Host(task) => {
4697                 log::trace!("delete host task {task:?}");
4698                 state.delete(*task)?;
4699             }
4700             Self::Guest(task) => {
4701                 log::trace!("delete guest task {task:?}");
4702                 state.delete(*task)?.dispose(state, *task)?;
4703             }
4704             Self::Transmit(task) => {
4705                 state.delete(*task)?;
4706             }
4707         }
4708 
4709         Ok(())
4710     }
4711 }
4712 
4713 impl fmt::Debug for Waitable {
4714     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4715         match self {
4716             Self::Host(id) => write!(f, "{id:?}"),
4717             Self::Guest(id) => write!(f, "{id:?}"),
4718             Self::Transmit(id) => write!(f, "{id:?}"),
4719         }
4720     }
4721 }
4722 
4723 /// Represents a Component Model Async `waitable-set`.
4724 #[derive(Default)]
4725 struct WaitableSet {
4726     /// Which waitables in this set have pending events, if any.
4727     ready: BTreeSet<Waitable>,
4728     /// Which guest threads are currently waiting on this set, if any.
4729     waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4730 }
4731 
4732 impl TableDebug for WaitableSet {
4733     fn type_name() -> &'static str {
4734         "WaitableSet"
4735     }
4736 }
4737 
4738 /// Type-erased closure to lower the parameters for a guest task.
4739 type RawLower =
4740     Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4741 
4742 /// Type-erased closure to lift the result for a guest task.
4743 type RawLift = Box<
4744     dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4745 >;
4746 
4747 /// Type erased result of a guest task which may be downcast to the expected
4748 /// type by a host caller (or simply ignored in the case of a guest caller; see
4749 /// `DummyResult`).
4750 type LiftedResult = Box<dyn Any + Send + Sync>;
4751 
4752 /// Used to return a result from a `LiftFn` when the actual result has already
4753 /// been lowered to a guest task's stack and linear memory.
4754 struct DummyResult;
4755 
4756 /// Represents the Component Model Async state of a (sub-)component instance.
4757 #[derive(Default)]
4758 pub struct ConcurrentInstanceState {
4759     /// Whether backpressure is set for this instance (enabled if >0)
4760     backpressure: u16,
4761     /// Whether this instance can be entered
4762     do_not_enter: bool,
4763     /// Pending calls for this instance which require `Self::backpressure` to be
4764     /// `true` and/or `Self::do_not_enter` to be false before they can proceed.
4765     pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4766 }
4767 
4768 impl ConcurrentInstanceState {
4769     pub fn pending_is_empty(&self) -> bool {
4770         self.pending.is_empty()
4771     }
4772 }
4773 
4774 #[derive(Debug, Copy, Clone)]
4775 enum CurrentThread {
4776     Guest(QualifiedThreadId),
4777     Host(TableId<HostTask>),
4778     None,
4779 }
4780 
4781 impl CurrentThread {
4782     fn guest(&self) -> Option<&QualifiedThreadId> {
4783         match self {
4784             Self::Guest(id) => Some(id),
4785             _ => None,
4786         }
4787     }
4788 
4789     fn host(&self) -> Option<TableId<HostTask>> {
4790         match self {
4791             Self::Host(id) => Some(*id),
4792             _ => None,
4793         }
4794     }
4795 
4796     fn is_none(&self) -> bool {
4797         matches!(self, Self::None)
4798     }
4799 }
4800 
4801 impl From<QualifiedThreadId> for CurrentThread {
4802     fn from(id: QualifiedThreadId) -> Self {
4803         Self::Guest(id)
4804     }
4805 }
4806 
4807 impl From<TableId<HostTask>> for CurrentThread {
4808     fn from(id: TableId<HostTask>) -> Self {
4809         Self::Host(id)
4810     }
4811 }
4812 
4813 /// Represents the Component Model Async state of a store.
4814 pub struct ConcurrentState {
4815     /// The currently running thread, if any.
4816     current_thread: CurrentThread,
4817 
4818     /// The set of pending host and background tasks, if any.
4819     ///
4820     /// See `ComponentInstance::poll_until` for where we temporarily take this
4821     /// out, poll it, then put it back to avoid any mutable aliasing hazards.
4822     futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4823     /// The table of waitables, waitable sets, etc.
4824     table: AlwaysMut<ResourceTable>,
4825     /// The "high priority" work queue for this store's event loop.
4826     high_priority: Vec<WorkItem>,
4827     /// The "low priority" work queue for this store's event loop.
4828     low_priority: VecDeque<WorkItem>,
4829     /// A place to stash the reason a fiber is suspending so that the code which
4830     /// resumed it will know under what conditions the fiber should be resumed
4831     /// again.
4832     suspend_reason: Option<SuspendReason>,
4833     /// A cached fiber which is waiting for work to do.
4834     ///
4835     /// This helps us avoid creating a new fiber for each `GuestCall` work item.
4836     worker: Option<StoreFiber<'static>>,
4837     /// A place to stash the work item for which we're resuming a worker fiber.
4838     worker_item: Option<WorkerItem>,
4839 
4840     /// Reference counts for all component error contexts
4841     ///
4842     /// NOTE: it is possible the global ref count to be *greater* than the sum of
4843     /// (sub)component ref counts as tracked by `error_context_tables`, for
4844     /// example when the host holds one or more references to error contexts.
4845     ///
4846     /// The key of this primary map is often referred to as the "rep" (i.e. host-side
4847     /// component-wide representation) of the index into concurrent state for a given
4848     /// stored `ErrorContext`.
4849     ///
4850     /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4851     /// as a `TableId<ErrorContextState>`.
4852     global_error_context_ref_counts:
4853         BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4854 }
4855 
4856 impl Default for ConcurrentState {
4857     fn default() -> Self {
4858         Self {
4859             current_thread: CurrentThread::None,
4860             table: AlwaysMut::new(ResourceTable::new()),
4861             futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4862             high_priority: Vec::new(),
4863             low_priority: VecDeque::new(),
4864             suspend_reason: None,
4865             worker: None,
4866             worker_item: None,
4867             global_error_context_ref_counts: BTreeMap::new(),
4868         }
4869     }
4870 }
4871 
4872 impl ConcurrentState {
4873     /// Take ownership of any fibers and futures owned by this object.
4874     ///
4875     /// This should be used when disposing of the `Store` containing this object
4876     /// in order to gracefully resolve any and all fibers using
4877     /// `StoreFiber::dispose`.  This is necessary to avoid possible
4878     /// use-after-free bugs due to fibers which may still have access to the
4879     /// `Store`.
4880     ///
4881     /// Additionally, the futures collected with this function should be dropped
4882     /// within a `tls::set` call, which will ensure than any futures closing
4883     /// over an `&Accessor` will have access to the store when dropped, allowing
4884     /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4885     /// panicking.
4886     ///
4887     /// Note that this will leave the object in an inconsistent and unusable
4888     /// state, so it should only be used just prior to dropping it.
4889     pub(crate) fn take_fibers_and_futures(
4890         &mut self,
4891         fibers: &mut Vec<StoreFiber<'static>>,
4892         futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4893     ) {
4894         for entry in self.table.get_mut().iter_mut() {
4895             if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4896                 for mode in mem::take(&mut set.waiting).into_values() {
4897                     if let WaitMode::Fiber(fiber) = mode {
4898                         fibers.push(fiber);
4899                     }
4900                 }
4901             } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4902                 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) =
4903                     mem::replace(&mut thread.state, GuestThreadState::Completed)
4904                 {
4905                     fibers.push(fiber);
4906                 }
4907             }
4908         }
4909 
4910         if let Some(fiber) = self.worker.take() {
4911             fibers.push(fiber);
4912         }
4913 
4914         let mut handle_item = |item| match item {
4915             WorkItem::ResumeFiber(fiber) => {
4916                 fibers.push(fiber);
4917             }
4918             WorkItem::PushFuture(future) => {
4919                 self.futures
4920                     .get_mut()
4921                     .as_mut()
4922                     .unwrap()
4923                     .push(future.into_inner());
4924             }
4925             _ => {}
4926         };
4927 
4928         for item in mem::take(&mut self.high_priority) {
4929             handle_item(item);
4930         }
4931         for item in mem::take(&mut self.low_priority) {
4932             handle_item(item);
4933         }
4934 
4935         if let Some(them) = self.futures.get_mut().take() {
4936             futures.push(them);
4937         }
4938     }
4939 
4940     /// Collect the next set of work items to run. This will be either all
4941     /// high-priority items, or a single low-priority item if there are no
4942     /// high-priority items.
4943     fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
4944         let mut ready = mem::take(&mut self.high_priority);
4945         if ready.is_empty() {
4946             if let Some(item) = self.low_priority.pop_back() {
4947                 ready.push(item);
4948             }
4949         }
4950         ready
4951     }
4952 
4953     fn push<V: Send + Sync + 'static>(
4954         &mut self,
4955         value: V,
4956     ) -> Result<TableId<V>, ResourceTableError> {
4957         self.table.get_mut().push(value).map(TableId::from)
4958     }
4959 
4960     fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4961         self.table.get_mut().get_mut(&Resource::from(id))
4962     }
4963 
4964     pub fn add_child<T: 'static, U: 'static>(
4965         &mut self,
4966         child: TableId<T>,
4967         parent: TableId<U>,
4968     ) -> Result<(), ResourceTableError> {
4969         self.table
4970             .get_mut()
4971             .add_child(Resource::from(child), Resource::from(parent))
4972     }
4973 
4974     pub fn remove_child<T: 'static, U: 'static>(
4975         &mut self,
4976         child: TableId<T>,
4977         parent: TableId<U>,
4978     ) -> Result<(), ResourceTableError> {
4979         self.table
4980             .get_mut()
4981             .remove_child(Resource::from(child), Resource::from(parent))
4982     }
4983 
4984     fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4985         self.table.get_mut().delete(Resource::from(id))
4986     }
4987 
4988     fn push_future(&mut self, future: HostTaskFuture) {
4989         // Note that we can't directly push to `ConcurrentState::futures` here
4990         // since this may be called from a future that's being polled inside
4991         // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
4992         // so it has exclusive access while polling it.  Therefore, we push a
4993         // work item to the "high priority" queue, which will actually push to
4994         // `ConcurrentState::futures` later.
4995         self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4996     }
4997 
4998     fn push_high_priority(&mut self, item: WorkItem) {
4999         log::trace!("push high priority: {item:?}");
5000         self.high_priority.push(item);
5001     }
5002 
5003     fn push_low_priority(&mut self, item: WorkItem) {
5004         log::trace!("push low priority: {item:?}");
5005         self.low_priority.push_front(item);
5006     }
5007 
5008     fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5009         if high_priority {
5010             self.push_high_priority(item);
5011         } else {
5012             self.push_low_priority(item);
5013         }
5014     }
5015 
5016     fn promote_instance_local_thread_work_item(
5017         &mut self,
5018         current_instance: RuntimeComponentInstanceIndex,
5019     ) -> bool {
5020         self.promote_work_items_matching(|item: &WorkItem| match item {
5021             WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5022                 *instance == current_instance
5023             }
5024             _ => false,
5025         })
5026     }
5027 
5028     fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5029         self.promote_work_items_matching(|item: &WorkItem| match item {
5030             WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5031                 *t == thread
5032             }
5033             _ => false,
5034         })
5035     }
5036 
5037     fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5038     where
5039         F: FnMut(&WorkItem) -> bool,
5040     {
5041         // If there's a high-priority work item to resume the current guest thread,
5042         // we don't need to promote anything, but we return true to indicate that
5043         // work is pending for the current instance.
5044         if self.high_priority.iter().any(&mut predicate) {
5045             true
5046         }
5047         // Otherwise, look for a low-priority work item that matches the current
5048         // instance and promote it to high-priority.
5049         else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5050             let item = self.low_priority.remove(idx).unwrap();
5051             self.push_high_priority(item);
5052             true
5053         } else {
5054             false
5055         }
5056     }
5057 
5058     /// Implements the `context.get` intrinsic.
5059     pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
5060         let thread = self.unwrap_current_guest_thread();
5061         let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
5062         log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
5063         Ok(val)
5064     }
5065 
5066     /// Implements the `context.set` intrinsic.
5067     pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
5068         let thread = self.unwrap_current_guest_thread();
5069         log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
5070         self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
5071         Ok(())
5072     }
5073 
5074     /// Returns whether there's a pending cancellation on the current guest thread,
5075     /// consuming the event if so.
5076     fn take_pending_cancellation(&mut self) -> bool {
5077         let thread = self.unwrap_current_guest_thread();
5078         if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
5079             assert!(matches!(event, Event::Cancelled));
5080             true
5081         } else {
5082             false
5083         }
5084     }
5085 
5086     fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5087         if self.may_block(task) {
5088             Ok(())
5089         } else {
5090             Err(Trap::CannotBlockSyncTask.into())
5091         }
5092     }
5093 
5094     fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
5095         let task = self.get_mut(task).unwrap();
5096         task.async_function || task.returned_or_cancelled()
5097     }
5098 
5099     /// Used by `ResourceTables` to acquire the current `CallContext` for the
5100     /// specified task.
5101     ///
5102     /// The `task` is bit-packed as returned by `current_call_context_scope_id`
5103     /// below.
5104     pub fn call_context(&mut self, task: u32) -> &mut CallContext {
5105         let (task, is_host) = (task >> 1, task & 1 == 1);
5106         if is_host {
5107             let task: TableId<HostTask> = TableId::new(task);
5108             &mut self.get_mut(task).unwrap().call_context
5109         } else {
5110             let task: TableId<GuestTask> = TableId::new(task);
5111             &mut self.get_mut(task).unwrap().call_context
5112         }
5113     }
5114 
5115     /// Used by `ResourceTables` to record the scope of a borrow to get undone
5116     /// in the future.
5117     pub fn current_call_context_scope_id(&self) -> u32 {
5118         let (bits, is_host) = match self.current_thread {
5119             CurrentThread::Guest(id) => (id.task.rep(), false),
5120             CurrentThread::Host(id) => (id.rep(), true),
5121             CurrentThread::None => unreachable!(),
5122         };
5123         assert_eq!((bits << 1) >> 1, bits);
5124         (bits << 1) | u32::from(is_host)
5125     }
5126 
5127     fn unwrap_current_guest_thread(&self) -> QualifiedThreadId {
5128         *self.current_thread.guest().unwrap()
5129     }
5130 
5131     fn unwrap_current_host_thread(&self) -> TableId<HostTask> {
5132         self.current_thread.host().unwrap()
5133     }
5134 }
5135 
5136 /// Provide a type hint to compiler about the shape of a parameter lower
5137 /// closure.
5138 fn for_any_lower<
5139     F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5140 >(
5141     fun: F,
5142 ) -> F {
5143     fun
5144 }
5145 
5146 /// Provide a type hint to compiler about the shape of a result lift closure.
5147 fn for_any_lift<
5148     F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5149 >(
5150     fun: F,
5151 ) -> F {
5152     fun
5153 }
5154 
5155 /// Wrap the specified future in a `poll_fn` which asserts that the future is
5156 /// only polled from the event loop of the specified `Store`.
5157 ///
5158 /// See `StoreContextMut::run_concurrent` for details.
5159 fn checked<F: Future + Send + 'static>(
5160     id: StoreId,
5161     fut: F,
5162 ) -> impl Future<Output = F::Output> + Send + 'static {
5163     async move {
5164         let mut fut = pin!(fut);
5165         future::poll_fn(move |cx| {
5166             let message = "\
5167                 `Future`s which depend on asynchronous component tasks, streams, or \
5168                 futures to complete may only be polled from the event loop of the \
5169                 store to which they belong.  Please use \
5170                 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5171             ";
5172             tls::try_get(|store| {
5173                 let matched = match store {
5174                     tls::TryGet::Some(store) => store.id() == id,
5175                     tls::TryGet::Taken | tls::TryGet::None => false,
5176                 };
5177 
5178                 if !matched {
5179                     panic!("{message}")
5180                 }
5181             });
5182             fut.as_mut().poll(cx)
5183         })
5184         .await
5185     }
5186 }
5187 
5188 /// Assert that `StoreContextMut::run_concurrent` has not been called from
5189 /// within an store's event loop.
5190 fn check_recursive_run() {
5191     tls::try_get(|store| {
5192         if !matches!(store, tls::TryGet::None) {
5193             panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5194         }
5195     });
5196 }
5197 
5198 fn unpack_callback_code(code: u32) -> (u32, u32) {
5199     (code & 0xF, code >> 4)
5200 }
5201 
5202 /// Helper struct for packaging parameters to be passed to
5203 /// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
5204 /// `waitable-set.poll`.
5205 struct WaitableCheckParams {
5206     set: TableId<WaitableSet>,
5207     options: OptionsIndex,
5208     payload: u32,
5209 }
5210 
5211 /// Indicates whether `ComponentInstance::waitable_check` is being called for
5212 /// `waitable-set.wait` or `waitable-set.poll`.
5213 enum WaitableCheck {
5214     Wait,
5215     Poll,
5216 }
5217 
5218 /// Represents a guest task called from the host, prepared using `prepare_call`.
5219 pub(crate) struct PreparedCall<R> {
5220     /// The guest export to be called
5221     handle: Func,
5222     /// The guest thread created by `prepare_call`
5223     thread: QualifiedThreadId,
5224     /// The number of lowered core Wasm parameters to pass to the call.
5225     param_count: usize,
5226     /// The `oneshot::Receiver` to which the result of the call will be
5227     /// delivered when it is available.
5228     rx: oneshot::Receiver<LiftedResult>,
5229     /// The `oneshot::Receiver` which will resolve when the task -- and any
5230     /// transitive subtasks -- have all exited.
5231     exit_rx: oneshot::Receiver<()>,
5232     _phantom: PhantomData<R>,
5233 }
5234 
5235 impl<R> PreparedCall<R> {
5236     /// Get a copy of the `TaskId` for this `PreparedCall`.
5237     pub(crate) fn task_id(&self) -> TaskId {
5238         TaskId {
5239             task: self.thread.task,
5240         }
5241     }
5242 }
5243 
5244 /// Represents a task created by `prepare_call`.
5245 pub(crate) struct TaskId {
5246     task: TableId<GuestTask>,
5247 }
5248 
5249 impl TaskId {
5250     /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5251     /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5252     /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5253     /// and can be resumed by other tasks for this component, so we mark the future as dropped
5254     /// and delete the task when all threads are done.
5255     pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5256         let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5257         if !task.already_lowered_parameters() {
5258             Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5259         } else {
5260             task.host_future_state = HostFutureState::Dropped;
5261             if task.ready_to_delete() {
5262                 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5263             }
5264         }
5265         Ok(())
5266     }
5267 }
5268 
5269 /// Prepare a call to the specified exported Wasm function, providing functions
5270 /// for lowering the parameters and lifting the result.
5271 ///
5272 /// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
5273 /// loop, use `queue_call`.
5274 pub(crate) fn prepare_call<T, R>(
5275     mut store: StoreContextMut<T>,
5276     handle: Func,
5277     param_count: usize,
5278     host_future_present: bool,
5279     lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5280     + Send
5281     + Sync
5282     + 'static,
5283     lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5284     + Send
5285     + Sync
5286     + 'static,
5287 ) -> Result<PreparedCall<R>> {
5288     let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5289 
5290     let instance = handle.instance().id().get(store.0);
5291     let options = &instance.component().env_component().options[options];
5292     let ty = &instance.component().types()[ty];
5293     let async_function = ty.async_;
5294     let task_return_type = ty.results;
5295     let component_instance = raw_options.instance;
5296     let callback = options.callback.map(|i| instance.runtime_callback(i));
5297     let memory = options
5298         .memory()
5299         .map(|i| instance.runtime_memory(i))
5300         .map(SendSyncPtr::new);
5301     let string_encoding = options.string_encoding;
5302     let token = StoreToken::new(store.as_context_mut());
5303     let state = store.0.concurrent_state_mut();
5304 
5305     let (tx, rx) = oneshot::channel();
5306     let (exit_tx, exit_rx) = oneshot::channel();
5307 
5308     let instance = RuntimeInstance {
5309         instance: handle.instance().id().instance(),
5310         index: component_instance,
5311     };
5312     let caller = state.current_thread;
5313     let task = GuestTask::new(
5314         state,
5315         Box::new(for_any_lower(move |store, params| {
5316             lower_params(handle, token.as_context_mut(store), params)
5317         })),
5318         LiftResult {
5319             lift: Box::new(for_any_lift(move |store, result| {
5320                 lift_result(handle, store, result)
5321             })),
5322             ty: task_return_type,
5323             memory,
5324             string_encoding,
5325         },
5326         Caller::Host {
5327             tx: Some(tx),
5328             exit_tx: Arc::new(exit_tx),
5329             host_future_present,
5330             caller,
5331         },
5332         callback.map(|callback| {
5333             let callback = SendSyncPtr::new(callback);
5334             let instance = handle.instance();
5335             Box::new(move |store: &mut dyn VMStore, event, handle| {
5336                 let store = token.as_context_mut(store);
5337                 // SAFETY: Per the contract of `prepare_call`, the callback
5338                 // will remain valid at least as long is this task exists.
5339                 unsafe { instance.call_callback(store, callback, event, handle) }
5340             }) as CallbackFn
5341         }),
5342         instance,
5343         async_function,
5344     )?;
5345 
5346     let task = state.push(task)?;
5347     let thread = state.push(GuestThread::new_implicit(task))?;
5348     state.get_mut(task)?.threads.insert(thread);
5349 
5350     if !store.0.may_enter(instance) {
5351         bail!(crate::Trap::CannotEnterComponent);
5352     }
5353 
5354     Ok(PreparedCall {
5355         handle,
5356         thread: QualifiedThreadId { task, thread },
5357         param_count,
5358         rx,
5359         exit_rx,
5360         _phantom: PhantomData,
5361     })
5362 }
5363 
5364 /// Queue a call previously prepared using `prepare_call` to be run as part of
5365 /// the associated `ComponentInstance`'s event loop.
5366 ///
5367 /// The returned future will resolve to the result once it is available, but
5368 /// must only be polled via the instance's event loop. See
5369 /// `StoreContextMut::run_concurrent` for details.
5370 pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5371     mut store: StoreContextMut<T>,
5372     prepared: PreparedCall<R>,
5373 ) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5374     let PreparedCall {
5375         handle,
5376         thread,
5377         param_count,
5378         rx,
5379         exit_rx,
5380         ..
5381     } = prepared;
5382 
5383     queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5384 
5385     Ok(checked(
5386         store.0.id(),
5387         rx.map(move |result| {
5388             result
5389                 .map(|v| (*v.downcast().unwrap(), exit_rx))
5390                 .map_err(crate::Error::from)
5391         }),
5392     ))
5393 }
5394 
5395 /// Queue a call previously prepared using `prepare_call` to be run as part of
5396 /// the associated `ComponentInstance`'s event loop.
5397 fn queue_call0<T: 'static>(
5398     store: StoreContextMut<T>,
5399     handle: Func,
5400     guest_thread: QualifiedThreadId,
5401     param_count: usize,
5402 ) -> Result<()> {
5403     let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5404     let is_concurrent = raw_options.async_;
5405     let callback = raw_options.callback;
5406     let instance = handle.instance();
5407     let callee = handle.lifted_core_func(store.0);
5408     let post_return = handle.post_return_core_func(store.0);
5409     let callback = callback.map(|i| {
5410         let instance = instance.id().get(store.0);
5411         SendSyncPtr::new(instance.runtime_callback(i))
5412     });
5413 
5414     log::trace!("queueing call {guest_thread:?}");
5415 
5416     // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5417     // (with signatures appropriate for this call) and will remain valid as
5418     // long as this instance is valid.
5419     unsafe {
5420         instance.queue_call(
5421             store,
5422             guest_thread,
5423             SendSyncPtr::new(callee),
5424             param_count,
5425             1,
5426             is_concurrent,
5427             callback,
5428             post_return.map(SendSyncPtr::new),
5429         )
5430     }
5431 }
5432