1 //! Runtime support for the Component Model Async ABI.
2 //!
3 //! This module and its submodules provide host runtime support for Component
4 //! Model Async features such as async-lifted exports, async-lowered imports,
5 //! streams, futures, and related intrinsics.  See [the Async
6 //! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7 //! for a high-level overview.
8 //!
9 //! At the core of this support is an event loop which schedules and switches
10 //! between guest tasks and any host tasks they create.  Each
11 //! `Store` will have at most one event loop running at any given
12 //! time, and that loop may be suspended and resumed by the host embedder using
13 //! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14 //! function contains the loop itself, while the
15 //! `StoreOpaque::concurrent_state` field holds its state.
16 //!
17 //! # Public API Overview
18 //!
19 //! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20 //!
21 //! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
22 //! async-lifted or sync-lifted import, creating a guest task.
23 //!
24 //! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25 //! instance, allowing any and all tasks belonging to that instance to make
26 //! progress.
27 //!
28 //! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29 //! for the specified instance.
30 //!
31 //! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32 //! `stream` which may be passed to the guest.  This takes a
33 //! `{Future,Stream}Producer` implementation which will be polled for items when
34 //! the consumer requests them.
35 //!
36 //! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37 //! connecting it to a `{Future,Stream}Consumer` which will consume any items
38 //! produced by the write end.
39 //!
40 //! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41 //!
42 //! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43 //! function with the linker.  That function will take an `Accessor` as its
44 //! first parameter, which provides access to the store between (but not across)
45 //! await points.
46 //!
47 //! - `Accessor::with`: Access the store and its associated data.
48 //!
49 //! - `Accessor::spawn`: Run a background task as part of the event loop for the
50 //! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51 //! in host functions.
52 
53 use crate::component::func::{self, Func};
54 use crate::component::store::StoreComponentInstanceId;
55 use crate::component::{
56     ComponentInstanceId, HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError,
57 };
58 use crate::fiber::{self, StoreFiber, StoreFiberYield};
59 use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60 use crate::vm::component::{
61     CallContext, ComponentInstance, HandleTable, InstanceFlags, ResourceTables,
62 };
63 use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
64 use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType};
65 use anyhow::{Context as _, Result, anyhow, bail};
66 use error_contexts::GlobalErrorContextRefCount;
67 use futures::channel::oneshot;
68 use futures::future::{self, Either, FutureExt};
69 use futures::stream::{FuturesUnordered, StreamExt};
70 use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71 use std::any::Any;
72 use std::borrow::ToOwned;
73 use std::boxed::Box;
74 use std::cell::UnsafeCell;
75 use std::collections::{BTreeMap, BTreeSet, HashSet};
76 use std::fmt;
77 use std::future::Future;
78 use std::marker::PhantomData;
79 use std::mem::{self, ManuallyDrop, MaybeUninit};
80 use std::ops::DerefMut;
81 use std::pin::{Pin, pin};
82 use std::ptr::{self, NonNull};
83 use std::slice;
84 use std::sync::Arc;
85 use std::task::{Context, Poll, Waker};
86 use std::vec::Vec;
87 use table::{TableDebug, TableId};
88 use wasmtime_environ::Trap;
89 use wasmtime_environ::component::{
90     CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS,
91     MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
92     RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
93     TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
94     TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
95 };
96 
97 pub use abort::JoinHandle;
98 pub use future_stream_any::{FutureAny, StreamAny};
99 pub use futures_and_streams::{
100     Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
101     FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
102     StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
103 };
104 pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
105 
106 mod abort;
107 mod error_contexts;
108 mod future_stream_any;
109 mod futures_and_streams;
110 pub(crate) mod table;
111 pub(crate) mod tls;
112 
113 /// Constant defined in the Component Model spec to indicate that the async
114 /// intrinsic (e.g. `future.write`) has not yet completed.
115 const BLOCKED: u32 = 0xffff_ffff;
116 
117 /// Corresponds to `CallState` in the upstream spec.
118 #[derive(Clone, Copy, Eq, PartialEq, Debug)]
119 pub enum Status {
120     Starting = 0,
121     Started = 1,
122     Returned = 2,
123     StartCancelled = 3,
124     ReturnCancelled = 4,
125 }
126 
127 impl Status {
128     /// Packs this status and the optional `waitable` provided into a 32-bit
129     /// result that the canonical ABI requires.
130     ///
131     /// The low 4 bits are reserved for the status while the upper 28 bits are
132     /// the waitable, if present.
133     pub fn pack(self, waitable: Option<u32>) -> u32 {
134         assert!(matches!(self, Status::Returned) == waitable.is_none());
135         let waitable = waitable.unwrap_or(0);
136         assert!(waitable < (1 << 28));
137         (waitable << 4) | (self as u32)
138     }
139 }
140 
141 /// Corresponds to `EventCode` in the Component Model spec, plus related payload
142 /// data.
143 #[derive(Clone, Copy, Debug)]
144 enum Event {
145     None,
146     Cancelled,
147     Subtask {
148         status: Status,
149     },
150     StreamRead {
151         code: ReturnCode,
152         pending: Option<(TypeStreamTableIndex, u32)>,
153     },
154     StreamWrite {
155         code: ReturnCode,
156         pending: Option<(TypeStreamTableIndex, u32)>,
157     },
158     FutureRead {
159         code: ReturnCode,
160         pending: Option<(TypeFutureTableIndex, u32)>,
161     },
162     FutureWrite {
163         code: ReturnCode,
164         pending: Option<(TypeFutureTableIndex, u32)>,
165     },
166 }
167 
168 impl Event {
169     /// Lower this event to core Wasm integers for delivery to the guest.
170     ///
171     /// Note that the waitable handle, if any, is assumed to be lowered
172     /// separately.
173     fn parts(self) -> (u32, u32) {
174         const EVENT_NONE: u32 = 0;
175         const EVENT_SUBTASK: u32 = 1;
176         const EVENT_STREAM_READ: u32 = 2;
177         const EVENT_STREAM_WRITE: u32 = 3;
178         const EVENT_FUTURE_READ: u32 = 4;
179         const EVENT_FUTURE_WRITE: u32 = 5;
180         const EVENT_CANCELLED: u32 = 6;
181         match self {
182             Event::None => (EVENT_NONE, 0),
183             Event::Cancelled => (EVENT_CANCELLED, 0),
184             Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
185             Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
186             Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
187             Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
188             Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
189         }
190     }
191 }
192 
193 /// Corresponds to `CallbackCode` in the spec.
194 mod callback_code {
195     pub const EXIT: u32 = 0;
196     pub const YIELD: u32 = 1;
197     pub const WAIT: u32 = 2;
198 }
199 
200 /// A flag indicating that the callee is an async-lowered export.
201 ///
202 /// This may be passed to the `async-start` intrinsic from a fused adapter.
203 const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
204 
205 /// Provides access to either store data (via the `get` method) or the store
206 /// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
207 /// instance to which the current host task belongs.
208 ///
209 /// See [`Accessor::with`] for details.
210 pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
211     store: StoreContextMut<'a, T>,
212     get_data: fn(&mut T) -> D::Data<'_>,
213 }
214 
215 impl<'a, T, D> Access<'a, T, D>
216 where
217     D: HasData + ?Sized,
218     T: 'static,
219 {
220     /// Creates a new [`Access`] from its component parts.
221     pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
222         Self { store, get_data }
223     }
224 
225     /// Get mutable access to the store data.
226     pub fn data_mut(&mut self) -> &mut T {
227         self.store.data_mut()
228     }
229 
230     /// Get mutable access to the store data.
231     pub fn get(&mut self) -> D::Data<'_> {
232         (self.get_data)(self.data_mut())
233     }
234 
235     /// Spawn a background task.
236     ///
237     /// See [`Accessor::spawn`] for details.
238     pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
239     where
240         T: 'static,
241     {
242         let accessor = Accessor {
243             get_data: self.get_data,
244             token: StoreToken::new(self.store.as_context_mut()),
245         };
246         self.store
247             .as_context_mut()
248             .spawn_with_accessor(accessor, task)
249     }
250 
251     /// Returns the getter this accessor is using to project from `T` into
252     /// `D::Data`.
253     pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
254         self.get_data
255     }
256 }
257 
258 impl<'a, T, D> AsContext for Access<'a, T, D>
259 where
260     D: HasData + ?Sized,
261     T: 'static,
262 {
263     type Data = T;
264 
265     fn as_context(&self) -> StoreContext<'_, T> {
266         self.store.as_context()
267     }
268 }
269 
270 impl<'a, T, D> AsContextMut for Access<'a, T, D>
271 where
272     D: HasData + ?Sized,
273     T: 'static,
274 {
275     fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
276         self.store.as_context_mut()
277     }
278 }
279 
280 /// Provides scoped mutable access to store data in the context of a concurrent
281 /// host task future.
282 ///
283 /// This allows multiple host task futures to execute concurrently and access
284 /// the store between (but not across) `await` points.
285 ///
286 /// # Rationale
287 ///
288 /// This structure is sort of like `&mut T` plus a projection from `&mut T` to
289 /// `D::Data<'_>`. The problem this is solving, however, is that it does not
290 /// literally store these values. The basic problem is that when a concurrent
291 /// host future is being polled it has access to `&mut T` (and the whole
292 /// `Store`) but when it's not being polled it does not have access to these
293 /// values. This reflects how the store is only ever polling one future at a
294 /// time so the store is effectively being passed between futures.
295 ///
296 /// Rust's `Future` trait, however, has no means of passing a `Store`
297 /// temporarily between futures. The [`Context`](std::task::Context) type does
298 /// not have the ability to attach arbitrary information to it at this time.
299 /// This type, [`Accessor`], is used to bridge this expressivity gap.
300 ///
301 /// The [`Accessor`] type here represents the ability to acquire, temporarily in
302 /// a synchronous manner, the current store. The [`Accessor::with`] function
303 /// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
304 /// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
305 /// not take an `async` closure as its argument, instead it's a synchronous
306 /// closure which must complete during on run of `Future::poll`. This reflects
307 /// how the store is temporarily made available while a host future is being
308 /// polled.
309 ///
310 /// # Implementation
311 ///
312 /// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
313 /// this type additionally doesn't even have a lifetime parameter. This is
314 /// instead a representation of proof of the ability to acquire these while a
315 /// future is being polled. Wasmtime will, when it polls a host future,
316 /// configure ambient state such that the `Accessor` that a future closes over
317 /// will work and be able to access the store.
318 ///
319 /// This has a number of implications for users such as:
320 ///
321 /// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
322 ///   the lifetime of a single future.
323 /// * A future is expected to, however, close over an `Accessor` and keep it
324 ///   alive probably for the duration of the entire future.
325 /// * Different host futures will be given different `Accessor`s, and that's
326 ///   intentional.
327 /// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
328 ///   alleviates some otherwise required bounds to be written down.
329 ///
330 /// # Using `Accessor` in `Drop`
331 ///
332 /// The methods on `Accessor` are only expected to work in the context of
333 /// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
334 /// host future can be dropped at any time throughout the system and Wasmtime
335 /// store context is not necessarily available at that time. It's recommended to
336 /// not use `Accessor` methods in anything connected to a `Drop` implementation
337 /// as they will panic and have unintended results. If you run into this though
338 /// feel free to file an issue on the Wasmtime repository.
339 pub struct Accessor<T: 'static, D = HasSelf<T>>
340 where
341     D: HasData + ?Sized,
342 {
343     token: StoreToken<T>,
344     get_data: fn(&mut T) -> D::Data<'_>,
345 }
346 
347 /// A helper trait to take any type of accessor-with-data in functions.
348 ///
349 /// This trait is similar to [`AsContextMut`] except that it's used when
350 /// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
351 /// [`Accessor`] is the main type used in concurrent settings and is passed to
352 /// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`].
353 ///
354 /// This trait is implemented for [`Accessor`] and `&T` where `T` implements
355 /// this trait. This effectively means that regardless of the `D` in
356 /// `Accessor<T, D>` it can still be passed to a function which just needs a
357 /// store accessor.
358 ///
359 /// Acquiring an [`Accessor`] can be done through
360 /// [`StoreContextMut::run_concurrent`] for example or in a host function
361 /// through
362 /// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent).
363 pub trait AsAccessor {
364     /// The `T` in `Store<T>` that this accessor refers to.
365     type Data: 'static;
366 
367     /// The `D` in `Accessor<T, D>`, or the projection out of
368     /// `Self::Data`.
369     type AccessorData: HasData + ?Sized;
370 
371     /// Returns the accessor that this is referring to.
372     fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
373 }
374 
375 impl<T: AsAccessor + ?Sized> AsAccessor for &T {
376     type Data = T::Data;
377     type AccessorData = T::AccessorData;
378 
379     fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
380         T::as_accessor(self)
381     }
382 }
383 
384 impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
385     type Data = T;
386     type AccessorData = D;
387 
388     fn as_accessor(&self) -> &Accessor<T, D> {
389         self
390     }
391 }
392 
393 // Note that it is intentional at this time that `Accessor` does not actually
394 // store `&mut T` or anything similar. This distinctly enables the `Accessor`
395 // structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
396 // that matter). This is used to ergonomically simplify bindings where the
397 // majority of the time `Accessor` is closed over in a future which then needs
398 // to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
399 // you already have to write `T: 'static`...) it helps to avoid this.
400 //
401 // Note as well that `Accessor` doesn't actually store its data at all. Instead
402 // it's more of a "proof" of what can be accessed from TLS. API design around
403 // `Accessor` and functions like `Linker::func_wrap_concurrent` are
404 // intentionally made to ensure that `Accessor` is ideally only used in the
405 // context that TLS variables are actually set. For example host functions are
406 // given `&Accessor`, not `Accessor`, and this prevents them from persisting
407 // the value outside of a future. Within the future the TLS variables are all
408 // guaranteed to be set while the future is being polled.
409 //
410 // Finally though this is not an ironclad guarantee, but nor does it need to be.
411 // The TLS APIs are designed to panic or otherwise model usage where they're
412 // called recursively or similar. It's hoped that code cannot be constructed to
413 // actually hit this at runtime but this is not a safety requirement at this
414 // time.
415 const _: () = {
416     const fn assert<T: Send + Sync>() {}
417     assert::<Accessor<UnsafeCell<u32>>>();
418 };
419 
420 impl<T> Accessor<T> {
421     /// Creates a new `Accessor` backed by the specified functions.
422     ///
423     /// - `get`: used to retrieve the store
424     ///
425     /// - `get_data`: used to "project" from the store's associated data to
426     /// another type (e.g. a field of that data or a wrapper around it).
427     ///
428     /// - `spawn`: used to queue spawned background tasks to be run later
429     pub(crate) fn new(token: StoreToken<T>) -> Self {
430         Self {
431             token,
432             get_data: |x| x,
433         }
434     }
435 }
436 
437 impl<T, D> Accessor<T, D>
438 where
439     D: HasData + ?Sized,
440 {
441     /// Run the specified closure, passing it mutable access to the store.
442     ///
443     /// This function is one of the main building blocks of the [`Accessor`]
444     /// type. This yields synchronous, blocking, access to store via an
445     /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
446     /// providing the ability to access `D` via [`Access::get`]. Note that the
447     /// `fun` here is given only temporary access to the store and `T`/`D`
448     /// meaning that the return value `R` here is not allowed to capture borrows
449     /// into the two. If access is needed to data within `T` or `D` outside of
450     /// this closure then it must be `clone`d out, for example.
451     ///
452     /// # Panics
453     ///
454     /// This function will panic if it is call recursively with any other
455     /// accessor already in scope. For example if `with` is called within `fun`,
456     /// then this function will panic. It is up to the embedder to ensure that
457     /// this does not happen.
458     pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
459         tls::get(|vmstore| {
460             fun(Access {
461                 store: self.token.as_context_mut(vmstore),
462                 get_data: self.get_data,
463             })
464         })
465     }
466 
467     /// Returns the getter this accessor is using to project from `T` into
468     /// `D::Data`.
469     pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
470         self.get_data
471     }
472 
473     /// Changes this accessor to access `D2` instead of the current type
474     /// parameter `D`.
475     ///
476     /// This changes the underlying data access from `T` to `D2::Data<'_>`.
477     ///
478     /// # Panics
479     ///
480     /// When using this API the returned value is disconnected from `&self` and
481     /// the lifetime binding the `self` argument. An `Accessor` only works
482     /// within the context of the closure or async closure that it was
483     /// originally given to, however. This means that due to the fact that the
484     /// returned value has no lifetime connection it's possible to use the
485     /// accessor outside of `&self`, the original accessor, and panic.
486     ///
487     /// The returned value should only be used within the scope of the original
488     /// `Accessor` that `self` refers to.
489     pub fn with_getter<D2: HasData>(
490         &self,
491         get_data: fn(&mut T) -> D2::Data<'_>,
492     ) -> Accessor<T, D2> {
493         Accessor {
494             token: self.token,
495             get_data,
496         }
497     }
498 
499     /// Spawn a background task which will receive an `&Accessor<T, D>` and
500     /// run concurrently with any other tasks in progress for the current
501     /// store.
502     ///
503     /// This is particularly useful for host functions which return a `stream`
504     /// or `future` such that the code to write to the write end of that
505     /// `stream` or `future` must run after the function returns.
506     ///
507     /// The returned [`JoinHandle`] may be used to cancel the task.
508     ///
509     /// # Panics
510     ///
511     /// Panics if called within a closure provided to the [`Accessor::with`]
512     /// function. This can only be called outside an active invocation of
513     /// [`Accessor::with`].
514     pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
515     where
516         T: 'static,
517     {
518         let accessor = self.clone_for_spawn();
519         self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
520     }
521 
522     fn clone_for_spawn(&self) -> Self {
523         Self {
524             token: self.token,
525             get_data: self.get_data,
526         }
527     }
528 }
529 
530 /// Represents a task which may be provided to `Accessor::spawn`,
531 /// `Accessor::forward`, or `StorecContextMut::spawn`.
532 // TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
533 // option.
534 //
535 // As of this writing, it's not possible to specify e.g. `Send` and `Sync`
536 // bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
537 // Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
538 // Send + Sync + 'static` fails with a type mismatch error when we try to pass
539 // it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
540 // best we can do for the time being.
541 pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
542 where
543     D: HasData + ?Sized,
544 {
545     /// Run the task.
546     fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
547 }
548 
549 /// Represents parameter and result metadata for the caller side of a
550 /// guest->guest call orchestrated by a fused adapter.
551 enum CallerInfo {
552     /// Metadata for a call to an async-lowered import
553     Async {
554         params: Vec<ValRaw>,
555         has_result: bool,
556     },
557     /// Metadata for a call to an sync-lowered import
558     Sync {
559         params: Vec<ValRaw>,
560         result_count: u32,
561     },
562 }
563 
564 /// Indicates how a guest task is waiting on a waitable set.
565 enum WaitMode {
566     /// The guest task is waiting using `task.wait`
567     Fiber(StoreFiber<'static>),
568     /// The guest task is waiting via a callback declared as part of an
569     /// async-lifted export.
570     Callback(Instance),
571 }
572 
573 /// Represents the reason a fiber is suspending itself.
574 #[derive(Debug)]
575 enum SuspendReason {
576     /// The fiber is waiting for an event to be delivered to the specified
577     /// waitable set or task.
578     Waiting {
579         set: TableId<WaitableSet>,
580         thread: QualifiedThreadId,
581         skip_may_block_check: bool,
582     },
583     /// The fiber has finished handling its most recent work item and is waiting
584     /// for another (or to be dropped if it is no longer needed).
585     NeedWork,
586     /// The fiber is yielding and should be resumed once other tasks have had a
587     /// chance to run.
588     Yielding { thread: QualifiedThreadId },
589     /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
590     ExplicitlySuspending {
591         thread: QualifiedThreadId,
592         skip_may_block_check: bool,
593     },
594 }
595 
596 /// Represents a pending call into guest code for a given guest task.
597 enum GuestCallKind {
598     /// Indicates there's an event to deliver to the task, possibly related to a
599     /// waitable set the task has been waiting on or polling.
600     DeliverEvent {
601         /// The instance to which the task belongs.
602         instance: Instance,
603         /// The waitable set the event belongs to, if any.
604         ///
605         /// If this is `None` the event will be waiting in the
606         /// `GuestTask::event` field for the task.
607         set: Option<TableId<WaitableSet>>,
608     },
609     /// Indicates that a new guest task call is pending and may be executed
610     /// using the specified closure.
611     ///
612     /// If the closure returns `Ok(Some(call))`, the `call` should be run
613     /// immediately using `handle_guest_call`.
614     StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
615     StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
616 }
617 
618 impl fmt::Debug for GuestCallKind {
619     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
620         match self {
621             Self::DeliverEvent { instance, set } => f
622                 .debug_struct("DeliverEvent")
623                 .field("instance", instance)
624                 .field("set", set)
625                 .finish(),
626             Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
627             Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
628         }
629     }
630 }
631 
632 /// Represents a pending call into guest code for a given guest thread.
633 #[derive(Debug)]
634 struct GuestCall {
635     thread: QualifiedThreadId,
636     kind: GuestCallKind,
637 }
638 
639 impl GuestCall {
640     /// Returns whether or not the call is ready to run.
641     ///
642     /// A call will not be ready to run if either:
643     ///
644     /// - the (sub-)component instance to be called has already been entered and
645     /// cannot be reentered until an in-progress call completes
646     ///
647     /// - the call is for a not-yet started task and the (sub-)component
648     /// instance to be called has backpressure enabled
649     fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
650         let instance = store
651             .concurrent_state_mut()
652             .get_mut(self.thread.task)?
653             .instance;
654         let state = store.instance_state(instance);
655 
656         let ready = match &self.kind {
657             GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
658             GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
659             GuestCallKind::StartExplicit(_) => true,
660         };
661         log::trace!(
662             "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
663             state.do_not_enter,
664             state.backpressure
665         );
666         Ok(ready)
667     }
668 }
669 
670 /// Job to be run on a worker fiber.
671 enum WorkerItem {
672     GuestCall(GuestCall),
673     Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
674 }
675 
676 /// Represents a pending work item to be handled by the event loop for a given
677 /// component instance.
678 enum WorkItem {
679     /// A host task to be pushed to `ConcurrentState::futures`.
680     PushFuture(AlwaysMut<HostTaskFuture>),
681     /// A fiber to resume.
682     ResumeFiber(StoreFiber<'static>),
683     /// A pending call into guest code for a given guest task.
684     GuestCall(GuestCall),
685     /// A job to run on a worker fiber.
686     WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
687 }
688 
689 impl fmt::Debug for WorkItem {
690     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
691         match self {
692             Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
693             Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
694             Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(),
695             Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
696         }
697     }
698 }
699 
700 /// Whether a suspension intrinsic was cancelled or completed
701 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
702 pub(crate) enum WaitResult {
703     Cancelled,
704     Completed,
705 }
706 
707 /// Raise a trap if the calling task is synchronous and trying to block prior to
708 /// returning a value.
709 pub(crate) fn check_blocking(store: &mut dyn VMStore) -> Result<()> {
710     store.concurrent_state_mut().check_blocking()
711 }
712 
713 /// Poll the specified future until it completes on behalf of a guest->host call
714 /// using a sync-lowered import.
715 ///
716 /// This is similar to `Instance::first_poll` except it's for sync-lowered
717 /// imports, meaning we don't need to handle cancellation and we can block the
718 /// caller until the task completes, at which point the caller can handle
719 /// lowering the result to the guest's stack and linear memory.
720 pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
721     store: &mut dyn VMStore,
722     future: impl Future<Output = Result<R>> + Send + 'static,
723     caller_instance: RuntimeComponentInstanceIndex,
724 ) -> Result<R> {
725     let state = store.concurrent_state_mut();
726 
727     // If there is no current guest thread set, that means the host function was
728     // registered using e.g. `LinkerInstance::func_wrap`, in which case it
729     // should complete immediately.
730     let Some(caller) = state.guest_thread else {
731         return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) {
732             Poll::Ready(result) => result,
733             Poll::Pending => {
734                 unreachable!()
735             }
736         };
737     };
738 
739     // Save any existing result stashed in `GuestTask::result` so we can replace
740     // it with the new result.
741     let old_result = state
742         .get_mut(caller.task)
743         .with_context(|| format!("bad handle: {caller:?}"))?
744         .result
745         .take();
746 
747     // Add a temporary host task into the table so we can track its progress.
748     // Note that we'll never allocate a waitable handle for the guest since
749     // we're being called synchronously.
750     let task = state.push(HostTask::new(caller_instance, None))?;
751 
752     log::trace!("new host task child of {caller:?}: {task:?}");
753 
754     // Wrap the future in a closure which will take care of stashing the result
755     // in `GuestTask::result` and resuming this fiber when the host task
756     // completes.
757     let mut future = Box::pin(async move {
758         let result = future.await?;
759         tls::get(move |store| {
760             let state = store.concurrent_state_mut();
761             state.get_mut(caller.task)?.result = Some(Box::new(result) as _);
762 
763             Waitable::Host(task).set_event(
764                 state,
765                 Some(Event::Subtask {
766                     status: Status::Returned,
767                 }),
768             )?;
769 
770             Ok(())
771         })
772     }) as HostTaskFuture;
773 
774     // Finally, poll the future.  We can use a dummy `Waker` here because we'll
775     // add the future to `ConcurrentState::futures` and poll it automatically
776     // from the event loop if it doesn't complete immediately here.
777     let poll = tls::set(store, || {
778         future
779             .as_mut()
780             .poll(&mut Context::from_waker(&Waker::noop()))
781     });
782 
783     match poll {
784         Poll::Ready(result) => {
785             // It completed immediately; check the result and delete the task.
786             result?;
787             log::trace!("delete host task {task:?} (already ready)");
788             store.concurrent_state_mut().delete(task)?;
789         }
790         Poll::Pending => {
791             // It did not complete immediately; add it to
792             // `ConcurrentState::futures` so it will be polled via the event
793             // loop; then use `GuestTask::sync_call_set` to wait for the task to
794             // complete, suspending the current fiber until it does so.
795             let state = store.concurrent_state_mut();
796             state.push_future(future);
797 
798             let set = state.get_mut(caller.task)?.sync_call_set;
799             Waitable::Host(task).join(state, Some(set))?;
800 
801             store.suspend(SuspendReason::Waiting {
802                 set,
803                 thread: caller,
804                 skip_may_block_check: false,
805             })?;
806         }
807     }
808 
809     // Retrieve and return the result.
810     Ok(*mem::replace(
811         &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
812         old_result,
813     )
814     .unwrap()
815     .downcast()
816     .unwrap())
817 }
818 
819 /// Execute the specified guest call.
820 fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
821     let mut next = Some(call);
822     while let Some(call) = next.take() {
823         match call.kind {
824             GuestCallKind::DeliverEvent { instance, set } => {
825                 let (event, waitable) = instance
826                     .get_event(store, call.thread.task, set, true)?
827                     .unwrap();
828                 let state = store.concurrent_state_mut();
829                 let task = state.get_mut(call.thread.task)?;
830                 let runtime_instance = task.instance;
831                 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
832 
833                 log::trace!(
834                     "use callback to deliver event {event:?} to {:?} for {waitable:?}",
835                     call.thread,
836                 );
837 
838                 let old_thread = state.guest_thread.replace(call.thread);
839                 log::trace!(
840                     "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
841                     call.thread
842                 );
843 
844                 store.maybe_push_call_context(call.thread.task)?;
845 
846                 store.enter_instance(runtime_instance);
847 
848                 let callback = store
849                     .concurrent_state_mut()
850                     .get_mut(call.thread.task)?
851                     .callback
852                     .take()
853                     .unwrap();
854 
855                 let code = callback(store, runtime_instance.index, event, handle)?;
856 
857                 store
858                     .concurrent_state_mut()
859                     .get_mut(call.thread.task)?
860                     .callback = Some(callback);
861 
862                 store.exit_instance(runtime_instance)?;
863 
864                 store.maybe_pop_call_context(call.thread.task)?;
865 
866                 next = instance.handle_callback_code(
867                     store,
868                     call.thread,
869                     runtime_instance.index,
870                     code,
871                 )?;
872 
873                 store.concurrent_state_mut().guest_thread = old_thread;
874                 log::trace!(
875                     "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
876                 );
877             }
878             GuestCallKind::StartImplicit(fun) => {
879                 next = fun(store)?;
880             }
881             GuestCallKind::StartExplicit(fun) => {
882                 fun(store)?;
883             }
884         }
885     }
886 
887     Ok(())
888 }
889 
890 impl<T> Store<T> {
891     /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
892     pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
893     where
894         T: Send + 'static,
895     {
896         self.as_context_mut().run_concurrent(fun).await
897     }
898 
899     #[doc(hidden)]
900     pub fn assert_concurrent_state_empty(&mut self) {
901         self.as_context_mut().assert_concurrent_state_empty();
902     }
903 
904     /// Convenience wrapper for [`StoreContextMut::spawn`].
905     pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
906     where
907         T: 'static,
908     {
909         self.as_context_mut().spawn(task)
910     }
911 }
912 
913 impl<T> StoreContextMut<'_, T> {
914     /// Assert that all the relevant tables and queues in the concurrent state
915     /// for this store are empty.
916     ///
917     /// This is for sanity checking in integration tests
918     /// (e.g. `component-async-tests`) that the relevant state has been cleared
919     /// after each test concludes.  This should help us catch leaks, e.g. guest
920     /// tasks which haven't been deleted despite having completed and having
921     /// been dropped by their supertasks.
922     #[doc(hidden)]
923     pub fn assert_concurrent_state_empty(self) {
924         let store = self.0;
925         store
926             .store_data_mut()
927             .components
928             .assert_instance_states_empty();
929         let state = store.concurrent_state_mut();
930         assert!(
931             state.table.get_mut().is_empty(),
932             "non-empty table: {:?}",
933             state.table.get_mut()
934         );
935         assert!(state.high_priority.is_empty());
936         assert!(state.low_priority.is_empty());
937         assert!(state.guest_thread.is_none());
938         assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
939         assert!(state.global_error_context_ref_counts.is_empty());
940     }
941 
942     /// Spawn a background task to run as part of this instance's event loop.
943     ///
944     /// The task will receive an `&Accessor<U>` and run concurrently with
945     /// any other tasks in progress for the instance.
946     ///
947     /// Note that the task will only make progress if and when the event loop
948     /// for this instance is run.
949     ///
950     /// The returned [`SpawnHandle`] may be used to cancel the task.
951     pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
952     where
953         T: 'static,
954     {
955         let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
956         self.spawn_with_accessor(accessor, task)
957     }
958 
959     /// Internal implementation of `spawn` functions where a `store` is
960     /// available along with an `Accessor`.
961     fn spawn_with_accessor<D>(
962         self,
963         accessor: Accessor<T, D>,
964         task: impl AccessorTask<T, D>,
965     ) -> JoinHandle
966     where
967         T: 'static,
968         D: HasData + ?Sized,
969     {
970         // Create an "abortable future" here where internally the future will
971         // hook calls to poll and possibly spawn more background tasks on each
972         // iteration.
973         let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
974         self.0
975             .concurrent_state_mut()
976             .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
977         handle
978     }
979 
980     /// Run the specified closure `fun` to completion as part of this store's
981     /// event loop.
982     ///
983     /// This will run `fun` as part of this store's event loop until it
984     /// yields a result.  `fun` is provided an [`Accessor`], which provides
985     /// controlled access to the store and its data.
986     ///
987     /// This function can be used to invoke [`Func::call_concurrent`] for
988     /// example within the async closure provided here.
989     ///
990     /// # Store-blocking behavior
991     ///
992     ///
993     ///
994     /// At this time there are certain situations in which the `Future` returned
995     /// by the `AsyncFnOnce` passed to this function will not be polled for an
996     /// extended period of time, despite one or more `Waker::wake` events having
997     /// occurred for the task to which it belongs.  This can manifest as the
998     /// `Future` seeming to be "blocked" or "locked up", but is actually due to
999     /// the `Store` being held by e.g. a blocking host function, preventing the
1000     /// `Future` from being polled. A canonical example of this is when the
1001     /// `fun` provided to this function attempts to set a timeout for an
1002     /// invocation of a wasm function. In this situation the async closure is
1003     /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1004     /// to elapse. At this time this setup will not always work and the timeout
1005     /// may not reliably fire.
1006     ///
1007     /// This function will not block the current thread and as such is always
1008     /// suitable to run in an `async` context, but the current implementation of
1009     /// Wasmtime can lead to situations where a certain wasm computation is
1010     /// required to make progress the closure to make progress. This is an
1011     /// artifact of Wasmtime's historical implementation of `async` functions
1012     /// and is the topic of [#11869] and [#11870]. In the timeout example from
1013     /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1014     /// progress for a readiness notification of (b) to get delivered.
1015     ///
1016     /// This effectively means that it's not possible to reliably perform a
1017     /// "select" operation within the `fun` closure, which timeouts for example
1018     /// are based on. Fixing this requires some relatively major refactoring
1019     /// work within Wasmtime itself. This is a known pitfall otherwise and one
1020     /// that is intended to be fixed one day. In the meantime it's recommended
1021     /// to apply timeouts or such to the entire `run_concurrent` call itself
1022     /// rather than internally.
1023     ///
1024     /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1025     /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1026     ///
1027     /// # Example
1028     ///
1029     /// ```
1030     /// # use {
1031     /// #   anyhow::{Result},
1032     /// #   wasmtime::{
1033     /// #     component::{ Component, Linker, Resource, ResourceTable},
1034     /// #     Config, Engine, Store
1035     /// #   },
1036     /// # };
1037     /// #
1038     /// # struct MyResource(u32);
1039     /// # struct Ctx { table: ResourceTable }
1040     /// #
1041     /// # async fn foo() -> Result<()> {
1042     /// # let mut config = Config::new();
1043     /// # let engine = Engine::new(&config)?;
1044     /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1045     /// # let mut linker = Linker::new(&engine);
1046     /// # let component = Component::new(&engine, "")?;
1047     /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1048     /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1049     /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1050     /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1051     ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1052     ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
1053     ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1054     ///    bar.call_concurrent(accessor, (value.0,)).await?;
1055     ///    Ok(())
1056     /// }).await??;
1057     /// # Ok(())
1058     /// # }
1059     /// ```
1060     pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1061     where
1062         T: Send + 'static,
1063     {
1064         self.do_run_concurrent(fun, false).await
1065     }
1066 
1067     pub(super) async fn run_concurrent_trap_on_idle<R>(
1068         self,
1069         fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1070     ) -> Result<R>
1071     where
1072         T: Send + 'static,
1073     {
1074         self.do_run_concurrent(fun, true).await
1075     }
1076 
1077     async fn do_run_concurrent<R>(
1078         mut self,
1079         fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1080         trap_on_idle: bool,
1081     ) -> Result<R>
1082     where
1083         T: Send + 'static,
1084     {
1085         check_recursive_run();
1086         let token = StoreToken::new(self.as_context_mut());
1087 
1088         struct Dropper<'a, T: 'static, V> {
1089             store: StoreContextMut<'a, T>,
1090             value: ManuallyDrop<V>,
1091         }
1092 
1093         impl<'a, T, V> Drop for Dropper<'a, T, V> {
1094             fn drop(&mut self) {
1095                 tls::set(self.store.0, || {
1096                     // SAFETY: Here we drop the value without moving it for the
1097                     // first and only time -- per the contract for `Drop::drop`,
1098                     // this code won't run again, and the `value` field will no
1099                     // longer be accessible.
1100                     unsafe { ManuallyDrop::drop(&mut self.value) }
1101                 });
1102             }
1103         }
1104 
1105         let accessor = &Accessor::new(token);
1106         let dropper = &mut Dropper {
1107             store: self,
1108             value: ManuallyDrop::new(fun(accessor)),
1109         };
1110         // SAFETY: We never move `dropper` nor its `value` field.
1111         let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1112 
1113         dropper
1114             .store
1115             .as_context_mut()
1116             .poll_until(future, trap_on_idle)
1117             .await
1118     }
1119 
1120     /// Run this store's event loop.
1121     ///
1122     /// The returned future will resolve when the specified future completes or,
1123     /// if `trap_on_idle` is true, when the event loop can't make further
1124     /// progress.
1125     async fn poll_until<R>(
1126         mut self,
1127         mut future: Pin<&mut impl Future<Output = R>>,
1128         trap_on_idle: bool,
1129     ) -> Result<R>
1130     where
1131         T: Send + 'static,
1132     {
1133         struct Reset<'a, T: 'static> {
1134             store: StoreContextMut<'a, T>,
1135             futures: Option<FuturesUnordered<HostTaskFuture>>,
1136         }
1137 
1138         impl<'a, T> Drop for Reset<'a, T> {
1139             fn drop(&mut self) {
1140                 if let Some(futures) = self.futures.take() {
1141                     *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1142                 }
1143             }
1144         }
1145 
1146         loop {
1147             // Take `ConcurrentState::futures` out of the store so we can poll
1148             // it while also safely giving any of the futures inside access to
1149             // `self`.
1150             let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1151             let mut reset = Reset {
1152                 store: self.as_context_mut(),
1153                 futures,
1154             };
1155             let mut next = pin!(reset.futures.as_mut().unwrap().next());
1156 
1157             let result = future::poll_fn(|cx| {
1158                 // First, poll the future we were passed as an argument and
1159                 // return immediately if it's ready.
1160                 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1161                     return Poll::Ready(Ok(Either::Left(value)));
1162                 }
1163 
1164                 // Next, poll `ConcurrentState::futures` (which includes any
1165                 // pending host tasks and/or background tasks), returning
1166                 // immediately if one of them fails.
1167                 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1168                     Poll::Ready(Some(output)) => {
1169                         match output {
1170                             Err(e) => return Poll::Ready(Err(e)),
1171                             Ok(()) => {}
1172                         }
1173                         Poll::Ready(true)
1174                     }
1175                     Poll::Ready(None) => Poll::Ready(false),
1176                     Poll::Pending => Poll::Pending,
1177                 };
1178 
1179                 // Next, check the "high priority" work queue and return
1180                 // immediately if it has at least one item.
1181                 let state = reset.store.0.concurrent_state_mut();
1182                 let ready = mem::take(&mut state.high_priority);
1183                 let ready = if ready.is_empty() {
1184                     // Next, check the "low priority" work queue and return
1185                     // immediately if it has at least one item.
1186                     let ready = mem::take(&mut state.low_priority);
1187                     if ready.is_empty() {
1188                         return match next {
1189                             Poll::Ready(true) => {
1190                                 // In this case, one of the futures in
1191                                 // `ConcurrentState::futures` completed
1192                                 // successfully, so we return now and continue
1193                                 // the outer loop in case there is another one
1194                                 // ready to complete.
1195                                 Poll::Ready(Ok(Either::Right(Vec::new())))
1196                             }
1197                             Poll::Ready(false) => {
1198                                 // Poll the future we were passed one last time
1199                                 // in case one of `ConcurrentState::futures` had
1200                                 // the side effect of unblocking it.
1201                                 if let Poll::Ready(value) =
1202                                     tls::set(reset.store.0, || future.as_mut().poll(cx))
1203                                 {
1204                                     Poll::Ready(Ok(Either::Left(value)))
1205                                 } else {
1206                                     // In this case, there are no more pending
1207                                     // futures in `ConcurrentState::futures`,
1208                                     // there are no remaining work items, _and_
1209                                     // the future we were passed as an argument
1210                                     // still hasn't completed.
1211                                     if trap_on_idle {
1212                                         // `trap_on_idle` is true, so we exit
1213                                         // immediately.
1214                                         Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
1215                                     } else {
1216                                         // `trap_on_idle` is false, so we assume
1217                                         // that future will wake up and give us
1218                                         // more work to do when it's ready to.
1219                                         Poll::Pending
1220                                     }
1221                                 }
1222                             }
1223                             // There is at least one pending future in
1224                             // `ConcurrentState::futures` and we have nothing
1225                             // else to do but wait for now, so we return
1226                             // `Pending`.
1227                             Poll::Pending => Poll::Pending,
1228                         };
1229                     } else {
1230                         ready
1231                     }
1232                 } else {
1233                     ready
1234                 };
1235 
1236                 Poll::Ready(Ok(Either::Right(ready)))
1237             })
1238             .await;
1239 
1240             // Put the `ConcurrentState::futures` back into the store before we
1241             // return or handle any work items since one or more of those items
1242             // might append more futures.
1243             drop(reset);
1244 
1245             match result? {
1246                 // The future we were passed as an argument completed, so we
1247                 // return the result.
1248                 Either::Left(value) => break Ok(value),
1249                 // The future we were passed has not yet completed, so handle
1250                 // any work items and then loop again.
1251                 Either::Right(ready) => {
1252                     struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1253                         store: StoreContextMut<'a, T>,
1254                         ready: I,
1255                     }
1256 
1257                     impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1258                         fn drop(&mut self) {
1259                             while let Some(item) = self.ready.next() {
1260                                 match item {
1261                                     WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1262                                     WorkItem::PushFuture(future) => {
1263                                         tls::set(self.store.0, move || drop(future))
1264                                     }
1265                                     _ => {}
1266                                 }
1267                             }
1268                         }
1269                     }
1270 
1271                     let mut dispose = Dispose {
1272                         store: self.as_context_mut(),
1273                         ready: ready.into_iter(),
1274                     };
1275 
1276                     while let Some(item) = dispose.ready.next() {
1277                         dispose
1278                             .store
1279                             .as_context_mut()
1280                             .handle_work_item(item)
1281                             .await?;
1282                     }
1283                 }
1284             }
1285         }
1286     }
1287 
1288     /// Handle the specified work item, possibly resuming a fiber if applicable.
1289     async fn handle_work_item(self, item: WorkItem) -> Result<()>
1290     where
1291         T: Send,
1292     {
1293         log::trace!("handle work item {item:?}");
1294         match item {
1295             WorkItem::PushFuture(future) => {
1296                 self.0
1297                     .concurrent_state_mut()
1298                     .futures
1299                     .get_mut()
1300                     .as_mut()
1301                     .unwrap()
1302                     .push(future.into_inner());
1303             }
1304             WorkItem::ResumeFiber(fiber) => {
1305                 self.0.resume_fiber(fiber).await?;
1306             }
1307             WorkItem::GuestCall(call) => {
1308                 if call.is_ready(self.0)? {
1309                     self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1310                 } else {
1311                     let state = self.0.concurrent_state_mut();
1312                     let task = state.get_mut(call.thread.task)?;
1313                     if !task.starting_sent {
1314                         task.starting_sent = true;
1315                         if let GuestCallKind::StartImplicit(_) = &call.kind {
1316                             Waitable::Guest(call.thread.task).set_event(
1317                                 state,
1318                                 Some(Event::Subtask {
1319                                     status: Status::Starting,
1320                                 }),
1321                             )?;
1322                         }
1323                     }
1324 
1325                     let instance = state.get_mut(call.thread.task)?.instance;
1326                     self.0
1327                         .instance_state(instance)
1328                         .pending
1329                         .insert(call.thread, call.kind);
1330                 }
1331             }
1332             WorkItem::WorkerFunction(fun) => {
1333                 self.run_on_worker(WorkerItem::Function(fun)).await?;
1334             }
1335         }
1336 
1337         Ok(())
1338     }
1339 
1340     /// Execute the specified guest call on a worker fiber.
1341     async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1342     where
1343         T: Send,
1344     {
1345         let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1346             fiber
1347         } else {
1348             fiber::make_fiber(self.0, move |store| {
1349                 loop {
1350                     match store.concurrent_state_mut().worker_item.take().unwrap() {
1351                         WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1352                         WorkerItem::Function(fun) => fun.into_inner()(store)?,
1353                     }
1354 
1355                     store.suspend(SuspendReason::NeedWork)?;
1356                 }
1357             })?
1358         };
1359 
1360         let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1361         assert!(worker_item.is_none());
1362         *worker_item = Some(item);
1363 
1364         self.0.resume_fiber(worker).await
1365     }
1366 
1367     /// Wrap the specified host function in a future which will call it, passing
1368     /// it an `&Accessor<T>`.
1369     ///
1370     /// See the `Accessor` documentation for details.
1371     pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1372     where
1373         T: 'static,
1374         F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1375             + Send
1376             + Sync
1377             + 'static,
1378         R: Send + Sync + 'static,
1379     {
1380         let token = StoreToken::new(self);
1381         async move {
1382             let mut accessor = Accessor::new(token);
1383             closure(&mut accessor).await
1384         }
1385     }
1386 }
1387 
1388 #[derive(Debug, Copy, Clone)]
1389 pub struct RuntimeInstance {
1390     pub instance: ComponentInstanceId,
1391     pub index: RuntimeComponentInstanceIndex,
1392 }
1393 
1394 impl StoreOpaque {
1395     /// Helper function to retrieve the `ConcurrentInstanceState` for the
1396     /// specified instance.
1397     fn instance_state(&mut self, instance: RuntimeInstance) -> &mut ConcurrentInstanceState {
1398         StoreComponentInstanceId::new(self.id(), instance.instance)
1399             .get_mut(self)
1400             .instance_state(instance.index)
1401             .unwrap()
1402             .concurrent_state()
1403     }
1404 
1405     /// Helper function to retrieve the `HandleTable` for the specified
1406     /// instance.
1407     fn handle_table(&mut self, instance: RuntimeInstance) -> &mut HandleTable {
1408         StoreComponentInstanceId::new(self.id(), instance.instance)
1409             .get_mut(self)
1410             .instance_state(instance.index)
1411             .unwrap()
1412             .handle_table()
1413     }
1414 
1415     /// Record that we're about to enter a (sub-)component instance which does
1416     /// not support more than one concurrent, stackful activation, meaning it
1417     /// cannot be entered again until the next call returns.
1418     fn enter_instance(&mut self, instance: RuntimeInstance) {
1419         log::trace!("enter {instance:?}");
1420         self.instance_state(instance).do_not_enter = true;
1421     }
1422 
1423     /// Record that we've exited a (sub-)component instance previously entered
1424     /// with `Self::enter_instance` and then calls `Self::partition_pending`.
1425     /// See the documentation for the latter for details.
1426     fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1427         log::trace!("exit {instance:?}");
1428         self.instance_state(instance).do_not_enter = false;
1429         self.partition_pending(instance)
1430     }
1431 
1432     /// Iterate over `InstanceState::pending`, moving any ready items into the
1433     /// "high priority" work item queue.
1434     ///
1435     /// See `GuestCall::is_ready` for details.
1436     fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1437         for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() {
1438             let call = GuestCall { thread, kind };
1439             if call.is_ready(self)? {
1440                 self.concurrent_state_mut()
1441                     .push_high_priority(WorkItem::GuestCall(call));
1442             } else {
1443                 self.instance_state(instance)
1444                     .pending
1445                     .insert(call.thread, call.kind);
1446             }
1447         }
1448 
1449         Ok(())
1450     }
1451 
1452     /// Implements the `backpressure.{inc,dec}` intrinsics.
1453     pub(crate) fn backpressure_modify(
1454         &mut self,
1455         caller_instance: RuntimeInstance,
1456         modify: impl FnOnce(u16) -> Option<u16>,
1457     ) -> Result<()> {
1458         let state = self.instance_state(caller_instance);
1459         let old = state.backpressure;
1460         let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?;
1461         state.backpressure = new;
1462 
1463         if old > 0 && new == 0 {
1464             // Backpressure was previously enabled and is now disabled; move any
1465             // newly-eligible guest calls to the "high priority" queue.
1466             self.partition_pending(caller_instance)?;
1467         }
1468 
1469         Ok(())
1470     }
1471 
1472     /// Resume the specified fiber, giving it exclusive access to the specified
1473     /// store.
1474     async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1475         let old_thread = self.concurrent_state_mut().guest_thread;
1476         log::trace!("resume_fiber: save current thread {old_thread:?}");
1477 
1478         let fiber = fiber::resolve_or_release(self, fiber).await?;
1479 
1480         let state = self.concurrent_state_mut();
1481 
1482         state.guest_thread = old_thread;
1483         if let Some(ref ot) = old_thread {
1484             state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1485         }
1486         log::trace!("resume_fiber: restore current thread {old_thread:?}");
1487 
1488         if let Some(mut fiber) = fiber {
1489             log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1490             // See the `SuspendReason` documentation for what each case means.
1491             match state.suspend_reason.take().unwrap() {
1492                 SuspendReason::NeedWork => {
1493                     if state.worker.is_none() {
1494                         state.worker = Some(fiber);
1495                     } else {
1496                         fiber.dispose(self);
1497                     }
1498                 }
1499                 SuspendReason::Yielding { thread, .. } => {
1500                     state.get_mut(thread.thread)?.state = GuestThreadState::Pending;
1501                     state.push_low_priority(WorkItem::ResumeFiber(fiber));
1502                 }
1503                 SuspendReason::ExplicitlySuspending { thread, .. } => {
1504                     state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1505                 }
1506                 SuspendReason::Waiting { set, thread, .. } => {
1507                     let old = state
1508                         .get_mut(set)?
1509                         .waiting
1510                         .insert(thread, WaitMode::Fiber(fiber));
1511                     assert!(old.is_none());
1512                 }
1513             };
1514         } else {
1515             log::trace!("resume_fiber: fiber has exited");
1516         }
1517 
1518         Ok(())
1519     }
1520 
1521     /// Suspend the current fiber, storing the reason in
1522     /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1523     /// it should be resumed.
1524     ///
1525     /// See the `SuspendReason` documentation for details.
1526     fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1527         log::trace!("suspend fiber: {reason:?}");
1528 
1529         // If we're yielding or waiting on behalf of a guest thread, we'll need to
1530         // pop the call context which manages resource borrows before suspending
1531         // and then push it again once we've resumed.
1532         let task = match &reason {
1533             SuspendReason::Yielding { thread, .. }
1534             | SuspendReason::Waiting { thread, .. }
1535             | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1536             SuspendReason::NeedWork => None,
1537         };
1538 
1539         let old_guest_thread = if let Some(task) = task {
1540             self.maybe_pop_call_context(task)?;
1541             self.concurrent_state_mut().guest_thread
1542         } else {
1543             None
1544         };
1545 
1546         // We should not have reached here unless either there's no current
1547         // task, or the current task is permitted to block.  In addition, we
1548         // special-case `thread.switch-to` and waiting for a subtask to go from
1549         // `starting` to `started`, both of which we consider non-blocking
1550         // operations despite requiring a suspend.
1551         assert!(
1552             matches!(
1553                 reason,
1554                 SuspendReason::ExplicitlySuspending {
1555                     skip_may_block_check: true,
1556                     ..
1557                 } | SuspendReason::Waiting {
1558                     skip_may_block_check: true,
1559                     ..
1560                 }
1561             ) || old_guest_thread
1562                 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1563                 .unwrap_or(true)
1564         );
1565 
1566         let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1567         assert!(suspend_reason.is_none());
1568         *suspend_reason = Some(reason);
1569 
1570         self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1571 
1572         if let Some(task) = task {
1573             self.concurrent_state_mut().guest_thread = old_guest_thread;
1574             self.maybe_push_call_context(task)?;
1575         }
1576 
1577         Ok(())
1578     }
1579 
1580     /// Push the call context for managing resource borrows for the specified
1581     /// guest task if it has not yet either returned a result or cancelled
1582     /// itself.
1583     fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1584         let task = self.concurrent_state_mut().get_mut(guest_task)?;
1585 
1586         if !task.returned_or_cancelled() {
1587             log::trace!("push call context for {guest_task:?}");
1588             let call_context = task.call_context.take().unwrap();
1589             self.component_resource_state().0.push(call_context);
1590         }
1591         Ok(())
1592     }
1593 
1594     /// Pop the call context for managing resource borrows for the specified
1595     /// guest task if it has not yet either returned a result or cancelled
1596     /// itself.
1597     fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
1598         if !self
1599             .concurrent_state_mut()
1600             .get_mut(guest_task)?
1601             .returned_or_cancelled()
1602         {
1603             log::trace!("pop call context for {guest_task:?}");
1604             let call_context = Some(self.component_resource_state().0.pop().unwrap());
1605             self.concurrent_state_mut()
1606                 .get_mut(guest_task)?
1607                 .call_context = call_context;
1608         }
1609         Ok(())
1610     }
1611 
1612     fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1613         let state = self.concurrent_state_mut();
1614         let caller = state.guest_thread.unwrap();
1615         let old_set = waitable.common(state)?.set;
1616         let set = state.get_mut(caller.task)?.sync_call_set;
1617         waitable.join(state, Some(set))?;
1618         self.suspend(SuspendReason::Waiting {
1619             set,
1620             thread: caller,
1621             skip_may_block_check: false,
1622         })?;
1623         let state = self.concurrent_state_mut();
1624         waitable.join(state, old_set)
1625     }
1626 }
1627 
1628 impl Instance {
1629     /// Get the next pending event for the specified task and (optional)
1630     /// waitable set, along with the waitable handle if applicable.
1631     fn get_event(
1632         self,
1633         store: &mut StoreOpaque,
1634         guest_task: TableId<GuestTask>,
1635         set: Option<TableId<WaitableSet>>,
1636         cancellable: bool,
1637     ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1638         let state = store.concurrent_state_mut();
1639 
1640         if let Some(event) = state.get_mut(guest_task)?.event.take() {
1641             log::trace!("deliver event {event:?} to {guest_task:?}");
1642 
1643             if cancellable || !matches!(event, Event::Cancelled) {
1644                 return Ok(Some((event, None)));
1645             } else {
1646                 state.get_mut(guest_task)?.event = Some(event);
1647             }
1648         }
1649 
1650         Ok(
1651             if let Some((set, waitable)) = set
1652                 .and_then(|set| {
1653                     state
1654                         .get_mut(set)
1655                         .map(|v| v.ready.pop_first().map(|v| (set, v)))
1656                         .transpose()
1657                 })
1658                 .transpose()?
1659             {
1660                 let common = waitable.common(state)?;
1661                 let handle = common.handle.unwrap();
1662                 let event = common.event.take().unwrap();
1663 
1664                 log::trace!(
1665                     "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1666                 );
1667 
1668                 waitable.on_delivery(store, self, event);
1669 
1670                 Some((event, Some((waitable, handle))))
1671             } else {
1672                 None
1673             },
1674         )
1675     }
1676 
1677     /// Handle the `CallbackCode` returned from an async-lifted export or its
1678     /// callback.
1679     ///
1680     /// If this returns `Ok(Some(call))`, then `call` should be run immediately
1681     /// using `handle_guest_call`.
1682     fn handle_callback_code(
1683         self,
1684         store: &mut StoreOpaque,
1685         guest_thread: QualifiedThreadId,
1686         runtime_instance: RuntimeComponentInstanceIndex,
1687         code: u32,
1688     ) -> Result<Option<GuestCall>> {
1689         let (code, set) = unpack_callback_code(code);
1690 
1691         log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1692 
1693         let state = store.concurrent_state_mut();
1694 
1695         let get_set = |store: &mut StoreOpaque, handle| {
1696             if handle == 0 {
1697                 bail!("invalid waitable-set handle");
1698             }
1699 
1700             let set = store
1701                 .handle_table(RuntimeInstance {
1702                     instance: self.id().instance(),
1703                     index: runtime_instance,
1704                 })
1705                 .waitable_set_rep(handle)?;
1706 
1707             Ok(TableId::<WaitableSet>::new(set))
1708         };
1709 
1710         Ok(match code {
1711             callback_code::EXIT => {
1712                 log::trace!("implicit thread {guest_thread:?} completed");
1713                 self.cleanup_thread(store, guest_thread, runtime_instance)?;
1714                 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1715                 if task.threads.is_empty() && !task.returned_or_cancelled() {
1716                     bail!(Trap::NoAsyncResult);
1717                 }
1718                 match &task.caller {
1719                     Caller::Host { .. } => {
1720                         if task.ready_to_delete() {
1721                             Waitable::Guest(guest_thread.task)
1722                                 .delete_from(store.concurrent_state_mut())?;
1723                         }
1724                     }
1725                     Caller::Guest { .. } => {
1726                         task.exited = true;
1727                         task.callback = None;
1728                     }
1729                 }
1730                 None
1731             }
1732             callback_code::YIELD => {
1733                 let task = state.get_mut(guest_thread.task)?;
1734                 // If an `Event::Cancelled` is pending, we'll deliver that;
1735                 // otherwise, we'll deliver `Event::None`.  Note that
1736                 // `GuestTask::event` is only ever set to one of those two
1737                 // `Event` variants.
1738                 if let Some(event) = task.event {
1739                     assert!(matches!(event, Event::None | Event::Cancelled));
1740                 } else {
1741                     task.event = Some(Event::None);
1742                 }
1743                 let call = GuestCall {
1744                     thread: guest_thread,
1745                     kind: GuestCallKind::DeliverEvent {
1746                         instance: self,
1747                         set: None,
1748                     },
1749                 };
1750                 if state.may_block(guest_thread.task) {
1751                     // Push this thread onto the "low priority" queue so it runs
1752                     // after any other threads have had a chance to run.
1753                     state.push_low_priority(WorkItem::GuestCall(call));
1754                     None
1755                 } else {
1756                     // Yielding in a non-blocking context is defined as a no-op
1757                     // according to the spec, so we must run this thread
1758                     // immediately without allowing any others to run.
1759                     Some(call)
1760                 }
1761             }
1762             callback_code::WAIT => {
1763                 // The task may only return `WAIT` if it was created for a call
1764                 // to an async export).  Otherwise, we'll trap.
1765                 state.check_blocking_for(guest_thread.task)?;
1766 
1767                 let set = get_set(store, set)?;
1768                 let state = store.concurrent_state_mut();
1769 
1770                 if state.get_mut(guest_thread.task)?.event.is_some()
1771                     || !state.get_mut(set)?.ready.is_empty()
1772                 {
1773                     // An event is immediately available; deliver it ASAP.
1774                     state.push_high_priority(WorkItem::GuestCall(GuestCall {
1775                         thread: guest_thread,
1776                         kind: GuestCallKind::DeliverEvent {
1777                             instance: self,
1778                             set: Some(set),
1779                         },
1780                     }));
1781                 } else {
1782                     // No event is immediately available.
1783                     //
1784                     // We're waiting, so register to be woken up when an event
1785                     // is published for this waitable set.
1786                     //
1787                     // Here we also set `GuestTask::wake_on_cancel` which allows
1788                     // `subtask.cancel` to interrupt the wait.
1789                     let old = state
1790                         .get_mut(guest_thread.thread)?
1791                         .wake_on_cancel
1792                         .replace(set);
1793                     assert!(old.is_none());
1794                     let old = state
1795                         .get_mut(set)?
1796                         .waiting
1797                         .insert(guest_thread, WaitMode::Callback(self));
1798                     assert!(old.is_none());
1799                 }
1800                 None
1801             }
1802             _ => bail!("unsupported callback code: {code}"),
1803         })
1804     }
1805 
1806     fn cleanup_thread(
1807         self,
1808         store: &mut StoreOpaque,
1809         guest_thread: QualifiedThreadId,
1810         runtime_instance: RuntimeComponentInstanceIndex,
1811     ) -> Result<()> {
1812         let guest_id = store
1813             .concurrent_state_mut()
1814             .get_mut(guest_thread.thread)?
1815             .instance_rep;
1816         store
1817             .handle_table(RuntimeInstance {
1818                 instance: self.id().instance(),
1819                 index: runtime_instance,
1820             })
1821             .guest_thread_remove(guest_id.unwrap())?;
1822 
1823         store.concurrent_state_mut().delete(guest_thread.thread)?;
1824         let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1825         task.threads.remove(&guest_thread.thread);
1826         Ok(())
1827     }
1828 
1829     /// Add the specified guest call to the "high priority" work item queue, to
1830     /// be started as soon as backpressure and/or reentrance rules allow.
1831     ///
1832     /// SAFETY: The raw pointer arguments must be valid references to guest
1833     /// functions (with the appropriate signatures) when the closures queued by
1834     /// this function are called.
1835     unsafe fn queue_call<T: 'static>(
1836         self,
1837         mut store: StoreContextMut<T>,
1838         guest_thread: QualifiedThreadId,
1839         callee: SendSyncPtr<VMFuncRef>,
1840         param_count: usize,
1841         result_count: usize,
1842         flags: Option<InstanceFlags>,
1843         async_: bool,
1844         callback: Option<SendSyncPtr<VMFuncRef>>,
1845         post_return: Option<SendSyncPtr<VMFuncRef>>,
1846     ) -> Result<()> {
1847         /// Return a closure which will call the specified function in the scope
1848         /// of the specified task.
1849         ///
1850         /// This will use `GuestTask::lower_params` to lower the parameters, but
1851         /// will not lift the result; instead, it returns a
1852         /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
1853         /// any, may be lifted.  Note that an async-lifted export will have
1854         /// returned its result using the `task.return` intrinsic (or not
1855         /// returned a result at all, in the case of `task.cancel`), in which
1856         /// case the "result" of this call will either be a callback code or
1857         /// nothing.
1858         ///
1859         /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
1860         /// the returned closure is called.
1861         unsafe fn make_call<T: 'static>(
1862             store: StoreContextMut<T>,
1863             guest_thread: QualifiedThreadId,
1864             callee: SendSyncPtr<VMFuncRef>,
1865             param_count: usize,
1866             result_count: usize,
1867             flags: Option<InstanceFlags>,
1868         ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
1869         + Send
1870         + Sync
1871         + 'static
1872         + use<T> {
1873             let token = StoreToken::new(store);
1874             move |store: &mut dyn VMStore| {
1875                 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
1876 
1877                 store
1878                     .concurrent_state_mut()
1879                     .get_mut(guest_thread.thread)?
1880                     .state = GuestThreadState::Running;
1881                 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1882                 let may_enter_after_call = task.call_post_return_automatically();
1883                 let lower = task.lower_params.take().unwrap();
1884 
1885                 lower(store, &mut storage[..param_count])?;
1886 
1887                 let mut store = token.as_context_mut(store);
1888 
1889                 // SAFETY: Per the contract documented in `make_call's`
1890                 // documentation, `callee` must be a valid pointer.
1891                 unsafe {
1892                     if let Some(mut flags) = flags {
1893                         flags.set_may_enter(false);
1894                     }
1895                     crate::Func::call_unchecked_raw(
1896                         &mut store,
1897                         callee.as_non_null(),
1898                         NonNull::new(
1899                             &mut storage[..param_count.max(result_count)]
1900                                 as *mut [MaybeUninit<ValRaw>] as _,
1901                         )
1902                         .unwrap(),
1903                     )?;
1904                     if let Some(mut flags) = flags {
1905                         flags.set_may_enter(may_enter_after_call);
1906                     }
1907                 }
1908 
1909                 Ok(storage)
1910             }
1911         }
1912 
1913         // SAFETY: Per the contract described in this function documentation,
1914         // the `callee` pointer which `call` closes over must be valid when
1915         // called by the closure we queue below.
1916         let call = unsafe {
1917             make_call(
1918                 store.as_context_mut(),
1919                 guest_thread,
1920                 callee,
1921                 param_count,
1922                 result_count,
1923                 flags,
1924             )
1925         };
1926 
1927         let callee_instance = store
1928             .0
1929             .concurrent_state_mut()
1930             .get_mut(guest_thread.task)?
1931             .instance;
1932 
1933         let fun = if callback.is_some() {
1934             assert!(async_);
1935 
1936             Box::new(move |store: &mut dyn VMStore| {
1937                 self.add_guest_thread_to_instance_table(
1938                     guest_thread.thread,
1939                     store,
1940                     callee_instance.index,
1941                 )?;
1942                 let old_thread = store
1943                     .concurrent_state_mut()
1944                     .guest_thread
1945                     .replace(guest_thread);
1946                 log::trace!(
1947                     "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
1948                 );
1949 
1950                 store.maybe_push_call_context(guest_thread.task)?;
1951 
1952                 store.enter_instance(callee_instance);
1953 
1954                 // SAFETY: See the documentation for `make_call` to review the
1955                 // contract we must uphold for `call` here.
1956                 //
1957                 // Per the contract described in the `queue_call`
1958                 // documentation, the `callee` pointer which `call` closes
1959                 // over must be valid.
1960                 let storage = call(store)?;
1961 
1962                 store.exit_instance(callee_instance)?;
1963 
1964                 store.maybe_pop_call_context(guest_thread.task)?;
1965 
1966                 let state = store.concurrent_state_mut();
1967                 state.guest_thread = old_thread;
1968                 old_thread
1969                     .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
1970                 log::trace!("stackless call: restored {old_thread:?} as current thread");
1971 
1972                 // SAFETY: `wasmparser` will have validated that the callback
1973                 // function returns a `i32` result.
1974                 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
1975 
1976                 self.handle_callback_code(store, guest_thread, callee_instance.index, code)
1977             })
1978                 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
1979         } else {
1980             let token = StoreToken::new(store.as_context_mut());
1981             Box::new(move |store: &mut dyn VMStore| {
1982                 self.add_guest_thread_to_instance_table(
1983                     guest_thread.thread,
1984                     store,
1985                     callee_instance.index,
1986                 )?;
1987                 let old_thread = store
1988                     .concurrent_state_mut()
1989                     .guest_thread
1990                     .replace(guest_thread);
1991                 log::trace!(
1992                     "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
1993                 );
1994                 let mut flags = self.id().get(store).instance_flags(callee_instance.index);
1995 
1996                 store.maybe_push_call_context(guest_thread.task)?;
1997 
1998                 // Unless this is a callback-less (i.e. stackful)
1999                 // async-lifted export, we need to record that the instance
2000                 // cannot be entered until the call returns.
2001                 if !async_ {
2002                     store.enter_instance(callee_instance);
2003                 }
2004 
2005                 // SAFETY: See the documentation for `make_call` to review the
2006                 // contract we must uphold for `call` here.
2007                 //
2008                 // Per the contract described in the `queue_call`
2009                 // documentation, the `callee` pointer which `call` closes
2010                 // over must be valid.
2011                 let storage = call(store)?;
2012 
2013                 // This is a callback-less call, so the implicit thread has now completed
2014                 self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2015 
2016                 if async_ {
2017                     let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2018                     if task.threads.is_empty() && !task.returned_or_cancelled() {
2019                         bail!(Trap::NoAsyncResult);
2020                     }
2021                 } else {
2022                     // This is a sync-lifted export, so now is when we lift the
2023                     // result, optionally call the post-return function, if any,
2024                     // and finally notify any current or future waiters that the
2025                     // subtask has returned.
2026 
2027                     let lift = {
2028                         store.exit_instance(callee_instance)?;
2029 
2030                         let state = store.concurrent_state_mut();
2031                         assert!(state.get_mut(guest_thread.task)?.result.is_none());
2032 
2033                         state
2034                             .get_mut(guest_thread.task)?
2035                             .lift_result
2036                             .take()
2037                             .unwrap()
2038                     };
2039 
2040                     // SAFETY: `result_count` represents the number of core Wasm
2041                     // results returned, per `wasmparser`.
2042                     let result = (lift.lift)(store, unsafe {
2043                         mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2044                             &storage[..result_count],
2045                         )
2046                     })?;
2047 
2048                     let post_return_arg = match result_count {
2049                         0 => ValRaw::i32(0),
2050                         // SAFETY: `result_count` represents the number of
2051                         // core Wasm results returned, per `wasmparser`.
2052                         1 => unsafe { storage[0].assume_init() },
2053                         _ => unreachable!(),
2054                     };
2055 
2056                     if store
2057                         .concurrent_state_mut()
2058                         .get_mut(guest_thread.task)?
2059                         .call_post_return_automatically()
2060                     {
2061                         unsafe {
2062                             flags.set_may_leave(false);
2063                             flags.set_needs_post_return(false);
2064                         }
2065 
2066                         if let Some(func) = post_return {
2067                             let mut store = token.as_context_mut(store);
2068 
2069                             // SAFETY: `func` is a valid `*mut VMFuncRef` from
2070                             // either `wasmtime-cranelift`-generated fused adapter
2071                             // code or `component::Options`.  Per `wasmparser`
2072                             // post-return signature validation, we know it takes a
2073                             // single parameter.
2074                             unsafe {
2075                                 crate::Func::call_unchecked_raw(
2076                                     &mut store,
2077                                     func.as_non_null(),
2078                                     slice::from_ref(&post_return_arg).into(),
2079                                 )?;
2080                             }
2081                         }
2082 
2083                         unsafe {
2084                             flags.set_may_leave(true);
2085                             flags.set_may_enter(true);
2086                         }
2087                     }
2088 
2089                     self.task_complete(
2090                         store,
2091                         guest_thread.task,
2092                         result,
2093                         Status::Returned,
2094                         post_return_arg,
2095                     )?;
2096                 }
2097 
2098                 store.maybe_pop_call_context(guest_thread.task)?;
2099 
2100                 let state = store.concurrent_state_mut();
2101                 let task = state.get_mut(guest_thread.task)?;
2102 
2103                 match &task.caller {
2104                     Caller::Host { .. } => {
2105                         if task.ready_to_delete() {
2106                             Waitable::Guest(guest_thread.task).delete_from(state)?;
2107                         }
2108                     }
2109                     Caller::Guest { .. } => {
2110                         task.exited = true;
2111                     }
2112                 }
2113 
2114                 Ok(None)
2115             })
2116         };
2117 
2118         store
2119             .0
2120             .concurrent_state_mut()
2121             .push_high_priority(WorkItem::GuestCall(GuestCall {
2122                 thread: guest_thread,
2123                 kind: GuestCallKind::StartImplicit(fun),
2124             }));
2125 
2126         Ok(())
2127     }
2128 
2129     /// Prepare (but do not start) a guest->guest call.
2130     ///
2131     /// This is called from fused adapter code generated in
2132     /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2133     /// are synthesized Wasm functions which move the parameters from the caller
2134     /// to the callee and the result from the callee to the caller,
2135     /// respectively.  The adapter will call `Self::start_call` immediately
2136     /// after calling this function.
2137     ///
2138     /// SAFETY: All the pointer arguments must be valid pointers to guest
2139     /// entities (and with the expected signatures for the function references
2140     /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2141     unsafe fn prepare_call<T: 'static>(
2142         self,
2143         mut store: StoreContextMut<T>,
2144         start: *mut VMFuncRef,
2145         return_: *mut VMFuncRef,
2146         caller_instance: RuntimeComponentInstanceIndex,
2147         callee_instance: RuntimeComponentInstanceIndex,
2148         task_return_type: TypeTupleIndex,
2149         callee_async: bool,
2150         memory: *mut VMMemoryDefinition,
2151         string_encoding: u8,
2152         caller_info: CallerInfo,
2153     ) -> Result<()> {
2154         self.id().get(store.0).check_may_leave(caller_instance)?;
2155 
2156         if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2157             // A task may only call an async-typed function via a sync lower if
2158             // it was created by a call to an async export.  Otherwise, we'll
2159             // trap.
2160             store.0.concurrent_state_mut().check_blocking()?;
2161         }
2162 
2163         enum ResultInfo {
2164             Heap { results: u32 },
2165             Stack { result_count: u32 },
2166         }
2167 
2168         let result_info = match &caller_info {
2169             CallerInfo::Async {
2170                 has_result: true,
2171                 params,
2172             } => ResultInfo::Heap {
2173                 results: params.last().unwrap().get_u32(),
2174             },
2175             CallerInfo::Async {
2176                 has_result: false, ..
2177             } => ResultInfo::Stack { result_count: 0 },
2178             CallerInfo::Sync {
2179                 result_count,
2180                 params,
2181             } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2182                 results: params.last().unwrap().get_u32(),
2183             },
2184             CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2185                 result_count: *result_count,
2186             },
2187         };
2188 
2189         let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2190 
2191         // Create a new guest task for the call, closing over the `start` and
2192         // `return_` functions to lift the parameters and lower the result,
2193         // respectively.
2194         let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2195         let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2196         let token = StoreToken::new(store.as_context_mut());
2197         let state = store.0.concurrent_state_mut();
2198         let old_thread = state.guest_thread.take();
2199         let new_task = GuestTask::new(
2200             state,
2201             Box::new(move |store, dst| {
2202                 let mut store = token.as_context_mut(store);
2203                 assert!(dst.len() <= MAX_FLAT_PARAMS);
2204                 // The `+ 1` here accounts for the return pointer, if any:
2205                 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2206                 let count = match caller_info {
2207                     // Async callers, if they have a result, use the last
2208                     // parameter as a return pointer so chop that off if
2209                     // relevant here.
2210                     CallerInfo::Async { params, has_result } => {
2211                         let params = &params[..params.len() - usize::from(has_result)];
2212                         for (param, src) in params.iter().zip(&mut src) {
2213                             src.write(*param);
2214                         }
2215                         params.len()
2216                     }
2217 
2218                     // Sync callers forward everything directly.
2219                     CallerInfo::Sync { params, .. } => {
2220                         for (param, src) in params.iter().zip(&mut src) {
2221                             src.write(*param);
2222                         }
2223                         params.len()
2224                     }
2225                 };
2226                 // SAFETY: `start` is a valid `*mut VMFuncRef` from
2227                 // `wasmtime-cranelift`-generated fused adapter code.  Based on
2228                 // how it was constructed (see
2229                 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2230                 // for details) we know it takes count parameters and returns
2231                 // `dst.len()` results.
2232                 unsafe {
2233                     crate::Func::call_unchecked_raw(
2234                         &mut store,
2235                         start.as_non_null(),
2236                         NonNull::new(
2237                             &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2238                         )
2239                         .unwrap(),
2240                     )?;
2241                 }
2242                 dst.copy_from_slice(&src[..dst.len()]);
2243                 let state = store.0.concurrent_state_mut();
2244                 Waitable::Guest(state.guest_thread.unwrap().task).set_event(
2245                     state,
2246                     Some(Event::Subtask {
2247                         status: Status::Started,
2248                     }),
2249                 )?;
2250                 Ok(())
2251             }),
2252             LiftResult {
2253                 lift: Box::new(move |store, src| {
2254                     // SAFETY: See comment in closure passed as `lower_params`
2255                     // parameter above.
2256                     let mut store = token.as_context_mut(store);
2257                     let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2258                     if let ResultInfo::Heap { results } = &result_info {
2259                         my_src.push(ValRaw::u32(*results));
2260                     }
2261                     // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2262                     // `wasmtime-cranelift`-generated fused adapter code.  Based
2263                     // on how it was constructed (see
2264                     // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2265                     // for details) we know it takes `src.len()` parameters and
2266                     // returns up to 1 result.
2267                     unsafe {
2268                         crate::Func::call_unchecked_raw(
2269                             &mut store,
2270                             return_.as_non_null(),
2271                             my_src.as_mut_slice().into(),
2272                         )?;
2273                     }
2274                     let state = store.0.concurrent_state_mut();
2275                     let thread = state.guest_thread.unwrap();
2276                     if sync_caller {
2277                         state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2278                             if let ResultInfo::Stack { result_count } = &result_info {
2279                                 match result_count {
2280                                     0 => None,
2281                                     1 => Some(my_src[0]),
2282                                     _ => unreachable!(),
2283                                 }
2284                             } else {
2285                                 None
2286                             },
2287                         );
2288                     }
2289                     Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2290                 }),
2291                 ty: task_return_type,
2292                 memory: NonNull::new(memory).map(SendSyncPtr::new),
2293                 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2294             },
2295             Caller::Guest {
2296                 thread: old_thread.unwrap(),
2297                 instance: caller_instance,
2298             },
2299             None,
2300             self,
2301             callee_instance,
2302             callee_async,
2303         )?;
2304 
2305         let guest_task = state.push(new_task)?;
2306         let new_thread = GuestThread::new_implicit(guest_task);
2307         let guest_thread = state.push(new_thread)?;
2308         state.get_mut(guest_task)?.threads.insert(guest_thread);
2309 
2310         let state = store.0.concurrent_state_mut();
2311         if let Some(old_thread) = old_thread {
2312             if !state.may_enter(guest_task) {
2313                 bail!(crate::Trap::CannotEnterComponent);
2314             }
2315 
2316             state.get_mut(old_thread.task)?.subtasks.insert(guest_task);
2317         };
2318 
2319         // Make the new thread the current one so that `Self::start_call` knows
2320         // which one to start.
2321         state.guest_thread = Some(QualifiedThreadId {
2322             task: guest_task,
2323             thread: guest_thread,
2324         });
2325         log::trace!(
2326             "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2327         );
2328 
2329         Ok(())
2330     }
2331 
2332     /// Call the specified callback function for an async-lifted export.
2333     ///
2334     /// SAFETY: `function` must be a valid reference to a guest function of the
2335     /// correct signature for a callback.
2336     unsafe fn call_callback<T>(
2337         self,
2338         mut store: StoreContextMut<T>,
2339         callee_instance: RuntimeComponentInstanceIndex,
2340         function: SendSyncPtr<VMFuncRef>,
2341         event: Event,
2342         handle: u32,
2343         may_enter_after_call: bool,
2344     ) -> Result<u32> {
2345         let mut flags = self.id().get(store.0).instance_flags(callee_instance);
2346 
2347         let (ordinal, result) = event.parts();
2348         let params = &mut [
2349             ValRaw::u32(ordinal),
2350             ValRaw::u32(handle),
2351             ValRaw::u32(result),
2352         ];
2353         // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2354         // `wasmtime-cranelift`-generated fused adapter code or
2355         // `component::Options`.  Per `wasmparser` callback signature
2356         // validation, we know it takes three parameters and returns one.
2357         unsafe {
2358             flags.set_may_enter(false);
2359             crate::Func::call_unchecked_raw(
2360                 &mut store,
2361                 function.as_non_null(),
2362                 params.as_mut_slice().into(),
2363             )?;
2364             flags.set_may_enter(may_enter_after_call);
2365         }
2366         Ok(params[0].get_u32())
2367     }
2368 
2369     /// Start a guest->guest call previously prepared using
2370     /// `Self::prepare_call`.
2371     ///
2372     /// This is called from fused adapter code generated in
2373     /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2374     /// this function immediately after calling `Self::prepare_call`.
2375     ///
2376     /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2377     /// functions with the appropriate signatures for the current guest task.
2378     /// If this is a call to an async-lowered import, the actual call may be
2379     /// deferred and run after this function returns, in which case the pointer
2380     /// arguments must also be valid when the call happens.
2381     unsafe fn start_call<T: 'static>(
2382         self,
2383         mut store: StoreContextMut<T>,
2384         callback: *mut VMFuncRef,
2385         post_return: *mut VMFuncRef,
2386         callee: *mut VMFuncRef,
2387         param_count: u32,
2388         result_count: u32,
2389         flags: u32,
2390         storage: Option<&mut [MaybeUninit<ValRaw>]>,
2391     ) -> Result<u32> {
2392         let token = StoreToken::new(store.as_context_mut());
2393         let async_caller = storage.is_none();
2394         let state = store.0.concurrent_state_mut();
2395         let guest_thread = state.guest_thread.unwrap();
2396         let callee_async = state.get_mut(guest_thread.task)?.async_function;
2397         let may_enter_after_call = state
2398             .get_mut(guest_thread.task)?
2399             .call_post_return_automatically();
2400         let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2401         let param_count = usize::try_from(param_count).unwrap();
2402         assert!(param_count <= MAX_FLAT_PARAMS);
2403         let result_count = usize::try_from(result_count).unwrap();
2404         assert!(result_count <= MAX_FLAT_RESULTS);
2405 
2406         let task = state.get_mut(guest_thread.task)?;
2407         if !callback.is_null() {
2408             // We're calling an async-lifted export with a callback, so store
2409             // the callback and related context as part of the task so we can
2410             // call it later when needed.
2411             let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2412             task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
2413                 let store = token.as_context_mut(store);
2414                 unsafe {
2415                     self.call_callback::<T>(
2416                         store,
2417                         runtime_instance,
2418                         callback,
2419                         event,
2420                         handle,
2421                         may_enter_after_call,
2422                     )
2423                 }
2424             }));
2425         }
2426 
2427         let Caller::Guest {
2428             thread: caller,
2429             instance: runtime_instance,
2430         } = &task.caller
2431         else {
2432             // As of this writing, `start_call` is only used for guest->guest
2433             // calls.
2434             unreachable!()
2435         };
2436         let caller = *caller;
2437         let caller_instance = *runtime_instance;
2438 
2439         let callee_instance = task.instance;
2440 
2441         let instance_flags = if callback.is_null() {
2442             None
2443         } else {
2444             Some(self.id().get(store.0).instance_flags(callee_instance.index))
2445         };
2446 
2447         // Queue the call as a "high priority" work item.
2448         unsafe {
2449             self.queue_call(
2450                 store.as_context_mut(),
2451                 guest_thread,
2452                 callee,
2453                 param_count,
2454                 result_count,
2455                 instance_flags,
2456                 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2457                 NonNull::new(callback).map(SendSyncPtr::new),
2458                 NonNull::new(post_return).map(SendSyncPtr::new),
2459             )?;
2460         }
2461 
2462         let state = store.0.concurrent_state_mut();
2463 
2464         // Use the caller's `GuestTask::sync_call_set` to register interest in
2465         // the subtask...
2466         let guest_waitable = Waitable::Guest(guest_thread.task);
2467         let old_set = guest_waitable.common(state)?.set;
2468         let set = state.get_mut(caller.task)?.sync_call_set;
2469         guest_waitable.join(state, Some(set))?;
2470 
2471         // ... and suspend this fiber temporarily while we wait for it to start.
2472         //
2473         // Note that we _could_ call the callee directly using the current fiber
2474         // rather than suspend this one, but that would make reasoning about the
2475         // event loop more complicated and is probably only worth doing if
2476         // there's a measurable performance benefit.  In addition, it would mean
2477         // blocking the caller if the callee calls a blocking sync-lowered
2478         // import, and as of this writing the spec says we must not do that.
2479         //
2480         // Alternatively, the fused adapter code could be modified to call the
2481         // callee directly without calling a host-provided intrinsic at all (in
2482         // which case it would need to do its own, inline backpressure checks,
2483         // etc.).  Again, we'd want to see a measurable performance benefit
2484         // before committing to such an optimization.  And again, we'd need to
2485         // update the spec to allow that.
2486         let (status, waitable) = loop {
2487             store.0.suspend(SuspendReason::Waiting {
2488                 set,
2489                 thread: caller,
2490                 // Normally, `StoreOpaque::suspend` would assert it's being
2491                 // called from a context where blocking is allowed.  However, if
2492                 // `async_caller` is `true`, we'll only "block" long enough for
2493                 // the callee to start, i.e. we won't repeat this loop, so we
2494                 // tell `suspend` it's okay even if we're not allowed to block.
2495                 // Alternatively, if the callee is not an async function, then
2496                 // we know it won't block anyway.
2497                 skip_may_block_check: async_caller || !callee_async,
2498             })?;
2499 
2500             let state = store.0.concurrent_state_mut();
2501 
2502             log::trace!("taking event for {:?}", guest_thread.task);
2503             let event = guest_waitable.take_event(state)?;
2504             let Some(Event::Subtask { status }) = event else {
2505                 unreachable!();
2506             };
2507 
2508             log::trace!("status {status:?} for {:?}", guest_thread.task);
2509 
2510             if status == Status::Returned {
2511                 // It returned, so we can stop waiting.
2512                 break (status, None);
2513             } else if async_caller {
2514                 // It hasn't returned yet, but the caller is calling via an
2515                 // async-lowered import, so we generate a handle for the task
2516                 // waitable and return the status.
2517                 let handle = store
2518                     .0
2519                     .handle_table(RuntimeInstance {
2520                         instance: self.id().instance(),
2521                         index: caller_instance,
2522                     })
2523                     .subtask_insert_guest(guest_thread.task.rep())?;
2524                 store
2525                     .0
2526                     .concurrent_state_mut()
2527                     .get_mut(guest_thread.task)?
2528                     .common
2529                     .handle = Some(handle);
2530                 break (status, Some(handle));
2531             } else {
2532                 // The callee hasn't returned yet, and the caller is calling via
2533                 // a sync-lowered import, so we loop and keep waiting until the
2534                 // callee returns.
2535             }
2536         };
2537 
2538         let state = store.0.concurrent_state_mut();
2539 
2540         guest_waitable.join(state, old_set)?;
2541 
2542         if let Some(storage) = storage {
2543             // The caller used a sync-lowered import to call an async-lifted
2544             // export, in which case the result, if any, has been stashed in
2545             // `GuestTask::sync_result`.
2546             let task = state.get_mut(guest_thread.task)?;
2547             if let Some(result) = task.sync_result.take() {
2548                 if let Some(result) = result {
2549                     storage[0] = MaybeUninit::new(result);
2550                 }
2551 
2552                 if task.exited {
2553                     if task.ready_to_delete() {
2554                         Waitable::Guest(guest_thread.task).delete_from(state)?;
2555                     }
2556                 }
2557             }
2558         }
2559 
2560         // Reset the current thread to point to the caller as it resumes control.
2561         state.guest_thread = Some(caller);
2562         state.get_mut(caller.thread)?.state = GuestThreadState::Running;
2563         log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2564 
2565         Ok(status.pack(waitable))
2566     }
2567 
2568     /// Poll the specified future once on behalf of a guest->host call using an
2569     /// async-lowered import.
2570     ///
2571     /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2572     /// `Pending`, add it to the set of futures to be polled as part of this
2573     /// instance's event loop until it completes, and then return
2574     /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2575     ///
2576     /// Whether the future returns `Ready` immediately or later, the `lower`
2577     /// function will be used to lower the result, if any, into the guest caller's
2578     /// stack and linear memory unless the task has been cancelled.
2579     pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2580         self,
2581         mut store: StoreContextMut<'_, T>,
2582         future: impl Future<Output = Result<R>> + Send + 'static,
2583         caller_instance: RuntimeComponentInstanceIndex,
2584         lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
2585     ) -> Result<Option<u32>> {
2586         let token = StoreToken::new(store.as_context_mut());
2587         let state = store.0.concurrent_state_mut();
2588         let caller = state.guest_thread.unwrap();
2589 
2590         // Create an abortable future which hooks calls to poll and manages call
2591         // context state for the future.
2592         let (join_handle, future) = JoinHandle::run(async move {
2593             let mut future = pin!(future);
2594             let mut call_context = None;
2595             future::poll_fn(move |cx| {
2596                 // Push the call context for managing any resource borrows
2597                 // for the task.
2598                 tls::get(|store| {
2599                     if let Some(call_context) = call_context.take() {
2600                         token
2601                             .as_context_mut(store)
2602                             .0
2603                             .component_resource_state()
2604                             .0
2605                             .push(call_context);
2606                     }
2607                 });
2608 
2609                 let result = future.as_mut().poll(cx);
2610 
2611                 if result.is_pending() {
2612                     // Pop the call context for managing any resource
2613                     // borrows for the task.
2614                     tls::get(|store| {
2615                         call_context = Some(
2616                             token
2617                                 .as_context_mut(store)
2618                                 .0
2619                                 .component_resource_state()
2620                                 .0
2621                                 .pop()
2622                                 .unwrap(),
2623                         );
2624                     });
2625                 }
2626                 result
2627             })
2628             .await
2629         });
2630 
2631         // We create a new host task even though it might complete immediately
2632         // (in which case we won't need to pass a waitable back to the guest).
2633         // If it does complete immediately, we'll remove it before we return.
2634         let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;
2635 
2636         log::trace!("new host task child of {caller:?}: {task:?}");
2637 
2638         let mut future = Box::pin(future);
2639 
2640         // Finally, poll the future.  We can use a dummy `Waker` here because
2641         // we'll add the future to `ConcurrentState::futures` and poll it
2642         // automatically from the event loop if it doesn't complete immediately
2643         // here.
2644         let poll = tls::set(store.0, || {
2645             future
2646                 .as_mut()
2647                 .poll(&mut Context::from_waker(&Waker::noop()))
2648         });
2649 
2650         Ok(match poll {
2651             Poll::Ready(None) => unreachable!(),
2652             Poll::Ready(Some(result)) => {
2653                 // It finished immediately; lower the result and delete the
2654                 // task.
2655                 lower(store.as_context_mut(), result?)?;
2656                 log::trace!("delete host task {task:?} (already ready)");
2657                 store.0.concurrent_state_mut().delete(task)?;
2658                 None
2659             }
2660             Poll::Pending => {
2661                 // It hasn't finished yet; add the future to
2662                 // `ConcurrentState::futures` so it will be polled by the event
2663                 // loop and allocate a waitable handle to return to the guest.
2664 
2665                 // Wrap the future in a closure responsible for lowering the result into
2666                 // the guest's stack and memory, as well as notifying any waiters that
2667                 // the task returned.
2668                 let future =
2669                     Box::pin(async move {
2670                         let result = match future.await {
2671                             Some(result) => result?,
2672                             // Task was cancelled; nothing left to do.
2673                             None => return Ok(()),
2674                         };
2675                         tls::get(move |store| {
2676                             // Here we schedule a task to run on a worker fiber to do
2677                             // the lowering since it may involve a call to the guest's
2678                             // realloc function.  This is necessary because calling the
2679                             // guest while there are host embedder frames on the stack
2680                             // is unsound.
2681                             store.concurrent_state_mut().push_high_priority(
2682                                 WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
2683                                     lower(token.as_context_mut(store), result)?;
2684                                     let state = store.concurrent_state_mut();
2685                                     state.get_mut(task)?.join_handle.take();
2686                                     Waitable::Host(task).set_event(
2687                                         state,
2688                                         Some(Event::Subtask {
2689                                             status: Status::Returned,
2690                                         }),
2691                                     )
2692                                 }))),
2693                             );
2694                             Ok(())
2695                         })
2696                     });
2697 
2698                 store.0.concurrent_state_mut().push_future(future);
2699                 let handle = store
2700                     .0
2701                     .handle_table(RuntimeInstance {
2702                         instance: self.id().instance(),
2703                         index: caller_instance,
2704                     })
2705                     .subtask_insert_host(task.rep())?;
2706                 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2707                 log::trace!(
2708                     "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
2709                 );
2710                 Some(handle)
2711             }
2712         })
2713     }
2714 
2715     /// Implements the `task.return` intrinsic, lifting the result for the
2716     /// current guest task.
2717     pub(crate) fn task_return(
2718         self,
2719         store: &mut dyn VMStore,
2720         caller: RuntimeComponentInstanceIndex,
2721         ty: TypeTupleIndex,
2722         options: OptionsIndex,
2723         storage: &[ValRaw],
2724     ) -> Result<()> {
2725         self.id().get(store).check_may_leave(caller)?;
2726         let state = store.concurrent_state_mut();
2727         let guest_thread = state.guest_thread.unwrap();
2728         let lift = state
2729             .get_mut(guest_thread.task)?
2730             .lift_result
2731             .take()
2732             .ok_or_else(|| {
2733                 anyhow!("`task.return` or `task.cancel` called more than once for current task")
2734             })?;
2735         assert!(state.get_mut(guest_thread.task)?.result.is_none());
2736 
2737         let CanonicalOptions {
2738             string_encoding,
2739             data_model,
2740             ..
2741         } = &self.id().get(store).component().env_component().options[options];
2742 
2743         let invalid = ty != lift.ty
2744             || string_encoding != &lift.string_encoding
2745             || match data_model {
2746                 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2747                     Some(memory) => {
2748                         let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2749                         let actual = self.id().get(store).runtime_memory(memory);
2750                         expected != actual.as_ptr()
2751                     }
2752                     // Memory not specified, meaning it didn't need to be
2753                     // specified per validation, so not invalid.
2754                     None => false,
2755                 },
2756                 // Always invalid as this isn't supported.
2757                 CanonicalOptionsDataModel::Gc { .. } => true,
2758             };
2759 
2760         if invalid {
2761             bail!("invalid `task.return` signature and/or options for current task");
2762         }
2763 
2764         log::trace!("task.return for {guest_thread:?}");
2765 
2766         let result = (lift.lift)(store, storage)?;
2767         self.task_complete(
2768             store,
2769             guest_thread.task,
2770             result,
2771             Status::Returned,
2772             ValRaw::i32(0),
2773         )
2774     }
2775 
2776     /// Implements the `task.cancel` intrinsic.
2777     pub(crate) fn task_cancel(
2778         self,
2779         store: &mut StoreOpaque,
2780         caller: RuntimeComponentInstanceIndex,
2781     ) -> Result<()> {
2782         self.id().get(store).check_may_leave(caller)?;
2783         let state = store.concurrent_state_mut();
2784         let guest_thread = state.guest_thread.unwrap();
2785         let task = state.get_mut(guest_thread.task)?;
2786         if !task.cancel_sent {
2787             bail!("`task.cancel` called by task which has not been cancelled")
2788         }
2789         _ = task.lift_result.take().ok_or_else(|| {
2790             anyhow!("`task.return` or `task.cancel` called more than once for current task")
2791         })?;
2792 
2793         assert!(task.result.is_none());
2794 
2795         log::trace!("task.cancel for {guest_thread:?}");
2796 
2797         self.task_complete(
2798             store,
2799             guest_thread.task,
2800             Box::new(DummyResult),
2801             Status::ReturnCancelled,
2802             ValRaw::i32(0),
2803         )
2804     }
2805 
2806     /// Complete the specified guest task (i.e. indicate that it has either
2807     /// returned a (possibly empty) result or cancelled itself).
2808     ///
2809     /// This will return any resource borrows and notify any current or future
2810     /// waiters that the task has completed.
2811     fn task_complete(
2812         self,
2813         store: &mut StoreOpaque,
2814         guest_task: TableId<GuestTask>,
2815         result: Box<dyn Any + Send + Sync>,
2816         status: Status,
2817         post_return_arg: ValRaw,
2818     ) -> Result<()> {
2819         if store
2820             .concurrent_state_mut()
2821             .get_mut(guest_task)?
2822             .call_post_return_automatically()
2823         {
2824             let (calls, host_table, _, instance) =
2825                 store.component_resource_state_with_instance(self);
2826             ResourceTables {
2827                 calls,
2828                 host_table: Some(host_table),
2829                 guest: Some(instance.instance_states()),
2830             }
2831             .exit_call()?;
2832         } else {
2833             // As of this writing, the only scenario where `call_post_return_automatically`
2834             // would be false for a `GuestTask` is for host-to-guest calls using
2835             // `[Typed]Func::call_async`, in which case the `function_index`
2836             // should be a non-`None` value.
2837             let function_index = store
2838                 .concurrent_state_mut()
2839                 .get_mut(guest_task)?
2840                 .function_index
2841                 .unwrap();
2842             self.id()
2843                 .get_mut(store)
2844                 .post_return_arg_set(function_index, post_return_arg);
2845         }
2846 
2847         let state = store.concurrent_state_mut();
2848         let task = state.get_mut(guest_task)?;
2849 
2850         if let Caller::Host { tx, .. } = &mut task.caller {
2851             if let Some(tx) = tx.take() {
2852                 _ = tx.send(result);
2853             }
2854         } else {
2855             task.result = Some(result);
2856             Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2857         }
2858 
2859         Ok(())
2860     }
2861 
2862     /// Implements the `waitable-set.new` intrinsic.
2863     pub(crate) fn waitable_set_new(
2864         self,
2865         store: &mut StoreOpaque,
2866         caller_instance: RuntimeComponentInstanceIndex,
2867     ) -> Result<u32> {
2868         self.id().get_mut(store).check_may_leave(caller_instance)?;
2869         let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2870         let handle = store
2871             .handle_table(RuntimeInstance {
2872                 instance: self.id().instance(),
2873                 index: caller_instance,
2874             })
2875             .waitable_set_insert(set.rep())?;
2876         log::trace!("new waitable set {set:?} (handle {handle})");
2877         Ok(handle)
2878     }
2879 
2880     /// Implements the `waitable-set.drop` intrinsic.
2881     pub(crate) fn waitable_set_drop(
2882         self,
2883         store: &mut StoreOpaque,
2884         caller_instance: RuntimeComponentInstanceIndex,
2885         set: u32,
2886     ) -> Result<()> {
2887         self.id().get_mut(store).check_may_leave(caller_instance)?;
2888         let rep = store
2889             .handle_table(RuntimeInstance {
2890                 instance: self.id().instance(),
2891                 index: caller_instance,
2892             })
2893             .waitable_set_remove(set)?;
2894 
2895         log::trace!("drop waitable set {rep} (handle {set})");
2896 
2897         let set = store
2898             .concurrent_state_mut()
2899             .delete(TableId::<WaitableSet>::new(rep))?;
2900 
2901         if !set.waiting.is_empty() {
2902             bail!("cannot drop waitable set with waiters");
2903         }
2904 
2905         Ok(())
2906     }
2907 
2908     /// Implements the `waitable.join` intrinsic.
2909     pub(crate) fn waitable_join(
2910         self,
2911         store: &mut StoreOpaque,
2912         caller_instance: RuntimeComponentInstanceIndex,
2913         waitable_handle: u32,
2914         set_handle: u32,
2915     ) -> Result<()> {
2916         let mut instance = self.id().get_mut(store);
2917         instance.check_may_leave(caller_instance)?;
2918         let waitable =
2919             Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
2920 
2921         let set = if set_handle == 0 {
2922             None
2923         } else {
2924             let set = instance.instance_states().0[caller_instance]
2925                 .handle_table()
2926                 .waitable_set_rep(set_handle)?;
2927 
2928             Some(TableId::<WaitableSet>::new(set))
2929         };
2930 
2931         log::trace!(
2932             "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
2933         );
2934 
2935         waitable.join(store.concurrent_state_mut(), set)
2936     }
2937 
2938     /// Implements the `subtask.drop` intrinsic.
2939     pub(crate) fn subtask_drop(
2940         self,
2941         store: &mut StoreOpaque,
2942         caller_instance: RuntimeComponentInstanceIndex,
2943         task_id: u32,
2944     ) -> Result<()> {
2945         self.id().get_mut(store).check_may_leave(caller_instance)?;
2946         self.waitable_join(store, caller_instance, task_id, 0)?;
2947 
2948         let (rep, is_host) = store
2949             .handle_table(RuntimeInstance {
2950                 instance: self.id().instance(),
2951                 index: caller_instance,
2952             })
2953             .subtask_remove(task_id)?;
2954 
2955         let concurrent_state = store.concurrent_state_mut();
2956         let (waitable, expected_caller_instance, delete) = if is_host {
2957             let id = TableId::<HostTask>::new(rep);
2958             let task = concurrent_state.get_mut(id)?;
2959             if task.join_handle.is_some() {
2960                 bail!("cannot drop a subtask which has not yet resolved");
2961             }
2962             (Waitable::Host(id), task.caller_instance, true)
2963         } else {
2964             let id = TableId::<GuestTask>::new(rep);
2965             let task = concurrent_state.get_mut(id)?;
2966             if task.lift_result.is_some() {
2967                 bail!("cannot drop a subtask which has not yet resolved");
2968             }
2969             if let Caller::Guest { instance, .. } = &task.caller {
2970                 (Waitable::Guest(id), *instance, task.exited)
2971             } else {
2972                 unreachable!()
2973             }
2974         };
2975 
2976         waitable.common(concurrent_state)?.handle = None;
2977 
2978         if waitable.take_event(concurrent_state)?.is_some() {
2979             bail!("cannot drop a subtask with an undelivered event");
2980         }
2981 
2982         if delete {
2983             waitable.delete_from(concurrent_state)?;
2984         }
2985 
2986         // Since waitables can neither be passed between instances nor forged,
2987         // this should never fail unless there's a bug in Wasmtime, but we check
2988         // here to be sure:
2989         assert_eq!(expected_caller_instance, caller_instance);
2990         log::trace!("subtask_drop {waitable:?} (handle {task_id})");
2991         Ok(())
2992     }
2993 
2994     /// Implements the `waitable-set.wait` intrinsic.
2995     pub(crate) fn waitable_set_wait(
2996         self,
2997         store: &mut StoreOpaque,
2998         caller: RuntimeComponentInstanceIndex,
2999         options: OptionsIndex,
3000         set: u32,
3001         payload: u32,
3002     ) -> Result<u32> {
3003         self.id().get(store).check_may_leave(caller)?;
3004 
3005         if !self.options(store, options).async_ {
3006             // The caller may only call `waitable-set.wait` from an async task
3007             // (i.e. a task created via a call to an async export).
3008             // Otherwise, we'll trap.
3009             store.concurrent_state_mut().check_blocking()?;
3010         }
3011 
3012         let &CanonicalOptions {
3013             cancellable,
3014             instance: caller_instance,
3015             ..
3016         } = &self.id().get(store).component().env_component().options[options];
3017         let rep = store
3018             .handle_table(RuntimeInstance {
3019                 instance: self.id().instance(),
3020                 index: caller_instance,
3021             })
3022             .waitable_set_rep(set)?;
3023 
3024         self.waitable_check(
3025             store,
3026             cancellable,
3027             WaitableCheck::Wait,
3028             WaitableCheckParams {
3029                 set: TableId::new(rep),
3030                 options,
3031                 payload,
3032             },
3033         )
3034     }
3035 
3036     /// Implements the `waitable-set.poll` intrinsic.
3037     pub(crate) fn waitable_set_poll(
3038         self,
3039         store: &mut StoreOpaque,
3040         caller: RuntimeComponentInstanceIndex,
3041         options: OptionsIndex,
3042         set: u32,
3043         payload: u32,
3044     ) -> Result<u32> {
3045         self.id().get(store).check_may_leave(caller)?;
3046 
3047         let &CanonicalOptions {
3048             cancellable,
3049             instance: caller_instance,
3050             ..
3051         } = &self.id().get(store).component().env_component().options[options];
3052         let rep = store
3053             .handle_table(RuntimeInstance {
3054                 instance: self.id().instance(),
3055                 index: caller_instance,
3056             })
3057             .waitable_set_rep(set)?;
3058 
3059         self.waitable_check(
3060             store,
3061             cancellable,
3062             WaitableCheck::Poll,
3063             WaitableCheckParams {
3064                 set: TableId::new(rep),
3065                 options,
3066                 payload,
3067             },
3068         )
3069     }
3070 
3071     /// Implements the `thread.index` intrinsic.
3072     pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3073         let thread_id = store
3074             .concurrent_state_mut()
3075             .guest_thread
3076             .ok_or_else(|| anyhow!("no current thread"))?
3077             .thread;
3078         // The unwrap is safe because `instance_rep` must be `Some` by this point
3079         Ok(store
3080             .concurrent_state_mut()
3081             .get_mut(thread_id)?
3082             .instance_rep
3083             .unwrap())
3084     }
3085 
3086     /// Implements the `thread.new-indirect` intrinsic.
3087     pub(crate) fn thread_new_indirect<T: 'static>(
3088         self,
3089         mut store: StoreContextMut<T>,
3090         runtime_instance: RuntimeComponentInstanceIndex,
3091         _func_ty_idx: TypeFuncIndex, // currently unused
3092         start_func_table_idx: RuntimeTableIndex,
3093         start_func_idx: u32,
3094         context: i32,
3095     ) -> Result<u32> {
3096         self.id().get(store.0).check_may_leave(runtime_instance)?;
3097 
3098         log::trace!("creating new thread");
3099 
3100         let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3101         let (instance, registry) = self.id().get_mut_and_registry(store.0);
3102         let callee = instance
3103             .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3104             .ok_or_else(|| {
3105                 anyhow!("the start function index points to an uninitialized function")
3106             })?;
3107         if callee.type_index(store.0) != start_func_ty.type_index() {
3108             bail!(
3109                 "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3110             );
3111         }
3112 
3113         let token = StoreToken::new(store.as_context_mut());
3114         let start_func = Box::new(
3115             move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3116                 let old_thread = store
3117                     .concurrent_state_mut()
3118                     .guest_thread
3119                     .replace(guest_thread);
3120                 log::trace!(
3121                     "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3122                 );
3123 
3124                 store.maybe_push_call_context(guest_thread.task)?;
3125 
3126                 let mut store = token.as_context_mut(store);
3127                 let mut params = [ValRaw::i32(context)];
3128                 // Use call_unchecked rather than call or call_async, as we don't want to run the function
3129                 // on a separate fiber if we're running in an async store.
3130                 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3131 
3132                 store.0.maybe_pop_call_context(guest_thread.task)?;
3133 
3134                 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3135                 log::trace!("explicit thread {guest_thread:?} completed");
3136                 let state = store.0.concurrent_state_mut();
3137                 let task = state.get_mut(guest_thread.task)?;
3138                 if task.threads.is_empty() && !task.returned_or_cancelled() {
3139                     bail!(Trap::NoAsyncResult);
3140                 }
3141                 state.guest_thread = old_thread;
3142                 old_thread
3143                     .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3144                 if state.get_mut(guest_thread.task)?.ready_to_delete() {
3145                     Waitable::Guest(guest_thread.task).delete_from(state)?;
3146                 }
3147                 log::trace!("thread start: restored {old_thread:?} as current thread");
3148 
3149                 Ok(())
3150             },
3151         );
3152 
3153         let state = store.0.concurrent_state_mut();
3154         let current_thread = state.guest_thread.unwrap();
3155         let parent_task = current_thread.task;
3156 
3157         let new_thread = GuestThread::new_explicit(parent_task, start_func);
3158         let thread_id = state.push(new_thread)?;
3159         state.get_mut(parent_task)?.threads.insert(thread_id);
3160 
3161         log::trace!("new thread with id {thread_id:?} created");
3162 
3163         self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3164     }
3165 
3166     pub(crate) fn resume_suspended_thread(
3167         self,
3168         store: &mut StoreOpaque,
3169         runtime_instance: RuntimeComponentInstanceIndex,
3170         thread_idx: u32,
3171         high_priority: bool,
3172     ) -> Result<()> {
3173         let thread_id =
3174             GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3175         let state = store.concurrent_state_mut();
3176         let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3177         let thread = state.get_mut(guest_thread.thread)?;
3178 
3179         match mem::replace(&mut thread.state, GuestThreadState::Running) {
3180             GuestThreadState::NotStartedExplicit(start_func) => {
3181                 log::trace!("starting thread {guest_thread:?}");
3182                 let guest_call = WorkItem::GuestCall(GuestCall {
3183                     thread: guest_thread,
3184                     kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3185                         start_func(store, guest_thread)
3186                     })),
3187                 });
3188                 store
3189                     .concurrent_state_mut()
3190                     .push_work_item(guest_call, high_priority);
3191             }
3192             GuestThreadState::Suspended(fiber) => {
3193                 log::trace!("resuming thread {thread_id:?} that was suspended");
3194                 store
3195                     .concurrent_state_mut()
3196                     .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3197             }
3198             _ => {
3199                 bail!("cannot resume thread which is not suspended");
3200             }
3201         }
3202         Ok(())
3203     }
3204 
3205     fn add_guest_thread_to_instance_table(
3206         self,
3207         thread_id: TableId<GuestThread>,
3208         store: &mut StoreOpaque,
3209         runtime_instance: RuntimeComponentInstanceIndex,
3210     ) -> Result<u32> {
3211         let guest_id = store
3212             .handle_table(RuntimeInstance {
3213                 instance: self.id().instance(),
3214                 index: runtime_instance,
3215             })
3216             .guest_thread_insert(thread_id.rep())?;
3217         store
3218             .concurrent_state_mut()
3219             .get_mut(thread_id)?
3220             .instance_rep = Some(guest_id);
3221         Ok(guest_id)
3222     }
3223 
3224     /// Helper function for the `thread.yield`, `thread.yield-to`, `thread.suspend`,
3225     /// and `thread.switch-to` intrinsics.
3226     pub(crate) fn suspension_intrinsic(
3227         self,
3228         store: &mut StoreOpaque,
3229         caller: RuntimeComponentInstanceIndex,
3230         cancellable: bool,
3231         yielding: bool,
3232         to_thread: Option<u32>,
3233     ) -> Result<WaitResult> {
3234         self.id().get(store).check_may_leave(caller)?;
3235 
3236         if to_thread.is_none() {
3237             let state = store.concurrent_state_mut();
3238             if yielding {
3239                 // This is a `thread.yield` call
3240                 if !state.may_block(state.guest_thread.unwrap().task) {
3241                     // The spec defines `thread.yield` to be a no-op in a
3242                     // non-blocking context, so we return immediately without giving
3243                     // any other thread a chance to run.
3244                     return Ok(WaitResult::Completed);
3245                 }
3246             } else {
3247                 // The caller may only call `thread.suspend` from an async task
3248                 // (i.e. a task created via a call to an async export).
3249                 // Otherwise, we'll trap.
3250                 state.check_blocking()?;
3251             }
3252         }
3253 
3254         // There could be a pending cancellation from a previous uncancellable wait
3255         if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3256             return Ok(WaitResult::Cancelled);
3257         }
3258 
3259         if let Some(thread) = to_thread {
3260             self.resume_suspended_thread(store, caller, thread, true)?;
3261         }
3262 
3263         let state = store.concurrent_state_mut();
3264         let guest_thread = state.guest_thread.unwrap();
3265         let reason = if yielding {
3266             SuspendReason::Yielding {
3267                 thread: guest_thread,
3268             }
3269         } else {
3270             SuspendReason::ExplicitlySuspending {
3271                 thread: guest_thread,
3272                 // Tell `StoreOpaque::suspend` it's okay to suspend here since
3273                 // we're handling a `thread.switch-to` call; otherwise it would
3274                 // panic if we called it in a non-blocking context.
3275                 skip_may_block_check: to_thread.is_some(),
3276             }
3277         };
3278 
3279         store.suspend(reason)?;
3280 
3281         if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3282             Ok(WaitResult::Cancelled)
3283         } else {
3284             Ok(WaitResult::Completed)
3285         }
3286     }
3287 
3288     /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3289     fn waitable_check(
3290         self,
3291         store: &mut StoreOpaque,
3292         cancellable: bool,
3293         check: WaitableCheck,
3294         params: WaitableCheckParams,
3295     ) -> Result<u32> {
3296         let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();
3297 
3298         log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3299 
3300         let state = store.concurrent_state_mut();
3301         let task = state.get_mut(guest_thread.task)?;
3302 
3303         // If we're waiting, and there are no events immediately available,
3304         // suspend the fiber until that changes.
3305         match &check {
3306             WaitableCheck::Wait => {
3307                 let set = params.set;
3308 
3309                 if (task.event.is_none()
3310                     || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3311                     && state.get_mut(set)?.ready.is_empty()
3312                 {
3313                     if cancellable {
3314                         let old = state
3315                             .get_mut(guest_thread.thread)?
3316                             .wake_on_cancel
3317                             .replace(set);
3318                         assert!(old.is_none());
3319                     }
3320 
3321                     store.suspend(SuspendReason::Waiting {
3322                         set,
3323                         thread: guest_thread,
3324                         skip_may_block_check: false,
3325                     })?;
3326                 }
3327             }
3328             WaitableCheck::Poll => {}
3329         }
3330 
3331         log::trace!(
3332             "waitable check for {guest_thread:?}; set {:?}, part two",
3333             params.set
3334         );
3335 
3336         // Deliver any pending events to the guest and return.
3337         let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3338 
3339         let (ordinal, handle, result) = match &check {
3340             WaitableCheck::Wait => {
3341                 let (event, waitable) = event.unwrap();
3342                 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3343                 let (ordinal, result) = event.parts();
3344                 (ordinal, handle, result)
3345             }
3346             WaitableCheck::Poll => {
3347                 if let Some((event, waitable)) = event {
3348                     let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3349                     let (ordinal, result) = event.parts();
3350                     (ordinal, handle, result)
3351                 } else {
3352                     log::trace!(
3353                         "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3354                         guest_thread.task,
3355                         params.set
3356                     );
3357                     let (ordinal, result) = Event::None.parts();
3358                     (ordinal, 0, result)
3359                 }
3360             }
3361         };
3362         let memory = self.options_memory_mut(store, params.options);
3363         let ptr = func::validate_inbounds_dynamic(
3364             &CanonicalAbiInfo::POINTER_PAIR,
3365             memory,
3366             &ValRaw::u32(params.payload),
3367         )?;
3368         memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3369         memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3370         Ok(ordinal)
3371     }
3372 
3373     /// Implements the `subtask.cancel` intrinsic.
3374     pub(crate) fn subtask_cancel(
3375         self,
3376         store: &mut StoreOpaque,
3377         caller_instance: RuntimeComponentInstanceIndex,
3378         async_: bool,
3379         task_id: u32,
3380     ) -> Result<u32> {
3381         self.id().get(store).check_may_leave(caller_instance)?;
3382 
3383         if !async_ {
3384             // The caller may only sync call `subtask.cancel` from an async task
3385             // (i.e. a task created via a call to an async export).  Otherwise,
3386             // we'll trap.
3387             store.concurrent_state_mut().check_blocking()?;
3388         }
3389 
3390         let (rep, is_host) = store
3391             .handle_table(RuntimeInstance {
3392                 instance: self.id().instance(),
3393                 index: caller_instance,
3394             })
3395             .subtask_rep(task_id)?;
3396         let (waitable, expected_caller_instance) = if is_host {
3397             let id = TableId::<HostTask>::new(rep);
3398             (
3399                 Waitable::Host(id),
3400                 store.concurrent_state_mut().get_mut(id)?.caller_instance,
3401             )
3402         } else {
3403             let id = TableId::<GuestTask>::new(rep);
3404             if let Caller::Guest { instance, .. } =
3405                 &store.concurrent_state_mut().get_mut(id)?.caller
3406             {
3407                 (Waitable::Guest(id), *instance)
3408             } else {
3409                 unreachable!()
3410             }
3411         };
3412         // Since waitables can neither be passed between instances nor forged,
3413         // this should never fail unless there's a bug in Wasmtime, but we check
3414         // here to be sure:
3415         assert_eq!(expected_caller_instance, caller_instance);
3416 
3417         log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3418 
3419         let concurrent_state = store.concurrent_state_mut();
3420         if let Waitable::Host(host_task) = waitable {
3421             if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
3422                 handle.abort();
3423                 return Ok(Status::ReturnCancelled as u32);
3424             }
3425         } else {
3426             let caller = concurrent_state.guest_thread.unwrap();
3427             let guest_task = TableId::<GuestTask>::new(rep);
3428             let task = concurrent_state.get_mut(guest_task)?;
3429             if !task.already_lowered_parameters() {
3430                 task.lower_params = None;
3431                 task.lift_result = None;
3432 
3433                 // Not yet started; cancel and remove from pending
3434                 let instance = task.instance;
3435                 let pending = &mut store.instance_state(instance).pending;
3436                 let pending_count = pending.len();
3437                 pending.retain(|thread, _| thread.task != guest_task);
3438                 // If there were no pending threads for this task, we're in an error state
3439                 if pending.len() == pending_count {
3440                     bail!("`subtask.cancel` called after terminal status delivered");
3441                 }
3442                 return Ok(Status::StartCancelled as u32);
3443             } else if !task.returned_or_cancelled() {
3444                 // Started, but not yet returned or cancelled; send the
3445                 // `CANCELLED` event
3446                 task.cancel_sent = true;
3447                 // Note that this might overwrite an event that was set earlier
3448                 // (e.g. `Event::None` if the task is yielding, or
3449                 // `Event::Cancelled` if it was already cancelled), but that's
3450                 // okay -- this should supersede the previous state.
3451                 task.event = Some(Event::Cancelled);
3452                 for thread in task.threads.clone() {
3453                     let thread = QualifiedThreadId {
3454                         task: guest_task,
3455                         thread,
3456                     };
3457                     if let Some(set) = concurrent_state
3458                         .get_mut(thread.thread)
3459                         .unwrap()
3460                         .wake_on_cancel
3461                         .take()
3462                     {
3463                         let item = match concurrent_state
3464                             .get_mut(set)?
3465                             .waiting
3466                             .remove(&thread)
3467                             .unwrap()
3468                         {
3469                             WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3470                             WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
3471                                 thread,
3472                                 kind: GuestCallKind::DeliverEvent {
3473                                     instance,
3474                                     set: None,
3475                                 },
3476                             }),
3477                         };
3478                         concurrent_state.push_high_priority(item);
3479 
3480                         store.suspend(SuspendReason::Yielding { thread: caller })?;
3481                         break;
3482                     }
3483                 }
3484 
3485                 let concurrent_state = store.concurrent_state_mut();
3486                 let task = concurrent_state.get_mut(guest_task)?;
3487                 if !task.returned_or_cancelled() {
3488                     if async_ {
3489                         return Ok(BLOCKED);
3490                     } else {
3491                         store.wait_for_event(Waitable::Guest(guest_task))?;
3492                     }
3493                 }
3494             }
3495         }
3496 
3497         let event = waitable.take_event(store.concurrent_state_mut())?;
3498         if let Some(Event::Subtask {
3499             status: status @ (Status::Returned | Status::ReturnCancelled),
3500         }) = event
3501         {
3502             Ok(status as u32)
3503         } else {
3504             bail!("`subtask.cancel` called after terminal status delivered");
3505         }
3506     }
3507 
3508     pub(crate) fn context_get(
3509         self,
3510         store: &mut StoreOpaque,
3511         caller: RuntimeComponentInstanceIndex,
3512         slot: u32,
3513     ) -> Result<u32> {
3514         self.id().get(store).check_may_leave(caller)?;
3515         store.concurrent_state_mut().context_get(slot)
3516     }
3517 
3518     pub(crate) fn context_set(
3519         self,
3520         store: &mut StoreOpaque,
3521         caller: RuntimeComponentInstanceIndex,
3522         slot: u32,
3523         value: u32,
3524     ) -> Result<()> {
3525         self.id().get(store).check_may_leave(caller)?;
3526         store.concurrent_state_mut().context_set(slot, value)
3527     }
3528 }
3529 
3530 /// Trait representing component model ABI async intrinsics and fused adapter
3531 /// helper functions.
3532 ///
3533 /// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3534 /// which must be valid for at least the duration of the call (and possibly for
3535 /// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3536 /// pointers used for async calls).
3537 pub trait VMComponentAsyncStore {
3538     /// A helper function for fused adapter modules involving calls where the
3539     /// one of the caller or callee is async.
3540     ///
3541     /// This helper is not used when the caller and callee both use the sync
3542     /// ABI, only when at least one is async is this used.
3543     unsafe fn prepare_call(
3544         &mut self,
3545         instance: Instance,
3546         memory: *mut VMMemoryDefinition,
3547         start: *mut VMFuncRef,
3548         return_: *mut VMFuncRef,
3549         caller_instance: RuntimeComponentInstanceIndex,
3550         callee_instance: RuntimeComponentInstanceIndex,
3551         task_return_type: TypeTupleIndex,
3552         callee_async: bool,
3553         string_encoding: u8,
3554         result_count: u32,
3555         storage: *mut ValRaw,
3556         storage_len: usize,
3557     ) -> Result<()>;
3558 
3559     /// A helper function for fused adapter modules involving calls where the
3560     /// caller is sync-lowered but the callee is async-lifted.
3561     unsafe fn sync_start(
3562         &mut self,
3563         instance: Instance,
3564         callback: *mut VMFuncRef,
3565         callee: *mut VMFuncRef,
3566         param_count: u32,
3567         storage: *mut MaybeUninit<ValRaw>,
3568         storage_len: usize,
3569     ) -> Result<()>;
3570 
3571     /// A helper function for fused adapter modules involving calls where the
3572     /// caller is async-lowered.
3573     unsafe fn async_start(
3574         &mut self,
3575         instance: Instance,
3576         callback: *mut VMFuncRef,
3577         post_return: *mut VMFuncRef,
3578         callee: *mut VMFuncRef,
3579         param_count: u32,
3580         result_count: u32,
3581         flags: u32,
3582     ) -> Result<u32>;
3583 
3584     /// The `future.write` intrinsic.
3585     fn future_write(
3586         &mut self,
3587         instance: Instance,
3588         caller: RuntimeComponentInstanceIndex,
3589         ty: TypeFutureTableIndex,
3590         options: OptionsIndex,
3591         future: u32,
3592         address: u32,
3593     ) -> Result<u32>;
3594 
3595     /// The `future.read` intrinsic.
3596     fn future_read(
3597         &mut self,
3598         instance: Instance,
3599         caller: RuntimeComponentInstanceIndex,
3600         ty: TypeFutureTableIndex,
3601         options: OptionsIndex,
3602         future: u32,
3603         address: u32,
3604     ) -> Result<u32>;
3605 
3606     /// The `future.drop-writable` intrinsic.
3607     fn future_drop_writable(
3608         &mut self,
3609         instance: Instance,
3610         caller: RuntimeComponentInstanceIndex,
3611         ty: TypeFutureTableIndex,
3612         writer: u32,
3613     ) -> Result<()>;
3614 
3615     /// The `stream.write` intrinsic.
3616     fn stream_write(
3617         &mut self,
3618         instance: Instance,
3619         caller: RuntimeComponentInstanceIndex,
3620         ty: TypeStreamTableIndex,
3621         options: OptionsIndex,
3622         stream: u32,
3623         address: u32,
3624         count: u32,
3625     ) -> Result<u32>;
3626 
3627     /// The `stream.read` intrinsic.
3628     fn stream_read(
3629         &mut self,
3630         instance: Instance,
3631         caller: RuntimeComponentInstanceIndex,
3632         ty: TypeStreamTableIndex,
3633         options: OptionsIndex,
3634         stream: u32,
3635         address: u32,
3636         count: u32,
3637     ) -> Result<u32>;
3638 
3639     /// The "fast-path" implementation of the `stream.write` intrinsic for
3640     /// "flat" (i.e. memcpy-able) payloads.
3641     fn flat_stream_write(
3642         &mut self,
3643         instance: Instance,
3644         caller: RuntimeComponentInstanceIndex,
3645         ty: TypeStreamTableIndex,
3646         options: OptionsIndex,
3647         payload_size: u32,
3648         payload_align: u32,
3649         stream: u32,
3650         address: u32,
3651         count: u32,
3652     ) -> Result<u32>;
3653 
3654     /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3655     /// (i.e. memcpy-able) payloads.
3656     fn flat_stream_read(
3657         &mut self,
3658         instance: Instance,
3659         caller: RuntimeComponentInstanceIndex,
3660         ty: TypeStreamTableIndex,
3661         options: OptionsIndex,
3662         payload_size: u32,
3663         payload_align: u32,
3664         stream: u32,
3665         address: u32,
3666         count: u32,
3667     ) -> Result<u32>;
3668 
3669     /// The `stream.drop-writable` intrinsic.
3670     fn stream_drop_writable(
3671         &mut self,
3672         instance: Instance,
3673         caller: RuntimeComponentInstanceIndex,
3674         ty: TypeStreamTableIndex,
3675         writer: u32,
3676     ) -> Result<()>;
3677 
3678     /// The `error-context.debug-message` intrinsic.
3679     fn error_context_debug_message(
3680         &mut self,
3681         instance: Instance,
3682         caller: RuntimeComponentInstanceIndex,
3683         ty: TypeComponentLocalErrorContextTableIndex,
3684         options: OptionsIndex,
3685         err_ctx_handle: u32,
3686         debug_msg_address: u32,
3687     ) -> Result<()>;
3688 
3689     /// The `thread.new-indirect` intrinsic
3690     fn thread_new_indirect(
3691         &mut self,
3692         instance: Instance,
3693         caller: RuntimeComponentInstanceIndex,
3694         func_ty_idx: TypeFuncIndex,
3695         start_func_table_idx: RuntimeTableIndex,
3696         start_func_idx: u32,
3697         context: i32,
3698     ) -> Result<u32>;
3699 }
3700 
3701 /// SAFETY: See trait docs.
3702 impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3703     unsafe fn prepare_call(
3704         &mut self,
3705         instance: Instance,
3706         memory: *mut VMMemoryDefinition,
3707         start: *mut VMFuncRef,
3708         return_: *mut VMFuncRef,
3709         caller_instance: RuntimeComponentInstanceIndex,
3710         callee_instance: RuntimeComponentInstanceIndex,
3711         task_return_type: TypeTupleIndex,
3712         callee_async: bool,
3713         string_encoding: u8,
3714         result_count_or_max_if_async: u32,
3715         storage: *mut ValRaw,
3716         storage_len: usize,
3717     ) -> Result<()> {
3718         // SAFETY: The `wasmtime_cranelift`-generated code that calls
3719         // this method will have ensured that `storage` is a valid
3720         // pointer containing at least `storage_len` items.
3721         let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3722 
3723         unsafe {
3724             instance.prepare_call(
3725                 StoreContextMut(self),
3726                 start,
3727                 return_,
3728                 caller_instance,
3729                 callee_instance,
3730                 task_return_type,
3731                 callee_async,
3732                 memory,
3733                 string_encoding,
3734                 match result_count_or_max_if_async {
3735                     PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3736                         params,
3737                         has_result: false,
3738                     },
3739                     PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3740                         params,
3741                         has_result: true,
3742                     },
3743                     result_count => CallerInfo::Sync {
3744                         params,
3745                         result_count,
3746                     },
3747                 },
3748             )
3749         }
3750     }
3751 
3752     unsafe fn sync_start(
3753         &mut self,
3754         instance: Instance,
3755         callback: *mut VMFuncRef,
3756         callee: *mut VMFuncRef,
3757         param_count: u32,
3758         storage: *mut MaybeUninit<ValRaw>,
3759         storage_len: usize,
3760     ) -> Result<()> {
3761         unsafe {
3762             instance
3763                 .start_call(
3764                     StoreContextMut(self),
3765                     callback,
3766                     ptr::null_mut(),
3767                     callee,
3768                     param_count,
3769                     1,
3770                     START_FLAG_ASYNC_CALLEE,
3771                     // SAFETY: The `wasmtime_cranelift`-generated code that calls
3772                     // this method will have ensured that `storage` is a valid
3773                     // pointer containing at least `storage_len` items.
3774                     Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3775                 )
3776                 .map(drop)
3777         }
3778     }
3779 
3780     unsafe fn async_start(
3781         &mut self,
3782         instance: Instance,
3783         callback: *mut VMFuncRef,
3784         post_return: *mut VMFuncRef,
3785         callee: *mut VMFuncRef,
3786         param_count: u32,
3787         result_count: u32,
3788         flags: u32,
3789     ) -> Result<u32> {
3790         unsafe {
3791             instance.start_call(
3792                 StoreContextMut(self),
3793                 callback,
3794                 post_return,
3795                 callee,
3796                 param_count,
3797                 result_count,
3798                 flags,
3799                 None,
3800             )
3801         }
3802     }
3803 
3804     fn future_write(
3805         &mut self,
3806         instance: Instance,
3807         caller: RuntimeComponentInstanceIndex,
3808         ty: TypeFutureTableIndex,
3809         options: OptionsIndex,
3810         future: u32,
3811         address: u32,
3812     ) -> Result<u32> {
3813         instance.id().get(self).check_may_leave(caller)?;
3814         instance
3815             .guest_write(
3816                 StoreContextMut(self),
3817                 caller,
3818                 TransmitIndex::Future(ty),
3819                 options,
3820                 None,
3821                 future,
3822                 address,
3823                 1,
3824             )
3825             .map(|result| result.encode())
3826     }
3827 
3828     fn future_read(
3829         &mut self,
3830         instance: Instance,
3831         caller: RuntimeComponentInstanceIndex,
3832         ty: TypeFutureTableIndex,
3833         options: OptionsIndex,
3834         future: u32,
3835         address: u32,
3836     ) -> Result<u32> {
3837         instance.id().get(self).check_may_leave(caller)?;
3838         instance
3839             .guest_read(
3840                 StoreContextMut(self),
3841                 caller,
3842                 TransmitIndex::Future(ty),
3843                 options,
3844                 None,
3845                 future,
3846                 address,
3847                 1,
3848             )
3849             .map(|result| result.encode())
3850     }
3851 
3852     fn stream_write(
3853         &mut self,
3854         instance: Instance,
3855         caller: RuntimeComponentInstanceIndex,
3856         ty: TypeStreamTableIndex,
3857         options: OptionsIndex,
3858         stream: u32,
3859         address: u32,
3860         count: u32,
3861     ) -> Result<u32> {
3862         instance.id().get(self).check_may_leave(caller)?;
3863         instance
3864             .guest_write(
3865                 StoreContextMut(self),
3866                 caller,
3867                 TransmitIndex::Stream(ty),
3868                 options,
3869                 None,
3870                 stream,
3871                 address,
3872                 count,
3873             )
3874             .map(|result| result.encode())
3875     }
3876 
3877     fn stream_read(
3878         &mut self,
3879         instance: Instance,
3880         caller: RuntimeComponentInstanceIndex,
3881         ty: TypeStreamTableIndex,
3882         options: OptionsIndex,
3883         stream: u32,
3884         address: u32,
3885         count: u32,
3886     ) -> Result<u32> {
3887         instance.id().get(self).check_may_leave(caller)?;
3888         instance
3889             .guest_read(
3890                 StoreContextMut(self),
3891                 caller,
3892                 TransmitIndex::Stream(ty),
3893                 options,
3894                 None,
3895                 stream,
3896                 address,
3897                 count,
3898             )
3899             .map(|result| result.encode())
3900     }
3901 
3902     fn future_drop_writable(
3903         &mut self,
3904         instance: Instance,
3905         caller: RuntimeComponentInstanceIndex,
3906         ty: TypeFutureTableIndex,
3907         writer: u32,
3908     ) -> Result<()> {
3909         instance.id().get(self).check_may_leave(caller)?;
3910         instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
3911     }
3912 
3913     fn flat_stream_write(
3914         &mut self,
3915         instance: Instance,
3916         caller: RuntimeComponentInstanceIndex,
3917         ty: TypeStreamTableIndex,
3918         options: OptionsIndex,
3919         payload_size: u32,
3920         payload_align: u32,
3921         stream: u32,
3922         address: u32,
3923         count: u32,
3924     ) -> Result<u32> {
3925         instance.id().get(self).check_may_leave(caller)?;
3926         instance
3927             .guest_write(
3928                 StoreContextMut(self),
3929                 caller,
3930                 TransmitIndex::Stream(ty),
3931                 options,
3932                 Some(FlatAbi {
3933                     size: payload_size,
3934                     align: payload_align,
3935                 }),
3936                 stream,
3937                 address,
3938                 count,
3939             )
3940             .map(|result| result.encode())
3941     }
3942 
3943     fn flat_stream_read(
3944         &mut self,
3945         instance: Instance,
3946         caller: RuntimeComponentInstanceIndex,
3947         ty: TypeStreamTableIndex,
3948         options: OptionsIndex,
3949         payload_size: u32,
3950         payload_align: u32,
3951         stream: u32,
3952         address: u32,
3953         count: u32,
3954     ) -> Result<u32> {
3955         instance.id().get(self).check_may_leave(caller)?;
3956         instance
3957             .guest_read(
3958                 StoreContextMut(self),
3959                 caller,
3960                 TransmitIndex::Stream(ty),
3961                 options,
3962                 Some(FlatAbi {
3963                     size: payload_size,
3964                     align: payload_align,
3965                 }),
3966                 stream,
3967                 address,
3968                 count,
3969             )
3970             .map(|result| result.encode())
3971     }
3972 
3973     fn stream_drop_writable(
3974         &mut self,
3975         instance: Instance,
3976         caller: RuntimeComponentInstanceIndex,
3977         ty: TypeStreamTableIndex,
3978         writer: u32,
3979     ) -> Result<()> {
3980         instance.id().get(self).check_may_leave(caller)?;
3981         instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
3982     }
3983 
3984     fn error_context_debug_message(
3985         &mut self,
3986         instance: Instance,
3987         caller: RuntimeComponentInstanceIndex,
3988         ty: TypeComponentLocalErrorContextTableIndex,
3989         options: OptionsIndex,
3990         err_ctx_handle: u32,
3991         debug_msg_address: u32,
3992     ) -> Result<()> {
3993         instance.id().get(self).check_may_leave(caller)?;
3994         instance.error_context_debug_message(
3995             StoreContextMut(self),
3996             ty,
3997             options,
3998             err_ctx_handle,
3999             debug_msg_address,
4000         )
4001     }
4002 
4003     fn thread_new_indirect(
4004         &mut self,
4005         instance: Instance,
4006         caller: RuntimeComponentInstanceIndex,
4007         func_ty_idx: TypeFuncIndex,
4008         start_func_table_idx: RuntimeTableIndex,
4009         start_func_idx: u32,
4010         context: i32,
4011     ) -> Result<u32> {
4012         instance.thread_new_indirect(
4013             StoreContextMut(self),
4014             caller,
4015             func_ty_idx,
4016             start_func_table_idx,
4017             start_func_idx,
4018             context,
4019         )
4020     }
4021 }
4022 
4023 type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4024 
4025 /// Represents the state of a pending host task.
4026 struct HostTask {
4027     common: WaitableCommon,
4028     caller_instance: RuntimeComponentInstanceIndex,
4029     join_handle: Option<JoinHandle>,
4030 }
4031 
4032 impl HostTask {
4033     fn new(
4034         caller_instance: RuntimeComponentInstanceIndex,
4035         join_handle: Option<JoinHandle>,
4036     ) -> Self {
4037         Self {
4038             common: WaitableCommon::default(),
4039             caller_instance,
4040             join_handle,
4041         }
4042     }
4043 }
4044 
4045 impl TableDebug for HostTask {
4046     fn type_name() -> &'static str {
4047         "HostTask"
4048     }
4049 }
4050 
4051 type CallbackFn = Box<
4052     dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
4053         + Send
4054         + Sync
4055         + 'static,
4056 >;
4057 
4058 /// Represents the caller of a given guest task.
4059 enum Caller {
4060     /// The host called the guest task.
4061     Host {
4062         /// If present, may be used to deliver the result.
4063         tx: Option<oneshot::Sender<LiftedResult>>,
4064         /// Channel to notify once all subtasks spawned by this caller have
4065         /// completed.
4066         ///
4067         /// Note that we'll never actually send anything to this channel;
4068         /// dropping it when the refcount goes to zero is sufficient to notify
4069         /// the receiver.
4070         exit_tx: Arc<oneshot::Sender<()>>,
4071         /// If true, there's a host future that must be dropped before the task
4072         /// can be deleted.
4073         host_future_present: bool,
4074         /// If true, call `post-return` function (if any) automatically.
4075         call_post_return_automatically: bool,
4076     },
4077     /// Another guest thread called the guest task
4078     Guest {
4079         /// The id of the caller
4080         thread: QualifiedThreadId,
4081         /// The instance to use to enforce reentrance rules.
4082         ///
4083         /// Note that this might not be the same as the instance the caller task
4084         /// started executing in given that one or more synchronous guest->guest
4085         /// calls may have occurred involving multiple instances.
4086         instance: RuntimeComponentInstanceIndex,
4087     },
4088 }
4089 
4090 /// Represents a closure and related canonical ABI parameters required to
4091 /// validate a `task.return` call at runtime and lift the result.
4092 struct LiftResult {
4093     lift: RawLift,
4094     ty: TypeTupleIndex,
4095     memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4096     string_encoding: StringEncoding,
4097 }
4098 
4099 /// The table ID for a guest thread, qualified by the task to which it belongs.
4100 ///
4101 /// This exists to minimize table lookups and the necessity to pass stores around mutably
4102 /// for the common case of identifying the task to which a thread belongs.
4103 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4104 struct QualifiedThreadId {
4105     task: TableId<GuestTask>,
4106     thread: TableId<GuestThread>,
4107 }
4108 
4109 impl QualifiedThreadId {
4110     fn qualify(
4111         state: &mut ConcurrentState,
4112         thread: TableId<GuestThread>,
4113     ) -> Result<QualifiedThreadId> {
4114         Ok(QualifiedThreadId {
4115             task: state.get_mut(thread)?.parent_task,
4116             thread,
4117         })
4118     }
4119 }
4120 
4121 impl fmt::Debug for QualifiedThreadId {
4122     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4123         f.debug_tuple("QualifiedThreadId")
4124             .field(&self.task.rep())
4125             .field(&self.thread.rep())
4126             .finish()
4127     }
4128 }
4129 
4130 enum GuestThreadState {
4131     NotStartedImplicit,
4132     NotStartedExplicit(
4133         Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4134     ),
4135     Running,
4136     Suspended(StoreFiber<'static>),
4137     Pending,
4138     Completed,
4139 }
4140 pub struct GuestThread {
4141     /// Context-local state used to implement the `context.{get,set}`
4142     /// intrinsics.
4143     context: [u32; 2],
4144     /// The owning guest task.
4145     parent_task: TableId<GuestTask>,
4146     /// If present, indicates that the thread is currently waiting on the
4147     /// specified set but may be cancelled and woken immediately.
4148     wake_on_cancel: Option<TableId<WaitableSet>>,
4149     /// The execution state of this guest thread
4150     state: GuestThreadState,
4151     /// The index of this thread in the component instance's handle table.
4152     /// This must always be `Some` after initialization.
4153     instance_rep: Option<u32>,
4154 }
4155 
4156 impl GuestThread {
4157     /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4158     /// handle.
4159     fn from_instance(
4160         state: Pin<&mut ComponentInstance>,
4161         caller_instance: RuntimeComponentInstanceIndex,
4162         guest_thread: u32,
4163     ) -> Result<TableId<Self>> {
4164         let rep = state.instance_states().0[caller_instance]
4165             .handle_table()
4166             .guest_thread_rep(guest_thread)?;
4167         Ok(TableId::new(rep))
4168     }
4169 
4170     fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4171         Self {
4172             context: [0; 2],
4173             parent_task,
4174             wake_on_cancel: None,
4175             state: GuestThreadState::NotStartedImplicit,
4176             instance_rep: None,
4177         }
4178     }
4179 
4180     fn new_explicit(
4181         parent_task: TableId<GuestTask>,
4182         start_func: Box<
4183             dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4184         >,
4185     ) -> Self {
4186         Self {
4187             context: [0; 2],
4188             parent_task,
4189             wake_on_cancel: None,
4190             state: GuestThreadState::NotStartedExplicit(start_func),
4191             instance_rep: None,
4192         }
4193     }
4194 }
4195 
4196 impl TableDebug for GuestThread {
4197     fn type_name() -> &'static str {
4198         "GuestThread"
4199     }
4200 }
4201 
4202 enum SyncResult {
4203     NotProduced,
4204     Produced(Option<ValRaw>),
4205     Taken,
4206 }
4207 
4208 impl SyncResult {
4209     fn take(&mut self) -> Option<Option<ValRaw>> {
4210         match mem::replace(self, SyncResult::Taken) {
4211             SyncResult::NotProduced => None,
4212             SyncResult::Produced(val) => Some(val),
4213             SyncResult::Taken => {
4214                 panic!("attempted to take a synchronous result that was already taken")
4215             }
4216         }
4217     }
4218 }
4219 
4220 #[derive(Debug)]
4221 enum HostFutureState {
4222     NotApplicable,
4223     Live,
4224     Dropped,
4225 }
4226 
4227 /// Represents a pending guest task.
4228 pub(crate) struct GuestTask {
4229     /// See `WaitableCommon`
4230     common: WaitableCommon,
4231     /// Closure to lower the parameters passed to this task.
4232     lower_params: Option<RawLower>,
4233     /// See `LiftResult`
4234     lift_result: Option<LiftResult>,
4235     /// A place to stash the type-erased lifted result if it can't be delivered
4236     /// immediately.
4237     result: Option<LiftedResult>,
4238     /// Closure to call the callback function for an async-lifted export, if
4239     /// provided.
4240     callback: Option<CallbackFn>,
4241     /// See `Caller`
4242     caller: Caller,
4243     /// A place to stash the call context for managing resource borrows while
4244     /// switching between guest tasks.
4245     call_context: Option<CallContext>,
4246     /// A place to stash the lowered result for a sync-to-async call until it
4247     /// can be returned to the caller.
4248     sync_result: SyncResult,
4249     /// Whether or not the task has been cancelled (i.e. whether the task is
4250     /// permitted to call `task.cancel`).
4251     cancel_sent: bool,
4252     /// Whether or not we've sent a `Status::Starting` event to any current or
4253     /// future waiters for this waitable.
4254     starting_sent: bool,
4255     /// Pending guest subtasks created by this task (directly or indirectly).
4256     ///
4257     /// This is used to re-parent subtasks which are still running when their
4258     /// parent task is disposed.
4259     subtasks: HashSet<TableId<GuestTask>>,
4260     /// Scratch waitable set used to watch subtasks during synchronous calls.
4261     sync_call_set: TableId<WaitableSet>,
4262     /// The runtime instance to which the exported function for this guest task
4263     /// belongs.
4264     ///
4265     /// Note that the task may do a sync->sync call via a fused adapter which
4266     /// results in that task executing code in a different instance, and it may
4267     /// call host functions and intrinsics from that other instance.
4268     instance: RuntimeInstance,
4269     /// If present, a pending `Event::None` or `Event::Cancelled` to be
4270     /// delivered to this task.
4271     event: Option<Event>,
4272     /// The `ExportIndex` of the guest function being called, if known.
4273     function_index: Option<ExportIndex>,
4274     /// Whether or not the task has exited.
4275     exited: bool,
4276     /// Threads belonging to this task
4277     threads: HashSet<TableId<GuestThread>>,
4278     /// The state of the host future that represents an async task, which must
4279     /// be dropped before we can delete the task.
4280     host_future_state: HostFutureState,
4281     /// Indicates whether this task was created for a call to an async-lifted
4282     /// export.
4283     async_function: bool,
4284 }
4285 
4286 impl GuestTask {
4287     fn already_lowered_parameters(&self) -> bool {
4288         // We reset `lower_params` after we lower the parameters
4289         self.lower_params.is_none()
4290     }
4291 
4292     fn returned_or_cancelled(&self) -> bool {
4293         // We reset `lift_result` after we return or exit
4294         self.lift_result.is_none()
4295     }
4296 
4297     fn ready_to_delete(&self) -> bool {
4298         let threads_completed = self.threads.is_empty();
4299         let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4300         let pending_completion_event = matches!(
4301             self.common.event,
4302             Some(Event::Subtask {
4303                 status: Status::Returned | Status::ReturnCancelled
4304             })
4305         );
4306         let ready = threads_completed
4307             && !has_sync_result
4308             && !pending_completion_event
4309             && !matches!(self.host_future_state, HostFutureState::Live);
4310         log::trace!(
4311             "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4312             threads_completed,
4313             has_sync_result,
4314             pending_completion_event,
4315             self.host_future_state
4316         );
4317         ready
4318     }
4319 
4320     fn new(
4321         state: &mut ConcurrentState,
4322         lower_params: RawLower,
4323         lift_result: LiftResult,
4324         caller: Caller,
4325         callback: Option<CallbackFn>,
4326         component_instance: Instance,
4327         instance: RuntimeComponentInstanceIndex,
4328         async_function: bool,
4329     ) -> Result<Self> {
4330         let sync_call_set = state.push(WaitableSet::default())?;
4331         let host_future_state = match &caller {
4332             Caller::Guest { .. } => HostFutureState::NotApplicable,
4333             Caller::Host {
4334                 host_future_present,
4335                 ..
4336             } => {
4337                 if *host_future_present {
4338                     HostFutureState::Live
4339                 } else {
4340                     HostFutureState::NotApplicable
4341                 }
4342             }
4343         };
4344         Ok(Self {
4345             common: WaitableCommon::default(),
4346             lower_params: Some(lower_params),
4347             lift_result: Some(lift_result),
4348             result: None,
4349             callback,
4350             caller,
4351             call_context: Some(CallContext::default()),
4352             sync_result: SyncResult::NotProduced,
4353             cancel_sent: false,
4354             starting_sent: false,
4355             subtasks: HashSet::new(),
4356             sync_call_set,
4357             instance: RuntimeInstance {
4358                 instance: component_instance.id().instance(),
4359                 index: instance,
4360             },
4361             event: None,
4362             function_index: None,
4363             exited: false,
4364             threads: HashSet::new(),
4365             host_future_state,
4366             async_function,
4367         })
4368     }
4369 
4370     /// Dispose of this guest task, reparenting any pending subtasks to the
4371     /// caller.
4372     fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
4373         // If there are not-yet-delivered completion events for subtasks in
4374         // `self.sync_call_set`, recursively dispose of those subtasks as well.
4375         for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4376             if let Some(Event::Subtask {
4377                 status: Status::Returned | Status::ReturnCancelled,
4378             }) = waitable.common(state)?.event
4379             {
4380                 waitable.delete_from(state)?;
4381             }
4382         }
4383 
4384         assert!(self.threads.is_empty());
4385 
4386         state.delete(self.sync_call_set)?;
4387 
4388         // Reparent any pending subtasks to the caller.
4389         match &self.caller {
4390             Caller::Guest {
4391                 thread,
4392                 instance: runtime_instance,
4393             } => {
4394                 let task_mut = state.get_mut(thread.task)?;
4395                 let present = task_mut.subtasks.remove(&me);
4396                 assert!(present);
4397 
4398                 for subtask in &self.subtasks {
4399                     task_mut.subtasks.insert(*subtask);
4400                 }
4401 
4402                 for subtask in &self.subtasks {
4403                     state.get_mut(*subtask)?.caller = Caller::Guest {
4404                         thread: *thread,
4405                         instance: *runtime_instance,
4406                     };
4407                 }
4408             }
4409             Caller::Host { exit_tx, .. } => {
4410                 for subtask in &self.subtasks {
4411                     state.get_mut(*subtask)?.caller = Caller::Host {
4412                         tx: None,
4413                         // Clone `exit_tx` to ensure that it is only dropped
4414                         // once all transitive subtasks of the host call have
4415                         // exited:
4416                         exit_tx: exit_tx.clone(),
4417                         host_future_present: false,
4418                         call_post_return_automatically: true,
4419                     };
4420                 }
4421             }
4422         }
4423 
4424         for subtask in self.subtasks {
4425             if state.get_mut(subtask)?.exited {
4426                 Waitable::Guest(subtask).delete_from(state)?;
4427             }
4428         }
4429 
4430         Ok(())
4431     }
4432 
4433     fn call_post_return_automatically(&self) -> bool {
4434         matches!(
4435             self.caller,
4436             Caller::Guest { .. }
4437                 | Caller::Host {
4438                     call_post_return_automatically: true,
4439                     ..
4440                 }
4441         )
4442     }
4443 }
4444 
4445 impl TableDebug for GuestTask {
4446     fn type_name() -> &'static str {
4447         "GuestTask"
4448     }
4449 }
4450 
4451 /// Represents state common to all kinds of waitables.
4452 #[derive(Default)]
4453 struct WaitableCommon {
4454     /// The currently pending event for this waitable, if any.
4455     event: Option<Event>,
4456     /// The set to which this waitable belongs, if any.
4457     set: Option<TableId<WaitableSet>>,
4458     /// The handle with which the guest refers to this waitable, if any.
4459     handle: Option<u32>,
4460 }
4461 
4462 /// Represents a Component Model Async `waitable`.
4463 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4464 enum Waitable {
4465     /// A host task
4466     Host(TableId<HostTask>),
4467     /// A guest task
4468     Guest(TableId<GuestTask>),
4469     /// The read or write end of a stream or future
4470     Transmit(TableId<TransmitHandle>),
4471 }
4472 
4473 impl Waitable {
4474     /// Retrieve the `Waitable` corresponding to the specified guest-visible
4475     /// handle.
4476     fn from_instance(
4477         state: Pin<&mut ComponentInstance>,
4478         caller_instance: RuntimeComponentInstanceIndex,
4479         waitable: u32,
4480     ) -> Result<Self> {
4481         use crate::runtime::vm::component::Waitable;
4482 
4483         let (waitable, kind) = state.instance_states().0[caller_instance]
4484             .handle_table()
4485             .waitable_rep(waitable)?;
4486 
4487         Ok(match kind {
4488             Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4489             Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4490             Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4491         })
4492     }
4493 
4494     /// Retrieve the host-visible identifier for this `Waitable`.
4495     fn rep(&self) -> u32 {
4496         match self {
4497             Self::Host(id) => id.rep(),
4498             Self::Guest(id) => id.rep(),
4499             Self::Transmit(id) => id.rep(),
4500         }
4501     }
4502 
4503     /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4504     /// remove it from any set it may currently belong to (when `set` is
4505     /// `None`).
4506     fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4507         log::trace!("waitable {self:?} join set {set:?}",);
4508 
4509         let old = mem::replace(&mut self.common(state)?.set, set);
4510 
4511         if let Some(old) = old {
4512             match *self {
4513                 Waitable::Host(id) => state.remove_child(id, old),
4514                 Waitable::Guest(id) => state.remove_child(id, old),
4515                 Waitable::Transmit(id) => state.remove_child(id, old),
4516             }?;
4517 
4518             state.get_mut(old)?.ready.remove(self);
4519         }
4520 
4521         if let Some(set) = set {
4522             match *self {
4523                 Waitable::Host(id) => state.add_child(id, set),
4524                 Waitable::Guest(id) => state.add_child(id, set),
4525                 Waitable::Transmit(id) => state.add_child(id, set),
4526             }?;
4527 
4528             if self.common(state)?.event.is_some() {
4529                 self.mark_ready(state)?;
4530             }
4531         }
4532 
4533         Ok(())
4534     }
4535 
4536     /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4537     fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4538         Ok(match self {
4539             Self::Host(id) => &mut state.get_mut(*id)?.common,
4540             Self::Guest(id) => &mut state.get_mut(*id)?.common,
4541             Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4542         })
4543     }
4544 
4545     /// Set or clear the pending event for this waitable and either deliver it
4546     /// to the first waiter, if any, or mark it as ready to be delivered to the
4547     /// next waiter that arrives.
4548     fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4549         log::trace!("set event for {self:?}: {event:?}");
4550         self.common(state)?.event = event;
4551         self.mark_ready(state)
4552     }
4553 
4554     /// Take the pending event from this waitable, leaving `None` in its place.
4555     fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4556         let common = self.common(state)?;
4557         let event = common.event.take();
4558         if let Some(set) = self.common(state)?.set {
4559             state.get_mut(set)?.ready.remove(self);
4560         }
4561 
4562         Ok(event)
4563     }
4564 
4565     /// Deliver the current event for this waitable to the first waiter, if any,
4566     /// or else mark it as ready to be delivered to the next waiter that
4567     /// arrives.
4568     fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4569         if let Some(set) = self.common(state)?.set {
4570             state.get_mut(set)?.ready.insert(*self);
4571             if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4572                 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4573                 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4574 
4575                 let item = match mode {
4576                     WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4577                     WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
4578                         thread,
4579                         kind: GuestCallKind::DeliverEvent {
4580                             instance,
4581                             set: Some(set),
4582                         },
4583                     }),
4584                 };
4585                 state.push_high_priority(item);
4586             }
4587         }
4588         Ok(())
4589     }
4590 
4591     /// Remove this waitable from the instance's rep table.
4592     fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4593         match self {
4594             Self::Host(task) => {
4595                 log::trace!("delete host task {task:?}");
4596                 state.delete(*task)?;
4597             }
4598             Self::Guest(task) => {
4599                 log::trace!("delete guest task {task:?}");
4600                 state.delete(*task)?.dispose(state, *task)?;
4601             }
4602             Self::Transmit(task) => {
4603                 state.delete(*task)?;
4604             }
4605         }
4606 
4607         Ok(())
4608     }
4609 }
4610 
4611 impl fmt::Debug for Waitable {
4612     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4613         match self {
4614             Self::Host(id) => write!(f, "{id:?}"),
4615             Self::Guest(id) => write!(f, "{id:?}"),
4616             Self::Transmit(id) => write!(f, "{id:?}"),
4617         }
4618     }
4619 }
4620 
4621 /// Represents a Component Model Async `waitable-set`.
4622 #[derive(Default)]
4623 struct WaitableSet {
4624     /// Which waitables in this set have pending events, if any.
4625     ready: BTreeSet<Waitable>,
4626     /// Which guest threads are currently waiting on this set, if any.
4627     waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4628 }
4629 
4630 impl TableDebug for WaitableSet {
4631     fn type_name() -> &'static str {
4632         "WaitableSet"
4633     }
4634 }
4635 
4636 /// Type-erased closure to lower the parameters for a guest task.
4637 type RawLower =
4638     Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4639 
4640 /// Type-erased closure to lift the result for a guest task.
4641 type RawLift = Box<
4642     dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4643 >;
4644 
4645 /// Type erased result of a guest task which may be downcast to the expected
4646 /// type by a host caller (or simply ignored in the case of a guest caller; see
4647 /// `DummyResult`).
4648 type LiftedResult = Box<dyn Any + Send + Sync>;
4649 
4650 /// Used to return a result from a `LiftFn` when the actual result has already
4651 /// been lowered to a guest task's stack and linear memory.
4652 struct DummyResult;
4653 
4654 /// Represents the Component Model Async state of a (sub-)component instance.
4655 #[derive(Default)]
4656 pub struct ConcurrentInstanceState {
4657     /// Whether backpressure is set for this instance (enabled if >0)
4658     backpressure: u16,
4659     /// Whether this instance can be entered
4660     do_not_enter: bool,
4661     /// Pending calls for this instance which require `Self::backpressure` to be
4662     /// `true` and/or `Self::do_not_enter` to be false before they can proceed.
4663     pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4664 }
4665 
4666 impl ConcurrentInstanceState {
4667     pub fn pending_is_empty(&self) -> bool {
4668         self.pending.is_empty()
4669     }
4670 }
4671 
4672 /// Represents the Component Model Async state of a store.
4673 pub struct ConcurrentState {
4674     /// The currently running guest thread, if any.
4675     guest_thread: Option<QualifiedThreadId>,
4676 
4677     /// The set of pending host and background tasks, if any.
4678     ///
4679     /// See `ComponentInstance::poll_until` for where we temporarily take this
4680     /// out, poll it, then put it back to avoid any mutable aliasing hazards.
4681     futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4682     /// The table of waitables, waitable sets, etc.
4683     table: AlwaysMut<ResourceTable>,
4684     /// The "high priority" work queue for this store's event loop.
4685     high_priority: Vec<WorkItem>,
4686     /// The "low priority" work queue for this store's event loop.
4687     low_priority: Vec<WorkItem>,
4688     /// A place to stash the reason a fiber is suspending so that the code which
4689     /// resumed it will know under what conditions the fiber should be resumed
4690     /// again.
4691     suspend_reason: Option<SuspendReason>,
4692     /// A cached fiber which is waiting for work to do.
4693     ///
4694     /// This helps us avoid creating a new fiber for each `GuestCall` work item.
4695     worker: Option<StoreFiber<'static>>,
4696     /// A place to stash the work item for which we're resuming a worker fiber.
4697     worker_item: Option<WorkerItem>,
4698 
4699     /// Reference counts for all component error contexts
4700     ///
4701     /// NOTE: it is possible the global ref count to be *greater* than the sum of
4702     /// (sub)component ref counts as tracked by `error_context_tables`, for
4703     /// example when the host holds one or more references to error contexts.
4704     ///
4705     /// The key of this primary map is often referred to as the "rep" (i.e. host-side
4706     /// component-wide representation) of the index into concurrent state for a given
4707     /// stored `ErrorContext`.
4708     ///
4709     /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4710     /// as a `TableId<ErrorContextState>`.
4711     global_error_context_ref_counts:
4712         BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4713 }
4714 
4715 impl Default for ConcurrentState {
4716     fn default() -> Self {
4717         Self {
4718             guest_thread: None,
4719             table: AlwaysMut::new(ResourceTable::new()),
4720             futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4721             high_priority: Vec::new(),
4722             low_priority: Vec::new(),
4723             suspend_reason: None,
4724             worker: None,
4725             worker_item: None,
4726             global_error_context_ref_counts: BTreeMap::new(),
4727         }
4728     }
4729 }
4730 
4731 impl ConcurrentState {
4732     /// Take ownership of any fibers and futures owned by this object.
4733     ///
4734     /// This should be used when disposing of the `Store` containing this object
4735     /// in order to gracefully resolve any and all fibers using
4736     /// `StoreFiber::dispose`.  This is necessary to avoid possible
4737     /// use-after-free bugs due to fibers which may still have access to the
4738     /// `Store`.
4739     ///
4740     /// Additionally, the futures collected with this function should be dropped
4741     /// within a `tls::set` call, which will ensure than any futures closing
4742     /// over an `&Accessor` will have access to the store when dropped, allowing
4743     /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4744     /// panicking.
4745     ///
4746     /// Note that this will leave the object in an inconsistent and unusable
4747     /// state, so it should only be used just prior to dropping it.
4748     pub(crate) fn take_fibers_and_futures(
4749         &mut self,
4750         fibers: &mut Vec<StoreFiber<'static>>,
4751         futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4752     ) {
4753         for entry in self.table.get_mut().iter_mut() {
4754             if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4755                 for mode in mem::take(&mut set.waiting).into_values() {
4756                     if let WaitMode::Fiber(fiber) = mode {
4757                         fibers.push(fiber);
4758                     }
4759                 }
4760             } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4761                 if let GuestThreadState::Suspended(fiber) =
4762                     mem::replace(&mut thread.state, GuestThreadState::Completed)
4763                 {
4764                     fibers.push(fiber);
4765                 }
4766             }
4767         }
4768 
4769         if let Some(fiber) = self.worker.take() {
4770             fibers.push(fiber);
4771         }
4772 
4773         let mut take_items = |list| {
4774             for item in mem::take(list) {
4775                 match item {
4776                     WorkItem::ResumeFiber(fiber) => {
4777                         fibers.push(fiber);
4778                     }
4779                     WorkItem::PushFuture(future) => {
4780                         self.futures
4781                             .get_mut()
4782                             .as_mut()
4783                             .unwrap()
4784                             .push(future.into_inner());
4785                     }
4786                     _ => {}
4787                 }
4788             }
4789         };
4790 
4791         take_items(&mut self.high_priority);
4792         take_items(&mut self.low_priority);
4793 
4794         if let Some(them) = self.futures.get_mut().take() {
4795             futures.push(them);
4796         }
4797     }
4798 
4799     fn push<V: Send + Sync + 'static>(
4800         &mut self,
4801         value: V,
4802     ) -> Result<TableId<V>, ResourceTableError> {
4803         self.table.get_mut().push(value).map(TableId::from)
4804     }
4805 
4806     fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4807         self.table.get_mut().get_mut(&Resource::from(id))
4808     }
4809 
4810     pub fn add_child<T: 'static, U: 'static>(
4811         &mut self,
4812         child: TableId<T>,
4813         parent: TableId<U>,
4814     ) -> Result<(), ResourceTableError> {
4815         self.table
4816             .get_mut()
4817             .add_child(Resource::from(child), Resource::from(parent))
4818     }
4819 
4820     pub fn remove_child<T: 'static, U: 'static>(
4821         &mut self,
4822         child: TableId<T>,
4823         parent: TableId<U>,
4824     ) -> Result<(), ResourceTableError> {
4825         self.table
4826             .get_mut()
4827             .remove_child(Resource::from(child), Resource::from(parent))
4828     }
4829 
4830     fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4831         self.table.get_mut().delete(Resource::from(id))
4832     }
4833 
4834     fn push_future(&mut self, future: HostTaskFuture) {
4835         // Note that we can't directly push to `ConcurrentState::futures` here
4836         // since this may be called from a future that's being polled inside
4837         // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
4838         // so it has exclusive access while polling it.  Therefore, we push a
4839         // work item to the "high priority" queue, which will actually push to
4840         // `ConcurrentState::futures` later.
4841         self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4842     }
4843 
4844     fn push_high_priority(&mut self, item: WorkItem) {
4845         log::trace!("push high priority: {item:?}");
4846         self.high_priority.push(item);
4847     }
4848 
4849     fn push_low_priority(&mut self, item: WorkItem) {
4850         log::trace!("push low priority: {item:?}");
4851         self.low_priority.push(item);
4852     }
4853 
4854     fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
4855         if high_priority {
4856             self.push_high_priority(item);
4857         } else {
4858             self.push_low_priority(item);
4859         }
4860     }
4861 
4862     /// Determine whether the instance associated with the specified guest task
4863     /// may be entered (i.e. is not already on the async call stack).
4864     ///
4865     /// This is an additional check on top of the "may_enter" instance flag;
4866     /// it's needed because async-lifted exports with callback functions must
4867     /// not call their own instances directly or indirectly, and due to the
4868     /// "stackless" nature of callback-enabled guest tasks this may happen even
4869     /// if there are no activation records on the stack (i.e. the "may_enter"
4870     /// field is `true`) for that instance.
4871     fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
4872         let guest_instance = self.get_mut(guest_task).unwrap().instance;
4873 
4874         // Walk the task tree back to the root, looking for potential
4875         // reentrance.
4876         //
4877         // TODO: This could be optimized by maintaining a per-`GuestTask` bitset
4878         // such that each bit represents and instance which has been entered by
4879         // that task or an ancestor of that task, in which case this would be a
4880         // constant time check.
4881         loop {
4882             let next_thread = match &self.get_mut(guest_task).unwrap().caller {
4883                 Caller::Host { .. } => break true,
4884                 Caller::Guest { thread, instance } => {
4885                     if *instance == guest_instance.index {
4886                         break false;
4887                     } else {
4888                         *thread
4889                     }
4890                 }
4891             };
4892             guest_task = next_thread.task;
4893         }
4894     }
4895 
4896     /// Implements the `context.get` intrinsic.
4897     pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
4898         let thread = self.guest_thread.unwrap();
4899         let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
4900         log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
4901         Ok(val)
4902     }
4903 
4904     /// Implements the `context.set` intrinsic.
4905     pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
4906         let thread = self.guest_thread.unwrap();
4907         log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
4908         self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
4909         Ok(())
4910     }
4911 
4912     /// Returns whether there's a pending cancellation on the current guest thread,
4913     /// consuming the event if so.
4914     fn take_pending_cancellation(&mut self) -> bool {
4915         let thread = self.guest_thread.unwrap();
4916         if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
4917             assert!(matches!(event, Event::Cancelled));
4918             true
4919         } else {
4920             false
4921         }
4922     }
4923 
4924     fn check_blocking(&mut self) -> Result<()> {
4925         let task = self.guest_thread.unwrap().task;
4926         self.check_blocking_for(task)
4927     }
4928 
4929     fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
4930         if self.may_block(task) {
4931             Ok(())
4932         } else {
4933             Err(Trap::CannotBlockSyncTask.into())
4934         }
4935     }
4936 
4937     fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
4938         let task = self.get_mut(task).unwrap();
4939         task.async_function || task.returned_or_cancelled()
4940     }
4941 }
4942 
4943 /// Provide a type hint to compiler about the shape of a parameter lower
4944 /// closure.
4945 fn for_any_lower<
4946     F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
4947 >(
4948     fun: F,
4949 ) -> F {
4950     fun
4951 }
4952 
4953 /// Provide a type hint to compiler about the shape of a result lift closure.
4954 fn for_any_lift<
4955     F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4956 >(
4957     fun: F,
4958 ) -> F {
4959     fun
4960 }
4961 
4962 /// Wrap the specified future in a `poll_fn` which asserts that the future is
4963 /// only polled from the event loop of the specified `Store`.
4964 ///
4965 /// See `StoreContextMut::run_concurrent` for details.
4966 fn checked<F: Future + Send + 'static>(
4967     id: StoreId,
4968     fut: F,
4969 ) -> impl Future<Output = F::Output> + Send + 'static {
4970     async move {
4971         let mut fut = pin!(fut);
4972         future::poll_fn(move |cx| {
4973             let message = "\
4974                 `Future`s which depend on asynchronous component tasks, streams, or \
4975                 futures to complete may only be polled from the event loop of the \
4976                 store to which they belong.  Please use \
4977                 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
4978             ";
4979             tls::try_get(|store| {
4980                 let matched = match store {
4981                     tls::TryGet::Some(store) => store.id() == id,
4982                     tls::TryGet::Taken | tls::TryGet::None => false,
4983                 };
4984 
4985                 if !matched {
4986                     panic!("{message}")
4987                 }
4988             });
4989             fut.as_mut().poll(cx)
4990         })
4991         .await
4992     }
4993 }
4994 
4995 /// Assert that `StoreContextMut::run_concurrent` has not been called from
4996 /// within an store's event loop.
4997 fn check_recursive_run() {
4998     tls::try_get(|store| {
4999         if !matches!(store, tls::TryGet::None) {
5000             panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5001         }
5002     });
5003 }
5004 
5005 fn unpack_callback_code(code: u32) -> (u32, u32) {
5006     (code & 0xF, code >> 4)
5007 }
5008 
5009 /// Helper struct for packaging parameters to be passed to
5010 /// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
5011 /// `waitable-set.poll`.
5012 struct WaitableCheckParams {
5013     set: TableId<WaitableSet>,
5014     options: OptionsIndex,
5015     payload: u32,
5016 }
5017 
5018 /// Indicates whether `ComponentInstance::waitable_check` is being called for
5019 /// `waitable-set.wait` or `waitable-set.poll`.
5020 enum WaitableCheck {
5021     Wait,
5022     Poll,
5023 }
5024 
5025 /// Represents a guest task called from the host, prepared using `prepare_call`.
5026 pub(crate) struct PreparedCall<R> {
5027     /// The guest export to be called
5028     handle: Func,
5029     /// The guest thread created by `prepare_call`
5030     thread: QualifiedThreadId,
5031     /// The number of lowered core Wasm parameters to pass to the call.
5032     param_count: usize,
5033     /// The `oneshot::Receiver` to which the result of the call will be
5034     /// delivered when it is available.
5035     rx: oneshot::Receiver<LiftedResult>,
5036     /// The `oneshot::Receiver` which will resolve when the task -- and any
5037     /// transitive subtasks -- have all exited.
5038     exit_rx: oneshot::Receiver<()>,
5039     _phantom: PhantomData<R>,
5040 }
5041 
5042 impl<R> PreparedCall<R> {
5043     /// Get a copy of the `TaskId` for this `PreparedCall`.
5044     pub(crate) fn task_id(&self) -> TaskId {
5045         TaskId {
5046             task: self.thread.task,
5047         }
5048     }
5049 }
5050 
5051 /// Represents a task created by `prepare_call`.
5052 pub(crate) struct TaskId {
5053     task: TableId<GuestTask>,
5054 }
5055 
5056 impl TaskId {
5057     /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5058     /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5059     /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5060     /// and can be resumed by other tasks for this component, so we mark the future as dropped
5061     /// and delete the task when all threads are done.
5062     pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5063         let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5064         if !task.already_lowered_parameters() {
5065             Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5066         } else {
5067             task.host_future_state = HostFutureState::Dropped;
5068             if task.ready_to_delete() {
5069                 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5070             }
5071         }
5072         Ok(())
5073     }
5074 }
5075 
5076 /// Prepare a call to the specified exported Wasm function, providing functions
5077 /// for lowering the parameters and lifting the result.
5078 ///
5079 /// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
5080 /// loop, use `queue_call`.
5081 pub(crate) fn prepare_call<T, R>(
5082     mut store: StoreContextMut<T>,
5083     handle: Func,
5084     param_count: usize,
5085     host_future_present: bool,
5086     call_post_return_automatically: bool,
5087     lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5088     + Send
5089     + Sync
5090     + 'static,
5091     lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5092     + Send
5093     + Sync
5094     + 'static,
5095 ) -> Result<PreparedCall<R>> {
5096     let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5097 
5098     let instance = handle.instance().id().get(store.0);
5099     let options = &instance.component().env_component().options[options];
5100     let ty = &instance.component().types()[ty];
5101     let async_function = ty.async_;
5102     let task_return_type = ty.results;
5103     let component_instance = raw_options.instance;
5104     let callback = options.callback.map(|i| instance.runtime_callback(i));
5105     let memory = options
5106         .memory()
5107         .map(|i| instance.runtime_memory(i))
5108         .map(SendSyncPtr::new);
5109     let string_encoding = options.string_encoding;
5110     let token = StoreToken::new(store.as_context_mut());
5111     let state = store.0.concurrent_state_mut();
5112 
5113     let (tx, rx) = oneshot::channel();
5114     let (exit_tx, exit_rx) = oneshot::channel();
5115 
5116     let mut task = GuestTask::new(
5117         state,
5118         Box::new(for_any_lower(move |store, params| {
5119             lower_params(handle, token.as_context_mut(store), params)
5120         })),
5121         LiftResult {
5122             lift: Box::new(for_any_lift(move |store, result| {
5123                 lift_result(handle, store, result)
5124             })),
5125             ty: task_return_type,
5126             memory,
5127             string_encoding,
5128         },
5129         Caller::Host {
5130             tx: Some(tx),
5131             exit_tx: Arc::new(exit_tx),
5132             host_future_present,
5133             call_post_return_automatically,
5134         },
5135         callback.map(|callback| {
5136             let callback = SendSyncPtr::new(callback);
5137             let instance = handle.instance();
5138             Box::new(
5139                 move |store: &mut dyn VMStore, runtime_instance, event, handle| {
5140                     let store = token.as_context_mut(store);
5141                     // SAFETY: Per the contract of `prepare_call`, the callback
5142                     // will remain valid at least as long is this task exists.
5143                     unsafe {
5144                         instance.call_callback(
5145                             store,
5146                             runtime_instance,
5147                             callback,
5148                             event,
5149                             handle,
5150                             call_post_return_automatically,
5151                         )
5152                     }
5153                 },
5154             ) as CallbackFn
5155         }),
5156         handle.instance(),
5157         component_instance,
5158         async_function,
5159     )?;
5160     task.function_index = Some(handle.index());
5161 
5162     let task = state.push(task)?;
5163     let thread = state.push(GuestThread::new_implicit(task))?;
5164     state.get_mut(task)?.threads.insert(thread);
5165 
5166     Ok(PreparedCall {
5167         handle,
5168         thread: QualifiedThreadId { task, thread },
5169         param_count,
5170         rx,
5171         exit_rx,
5172         _phantom: PhantomData,
5173     })
5174 }
5175 
5176 /// Queue a call previously prepared using `prepare_call` to be run as part of
5177 /// the associated `ComponentInstance`'s event loop.
5178 ///
5179 /// The returned future will resolve to the result once it is available, but
5180 /// must only be polled via the instance's event loop. See
5181 /// `StoreContextMut::run_concurrent` for details.
5182 pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5183     mut store: StoreContextMut<T>,
5184     prepared: PreparedCall<R>,
5185 ) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> {
5186     let PreparedCall {
5187         handle,
5188         thread,
5189         param_count,
5190         rx,
5191         exit_rx,
5192         ..
5193     } = prepared;
5194 
5195     queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5196 
5197     Ok(checked(
5198         store.0.id(),
5199         rx.map(move |result| {
5200             result
5201                 .map(|v| (*v.downcast().unwrap(), exit_rx))
5202                 .map_err(anyhow::Error::from)
5203         }),
5204     ))
5205 }
5206 
5207 /// Queue a call previously prepared using `prepare_call` to be run as part of
5208 /// the associated `ComponentInstance`'s event loop.
5209 fn queue_call0<T: 'static>(
5210     store: StoreContextMut<T>,
5211     handle: Func,
5212     guest_thread: QualifiedThreadId,
5213     param_count: usize,
5214 ) -> Result<()> {
5215     let (_options, flags, _ty, raw_options) = handle.abi_info(store.0);
5216     let is_concurrent = raw_options.async_;
5217     let callback = raw_options.callback;
5218     let instance = handle.instance();
5219     let callee = handle.lifted_core_func(store.0);
5220     let post_return = handle.post_return_core_func(store.0);
5221     let callback = callback.map(|i| {
5222         let instance = instance.id().get(store.0);
5223         SendSyncPtr::new(instance.runtime_callback(i))
5224     });
5225 
5226     log::trace!("queueing call {guest_thread:?}");
5227 
5228     let instance_flags = if callback.is_none() {
5229         None
5230     } else {
5231         Some(flags)
5232     };
5233 
5234     // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5235     // (with signatures appropriate for this call) and will remain valid as
5236     // long as this instance is valid.
5237     unsafe {
5238         instance.queue_call(
5239             store,
5240             guest_thread,
5241             SendSyncPtr::new(callee),
5242             param_count,
5243             1,
5244             instance_flags,
5245             is_concurrent,
5246             callback,
5247             post_return.map(SendSyncPtr::new),
5248         )
5249     }
5250 }
5251