1 //! Runtime support for the Component Model Async ABI.
2 //!
3 //! This module and its submodules provide host runtime support for Component
4 //! Model Async features such as async-lifted exports, async-lowered imports,
5 //! streams, futures, and related intrinsics.  See [the Async
6 //! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7 //! for a high-level overview.
8 //!
9 //! At the core of this support is an event loop which schedules and switches
10 //! between guest tasks and any host tasks they create.  Each
11 //! `Store` will have at most one event loop running at any given
12 //! time, and that loop may be suspended and resumed by the host embedder using
13 //! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14 //! function contains the loop itself, while the
15 //! `StoreOpaque::concurrent_state` field holds its state.
16 //!
17 //! # Public API Overview
18 //!
19 //! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20 //!
21 //! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
22 //! async-lifted or sync-lifted import, creating a guest task.
23 //!
24 //! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25 //! instance, allowing any and all tasks belonging to that instance to make
26 //! progress.
27 //!
28 //! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29 //! for the specified instance.
30 //!
31 //! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32 //! `stream` which may be passed to the guest.  This takes a
33 //! `{Future,Stream}Producer` implementation which will be polled for items when
34 //! the consumer requests them.
35 //!
36 //! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37 //! connecting it to a `{Future,Stream}Consumer` which will consume any items
38 //! produced by the write end.
39 //!
40 //! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41 //!
42 //! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43 //! function with the linker.  That function will take an `Accessor` as its
44 //! first parameter, which provides access to the store between (but not across)
45 //! await points.
46 //!
47 //! - `Accessor::with`: Access the store and its associated data.
48 //!
49 //! - `Accessor::spawn`: Run a background task as part of the event loop for the
50 //! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51 //! in host functions.
52 
53 use crate::component::func::{self, Func};
54 use crate::component::{HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError};
55 use crate::fiber::{self, StoreFiber, StoreFiberYield};
56 use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
57 use crate::vm::component::{CallContext, ComponentInstance, InstanceFlags, ResourceTables};
58 use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
59 use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType};
60 use anyhow::{Context as _, Result, anyhow, bail};
61 use error_contexts::GlobalErrorContextRefCount;
62 use futures::channel::oneshot;
63 use futures::future::{self, Either, FutureExt};
64 use futures::stream::{FuturesUnordered, StreamExt};
65 use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
66 use std::any::Any;
67 use std::borrow::ToOwned;
68 use std::boxed::Box;
69 use std::cell::UnsafeCell;
70 use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
71 use std::fmt;
72 use std::future::Future;
73 use std::marker::PhantomData;
74 use std::mem::{self, ManuallyDrop, MaybeUninit};
75 use std::ops::DerefMut;
76 use std::pin::{Pin, pin};
77 use std::ptr::{self, NonNull};
78 use std::slice;
79 use std::sync::Arc;
80 use std::task::{Context, Poll, Waker};
81 use std::vec::Vec;
82 use table::{TableDebug, TableId};
83 use wasmtime_environ::Trap;
84 use wasmtime_environ::component::{
85     CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS,
86     OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
87     RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
88     TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
89     TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
90 };
91 
92 pub use abort::JoinHandle;
93 pub use future_stream_any::{FutureAny, StreamAny};
94 pub use futures_and_streams::{
95     Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
96     FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
97     StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
98 };
99 pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
100 
101 mod abort;
102 mod error_contexts;
103 mod future_stream_any;
104 mod futures_and_streams;
105 pub(crate) mod table;
106 pub(crate) mod tls;
107 
108 /// Constant defined in the Component Model spec to indicate that the async
109 /// intrinsic (e.g. `future.write`) has not yet completed.
110 const BLOCKED: u32 = 0xffff_ffff;
111 
112 /// Corresponds to `CallState` in the upstream spec.
113 #[derive(Clone, Copy, Eq, PartialEq, Debug)]
114 pub enum Status {
115     Starting = 0,
116     Started = 1,
117     Returned = 2,
118     StartCancelled = 3,
119     ReturnCancelled = 4,
120 }
121 
122 impl Status {
123     /// Packs this status and the optional `waitable` provided into a 32-bit
124     /// result that the canonical ABI requires.
125     ///
126     /// The low 4 bits are reserved for the status while the upper 28 bits are
127     /// the waitable, if present.
128     pub fn pack(self, waitable: Option<u32>) -> u32 {
129         assert!(matches!(self, Status::Returned) == waitable.is_none());
130         let waitable = waitable.unwrap_or(0);
131         assert!(waitable < (1 << 28));
132         (waitable << 4) | (self as u32)
133     }
134 }
135 
136 /// Corresponds to `EventCode` in the Component Model spec, plus related payload
137 /// data.
138 #[derive(Clone, Copy, Debug)]
139 enum Event {
140     None,
141     Cancelled,
142     Subtask {
143         status: Status,
144     },
145     StreamRead {
146         code: ReturnCode,
147         pending: Option<(TypeStreamTableIndex, u32)>,
148     },
149     StreamWrite {
150         code: ReturnCode,
151         pending: Option<(TypeStreamTableIndex, u32)>,
152     },
153     FutureRead {
154         code: ReturnCode,
155         pending: Option<(TypeFutureTableIndex, u32)>,
156     },
157     FutureWrite {
158         code: ReturnCode,
159         pending: Option<(TypeFutureTableIndex, u32)>,
160     },
161 }
162 
163 impl Event {
164     /// Lower this event to core Wasm integers for delivery to the guest.
165     ///
166     /// Note that the waitable handle, if any, is assumed to be lowered
167     /// separately.
168     fn parts(self) -> (u32, u32) {
169         const EVENT_NONE: u32 = 0;
170         const EVENT_SUBTASK: u32 = 1;
171         const EVENT_STREAM_READ: u32 = 2;
172         const EVENT_STREAM_WRITE: u32 = 3;
173         const EVENT_FUTURE_READ: u32 = 4;
174         const EVENT_FUTURE_WRITE: u32 = 5;
175         const EVENT_CANCELLED: u32 = 6;
176         match self {
177             Event::None => (EVENT_NONE, 0),
178             Event::Cancelled => (EVENT_CANCELLED, 0),
179             Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
180             Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
181             Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
182             Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
183             Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
184         }
185     }
186 }
187 
188 /// Corresponds to `CallbackCode` in the spec.
189 mod callback_code {
190     pub const EXIT: u32 = 0;
191     pub const YIELD: u32 = 1;
192     pub const WAIT: u32 = 2;
193     pub const POLL: u32 = 3;
194 }
195 
196 /// A flag indicating that the callee is an async-lowered export.
197 ///
198 /// This may be passed to the `async-start` intrinsic from a fused adapter.
199 const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
200 
201 /// Provides access to either store data (via the `get` method) or the store
202 /// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
203 /// instance to which the current host task belongs.
204 ///
205 /// See [`Accessor::with`] for details.
206 pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
207     store: StoreContextMut<'a, T>,
208     get_data: fn(&mut T) -> D::Data<'_>,
209 }
210 
211 impl<'a, T, D> Access<'a, T, D>
212 where
213     D: HasData + ?Sized,
214     T: 'static,
215 {
216     /// Creates a new [`Access`] from its component parts.
217     pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
218         Self { store, get_data }
219     }
220 
221     /// Get mutable access to the store data.
222     pub fn data_mut(&mut self) -> &mut T {
223         self.store.data_mut()
224     }
225 
226     /// Get mutable access to the store data.
227     pub fn get(&mut self) -> D::Data<'_> {
228         (self.get_data)(self.data_mut())
229     }
230 
231     /// Spawn a background task.
232     ///
233     /// See [`Accessor::spawn`] for details.
234     pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
235     where
236         T: 'static,
237     {
238         let accessor = Accessor {
239             get_data: self.get_data,
240             token: StoreToken::new(self.store.as_context_mut()),
241         };
242         self.store
243             .as_context_mut()
244             .spawn_with_accessor(accessor, task)
245     }
246 
247     /// Returns the getter this accessor is using to project from `T` into
248     /// `D::Data`.
249     pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
250         self.get_data
251     }
252 }
253 
254 impl<'a, T, D> AsContext for Access<'a, T, D>
255 where
256     D: HasData + ?Sized,
257     T: 'static,
258 {
259     type Data = T;
260 
261     fn as_context(&self) -> StoreContext<'_, T> {
262         self.store.as_context()
263     }
264 }
265 
266 impl<'a, T, D> AsContextMut for Access<'a, T, D>
267 where
268     D: HasData + ?Sized,
269     T: 'static,
270 {
271     fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
272         self.store.as_context_mut()
273     }
274 }
275 
276 /// Provides scoped mutable access to store data in the context of a concurrent
277 /// host task future.
278 ///
279 /// This allows multiple host task futures to execute concurrently and access
280 /// the store between (but not across) `await` points.
281 ///
282 /// # Rationale
283 ///
284 /// This structure is sort of like `&mut T` plus a projection from `&mut T` to
285 /// `D::Data<'_>`. The problem this is solving, however, is that it does not
286 /// literally store these values. The basic problem is that when a concurrent
287 /// host future is being polled it has access to `&mut T` (and the whole
288 /// `Store`) but when it's not being polled it does not have access to these
289 /// values. This reflects how the store is only ever polling one future at a
290 /// time so the store is effectively being passed between futures.
291 ///
292 /// Rust's `Future` trait, however, has no means of passing a `Store`
293 /// temporarily between futures. The [`Context`](std::task::Context) type does
294 /// not have the ability to attach arbitrary information to it at this time.
295 /// This type, [`Accessor`], is used to bridge this expressivity gap.
296 ///
297 /// The [`Accessor`] type here represents the ability to acquire, temporarily in
298 /// a synchronous manner, the current store. The [`Accessor::with`] function
299 /// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
300 /// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
301 /// not take an `async` closure as its argument, instead it's a synchronous
302 /// closure which must complete during on run of `Future::poll`. This reflects
303 /// how the store is temporarily made available while a host future is being
304 /// polled.
305 ///
306 /// # Implementation
307 ///
308 /// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
309 /// this type additionally doesn't even have a lifetime parameter. This is
310 /// instead a representation of proof of the ability to acquire these while a
311 /// future is being polled. Wasmtime will, when it polls a host future,
312 /// configure ambient state such that the `Accessor` that a future closes over
313 /// will work and be able to access the store.
314 ///
315 /// This has a number of implications for users such as:
316 ///
317 /// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
318 ///   the lifetime of a single future.
319 /// * A future is expected to, however, close over an `Accessor` and keep it
320 ///   alive probably for the duration of the entire future.
321 /// * Different host futures will be given different `Accessor`s, and that's
322 ///   intentional.
323 /// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
324 ///   alleviates some otherwise required bounds to be written down.
325 ///
326 /// # Using `Accessor` in `Drop`
327 ///
328 /// The methods on `Accessor` are only expected to work in the context of
329 /// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
330 /// host future can be dropped at any time throughout the system and Wasmtime
331 /// store context is not necessarily available at that time. It's recommended to
332 /// not use `Accessor` methods in anything connected to a `Drop` implementation
333 /// as they will panic and have unintended results. If you run into this though
334 /// feel free to file an issue on the Wasmtime repository.
335 pub struct Accessor<T: 'static, D = HasSelf<T>>
336 where
337     D: HasData + ?Sized,
338 {
339     token: StoreToken<T>,
340     get_data: fn(&mut T) -> D::Data<'_>,
341 }
342 
343 /// A helper trait to take any type of accessor-with-data in functions.
344 ///
345 /// This trait is similar to [`AsContextMut`] except that it's used when
346 /// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
347 /// [`Accessor`] is the main type used in concurrent settings and is passed to
348 /// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`].
349 ///
350 /// This trait is implemented for [`Accessor`] and `&T` where `T` implements
351 /// this trait. This effectively means that regardless of the `D` in
352 /// `Accessor<T, D>` it can still be passed to a function which just needs a
353 /// store accessor.
354 ///
355 /// Acquiring an [`Accessor`] can be done through
356 /// [`StoreContextMut::run_concurrent`] for example or in a host function
357 /// through
358 /// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent).
359 pub trait AsAccessor {
360     /// The `T` in `Store<T>` that this accessor refers to.
361     type Data: 'static;
362 
363     /// The `D` in `Accessor<T, D>`, or the projection out of
364     /// `Self::Data`.
365     type AccessorData: HasData + ?Sized;
366 
367     /// Returns the accessor that this is referring to.
368     fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
369 }
370 
371 impl<T: AsAccessor + ?Sized> AsAccessor for &T {
372     type Data = T::Data;
373     type AccessorData = T::AccessorData;
374 
375     fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
376         T::as_accessor(self)
377     }
378 }
379 
380 impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
381     type Data = T;
382     type AccessorData = D;
383 
384     fn as_accessor(&self) -> &Accessor<T, D> {
385         self
386     }
387 }
388 
389 // Note that it is intentional at this time that `Accessor` does not actually
390 // store `&mut T` or anything similar. This distinctly enables the `Accessor`
391 // structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
392 // that matter). This is used to ergonomically simplify bindings where the
393 // majority of the time `Accessor` is closed over in a future which then needs
394 // to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
395 // you already have to write `T: 'static`...) it helps to avoid this.
396 //
397 // Note as well that `Accessor` doesn't actually store its data at all. Instead
398 // it's more of a "proof" of what can be accessed from TLS. API design around
399 // `Accessor` and functions like `Linker::func_wrap_concurrent` are
400 // intentionally made to ensure that `Accessor` is ideally only used in the
401 // context that TLS variables are actually set. For example host functions are
402 // given `&Accessor`, not `Accessor`, and this prevents them from persisting
403 // the value outside of a future. Within the future the TLS variables are all
404 // guaranteed to be set while the future is being polled.
405 //
406 // Finally though this is not an ironclad guarantee, but nor does it need to be.
407 // The TLS APIs are designed to panic or otherwise model usage where they're
408 // called recursively or similar. It's hoped that code cannot be constructed to
409 // actually hit this at runtime but this is not a safety requirement at this
410 // time.
411 const _: () = {
412     const fn assert<T: Send + Sync>() {}
413     assert::<Accessor<UnsafeCell<u32>>>();
414 };
415 
416 impl<T> Accessor<T> {
417     /// Creates a new `Accessor` backed by the specified functions.
418     ///
419     /// - `get`: used to retrieve the store
420     ///
421     /// - `get_data`: used to "project" from the store's associated data to
422     /// another type (e.g. a field of that data or a wrapper around it).
423     ///
424     /// - `spawn`: used to queue spawned background tasks to be run later
425     pub(crate) fn new(token: StoreToken<T>) -> Self {
426         Self {
427             token,
428             get_data: |x| x,
429         }
430     }
431 }
432 
433 impl<T, D> Accessor<T, D>
434 where
435     D: HasData + ?Sized,
436 {
437     /// Run the specified closure, passing it mutable access to the store.
438     ///
439     /// This function is one of the main building blocks of the [`Accessor`]
440     /// type. This yields synchronous, blocking, access to store via an
441     /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
442     /// providing the ability to access `D` via [`Access::get`]. Note that the
443     /// `fun` here is given only temporary access to the store and `T`/`D`
444     /// meaning that the return value `R` here is not allowed to capture borrows
445     /// into the two. If access is needed to data within `T` or `D` outside of
446     /// this closure then it must be `clone`d out, for example.
447     ///
448     /// # Panics
449     ///
450     /// This function will panic if it is call recursively with any other
451     /// accessor already in scope. For example if `with` is called within `fun`,
452     /// then this function will panic. It is up to the embedder to ensure that
453     /// this does not happen.
454     pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
455         tls::get(|vmstore| {
456             fun(Access {
457                 store: self.token.as_context_mut(vmstore),
458                 get_data: self.get_data,
459             })
460         })
461     }
462 
463     /// Returns the getter this accessor is using to project from `T` into
464     /// `D::Data`.
465     pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
466         self.get_data
467     }
468 
469     /// Changes this accessor to access `D2` instead of the current type
470     /// parameter `D`.
471     ///
472     /// This changes the underlying data access from `T` to `D2::Data<'_>`.
473     ///
474     /// # Panics
475     ///
476     /// When using this API the returned value is disconnected from `&self` and
477     /// the lifetime binding the `self` argument. An `Accessor` only works
478     /// within the context of the closure or async closure that it was
479     /// originally given to, however. This means that due to the fact that the
480     /// returned value has no lifetime connection it's possible to use the
481     /// accessor outside of `&self`, the original accessor, and panic.
482     ///
483     /// The returned value should only be used within the scope of the original
484     /// `Accessor` that `self` refers to.
485     pub fn with_getter<D2: HasData>(
486         &self,
487         get_data: fn(&mut T) -> D2::Data<'_>,
488     ) -> Accessor<T, D2> {
489         Accessor {
490             token: self.token,
491             get_data,
492         }
493     }
494 
495     /// Spawn a background task which will receive an `&Accessor<T, D>` and
496     /// run concurrently with any other tasks in progress for the current
497     /// store.
498     ///
499     /// This is particularly useful for host functions which return a `stream`
500     /// or `future` such that the code to write to the write end of that
501     /// `stream` or `future` must run after the function returns.
502     ///
503     /// The returned [`JoinHandle`] may be used to cancel the task.
504     ///
505     /// # Panics
506     ///
507     /// Panics if called within a closure provided to the [`Accessor::with`]
508     /// function. This can only be called outside an active invocation of
509     /// [`Accessor::with`].
510     pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
511     where
512         T: 'static,
513     {
514         let accessor = self.clone_for_spawn();
515         self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
516     }
517 
518     fn clone_for_spawn(&self) -> Self {
519         Self {
520             token: self.token,
521             get_data: self.get_data,
522         }
523     }
524 }
525 
526 /// Represents a task which may be provided to `Accessor::spawn`,
527 /// `Accessor::forward`, or `StorecContextMut::spawn`.
528 // TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
529 // option.
530 //
531 // As of this writing, it's not possible to specify e.g. `Send` and `Sync`
532 // bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
533 // Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
534 // Send + Sync + 'static` fails with a type mismatch error when we try to pass
535 // it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
536 // best we can do for the time being.
537 pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
538 where
539     D: HasData + ?Sized,
540 {
541     /// Run the task.
542     fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
543 }
544 
545 /// Represents parameter and result metadata for the caller side of a
546 /// guest->guest call orchestrated by a fused adapter.
547 enum CallerInfo {
548     /// Metadata for a call to an async-lowered import
549     Async {
550         params: Vec<ValRaw>,
551         has_result: bool,
552     },
553     /// Metadata for a call to an sync-lowered import
554     Sync {
555         params: Vec<ValRaw>,
556         result_count: u32,
557     },
558 }
559 
560 /// Indicates how a guest task is waiting on a waitable set.
561 enum WaitMode {
562     /// The guest task is waiting using `task.wait`
563     Fiber(StoreFiber<'static>),
564     /// The guest task is waiting via a callback declared as part of an
565     /// async-lifted export.
566     Callback(Instance),
567 }
568 
569 /// Represents the reason a fiber is suspending itself.
570 #[derive(Debug)]
571 enum SuspendReason {
572     /// The fiber is waiting for an event to be delivered to the specified
573     /// waitable set or task.
574     Waiting {
575         set: TableId<WaitableSet>,
576         thread: QualifiedThreadId,
577         skip_may_block_check: bool,
578     },
579     /// The fiber has finished handling its most recent work item and is waiting
580     /// for another (or to be dropped if it is no longer needed).
581     NeedWork,
582     /// The fiber is yielding and should be resumed once other tasks have had a
583     /// chance to run.
584     Yielding { thread: QualifiedThreadId },
585     /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
586     ExplicitlySuspending {
587         thread: QualifiedThreadId,
588         skip_may_block_check: bool,
589     },
590 }
591 
592 /// Represents a pending call into guest code for a given guest task.
593 enum GuestCallKind {
594     /// Indicates there's an event to deliver to the task, possibly related to a
595     /// waitable set the task has been waiting on or polling.
596     DeliverEvent {
597         /// The instance to which the task belongs.
598         instance: Instance,
599         /// The waitable set the event belongs to, if any.
600         ///
601         /// If this is `None` the event will be waiting in the
602         /// `GuestTask::event` field for the task.
603         set: Option<TableId<WaitableSet>>,
604     },
605     /// Indicates that a new guest task call is pending and may be executed
606     /// using the specified closure.
607     ///
608     /// If the closure returns `Ok(Some(call))`, the `call` should be run
609     /// immediately using `handle_guest_call`.
610     StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
611     StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
612 }
613 
614 impl fmt::Debug for GuestCallKind {
615     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
616         match self {
617             Self::DeliverEvent { instance, set } => f
618                 .debug_struct("DeliverEvent")
619                 .field("instance", instance)
620                 .field("set", set)
621                 .finish(),
622             Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
623             Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
624         }
625     }
626 }
627 
628 /// Represents a pending call into guest code for a given guest thread.
629 #[derive(Debug)]
630 struct GuestCall {
631     thread: QualifiedThreadId,
632     kind: GuestCallKind,
633 }
634 
635 impl GuestCall {
636     /// Returns whether or not the call is ready to run.
637     ///
638     /// A call will not be ready to run if either:
639     ///
640     /// - the (sub-)component instance to be called has already been entered and
641     /// cannot be reentered until an in-progress call completes
642     ///
643     /// - the call is for a not-yet started task and the (sub-)component
644     /// instance to be called has backpressure enabled
645     fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> {
646         let task_instance = state.get_mut(self.thread.task)?.instance;
647         let state = state.instance_state(task_instance);
648 
649         let ready = match &self.kind {
650             GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
651             GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
652             GuestCallKind::StartExplicit(_) => true,
653         };
654         log::trace!(
655             "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
656             state.do_not_enter,
657             state.backpressure
658         );
659         Ok(ready)
660     }
661 }
662 
663 /// Job to be run on a worker fiber.
664 enum WorkerItem {
665     GuestCall(GuestCall),
666     Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
667 }
668 
669 /// Represents state related to an in-progress poll operation (e.g. `task.poll`
670 /// or `CallbackCode.POLL`).
671 #[derive(Debug)]
672 struct PollParams {
673     /// The instance to which the polling thread belongs.
674     instance: Instance,
675     /// The polling thread.
676     thread: QualifiedThreadId,
677     /// The waitable set being polled.
678     set: TableId<WaitableSet>,
679 }
680 
681 /// Represents a pending work item to be handled by the event loop for a given
682 /// component instance.
683 enum WorkItem {
684     /// A host task to be pushed to `ConcurrentState::futures`.
685     PushFuture(AlwaysMut<HostTaskFuture>),
686     /// A fiber to resume.
687     ResumeFiber(StoreFiber<'static>),
688     /// A pending call into guest code for a given guest task.
689     GuestCall(GuestCall),
690     /// A pending `task.poll` or `CallbackCode.POLL` operation.
691     Poll(PollParams),
692     /// A job to run on a worker fiber.
693     WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
694 }
695 
696 impl fmt::Debug for WorkItem {
697     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
698         match self {
699             Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
700             Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
701             Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
702             Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(),
703             Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
704         }
705     }
706 }
707 
708 /// Whether a suspension intrinsic was cancelled or completed
709 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
710 pub(crate) enum WaitResult {
711     Cancelled,
712     Completed,
713 }
714 
715 /// Raise a trap if the calling task is synchronous and trying to block prior to
716 /// returning a value.
717 pub(crate) fn check_blocking(store: &mut dyn VMStore) -> Result<()> {
718     store.concurrent_state_mut().check_blocking()
719 }
720 
721 /// Poll the specified future until it completes on behalf of a guest->host call
722 /// using a sync-lowered import.
723 ///
724 /// This is similar to `Instance::first_poll` except it's for sync-lowered
725 /// imports, meaning we don't need to handle cancellation and we can block the
726 /// caller until the task completes, at which point the caller can handle
727 /// lowering the result to the guest's stack and linear memory.
728 pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
729     store: &mut dyn VMStore,
730     future: impl Future<Output = Result<R>> + Send + 'static,
731     caller_instance: RuntimeComponentInstanceIndex,
732 ) -> Result<R> {
733     let state = store.concurrent_state_mut();
734 
735     // If there is no current guest thread set, that means the host function was
736     // registered using e.g. `LinkerInstance::func_wrap`, in which case it
737     // should complete immediately.
738     let Some(caller) = state.guest_thread else {
739         return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
740             Poll::Ready(result) => result,
741             Poll::Pending => {
742                 unreachable!()
743             }
744         };
745     };
746 
747     // Save any existing result stashed in `GuestTask::result` so we can replace
748     // it with the new result.
749     let old_result = state
750         .get_mut(caller.task)
751         .with_context(|| format!("bad handle: {caller:?}"))?
752         .result
753         .take();
754 
755     // Add a temporary host task into the table so we can track its progress.
756     // Note that we'll never allocate a waitable handle for the guest since
757     // we're being called synchronously.
758     let task = state.push(HostTask::new(caller_instance, None))?;
759 
760     log::trace!("new host task child of {caller:?}: {task:?}");
761 
762     // Wrap the future in a closure which will take care of stashing the result
763     // in `GuestTask::result` and resuming this fiber when the host task
764     // completes.
765     let mut future = Box::pin(async move {
766         let result = future.await?;
767         tls::get(move |store| {
768             let state = store.concurrent_state_mut();
769             state.get_mut(caller.task)?.result = Some(Box::new(result) as _);
770 
771             Waitable::Host(task).set_event(
772                 state,
773                 Some(Event::Subtask {
774                     status: Status::Returned,
775                 }),
776             )?;
777 
778             Ok(())
779         })
780     }) as HostTaskFuture;
781 
782     // Finally, poll the future.  We can use a dummy `Waker` here because we'll
783     // add the future to `ConcurrentState::futures` and poll it automatically
784     // from the event loop if it doesn't complete immediately here.
785     let poll = tls::set(store, || {
786         future
787             .as_mut()
788             .poll(&mut Context::from_waker(&Waker::noop()))
789     });
790 
791     match poll {
792         Poll::Ready(result) => {
793             // It completed immediately; check the result and delete the task.
794             result?;
795             log::trace!("delete host task {task:?} (already ready)");
796             store.concurrent_state_mut().delete(task)?;
797         }
798         Poll::Pending => {
799             // It did not complete immediately; add it to
800             // `ConcurrentState::futures` so it will be polled via the event
801             // loop; then use `GuestTask::sync_call_set` to wait for the task to
802             // complete, suspending the current fiber until it does so.
803             let state = store.concurrent_state_mut();
804             state.push_future(future);
805 
806             let set = state.get_mut(caller.task)?.sync_call_set;
807             Waitable::Host(task).join(state, Some(set))?;
808 
809             store.suspend(SuspendReason::Waiting {
810                 set,
811                 thread: caller,
812                 skip_may_block_check: false,
813             })?;
814         }
815     }
816 
817     // Retrieve and return the result.
818     Ok(*mem::replace(
819         &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
820         old_result,
821     )
822     .unwrap()
823     .downcast()
824     .unwrap())
825 }
826 
827 /// Execute the specified guest call.
828 fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
829     let mut next = Some(call);
830     while let Some(call) = next.take() {
831         match call.kind {
832             GuestCallKind::DeliverEvent { instance, set } => {
833                 let (event, waitable) = instance
834                     .get_event(store, call.thread.task, set, true)?
835                     .unwrap();
836                 let state = store.concurrent_state_mut();
837                 let task = state.get_mut(call.thread.task)?;
838                 let runtime_instance = task.instance;
839                 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
840 
841                 log::trace!(
842                     "use callback to deliver event {event:?} to {:?} for {waitable:?}",
843                     call.thread,
844                 );
845 
846                 let old_thread = state.guest_thread.replace(call.thread);
847                 log::trace!(
848                     "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
849                     call.thread
850                 );
851 
852                 store.maybe_push_call_context(call.thread.task)?;
853 
854                 let state = store.concurrent_state_mut();
855                 state.enter_instance(runtime_instance);
856 
857                 let callback = state.get_mut(call.thread.task)?.callback.take().unwrap();
858 
859                 let code = callback(store, runtime_instance, event, handle)?;
860 
861                 let state = store.concurrent_state_mut();
862 
863                 state.get_mut(call.thread.task)?.callback = Some(callback);
864                 state.exit_instance(runtime_instance)?;
865 
866                 store.maybe_pop_call_context(call.thread.task)?;
867 
868                 next = instance.handle_callback_code(store, call.thread, runtime_instance, code)?;
869 
870                 store.concurrent_state_mut().guest_thread = old_thread;
871                 log::trace!(
872                     "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
873                 );
874             }
875             GuestCallKind::StartImplicit(fun) => {
876                 next = fun(store)?;
877             }
878             GuestCallKind::StartExplicit(fun) => {
879                 fun(store)?;
880             }
881         }
882     }
883 
884     Ok(())
885 }
886 
887 impl<T> Store<T> {
888     /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
889     pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
890     where
891         T: Send + 'static,
892     {
893         self.as_context_mut().run_concurrent(fun).await
894     }
895 
896     #[doc(hidden)]
897     pub fn assert_concurrent_state_empty(&mut self) {
898         self.as_context_mut().assert_concurrent_state_empty();
899     }
900 
901     /// Convenience wrapper for [`StoreContextMut::spawn`].
902     pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
903     where
904         T: 'static,
905     {
906         self.as_context_mut().spawn(task)
907     }
908 }
909 
910 impl<T> StoreContextMut<'_, T> {
911     /// Assert that all the relevant tables and queues in the concurrent state
912     /// for this store are empty.
913     ///
914     /// This is for sanity checking in integration tests
915     /// (e.g. `component-async-tests`) that the relevant state has been cleared
916     /// after each test concludes.  This should help us catch leaks, e.g. guest
917     /// tasks which haven't been deleted despite having completed and having
918     /// been dropped by their supertasks.
919     #[doc(hidden)]
920     pub fn assert_concurrent_state_empty(self) {
921         let store = self.0;
922         store
923             .store_data_mut()
924             .components
925             .assert_guest_tables_empty();
926         let state = store.concurrent_state_mut();
927         assert!(
928             state.table.get_mut().is_empty(),
929             "non-empty table: {:?}",
930             state.table.get_mut()
931         );
932         assert!(state.high_priority.is_empty());
933         assert!(state.low_priority.is_empty());
934         assert!(state.guest_thread.is_none());
935         assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
936         assert!(
937             state
938                 .instance_states
939                 .iter()
940                 .all(|(_, state)| state.pending.is_empty())
941         );
942         assert!(state.global_error_context_ref_counts.is_empty());
943     }
944 
945     /// Spawn a background task to run as part of this instance's event loop.
946     ///
947     /// The task will receive an `&Accessor<U>` and run concurrently with
948     /// any other tasks in progress for the instance.
949     ///
950     /// Note that the task will only make progress if and when the event loop
951     /// for this instance is run.
952     ///
953     /// The returned [`SpawnHandle`] may be used to cancel the task.
954     pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
955     where
956         T: 'static,
957     {
958         let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
959         self.spawn_with_accessor(accessor, task)
960     }
961 
962     /// Internal implementation of `spawn` functions where a `store` is
963     /// available along with an `Accessor`.
964     fn spawn_with_accessor<D>(
965         self,
966         accessor: Accessor<T, D>,
967         task: impl AccessorTask<T, D>,
968     ) -> JoinHandle
969     where
970         T: 'static,
971         D: HasData + ?Sized,
972     {
973         // Create an "abortable future" here where internally the future will
974         // hook calls to poll and possibly spawn more background tasks on each
975         // iteration.
976         let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
977         self.0
978             .concurrent_state_mut()
979             .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
980         handle
981     }
982 
983     /// Run the specified closure `fun` to completion as part of this store's
984     /// event loop.
985     ///
986     /// This will run `fun` as part of this store's event loop until it
987     /// yields a result.  `fun` is provided an [`Accessor`], which provides
988     /// controlled access to the store and its data.
989     ///
990     /// This function can be used to invoke [`Func::call_concurrent`] for
991     /// example within the async closure provided here.
992     ///
993     /// # Store-blocking behavior
994     ///
995     ///
996     ///
997     /// At this time there are certain situations in which the `Future` returned
998     /// by the `AsyncFnOnce` passed to this function will not be polled for an
999     /// extended period of time, despite one or more `Waker::wake` events having
1000     /// occurred for the task to which it belongs.  This can manifest as the
1001     /// `Future` seeming to be "blocked" or "locked up", but is actually due to
1002     /// the `Store` being held by e.g. a blocking host function, preventing the
1003     /// `Future` from being polled. A canonical example of this is when the
1004     /// `fun` provided to this function attempts to set a timeout for an
1005     /// invocation of a wasm function. In this situation the async closure is
1006     /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1007     /// to elapse. At this time this setup will not always work and the timeout
1008     /// may not reliably fire.
1009     ///
1010     /// This function will not block the current thread and as such is always
1011     /// suitable to run in an `async` context, but the current implementation of
1012     /// Wasmtime can lead to situations where a certain wasm computation is
1013     /// required to make progress the closure to make progress. This is an
1014     /// artifact of Wasmtime's historical implementation of `async` functions
1015     /// and is the topic of [#11869] and [#11870]. In the timeout example from
1016     /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1017     /// progress for a readiness notification of (b) to get delivered.
1018     ///
1019     /// This effectively means that it's not possible to reliably perform a
1020     /// "select" operation within the `fun` closure, which timeouts for example
1021     /// are based on. Fixing this requires some relatively major refactoring
1022     /// work within Wasmtime itself. This is a known pitfall otherwise and one
1023     /// that is intended to be fixed one day. In the meantime it's recommended
1024     /// to apply timeouts or such to the entire `run_concurrent` call itself
1025     /// rather than internally.
1026     ///
1027     /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1028     /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1029     ///
1030     /// # Example
1031     ///
1032     /// ```
1033     /// # use {
1034     /// #   anyhow::{Result},
1035     /// #   wasmtime::{
1036     /// #     component::{ Component, Linker, Resource, ResourceTable},
1037     /// #     Config, Engine, Store
1038     /// #   },
1039     /// # };
1040     /// #
1041     /// # struct MyResource(u32);
1042     /// # struct Ctx { table: ResourceTable }
1043     /// #
1044     /// # async fn foo() -> Result<()> {
1045     /// # let mut config = Config::new();
1046     /// # let engine = Engine::new(&config)?;
1047     /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1048     /// # let mut linker = Linker::new(&engine);
1049     /// # let component = Component::new(&engine, "")?;
1050     /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1051     /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1052     /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1053     /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1054     ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1055     ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
1056     ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1057     ///    bar.call_concurrent(accessor, (value.0,)).await?;
1058     ///    Ok(())
1059     /// }).await??;
1060     /// # Ok(())
1061     /// # }
1062     /// ```
1063     pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1064     where
1065         T: Send + 'static,
1066     {
1067         self.do_run_concurrent(fun, false).await
1068     }
1069 
1070     pub(super) async fn run_concurrent_trap_on_idle<R>(
1071         self,
1072         fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1073     ) -> Result<R>
1074     where
1075         T: Send + 'static,
1076     {
1077         self.do_run_concurrent(fun, true).await
1078     }
1079 
1080     async fn do_run_concurrent<R>(
1081         mut self,
1082         fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1083         trap_on_idle: bool,
1084     ) -> Result<R>
1085     where
1086         T: Send + 'static,
1087     {
1088         check_recursive_run();
1089         let token = StoreToken::new(self.as_context_mut());
1090 
1091         struct Dropper<'a, T: 'static, V> {
1092             store: StoreContextMut<'a, T>,
1093             value: ManuallyDrop<V>,
1094         }
1095 
1096         impl<'a, T, V> Drop for Dropper<'a, T, V> {
1097             fn drop(&mut self) {
1098                 tls::set(self.store.0, || {
1099                     // SAFETY: Here we drop the value without moving it for the
1100                     // first and only time -- per the contract for `Drop::drop`,
1101                     // this code won't run again, and the `value` field will no
1102                     // longer be accessible.
1103                     unsafe { ManuallyDrop::drop(&mut self.value) }
1104                 });
1105             }
1106         }
1107 
1108         let accessor = &Accessor::new(token);
1109         let dropper = &mut Dropper {
1110             store: self,
1111             value: ManuallyDrop::new(fun(accessor)),
1112         };
1113         // SAFETY: We never move `dropper` nor its `value` field.
1114         let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1115 
1116         dropper
1117             .store
1118             .as_context_mut()
1119             .poll_until(future, trap_on_idle)
1120             .await
1121     }
1122 
1123     /// Run this store's event loop.
1124     ///
1125     /// The returned future will resolve when the specified future completes or,
1126     /// if `trap_on_idle` is true, when the event loop can't make further
1127     /// progress.
1128     async fn poll_until<R>(
1129         mut self,
1130         mut future: Pin<&mut impl Future<Output = R>>,
1131         trap_on_idle: bool,
1132     ) -> Result<R>
1133     where
1134         T: Send + 'static,
1135     {
1136         struct Reset<'a, T: 'static> {
1137             store: StoreContextMut<'a, T>,
1138             futures: Option<FuturesUnordered<HostTaskFuture>>,
1139         }
1140 
1141         impl<'a, T> Drop for Reset<'a, T> {
1142             fn drop(&mut self) {
1143                 if let Some(futures) = self.futures.take() {
1144                     *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1145                 }
1146             }
1147         }
1148 
1149         loop {
1150             // Take `ConcurrentState::futures` out of the store so we can poll
1151             // it while also safely giving any of the futures inside access to
1152             // `self`.
1153             let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1154             let mut reset = Reset {
1155                 store: self.as_context_mut(),
1156                 futures,
1157             };
1158             let mut next = pin!(reset.futures.as_mut().unwrap().next());
1159 
1160             let result = future::poll_fn(|cx| {
1161                 // First, poll the future we were passed as an argument and
1162                 // return immediately if it's ready.
1163                 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1164                     return Poll::Ready(Ok(Either::Left(value)));
1165                 }
1166 
1167                 // Next, poll `ConcurrentState::futures` (which includes any
1168                 // pending host tasks and/or background tasks), returning
1169                 // immediately if one of them fails.
1170                 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1171                     Poll::Ready(Some(output)) => {
1172                         match output {
1173                             Err(e) => return Poll::Ready(Err(e)),
1174                             Ok(()) => {}
1175                         }
1176                         Poll::Ready(true)
1177                     }
1178                     Poll::Ready(None) => Poll::Ready(false),
1179                     Poll::Pending => Poll::Pending,
1180                 };
1181 
1182                 // Next, check the "high priority" work queue and return
1183                 // immediately if it has at least one item.
1184                 let state = reset.store.0.concurrent_state_mut();
1185                 let ready = mem::take(&mut state.high_priority);
1186                 let ready = if ready.is_empty() {
1187                     // Next, check the "low priority" work queue and return
1188                     // immediately if it has at least one item.
1189                     let ready = mem::take(&mut state.low_priority);
1190                     if ready.is_empty() {
1191                         return match next {
1192                             Poll::Ready(true) => {
1193                                 // In this case, one of the futures in
1194                                 // `ConcurrentState::futures` completed
1195                                 // successfully, so we return now and continue
1196                                 // the outer loop in case there is another one
1197                                 // ready to complete.
1198                                 Poll::Ready(Ok(Either::Right(Vec::new())))
1199                             }
1200                             Poll::Ready(false) => {
1201                                 // Poll the future we were passed one last time
1202                                 // in case one of `ConcurrentState::futures` had
1203                                 // the side effect of unblocking it.
1204                                 if let Poll::Ready(value) =
1205                                     tls::set(reset.store.0, || future.as_mut().poll(cx))
1206                                 {
1207                                     Poll::Ready(Ok(Either::Left(value)))
1208                                 } else {
1209                                     // In this case, there are no more pending
1210                                     // futures in `ConcurrentState::futures`,
1211                                     // there are no remaining work items, _and_
1212                                     // the future we were passed as an argument
1213                                     // still hasn't completed.
1214                                     if trap_on_idle {
1215                                         // `trap_on_idle` is true, so we exit
1216                                         // immediately.
1217                                         Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
1218                                     } else {
1219                                         // `trap_on_idle` is false, so we assume
1220                                         // that future will wake up and give us
1221                                         // more work to do when it's ready to.
1222                                         Poll::Pending
1223                                     }
1224                                 }
1225                             }
1226                             // There is at least one pending future in
1227                             // `ConcurrentState::futures` and we have nothing
1228                             // else to do but wait for now, so we return
1229                             // `Pending`.
1230                             Poll::Pending => Poll::Pending,
1231                         };
1232                     } else {
1233                         ready
1234                     }
1235                 } else {
1236                     ready
1237                 };
1238 
1239                 Poll::Ready(Ok(Either::Right(ready)))
1240             })
1241             .await;
1242 
1243             // Put the `ConcurrentState::futures` back into the store before we
1244             // return or handle any work items since one or more of those items
1245             // might append more futures.
1246             drop(reset);
1247 
1248             match result? {
1249                 // The future we were passed as an argument completed, so we
1250                 // return the result.
1251                 Either::Left(value) => break Ok(value),
1252                 // The future we were passed has not yet completed, so handle
1253                 // any work items and then loop again.
1254                 Either::Right(ready) => {
1255                     struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1256                         store: StoreContextMut<'a, T>,
1257                         ready: I,
1258                     }
1259 
1260                     impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1261                         fn drop(&mut self) {
1262                             while let Some(item) = self.ready.next() {
1263                                 match item {
1264                                     WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1265                                     WorkItem::PushFuture(future) => {
1266                                         tls::set(self.store.0, move || drop(future))
1267                                     }
1268                                     _ => {}
1269                                 }
1270                             }
1271                         }
1272                     }
1273 
1274                     let mut dispose = Dispose {
1275                         store: self.as_context_mut(),
1276                         ready: ready.into_iter(),
1277                     };
1278 
1279                     while let Some(item) = dispose.ready.next() {
1280                         dispose
1281                             .store
1282                             .as_context_mut()
1283                             .handle_work_item(item)
1284                             .await?;
1285                     }
1286                 }
1287             }
1288         }
1289     }
1290 
1291     /// Handle the specified work item, possibly resuming a fiber if applicable.
1292     async fn handle_work_item(self, item: WorkItem) -> Result<()>
1293     where
1294         T: Send,
1295     {
1296         log::trace!("handle work item {item:?}");
1297         match item {
1298             WorkItem::PushFuture(future) => {
1299                 self.0
1300                     .concurrent_state_mut()
1301                     .futures
1302                     .get_mut()
1303                     .as_mut()
1304                     .unwrap()
1305                     .push(future.into_inner());
1306             }
1307             WorkItem::ResumeFiber(fiber) => {
1308                 self.0.resume_fiber(fiber).await?;
1309             }
1310             WorkItem::GuestCall(call) => {
1311                 let state = self.0.concurrent_state_mut();
1312                 if call.is_ready(state)? {
1313                     self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1314                 } else {
1315                     let task = state.get_mut(call.thread.task)?;
1316                     if !task.starting_sent {
1317                         task.starting_sent = true;
1318                         if let GuestCallKind::StartImplicit(_) = &call.kind {
1319                             Waitable::Guest(call.thread.task).set_event(
1320                                 state,
1321                                 Some(Event::Subtask {
1322                                     status: Status::Starting,
1323                                 }),
1324                             )?;
1325                         }
1326                     }
1327 
1328                     let runtime_instance = state.get_mut(call.thread.task)?.instance;
1329                     state
1330                         .instance_state(runtime_instance)
1331                         .pending
1332                         .insert(call.thread, call.kind);
1333                 }
1334             }
1335             WorkItem::Poll(params) => {
1336                 let state = self.0.concurrent_state_mut();
1337                 if state.get_mut(params.thread.task)?.event.is_some()
1338                     || !state.get_mut(params.set)?.ready.is_empty()
1339                 {
1340                     // There's at least one event immediately available; deliver
1341                     // it to the guest ASAP.
1342                     state.push_high_priority(WorkItem::GuestCall(GuestCall {
1343                         thread: params.thread,
1344                         kind: GuestCallKind::DeliverEvent {
1345                             instance: params.instance,
1346                             set: Some(params.set),
1347                         },
1348                     }));
1349                 } else {
1350                     // There are no events immediately available; deliver
1351                     // `Event::None` to the guest.
1352                     state.get_mut(params.thread.task)?.event = Some(Event::None);
1353                     state.push_high_priority(WorkItem::GuestCall(GuestCall {
1354                         thread: params.thread,
1355                         kind: GuestCallKind::DeliverEvent {
1356                             instance: params.instance,
1357                             set: Some(params.set),
1358                         },
1359                     }));
1360                 }
1361             }
1362             WorkItem::WorkerFunction(fun) => {
1363                 self.run_on_worker(WorkerItem::Function(fun)).await?;
1364             }
1365         }
1366 
1367         Ok(())
1368     }
1369 
1370     /// Execute the specified guest call on a worker fiber.
1371     async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1372     where
1373         T: Send,
1374     {
1375         let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1376             fiber
1377         } else {
1378             fiber::make_fiber(self.0, move |store| {
1379                 loop {
1380                     match store.concurrent_state_mut().worker_item.take().unwrap() {
1381                         WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1382                         WorkerItem::Function(fun) => fun.into_inner()(store)?,
1383                     }
1384 
1385                     store.suspend(SuspendReason::NeedWork)?;
1386                 }
1387             })?
1388         };
1389 
1390         let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1391         assert!(worker_item.is_none());
1392         *worker_item = Some(item);
1393 
1394         self.0.resume_fiber(worker).await
1395     }
1396 
1397     /// Wrap the specified host function in a future which will call it, passing
1398     /// it an `&Accessor<T>`.
1399     ///
1400     /// See the `Accessor` documentation for details.
1401     pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1402     where
1403         T: 'static,
1404         F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1405             + Send
1406             + Sync
1407             + 'static,
1408         R: Send + Sync + 'static,
1409     {
1410         let token = StoreToken::new(self);
1411         async move {
1412             let mut accessor = Accessor::new(token);
1413             closure(&mut accessor).await
1414         }
1415     }
1416 }
1417 
1418 impl StoreOpaque {
1419     /// Resume the specified fiber, giving it exclusive access to the specified
1420     /// store.
1421     async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1422         let old_thread = self.concurrent_state_mut().guest_thread;
1423         log::trace!("resume_fiber: save current thread {old_thread:?}");
1424 
1425         let fiber = fiber::resolve_or_release(self, fiber).await?;
1426 
1427         let state = self.concurrent_state_mut();
1428 
1429         state.guest_thread = old_thread;
1430         if let Some(ref ot) = old_thread {
1431             state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1432         }
1433         log::trace!("resume_fiber: restore current thread {old_thread:?}");
1434 
1435         if let Some(mut fiber) = fiber {
1436             log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1437             // See the `SuspendReason` documentation for what each case means.
1438             match state.suspend_reason.take().unwrap() {
1439                 SuspendReason::NeedWork => {
1440                     if state.worker.is_none() {
1441                         state.worker = Some(fiber);
1442                     } else {
1443                         fiber.dispose(self);
1444                     }
1445                 }
1446                 SuspendReason::Yielding { thread, .. } => {
1447                     state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1448                     state.push_low_priority(WorkItem::ResumeFiber(fiber));
1449                 }
1450                 SuspendReason::ExplicitlySuspending { thread, .. } => {
1451                     state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1452                 }
1453                 SuspendReason::Waiting { set, thread, .. } => {
1454                     let old = state
1455                         .get_mut(set)?
1456                         .waiting
1457                         .insert(thread, WaitMode::Fiber(fiber));
1458                     assert!(old.is_none());
1459                 }
1460             };
1461         } else {
1462             log::trace!("resume_fiber: fiber has exited");
1463         }
1464 
1465         Ok(())
1466     }
1467 
1468     /// Suspend the current fiber, storing the reason in
1469     /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1470     /// it should be resumed.
1471     ///
1472     /// See the `SuspendReason` documentation for details.
1473     fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1474         log::trace!("suspend fiber: {reason:?}");
1475 
1476         // If we're yielding or waiting on behalf of a guest thread, we'll need to
1477         // pop the call context which manages resource borrows before suspending
1478         // and then push it again once we've resumed.
1479         let task = match &reason {
1480             SuspendReason::Yielding { thread, .. }
1481             | SuspendReason::Waiting { thread, .. }
1482             | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1483             SuspendReason::NeedWork => None,
1484         };
1485 
1486         let old_guest_thread = if let Some(task) = task {
1487             self.maybe_pop_call_context(task)?;
1488             self.concurrent_state_mut().guest_thread
1489         } else {
1490             None
1491         };
1492 
1493         // We should not have reached here unless either there's no current
1494         // task, or the current task is permitted to block.  In addition, we
1495         // special-case `thread.switch-to` and waiting for a subtask to go from
1496         // `starting` to `started`, both of which we consider non-blocking
1497         // operations despite requiring a suspend.
1498         assert!(
1499             matches!(
1500                 reason,
1501                 SuspendReason::ExplicitlySuspending {
1502                     skip_may_block_check: true,
1503                     ..
1504                 } | SuspendReason::Waiting {
1505                     skip_may_block_check: true,
1506                     ..
1507                 }
1508             ) || old_guest_thread
1509                 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1510                 .unwrap_or(true)
1511         );
1512 
1513         let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1514         assert!(suspend_reason.is_none());
1515         *suspend_reason = Some(reason);
1516 
1517         self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1518 
1519         if let Some(task) = task {
1520             self.concurrent_state_mut().guest_thread = old_guest_thread;
1521             self.maybe_push_call_context(task)?;
1522         }
1523 
1524         Ok(())
1525     }
1526 
1527     /// Push the call context for managing resource borrows for the specified
1528     /// guest task if it has not yet either returned a result or cancelled
1529     /// itself.
1530     fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1531         let task = self.concurrent_state_mut().get_mut(guest_task)?;
1532 
1533         if !task.returned_or_cancelled() {
1534             log::trace!("push call context for {guest_task:?}");
1535             let call_context = task.call_context.take().unwrap();
1536             self.component_resource_state().0.push(call_context);
1537         }
1538         Ok(())
1539     }
1540 
1541     /// Pop the call context for managing resource borrows for the specified
1542     /// guest task if it has not yet either returned a result or cancelled
1543     /// itself.
1544     fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1545         if !self
1546             .concurrent_state_mut()
1547             .get_mut(guest_task)?
1548             .returned_or_cancelled()
1549         {
1550             log::trace!("pop call context for {guest_task:?}");
1551             let call_context = Some(self.component_resource_state().0.pop().unwrap());
1552             self.concurrent_state_mut()
1553                 .get_mut(guest_task)?
1554                 .call_context = call_context;
1555         }
1556         Ok(())
1557     }
1558 
1559     fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1560         let state = self.concurrent_state_mut();
1561         let caller = state.guest_thread.unwrap();
1562         let old_set = waitable.common(state)?.set;
1563         let set = state.get_mut(caller.task)?.sync_call_set;
1564         waitable.join(state, Some(set))?;
1565         self.suspend(SuspendReason::Waiting {
1566             set,
1567             thread: caller,
1568             skip_may_block_check: false,
1569         })?;
1570         let state = self.concurrent_state_mut();
1571         waitable.join(state, old_set)
1572     }
1573 }
1574 
1575 impl Instance {
1576     /// Get the next pending event for the specified task and (optional)
1577     /// waitable set, along with the waitable handle if applicable.
1578     fn get_event(
1579         self,
1580         store: &mut StoreOpaque,
1581         guest_task: TableId<GuestTask>,
1582         set: Option<TableId<WaitableSet>>,
1583         cancellable: bool,
1584     ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1585         let state = store.concurrent_state_mut();
1586 
1587         if let Some(event) = state.get_mut(guest_task)?.event.take() {
1588             log::trace!("deliver event {event:?} to {guest_task:?}");
1589 
1590             if cancellable || !matches!(event, Event::Cancelled) {
1591                 return Ok(Some((event, None)));
1592             } else {
1593                 state.get_mut(guest_task)?.event = Some(event);
1594             }
1595         }
1596 
1597         Ok(
1598             if let Some((set, waitable)) = set
1599                 .and_then(|set| {
1600                     state
1601                         .get_mut(set)
1602                         .map(|v| v.ready.pop_first().map(|v| (set, v)))
1603                         .transpose()
1604                 })
1605                 .transpose()?
1606             {
1607                 let common = waitable.common(state)?;
1608                 let handle = common.handle.unwrap();
1609                 let event = common.event.take().unwrap();
1610 
1611                 log::trace!(
1612                     "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1613                 );
1614 
1615                 waitable.on_delivery(store, self, event);
1616 
1617                 Some((event, Some((waitable, handle))))
1618             } else {
1619                 None
1620             },
1621         )
1622     }
1623 
1624     /// Handle the `CallbackCode` returned from an async-lifted export or its
1625     /// callback.
1626     ///
1627     /// If this returns `Ok(Some(call))`, then `call` should be run immediately
1628     /// using `handle_guest_call`.
1629     fn handle_callback_code(
1630         self,
1631         store: &mut StoreOpaque,
1632         guest_thread: QualifiedThreadId,
1633         runtime_instance: RuntimeComponentInstanceIndex,
1634         code: u32,
1635     ) -> Result<Option<GuestCall>> {
1636         let (code, set) = unpack_callback_code(code);
1637 
1638         log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1639 
1640         let state = store.concurrent_state_mut();
1641 
1642         let get_set = |store, handle| {
1643             if handle == 0 {
1644                 bail!("invalid waitable-set handle");
1645             }
1646 
1647             let set = self.id().get_mut(store).guest_tables().0[runtime_instance]
1648                 .waitable_set_rep(handle)?;
1649 
1650             Ok(TableId::<WaitableSet>::new(set))
1651         };
1652 
1653         Ok(match code {
1654             callback_code::EXIT => {
1655                 log::trace!("implicit thread {guest_thread:?} completed");
1656                 self.cleanup_thread(store, guest_thread, runtime_instance)?;
1657                 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1658                 if task.threads.is_empty() && !task.returned_or_cancelled() {
1659                     bail!(Trap::NoAsyncResult);
1660                 }
1661                 match &task.caller {
1662                     Caller::Host { .. } => {
1663                         if task.ready_to_delete() {
1664                             Waitable::Guest(guest_thread.task)
1665                                 .delete_from(store.concurrent_state_mut())?;
1666                         }
1667                     }
1668                     Caller::Guest { .. } => {
1669                         task.exited = true;
1670                         task.callback = None;
1671                     }
1672                 }
1673                 None
1674             }
1675             callback_code::YIELD => {
1676                 let task = state.get_mut(guest_thread.task)?;
1677                 assert!(task.event.is_none());
1678                 task.event = Some(Event::None);
1679                 let call = GuestCall {
1680                     thread: guest_thread,
1681                     kind: GuestCallKind::DeliverEvent {
1682                         instance: self,
1683                         set: None,
1684                     },
1685                 };
1686                 if state.may_block(guest_thread.task) {
1687                     // Push this thread onto the "low priority" queue so it runs
1688                     // after any other threads have had a chance to run.
1689                     state.push_low_priority(WorkItem::GuestCall(call));
1690                     None
1691                 } else {
1692                     // Yielding in a non-blocking context is defined as a no-op
1693                     // according to the spec, so we must run this thread
1694                     // immediately without allowing any others to run.
1695                     Some(call)
1696                 }
1697             }
1698             callback_code::WAIT | callback_code::POLL => {
1699                 // The task may only return `WAIT` or `POLL` if it was created
1700                 // for a call to an async export).  Otherwise, we'll trap.
1701                 state.check_blocking_for(guest_thread.task)?;
1702 
1703                 let set = get_set(store, set)?;
1704                 let state = store.concurrent_state_mut();
1705 
1706                 if state.get_mut(guest_thread.task)?.event.is_some()
1707                     || !state.get_mut(set)?.ready.is_empty()
1708                 {
1709                     // An event is immediately available; deliver it ASAP.
1710                     state.push_high_priority(WorkItem::GuestCall(GuestCall {
1711                         thread: guest_thread,
1712                         kind: GuestCallKind::DeliverEvent {
1713                             instance: self,
1714                             set: Some(set),
1715                         },
1716                     }));
1717                 } else {
1718                     // No event is immediately available.
1719                     match code {
1720                         callback_code::POLL => {
1721                             // We're polling, so just yield and check whether an
1722                             // event has arrived after that.
1723                             state.push_low_priority(WorkItem::Poll(PollParams {
1724                                 instance: self,
1725                                 thread: guest_thread,
1726                                 set,
1727                             }));
1728                         }
1729                         callback_code::WAIT => {
1730                             // We're waiting, so register to be woken up when an
1731                             // event is published for this waitable set.
1732                             //
1733                             // Here we also set `GuestTask::wake_on_cancel`
1734                             // which allows `subtask.cancel` to interrupt the
1735                             // wait.
1736                             let old = state
1737                                 .get_mut(guest_thread.thread)?
1738                                 .wake_on_cancel
1739                                 .replace(set);
1740                             assert!(old.is_none());
1741                             let old = state
1742                                 .get_mut(set)?
1743                                 .waiting
1744                                 .insert(guest_thread, WaitMode::Callback(self));
1745                             assert!(old.is_none());
1746                         }
1747                         _ => unreachable!(),
1748                     }
1749                 }
1750                 None
1751             }
1752             _ => bail!("unsupported callback code: {code}"),
1753         })
1754     }
1755 
1756     fn cleanup_thread(
1757         self,
1758         store: &mut StoreOpaque,
1759         guest_thread: QualifiedThreadId,
1760         runtime_instance: RuntimeComponentInstanceIndex,
1761     ) -> Result<()> {
1762         let guest_id = store
1763             .concurrent_state_mut()
1764             .get_mut(guest_thread.thread)?
1765             .instance_rep;
1766         self.id().get_mut(store).guest_tables().0[runtime_instance]
1767             .guest_thread_remove(guest_id.unwrap())?;
1768 
1769         store.concurrent_state_mut().delete(guest_thread.thread)?;
1770         let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1771         task.threads.remove(&guest_thread.thread);
1772         Ok(())
1773     }
1774 
1775     /// Add the specified guest call to the "high priority" work item queue, to
1776     /// be started as soon as backpressure and/or reentrance rules allow.
1777     ///
1778     /// SAFETY: The raw pointer arguments must be valid references to guest
1779     /// functions (with the appropriate signatures) when the closures queued by
1780     /// this function are called.
1781     unsafe fn queue_call<T: 'static>(
1782         self,
1783         mut store: StoreContextMut<T>,
1784         guest_thread: QualifiedThreadId,
1785         callee: SendSyncPtr<VMFuncRef>,
1786         param_count: usize,
1787         result_count: usize,
1788         flags: Option<InstanceFlags>,
1789         async_: bool,
1790         callback: Option<SendSyncPtr<VMFuncRef>>,
1791         post_return: Option<SendSyncPtr<VMFuncRef>>,
1792     ) -> Result<()> {
1793         /// Return a closure which will call the specified function in the scope
1794         /// of the specified task.
1795         ///
1796         /// This will use `GuestTask::lower_params` to lower the parameters, but
1797         /// will not lift the result; instead, it returns a
1798         /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
1799         /// any, may be lifted.  Note that an async-lifted export will have
1800         /// returned its result using the `task.return` intrinsic (or not
1801         /// returned a result at all, in the case of `task.cancel`), in which
1802         /// case the "result" of this call will either be a callback code or
1803         /// nothing.
1804         ///
1805         /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
1806         /// the returned closure is called.
1807         unsafe fn make_call<T: 'static>(
1808             store: StoreContextMut<T>,
1809             guest_thread: QualifiedThreadId,
1810             callee: SendSyncPtr<VMFuncRef>,
1811             param_count: usize,
1812             result_count: usize,
1813             flags: Option<InstanceFlags>,
1814         ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1815         + Send
1816         + Sync
1817         + 'static
1818         + use<T> {
1819             let token = StoreToken::new(store);
1820             move |store: &mut dyn VMStore| {
1821                 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1822 
1823                 store
1824                     .concurrent_state_mut()
1825                     .get_mut(guest_thread.thread)?
1826                     .state = GuestThreadState::Running;
1827                 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1828                 let may_enter_after_call = task.call_post_return_automatically();
1829                 let lower = task.lower_params.take().unwrap();
1830 
1831                 lower(store, &mut storage[..param_count])?;
1832 
1833                 let mut store = token.as_context_mut(store);
1834 
1835                 // SAFETY: Per the contract documented in `make_call's`
1836                 // documentation, `callee` must be a valid pointer.
1837                 unsafe {
1838                     if let Some(mut flags) = flags {
1839                         flags.set_may_enter(false);
1840                     }
1841                     crate::Func::call_unchecked_raw(
1842                         &mut store,
1843                         callee.as_non_null(),
1844                         NonNull::new(
1845                             &mut storage[..param_count.max(result_count)]
1846                                 as *mut [MaybeUninit<ValRaw>] as _,
1847                         )
1848                         .unwrap(),
1849                     )?;
1850                     if let Some(mut flags) = flags {
1851                         flags.set_may_enter(may_enter_after_call);
1852                     }
1853                 }
1854 
1855                 Ok(storage)
1856             }
1857         }
1858 
1859         // SAFETY: Per the contract described in this function documentation,
1860         // the `callee` pointer which `call` closes over must be valid when
1861         // called by the closure we queue below.
1862         let call = unsafe {
1863             make_call(
1864                 store.as_context_mut(),
1865                 guest_thread,
1866                 callee,
1867                 param_count,
1868                 result_count,
1869                 flags,
1870             )
1871         };
1872 
1873         let callee_instance = store
1874             .0
1875             .concurrent_state_mut()
1876             .get_mut(guest_thread.task)?
1877             .instance;
1878         let fun = if callback.is_some() {
1879             assert!(async_);
1880 
1881             Box::new(move |store: &mut dyn VMStore| {
1882                 self.add_guest_thread_to_instance_table(
1883                     guest_thread.thread,
1884                     store,
1885                     callee_instance,
1886                 )?;
1887                 let old_thread = store
1888                     .concurrent_state_mut()
1889                     .guest_thread
1890                     .replace(guest_thread);
1891                 log::trace!(
1892                     "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
1893                 );
1894 
1895                 store.maybe_push_call_context(guest_thread.task)?;
1896 
1897                 store.concurrent_state_mut().enter_instance(callee_instance);
1898 
1899                 // SAFETY: See the documentation for `make_call` to review the
1900                 // contract we must uphold for `call` here.
1901                 //
1902                 // Per the contract described in the `queue_call`
1903                 // documentation, the `callee` pointer which `call` closes
1904                 // over must be valid.
1905                 let storage = call(store)?;
1906 
1907                 store
1908                     .concurrent_state_mut()
1909                     .exit_instance(callee_instance)?;
1910 
1911                 store.maybe_pop_call_context(guest_thread.task)?;
1912 
1913                 let state = store.concurrent_state_mut();
1914                 state.guest_thread = old_thread;
1915                 old_thread
1916                     .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
1917                 log::trace!("stackless call: restored {old_thread:?} as current thread");
1918 
1919                 // SAFETY: `wasmparser` will have validated that the callback
1920                 // function returns a `i32` result.
1921                 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
1922 
1923                 self.handle_callback_code(store, guest_thread, callee_instance, code)
1924             })
1925                 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
1926         } else {
1927             let token = StoreToken::new(store.as_context_mut());
1928             Box::new(move |store: &mut dyn VMStore| {
1929                 self.add_guest_thread_to_instance_table(
1930                     guest_thread.thread,
1931                     store,
1932                     callee_instance,
1933                 )?;
1934                 let old_thread = store
1935                     .concurrent_state_mut()
1936                     .guest_thread
1937                     .replace(guest_thread);
1938                 log::trace!(
1939                     "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
1940                 );
1941                 let mut flags = self.id().get(store).instance_flags(callee_instance);
1942 
1943                 store.maybe_push_call_context(guest_thread.task)?;
1944 
1945                 // Unless this is a callback-less (i.e. stackful)
1946                 // async-lifted export, we need to record that the instance
1947                 // cannot be entered until the call returns.
1948                 if !async_ {
1949                     store.concurrent_state_mut().enter_instance(callee_instance);
1950                 }
1951 
1952                 // SAFETY: See the documentation for `make_call` to review the
1953                 // contract we must uphold for `call` here.
1954                 //
1955                 // Per the contract described in the `queue_call`
1956                 // documentation, the `callee` pointer which `call` closes
1957                 // over must be valid.
1958                 let storage = call(store)?;
1959 
1960                 // This is a callback-less call, so the implicit thread has now completed
1961                 self.cleanup_thread(store, guest_thread, callee_instance)?;
1962 
1963                 if async_ {
1964                     let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1965                     if task.threads.is_empty() && !task.returned_or_cancelled() {
1966                         bail!(Trap::NoAsyncResult);
1967                     }
1968                 } else {
1969                     // This is a sync-lifted export, so now is when we lift the
1970                     // result, optionally call the post-return function, if any,
1971                     // and finally notify any current or future waiters that the
1972                     // subtask has returned.
1973 
1974                     let lift = {
1975                         let state = store.concurrent_state_mut();
1976                         state.exit_instance(callee_instance)?;
1977 
1978                         assert!(state.get_mut(guest_thread.task)?.result.is_none());
1979 
1980                         state
1981                             .get_mut(guest_thread.task)?
1982                             .lift_result
1983                             .take()
1984                             .unwrap()
1985                     };
1986 
1987                     // SAFETY: `result_count` represents the number of core Wasm
1988                     // results returned, per `wasmparser`.
1989                     let result = (lift.lift)(store, unsafe {
1990                         mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
1991                             &storage[..result_count],
1992                         )
1993                     })?;
1994 
1995                     let post_return_arg = match result_count {
1996                         0 => ValRaw::i32(0),
1997                         // SAFETY: `result_count` represents the number of
1998                         // core Wasm results returned, per `wasmparser`.
1999                         1 => unsafe { storage[0].assume_init() },
2000                         _ => unreachable!(),
2001                     };
2002 
2003                     if store
2004                         .concurrent_state_mut()
2005                         .get_mut(guest_thread.task)?
2006                         .call_post_return_automatically()
2007                     {
2008                         unsafe {
2009                             flags.set_may_leave(false);
2010                             flags.set_needs_post_return(false);
2011                         }
2012 
2013                         if let Some(func) = post_return {
2014                             let mut store = token.as_context_mut(store);
2015 
2016                             // SAFETY: `func` is a valid `*mut VMFuncRef` from
2017                             // either `wasmtime-cranelift`-generated fused adapter
2018                             // code or `component::Options`.  Per `wasmparser`
2019                             // post-return signature validation, we know it takes a
2020                             // single parameter.
2021                             unsafe {
2022                                 crate::Func::call_unchecked_raw(
2023                                     &mut store,
2024                                     func.as_non_null(),
2025                                     slice::from_ref(&post_return_arg).into(),
2026                                 )?;
2027                             }
2028                         }
2029 
2030                         unsafe {
2031                             flags.set_may_leave(true);
2032                             flags.set_may_enter(true);
2033                         }
2034                     }
2035 
2036                     self.task_complete(
2037                         store,
2038                         guest_thread.task,
2039                         result,
2040                         Status::Returned,
2041                         post_return_arg,
2042                     )?;
2043                 }
2044 
2045                 store.maybe_pop_call_context(guest_thread.task)?;
2046 
2047                 let state = store.concurrent_state_mut();
2048                 let task = state.get_mut(guest_thread.task)?;
2049 
2050                 match &task.caller {
2051                     Caller::Host { .. } => {
2052                         if task.ready_to_delete() {
2053                             Waitable::Guest(guest_thread.task).delete_from(state)?;
2054                         }
2055                     }
2056                     Caller::Guest { .. } => {
2057                         task.exited = true;
2058                     }
2059                 }
2060 
2061                 Ok(None)
2062             })
2063         };
2064 
2065         store
2066             .0
2067             .concurrent_state_mut()
2068             .push_high_priority(WorkItem::GuestCall(GuestCall {
2069                 thread: guest_thread,
2070                 kind: GuestCallKind::StartImplicit(fun),
2071             }));
2072 
2073         Ok(())
2074     }
2075 
2076     /// Prepare (but do not start) a guest->guest call.
2077     ///
2078     /// This is called from fused adapter code generated in
2079     /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2080     /// are synthesized Wasm functions which move the parameters from the caller
2081     /// to the callee and the result from the callee to the caller,
2082     /// respectively.  The adapter will call `Self::start_call` immediately
2083     /// after calling this function.
2084     ///
2085     /// SAFETY: All the pointer arguments must be valid pointers to guest
2086     /// entities (and with the expected signatures for the function references
2087     /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2088     unsafe fn prepare_call<T: 'static>(
2089         self,
2090         mut store: StoreContextMut<T>,
2091         start: *mut VMFuncRef,
2092         return_: *mut VMFuncRef,
2093         caller_instance: RuntimeComponentInstanceIndex,
2094         callee_instance: RuntimeComponentInstanceIndex,
2095         task_return_type: TypeTupleIndex,
2096         callee_async: bool,
2097         memory: *mut VMMemoryDefinition,
2098         string_encoding: u8,
2099         caller_info: CallerInfo,
2100     ) -> Result<()> {
2101         self.id().get(store.0).check_may_leave(caller_instance)?;
2102 
2103         if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2104             // A task may only call an async-typed function via a sync lower if
2105             // it was created by a call to an async export.  Otherwise, we'll
2106             // trap.
2107             store.0.concurrent_state_mut().check_blocking()?;
2108         }
2109 
2110         enum ResultInfo {
2111             Heap { results: u32 },
2112             Stack { result_count: u32 },
2113         }
2114 
2115         let result_info = match &caller_info {
2116             CallerInfo::Async {
2117                 has_result: true,
2118                 params,
2119             } => ResultInfo::Heap {
2120                 results: params.last().unwrap().get_u32(),
2121             },
2122             CallerInfo::Async {
2123                 has_result: false, ..
2124             } => ResultInfo::Stack { result_count: 0 },
2125             CallerInfo::Sync {
2126                 result_count,
2127                 params,
2128             } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2129                 results: params.last().unwrap().get_u32(),
2130             },
2131             CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2132                 result_count: *result_count,
2133             },
2134         };
2135 
2136         let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2137 
2138         // Create a new guest task for the call, closing over the `start` and
2139         // `return_` functions to lift the parameters and lower the result,
2140         // respectively.
2141         let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2142         let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2143         let token = StoreToken::new(store.as_context_mut());
2144         let state = store.0.concurrent_state_mut();
2145         let old_thread = state.guest_thread.take();
2146         let new_task = GuestTask::new(
2147             state,
2148             Box::new(move |store, dst| {
2149                 let mut store = token.as_context_mut(store);
2150                 assert!(dst.len() <= MAX_FLAT_PARAMS);
2151                 // The `+ 1` here accounts for the return pointer, if any:
2152                 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2153                 let count = match caller_info {
2154                     // Async callers, if they have a result, use the last
2155                     // parameter as a return pointer so chop that off if
2156                     // relevant here.
2157                     CallerInfo::Async { params, has_result } => {
2158                         let params = &params[..params.len() - usize::from(has_result)];
2159                         for (param, src) in params.iter().zip(&mut src) {
2160                             src.write(*param);
2161                         }
2162                         params.len()
2163                     }
2164 
2165                     // Sync callers forward everything directly.
2166                     CallerInfo::Sync { params, .. } => {
2167                         for (param, src) in params.iter().zip(&mut src) {
2168                             src.write(*param);
2169                         }
2170                         params.len()
2171                     }
2172                 };
2173                 // SAFETY: `start` is a valid `*mut VMFuncRef` from
2174                 // `wasmtime-cranelift`-generated fused adapter code.  Based on
2175                 // how it was constructed (see
2176                 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2177                 // for details) we know it takes count parameters and returns
2178                 // `dst.len()` results.
2179                 unsafe {
2180                     crate::Func::call_unchecked_raw(
2181                         &mut store,
2182                         start.as_non_null(),
2183                         NonNull::new(
2184                             &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2185                         )
2186                         .unwrap(),
2187                     )?;
2188                 }
2189                 dst.copy_from_slice(&src[..dst.len()]);
2190                 let state = store.0.concurrent_state_mut();
2191                 Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2192                     state,
2193                     Some(Event::Subtask {
2194                         status: Status::Started,
2195                     }),
2196                 )?;
2197                 Ok(())
2198             }),
2199             LiftResult {
2200                 lift: Box::new(move |store, src| {
2201                     // SAFETY: See comment in closure passed as `lower_params`
2202                     // parameter above.
2203                     let mut store = token.as_context_mut(store);
2204                     let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2205                     if let ResultInfo::Heap { results } = &result_info {
2206                         my_src.push(ValRaw::u32(*results));
2207                     }
2208                     // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2209                     // `wasmtime-cranelift`-generated fused adapter code.  Based
2210                     // on how it was constructed (see
2211                     // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2212                     // for details) we know it takes `src.len()` parameters and
2213                     // returns up to 1 result.
2214                     unsafe {
2215                         crate::Func::call_unchecked_raw(
2216                             &mut store,
2217                             return_.as_non_null(),
2218                             my_src.as_mut_slice().into(),
2219                         )?;
2220                     }
2221                     let state = store.0.concurrent_state_mut();
2222                     let thread = state.guest_thread.unwrap();
2223                     if sync_caller {
2224                         state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2225                             if let ResultInfo::Stack { result_count } = &result_info {
2226                                 match result_count {
2227                                     0 => None,
2228                                     1 => Some(my_src[0]),
2229                                     _ => unreachable!(),
2230                                 }
2231                             } else {
2232                                 None
2233                             },
2234                         );
2235                     }
2236                     Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2237                 }),
2238                 ty: task_return_type,
2239                 memory: NonNull::new(memory).map(SendSyncPtr::new),
2240                 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2241             },
2242             Caller::Guest {
2243                 thread: old_thread.unwrap(),
2244                 instance: caller_instance,
2245             },
2246             None,
2247             callee_instance,
2248             callee_async,
2249         )?;
2250 
2251         let guest_task = state.push(new_task)?;
2252         let new_thread = GuestThread::new_implicit(guest_task);
2253         let guest_thread = state.push(new_thread)?;
2254         state.get_mut(guest_task)?.threads.insert(guest_thread);
2255 
2256         let state = store.0.concurrent_state_mut();
2257         if let Some(old_thread) = old_thread {
2258             if !state.may_enter(guest_task) {
2259                 bail!(crate::Trap::CannotEnterComponent);
2260             }
2261 
2262             state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
2263         };
2264 
2265         // Make the new thread the current one so that `Self::start_call` knows
2266         // which one to start.
2267         state.guest_thread = Some(QualifiedThreadId {
2268             task: guest_task,
2269             thread: guest_thread,
2270         });
2271         log::trace!(
2272             "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2273         );
2274 
2275         Ok(())
2276     }
2277 
2278     /// Call the specified callback function for an async-lifted export.
2279     ///
2280     /// SAFETY: `function` must be a valid reference to a guest function of the
2281     /// correct signature for a callback.
2282     unsafe fn call_callback<T>(
2283         self,
2284         mut store: StoreContextMut<T>,
2285         callee_instance: RuntimeComponentInstanceIndex,
2286         function: SendSyncPtr<VMFuncRef>,
2287         event: Event,
2288         handle: u32,
2289         may_enter_after_call: bool,
2290     ) -> Result<u32> {
2291         let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2292 
2293         let (ordinal, result) = event.parts();
2294         let params = &mut [
2295             ValRaw::u32(ordinal),
2296             ValRaw::u32(handle),
2297             ValRaw::u32(result),
2298         ];
2299         // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2300         // `wasmtime-cranelift`-generated fused adapter code or
2301         // `component::Options`.  Per `wasmparser` callback signature
2302         // validation, we know it takes three parameters and returns one.
2303         unsafe {
2304             flags.set_may_enter(false);
2305             crate::Func::call_unchecked_raw(
2306                 &mut store,
2307                 function.as_non_null(),
2308                 params.as_mut_slice().into(),
2309             )?;
2310             flags.set_may_enter(may_enter_after_call);
2311         }
2312         Ok(params[0].get_u32())
2313     }
2314 
2315     /// Start a guest->guest call previously prepared using
2316     /// `Self::prepare_call`.
2317     ///
2318     /// This is called from fused adapter code generated in
2319     /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2320     /// this function immediately after calling `Self::prepare_call`.
2321     ///
2322     /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2323     /// functions with the appropriate signatures for the current guest task.
2324     /// If this is a call to an async-lowered import, the actual call may be
2325     /// deferred and run after this function returns, in which case the pointer
2326     /// arguments must also be valid when the call happens.
2327     unsafe fn start_call<T: 'static>(
2328         self,
2329         mut store: StoreContextMut<T>,
2330         callback: *mut VMFuncRef,
2331         post_return: *mut VMFuncRef,
2332         callee: *mut VMFuncRef,
2333         param_count: u32,
2334         result_count: u32,
2335         flags: u32,
2336         storage: Option<&mut [MaybeUninit<ValRaw>]>,
2337     ) -> Result<u32> {
2338         let token = StoreToken::new(store.as_context_mut());
2339         let async_caller = storage.is_none();
2340         let state = store.0.concurrent_state_mut();
2341         let guest_thread = state.guest_thread.unwrap();
2342         let callee_async = state.get_mut(guest_thread.task)?.async_function;
2343         let may_enter_after_call = state
2344             .get_mut(guest_thread.task)?
2345             .call_post_return_automatically();
2346         let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2347         let param_count = usize::try_from(param_count).unwrap();
2348         assert!(param_count <= MAX_FLAT_PARAMS);
2349         let result_count = usize::try_from(result_count).unwrap();
2350         assert!(result_count <= MAX_FLAT_RESULTS);
2351 
2352         let task = state.get_mut(guest_thread.task)?;
2353         if !callback.is_null() {
2354             // We're calling an async-lifted export with a callback, so store
2355             // the callback and related context as part of the task so we can
2356             // call it later when needed.
2357             let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2358             task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
2359                 let store = token.as_context_mut(store);
2360                 unsafe {
2361                     self.call_callback::<T>(
2362                         store,
2363                         runtime_instance,
2364                         callback,
2365                         event,
2366                         handle,
2367                         may_enter_after_call,
2368                     )
2369                 }
2370             }));
2371         }
2372 
2373         let Caller::Guest {
2374             thread: caller,
2375             instance: runtime_instance,
2376         } = &task.caller
2377         else {
2378             // As of this writing, `start_call` is only used for guest->guest
2379             // calls.
2380             unreachable!()
2381         };
2382         let caller = *caller;
2383         let caller_instance = *runtime_instance;
2384 
2385         let callee_instance = task.instance;
2386 
2387         let instance_flags = if callback.is_null() {
2388             None
2389         } else {
2390             Some(self.id().get(store.0).instance_flags(callee_instance))
2391         };
2392 
2393         // Queue the call as a "high priority" work item.
2394         unsafe {
2395             self.queue_call(
2396                 store.as_context_mut(),
2397                 guest_thread,
2398                 callee,
2399                 param_count,
2400                 result_count,
2401                 instance_flags,
2402                 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2403                 NonNull::new(callback).map(SendSyncPtr::new),
2404                 NonNull::new(post_return).map(SendSyncPtr::new),
2405             )?;
2406         }
2407 
2408         let state = store.0.concurrent_state_mut();
2409 
2410         // Use the caller's `GuestTask::sync_call_set` to register interest in
2411         // the subtask...
2412         let guest_waitable = Waitable::Guest(guest_thread.task);
2413         let old_set = guest_waitable.common(state)?.set;
2414         let set = state.get_mut(caller.task)?.sync_call_set;
2415         guest_waitable.join(state, Some(set))?;
2416 
2417         // ... and suspend this fiber temporarily while we wait for it to start.
2418         //
2419         // Note that we _could_ call the callee directly using the current fiber
2420         // rather than suspend this one, but that would make reasoning about the
2421         // event loop more complicated and is probably only worth doing if
2422         // there's a measurable performance benefit.  In addition, it would mean
2423         // blocking the caller if the callee calls a blocking sync-lowered
2424         // import, and as of this writing the spec says we must not do that.
2425         //
2426         // Alternatively, the fused adapter code could be modified to call the
2427         // callee directly without calling a host-provided intrinsic at all (in
2428         // which case it would need to do its own, inline backpressure checks,
2429         // etc.).  Again, we'd want to see a measurable performance benefit
2430         // before committing to such an optimization.  And again, we'd need to
2431         // update the spec to allow that.
2432         let (status, waitable) = loop {
2433             store.0.suspend(SuspendReason::Waiting {
2434                 set,
2435                 thread: caller,
2436                 // Normally, `StoreOpaque::suspend` would assert it's being
2437                 // called from a context where blocking is allowed.  However, if
2438                 // `async_caller` is `true`, we'll only "block" long enough for
2439                 // the callee to start, i.e. we won't repeat this loop, so we
2440                 // tell `suspend` it's okay even if we're not allowed to block.
2441                 // Alternatively, if the callee is not an async function, then
2442                 // we know it won't block anyway.
2443                 skip_may_block_check: async_caller || !callee_async,
2444             })?;
2445 
2446             let state = store.0.concurrent_state_mut();
2447 
2448             log::trace!("taking event for {:?}", guest_thread.task);
2449             let event = guest_waitable.take_event(state)?;
2450             let Some(Event::Subtask { status }) = event else {
2451                 unreachable!();
2452             };
2453 
2454             log::trace!("status {status:?} for {:?}", guest_thread.task);
2455 
2456             if status == Status::Returned {
2457                 // It returned, so we can stop waiting.
2458                 break (status, None);
2459             } else if async_caller {
2460                 // It hasn't returned yet, but the caller is calling via an
2461                 // async-lowered import, so we generate a handle for the task
2462                 // waitable and return the status.
2463                 let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2464                     .subtask_insert_guest(guest_thread.task.rep())?;
2465                 store
2466                     .0
2467                     .concurrent_state_mut()
2468                     .get_mut(guest_thread.task)?
2469                     .common
2470                     .handle = Some(handle);
2471                 break (status, Some(handle));
2472             } else {
2473                 // The callee hasn't returned yet, and the caller is calling via
2474                 // a sync-lowered import, so we loop and keep waiting until the
2475                 // callee returns.
2476             }
2477         };
2478 
2479         let state = store.0.concurrent_state_mut();
2480 
2481         guest_waitable.join(state, old_set)?;
2482 
2483         if let Some(storage) = storage {
2484             // The caller used a sync-lowered import to call an async-lifted
2485             // export, in which case the result, if any, has been stashed in
2486             // `GuestTask::sync_result`.
2487             let task = state.get_mut(guest_thread.task)?;
2488             if let Some(result) = task.sync_result.take() {
2489                 if let Some(result) = result {
2490                     storage[0] = MaybeUninit::new(result);
2491                 }
2492 
2493                 if task.exited {
2494                     if task.ready_to_delete() {
2495                         Waitable::Guest(guest_thread.task).delete_from(state)?;
2496                     }
2497                 }
2498             }
2499         }
2500 
2501         // Reset the current thread to point to the caller as it resumes control.
2502         state.guest_thread = Some(caller);
2503         state.get_mut(caller.thread)?.state = GuestThreadState::Running;
2504         log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2505 
2506         Ok(status.pack(waitable))
2507     }
2508 
2509     /// Poll the specified future once on behalf of a guest->host call using an
2510     /// async-lowered import.
2511     ///
2512     /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2513     /// `Pending`, add it to the set of futures to be polled as part of this
2514     /// instance's event loop until it completes, and then return
2515     /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2516     ///
2517     /// Whether the future returns `Ready` immediately or later, the `lower`
2518     /// function will be used to lower the result, if any, into the guest caller's
2519     /// stack and linear memory unless the task has been cancelled.
2520     pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2521         self,
2522         mut store: StoreContextMut<T>,
2523         future: impl Future<Output = Result<R>> + Send + 'static,
2524         caller_instance: RuntimeComponentInstanceIndex,
2525         lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2526     ) -> Result<Option<u32>> {
2527         let token = StoreToken::new(store.as_context_mut());
2528         let state = store.0.concurrent_state_mut();
2529         let caller = state.guest_thread.unwrap();
2530 
2531         // Create an abortable future which hooks calls to poll and manages call
2532         // context state for the future.
2533         let (join_handle, future) = JoinHandle::run(async move {
2534             let mut future = pin!(future);
2535             let mut call_context = None;
2536             future::poll_fn(move |cx| {
2537                 // Push the call context for managing any resource borrows
2538                 // for the task.
2539                 tls::get(|store| {
2540                     if let Some(call_context) = call_context.take() {
2541                         token
2542                             .as_context_mut(store)
2543                             .0
2544                             .component_resource_state()
2545                             .0
2546                             .push(call_context);
2547                     }
2548                 });
2549 
2550                 let result = future.as_mut().poll(cx);
2551 
2552                 if result.is_pending() {
2553                     // Pop the call context for managing any resource
2554                     // borrows for the task.
2555                     tls::get(|store| {
2556                         call_context = Some(
2557                             token
2558                                 .as_context_mut(store)
2559                                 .0
2560                                 .component_resource_state()
2561                                 .0
2562                                 .pop()
2563                                 .unwrap(),
2564                         );
2565                     });
2566                 }
2567                 result
2568             })
2569             .await
2570         });
2571 
2572         // We create a new host task even though it might complete immediately
2573         // (in which case we won't need to pass a waitable back to the guest).
2574         // If it does complete immediately, we'll remove it before we return.
2575         let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
2576 
2577         log::trace!("new host task child of {caller:?}: {task:?}");
2578 
2579         let mut future = Box::pin(future);
2580 
2581         // Finally, poll the future.  We can use a dummy `Waker` here because
2582         // we'll add the future to `ConcurrentState::futures` and poll it
2583         // automatically from the event loop if it doesn't complete immediately
2584         // here.
2585         let poll = tls::set(store.0, || {
2586             future
2587                 .as_mut()
2588                 .poll(&mut Context::from_waker(&Waker::noop()))
2589         });
2590 
2591         Ok(match poll {
2592             Poll::Ready(None) => unreachable!(),
2593             Poll::Ready(Some(result)) => {
2594                 // It finished immediately; lower the result and delete the
2595                 // task.
2596                 lower(store.as_context_mut(), result?)?;
2597                 log::trace!("delete host task {task:?} (already ready)");
2598                 store.0.concurrent_state_mut().delete(task)?;
2599                 None
2600             }
2601             Poll::Pending => {
2602                 // It hasn't finished yet; add the future to
2603                 // `ConcurrentState::futures` so it will be polled by the event
2604                 // loop and allocate a waitable handle to return to the guest.
2605 
2606                 // Wrap the future in a closure responsible for lowering the result into
2607                 // the guest's stack and memory, as well as notifying any waiters that
2608                 // the task returned.
2609                 let future =
2610                     Box::pin(async move {
2611                         let result = match future.await {
2612                             Some(result) => result?,
2613                             // Task was cancelled; nothing left to do.
2614                             None => return Ok(()),
2615                         };
2616                         tls::get(move |store| {
2617                             // Here we schedule a task to run on a worker fiber to do
2618                             // the lowering since it may involve a call to the guest's
2619                             // realloc function.  This is necessary because calling the
2620                             // guest while there are host embedder frames on the stack
2621                             // is unsound.
2622                             store.concurrent_state_mut().push_high_priority(
2623                                 WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
2624                                     lower(token.as_context_mut(store), result)?;
2625                                     let state = store.concurrent_state_mut();
2626                                     state.get_mut(task)?.join_handle.take();
2627                                     Waitable::Host(task).set_event(
2628                                         state,
2629                                         Some(Event::Subtask {
2630                                             status: Status::Returned,
2631                                         }),
2632                                     )
2633                                 }))),
2634                             );
2635                             Ok(())
2636                         })
2637                     });
2638 
2639                 store.0.concurrent_state_mut().push_future(future);
2640                 let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance]
2641                     .subtask_insert_host(task.rep())?;
2642                 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2643                 log::trace!(
2644                     "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2645                 );
2646                 Some(handle)
2647             }
2648         })
2649     }
2650 
2651     /// Implements the `task.return` intrinsic, lifting the result for the
2652     /// current guest task.
2653     pub(crate) fn task_return(
2654         self,
2655         store: &mut dyn VMStore,
2656         caller: RuntimeComponentInstanceIndex,
2657         ty: TypeTupleIndex,
2658         options: OptionsIndex,
2659         storage: &[ValRaw],
2660     ) -> Result<()> {
2661         self.id().get(store).check_may_leave(caller)?;
2662         let state = store.concurrent_state_mut();
2663         let guest_thread = state.guest_thread.unwrap();
2664         let lift = state
2665             .get_mut(guest_thread.task)?
2666             .lift_result
2667             .take()
2668             .ok_or_else(|| {
2669                 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2670             })?;
2671         assert!(state.get_mut(guest_thread.task)?.result.is_none());
2672 
2673         let CanonicalOptions {
2674             string_encoding,
2675             data_model,
2676             ..
2677         } = &self.id().get(store).component().env_component().options[options];
2678 
2679         let invalid = ty != lift.ty
2680             || string_encoding != &lift.string_encoding
2681             || match data_model {
2682                 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2683                     Some(memory) => {
2684                         let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2685                         let actual = self.id().get(store).runtime_memory(memory);
2686                         expected != actual.as_ptr()
2687                     }
2688                     // Memory not specified, meaning it didn't need to be
2689                     // specified per validation, so not invalid.
2690                     None => false,
2691                 },
2692                 // Always invalid as this isn't supported.
2693                 CanonicalOptionsDataModel::Gc { .. } => true,
2694             };
2695 
2696         if invalid {
2697             bail!("invalid `task.return` signature and/or options for current task");
2698         }
2699 
2700         log::trace!("task.return for {guest_thread:?}");
2701 
2702         let result = (lift.lift)(store, storage)?;
2703         self.task_complete(
2704             store,
2705             guest_thread.task,
2706             result,
2707             Status::Returned,
2708             ValRaw::i32(0),
2709         )
2710     }
2711 
2712     /// Implements the `task.cancel` intrinsic.
2713     pub(crate) fn task_cancel(
2714         self,
2715         store: &mut StoreOpaque,
2716         caller: RuntimeComponentInstanceIndex,
2717     ) -> Result<()> {
2718         self.id().get(store).check_may_leave(caller)?;
2719         let state = store.concurrent_state_mut();
2720         let guest_thread = state.guest_thread.unwrap();
2721         let task = state.get_mut(guest_thread.task)?;
2722         if !task.cancel_sent {
2723             bail!("`task.cancel` called by task which has not been cancelled")
2724         }
2725         _ = task.lift_result.take().ok_or_else(|| {
2726             anyhow!("`task.return` or `task.cancel` called more than once for current task")
2727         })?;
2728 
2729         assert!(task.result.is_none());
2730 
2731         log::trace!("task.cancel for {guest_thread:?}");
2732 
2733         self.task_complete(
2734             store,
2735             guest_thread.task,
2736             Box::new(DummyResult),
2737             Status::ReturnCancelled,
2738             ValRaw::i32(0),
2739         )
2740     }
2741 
2742     /// Complete the specified guest task (i.e. indicate that it has either
2743     /// returned a (possibly empty) result or cancelled itself).
2744     ///
2745     /// This will return any resource borrows and notify any current or future
2746     /// waiters that the task has completed.
2747     fn task_complete(
2748         self,
2749         store: &mut StoreOpaque,
2750         guest_task: TableId<GuestTask>,
2751         result: Box<dyn Any + Send + Sync>,
2752         status: Status,
2753         post_return_arg: ValRaw,
2754     ) -> Result<()> {
2755         if store
2756             .concurrent_state_mut()
2757             .get_mut(guest_task)?
2758             .call_post_return_automatically()
2759         {
2760             let (calls, host_table, _, instance) =
2761                 store.component_resource_state_with_instance(self);
2762             ResourceTables {
2763                 calls,
2764                 host_table: Some(host_table),
2765                 guest: Some(instance.guest_tables()),
2766             }
2767             .exit_call()?;
2768         } else {
2769             // As of this writing, the only scenario where `call_post_return_automatically`
2770             // would be false for a `GuestTask` is for host-to-guest calls using
2771             // `[Typed]Func::call_async`, in which case the `function_index`
2772             // should be a non-`None` value.
2773             let function_index = store
2774                 .concurrent_state_mut()
2775                 .get_mut(guest_task)?
2776                 .function_index
2777                 .unwrap();
2778             self.id()
2779                 .get_mut(store)
2780                 .post_return_arg_set(function_index, post_return_arg);
2781         }
2782 
2783         let state = store.concurrent_state_mut();
2784         let task = state.get_mut(guest_task)?;
2785 
2786         if let Caller::Host { tx, .. } = &mut task.caller {
2787             if let Some(tx) = tx.take() {
2788                 _ = tx.send(result);
2789             }
2790         } else {
2791             task.result = Some(result);
2792             Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2793         }
2794 
2795         Ok(())
2796     }
2797 
2798     /// Implements the `waitable-set.new` intrinsic.
2799     pub(crate) fn waitable_set_new(
2800         self,
2801         store: &mut StoreOpaque,
2802         caller_instance: RuntimeComponentInstanceIndex,
2803     ) -> Result<u32> {
2804         self.id().get_mut(store).check_may_leave(caller_instance)?;
2805         let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2806         let handle = self.id().get_mut(store).guest_tables().0[caller_instance]
2807             .waitable_set_insert(set.rep())?;
2808         log::trace!("new waitable set {set:?} (handle {handle})");
2809         Ok(handle)
2810     }
2811 
2812     /// Implements the `waitable-set.drop` intrinsic.
2813     pub(crate) fn waitable_set_drop(
2814         self,
2815         store: &mut StoreOpaque,
2816         caller_instance: RuntimeComponentInstanceIndex,
2817         set: u32,
2818     ) -> Result<()> {
2819         self.id().get_mut(store).check_may_leave(caller_instance)?;
2820         let rep =
2821             self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_remove(set)?;
2822 
2823         log::trace!("drop waitable set {rep} (handle {set})");
2824 
2825         let set = store
2826             .concurrent_state_mut()
2827             .delete(TableId::<WaitableSet>::new(rep))?;
2828 
2829         if !set.waiting.is_empty() {
2830             bail!("cannot drop waitable set with waiters");
2831         }
2832 
2833         Ok(())
2834     }
2835 
2836     /// Implements the `waitable.join` intrinsic.
2837     pub(crate) fn waitable_join(
2838         self,
2839         store: &mut StoreOpaque,
2840         caller_instance: RuntimeComponentInstanceIndex,
2841         waitable_handle: u32,
2842         set_handle: u32,
2843     ) -> Result<()> {
2844         let mut instance = self.id().get_mut(store);
2845         instance.check_may_leave(caller_instance)?;
2846         let waitable =
2847             Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2848 
2849         let set = if set_handle == 0 {
2850             None
2851         } else {
2852             let set = instance.guest_tables().0[caller_instance].waitable_set_rep(set_handle)?;
2853 
2854             Some(TableId::<WaitableSet>::new(set))
2855         };
2856 
2857         log::trace!(
2858             "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2859         );
2860 
2861         waitable.join(store.concurrent_state_mut(), set)
2862     }
2863 
2864     /// Implements the `subtask.drop` intrinsic.
2865     pub(crate) fn subtask_drop(
2866         self,
2867         store: &mut StoreOpaque,
2868         caller_instance: RuntimeComponentInstanceIndex,
2869         task_id: u32,
2870     ) -> Result<()> {
2871         self.id().get_mut(store).check_may_leave(caller_instance)?;
2872         self.waitable_join(store, caller_instance, task_id, 0)?;
2873 
2874         let (rep, is_host) =
2875             self.id().get_mut(store).guest_tables().0[caller_instance].subtask_remove(task_id)?;
2876 
2877         let concurrent_state = store.concurrent_state_mut();
2878         let (waitable, expected_caller_instance, delete) = if is_host {
2879             let id = TableId::<HostTask>::new(rep);
2880             let task = concurrent_state.get_mut(id)?;
2881             if task.join_handle.is_some() {
2882                 bail!("cannot drop a subtask which has not yet resolved");
2883             }
2884             (Waitable::Host(id), task.caller_instance, true)
2885         } else {
2886             let id = TableId::<GuestTask>::new(rep);
2887             let task = concurrent_state.get_mut(id)?;
2888             if task.lift_result.is_some() {
2889                 bail!("cannot drop a subtask which has not yet resolved");
2890             }
2891             if let Caller::Guest { instance, .. } = &task.caller {
2892                 (Waitable::Guest(id), *instance, task.exited)
2893             } else {
2894                 unreachable!()
2895             }
2896         };
2897 
2898         waitable.common(concurrent_state)?.handle = None;
2899 
2900         if waitable.take_event(concurrent_state)?.is_some() {
2901             bail!("cannot drop a subtask with an undelivered event");
2902         }
2903 
2904         if delete {
2905             waitable.delete_from(concurrent_state)?;
2906         }
2907 
2908         // Since waitables can neither be passed between instances nor forged,
2909         // this should never fail unless there's a bug in Wasmtime, but we check
2910         // here to be sure:
2911         assert_eq!(expected_caller_instance, caller_instance);
2912         log::trace!("subtask_drop {waitable:?} (handle {task_id})");
2913         Ok(())
2914     }
2915 
2916     /// Implements the `waitable-set.wait` intrinsic.
2917     pub(crate) fn waitable_set_wait(
2918         self,
2919         store: &mut StoreOpaque,
2920         caller: RuntimeComponentInstanceIndex,
2921         options: OptionsIndex,
2922         set: u32,
2923         payload: u32,
2924     ) -> Result<u32> {
2925         self.id().get(store).check_may_leave(caller)?;
2926 
2927         if !self.options(store, options).async_ {
2928             // The caller may only call `waitable-set.wait` from an async task
2929             // (i.e. a task created via a call to an async export).
2930             // Otherwise, we'll trap.
2931             store.concurrent_state_mut().check_blocking()?;
2932         }
2933 
2934         let &CanonicalOptions {
2935             cancellable,
2936             instance: caller_instance,
2937             ..
2938         } = &self.id().get(store).component().env_component().options[options];
2939         let rep =
2940             self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2941 
2942         self.waitable_check(
2943             store,
2944             cancellable,
2945             WaitableCheck::Wait(WaitableCheckParams {
2946                 set: TableId::new(rep),
2947                 options,
2948                 payload,
2949             }),
2950         )
2951     }
2952 
2953     /// Implements the `waitable-set.poll` intrinsic.
2954     pub(crate) fn waitable_set_poll(
2955         self,
2956         store: &mut StoreOpaque,
2957         caller: RuntimeComponentInstanceIndex,
2958         options: OptionsIndex,
2959         set: u32,
2960         payload: u32,
2961     ) -> Result<u32> {
2962         self.id().get(store).check_may_leave(caller)?;
2963 
2964         if !self.options(store, options).async_ {
2965             // The caller may only call `waitable-set.poll` from an async task
2966             // (i.e. a task created via a call to an async export).
2967             // Otherwise, we'll trap.
2968             store.concurrent_state_mut().check_blocking()?;
2969         }
2970 
2971         let &CanonicalOptions {
2972             cancellable,
2973             instance: caller_instance,
2974             ..
2975         } = &self.id().get(store).component().env_component().options[options];
2976         let rep =
2977             self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?;
2978 
2979         self.waitable_check(
2980             store,
2981             cancellable,
2982             WaitableCheck::Poll(WaitableCheckParams {
2983                 set: TableId::new(rep),
2984                 options,
2985                 payload,
2986             }),
2987         )
2988     }
2989 
2990     /// Implements the `thread.index` intrinsic.
2991     pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
2992         let thread_id = store
2993             .concurrent_state_mut()
2994             .guest_thread
2995             .ok_or_else(|| anyhow!("no current thread"))?
2996             .thread;
2997         // The unwrap is safe because `instance_rep` must be `Some` by this point
2998         Ok(store
2999             .concurrent_state_mut()
3000             .get_mut(thread_id)?
3001             .instance_rep
3002             .unwrap())
3003     }
3004 
3005     /// Implements the `thread.new-indirect` intrinsic.
3006     pub(crate) fn thread_new_indirect<T: 'static>(
3007         self,
3008         mut store: StoreContextMut<T>,
3009         runtime_instance: RuntimeComponentInstanceIndex,
3010         _func_ty_idx: TypeFuncIndex, // currently unused
3011         start_func_table_idx: RuntimeTableIndex,
3012         start_func_idx: u32,
3013         context: i32,
3014     ) -> Result<u32> {
3015         self.id().get(store.0).check_may_leave(runtime_instance)?;
3016 
3017         log::trace!("creating new thread");
3018 
3019         let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3020         let (instance, registry) = self.id().get_mut_and_registry(store.0);
3021         let callee = instance
3022             .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3023             .ok_or_else(|| {
3024                 anyhow!("the start function index points to an uninitialized function")
3025             })?;
3026         if callee.type_index(store.0) != start_func_ty.type_index() {
3027             bail!(
3028                 "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3029             );
3030         }
3031 
3032         let token = StoreToken::new(store.as_context_mut());
3033         let start_func = Box::new(
3034             move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3035                 let old_thread = store
3036                     .concurrent_state_mut()
3037                     .guest_thread
3038                     .replace(guest_thread);
3039                 log::trace!(
3040                     "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3041                 );
3042 
3043                 store.maybe_push_call_context(guest_thread.task)?;
3044 
3045                 let mut store = token.as_context_mut(store);
3046                 let mut params = [ValRaw::i32(context)];
3047                 // Use call_unchecked rather than call or call_async, as we don't want to run the function
3048                 // on a separate fiber if we're running in an async store.
3049                 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3050 
3051                 store.0.maybe_pop_call_context(guest_thread.task)?;
3052 
3053                 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3054                 log::trace!("explicit thread {guest_thread:?} completed");
3055                 let state = store.0.concurrent_state_mut();
3056                 let task = state.get_mut(guest_thread.task)?;
3057                 if task.threads.is_empty() && !task.returned_or_cancelled() {
3058                     bail!(Trap::NoAsyncResult);
3059                 }
3060                 state.guest_thread = old_thread;
3061                 old_thread
3062                     .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3063                 if state.get_mut(guest_thread.task)?.ready_to_delete() {
3064                     Waitable::Guest(guest_thread.task).delete_from(state)?;
3065                 }
3066                 log::trace!("thread start: restored {old_thread:?} as current thread");
3067 
3068                 Ok(())
3069             },
3070         );
3071 
3072         let state = store.0.concurrent_state_mut();
3073         let current_thread = state.guest_thread.unwrap();
3074         let parent_task = current_thread.task;
3075 
3076         let new_thread = GuestThread::new_explicit(parent_task, start_func);
3077         let thread_id = state.push(new_thread)?;
3078         state.get_mut(parent_task)?.threads.insert(thread_id);
3079 
3080         log::trace!("new thread with id {thread_id:?} created");
3081 
3082         self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3083     }
3084 
3085     pub(crate) fn resume_suspended_thread(
3086         self,
3087         store: &mut StoreOpaque,
3088         runtime_instance: RuntimeComponentInstanceIndex,
3089         thread_idx: u32,
3090         high_priority: bool,
3091     ) -> Result<()> {
3092         let thread_id =
3093             GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3094         let state = store.concurrent_state_mut();
3095         let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3096         let thread = state.get_mut(guest_thread.thread)?;
3097 
3098         match mem::replace(&mut thread.state, GuestThreadState::Running) {
3099             GuestThreadState::NotStartedExplicit(start_func) => {
3100                 log::trace!("starting thread {guest_thread:?}");
3101                 let guest_call = WorkItem::GuestCall(GuestCall {
3102                     thread: guest_thread,
3103                     kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3104                         start_func(store, guest_thread)
3105                     })),
3106                 });
3107                 store
3108                     .concurrent_state_mut()
3109                     .push_work_item(guest_call, high_priority);
3110             }
3111             GuestThreadState::Suspended(fiber) => {
3112                 log::trace!("resuming thread {thread_id:?} that was suspended");
3113                 store
3114                     .concurrent_state_mut()
3115                     .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3116             }
3117             _ => {
3118                 bail!("cannot resume thread which is not suspended");
3119             }
3120         }
3121         Ok(())
3122     }
3123 
3124     fn add_guest_thread_to_instance_table(
3125         self,
3126         thread_id: TableId<GuestThread>,
3127         store: &mut StoreOpaque,
3128         runtime_instance: RuntimeComponentInstanceIndex,
3129     ) -> Result<u32> {
3130         let guest_id = self.id().get_mut(store).guest_tables().0[runtime_instance]
3131             .guest_thread_insert(thread_id.rep())?;
3132         store
3133             .concurrent_state_mut()
3134             .get_mut(thread_id)?
3135             .instance_rep = Some(guest_id);
3136         Ok(guest_id)
3137     }
3138 
3139     /// Helper function for the `thread.yield`, `thread.yield-to`, `thread.suspend`,
3140     /// and `thread.switch-to` intrinsics.
3141     pub(crate) fn suspension_intrinsic(
3142         self,
3143         store: &mut StoreOpaque,
3144         caller: RuntimeComponentInstanceIndex,
3145         cancellable: bool,
3146         yielding: bool,
3147         to_thread: Option<u32>,
3148     ) -> Result<WaitResult> {
3149         self.id().get(store).check_may_leave(caller)?;
3150 
3151         if to_thread.is_none() {
3152             let state = store.concurrent_state_mut();
3153             if yielding {
3154                 // This is a `thread.yield` call
3155                 if !state.may_block(state.guest_thread.unwrap().task) {
3156                     // The spec defines `thread.yield` to be a no-op in a
3157                     // non-blocking context, so we return immediately without giving
3158                     // any other thread a chance to run.
3159                     return Ok(WaitResult::Completed);
3160                 }
3161             } else {
3162                 // The caller may only call `thread.suspend` from an async task
3163                 // (i.e. a task created via a call to an async export).
3164                 // Otherwise, we'll trap.
3165                 state.check_blocking()?;
3166             }
3167         }
3168 
3169         // There could be a pending cancellation from a previous uncancellable wait
3170         if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3171             return Ok(WaitResult::Cancelled);
3172         }
3173 
3174         if let Some(thread) = to_thread {
3175             self.resume_suspended_thread(store, caller, thread, true)?;
3176         }
3177 
3178         let state = store.concurrent_state_mut();
3179         let guest_thread = state.guest_thread.unwrap();
3180         let reason = if yielding {
3181             SuspendReason::Yielding {
3182                 thread: guest_thread,
3183             }
3184         } else {
3185             SuspendReason::ExplicitlySuspending {
3186                 thread: guest_thread,
3187                 // Tell `StoreOpaque::suspend` it's okay to suspend here since
3188                 // we're handling a `thread.switch-to` call; otherwise it would
3189                 // panic if we called it in a non-blocking context.
3190                 skip_may_block_check: to_thread.is_some(),
3191             }
3192         };
3193 
3194         store.suspend(reason)?;
3195 
3196         if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3197             Ok(WaitResult::Cancelled)
3198         } else {
3199             Ok(WaitResult::Completed)
3200         }
3201     }
3202 
3203     /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3204     fn waitable_check(
3205         self,
3206         store: &mut StoreOpaque,
3207         cancellable: bool,
3208         check: WaitableCheck,
3209     ) -> Result<u32> {
3210         let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
3211 
3212         let (wait, set) = match &check {
3213             WaitableCheck::Wait(params) => (true, Some(params.set)),
3214             WaitableCheck::Poll(params) => (false, Some(params.set)),
3215         };
3216 
3217         log::trace!("waitable check for {guest_thread:?}; set {set:?}");
3218         // First, suspend this fiber, allowing any other threads to run.
3219         store.suspend(SuspendReason::Yielding {
3220             thread: guest_thread,
3221         })?;
3222 
3223         log::trace!("waitable check for {guest_thread:?}; set {set:?}");
3224 
3225         let state = store.concurrent_state_mut();
3226         let task = state.get_mut(guest_thread.task)?;
3227 
3228         // If we're waiting, and there are no events immediately available,
3229         // suspend the fiber until that changes.
3230         if wait {
3231             let set = set.unwrap();
3232 
3233             if (task.event.is_none()
3234                 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3235                 && state.get_mut(set)?.ready.is_empty()
3236             {
3237                 if cancellable {
3238                     let old = state
3239                         .get_mut(guest_thread.thread)?
3240                         .wake_on_cancel
3241                         .replace(set);
3242                     assert!(old.is_none());
3243                 }
3244 
3245                 store.suspend(SuspendReason::Waiting {
3246                     set,
3247                     thread: guest_thread,
3248                     skip_may_block_check: false,
3249                 })?;
3250             }
3251         }
3252 
3253         log::trace!("waitable check for {guest_thread:?}; set {set:?}, part two");
3254 
3255         let result = match check {
3256             // Deliver any pending events to the guest and return.
3257             WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => {
3258                 let event =
3259                     self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3260 
3261                 let (ordinal, handle, result) = if wait {
3262                     let (event, waitable) = event.unwrap();
3263                     let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3264                     let (ordinal, result) = event.parts();
3265                     (ordinal, handle, result)
3266                 } else {
3267                     if let Some((event, waitable)) = event {
3268                         let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3269                         let (ordinal, result) = event.parts();
3270                         (ordinal, handle, result)
3271                     } else {
3272                         log::trace!(
3273                             "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3274                             guest_thread.task,
3275                             params.set
3276                         );
3277                         let (ordinal, result) = Event::None.parts();
3278                         (ordinal, 0, result)
3279                     }
3280                 };
3281                 let memory = self.options_memory_mut(store, params.options);
3282                 let ptr =
3283                     func::validate_inbounds::<(u32, u32)>(memory, &ValRaw::u32(params.payload))?;
3284                 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3285                 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3286                 Ok(ordinal)
3287             }
3288         };
3289 
3290         result
3291     }
3292 
3293     /// Implements the `subtask.cancel` intrinsic.
3294     pub(crate) fn subtask_cancel(
3295         self,
3296         store: &mut StoreOpaque,
3297         caller_instance: RuntimeComponentInstanceIndex,
3298         async_: bool,
3299         task_id: u32,
3300     ) -> Result<u32> {
3301         self.id().get(store).check_may_leave(caller_instance)?;
3302 
3303         if !async_ {
3304             // The caller may only sync call `subtask.cancel` from an async task
3305             // (i.e. a task created via a call to an async export).  Otherwise,
3306             // we'll trap.
3307             store.concurrent_state_mut().check_blocking()?;
3308         }
3309 
3310         let (rep, is_host) =
3311             self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?;
3312         let (waitable, expected_caller_instance) = if is_host {
3313             let id = TableId::<HostTask>::new(rep);
3314             (
3315                 Waitable::Host(id),
3316                 store.concurrent_state_mut().get_mut(id)?.caller_instance,
3317             )
3318         } else {
3319             let id = TableId::<GuestTask>::new(rep);
3320             if let Caller::Guest { instance, .. } =
3321                 &store.concurrent_state_mut().get_mut(id)?.caller
3322             {
3323                 (Waitable::Guest(id), *instance)
3324             } else {
3325                 unreachable!()
3326             }
3327         };
3328         // Since waitables can neither be passed between instances nor forged,
3329         // this should never fail unless there's a bug in Wasmtime, but we check
3330         // here to be sure:
3331         assert_eq!(expected_caller_instance, caller_instance);
3332 
3333         log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3334 
3335         let concurrent_state = store.concurrent_state_mut();
3336         if let Waitable::Host(host_task) = waitable {
3337             if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
3338                 handle.abort();
3339                 return Ok(Status::ReturnCancelled as u32);
3340             }
3341         } else {
3342             let caller = concurrent_state.guest_thread.unwrap();
3343             let guest_task = TableId::<GuestTask>::new(rep);
3344             let task = concurrent_state.get_mut(guest_task)?;
3345             if !task.already_lowered_parameters() {
3346                 task.lower_params = None;
3347                 task.lift_result = None;
3348 
3349                 // Not yet started; cancel and remove from pending
3350                 let callee_instance = task.instance;
3351 
3352                 let pending = &mut concurrent_state.instance_state(callee_instance).pending;
3353                 let pending_count = pending.len();
3354                 pending.retain(|thread, _| thread.task != guest_task);
3355                 // If there were no pending threads for this task, we're in an error state
3356                 if pending.len() == pending_count {
3357                     bail!("`subtask.cancel` called after terminal status delivered");
3358                 }
3359                 return Ok(Status::StartCancelled as u32);
3360             } else if !task.returned_or_cancelled() {
3361                 // Started, but not yet returned or cancelled; send the
3362                 // `CANCELLED` event
3363                 task.cancel_sent = true;
3364                 // Note that this might overwrite an event that was set earlier
3365                 // (e.g. `Event::None` if the task is yielding, or
3366                 // `Event::Cancelled` if it was already cancelled), but that's
3367                 // okay -- this should supersede the previous state.
3368                 task.event = Some(Event::Cancelled);
3369                 for thread in task.threads.clone() {
3370                     let thread = QualifiedThreadId {
3371                         task: guest_task,
3372                         thread,
3373                     };
3374                     if let Some(set) = concurrent_state
3375                         .get_mut(thread.thread)
3376                         .unwrap()
3377                         .wake_on_cancel
3378                         .take()
3379                     {
3380                         let item = match concurrent_state
3381                             .get_mut(set)?
3382                             .waiting
3383                             .remove(&thread)
3384                             .unwrap()
3385                         {
3386                             WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3387                             WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3388                                 thread,
3389                                 kind: GuestCallKind::DeliverEvent {
3390                                     instance,
3391                                     set: None,
3392                                 },
3393                             }),
3394                         };
3395                         concurrent_state.push_high_priority(item);
3396 
3397                         store.suspend(SuspendReason::Yielding { thread: caller })?;
3398                         break;
3399                     }
3400                 }
3401 
3402                 let concurrent_state = store.concurrent_state_mut();
3403                 let task = concurrent_state.get_mut(guest_task)?;
3404                 if !task.returned_or_cancelled() {
3405                     if async_ {
3406                         return Ok(BLOCKED);
3407                     } else {
3408                         store.wait_for_event(Waitable::Guest(guest_task))?;
3409                     }
3410                 }
3411             }
3412         }
3413 
3414         let event = waitable.take_event(store.concurrent_state_mut())?;
3415         if let Some(Event::Subtask {
3416             status: status @ (Status::Returned | Status::ReturnCancelled),
3417         }) = event
3418         {
3419             Ok(status as u32)
3420         } else {
3421             bail!("`subtask.cancel` called after terminal status delivered");
3422         }
3423     }
3424 
3425     pub(crate) fn context_get(
3426         self,
3427         store: &mut StoreOpaque,
3428         caller: RuntimeComponentInstanceIndex,
3429         slot: u32,
3430     ) -> Result<u32> {
3431         self.id().get(store).check_may_leave(caller)?;
3432         store.concurrent_state_mut().context_get(slot)
3433     }
3434 
3435     pub(crate) fn context_set(
3436         self,
3437         store: &mut StoreOpaque,
3438         caller: RuntimeComponentInstanceIndex,
3439         slot: u32,
3440         value: u32,
3441     ) -> Result<()> {
3442         self.id().get(store).check_may_leave(caller)?;
3443         store.concurrent_state_mut().context_set(slot, value)
3444     }
3445 }
3446 
3447 /// Trait representing component model ABI async intrinsics and fused adapter
3448 /// helper functions.
3449 ///
3450 /// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3451 /// which must be valid for at least the duration of the call (and possibly for
3452 /// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3453 /// pointers used for async calls).
3454 pub trait VMComponentAsyncStore {
3455     /// A helper function for fused adapter modules involving calls where the
3456     /// one of the caller or callee is async.
3457     ///
3458     /// This helper is not used when the caller and callee both use the sync
3459     /// ABI, only when at least one is async is this used.
3460     unsafe fn prepare_call(
3461         &mut self,
3462         instance: Instance,
3463         memory: *mut VMMemoryDefinition,
3464         start: *mut VMFuncRef,
3465         return_: *mut VMFuncRef,
3466         caller_instance: RuntimeComponentInstanceIndex,
3467         callee_instance: RuntimeComponentInstanceIndex,
3468         task_return_type: TypeTupleIndex,
3469         callee_async: bool,
3470         string_encoding: u8,
3471         result_count: u32,
3472         storage: *mut ValRaw,
3473         storage_len: usize,
3474     ) -> Result<()>;
3475 
3476     /// A helper function for fused adapter modules involving calls where the
3477     /// caller is sync-lowered but the callee is async-lifted.
3478     unsafe fn sync_start(
3479         &mut self,
3480         instance: Instance,
3481         callback: *mut VMFuncRef,
3482         callee: *mut VMFuncRef,
3483         param_count: u32,
3484         storage: *mut MaybeUninit<ValRaw>,
3485         storage_len: usize,
3486     ) -> Result<()>;
3487 
3488     /// A helper function for fused adapter modules involving calls where the
3489     /// caller is async-lowered.
3490     unsafe fn async_start(
3491         &mut self,
3492         instance: Instance,
3493         callback: *mut VMFuncRef,
3494         post_return: *mut VMFuncRef,
3495         callee: *mut VMFuncRef,
3496         param_count: u32,
3497         result_count: u32,
3498         flags: u32,
3499     ) -> Result<u32>;
3500 
3501     /// The `future.write` intrinsic.
3502     fn future_write(
3503         &mut self,
3504         instance: Instance,
3505         caller: RuntimeComponentInstanceIndex,
3506         ty: TypeFutureTableIndex,
3507         options: OptionsIndex,
3508         future: u32,
3509         address: u32,
3510     ) -> Result<u32>;
3511 
3512     /// The `future.read` intrinsic.
3513     fn future_read(
3514         &mut self,
3515         instance: Instance,
3516         caller: RuntimeComponentInstanceIndex,
3517         ty: TypeFutureTableIndex,
3518         options: OptionsIndex,
3519         future: u32,
3520         address: u32,
3521     ) -> Result<u32>;
3522 
3523     /// The `future.drop-writable` intrinsic.
3524     fn future_drop_writable(
3525         &mut self,
3526         instance: Instance,
3527         caller: RuntimeComponentInstanceIndex,
3528         ty: TypeFutureTableIndex,
3529         writer: u32,
3530     ) -> Result<()>;
3531 
3532     /// The `stream.write` intrinsic.
3533     fn stream_write(
3534         &mut self,
3535         instance: Instance,
3536         caller: RuntimeComponentInstanceIndex,
3537         ty: TypeStreamTableIndex,
3538         options: OptionsIndex,
3539         stream: u32,
3540         address: u32,
3541         count: u32,
3542     ) -> Result<u32>;
3543 
3544     /// The `stream.read` intrinsic.
3545     fn stream_read(
3546         &mut self,
3547         instance: Instance,
3548         caller: RuntimeComponentInstanceIndex,
3549         ty: TypeStreamTableIndex,
3550         options: OptionsIndex,
3551         stream: u32,
3552         address: u32,
3553         count: u32,
3554     ) -> Result<u32>;
3555 
3556     /// The "fast-path" implementation of the `stream.write` intrinsic for
3557     /// "flat" (i.e. memcpy-able) payloads.
3558     fn flat_stream_write(
3559         &mut self,
3560         instance: Instance,
3561         caller: RuntimeComponentInstanceIndex,
3562         ty: TypeStreamTableIndex,
3563         options: OptionsIndex,
3564         payload_size: u32,
3565         payload_align: u32,
3566         stream: u32,
3567         address: u32,
3568         count: u32,
3569     ) -> Result<u32>;
3570 
3571     /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3572     /// (i.e. memcpy-able) payloads.
3573     fn flat_stream_read(
3574         &mut self,
3575         instance: Instance,
3576         caller: RuntimeComponentInstanceIndex,
3577         ty: TypeStreamTableIndex,
3578         options: OptionsIndex,
3579         payload_size: u32,
3580         payload_align: u32,
3581         stream: u32,
3582         address: u32,
3583         count: u32,
3584     ) -> Result<u32>;
3585 
3586     /// The `stream.drop-writable` intrinsic.
3587     fn stream_drop_writable(
3588         &mut self,
3589         instance: Instance,
3590         caller: RuntimeComponentInstanceIndex,
3591         ty: TypeStreamTableIndex,
3592         writer: u32,
3593     ) -> Result<()>;
3594 
3595     /// The `error-context.debug-message` intrinsic.
3596     fn error_context_debug_message(
3597         &mut self,
3598         instance: Instance,
3599         caller: RuntimeComponentInstanceIndex,
3600         ty: TypeComponentLocalErrorContextTableIndex,
3601         options: OptionsIndex,
3602         err_ctx_handle: u32,
3603         debug_msg_address: u32,
3604     ) -> Result<()>;
3605 
3606     /// The `thread.new-indirect` intrinsic
3607     fn thread_new_indirect(
3608         &mut self,
3609         instance: Instance,
3610         caller: RuntimeComponentInstanceIndex,
3611         func_ty_idx: TypeFuncIndex,
3612         start_func_table_idx: RuntimeTableIndex,
3613         start_func_idx: u32,
3614         context: i32,
3615     ) -> Result<u32>;
3616 }
3617 
3618 /// SAFETY: See trait docs.
3619 impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3620     unsafe fn prepare_call(
3621         &mut self,
3622         instance: Instance,
3623         memory: *mut VMMemoryDefinition,
3624         start: *mut VMFuncRef,
3625         return_: *mut VMFuncRef,
3626         caller_instance: RuntimeComponentInstanceIndex,
3627         callee_instance: RuntimeComponentInstanceIndex,
3628         task_return_type: TypeTupleIndex,
3629         callee_async: bool,
3630         string_encoding: u8,
3631         result_count_or_max_if_async: u32,
3632         storage: *mut ValRaw,
3633         storage_len: usize,
3634     ) -> Result<()> {
3635         // SAFETY: The `wasmtime_cranelift`-generated code that calls
3636         // this method will have ensured that `storage` is a valid
3637         // pointer containing at least `storage_len` items.
3638         let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3639 
3640         unsafe {
3641             instance.prepare_call(
3642                 StoreContextMut(self),
3643                 start,
3644                 return_,
3645                 caller_instance,
3646                 callee_instance,
3647                 task_return_type,
3648                 callee_async,
3649                 memory,
3650                 string_encoding,
3651                 match result_count_or_max_if_async {
3652                     PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3653                         params,
3654                         has_result: false,
3655                     },
3656                     PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3657                         params,
3658                         has_result: true,
3659                     },
3660                     result_count => CallerInfo::Sync {
3661                         params,
3662                         result_count,
3663                     },
3664                 },
3665             )
3666         }
3667     }
3668 
3669     unsafe fn sync_start(
3670         &mut self,
3671         instance: Instance,
3672         callback: *mut VMFuncRef,
3673         callee: *mut VMFuncRef,
3674         param_count: u32,
3675         storage: *mut MaybeUninit<ValRaw>,
3676         storage_len: usize,
3677     ) -> Result<()> {
3678         unsafe {
3679             instance
3680                 .start_call(
3681                     StoreContextMut(self),
3682                     callback,
3683                     ptr::null_mut(),
3684                     callee,
3685                     param_count,
3686                     1,
3687                     START_FLAG_ASYNC_CALLEE,
3688                     // SAFETY: The `wasmtime_cranelift`-generated code that calls
3689                     // this method will have ensured that `storage` is a valid
3690                     // pointer containing at least `storage_len` items.
3691                     Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3692                 )
3693                 .map(drop)
3694         }
3695     }
3696 
3697     unsafe fn async_start(
3698         &mut self,
3699         instance: Instance,
3700         callback: *mut VMFuncRef,
3701         post_return: *mut VMFuncRef,
3702         callee: *mut VMFuncRef,
3703         param_count: u32,
3704         result_count: u32,
3705         flags: u32,
3706     ) -> Result<u32> {
3707         unsafe {
3708             instance.start_call(
3709                 StoreContextMut(self),
3710                 callback,
3711                 post_return,
3712                 callee,
3713                 param_count,
3714                 result_count,
3715                 flags,
3716                 None,
3717             )
3718         }
3719     }
3720 
3721     fn future_write(
3722         &mut self,
3723         instance: Instance,
3724         caller: RuntimeComponentInstanceIndex,
3725         ty: TypeFutureTableIndex,
3726         options: OptionsIndex,
3727         future: u32,
3728         address: u32,
3729     ) -> Result<u32> {
3730         instance.id().get(self).check_may_leave(caller)?;
3731         instance
3732             .guest_write(
3733                 StoreContextMut(self),
3734                 caller,
3735                 TransmitIndex::Future(ty),
3736                 options,
3737                 None,
3738                 future,
3739                 address,
3740                 1,
3741             )
3742             .map(|result| result.encode())
3743     }
3744 
3745     fn future_read(
3746         &mut self,
3747         instance: Instance,
3748         caller: RuntimeComponentInstanceIndex,
3749         ty: TypeFutureTableIndex,
3750         options: OptionsIndex,
3751         future: u32,
3752         address: u32,
3753     ) -> Result<u32> {
3754         instance.id().get(self).check_may_leave(caller)?;
3755         instance
3756             .guest_read(
3757                 StoreContextMut(self),
3758                 caller,
3759                 TransmitIndex::Future(ty),
3760                 options,
3761                 None,
3762                 future,
3763                 address,
3764                 1,
3765             )
3766             .map(|result| result.encode())
3767     }
3768 
3769     fn stream_write(
3770         &mut self,
3771         instance: Instance,
3772         caller: RuntimeComponentInstanceIndex,
3773         ty: TypeStreamTableIndex,
3774         options: OptionsIndex,
3775         stream: u32,
3776         address: u32,
3777         count: u32,
3778     ) -> Result<u32> {
3779         instance.id().get(self).check_may_leave(caller)?;
3780         instance
3781             .guest_write(
3782                 StoreContextMut(self),
3783                 caller,
3784                 TransmitIndex::Stream(ty),
3785                 options,
3786                 None,
3787                 stream,
3788                 address,
3789                 count,
3790             )
3791             .map(|result| result.encode())
3792     }
3793 
3794     fn stream_read(
3795         &mut self,
3796         instance: Instance,
3797         caller: RuntimeComponentInstanceIndex,
3798         ty: TypeStreamTableIndex,
3799         options: OptionsIndex,
3800         stream: u32,
3801         address: u32,
3802         count: u32,
3803     ) -> Result<u32> {
3804         instance.id().get(self).check_may_leave(caller)?;
3805         instance
3806             .guest_read(
3807                 StoreContextMut(self),
3808                 caller,
3809                 TransmitIndex::Stream(ty),
3810                 options,
3811                 None,
3812                 stream,
3813                 address,
3814                 count,
3815             )
3816             .map(|result| result.encode())
3817     }
3818 
3819     fn future_drop_writable(
3820         &mut self,
3821         instance: Instance,
3822         caller: RuntimeComponentInstanceIndex,
3823         ty: TypeFutureTableIndex,
3824         writer: u32,
3825     ) -> Result<()> {
3826         instance.id().get(self).check_may_leave(caller)?;
3827         instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3828     }
3829 
3830     fn flat_stream_write(
3831         &mut self,
3832         instance: Instance,
3833         caller: RuntimeComponentInstanceIndex,
3834         ty: TypeStreamTableIndex,
3835         options: OptionsIndex,
3836         payload_size: u32,
3837         payload_align: u32,
3838         stream: u32,
3839         address: u32,
3840         count: u32,
3841     ) -> Result<u32> {
3842         instance.id().get(self).check_may_leave(caller)?;
3843         instance
3844             .guest_write(
3845                 StoreContextMut(self),
3846                 caller,
3847                 TransmitIndex::Stream(ty),
3848                 options,
3849                 Some(FlatAbi {
3850                     size: payload_size,
3851                     align: payload_align,
3852                 }),
3853                 stream,
3854                 address,
3855                 count,
3856             )
3857             .map(|result| result.encode())
3858     }
3859 
3860     fn flat_stream_read(
3861         &mut self,
3862         instance: Instance,
3863         caller: RuntimeComponentInstanceIndex,
3864         ty: TypeStreamTableIndex,
3865         options: OptionsIndex,
3866         payload_size: u32,
3867         payload_align: u32,
3868         stream: u32,
3869         address: u32,
3870         count: u32,
3871     ) -> Result<u32> {
3872         instance.id().get(self).check_may_leave(caller)?;
3873         instance
3874             .guest_read(
3875                 StoreContextMut(self),
3876                 caller,
3877                 TransmitIndex::Stream(ty),
3878                 options,
3879                 Some(FlatAbi {
3880                     size: payload_size,
3881                     align: payload_align,
3882                 }),
3883                 stream,
3884                 address,
3885                 count,
3886             )
3887             .map(|result| result.encode())
3888     }
3889 
3890     fn stream_drop_writable(
3891         &mut self,
3892         instance: Instance,
3893         caller: RuntimeComponentInstanceIndex,
3894         ty: TypeStreamTableIndex,
3895         writer: u32,
3896     ) -> Result<()> {
3897         instance.id().get(self).check_may_leave(caller)?;
3898         instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
3899     }
3900 
3901     fn error_context_debug_message(
3902         &mut self,
3903         instance: Instance,
3904         caller: RuntimeComponentInstanceIndex,
3905         ty: TypeComponentLocalErrorContextTableIndex,
3906         options: OptionsIndex,
3907         err_ctx_handle: u32,
3908         debug_msg_address: u32,
3909     ) -> Result<()> {
3910         instance.id().get(self).check_may_leave(caller)?;
3911         instance.error_context_debug_message(
3912             StoreContextMut(self),
3913             ty,
3914             options,
3915             err_ctx_handle,
3916             debug_msg_address,
3917         )
3918     }
3919 
3920     fn thread_new_indirect(
3921         &mut self,
3922         instance: Instance,
3923         caller: RuntimeComponentInstanceIndex,
3924         func_ty_idx: TypeFuncIndex,
3925         start_func_table_idx: RuntimeTableIndex,
3926         start_func_idx: u32,
3927         context: i32,
3928     ) -> Result<u32> {
3929         instance.thread_new_indirect(
3930             StoreContextMut(self),
3931             caller,
3932             func_ty_idx,
3933             start_func_table_idx,
3934             start_func_idx,
3935             context,
3936         )
3937     }
3938 }
3939 
3940 type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
3941 
3942 /// Represents the state of a pending host task.
3943 struct HostTask {
3944     common: WaitableCommon,
3945     caller_instance: RuntimeComponentInstanceIndex,
3946     join_handle: Option<JoinHandle>,
3947 }
3948 
3949 impl HostTask {
3950     fn new(
3951         caller_instance: RuntimeComponentInstanceIndex,
3952         join_handle: Option<JoinHandle>,
3953     ) -> Self {
3954         Self {
3955             common: WaitableCommon::default(),
3956             caller_instance,
3957             join_handle,
3958         }
3959     }
3960 }
3961 
3962 impl TableDebug for HostTask {
3963     fn type_name() -> &'static str {
3964         "HostTask"
3965     }
3966 }
3967 
3968 type CallbackFn = Box<
3969     dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
3970         + Send
3971         + Sync
3972         + 'static,
3973 >;
3974 
3975 /// Represents the caller of a given guest task.
3976 enum Caller {
3977     /// The host called the guest task.
3978     Host {
3979         /// If present, may be used to deliver the result.
3980         tx: Option<oneshot::Sender<LiftedResult>>,
3981         /// Channel to notify once all subtasks spawned by this caller have
3982         /// completed.
3983         ///
3984         /// Note that we'll never actually send anything to this channel;
3985         /// dropping it when the refcount goes to zero is sufficient to notify
3986         /// the receiver.
3987         exit_tx: Arc<oneshot::Sender<()>>,
3988         /// If true, there's a host future that must be dropped before the task
3989         /// can be deleted.
3990         host_future_present: bool,
3991         /// If true, call `post-return` function (if any) automatically.
3992         call_post_return_automatically: bool,
3993     },
3994     /// Another guest thread called the guest task
3995     Guest {
3996         /// The id of the caller
3997         thread: QualifiedThreadId,
3998         /// The instance to use to enforce reentrance rules.
3999         ///
4000         /// Note that this might not be the same as the instance the caller task
4001         /// started executing in given that one or more synchronous guest->guest
4002         /// calls may have occurred involving multiple instances.
4003         instance: RuntimeComponentInstanceIndex,
4004     },
4005 }
4006 
4007 /// Represents a closure and related canonical ABI parameters required to
4008 /// validate a `task.return` call at runtime and lift the result.
4009 struct LiftResult {
4010     lift: RawLift,
4011     ty: TypeTupleIndex,
4012     memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4013     string_encoding: StringEncoding,
4014 }
4015 
4016 /// The table ID for a guest thread, qualified by the task to which it belongs.
4017 ///
4018 /// This exists to minimize table lookups and the necessity to pass stores around mutably
4019 /// for the common case of identifying the task to which a thread belongs.
4020 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4021 struct QualifiedThreadId {
4022     task: TableId<GuestTask>,
4023     thread: TableId<GuestThread>,
4024 }
4025 
4026 impl QualifiedThreadId {
4027     fn qualify(
4028         state: &mut ConcurrentState,
4029         thread: TableId<GuestThread>,
4030     ) -> Result<QualifiedThreadId> {
4031         Ok(QualifiedThreadId {
4032             task: state.get_mut(thread)?.parent_task,
4033             thread,
4034         })
4035     }
4036 }
4037 
4038 impl fmt::Debug for QualifiedThreadId {
4039     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4040         f.debug_tuple("QualifiedThreadId")
4041             .field(&self.task.rep())
4042             .field(&self.thread.rep())
4043             .finish()
4044     }
4045 }
4046 
4047 enum GuestThreadState {
4048     NotStartedImplicit,
4049     NotStartedExplicit(
4050         Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4051     ),
4052     Running,
4053     Suspended(StoreFiber<'static>),
4054     Pending,
4055     Completed,
4056 }
4057 pub struct GuestThread {
4058     /// Context-local state used to implement the `context.{get,set}`
4059     /// intrinsics.
4060     context: [u32; 2],
4061     /// The owning guest task.
4062     parent_task: TableId<GuestTask>,
4063     /// If present, indicates that the thread is currently waiting on the
4064     /// specified set but may be cancelled and woken immediately.
4065     wake_on_cancel: Option<TableId<WaitableSet>>,
4066     /// The execution state of this guest thread
4067     state: GuestThreadState,
4068     /// The index of this thread in the component instance's handle table.
4069     /// This must always be `Some` after initialization.
4070     instance_rep: Option<u32>,
4071 }
4072 
4073 impl GuestThread {
4074     /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4075     /// handle.
4076     fn from_instance(
4077         state: Pin<&mut ComponentInstance>,
4078         caller_instance: RuntimeComponentInstanceIndex,
4079         guest_thread: u32,
4080     ) -> Result<TableId<Self>> {
4081         let rep = state.guest_tables().0[caller_instance].guest_thread_rep(guest_thread)?;
4082         Ok(TableId::new(rep))
4083     }
4084 
4085     fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4086         Self {
4087             context: [0; 2],
4088             parent_task,
4089             wake_on_cancel: None,
4090             state: GuestThreadState::NotStartedImplicit,
4091             instance_rep: None,
4092         }
4093     }
4094 
4095     fn new_explicit(
4096         parent_task: TableId<GuestTask>,
4097         start_func: Box<
4098             dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4099         >,
4100     ) -> Self {
4101         Self {
4102             context: [0; 2],
4103             parent_task,
4104             wake_on_cancel: None,
4105             state: GuestThreadState::NotStartedExplicit(start_func),
4106             instance_rep: None,
4107         }
4108     }
4109 }
4110 
4111 impl TableDebug for GuestThread {
4112     fn type_name() -> &'static str {
4113         "GuestThread"
4114     }
4115 }
4116 
4117 enum SyncResult {
4118     NotProduced,
4119     Produced(Option<ValRaw>),
4120     Taken,
4121 }
4122 
4123 impl SyncResult {
4124     fn take(&mut self) -> Option<Option<ValRaw>> {
4125         match mem::replace(self, SyncResult::Taken) {
4126             SyncResult::NotProduced => None,
4127             SyncResult::Produced(val) => Some(val),
4128             SyncResult::Taken => {
4129                 panic!("attempted to take a synchronous result that was already taken")
4130             }
4131         }
4132     }
4133 }
4134 
4135 #[derive(Debug)]
4136 enum HostFutureState {
4137     NotApplicable,
4138     Live,
4139     Dropped,
4140 }
4141 
4142 /// Represents a pending guest task.
4143 pub(crate) struct GuestTask {
4144     /// See `WaitableCommon`
4145     common: WaitableCommon,
4146     /// Closure to lower the parameters passed to this task.
4147     lower_params: Option<RawLower>,
4148     /// See `LiftResult`
4149     lift_result: Option<LiftResult>,
4150     /// A place to stash the type-erased lifted result if it can't be delivered
4151     /// immediately.
4152     result: Option<LiftedResult>,
4153     /// Closure to call the callback function for an async-lifted export, if
4154     /// provided.
4155     callback: Option<CallbackFn>,
4156     /// See `Caller`
4157     caller: Caller,
4158     /// A place to stash the call context for managing resource borrows while
4159     /// switching between guest tasks.
4160     call_context: Option<CallContext>,
4161     /// A place to stash the lowered result for a sync-to-async call until it
4162     /// can be returned to the caller.
4163     sync_result: SyncResult,
4164     /// Whether or not the task has been cancelled (i.e. whether the task is
4165     /// permitted to call `task.cancel`).
4166     cancel_sent: bool,
4167     /// Whether or not we've sent a `Status::Starting` event to any current or
4168     /// future waiters for this waitable.
4169     starting_sent: bool,
4170     /// Pending guest subtasks created by this task (directly or indirectly).
4171     ///
4172     /// This is used to re-parent subtasks which are still running when their
4173     /// parent task is disposed.
4174     subtasks: HashSet<TableId<GuestTask>>,
4175     /// Scratch waitable set used to watch subtasks during synchronous calls.
4176     sync_call_set: TableId<WaitableSet>,
4177     /// The instance to which the exported function for this guest task belongs.
4178     ///
4179     /// Note that the task may do a sync->sync call via a fused adapter which
4180     /// results in that task executing code in a different instance, and it may
4181     /// call host functions and intrinsics from that other instance.
4182     instance: RuntimeComponentInstanceIndex,
4183     /// If present, a pending `Event::None` or `Event::Cancelled` to be
4184     /// delivered to this task.
4185     event: Option<Event>,
4186     /// The `ExportIndex` of the guest function being called, if known.
4187     function_index: Option<ExportIndex>,
4188     /// Whether or not the task has exited.
4189     exited: bool,
4190     /// Threads belonging to this task
4191     threads: HashSet<TableId<GuestThread>>,
4192     /// The state of the host future that represents an async task, which must
4193     /// be dropped before we can delete the task.
4194     host_future_state: HostFutureState,
4195     /// Indicates whether this task was created for a call to an async-lifted
4196     /// export.
4197     async_function: bool,
4198 }
4199 
4200 impl GuestTask {
4201     fn already_lowered_parameters(&self) -> bool {
4202         // We reset `lower_params` after we lower the parameters
4203         self.lower_params.is_none()
4204     }
4205     fn returned_or_cancelled(&self) -> bool {
4206         // We reset `lift_result` after we return or exit
4207         self.lift_result.is_none()
4208     }
4209     fn ready_to_delete(&self) -> bool {
4210         let threads_completed = self.threads.is_empty();
4211         let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4212         let pending_completion_event = matches!(
4213             self.common.event,
4214             Some(Event::Subtask {
4215                 status: Status::Returned | Status::ReturnCancelled
4216             })
4217         );
4218         let ready = threads_completed
4219             && !has_sync_result
4220             && !pending_completion_event
4221             && !matches!(self.host_future_state, HostFutureState::Live);
4222         log::trace!(
4223             "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4224             threads_completed,
4225             has_sync_result,
4226             pending_completion_event,
4227             self.host_future_state
4228         );
4229         ready
4230     }
4231     fn new(
4232         state: &mut ConcurrentState,
4233         lower_params: RawLower,
4234         lift_result: LiftResult,
4235         caller: Caller,
4236         callback: Option<CallbackFn>,
4237         component_instance: RuntimeComponentInstanceIndex,
4238         async_function: bool,
4239     ) -> Result<Self> {
4240         let sync_call_set = state.push(WaitableSet::default())?;
4241         let host_future_state = match &caller {
4242             Caller::Guest { .. } => HostFutureState::NotApplicable,
4243             Caller::Host {
4244                 host_future_present,
4245                 ..
4246             } => {
4247                 if *host_future_present {
4248                     HostFutureState::Live
4249                 } else {
4250                     HostFutureState::NotApplicable
4251                 }
4252             }
4253         };
4254         Ok(Self {
4255             common: WaitableCommon::default(),
4256             lower_params: Some(lower_params),
4257             lift_result: Some(lift_result),
4258             result: None,
4259             callback,
4260             caller,
4261             call_context: Some(CallContext::default()),
4262             sync_result: SyncResult::NotProduced,
4263             cancel_sent: false,
4264             starting_sent: false,
4265             subtasks: HashSet::new(),
4266             sync_call_set,
4267             instance: component_instance,
4268             event: None,
4269             function_index: None,
4270             exited: false,
4271             threads: HashSet::new(),
4272             host_future_state,
4273             async_function,
4274         })
4275     }
4276 
4277     /// Dispose of this guest task, reparenting any pending subtasks to the
4278     /// caller.
4279     fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4280         // If there are not-yet-delivered completion events for subtasks in
4281         // `self.sync_call_set`, recursively dispose of those subtasks as well.
4282         for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4283             if let Some(Event::Subtask {
4284                 status: Status::Returned | Status::ReturnCancelled,
4285             }) = waitable.common(state)?.event
4286             {
4287                 waitable.delete_from(state)?;
4288             }
4289         }
4290 
4291         assert!(self.threads.is_empty());
4292 
4293         state.delete(self.sync_call_set)?;
4294 
4295         // Reparent any pending subtasks to the caller.
4296         match &self.caller {
4297             Caller::Guest {
4298                 thread,
4299                 instance: runtime_instance,
4300             } => {
4301                 let task_mut = state.get_mut(thread.task)?;
4302                 let present = task_mut.subtasks.remove(&me);
4303                 assert!(present);
4304 
4305                 for subtask in &self.subtasks {
4306                     task_mut.subtasks.insert(*subtask);
4307                 }
4308 
4309                 for subtask in &self.subtasks {
4310                     state.get_mut(*subtask)?.caller = Caller::Guest {
4311                         thread: *thread,
4312                         instance: *runtime_instance,
4313                     };
4314                 }
4315             }
4316             Caller::Host { exit_tx, .. } => {
4317                 for subtask in &self.subtasks {
4318                     state.get_mut(*subtask)?.caller = Caller::Host {
4319                         tx: None,
4320                         // Clone `exit_tx` to ensure that it is only dropped
4321                         // once all transitive subtasks of the host call have
4322                         // exited:
4323                         exit_tx: exit_tx.clone(),
4324                         host_future_present: false,
4325                         call_post_return_automatically: true,
4326                     };
4327                 }
4328             }
4329         }
4330 
4331         for subtask in self.subtasks {
4332             if state.get_mut(subtask)?.exited {
4333                 Waitable::Guest(subtask).delete_from(state)?;
4334             }
4335         }
4336 
4337         Ok(())
4338     }
4339 
4340     fn call_post_return_automatically(&self) -> bool {
4341         matches!(
4342             self.caller,
4343             Caller::Guest { .. }
4344                 | Caller::Host {
4345                     call_post_return_automatically: true,
4346                     ..
4347                 }
4348         )
4349     }
4350 }
4351 
4352 impl TableDebug for GuestTask {
4353     fn type_name() -> &'static str {
4354         "GuestTask"
4355     }
4356 }
4357 
4358 /// Represents state common to all kinds of waitables.
4359 #[derive(Default)]
4360 struct WaitableCommon {
4361     /// The currently pending event for this waitable, if any.
4362     event: Option<Event>,
4363     /// The set to which this waitable belongs, if any.
4364     set: Option<TableId<WaitableSet>>,
4365     /// The handle with which the guest refers to this waitable, if any.
4366     handle: Option<u32>,
4367 }
4368 
4369 /// Represents a Component Model Async `waitable`.
4370 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4371 enum Waitable {
4372     /// A host task
4373     Host(TableId<HostTask>),
4374     /// A guest task
4375     Guest(TableId<GuestTask>),
4376     /// The read or write end of a stream or future
4377     Transmit(TableId<TransmitHandle>),
4378 }
4379 
4380 impl Waitable {
4381     /// Retrieve the `Waitable` corresponding to the specified guest-visible
4382     /// handle.
4383     fn from_instance(
4384         state: Pin<&mut ComponentInstance>,
4385         caller_instance: RuntimeComponentInstanceIndex,
4386         waitable: u32,
4387     ) -> Result<Self> {
4388         use crate::runtime::vm::component::Waitable;
4389 
4390         let (waitable, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;
4391 
4392         Ok(match kind {
4393             Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4394             Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4395             Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4396         })
4397     }
4398 
4399     /// Retrieve the host-visible identifier for this `Waitable`.
4400     fn rep(&self) -> u32 {
4401         match self {
4402             Self::Host(id) => id.rep(),
4403             Self::Guest(id) => id.rep(),
4404             Self::Transmit(id) => id.rep(),
4405         }
4406     }
4407 
4408     /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4409     /// remove it from any set it may currently belong to (when `set` is
4410     /// `None`).
4411     fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4412         log::trace!("waitable {self:?} join set {set:?}",);
4413 
4414         let old = mem::replace(&mut self.common(state)?.set, set);
4415 
4416         if let Some(old) = old {
4417             match *self {
4418                 Waitable::Host(id) => state.remove_child(id, old),
4419                 Waitable::Guest(id) => state.remove_child(id, old),
4420                 Waitable::Transmit(id) => state.remove_child(id, old),
4421             }?;
4422 
4423             state.get_mut(old)?.ready.remove(self);
4424         }
4425 
4426         if let Some(set) = set {
4427             match *self {
4428                 Waitable::Host(id) => state.add_child(id, set),
4429                 Waitable::Guest(id) => state.add_child(id, set),
4430                 Waitable::Transmit(id) => state.add_child(id, set),
4431             }?;
4432 
4433             if self.common(state)?.event.is_some() {
4434                 self.mark_ready(state)?;
4435             }
4436         }
4437 
4438         Ok(())
4439     }
4440 
4441     /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4442     fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4443         Ok(match self {
4444             Self::Host(id) => &mut state.get_mut(*id)?.common,
4445             Self::Guest(id) => &mut state.get_mut(*id)?.common,
4446             Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4447         })
4448     }
4449 
4450     /// Set or clear the pending event for this waitable and either deliver it
4451     /// to the first waiter, if any, or mark it as ready to be delivered to the
4452     /// next waiter that arrives.
4453     fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4454         log::trace!("set event for {self:?}: {event:?}");
4455         self.common(state)?.event = event;
4456         self.mark_ready(state)
4457     }
4458 
4459     /// Take the pending event from this waitable, leaving `None` in its place.
4460     fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4461         let common = self.common(state)?;
4462         let event = common.event.take();
4463         if let Some(set) = self.common(state)?.set {
4464             state.get_mut(set)?.ready.remove(self);
4465         }
4466 
4467         Ok(event)
4468     }
4469 
4470     /// Deliver the current event for this waitable to the first waiter, if any,
4471     /// or else mark it as ready to be delivered to the next waiter that
4472     /// arrives.
4473     fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4474         if let Some(set) = self.common(state)?.set {
4475             state.get_mut(set)?.ready.insert(*self);
4476             if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4477                 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4478                 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4479 
4480                 let item = match mode {
4481                     WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4482                     WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4483                         thread,
4484                         kind: GuestCallKind::DeliverEvent {
4485                             instance,
4486                             set: Some(set),
4487                         },
4488                     }),
4489                 };
4490                 state.push_high_priority(item);
4491             }
4492         }
4493         Ok(())
4494     }
4495 
4496     /// Remove this waitable from the instance's rep table.
4497     fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4498         match self {
4499             Self::Host(task) => {
4500                 log::trace!("delete host task {task:?}");
4501                 state.delete(*task)?;
4502             }
4503             Self::Guest(task) => {
4504                 log::trace!("delete guest task {task:?}");
4505                 state.delete(*task)?.dispose(state, *task)?;
4506             }
4507             Self::Transmit(task) => {
4508                 state.delete(*task)?;
4509             }
4510         }
4511 
4512         Ok(())
4513     }
4514 }
4515 
4516 impl fmt::Debug for Waitable {
4517     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4518         match self {
4519             Self::Host(id) => write!(f, "{id:?}"),
4520             Self::Guest(id) => write!(f, "{id:?}"),
4521             Self::Transmit(id) => write!(f, "{id:?}"),
4522         }
4523     }
4524 }
4525 
4526 /// Represents a Component Model Async `waitable-set`.
4527 #[derive(Default)]
4528 struct WaitableSet {
4529     /// Which waitables in this set have pending events, if any.
4530     ready: BTreeSet<Waitable>,
4531     /// Which guest threads are currently waiting on this set, if any.
4532     waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4533 }
4534 
4535 impl TableDebug for WaitableSet {
4536     fn type_name() -> &'static str {
4537         "WaitableSet"
4538     }
4539 }
4540 
4541 /// Type-erased closure to lower the parameters for a guest task.
4542 type RawLower =
4543     Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4544 
4545 /// Type-erased closure to lift the result for a guest task.
4546 type RawLift = Box<
4547     dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4548 >;
4549 
4550 /// Type erased result of a guest task which may be downcast to the expected
4551 /// type by a host caller (or simply ignored in the case of a guest caller; see
4552 /// `DummyResult`).
4553 type LiftedResult = Box<dyn Any + Send + Sync>;
4554 
4555 /// Used to return a result from a `LiftFn` when the actual result has already
4556 /// been lowered to a guest task's stack and linear memory.
4557 struct DummyResult;
4558 
4559 /// Represents the Component Model Async state of a (sub-)component instance.
4560 #[derive(Default)]
4561 struct InstanceState {
4562     /// Whether backpressure is set for this instance (enabled if >0)
4563     backpressure: u16,
4564     /// Whether this instance can be entered
4565     do_not_enter: bool,
4566     /// Pending calls for this instance which require `Self::backpressure` to be
4567     /// `true` and/or `Self::do_not_enter` to be false before they can proceed.
4568     pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4569 }
4570 
4571 /// Represents the Component Model Async state of a store.
4572 pub struct ConcurrentState {
4573     /// The currently running guest thread, if any.
4574     guest_thread: Option<QualifiedThreadId>,
4575 
4576     /// The set of pending host and background tasks, if any.
4577     ///
4578     /// See `ComponentInstance::poll_until` for where we temporarily take this
4579     /// out, poll it, then put it back to avoid any mutable aliasing hazards.
4580     futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4581     /// The table of waitables, waitable sets, etc.
4582     table: AlwaysMut<ResourceTable>,
4583     /// Per (sub-)component instance states.
4584     ///
4585     /// See `InstanceState` for details and note that this map is lazily
4586     /// populated as needed.
4587     // TODO: this can and should be a `PrimaryMap`
4588     instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>,
4589     /// The "high priority" work queue for this store's event loop.
4590     high_priority: Vec<WorkItem>,
4591     /// The "low priority" work queue for this store's event loop.
4592     low_priority: Vec<WorkItem>,
4593     /// A place to stash the reason a fiber is suspending so that the code which
4594     /// resumed it will know under what conditions the fiber should be resumed
4595     /// again.
4596     suspend_reason: Option<SuspendReason>,
4597     /// A cached fiber which is waiting for work to do.
4598     ///
4599     /// This helps us avoid creating a new fiber for each `GuestCall` work item.
4600     worker: Option<StoreFiber<'static>>,
4601     /// A place to stash the work item for which we're resuming a worker fiber.
4602     worker_item: Option<WorkerItem>,
4603 
4604     /// Reference counts for all component error contexts
4605     ///
4606     /// NOTE: it is possible the global ref count to be *greater* than the sum of
4607     /// (sub)component ref counts as tracked by `error_context_tables`, for
4608     /// example when the host holds one or more references to error contexts.
4609     ///
4610     /// The key of this primary map is often referred to as the "rep" (i.e. host-side
4611     /// component-wide representation) of the index into concurrent state for a given
4612     /// stored `ErrorContext`.
4613     ///
4614     /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4615     /// as a `TableId<ErrorContextState>`.
4616     global_error_context_ref_counts:
4617         BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4618 }
4619 
4620 impl Default for ConcurrentState {
4621     fn default() -> Self {
4622         Self {
4623             guest_thread: None,
4624             table: AlwaysMut::new(ResourceTable::new()),
4625             futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4626             instance_states: HashMap::new(),
4627             high_priority: Vec::new(),
4628             low_priority: Vec::new(),
4629             suspend_reason: None,
4630             worker: None,
4631             worker_item: None,
4632             global_error_context_ref_counts: BTreeMap::new(),
4633         }
4634     }
4635 }
4636 
4637 impl ConcurrentState {
4638     /// Take ownership of any fibers and futures owned by this object.
4639     ///
4640     /// This should be used when disposing of the `Store` containing this object
4641     /// in order to gracefully resolve any and all fibers using
4642     /// `StoreFiber::dispose`.  This is necessary to avoid possible
4643     /// use-after-free bugs due to fibers which may still have access to the
4644     /// `Store`.
4645     ///
4646     /// Additionally, the futures collected with this function should be dropped
4647     /// within a `tls::set` call, which will ensure than any futures closing
4648     /// over an `&Accessor` will have access to the store when dropped, allowing
4649     /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4650     /// panicking.
4651     ///
4652     /// Note that this will leave the object in an inconsistent and unusable
4653     /// state, so it should only be used just prior to dropping it.
4654     pub(crate) fn take_fibers_and_futures(
4655         &mut self,
4656         fibers: &mut Vec<StoreFiber<'static>>,
4657         futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4658     ) {
4659         for entry in self.table.get_mut().iter_mut() {
4660             if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4661                 for mode in mem::take(&mut set.waiting).into_values() {
4662                     if let WaitMode::Fiber(fiber) = mode {
4663                         fibers.push(fiber);
4664                     }
4665                 }
4666             } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4667                 if let GuestThreadState::Suspended(fiber) =
4668                     mem::replace(&mut thread.state, GuestThreadState::Completed)
4669                 {
4670                     fibers.push(fiber);
4671                 }
4672             }
4673         }
4674 
4675         if let Some(fiber) = self.worker.take() {
4676             fibers.push(fiber);
4677         }
4678 
4679         let mut take_items = |list| {
4680             for item in mem::take(list) {
4681                 match item {
4682                     WorkItem::ResumeFiber(fiber) => {
4683                         fibers.push(fiber);
4684                     }
4685                     WorkItem::PushFuture(future) => {
4686                         self.futures
4687                             .get_mut()
4688                             .as_mut()
4689                             .unwrap()
4690                             .push(future.into_inner());
4691                     }
4692                     _ => {}
4693                 }
4694             }
4695         };
4696 
4697         take_items(&mut self.high_priority);
4698         take_items(&mut self.low_priority);
4699 
4700         if let Some(them) = self.futures.get_mut().take() {
4701             futures.push(them);
4702         }
4703     }
4704 
4705     fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState {
4706         self.instance_states.entry(instance).or_default()
4707     }
4708 
4709     fn push<V: Send + Sync + 'static>(
4710         &mut self,
4711         value: V,
4712     ) -> Result<TableId<V>, ResourceTableError> {
4713         self.table.get_mut().push(value).map(TableId::from)
4714     }
4715 
4716     fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4717         self.table.get_mut().get_mut(&Resource::from(id))
4718     }
4719 
4720     pub fn add_child<T: 'static, U: 'static>(
4721         &mut self,
4722         child: TableId<T>,
4723         parent: TableId<U>,
4724     ) -> Result<(), ResourceTableError> {
4725         self.table
4726             .get_mut()
4727             .add_child(Resource::from(child), Resource::from(parent))
4728     }
4729 
4730     pub fn remove_child<T: 'static, U: 'static>(
4731         &mut self,
4732         child: TableId<T>,
4733         parent: TableId<U>,
4734     ) -> Result<(), ResourceTableError> {
4735         self.table
4736             .get_mut()
4737             .remove_child(Resource::from(child), Resource::from(parent))
4738     }
4739 
4740     fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4741         self.table.get_mut().delete(Resource::from(id))
4742     }
4743 
4744     fn push_future(&mut self, future: HostTaskFuture) {
4745         // Note that we can't directly push to `ConcurrentState::futures` here
4746         // since this may be called from a future that's being polled inside
4747         // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
4748         // so it has exclusive access while polling it.  Therefore, we push a
4749         // work item to the "high priority" queue, which will actually push to
4750         // `ConcurrentState::futures` later.
4751         self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4752     }
4753 
4754     fn push_high_priority(&mut self, item: WorkItem) {
4755         log::trace!("push high priority: {item:?}");
4756         self.high_priority.push(item);
4757     }
4758 
4759     fn push_low_priority(&mut self, item: WorkItem) {
4760         log::trace!("push low priority: {item:?}");
4761         self.low_priority.push(item);
4762     }
4763 
4764     fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4765         if high_priority {
4766             self.push_high_priority(item);
4767         } else {
4768             self.push_low_priority(item);
4769         }
4770     }
4771 
4772     /// Determine whether the instance associated with the specified guest task
4773     /// may be entered (i.e. is not already on the async call stack).
4774     ///
4775     /// This is an additional check on top of the "may_enter" instance flag;
4776     /// it's needed because async-lifted exports with callback functions must
4777     /// not call their own instances directly or indirectly, and due to the
4778     /// "stackless" nature of callback-enabled guest tasks this may happen even
4779     /// if there are no activation records on the stack (i.e. the "may_enter"
4780     /// field is `true`) for that instance.
4781     fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4782         let guest_instance = self.get_mut(guest_task).unwrap().instance;
4783 
4784         // Walk the task tree back to the root, looking for potential
4785         // reentrance.
4786         //
4787         // TODO: This could be optimized by maintaining a per-`GuestTask` bitset
4788         // such that each bit represents and instance which has been entered by
4789         // that task or an ancestor of that task, in which case this would be a
4790         // constant time check.
4791         loop {
4792             let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4793                 Caller::Host { .. } => break true,
4794                 Caller::Guest { thread, instance } => {
4795                     if *instance == guest_instance {
4796                         break false;
4797                     } else {
4798                         *thread
4799                     }
4800                 }
4801             };
4802             guest_task = next_thread.task;
4803         }
4804     }
4805 
4806     /// Record that we're about to enter a (sub-)component instance which does
4807     /// not support more than one concurrent, stackful activation, meaning it
4808     /// cannot be entered again until the next call returns.
4809     fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) {
4810         self.instance_state(instance).do_not_enter = true;
4811     }
4812 
4813     /// Record that we've exited a (sub-)component instance previously entered
4814     /// with `Self::enter_instance` and then calls `Self::partition_pending`.
4815     /// See the documentation for the latter for details.
4816     fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4817         self.instance_state(instance).do_not_enter = false;
4818         self.partition_pending(instance)
4819     }
4820 
4821     /// Iterate over `InstanceState::pending`, moving any ready items into the
4822     /// "high priority" work item queue.
4823     ///
4824     /// See `GuestCall::is_ready` for details.
4825     fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> {
4826         for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
4827             let call = GuestCall { thread, kind };
4828             if call.is_ready(self)? {
4829                 self.push_high_priority(WorkItem::GuestCall(call));
4830             } else {
4831                 self.instance_state(instance)
4832                     .pending
4833                     .insert(call.thread, call.kind);
4834             }
4835         }
4836 
4837         Ok(())
4838     }
4839 
4840     /// Implements the `backpressure.{inc,dec}` intrinsics.
4841     pub(crate) fn backpressure_modify(
4842         &mut self,
4843         caller_instance: RuntimeComponentInstanceIndex,
4844         modify: impl FnOnce(u16) -> Option<u16>,
4845     ) -> Result<()> {
4846         let state = self.instance_state(caller_instance);
4847         let old = state.backpressure;
4848         let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
4849         state.backpressure = new;
4850 
4851         if old > 0 && new == 0 {
4852             // Backpressure was previously enabled and is now disabled; move any
4853             // newly-eligible guest calls to the "high priority" queue.
4854             self.partition_pending(caller_instance)?;
4855         }
4856 
4857         Ok(())
4858     }
4859 
4860     /// Implements the `context.get` intrinsic.
4861     pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4862         let thread = self.guest_thread.unwrap();
4863         let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4864         log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4865         Ok(val)
4866     }
4867 
4868     /// Implements the `context.set` intrinsic.
4869     pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4870         let thread = self.guest_thread.unwrap();
4871         log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4872         self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4873         Ok(())
4874     }
4875 
4876     /// Returns whether there's a pending cancellation on the current guest thread,
4877     /// consuming the event if so.
4878     fn take_pending_cancellation(&mut self) -> bool {
4879         let thread = self.guest_thread.unwrap();
4880         if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4881             assert!(matches!(event, Event::Cancelled));
4882             true
4883         } else {
4884             false
4885         }
4886     }
4887 
4888     fn check_blocking(&mut self) -> Result<()> {
4889         let task = self.guest_thread.unwrap().task;
4890         self.check_blocking_for(task)
4891     }
4892 
4893     fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
4894         if self.may_block(task) {
4895             Ok(())
4896         } else {
4897             Err(Trap::CannotBlockSyncTask.into())
4898         }
4899     }
4900 
4901     fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
4902         let task = self.get_mut(task).unwrap();
4903         task.async_function || task.returned_or_cancelled()
4904     }
4905 }
4906 
4907 /// Provide a type hint to compiler about the shape of a parameter lower
4908 /// closure.
4909 fn for_any_lower<
4910     F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4911 >(
4912     fun: F,
4913 ) -> F {
4914     fun
4915 }
4916 
4917 /// Provide a type hint to compiler about the shape of a result lift closure.
4918 fn for_any_lift<
4919     F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4920 >(
4921     fun: F,
4922 ) -> F {
4923     fun
4924 }
4925 
4926 /// Wrap the specified future in a `poll_fn` which asserts that the future is
4927 /// only polled from the event loop of the specified `Store`.
4928 ///
4929 /// See `StoreContextMut::run_concurrent` for details.
4930 fn checked<F: Future + Send + 'static>(
4931     id: StoreId,
4932     fut: F,
4933 ) -> impl Future<Output = F::Output> + Send + 'static {
4934     async move {
4935         let mut fut = pin!(fut);
4936         future::poll_fn(move |cx| {
4937             let message = "\
4938                 `Future`s which depend on asynchronous component tasks, streams, or \
4939                 futures to complete may only be polled from the event loop of the \
4940                 store to which they belong.  Please use \
4941                 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
4942             ";
4943             tls::try_get(|store| {
4944                 let matched = match store {
4945                     tls::TryGet::Some(store) => store.id() == id,
4946                     tls::TryGet::Taken | tls::TryGet::None => false,
4947                 };
4948 
4949                 if !matched {
4950                     panic!("{message}")
4951                 }
4952             });
4953             fut.as_mut().poll(cx)
4954         })
4955         .await
4956     }
4957 }
4958 
4959 /// Assert that `StoreContextMut::run_concurrent` has not been called from
4960 /// within an store's event loop.
4961 fn check_recursive_run() {
4962     tls::try_get(|store| {
4963         if !matches!(store, tls::TryGet::None) {
4964             panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
4965         }
4966     });
4967 }
4968 
4969 fn unpack_callback_code(code: u32) -> (u32, u32) {
4970     (code & 0xF, code >> 4)
4971 }
4972 
4973 /// Helper struct for packaging parameters to be passed to
4974 /// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
4975 /// `waitable-set.poll`.
4976 struct WaitableCheckParams {
4977     set: TableId<WaitableSet>,
4978     options: OptionsIndex,
4979     payload: u32,
4980 }
4981 
4982 /// Helper enum for passing parameters to `ComponentInstance::waitable_check`.
4983 enum WaitableCheck {
4984     Wait(WaitableCheckParams),
4985     Poll(WaitableCheckParams),
4986 }
4987 
4988 /// Represents a guest task called from the host, prepared using `prepare_call`.
4989 pub(crate) struct PreparedCall<R> {
4990     /// The guest export to be called
4991     handle: Func,
4992     /// The guest thread created by `prepare_call`
4993     thread: QualifiedThreadId,
4994     /// The number of lowered core Wasm parameters to pass to the call.
4995     param_count: usize,
4996     /// The `oneshot::Receiver` to which the result of the call will be
4997     /// delivered when it is available.
4998     rx: oneshot::Receiver<LiftedResult>,
4999     /// The `oneshot::Receiver` which will resolve when the task -- and any
5000     /// transitive subtasks -- have all exited.
5001     exit_rx: oneshot::Receiver<()>,
5002     _phantom: PhantomData<R>,
5003 }
5004 
5005 impl<R> PreparedCall<R> {
5006     /// Get a copy of the `TaskId` for this `PreparedCall`.
5007     pub(crate) fn task_id(&self) -> TaskId {
5008         TaskId {
5009             task: self.thread.task,
5010         }
5011     }
5012 }
5013 
5014 /// Represents a task created by `prepare_call`.
5015 pub(crate) struct TaskId {
5016     task: TableId<GuestTask>,
5017 }
5018 
5019 impl TaskId {
5020     /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5021     /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5022     /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5023     /// and can be resumed by other tasks for this component, so we mark the future as dropped
5024     /// and delete the task when all threads are done.
5025     pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5026         let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5027         if !task.already_lowered_parameters() {
5028             Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5029         } else {
5030             task.host_future_state = HostFutureState::Dropped;
5031             if task.ready_to_delete() {
5032                 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5033             }
5034         }
5035         Ok(())
5036     }
5037 }
5038 
5039 /// Prepare a call to the specified exported Wasm function, providing functions
5040 /// for lowering the parameters and lifting the result.
5041 ///
5042 /// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
5043 /// loop, use `queue_call`.
5044 pub(crate) fn prepare_call<T, R>(
5045     mut store: StoreContextMut<T>,
5046     handle: Func,
5047     param_count: usize,
5048     host_future_present: bool,
5049     call_post_return_automatically: bool,
5050     lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5051     + Send
5052     + Sync
5053     + 'static,
5054     lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5055     + Send
5056     + Sync
5057     + 'static,
5058 ) -> Result<PreparedCall<R>> {
5059     let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5060 
5061     let instance = handle.instance().id().get(store.0);
5062     let options = &instance.component().env_component().options[options];
5063     let ty = &instance.component().types()[ty];
5064     let async_function = ty.async_;
5065     let task_return_type = ty.results;
5066     let component_instance = raw_options.instance;
5067     let callback = options.callback.map(|i| instance.runtime_callback(i));
5068     let memory = options
5069         .memory()
5070         .map(|i| instance.runtime_memory(i))
5071         .map(SendSyncPtr::new);
5072     let string_encoding = options.string_encoding;
5073     let token = StoreToken::new(store.as_context_mut());
5074     let state = store.0.concurrent_state_mut();
5075 
5076     assert!(state.guest_thread.is_none());
5077 
5078     let (tx, rx) = oneshot::channel();
5079     let (exit_tx, exit_rx) = oneshot::channel();
5080 
5081     let mut task = GuestTask::new(
5082         state,
5083         Box::new(for_any_lower(move |store, params| {
5084             lower_params(handle, token.as_context_mut(store), params)
5085         })),
5086         LiftResult {
5087             lift: Box::new(for_any_lift(move |store, result| {
5088                 lift_result(handle, store, result)
5089             })),
5090             ty: task_return_type,
5091             memory,
5092             string_encoding,
5093         },
5094         Caller::Host {
5095             tx: Some(tx),
5096             exit_tx: Arc::new(exit_tx),
5097             host_future_present,
5098             call_post_return_automatically,
5099         },
5100         callback.map(|callback| {
5101             let callback = SendSyncPtr::new(callback);
5102             let instance = handle.instance();
5103             Box::new(
5104                 move |store: &mut dyn VMStore, runtime_instance, event, handle| {
5105                     let store = token.as_context_mut(store);
5106                     // SAFETY: Per the contract of `prepare_call`, the callback
5107                     // will remain valid at least as long is this task exists.
5108                     unsafe {
5109                         instance.call_callback(
5110                             store,
5111                             runtime_instance,
5112                             callback,
5113                             event,
5114                             handle,
5115                             call_post_return_automatically,
5116                         )
5117                     }
5118                 },
5119             ) as CallbackFn
5120         }),
5121         component_instance,
5122         async_function,
5123     )?;
5124     task.function_index = Some(handle.index());
5125 
5126     let task = state.push(task)?;
5127     let thread = state.push(GuestThread::new_implicit(task))?;
5128     state.get_mut(task)?.threads.insert(thread);
5129 
5130     Ok(PreparedCall {
5131         handle,
5132         thread: QualifiedThreadId { task, thread },
5133         param_count,
5134         rx,
5135         exit_rx,
5136         _phantom: PhantomData,
5137     })
5138 }
5139 
5140 /// Queue a call previously prepared using `prepare_call` to be run as part of
5141 /// the associated `ComponentInstance`'s event loop.
5142 ///
5143 /// The returned future will resolve to the result once it is available, but
5144 /// must only be polled via the instance's event loop. See
5145 /// `StoreContextMut::run_concurrent` for details.
5146 pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5147     mut store: StoreContextMut<T>,
5148     prepared: PreparedCall<R>,
5149 ) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5150     let PreparedCall {
5151         handle,
5152         thread,
5153         param_count,
5154         rx,
5155         exit_rx,
5156         ..
5157     } = prepared;
5158 
5159     queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5160 
5161     Ok(checked(
5162         store.0.id(),
5163         rx.map(move |result| {
5164             result
5165                 .map(|v| (*v.downcast().unwrap(), exit_rx))
5166                 .map_err(anyhow::Error::from)
5167         }),
5168     ))
5169 }
5170 
5171 /// Queue a call previously prepared using `prepare_call` to be run as part of
5172 /// the associated `ComponentInstance`'s event loop.
5173 fn queue_call0<T: 'static>(
5174     store: StoreContextMut<T>,
5175     handle: Func,
5176     guest_thread: QualifiedThreadId,
5177     param_count: usize,
5178 ) -> Result<()> {
5179     let (_options, flags, _ty, raw_options) = handle.abi_info(store.0);
5180     let is_concurrent = raw_options.async_;
5181     let callback = raw_options.callback;
5182     let instance = handle.instance();
5183     let callee = handle.lifted_core_func(store.0);
5184     let post_return = handle.post_return_core_func(store.0);
5185     let callback = callback.map(|i| {
5186         let instance = instance.id().get(store.0);
5187         SendSyncPtr::new(instance.runtime_callback(i))
5188     });
5189 
5190     log::trace!("queueing call {guest_thread:?}");
5191 
5192     let instance_flags = if callback.is_none() {
5193         None
5194     } else {
5195         Some(flags)
5196     };
5197 
5198     // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5199     // (with signatures appropriate for this call) and will remain valid as
5200     // long as this instance is valid.
5201     unsafe {
5202         instance.queue_call(
5203             store,
5204             guest_thread,
5205             SendSyncPtr::new(callee),
5206             param_count,
5207             1,
5208             instance_flags,
5209             is_concurrent,
5210             callback,
5211             post_return.map(SendSyncPtr::new),
5212         )
5213     }
5214 }
5215