1 //! Runtime support for the Component Model Async ABI.
2 //!
3 //! This module and its submodules provide host runtime support for Component
4 //! Model Async features such as async-lifted exports, async-lowered imports,
5 //! streams, futures, and related intrinsics.  See [the Async
6 //! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7 //! for a high-level overview.
8 //!
9 //! At the core of this support is an event loop which schedules and switches
10 //! between guest tasks and any host tasks they create.  Each
11 //! `Store` will have at most one event loop running at any given
12 //! time, and that loop may be suspended and resumed by the host embedder using
13 //! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14 //! function contains the loop itself, while the
15 //! `StoreOpaque::concurrent_state` field holds its state.
16 //!
17 //! # Public API Overview
18 //!
19 //! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20 //!
21 //! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
22 //! async-lifted or sync-lifted import, creating a guest task.
23 //!
24 //! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25 //! instance, allowing any and all tasks belonging to that instance to make
26 //! progress.
27 //!
28 //! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29 //! for the specified instance.
30 //!
31 //! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32 //! `stream` which may be passed to the guest.  This takes a
33 //! `{Future,Stream}Producer` implementation which will be polled for items when
34 //! the consumer requests them.
35 //!
36 //! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37 //! connecting it to a `{Future,Stream}Consumer` which will consume any items
38 //! produced by the write end.
39 //!
40 //! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41 //!
42 //! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43 //! function with the linker.  That function will take an `Accessor` as its
44 //! first parameter, which provides access to the store between (but not across)
45 //! await points.
46 //!
47 //! - `Accessor::with`: Access the store and its associated data.
48 //!
49 //! - `Accessor::spawn`: Run a background task as part of the event loop for the
50 //! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51 //! in host functions.
52 
53 use crate::component::func::{self, Func, call_post_return};
54 use crate::component::{
55     HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
56 };
57 use crate::fiber::{self, StoreFiber, StoreFiberYield};
58 use crate::prelude::*;
59 use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
60 use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
61 use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
62 use crate::{
63     AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
64     bail, error::format_err,
65 };
66 use error_contexts::GlobalErrorContextRefCount;
67 use futures::channel::oneshot;
68 use futures::future::{self, FutureExt};
69 use futures::stream::{FuturesUnordered, StreamExt};
70 use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71 use std::any::Any;
72 use std::borrow::ToOwned;
73 use std::boxed::Box;
74 use std::cell::UnsafeCell;
75 use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76 use std::fmt;
77 use std::future::Future;
78 use std::marker::PhantomData;
79 use std::mem::{self, ManuallyDrop, MaybeUninit};
80 use std::ops::DerefMut;
81 use std::pin::{Pin, pin};
82 use std::ptr::{self, NonNull};
83 use std::task::{Context, Poll, Waker};
84 use std::vec::Vec;
85 use table::{TableDebug, TableId};
86 use wasmtime_environ::Trap;
87 use wasmtime_environ::component::{
88     CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
89     MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
90     RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
91     TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
92     TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
93 };
94 use wasmtime_environ::packed_option::ReservedValue;
95 
96 pub use abort::JoinHandle;
97 pub use future_stream_any::{FutureAny, StreamAny};
98 pub use futures_and_streams::{
99     Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
100     FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
101     StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
102 };
103 pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
104 
105 mod abort;
106 mod error_contexts;
107 mod future_stream_any;
108 mod futures_and_streams;
109 pub(crate) mod table;
110 pub(crate) mod tls;
111 
112 /// Constant defined in the Component Model spec to indicate that the async
113 /// intrinsic (e.g. `future.write`) has not yet completed.
114 const BLOCKED: u32 = 0xffff_ffff;
115 
116 /// Corresponds to `CallState` in the upstream spec.
117 #[derive(Clone, Copy, Eq, PartialEq, Debug)]
118 pub enum Status {
119     Starting = 0,
120     Started = 1,
121     Returned = 2,
122     StartCancelled = 3,
123     ReturnCancelled = 4,
124 }
125 
126 impl Status {
127     /// Packs this status and the optional `waitable` provided into a 32-bit
128     /// result that the canonical ABI requires.
129     ///
130     /// The low 4 bits are reserved for the status while the upper 28 bits are
131     /// the waitable, if present.
132     pub fn pack(self, waitable: Option<u32>) -> u32 {
133         assert!(matches!(self, Status::Returned) == waitable.is_none());
134         let waitable = waitable.unwrap_or(0);
135         assert!(waitable < (1 << 28));
136         (waitable << 4) | (self as u32)
137     }
138 }
139 
140 /// Corresponds to `EventCode` in the Component Model spec, plus related payload
141 /// data.
142 #[derive(Clone, Copy, Debug)]
143 enum Event {
144     None,
145     Cancelled,
146     Subtask {
147         status: Status,
148     },
149     StreamRead {
150         code: ReturnCode,
151         pending: Option<(TypeStreamTableIndex, u32)>,
152     },
153     StreamWrite {
154         code: ReturnCode,
155         pending: Option<(TypeStreamTableIndex, u32)>,
156     },
157     FutureRead {
158         code: ReturnCode,
159         pending: Option<(TypeFutureTableIndex, u32)>,
160     },
161     FutureWrite {
162         code: ReturnCode,
163         pending: Option<(TypeFutureTableIndex, u32)>,
164     },
165 }
166 
167 impl Event {
168     /// Lower this event to core Wasm integers for delivery to the guest.
169     ///
170     /// Note that the waitable handle, if any, is assumed to be lowered
171     /// separately.
172     fn parts(self) -> (u32, u32) {
173         const EVENT_NONE: u32 = 0;
174         const EVENT_SUBTASK: u32 = 1;
175         const EVENT_STREAM_READ: u32 = 2;
176         const EVENT_STREAM_WRITE: u32 = 3;
177         const EVENT_FUTURE_READ: u32 = 4;
178         const EVENT_FUTURE_WRITE: u32 = 5;
179         const EVENT_CANCELLED: u32 = 6;
180         match self {
181             Event::None => (EVENT_NONE, 0),
182             Event::Cancelled => (EVENT_CANCELLED, 0),
183             Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
184             Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
185             Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
186             Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
187             Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
188         }
189     }
190 }
191 
192 /// Corresponds to `CallbackCode` in the spec.
193 mod callback_code {
194     pub const EXIT: u32 = 0;
195     pub const YIELD: u32 = 1;
196     pub const WAIT: u32 = 2;
197 }
198 
199 /// A flag indicating that the callee is an async-lowered export.
200 ///
201 /// This may be passed to the `async-start` intrinsic from a fused adapter.
202 const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
203 
204 /// Provides access to either store data (via the `get` method) or the store
205 /// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
206 /// instance to which the current host task belongs.
207 ///
208 /// See [`Accessor::with`] for details.
209 pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
210     store: StoreContextMut<'a, T>,
211     get_data: fn(&mut T) -> D::Data<'_>,
212 }
213 
214 impl<'a, T, D> Access<'a, T, D>
215 where
216     D: HasData + ?Sized,
217     T: 'static,
218 {
219     /// Creates a new [`Access`] from its component parts.
220     pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
221         Self { store, get_data }
222     }
223 
224     /// Get mutable access to the store data.
225     pub fn data_mut(&mut self) -> &mut T {
226         self.store.data_mut()
227     }
228 
229     /// Get mutable access to the store data.
230     pub fn get(&mut self) -> D::Data<'_> {
231         (self.get_data)(self.data_mut())
232     }
233 
234     /// Spawn a background task.
235     ///
236     /// See [`Accessor::spawn`] for details.
237     pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
238     where
239         T: 'static,
240     {
241         let accessor = Accessor {
242             get_data: self.get_data,
243             token: StoreToken::new(self.store.as_context_mut()),
244         };
245         self.store
246             .as_context_mut()
247             .spawn_with_accessor(accessor, task)
248     }
249 
250     /// Returns the getter this accessor is using to project from `T` into
251     /// `D::Data`.
252     pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
253         self.get_data
254     }
255 }
256 
257 impl<'a, T, D> AsContext for Access<'a, T, D>
258 where
259     D: HasData + ?Sized,
260     T: 'static,
261 {
262     type Data = T;
263 
264     fn as_context(&self) -> StoreContext<'_, T> {
265         self.store.as_context()
266     }
267 }
268 
269 impl<'a, T, D> AsContextMut for Access<'a, T, D>
270 where
271     D: HasData + ?Sized,
272     T: 'static,
273 {
274     fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
275         self.store.as_context_mut()
276     }
277 }
278 
279 /// Provides scoped mutable access to store data in the context of a concurrent
280 /// host task future.
281 ///
282 /// This allows multiple host task futures to execute concurrently and access
283 /// the store between (but not across) `await` points.
284 ///
285 /// # Rationale
286 ///
287 /// This structure is sort of like `&mut T` plus a projection from `&mut T` to
288 /// `D::Data<'_>`. The problem this is solving, however, is that it does not
289 /// literally store these values. The basic problem is that when a concurrent
290 /// host future is being polled it has access to `&mut T` (and the whole
291 /// `Store`) but when it's not being polled it does not have access to these
292 /// values. This reflects how the store is only ever polling one future at a
293 /// time so the store is effectively being passed between futures.
294 ///
295 /// Rust's `Future` trait, however, has no means of passing a `Store`
296 /// temporarily between futures. The [`Context`](std::task::Context) type does
297 /// not have the ability to attach arbitrary information to it at this time.
298 /// This type, [`Accessor`], is used to bridge this expressivity gap.
299 ///
300 /// The [`Accessor`] type here represents the ability to acquire, temporarily in
301 /// a synchronous manner, the current store. The [`Accessor::with`] function
302 /// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
303 /// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
304 /// not take an `async` closure as its argument, instead it's a synchronous
305 /// closure which must complete during on run of `Future::poll`. This reflects
306 /// how the store is temporarily made available while a host future is being
307 /// polled.
308 ///
309 /// # Implementation
310 ///
311 /// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
312 /// this type additionally doesn't even have a lifetime parameter. This is
313 /// instead a representation of proof of the ability to acquire these while a
314 /// future is being polled. Wasmtime will, when it polls a host future,
315 /// configure ambient state such that the `Accessor` that a future closes over
316 /// will work and be able to access the store.
317 ///
318 /// This has a number of implications for users such as:
319 ///
320 /// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
321 ///   the lifetime of a single future.
322 /// * A future is expected to, however, close over an `Accessor` and keep it
323 ///   alive probably for the duration of the entire future.
324 /// * Different host futures will be given different `Accessor`s, and that's
325 ///   intentional.
326 /// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
327 ///   alleviates some otherwise required bounds to be written down.
328 ///
329 /// # Using `Accessor` in `Drop`
330 ///
331 /// The methods on `Accessor` are only expected to work in the context of
332 /// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
333 /// host future can be dropped at any time throughout the system and Wasmtime
334 /// store context is not necessarily available at that time. It's recommended to
335 /// not use `Accessor` methods in anything connected to a `Drop` implementation
336 /// as they will panic and have unintended results. If you run into this though
337 /// feel free to file an issue on the Wasmtime repository.
338 pub struct Accessor<T: 'static, D = HasSelf<T>>
339 where
340     D: HasData + ?Sized,
341 {
342     token: StoreToken<T>,
343     get_data: fn(&mut T) -> D::Data<'_>,
344 }
345 
346 /// A helper trait to take any type of accessor-with-data in functions.
347 ///
348 /// This trait is similar to [`AsContextMut`] except that it's used when
349 /// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
350 /// [`Accessor`] is the main type used in concurrent settings and is passed to
351 /// functions such as [`Func::call_concurrent`].
352 ///
353 /// This trait is implemented for [`Accessor`] and `&T` where `T` implements
354 /// this trait. This effectively means that regardless of the `D` in
355 /// `Accessor<T, D>` it can still be passed to a function which just needs a
356 /// store accessor.
357 ///
358 /// Acquiring an [`Accessor`] can be done through
359 /// [`StoreContextMut::run_concurrent`] for example or in a host function
360 /// through
361 /// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
362 pub trait AsAccessor {
363     /// The `T` in `Store<T>` that this accessor refers to.
364     type Data: 'static;
365 
366     /// The `D` in `Accessor<T, D>`, or the projection out of
367     /// `Self::Data`.
368     type AccessorData: HasData + ?Sized;
369 
370     /// Returns the accessor that this is referring to.
371     fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
372 }
373 
374 impl<T: AsAccessor + ?Sized> AsAccessor for &T {
375     type Data = T::Data;
376     type AccessorData = T::AccessorData;
377 
378     fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
379         T::as_accessor(self)
380     }
381 }
382 
383 impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
384     type Data = T;
385     type AccessorData = D;
386 
387     fn as_accessor(&self) -> &Accessor<T, D> {
388         self
389     }
390 }
391 
392 // Note that it is intentional at this time that `Accessor` does not actually
393 // store `&mut T` or anything similar. This distinctly enables the `Accessor`
394 // structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
395 // that matter). This is used to ergonomically simplify bindings where the
396 // majority of the time `Accessor` is closed over in a future which then needs
397 // to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
398 // you already have to write `T: 'static`...) it helps to avoid this.
399 //
400 // Note as well that `Accessor` doesn't actually store its data at all. Instead
401 // it's more of a "proof" of what can be accessed from TLS. API design around
402 // `Accessor` and functions like `Linker::func_wrap_concurrent` are
403 // intentionally made to ensure that `Accessor` is ideally only used in the
404 // context that TLS variables are actually set. For example host functions are
405 // given `&Accessor`, not `Accessor`, and this prevents them from persisting
406 // the value outside of a future. Within the future the TLS variables are all
407 // guaranteed to be set while the future is being polled.
408 //
409 // Finally though this is not an ironclad guarantee, but nor does it need to be.
410 // The TLS APIs are designed to panic or otherwise model usage where they're
411 // called recursively or similar. It's hoped that code cannot be constructed to
412 // actually hit this at runtime but this is not a safety requirement at this
413 // time.
414 const _: () = {
415     const fn assert<T: Send + Sync>() {}
416     assert::<Accessor<UnsafeCell<u32>>>();
417 };
418 
419 impl<T> Accessor<T> {
420     /// Creates a new `Accessor` backed by the specified functions.
421     ///
422     /// - `get`: used to retrieve the store
423     ///
424     /// - `get_data`: used to "project" from the store's associated data to
425     /// another type (e.g. a field of that data or a wrapper around it).
426     ///
427     /// - `spawn`: used to queue spawned background tasks to be run later
428     pub(crate) fn new(token: StoreToken<T>) -> Self {
429         Self {
430             token,
431             get_data: |x| x,
432         }
433     }
434 }
435 
436 impl<T, D> Accessor<T, D>
437 where
438     D: HasData + ?Sized,
439 {
440     /// Run the specified closure, passing it mutable access to the store.
441     ///
442     /// This function is one of the main building blocks of the [`Accessor`]
443     /// type. This yields synchronous, blocking, access to the store via an
444     /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
445     /// providing the ability to access `D` via [`Access::get`]. Note that the
446     /// `fun` here is given only temporary access to the store and `T`/`D`
447     /// meaning that the return value `R` here is not allowed to capture borrows
448     /// into the two. If access is needed to data within `T` or `D` outside of
449     /// this closure then it must be `clone`d out, for example.
450     ///
451     /// # Panics
452     ///
453     /// This function will panic if it is call recursively with any other
454     /// accessor already in scope. For example if `with` is called within `fun`,
455     /// then this function will panic. It is up to the embedder to ensure that
456     /// this does not happen.
457     pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
458         tls::get(|vmstore| {
459             fun(Access {
460                 store: self.token.as_context_mut(vmstore),
461                 get_data: self.get_data,
462             })
463         })
464     }
465 
466     /// Returns the getter this accessor is using to project from `T` into
467     /// `D::Data`.
468     pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
469         self.get_data
470     }
471 
472     /// Changes this accessor to access `D2` instead of the current type
473     /// parameter `D`.
474     ///
475     /// This changes the underlying data access from `T` to `D2::Data<'_>`.
476     ///
477     /// # Panics
478     ///
479     /// When using this API the returned value is disconnected from `&self` and
480     /// the lifetime binding the `self` argument. An `Accessor` only works
481     /// within the context of the closure or async closure that it was
482     /// originally given to, however. This means that due to the fact that the
483     /// returned value has no lifetime connection it's possible to use the
484     /// accessor outside of `&self`, the original accessor, and panic.
485     ///
486     /// The returned value should only be used within the scope of the original
487     /// `Accessor` that `self` refers to.
488     pub fn with_getter<D2: HasData>(
489         &self,
490         get_data: fn(&mut T) -> D2::Data<'_>,
491     ) -> Accessor<T, D2> {
492         Accessor {
493             token: self.token,
494             get_data,
495         }
496     }
497 
498     /// Spawn a background task which will receive an `&Accessor<T, D>` and
499     /// run concurrently with any other tasks in progress for the current
500     /// store.
501     ///
502     /// This is particularly useful for host functions which return a `stream`
503     /// or `future` such that the code to write to the write end of that
504     /// `stream` or `future` must run after the function returns.
505     ///
506     /// The returned [`JoinHandle`] may be used to cancel the task.
507     ///
508     /// # Panics
509     ///
510     /// Panics if called within a closure provided to the [`Accessor::with`]
511     /// function. This can only be called outside an active invocation of
512     /// [`Accessor::with`].
513     pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
514     where
515         T: 'static,
516     {
517         let accessor = self.clone_for_spawn();
518         self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
519     }
520 
521     fn clone_for_spawn(&self) -> Self {
522         Self {
523             token: self.token,
524             get_data: self.get_data,
525         }
526     }
527 }
528 
529 /// Represents a task which may be provided to `Accessor::spawn`,
530 /// `Accessor::forward`, or `StorecContextMut::spawn`.
531 // TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
532 // option.
533 //
534 // As of this writing, it's not possible to specify e.g. `Send` and `Sync`
535 // bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
536 // Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
537 // Send + Sync + 'static` fails with a type mismatch error when we try to pass
538 // it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
539 // best we can do for the time being.
540 pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
541 where
542     D: HasData + ?Sized,
543 {
544     /// Run the task.
545     fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
546 }
547 
548 /// Represents parameter and result metadata for the caller side of a
549 /// guest->guest call orchestrated by a fused adapter.
550 enum CallerInfo {
551     /// Metadata for a call to an async-lowered import
552     Async {
553         params: Vec<ValRaw>,
554         has_result: bool,
555     },
556     /// Metadata for a call to an sync-lowered import
557     Sync {
558         params: Vec<ValRaw>,
559         result_count: u32,
560     },
561 }
562 
563 /// Indicates how a guest task is waiting on a waitable set.
564 enum WaitMode {
565     /// The guest task is waiting using `task.wait`
566     Fiber(StoreFiber<'static>),
567     /// The guest task is waiting via a callback declared as part of an
568     /// async-lifted export.
569     Callback(Instance),
570 }
571 
572 /// Represents the reason a fiber is suspending itself.
573 #[derive(Debug)]
574 enum SuspendReason {
575     /// The fiber is waiting for an event to be delivered to the specified
576     /// waitable set or task.
577     Waiting {
578         set: TableId<WaitableSet>,
579         thread: QualifiedThreadId,
580         skip_may_block_check: bool,
581     },
582     /// The fiber has finished handling its most recent work item and is waiting
583     /// for another (or to be dropped if it is no longer needed).
584     NeedWork,
585     /// The fiber is yielding and should be resumed once other tasks have had a
586     /// chance to run.
587     Yielding {
588         thread: QualifiedThreadId,
589         skip_may_block_check: bool,
590     },
591     /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
592     ExplicitlySuspending {
593         thread: QualifiedThreadId,
594         skip_may_block_check: bool,
595     },
596 }
597 
598 /// Represents a pending call into guest code for a given guest task.
599 enum GuestCallKind {
600     /// Indicates there's an event to deliver to the task, possibly related to a
601     /// waitable set the task has been waiting on or polling.
602     DeliverEvent {
603         /// The instance to which the task belongs.
604         instance: Instance,
605         /// The waitable set the event belongs to, if any.
606         ///
607         /// If this is `None` the event will be waiting in the
608         /// `GuestTask::event` field for the task.
609         set: Option<TableId<WaitableSet>>,
610     },
611     /// Indicates that a new guest task call is pending and may be executed
612     /// using the specified closure.
613     ///
614     /// If the closure returns `Ok(Some(call))`, the `call` should be run
615     /// immediately using `handle_guest_call`.
616     StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
617     StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
618 }
619 
620 impl fmt::Debug for GuestCallKind {
621     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
622         match self {
623             Self::DeliverEvent { instance, set } => f
624                 .debug_struct("DeliverEvent")
625                 .field("instance", instance)
626                 .field("set", set)
627                 .finish(),
628             Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
629             Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
630         }
631     }
632 }
633 
634 /// The target of a suspension intrinsic.
635 #[derive(Copy, Clone, Debug)]
636 pub enum SuspensionTarget {
637     SomeSuspended(u32),
638     Some(u32),
639     None,
640 }
641 
642 impl SuspensionTarget {
643     fn is_none(&self) -> bool {
644         matches!(self, SuspensionTarget::None)
645     }
646     fn is_some(&self) -> bool {
647         !self.is_none()
648     }
649 }
650 
651 /// Represents a pending call into guest code for a given guest thread.
652 #[derive(Debug)]
653 struct GuestCall {
654     thread: QualifiedThreadId,
655     kind: GuestCallKind,
656 }
657 
658 impl GuestCall {
659     /// Returns whether or not the call is ready to run.
660     ///
661     /// A call will not be ready to run if either:
662     ///
663     /// - the (sub-)component instance to be called has already been entered and
664     /// cannot be reentered until an in-progress call completes
665     ///
666     /// - the call is for a not-yet started task and the (sub-)component
667     /// instance to be called has backpressure enabled
668     fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
669         let instance = store
670             .concurrent_state_mut()
671             .get_mut(self.thread.task)?
672             .instance;
673         let state = store.instance_state(instance).concurrent_state();
674 
675         let ready = match &self.kind {
676             GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
677             GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
678             GuestCallKind::StartExplicit(_) => true,
679         };
680         log::trace!(
681             "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
682             state.do_not_enter,
683             state.backpressure
684         );
685         Ok(ready)
686     }
687 }
688 
689 /// Job to be run on a worker fiber.
690 enum WorkerItem {
691     GuestCall(GuestCall),
692     Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
693 }
694 
695 /// Represents a pending work item to be handled by the event loop for a given
696 /// component instance.
697 enum WorkItem {
698     /// A host task to be pushed to `ConcurrentState::futures`.
699     PushFuture(AlwaysMut<HostTaskFuture>),
700     /// A fiber to resume.
701     ResumeFiber(StoreFiber<'static>),
702     /// A thread to resume.
703     ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
704     /// A pending call into guest code for a given guest task.
705     GuestCall(RuntimeComponentInstanceIndex, GuestCall),
706     /// A job to run on a worker fiber.
707     WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
708 }
709 
710 impl fmt::Debug for WorkItem {
711     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
712         match self {
713             Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
714             Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
715             Self::ResumeThread(instance, thread) => f
716                 .debug_tuple("ResumeThread")
717                 .field(instance)
718                 .field(thread)
719                 .finish(),
720             Self::GuestCall(instance, call) => f
721                 .debug_tuple("GuestCall")
722                 .field(instance)
723                 .field(call)
724                 .finish(),
725             Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
726         }
727     }
728 }
729 
730 /// Whether a suspension intrinsic was cancelled or completed
731 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
732 pub(crate) enum WaitResult {
733     Cancelled,
734     Completed,
735 }
736 
737 /// Poll the specified future until it completes on behalf of a guest->host call
738 /// using a sync-lowered import.
739 ///
740 /// This is similar to `Instance::first_poll` except it's for sync-lowered
741 /// imports, meaning we don't need to handle cancellation and we can block the
742 /// caller until the task completes, at which point the caller can handle
743 /// lowering the result to the guest's stack and linear memory.
744 pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
745     store: &mut dyn VMStore,
746     future: impl Future<Output = Result<R>> + Send + 'static,
747 ) -> Result<R> {
748     let state = store.concurrent_state_mut();
749     let task = state.unwrap_current_host_thread();
750 
751     // Wrap the future in a closure which will take care of stashing the result
752     // in `GuestTask::result` and resuming this fiber when the host task
753     // completes.
754     let mut future = Box::pin(async move {
755         let result = future.await?;
756         tls::get(move |store| {
757             let state = store.concurrent_state_mut();
758             let host_state = &mut state.get_mut(task)?.state;
759             assert!(matches!(host_state, HostTaskState::CalleeStarted));
760             *host_state = HostTaskState::CalleeFinished(Box::new(result));
761 
762             Waitable::Host(task).set_event(
763                 state,
764                 Some(Event::Subtask {
765                     status: Status::Returned,
766                 }),
767             )?;
768 
769             Ok(())
770         })
771     }) as HostTaskFuture;
772 
773     // Finally, poll the future.  We can use a dummy `Waker` here because we'll
774     // add the future to `ConcurrentState::futures` and poll it automatically
775     // from the event loop if it doesn't complete immediately here.
776     let poll = tls::set(store, || {
777         future
778             .as_mut()
779             .poll(&mut Context::from_waker(&Waker::noop()))
780     });
781 
782     match poll {
783         // It completed immediately; check the result and delete the task.
784         Poll::Ready(result) => result?,
785 
786         // It did not complete immediately; add it to
787         // `ConcurrentState::futures` so it will be polled via the event loop;
788         // then use `GuestTask::sync_call_set` to wait for the task to
789         // complete, suspending the current fiber until it does so.
790         Poll::Pending => {
791             let state = store.concurrent_state_mut();
792             state.push_future(future);
793 
794             let caller = state.get_mut(task)?.caller;
795             let set = state.get_mut(caller.task)?.sync_call_set;
796             Waitable::Host(task).join(state, Some(set))?;
797 
798             store.suspend(SuspendReason::Waiting {
799                 set,
800                 thread: caller,
801                 skip_may_block_check: false,
802             })?;
803 
804             // Remove the `task` from the `sync_call_set` to ensure that when
805             // this function returns and the task is deleted that there are no
806             // more lingering references to this host task.
807             Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
808         }
809     }
810 
811     // Retrieve and return the result.
812     let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
813     match mem::replace(host_state, HostTaskState::CalleeDone) {
814         HostTaskState::CalleeFinished(result) => Ok(*result.downcast().unwrap()),
815         _ => panic!("unexpected host task state after completion"),
816     }
817 }
818 
819 /// Execute the specified guest call.
820 fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
821     let mut next = Some(call);
822     while let Some(call) = next.take() {
823         match call.kind {
824             GuestCallKind::DeliverEvent { instance, set } => {
825                 let (event, waitable) = instance
826                     .get_event(store, call.thread.task, set, true)?
827                     .unwrap();
828                 let state = store.concurrent_state_mut();
829                 let task = state.get_mut(call.thread.task)?;
830                 let runtime_instance = task.instance;
831                 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
832 
833                 log::trace!(
834                     "use callback to deliver event {event:?} to {:?} for {waitable:?}",
835                     call.thread,
836                 );
837 
838                 let old_thread = store.set_thread(call.thread);
839                 log::trace!(
840                     "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
841                     call.thread
842                 );
843 
844                 store.enter_instance(runtime_instance);
845 
846                 let callback = store
847                     .concurrent_state_mut()
848                     .get_mut(call.thread.task)?
849                     .callback
850                     .take()
851                     .unwrap();
852 
853                 let code = callback(store, event, handle)?;
854 
855                 store
856                     .concurrent_state_mut()
857                     .get_mut(call.thread.task)?
858                     .callback = Some(callback);
859 
860                 store.exit_instance(runtime_instance)?;
861 
862                 store.set_thread(old_thread);
863 
864                 next = instance.handle_callback_code(
865                     store,
866                     call.thread,
867                     runtime_instance.index,
868                     code,
869                 )?;
870 
871                 log::trace!(
872                     "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
873                 );
874             }
875             GuestCallKind::StartImplicit(fun) => {
876                 next = fun(store)?;
877             }
878             GuestCallKind::StartExplicit(fun) => {
879                 fun(store)?;
880             }
881         }
882     }
883 
884     Ok(())
885 }
886 
887 impl<T> Store<T> {
888     /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
889     pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
890     where
891         T: Send + 'static,
892     {
893         ensure!(
894             self.as_context().0.concurrency_support(),
895             "cannot use `run_concurrent` when Config::concurrency_support disabled",
896         );
897         self.as_context_mut().run_concurrent(fun).await
898     }
899 
900     #[doc(hidden)]
901     pub fn assert_concurrent_state_empty(&mut self) {
902         self.as_context_mut().assert_concurrent_state_empty();
903     }
904 
905     #[doc(hidden)]
906     pub fn concurrent_state_table_size(&mut self) -> usize {
907         self.as_context_mut().concurrent_state_table_size()
908     }
909 
910     /// Convenience wrapper for [`StoreContextMut::spawn`].
911     pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
912     where
913         T: 'static,
914     {
915         self.as_context_mut().spawn(task)
916     }
917 }
918 
919 impl<T> StoreContextMut<'_, T> {
920     /// Assert that all the relevant tables and queues in the concurrent state
921     /// for this store are empty.
922     ///
923     /// This is for sanity checking in integration tests
924     /// (e.g. `component-async-tests`) that the relevant state has been cleared
925     /// after each test concludes.  This should help us catch leaks, e.g. guest
926     /// tasks which haven't been deleted despite having completed and having
927     /// been dropped by their supertasks.
928     ///
929     /// Only intended for use in Wasmtime's own testing.
930     #[doc(hidden)]
931     pub fn assert_concurrent_state_empty(self) {
932         let store = self.0;
933         store
934             .store_data_mut()
935             .components
936             .assert_instance_states_empty();
937         let state = store.concurrent_state_mut();
938         assert!(
939             state.table.get_mut().is_empty(),
940             "non-empty table: {:?}",
941             state.table.get_mut()
942         );
943         assert!(state.high_priority.is_empty());
944         assert!(state.low_priority.is_empty());
945         assert!(state.current_thread.is_none());
946         assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
947         assert!(state.global_error_context_ref_counts.is_empty());
948     }
949 
950     /// Helper function to perform tests over the size of the concurrent state
951     /// table which can be useful for detecting leaks.
952     ///
953     /// Only intended for use in Wasmtime's own testing.
954     #[doc(hidden)]
955     pub fn concurrent_state_table_size(&mut self) -> usize {
956         self.0
957             .concurrent_state_mut()
958             .table
959             .get_mut()
960             .iter_mut()
961             .count()
962     }
963 
964     /// Spawn a background task to run as part of this instance's event loop.
965     ///
966     /// The task will receive an `&Accessor<U>` and run concurrently with
967     /// any other tasks in progress for the instance.
968     ///
969     /// Note that the task will only make progress if and when the event loop
970     /// for this instance is run.
971     ///
972     /// The returned [`JoinHandle`] may be used to cancel the task.
973     pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
974     where
975         T: 'static,
976     {
977         let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
978         self.spawn_with_accessor(accessor, task)
979     }
980 
981     /// Internal implementation of `spawn` functions where a `store` is
982     /// available along with an `Accessor`.
983     fn spawn_with_accessor<D>(
984         self,
985         accessor: Accessor<T, D>,
986         task: impl AccessorTask<T, D>,
987     ) -> JoinHandle
988     where
989         T: 'static,
990         D: HasData + ?Sized,
991     {
992         // Create an "abortable future" here where internally the future will
993         // hook calls to poll and possibly spawn more background tasks on each
994         // iteration.
995         let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
996         self.0
997             .concurrent_state_mut()
998             .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
999         handle
1000     }
1001 
1002     /// Run the specified closure `fun` to completion as part of this store's
1003     /// event loop.
1004     ///
1005     /// This will run `fun` as part of this store's event loop until it
1006     /// yields a result.  `fun` is provided an [`Accessor`], which provides
1007     /// controlled access to the store and its data.
1008     ///
1009     /// This function can be used to invoke [`Func::call_concurrent`] for
1010     /// example within the async closure provided here.
1011     ///
1012     /// This function will unconditionally return an error if
1013     /// [`Config::concurrency_support`] is disabled.
1014     ///
1015     /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1016     ///
1017     /// # Store-blocking behavior
1018     ///
1019     /// At this time there are certain situations in which the `Future` returned
1020     /// by the `AsyncFnOnce` passed to this function will not be polled for an
1021     /// extended period of time, despite one or more `Waker::wake` events having
1022     /// occurred for the task to which it belongs.  This can manifest as the
1023     /// `Future` seeming to be "blocked" or "locked up", but is actually due to
1024     /// the `Store` being held by e.g. a blocking host function, preventing the
1025     /// `Future` from being polled. A canonical example of this is when the
1026     /// `fun` provided to this function attempts to set a timeout for an
1027     /// invocation of a wasm function. In this situation the async closure is
1028     /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1029     /// to elapse. At this time this setup will not always work and the timeout
1030     /// may not reliably fire.
1031     ///
1032     /// This function will not block the current thread and as such is always
1033     /// suitable to run in an `async` context, but the current implementation of
1034     /// Wasmtime can lead to situations where a certain wasm computation is
1035     /// required to make progress the closure to make progress. This is an
1036     /// artifact of Wasmtime's historical implementation of `async` functions
1037     /// and is the topic of [#11869] and [#11870]. In the timeout example from
1038     /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1039     /// progress for a readiness notification of (b) to get delivered.
1040     ///
1041     /// This effectively means that it's not possible to reliably perform a
1042     /// "select" operation within the `fun` closure, which timeouts for example
1043     /// are based on. Fixing this requires some relatively major refactoring
1044     /// work within Wasmtime itself. This is a known pitfall otherwise and one
1045     /// that is intended to be fixed one day. In the meantime it's recommended
1046     /// to apply timeouts or such to the entire `run_concurrent` call itself
1047     /// rather than internally.
1048     ///
1049     /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1050     /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1051     ///
1052     /// # Example
1053     ///
1054     /// ```
1055     /// # use {
1056     /// #   wasmtime::{
1057     /// #     error::{Result},
1058     /// #     component::{ Component, Linker, Resource, ResourceTable},
1059     /// #     Config, Engine, Store
1060     /// #   },
1061     /// # };
1062     /// #
1063     /// # struct MyResource(u32);
1064     /// # struct Ctx { table: ResourceTable }
1065     /// #
1066     /// # async fn foo() -> Result<()> {
1067     /// # let mut config = Config::new();
1068     /// # let engine = Engine::new(&config)?;
1069     /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1070     /// # let mut linker = Linker::new(&engine);
1071     /// # let component = Component::new(&engine, "")?;
1072     /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1073     /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1074     /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1075     /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1076     ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1077     ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
1078     ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1079     ///    bar.call_concurrent(accessor, (value.0,)).await?;
1080     ///    Ok(())
1081     /// }).await??;
1082     /// # Ok(())
1083     /// # }
1084     /// ```
1085     pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1086     where
1087         T: Send + 'static,
1088     {
1089         ensure!(
1090             self.0.concurrency_support(),
1091             "cannot use `run_concurrent` when Config::concurrency_support disabled",
1092         );
1093         self.do_run_concurrent(fun, false).await
1094     }
1095 
1096     pub(super) async fn run_concurrent_trap_on_idle<R>(
1097         self,
1098         fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1099     ) -> Result<R>
1100     where
1101         T: Send + 'static,
1102     {
1103         self.do_run_concurrent(fun, true).await
1104     }
1105 
1106     async fn do_run_concurrent<R>(
1107         mut self,
1108         fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1109         trap_on_idle: bool,
1110     ) -> Result<R>
1111     where
1112         T: Send + 'static,
1113     {
1114         debug_assert!(self.0.concurrency_support());
1115         check_recursive_run();
1116         let token = StoreToken::new(self.as_context_mut());
1117 
1118         struct Dropper<'a, T: 'static, V> {
1119             store: StoreContextMut<'a, T>,
1120             value: ManuallyDrop<V>,
1121         }
1122 
1123         impl<'a, T, V> Drop for Dropper<'a, T, V> {
1124             fn drop(&mut self) {
1125                 tls::set(self.store.0, || {
1126                     // SAFETY: Here we drop the value without moving it for the
1127                     // first and only time -- per the contract for `Drop::drop`,
1128                     // this code won't run again, and the `value` field will no
1129                     // longer be accessible.
1130                     unsafe { ManuallyDrop::drop(&mut self.value) }
1131                 });
1132             }
1133         }
1134 
1135         let accessor = &Accessor::new(token);
1136         let dropper = &mut Dropper {
1137             store: self,
1138             value: ManuallyDrop::new(fun(accessor)),
1139         };
1140         // SAFETY: We never move `dropper` nor its `value` field.
1141         let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1142 
1143         dropper
1144             .store
1145             .as_context_mut()
1146             .poll_until(future, trap_on_idle)
1147             .await
1148     }
1149 
1150     /// Run this store's event loop.
1151     ///
1152     /// The returned future will resolve when the specified future completes or,
1153     /// if `trap_on_idle` is true, when the event loop can't make further
1154     /// progress.
1155     async fn poll_until<R>(
1156         mut self,
1157         mut future: Pin<&mut impl Future<Output = R>>,
1158         trap_on_idle: bool,
1159     ) -> Result<R>
1160     where
1161         T: Send + 'static,
1162     {
1163         struct Reset<'a, T: 'static> {
1164             store: StoreContextMut<'a, T>,
1165             futures: Option<FuturesUnordered<HostTaskFuture>>,
1166         }
1167 
1168         impl<'a, T> Drop for Reset<'a, T> {
1169             fn drop(&mut self) {
1170                 if let Some(futures) = self.futures.take() {
1171                     *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1172                 }
1173             }
1174         }
1175 
1176         loop {
1177             // Take `ConcurrentState::futures` out of the store so we can poll
1178             // it while also safely giving any of the futures inside access to
1179             // `self`.
1180             let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1181             let mut reset = Reset {
1182                 store: self.as_context_mut(),
1183                 futures,
1184             };
1185             let mut next = pin!(reset.futures.as_mut().unwrap().next());
1186 
1187             enum PollResult<R> {
1188                 Complete(R),
1189                 ProcessWork(Vec<WorkItem>),
1190             }
1191             let result = future::poll_fn(|cx| {
1192                 // First, poll the future we were passed as an argument and
1193                 // return immediately if it's ready.
1194                 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1195                     return Poll::Ready(Ok(PollResult::Complete(value)));
1196                 }
1197 
1198                 // Next, poll `ConcurrentState::futures` (which includes any
1199                 // pending host tasks and/or background tasks), returning
1200                 // immediately if one of them fails.
1201                 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1202                     Poll::Ready(Some(output)) => {
1203                         match output {
1204                             Err(e) => return Poll::Ready(Err(e)),
1205                             Ok(()) => {}
1206                         }
1207                         Poll::Ready(true)
1208                     }
1209                     Poll::Ready(None) => Poll::Ready(false),
1210                     Poll::Pending => Poll::Pending,
1211                 };
1212 
1213                 // Next, collect the next batch of work items to process, if any.
1214                 // This will be either all of the high-priority work items, or if
1215                 // there are none, a single low-priority work item.
1216                 let state = reset.store.0.concurrent_state_mut();
1217                 let ready = state.collect_work_items_to_run();
1218                 if !ready.is_empty() {
1219                     return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
1220                 }
1221 
1222                 // Finally, if we have nothing else to do right now, determine what to do
1223                 // based on whether there are any pending futures in
1224                 // `ConcurrentState::futures`.
1225                 return match next {
1226                     Poll::Ready(true) => {
1227                         // In this case, one of the futures in
1228                         // `ConcurrentState::futures` completed
1229                         // successfully, so we return now and continue
1230                         // the outer loop in case there is another one
1231                         // ready to complete.
1232                         Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
1233                     }
1234                     Poll::Ready(false) => {
1235                         // Poll the future we were passed one last time
1236                         // in case one of `ConcurrentState::futures` had
1237                         // the side effect of unblocking it.
1238                         if let Poll::Ready(value) =
1239                             tls::set(reset.store.0, || future.as_mut().poll(cx))
1240                         {
1241                             Poll::Ready(Ok(PollResult::Complete(value)))
1242                         } else {
1243                             // In this case, there are no more pending
1244                             // futures in `ConcurrentState::futures`,
1245                             // there are no remaining work items, _and_
1246                             // the future we were passed as an argument
1247                             // still hasn't completed.
1248                             if trap_on_idle {
1249                                 // `trap_on_idle` is true, so we exit
1250                                 // immediately.
1251                                 Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock)))
1252                             } else {
1253                                 // `trap_on_idle` is false, so we assume
1254                                 // that future will wake up and give us
1255                                 // more work to do when it's ready to.
1256                                 Poll::Pending
1257                             }
1258                         }
1259                     }
1260                     // There is at least one pending future in
1261                     // `ConcurrentState::futures` and we have nothing
1262                     // else to do but wait for now, so we return
1263                     // `Pending`.
1264                     Poll::Pending => Poll::Pending,
1265                 };
1266             })
1267             .await;
1268 
1269             // Put the `ConcurrentState::futures` back into the store before we
1270             // return or handle any work items since one or more of those items
1271             // might append more futures.
1272             drop(reset);
1273 
1274             match result? {
1275                 // The future we were passed as an argument completed, so we
1276                 // return the result.
1277                 PollResult::Complete(value) => break Ok(value),
1278                 // The future we were passed has not yet completed, so handle
1279                 // any work items and then loop again.
1280                 PollResult::ProcessWork(ready) => {
1281                     struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1282                         store: StoreContextMut<'a, T>,
1283                         ready: I,
1284                     }
1285 
1286                     impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1287                         fn drop(&mut self) {
1288                             while let Some(item) = self.ready.next() {
1289                                 match item {
1290                                     WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1291                                     WorkItem::PushFuture(future) => {
1292                                         tls::set(self.store.0, move || drop(future))
1293                                     }
1294                                     _ => {}
1295                                 }
1296                             }
1297                         }
1298                     }
1299 
1300                     let mut dispose = Dispose {
1301                         store: self.as_context_mut(),
1302                         ready: ready.into_iter(),
1303                     };
1304 
1305                     while let Some(item) = dispose.ready.next() {
1306                         dispose
1307                             .store
1308                             .as_context_mut()
1309                             .handle_work_item(item)
1310                             .await?;
1311                     }
1312                 }
1313             }
1314         }
1315     }
1316 
1317     /// Handle the specified work item, possibly resuming a fiber if applicable.
1318     async fn handle_work_item(self, item: WorkItem) -> Result<()>
1319     where
1320         T: Send,
1321     {
1322         log::trace!("handle work item {item:?}");
1323         match item {
1324             WorkItem::PushFuture(future) => {
1325                 self.0
1326                     .concurrent_state_mut()
1327                     .futures
1328                     .get_mut()
1329                     .as_mut()
1330                     .unwrap()
1331                     .push(future.into_inner());
1332             }
1333             WorkItem::ResumeFiber(fiber) => {
1334                 self.0.resume_fiber(fiber).await?;
1335             }
1336             WorkItem::ResumeThread(_, thread) => {
1337                 if let GuestThreadState::Ready(fiber) = mem::replace(
1338                     &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1339                     GuestThreadState::Running,
1340                 ) {
1341                     self.0.resume_fiber(fiber).await?;
1342                 } else {
1343                     bail!("cannot resume non-pending thread {thread:?}");
1344                 }
1345             }
1346             WorkItem::GuestCall(_, call) => {
1347                 if call.is_ready(self.0)? {
1348                     self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1349                 } else {
1350                     let state = self.0.concurrent_state_mut();
1351                     let task = state.get_mut(call.thread.task)?;
1352                     if !task.starting_sent {
1353                         task.starting_sent = true;
1354                         if let GuestCallKind::StartImplicit(_) = &call.kind {
1355                             Waitable::Guest(call.thread.task).set_event(
1356                                 state,
1357                                 Some(Event::Subtask {
1358                                     status: Status::Starting,
1359                                 }),
1360                             )?;
1361                         }
1362                     }
1363 
1364                     let instance = state.get_mut(call.thread.task)?.instance;
1365                     self.0
1366                         .instance_state(instance)
1367                         .concurrent_state()
1368                         .pending
1369                         .insert(call.thread, call.kind);
1370                 }
1371             }
1372             WorkItem::WorkerFunction(fun) => {
1373                 self.run_on_worker(WorkerItem::Function(fun)).await?;
1374             }
1375         }
1376 
1377         Ok(())
1378     }
1379 
1380     /// Execute the specified guest call on a worker fiber.
1381     async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1382     where
1383         T: Send,
1384     {
1385         let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1386             fiber
1387         } else {
1388             fiber::make_fiber(self.0, move |store| {
1389                 loop {
1390                     match store.concurrent_state_mut().worker_item.take().unwrap() {
1391                         WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1392                         WorkerItem::Function(fun) => fun.into_inner()(store)?,
1393                     }
1394 
1395                     store.suspend(SuspendReason::NeedWork)?;
1396                 }
1397             })?
1398         };
1399 
1400         let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1401         assert!(worker_item.is_none());
1402         *worker_item = Some(item);
1403 
1404         self.0.resume_fiber(worker).await
1405     }
1406 
1407     /// Wrap the specified host function in a future which will call it, passing
1408     /// it an `&Accessor<T>`.
1409     ///
1410     /// See the `Accessor` documentation for details.
1411     pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1412     where
1413         T: 'static,
1414         F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1415             + Send
1416             + Sync
1417             + 'static,
1418         R: Send + Sync + 'static,
1419     {
1420         let token = StoreToken::new(self);
1421         async move {
1422             let mut accessor = Accessor::new(token);
1423             closure(&mut accessor).await
1424         }
1425     }
1426 }
1427 
1428 impl StoreOpaque {
1429     /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
1430     /// guest-to-guest call or a sync host-to-guest call.
1431     ///
1432     /// This task will only be used for the purpose of handling calls to
1433     /// intrinsic functions; both parameter lowering and result lifting are
1434     /// assumed to be taken care of elsewhere.
1435     pub(crate) fn enter_guest_sync_call(
1436         &mut self,
1437         guest_caller: Option<RuntimeInstance>,
1438         callee_async: bool,
1439         callee: RuntimeInstance,
1440     ) -> Result<()> {
1441         log::trace!("enter sync call {callee:?}");
1442         if !self.concurrency_support() {
1443             return Ok(self.enter_call_not_concurrent());
1444         }
1445 
1446         let state = self.concurrent_state_mut();
1447         let thread = state.current_thread;
1448         let instance = if let Some(thread) = thread.guest() {
1449             Some(state.get_mut(thread.task)?.instance)
1450         } else {
1451             None
1452         };
1453         let task = GuestTask::new(
1454             state,
1455             Box::new(move |_, _| unreachable!()),
1456             LiftResult {
1457                 lift: Box::new(move |_, _| unreachable!()),
1458                 ty: TypeTupleIndex::reserved_value(),
1459                 memory: None,
1460                 string_encoding: StringEncoding::Utf8,
1461             },
1462             if let Some(caller) = guest_caller {
1463                 assert_eq!(caller, instance.unwrap());
1464                 Caller::Guest {
1465                     thread: *thread.guest().unwrap(),
1466                 }
1467             } else {
1468                 Caller::Host {
1469                     tx: None,
1470                     host_future_present: false,
1471                     caller: thread,
1472                 }
1473             },
1474             None,
1475             callee,
1476             callee_async,
1477         )?;
1478 
1479         let guest_task = state.push(task)?;
1480         let new_thread = GuestThread::new_implicit(guest_task);
1481         let guest_thread = state.push(new_thread)?;
1482         Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1483             guest_thread,
1484             self,
1485             callee.index,
1486         )?;
1487 
1488         let state = self.concurrent_state_mut();
1489         state.get_mut(guest_task)?.threads.insert(guest_thread);
1490 
1491         self.set_thread(QualifiedThreadId {
1492             task: guest_task,
1493             thread: guest_thread,
1494         });
1495 
1496         Ok(())
1497     }
1498 
1499     /// Pop a `GuestTask` previously pushed using `enter_sync_call`.
1500     pub(crate) fn exit_guest_sync_call(&mut self, guest_caller: bool) -> Result<()> {
1501         if !self.concurrency_support() {
1502             return Ok(self.exit_call_not_concurrent());
1503         }
1504         let thread = *self.set_thread(CurrentThread::None).guest().unwrap();
1505         let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
1506         log::trace!("exit sync call {instance:?}");
1507         Instance::from_wasmtime(self, instance.instance).cleanup_thread(
1508             self,
1509             thread,
1510             instance.index,
1511         )?;
1512 
1513         let state = self.concurrent_state_mut();
1514         let task = state.get_mut(thread.task)?;
1515         let caller = match &task.caller {
1516             &Caller::Guest { thread } => {
1517                 assert!(guest_caller);
1518                 thread.into()
1519             }
1520             &Caller::Host { caller, .. } => {
1521                 assert!(!guest_caller);
1522                 caller
1523             }
1524         };
1525         self.set_thread(caller);
1526 
1527         let state = self.concurrent_state_mut();
1528         let task = state.get_mut(thread.task)?;
1529         if task.ready_to_delete() {
1530             state.delete(thread.task)?.dispose(state)?;
1531         }
1532 
1533         Ok(())
1534     }
1535 
1536     /// Similar to `enter_guest_sync_call` except for when the guest makes a
1537     /// transition to the host.
1538     ///
1539     /// FIXME: this is called for all guest->host transitions and performs some
1540     /// relatively expensive table manipulations. This would ideally be
1541     /// optimized to avoid the full allocation of a `HostTask` in at least some
1542     /// situations.
1543     pub fn enter_host_call(&mut self) -> Result<()> {
1544         if !self.concurrency_support() {
1545             self.enter_call_not_concurrent();
1546             return Ok(());
1547         }
1548         let state = self.concurrent_state_mut();
1549         let caller = state.unwrap_current_guest_thread();
1550         let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1551         log::trace!("new host task {task:?}");
1552         self.set_thread(task);
1553         Ok(())
1554     }
1555 
1556     /// Dual of `enter_host_call` and signifies that the host has finished and
1557     /// will be cleaned up.
1558     ///
1559     /// Note that this isn't invoked when the host is invoked asynchronously and
1560     /// the host isn't complete yet. In that situation the host task persists
1561     /// and will be cleaned up separately.
1562     pub fn exit_host_call(&mut self) -> Result<()> {
1563         if !self.concurrency_support() {
1564             self.exit_call_not_concurrent();
1565             return Ok(());
1566         }
1567         let task = self.concurrent_state_mut().unwrap_current_host_thread();
1568         log::trace!("delete host task {task:?}");
1569         let task = self.concurrent_state_mut().delete(task)?;
1570         self.set_thread(task.caller);
1571         Ok(())
1572     }
1573 
1574     /// Determine whether the specified instance may be entered from the host.
1575     ///
1576     /// We return `true` here only if all of the following hold:
1577     ///
1578     /// - The top-level instance is not already on the current task's call stack.
1579     /// - The instance is not in need of a post-return function call.
1580     /// - `self` has not been poisoned due to a trap.
1581     pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
1582         if self.trapped() {
1583             return false;
1584         }
1585         if !self.concurrency_support() {
1586             return true;
1587         }
1588         let state = self.concurrent_state_mut();
1589         let mut cur = state.current_thread;
1590         loop {
1591             match cur {
1592                 CurrentThread::None => break true,
1593                 CurrentThread::Guest(thread) => {
1594                     let task = state.get_mut(thread.task).unwrap();
1595 
1596                     // Note that we only compare top-level instance IDs here.
1597                     // The idea is that the host is not allowed to recursively
1598                     // enter a top-level instance even if the specific leaf
1599                     // instance is not on the stack. This the behavior defined
1600                     // in the spec, and it allows us to elide runtime checks in
1601                     // guest-to-guest adapters.
1602                     if task.instance.instance == instance.instance {
1603                         break false;
1604                     }
1605                     cur = match task.caller {
1606                         Caller::Host { caller, .. } => caller,
1607                         Caller::Guest { thread } => thread.into(),
1608                     };
1609                 }
1610                 CurrentThread::Host(id) => {
1611                     cur = state.get_mut(id).unwrap().caller.into();
1612                 }
1613             }
1614         }
1615     }
1616 
1617     /// Helper function to retrieve the `InstanceState` for the
1618     /// specified instance.
1619     fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1620         self.component_instance_mut(instance.instance)
1621             .instance_state(instance.index)
1622     }
1623 
1624     fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> CurrentThread {
1625         // Each time we switch threads, we conservatively set `task_may_block`
1626         // to `false` for the component instance we're switching away from (if
1627         // any), meaning it will be `false` for any new thread created for that
1628         // instance unless explicitly set otherwise.
1629         let state = self.concurrent_state_mut();
1630         let old_thread = mem::replace(&mut state.current_thread, thread.into());
1631         if let Some(old_thread) = old_thread.guest() {
1632             let instance = state.get_mut(old_thread.task).unwrap().instance.instance;
1633             self.component_instance_mut(instance)
1634                 .set_task_may_block(false)
1635         }
1636 
1637         // If we're switching to a new thread, set its component instance's
1638         // `task_may_block` according to where it left off.
1639         if self.concurrent_state_mut().current_thread.guest().is_some() {
1640             self.set_task_may_block();
1641         }
1642 
1643         old_thread
1644     }
1645 
1646     /// Set the global variable representing whether the current task may block
1647     /// prior to entering Wasm code.
1648     fn set_task_may_block(&mut self) {
1649         let state = self.concurrent_state_mut();
1650         let guest_thread = state.unwrap_current_guest_thread();
1651         let instance = state.get_mut(guest_thread.task).unwrap().instance.instance;
1652         let may_block = self.concurrent_state_mut().may_block(guest_thread.task);
1653         self.component_instance_mut(instance)
1654             .set_task_may_block(may_block)
1655     }
1656 
1657     pub(crate) fn check_blocking(&mut self) -> Result<()> {
1658         if !self.concurrency_support() {
1659             return Ok(());
1660         }
1661         let state = self.concurrent_state_mut();
1662         let task = state.unwrap_current_guest_thread().task;
1663         let instance = state.get_mut(task).unwrap().instance.instance;
1664         let task_may_block = self.component_instance(instance).get_task_may_block();
1665 
1666         if task_may_block {
1667             Ok(())
1668         } else {
1669             Err(Trap::CannotBlockSyncTask.into())
1670         }
1671     }
1672 
1673     /// Record that we're about to enter a (sub-)component instance which does
1674     /// not support more than one concurrent, stackful activation, meaning it
1675     /// cannot be entered again until the next call returns.
1676     fn enter_instance(&mut self, instance: RuntimeInstance) {
1677         log::trace!("enter {instance:?}");
1678         self.instance_state(instance)
1679             .concurrent_state()
1680             .do_not_enter = true;
1681     }
1682 
1683     /// Record that we've exited a (sub-)component instance previously entered
1684     /// with `Self::enter_instance` and then calls `Self::partition_pending`.
1685     /// See the documentation for the latter for details.
1686     fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1687         log::trace!("exit {instance:?}");
1688         self.instance_state(instance)
1689             .concurrent_state()
1690             .do_not_enter = false;
1691         self.partition_pending(instance)
1692     }
1693 
1694     /// Iterate over `InstanceState::pending`, moving any ready items into the
1695     /// "high priority" work item queue.
1696     ///
1697     /// See `GuestCall::is_ready` for details.
1698     fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1699         for (thread, kind) in
1700             mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1701         {
1702             let call = GuestCall { thread, kind };
1703             if call.is_ready(self)? {
1704                 self.concurrent_state_mut()
1705                     .push_high_priority(WorkItem::GuestCall(instance.index, call));
1706             } else {
1707                 self.instance_state(instance)
1708                     .concurrent_state()
1709                     .pending
1710                     .insert(call.thread, call.kind);
1711             }
1712         }
1713 
1714         Ok(())
1715     }
1716 
1717     /// Implements the `backpressure.{inc,dec}` intrinsics.
1718     pub(crate) fn backpressure_modify(
1719         &mut self,
1720         caller_instance: RuntimeInstance,
1721         modify: impl FnOnce(u16) -> Option<u16>,
1722     ) -> Result<()> {
1723         let state = self.instance_state(caller_instance).concurrent_state();
1724         let old = state.backpressure;
1725         let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?;
1726         state.backpressure = new;
1727 
1728         if old > 0 && new == 0 {
1729             // Backpressure was previously enabled and is now disabled; move any
1730             // newly-eligible guest calls to the "high priority" queue.
1731             self.partition_pending(caller_instance)?;
1732         }
1733 
1734         Ok(())
1735     }
1736 
1737     /// Resume the specified fiber, giving it exclusive access to the specified
1738     /// store.
1739     async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1740         let old_thread = self.concurrent_state_mut().current_thread;
1741         log::trace!("resume_fiber: save current thread {old_thread:?}");
1742 
1743         let fiber = fiber::resolve_or_release(self, fiber).await?;
1744 
1745         self.set_thread(old_thread);
1746 
1747         let state = self.concurrent_state_mut();
1748 
1749         if let Some(ot) = old_thread.guest() {
1750             state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1751         }
1752         log::trace!("resume_fiber: restore current thread {old_thread:?}");
1753 
1754         if let Some(mut fiber) = fiber {
1755             log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1756             // See the `SuspendReason` documentation for what each case means.
1757             match state.suspend_reason.take().unwrap() {
1758                 SuspendReason::NeedWork => {
1759                     if state.worker.is_none() {
1760                         state.worker = Some(fiber);
1761                     } else {
1762                         fiber.dispose(self);
1763                     }
1764                 }
1765                 SuspendReason::Yielding { thread, .. } => {
1766                     state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber);
1767                     let instance = state.get_mut(thread.task)?.instance.index;
1768                     state.push_low_priority(WorkItem::ResumeThread(instance, thread));
1769                 }
1770                 SuspendReason::ExplicitlySuspending { thread, .. } => {
1771                     state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1772                 }
1773                 SuspendReason::Waiting { set, thread, .. } => {
1774                     let old = state
1775                         .get_mut(set)?
1776                         .waiting
1777                         .insert(thread, WaitMode::Fiber(fiber));
1778                     assert!(old.is_none());
1779                 }
1780             };
1781         } else {
1782             log::trace!("resume_fiber: fiber has exited");
1783         }
1784 
1785         Ok(())
1786     }
1787 
1788     /// Suspend the current fiber, storing the reason in
1789     /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1790     /// it should be resumed.
1791     ///
1792     /// See the `SuspendReason` documentation for details.
1793     fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1794         log::trace!("suspend fiber: {reason:?}");
1795 
1796         // If we're yielding or waiting on behalf of a guest thread, we'll need to
1797         // pop the call context which manages resource borrows before suspending
1798         // and then push it again once we've resumed.
1799         let task = match &reason {
1800             SuspendReason::Yielding { thread, .. }
1801             | SuspendReason::Waiting { thread, .. }
1802             | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1803             SuspendReason::NeedWork => None,
1804         };
1805 
1806         let old_guest_thread = if task.is_some() {
1807             self.concurrent_state_mut().current_thread
1808         } else {
1809             CurrentThread::None
1810         };
1811 
1812         // We should not have reached here unless either there's no current
1813         // task, or the current task is permitted to block.  In addition, we
1814         // special-case `thread.switch-to` and waiting for a subtask to go from
1815         // `starting` to `started`, both of which we consider non-blocking
1816         // operations despite requiring a suspend.
1817         assert!(
1818             matches!(
1819                 reason,
1820                 SuspendReason::ExplicitlySuspending {
1821                     skip_may_block_check: true,
1822                     ..
1823                 } | SuspendReason::Waiting {
1824                     skip_may_block_check: true,
1825                     ..
1826                 } | SuspendReason::Yielding {
1827                     skip_may_block_check: true,
1828                     ..
1829                 }
1830             ) || old_guest_thread
1831                 .guest()
1832                 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1833                 .unwrap_or(true)
1834         );
1835 
1836         let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1837         assert!(suspend_reason.is_none());
1838         *suspend_reason = Some(reason);
1839 
1840         self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1841 
1842         if task.is_some() {
1843             self.set_thread(old_guest_thread);
1844         }
1845 
1846         Ok(())
1847     }
1848 
1849     fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1850         let state = self.concurrent_state_mut();
1851         let caller = state.unwrap_current_guest_thread();
1852         let old_set = waitable.common(state)?.set;
1853         let set = state.get_mut(caller.task)?.sync_call_set;
1854         waitable.join(state, Some(set))?;
1855         self.suspend(SuspendReason::Waiting {
1856             set,
1857             thread: caller,
1858             skip_may_block_check: false,
1859         })?;
1860         let state = self.concurrent_state_mut();
1861         waitable.join(state, old_set)
1862     }
1863 }
1864 
1865 impl Instance {
1866     /// Get the next pending event for the specified task and (optional)
1867     /// waitable set, along with the waitable handle if applicable.
1868     fn get_event(
1869         self,
1870         store: &mut StoreOpaque,
1871         guest_task: TableId<GuestTask>,
1872         set: Option<TableId<WaitableSet>>,
1873         cancellable: bool,
1874     ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1875         let state = store.concurrent_state_mut();
1876 
1877         if let Some(event) = state.get_mut(guest_task)?.event.take() {
1878             log::trace!("deliver event {event:?} to {guest_task:?}");
1879 
1880             if cancellable || !matches!(event, Event::Cancelled) {
1881                 return Ok(Some((event, None)));
1882             } else {
1883                 state.get_mut(guest_task)?.event = Some(event);
1884             }
1885         }
1886 
1887         Ok(
1888             if let Some((set, waitable)) = set
1889                 .and_then(|set| {
1890                     state
1891                         .get_mut(set)
1892                         .map(|v| v.ready.pop_first().map(|v| (set, v)))
1893                         .transpose()
1894                 })
1895                 .transpose()?
1896             {
1897                 let common = waitable.common(state)?;
1898                 let handle = common.handle.unwrap();
1899                 let event = common.event.take().unwrap();
1900 
1901                 log::trace!(
1902                     "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1903                 );
1904 
1905                 waitable.on_delivery(store, self, event);
1906 
1907                 Some((event, Some((waitable, handle))))
1908             } else {
1909                 None
1910             },
1911         )
1912     }
1913 
1914     /// Handle the `CallbackCode` returned from an async-lifted export or its
1915     /// callback.
1916     ///
1917     /// If this returns `Ok(Some(call))`, then `call` should be run immediately
1918     /// using `handle_guest_call`.
1919     fn handle_callback_code(
1920         self,
1921         store: &mut StoreOpaque,
1922         guest_thread: QualifiedThreadId,
1923         runtime_instance: RuntimeComponentInstanceIndex,
1924         code: u32,
1925     ) -> Result<Option<GuestCall>> {
1926         let (code, set) = unpack_callback_code(code);
1927 
1928         log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1929 
1930         let state = store.concurrent_state_mut();
1931 
1932         let get_set = |store: &mut StoreOpaque, handle| {
1933             if handle == 0 {
1934                 bail!("invalid waitable-set handle");
1935             }
1936 
1937             let set = store
1938                 .instance_state(RuntimeInstance {
1939                     instance: self.id().instance(),
1940                     index: runtime_instance,
1941                 })
1942                 .handle_table()
1943                 .waitable_set_rep(handle)?;
1944 
1945             Ok(TableId::<WaitableSet>::new(set))
1946         };
1947 
1948         Ok(match code {
1949             callback_code::EXIT => {
1950                 log::trace!("implicit thread {guest_thread:?} completed");
1951                 self.cleanup_thread(store, guest_thread, runtime_instance)?;
1952                 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1953                 if task.threads.is_empty() && !task.returned_or_cancelled() {
1954                     bail!(Trap::NoAsyncResult);
1955                 }
1956                 if let Caller::Guest { .. } = task.caller {
1957                     task.exited = true;
1958                     task.callback = None;
1959                 }
1960                 if task.ready_to_delete() {
1961                     Waitable::Guest(guest_thread.task).delete_from(store.concurrent_state_mut())?;
1962                 }
1963                 None
1964             }
1965             callback_code::YIELD => {
1966                 let task = state.get_mut(guest_thread.task)?;
1967                 // If an `Event::Cancelled` is pending, we'll deliver that;
1968                 // otherwise, we'll deliver `Event::None`.  Note that
1969                 // `GuestTask::event` is only ever set to one of those two
1970                 // `Event` variants.
1971                 if let Some(event) = task.event {
1972                     assert!(matches!(event, Event::None | Event::Cancelled));
1973                 } else {
1974                     task.event = Some(Event::None);
1975                 }
1976                 let call = GuestCall {
1977                     thread: guest_thread,
1978                     kind: GuestCallKind::DeliverEvent {
1979                         instance: self,
1980                         set: None,
1981                     },
1982                 };
1983                 if state.may_block(guest_thread.task) {
1984                     // Push this thread onto the "low priority" queue so it runs
1985                     // after any other threads have had a chance to run.
1986                     state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
1987                     None
1988                 } else {
1989                     // Yielding in a non-blocking context is defined as a no-op
1990                     // according to the spec, so we must run this thread
1991                     // immediately without allowing any others to run.
1992                     Some(call)
1993                 }
1994             }
1995             callback_code::WAIT => {
1996                 // The task may only return `WAIT` if it was created for a call
1997                 // to an async export).  Otherwise, we'll trap.
1998                 state.check_blocking_for(guest_thread.task)?;
1999 
2000                 let set = get_set(store, set)?;
2001                 let state = store.concurrent_state_mut();
2002 
2003                 if state.get_mut(guest_thread.task)?.event.is_some()
2004                     || !state.get_mut(set)?.ready.is_empty()
2005                 {
2006                     // An event is immediately available; deliver it ASAP.
2007                     state.push_high_priority(WorkItem::GuestCall(
2008                         runtime_instance,
2009                         GuestCall {
2010                             thread: guest_thread,
2011                             kind: GuestCallKind::DeliverEvent {
2012                                 instance: self,
2013                                 set: Some(set),
2014                             },
2015                         },
2016                     ));
2017                 } else {
2018                     // No event is immediately available.
2019                     //
2020                     // We're waiting, so register to be woken up when an event
2021                     // is published for this waitable set.
2022                     //
2023                     // Here we also set `GuestTask::wake_on_cancel` which allows
2024                     // `subtask.cancel` to interrupt the wait.
2025                     let old = state
2026                         .get_mut(guest_thread.thread)?
2027                         .wake_on_cancel
2028                         .replace(set);
2029                     assert!(old.is_none());
2030                     let old = state
2031                         .get_mut(set)?
2032                         .waiting
2033                         .insert(guest_thread, WaitMode::Callback(self));
2034                     assert!(old.is_none());
2035                 }
2036                 None
2037             }
2038             _ => bail!("unsupported callback code: {code}"),
2039         })
2040     }
2041 
2042     fn cleanup_thread(
2043         self,
2044         store: &mut StoreOpaque,
2045         guest_thread: QualifiedThreadId,
2046         runtime_instance: RuntimeComponentInstanceIndex,
2047     ) -> Result<()> {
2048         let guest_id = store
2049             .concurrent_state_mut()
2050             .get_mut(guest_thread.thread)?
2051             .instance_rep;
2052         store
2053             .instance_state(RuntimeInstance {
2054                 instance: self.id().instance(),
2055                 index: runtime_instance,
2056             })
2057             .thread_handle_table()
2058             .guest_thread_remove(guest_id.unwrap())?;
2059 
2060         store.concurrent_state_mut().delete(guest_thread.thread)?;
2061         let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2062         task.threads.remove(&guest_thread.thread);
2063         Ok(())
2064     }
2065 
2066     /// Add the specified guest call to the "high priority" work item queue, to
2067     /// be started as soon as backpressure and/or reentrance rules allow.
2068     ///
2069     /// SAFETY: The raw pointer arguments must be valid references to guest
2070     /// functions (with the appropriate signatures) when the closures queued by
2071     /// this function are called.
2072     unsafe fn queue_call<T: 'static>(
2073         self,
2074         mut store: StoreContextMut<T>,
2075         guest_thread: QualifiedThreadId,
2076         callee: SendSyncPtr<VMFuncRef>,
2077         param_count: usize,
2078         result_count: usize,
2079         async_: bool,
2080         callback: Option<SendSyncPtr<VMFuncRef>>,
2081         post_return: Option<SendSyncPtr<VMFuncRef>>,
2082     ) -> Result<()> {
2083         /// Return a closure which will call the specified function in the scope
2084         /// of the specified task.
2085         ///
2086         /// This will use `GuestTask::lower_params` to lower the parameters, but
2087         /// will not lift the result; instead, it returns a
2088         /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
2089         /// any, may be lifted.  Note that an async-lifted export will have
2090         /// returned its result using the `task.return` intrinsic (or not
2091         /// returned a result at all, in the case of `task.cancel`), in which
2092         /// case the "result" of this call will either be a callback code or
2093         /// nothing.
2094         ///
2095         /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
2096         /// the returned closure is called.
2097         unsafe fn make_call<T: 'static>(
2098             store: StoreContextMut<T>,
2099             guest_thread: QualifiedThreadId,
2100             callee: SendSyncPtr<VMFuncRef>,
2101             param_count: usize,
2102             result_count: usize,
2103         ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2104         + Send
2105         + Sync
2106         + 'static
2107         + use<T> {
2108             let token = StoreToken::new(store);
2109             move |store: &mut dyn VMStore| {
2110                 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2111 
2112                 store
2113                     .concurrent_state_mut()
2114                     .get_mut(guest_thread.thread)?
2115                     .state = GuestThreadState::Running;
2116                 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2117                 let lower = task.lower_params.take().unwrap();
2118 
2119                 lower(store, &mut storage[..param_count])?;
2120 
2121                 let mut store = token.as_context_mut(store);
2122 
2123                 // SAFETY: Per the contract documented in `make_call's`
2124                 // documentation, `callee` must be a valid pointer.
2125                 unsafe {
2126                     crate::Func::call_unchecked_raw(
2127                         &mut store,
2128                         callee.as_non_null(),
2129                         NonNull::new(
2130                             &mut storage[..param_count.max(result_count)]
2131                                 as *mut [MaybeUninit<ValRaw>] as _,
2132                         )
2133                         .unwrap(),
2134                     )?;
2135                 }
2136 
2137                 Ok(storage)
2138             }
2139         }
2140 
2141         // SAFETY: Per the contract described in this function documentation,
2142         // the `callee` pointer which `call` closes over must be valid when
2143         // called by the closure we queue below.
2144         let call = unsafe {
2145             make_call(
2146                 store.as_context_mut(),
2147                 guest_thread,
2148                 callee,
2149                 param_count,
2150                 result_count,
2151             )
2152         };
2153 
2154         let callee_instance = store
2155             .0
2156             .concurrent_state_mut()
2157             .get_mut(guest_thread.task)?
2158             .instance;
2159 
2160         let fun = if callback.is_some() {
2161             assert!(async_);
2162 
2163             Box::new(move |store: &mut dyn VMStore| {
2164                 self.add_guest_thread_to_instance_table(
2165                     guest_thread.thread,
2166                     store,
2167                     callee_instance.index,
2168                 )?;
2169                 let old_thread = store.set_thread(guest_thread);
2170                 log::trace!(
2171                     "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2172                 );
2173 
2174                 store.enter_instance(callee_instance);
2175 
2176                 // SAFETY: See the documentation for `make_call` to review the
2177                 // contract we must uphold for `call` here.
2178                 //
2179                 // Per the contract described in the `queue_call`
2180                 // documentation, the `callee` pointer which `call` closes
2181                 // over must be valid.
2182                 let storage = call(store)?;
2183 
2184                 store.exit_instance(callee_instance)?;
2185 
2186                 store.set_thread(old_thread);
2187                 let state = store.concurrent_state_mut();
2188                 old_thread
2189                     .guest()
2190                     .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
2191                 log::trace!("stackless call: restored {old_thread:?} as current thread");
2192 
2193                 // SAFETY: `wasmparser` will have validated that the callback
2194                 // function returns a `i32` result.
2195                 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2196 
2197                 self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2198             })
2199                 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2200         } else {
2201             let token = StoreToken::new(store.as_context_mut());
2202             Box::new(move |store: &mut dyn VMStore| {
2203                 self.add_guest_thread_to_instance_table(
2204                     guest_thread.thread,
2205                     store,
2206                     callee_instance.index,
2207                 )?;
2208                 let old_thread = store.set_thread(guest_thread);
2209                 log::trace!(
2210                     "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2211                 );
2212                 let flags = self.id().get(store).instance_flags(callee_instance.index);
2213 
2214                 // Unless this is a callback-less (i.e. stackful)
2215                 // async-lifted export, we need to record that the instance
2216                 // cannot be entered until the call returns.
2217                 if !async_ {
2218                     store.enter_instance(callee_instance);
2219                 }
2220 
2221                 // SAFETY: See the documentation for `make_call` to review the
2222                 // contract we must uphold for `call` here.
2223                 //
2224                 // Per the contract described in the `queue_call`
2225                 // documentation, the `callee` pointer which `call` closes
2226                 // over must be valid.
2227                 let storage = call(store)?;
2228 
2229                 if async_ {
2230                     let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2231                     if task.threads.len() == 1 && !task.returned_or_cancelled() {
2232                         bail!(Trap::NoAsyncResult);
2233                     }
2234                 } else {
2235                     // This is a sync-lifted export, so now is when we lift the
2236                     // result, optionally call the post-return function, if any,
2237                     // and finally notify any current or future waiters that the
2238                     // subtask has returned.
2239 
2240                     let lift = {
2241                         store.exit_instance(callee_instance)?;
2242 
2243                         let state = store.concurrent_state_mut();
2244                         assert!(state.get_mut(guest_thread.task)?.result.is_none());
2245 
2246                         state
2247                             .get_mut(guest_thread.task)?
2248                             .lift_result
2249                             .take()
2250                             .unwrap()
2251                     };
2252 
2253                     // SAFETY: `result_count` represents the number of core Wasm
2254                     // results returned, per `wasmparser`.
2255                     let result = (lift.lift)(store, unsafe {
2256                         mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2257                             &storage[..result_count],
2258                         )
2259                     })?;
2260 
2261                     let post_return_arg = match result_count {
2262                         0 => ValRaw::i32(0),
2263                         // SAFETY: `result_count` represents the number of
2264                         // core Wasm results returned, per `wasmparser`.
2265                         1 => unsafe { storage[0].assume_init() },
2266                         _ => unreachable!(),
2267                     };
2268 
2269                     unsafe {
2270                         call_post_return(
2271                             token.as_context_mut(store),
2272                             post_return.map(|v| v.as_non_null()),
2273                             post_return_arg,
2274                             flags,
2275                         )?;
2276                     }
2277 
2278                     self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2279                 }
2280 
2281                 // This is a callback-less call, so the implicit thread has now completed
2282                 self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2283 
2284                 store.set_thread(old_thread);
2285 
2286                 let state = store.concurrent_state_mut();
2287                 let task = state.get_mut(guest_thread.task)?;
2288 
2289                 match &task.caller {
2290                     Caller::Host { .. } => {
2291                         if task.ready_to_delete() {
2292                             Waitable::Guest(guest_thread.task).delete_from(state)?;
2293                         }
2294                     }
2295                     Caller::Guest { .. } => {
2296                         task.exited = true;
2297                     }
2298                 }
2299 
2300                 Ok(None)
2301             })
2302         };
2303 
2304         store
2305             .0
2306             .concurrent_state_mut()
2307             .push_high_priority(WorkItem::GuestCall(
2308                 callee_instance.index,
2309                 GuestCall {
2310                     thread: guest_thread,
2311                     kind: GuestCallKind::StartImplicit(fun),
2312                 },
2313             ));
2314 
2315         Ok(())
2316     }
2317 
2318     /// Prepare (but do not start) a guest->guest call.
2319     ///
2320     /// This is called from fused adapter code generated in
2321     /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2322     /// are synthesized Wasm functions which move the parameters from the caller
2323     /// to the callee and the result from the callee to the caller,
2324     /// respectively.  The adapter will call `Self::start_call` immediately
2325     /// after calling this function.
2326     ///
2327     /// SAFETY: All the pointer arguments must be valid pointers to guest
2328     /// entities (and with the expected signatures for the function references
2329     /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
2330     unsafe fn prepare_call<T: 'static>(
2331         self,
2332         mut store: StoreContextMut<T>,
2333         start: *mut VMFuncRef,
2334         return_: *mut VMFuncRef,
2335         caller_instance: RuntimeComponentInstanceIndex,
2336         callee_instance: RuntimeComponentInstanceIndex,
2337         task_return_type: TypeTupleIndex,
2338         callee_async: bool,
2339         memory: *mut VMMemoryDefinition,
2340         string_encoding: u8,
2341         caller_info: CallerInfo,
2342     ) -> Result<()> {
2343         if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2344             // A task may only call an async-typed function via a sync lower if
2345             // it was created by a call to an async export.  Otherwise, we'll
2346             // trap.
2347             store.0.check_blocking()?;
2348         }
2349 
2350         enum ResultInfo {
2351             Heap { results: u32 },
2352             Stack { result_count: u32 },
2353         }
2354 
2355         let result_info = match &caller_info {
2356             CallerInfo::Async {
2357                 has_result: true,
2358                 params,
2359             } => ResultInfo::Heap {
2360                 results: params.last().unwrap().get_u32(),
2361             },
2362             CallerInfo::Async {
2363                 has_result: false, ..
2364             } => ResultInfo::Stack { result_count: 0 },
2365             CallerInfo::Sync {
2366                 result_count,
2367                 params,
2368             } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap {
2369                 results: params.last().unwrap().get_u32(),
2370             },
2371             CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2372                 result_count: *result_count,
2373             },
2374         };
2375 
2376         let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2377 
2378         // Create a new guest task for the call, closing over the `start` and
2379         // `return_` functions to lift the parameters and lower the result,
2380         // respectively.
2381         let start = SendSyncPtr::new(NonNull::new(start).unwrap());
2382         let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap());
2383         let token = StoreToken::new(store.as_context_mut());
2384         let state = store.0.concurrent_state_mut();
2385         let old_thread = state.unwrap_current_guest_thread();
2386 
2387         assert_eq!(
2388             state.get_mut(old_thread.task)?.instance,
2389             RuntimeInstance {
2390                 instance: self.id().instance(),
2391                 index: caller_instance,
2392             }
2393         );
2394 
2395         let new_task = GuestTask::new(
2396             state,
2397             Box::new(move |store, dst| {
2398                 let mut store = token.as_context_mut(store);
2399                 assert!(dst.len() <= MAX_FLAT_PARAMS);
2400                 // The `+ 1` here accounts for the return pointer, if any:
2401                 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2402                 let count = match caller_info {
2403                     // Async callers, if they have a result, use the last
2404                     // parameter as a return pointer so chop that off if
2405                     // relevant here.
2406                     CallerInfo::Async { params, has_result } => {
2407                         let params = &params[..params.len() - usize::from(has_result)];
2408                         for (param, src) in params.iter().zip(&mut src) {
2409                             src.write(*param);
2410                         }
2411                         params.len()
2412                     }
2413 
2414                     // Sync callers forward everything directly.
2415                     CallerInfo::Sync { params, .. } => {
2416                         for (param, src) in params.iter().zip(&mut src) {
2417                             src.write(*param);
2418                         }
2419                         params.len()
2420                     }
2421                 };
2422                 // SAFETY: `start` is a valid `*mut VMFuncRef` from
2423                 // `wasmtime-cranelift`-generated fused adapter code.  Based on
2424                 // how it was constructed (see
2425                 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2426                 // for details) we know it takes count parameters and returns
2427                 // `dst.len()` results.
2428                 unsafe {
2429                     crate::Func::call_unchecked_raw(
2430                         &mut store,
2431                         start.as_non_null(),
2432                         NonNull::new(
2433                             &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2434                         )
2435                         .unwrap(),
2436                     )?;
2437                 }
2438                 dst.copy_from_slice(&src[..dst.len()]);
2439                 let state = store.0.concurrent_state_mut();
2440                 Waitable::Guest(state.unwrap_current_guest_thread().task).set_event(
2441                     state,
2442                     Some(Event::Subtask {
2443                         status: Status::Started,
2444                     }),
2445                 )?;
2446                 Ok(())
2447             }),
2448             LiftResult {
2449                 lift: Box::new(move |store, src| {
2450                     // SAFETY: See comment in closure passed as `lower_params`
2451                     // parameter above.
2452                     let mut store = token.as_context_mut(store);
2453                     let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2454                     if let ResultInfo::Heap { results } = &result_info {
2455                         my_src.push(ValRaw::u32(*results));
2456                     }
2457                     // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2458                     // `wasmtime-cranelift`-generated fused adapter code.  Based
2459                     // on how it was constructed (see
2460                     // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2461                     // for details) we know it takes `src.len()` parameters and
2462                     // returns up to 1 result.
2463                     unsafe {
2464                         crate::Func::call_unchecked_raw(
2465                             &mut store,
2466                             return_.as_non_null(),
2467                             my_src.as_mut_slice().into(),
2468                         )?;
2469                     }
2470                     let state = store.0.concurrent_state_mut();
2471                     let thread = state.unwrap_current_guest_thread();
2472                     if sync_caller {
2473                         state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2474                             if let ResultInfo::Stack { result_count } = &result_info {
2475                                 match result_count {
2476                                     0 => None,
2477                                     1 => Some(my_src[0]),
2478                                     _ => unreachable!(),
2479                                 }
2480                             } else {
2481                                 None
2482                             },
2483                         );
2484                     }
2485                     Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2486                 }),
2487                 ty: task_return_type,
2488                 memory: NonNull::new(memory).map(SendSyncPtr::new),
2489                 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(),
2490             },
2491             Caller::Guest { thread: old_thread },
2492             None,
2493             RuntimeInstance {
2494                 instance: self.id().instance(),
2495                 index: callee_instance,
2496             },
2497             callee_async,
2498         )?;
2499 
2500         let guest_task = state.push(new_task)?;
2501         let new_thread = GuestThread::new_implicit(guest_task);
2502         let guest_thread = state.push(new_thread)?;
2503         state.get_mut(guest_task)?.threads.insert(guest_thread);
2504 
2505         // Make the new thread the current one so that `Self::start_call` knows
2506         // which one to start.
2507         store.0.set_thread(QualifiedThreadId {
2508             task: guest_task,
2509             thread: guest_thread,
2510         });
2511         log::trace!(
2512             "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2513         );
2514 
2515         Ok(())
2516     }
2517 
2518     /// Call the specified callback function for an async-lifted export.
2519     ///
2520     /// SAFETY: `function` must be a valid reference to a guest function of the
2521     /// correct signature for a callback.
2522     unsafe fn call_callback<T>(
2523         self,
2524         mut store: StoreContextMut<T>,
2525         function: SendSyncPtr<VMFuncRef>,
2526         event: Event,
2527         handle: u32,
2528     ) -> Result<u32> {
2529         let (ordinal, result) = event.parts();
2530         let params = &mut [
2531             ValRaw::u32(ordinal),
2532             ValRaw::u32(handle),
2533             ValRaw::u32(result),
2534         ];
2535         // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2536         // `wasmtime-cranelift`-generated fused adapter code or
2537         // `component::Options`.  Per `wasmparser` callback signature
2538         // validation, we know it takes three parameters and returns one.
2539         unsafe {
2540             crate::Func::call_unchecked_raw(
2541                 &mut store,
2542                 function.as_non_null(),
2543                 params.as_mut_slice().into(),
2544             )?;
2545         }
2546         Ok(params[0].get_u32())
2547     }
2548 
2549     /// Start a guest->guest call previously prepared using
2550     /// `Self::prepare_call`.
2551     ///
2552     /// This is called from fused adapter code generated in
2553     /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2554     /// this function immediately after calling `Self::prepare_call`.
2555     ///
2556     /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2557     /// functions with the appropriate signatures for the current guest task.
2558     /// If this is a call to an async-lowered import, the actual call may be
2559     /// deferred and run after this function returns, in which case the pointer
2560     /// arguments must also be valid when the call happens.
2561     unsafe fn start_call<T: 'static>(
2562         self,
2563         mut store: StoreContextMut<T>,
2564         callback: *mut VMFuncRef,
2565         post_return: *mut VMFuncRef,
2566         callee: *mut VMFuncRef,
2567         param_count: u32,
2568         result_count: u32,
2569         flags: u32,
2570         storage: Option<&mut [MaybeUninit<ValRaw>]>,
2571     ) -> Result<u32> {
2572         let token = StoreToken::new(store.as_context_mut());
2573         let async_caller = storage.is_none();
2574         let state = store.0.concurrent_state_mut();
2575         let guest_thread = state.unwrap_current_guest_thread();
2576         let callee_async = state.get_mut(guest_thread.task)?.async_function;
2577         let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
2578         let param_count = usize::try_from(param_count).unwrap();
2579         assert!(param_count <= MAX_FLAT_PARAMS);
2580         let result_count = usize::try_from(result_count).unwrap();
2581         assert!(result_count <= MAX_FLAT_RESULTS);
2582 
2583         let task = state.get_mut(guest_thread.task)?;
2584         if !callback.is_null() {
2585             // We're calling an async-lifted export with a callback, so store
2586             // the callback and related context as part of the task so we can
2587             // call it later when needed.
2588             let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
2589             task.callback = Some(Box::new(move |store, event, handle| {
2590                 let store = token.as_context_mut(store);
2591                 unsafe { self.call_callback::<T>(store, callback, event, handle) }
2592             }));
2593         }
2594 
2595         let Caller::Guest { thread: caller } = &task.caller else {
2596             // As of this writing, `start_call` is only used for guest->guest
2597             // calls.
2598             unreachable!()
2599         };
2600         let caller = *caller;
2601         let caller_instance = state.get_mut(caller.task)?.instance;
2602 
2603         // Queue the call as a "high priority" work item.
2604         unsafe {
2605             self.queue_call(
2606                 store.as_context_mut(),
2607                 guest_thread,
2608                 callee,
2609                 param_count,
2610                 result_count,
2611                 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2612                 NonNull::new(callback).map(SendSyncPtr::new),
2613                 NonNull::new(post_return).map(SendSyncPtr::new),
2614             )?;
2615         }
2616 
2617         let state = store.0.concurrent_state_mut();
2618 
2619         // Use the caller's `GuestTask::sync_call_set` to register interest in
2620         // the subtask...
2621         let guest_waitable = Waitable::Guest(guest_thread.task);
2622         let old_set = guest_waitable.common(state)?.set;
2623         let set = state.get_mut(caller.task)?.sync_call_set;
2624         guest_waitable.join(state, Some(set))?;
2625 
2626         // ... and suspend this fiber temporarily while we wait for it to start.
2627         //
2628         // Note that we _could_ call the callee directly using the current fiber
2629         // rather than suspend this one, but that would make reasoning about the
2630         // event loop more complicated and is probably only worth doing if
2631         // there's a measurable performance benefit.  In addition, it would mean
2632         // blocking the caller if the callee calls a blocking sync-lowered
2633         // import, and as of this writing the spec says we must not do that.
2634         //
2635         // Alternatively, the fused adapter code could be modified to call the
2636         // callee directly without calling a host-provided intrinsic at all (in
2637         // which case it would need to do its own, inline backpressure checks,
2638         // etc.).  Again, we'd want to see a measurable performance benefit
2639         // before committing to such an optimization.  And again, we'd need to
2640         // update the spec to allow that.
2641         let (status, waitable) = loop {
2642             store.0.suspend(SuspendReason::Waiting {
2643                 set,
2644                 thread: caller,
2645                 // Normally, `StoreOpaque::suspend` would assert it's being
2646                 // called from a context where blocking is allowed.  However, if
2647                 // `async_caller` is `true`, we'll only "block" long enough for
2648                 // the callee to start, i.e. we won't repeat this loop, so we
2649                 // tell `suspend` it's okay even if we're not allowed to block.
2650                 // Alternatively, if the callee is not an async function, then
2651                 // we know it won't block anyway.
2652                 skip_may_block_check: async_caller || !callee_async,
2653             })?;
2654 
2655             let state = store.0.concurrent_state_mut();
2656 
2657             log::trace!("taking event for {:?}", guest_thread.task);
2658             let event = guest_waitable.take_event(state)?;
2659             let Some(Event::Subtask { status }) = event else {
2660                 unreachable!();
2661             };
2662 
2663             log::trace!("status {status:?} for {:?}", guest_thread.task);
2664 
2665             if status == Status::Returned {
2666                 // It returned, so we can stop waiting.
2667                 break (status, None);
2668             } else if async_caller {
2669                 // It hasn't returned yet, but the caller is calling via an
2670                 // async-lowered import, so we generate a handle for the task
2671                 // waitable and return the status.
2672                 let handle = store
2673                     .0
2674                     .instance_state(caller_instance)
2675                     .handle_table()
2676                     .subtask_insert_guest(guest_thread.task.rep())?;
2677                 store
2678                     .0
2679                     .concurrent_state_mut()
2680                     .get_mut(guest_thread.task)?
2681                     .common
2682                     .handle = Some(handle);
2683                 break (status, Some(handle));
2684             } else {
2685                 // The callee hasn't returned yet, and the caller is calling via
2686                 // a sync-lowered import, so we loop and keep waiting until the
2687                 // callee returns.
2688             }
2689         };
2690 
2691         guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2692 
2693         // Reset the current thread to point to the caller as it resumes control.
2694         store.0.set_thread(caller);
2695         store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2696         log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2697 
2698         if let Some(storage) = storage {
2699             // The caller used a sync-lowered import to call an async-lifted
2700             // export, in which case the result, if any, has been stashed in
2701             // `GuestTask::sync_result`.
2702             let state = store.0.concurrent_state_mut();
2703             let task = state.get_mut(guest_thread.task)?;
2704             if let Some(result) = task.sync_result.take() {
2705                 if let Some(result) = result {
2706                     storage[0] = MaybeUninit::new(result);
2707                 }
2708 
2709                 if task.exited && task.ready_to_delete() {
2710                     Waitable::Guest(guest_thread.task).delete_from(state)?;
2711                 }
2712             }
2713         }
2714 
2715         Ok(status.pack(waitable))
2716     }
2717 
2718     /// Poll the specified future once on behalf of a guest->host call using an
2719     /// async-lowered import.
2720     ///
2721     /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2722     /// `Pending`, add it to the set of futures to be polled as part of this
2723     /// instance's event loop until it completes, and then return
2724     /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2725     ///
2726     /// Whether the future returns `Ready` immediately or later, the `lower`
2727     /// function will be used to lower the result, if any, into the guest caller's
2728     /// stack and linear memory. The `lower` function is invoked with `None` if
2729     /// the future is cancelled.
2730     pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2731         self,
2732         mut store: StoreContextMut<'_, T>,
2733         future: impl Future<Output = Result<R>> + Send + 'static,
2734         lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
2735     ) -> Result<Option<u32>> {
2736         let token = StoreToken::new(store.as_context_mut());
2737         let state = store.0.concurrent_state_mut();
2738         let task = state.unwrap_current_host_thread();
2739 
2740         // Create an abortable future which hooks calls to poll and manages call
2741         // context state for the future.
2742         let (join_handle, future) = JoinHandle::run(future);
2743         {
2744             let state = &mut state.get_mut(task)?.state;
2745             assert!(matches!(state, HostTaskState::CalleeStarted));
2746             *state = HostTaskState::CalleeRunning(join_handle);
2747         }
2748 
2749         let mut future = Box::pin(future);
2750 
2751         // Finally, poll the future.  We can use a dummy `Waker` here because
2752         // we'll add the future to `ConcurrentState::futures` and poll it
2753         // automatically from the event loop if it doesn't complete immediately
2754         // here.
2755         let poll = tls::set(store.0, || {
2756             future
2757                 .as_mut()
2758                 .poll(&mut Context::from_waker(&Waker::noop()))
2759         });
2760 
2761         match poll {
2762             // It finished immediately; lower the result and delete the task.
2763             Poll::Ready(Some(result)) => {
2764                 lower(store.as_context_mut(), Some(result?))?;
2765                 return Ok(None);
2766             }
2767 
2768             // Shouldn't be possible since the future isn't cancelled via the
2769             // `join_handle`.
2770             Poll::Ready(None) => unreachable!(),
2771 
2772             // Future isn't ready yet, so fall through.
2773             Poll::Pending => {}
2774         }
2775 
2776         // It hasn't finished yet; add the future to
2777         // `ConcurrentState::futures` so it will be polled by the event
2778         // loop and allocate a waitable handle to return to the guest.
2779 
2780         // Wrap the future in a closure responsible for lowering the result into
2781         // the guest's stack and memory, as well as notifying any waiters that
2782         // the task returned.
2783         let future = Box::pin(async move {
2784             let result = match future.await {
2785                 Some(result) => Some(result?),
2786                 None => None,
2787             };
2788             let on_complete = move |store: &mut dyn VMStore| {
2789                 // Restore the `current_thread` to be the host so `lower` knows
2790                 // how to manipulate borrows and knows which scope of borrows
2791                 // to check.
2792                 let mut store = token.as_context_mut(store);
2793                 let state = store.0.concurrent_state_mut();
2794                 assert!(state.current_thread.is_none());
2795                 store.0.set_thread(task);
2796 
2797                 let status = if result.is_some() {
2798                     Status::Returned
2799                 } else {
2800                     Status::ReturnCancelled
2801                 };
2802 
2803                 lower(store.as_context_mut(), result)?;
2804                 let state = store.0.concurrent_state_mut();
2805                 state.get_mut(task)?.state = HostTaskState::CalleeDone;
2806                 Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;
2807 
2808                 // Go back to "no current thread" at the end.
2809                 store.0.set_thread(CurrentThread::None);
2810                 Ok(())
2811             };
2812 
2813             // Here we schedule a task to run on a worker fiber to do the
2814             // lowering since it may involve a call to the guest's realloc
2815             // function. This is necessary because calling the guest while
2816             // there are host embedder frames on the stack is unsound.
2817             tls::get(move |store| {
2818                 store
2819                     .concurrent_state_mut()
2820                     .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2821                         on_complete,
2822                     ))));
2823                 Ok(())
2824             })
2825         });
2826 
2827         // Make this task visible to the guest and then record what it
2828         // was made visible as.
2829         let state = store.0.concurrent_state_mut();
2830         state.push_future(future);
2831         let caller = state.get_mut(task)?.caller;
2832         let instance = state.get_mut(caller.task)?.instance;
2833         let handle = store
2834             .0
2835             .instance_state(instance)
2836             .handle_table()
2837             .subtask_insert_host(task.rep())?;
2838         store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2839         log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
2840 
2841         // Restore the currently running thread to this host task's
2842         // caller. Note that the host task isn't deallocated as it's
2843         // within the store and will get deallocated later.
2844         store.0.set_thread(caller);
2845         Ok(Some(handle))
2846     }
2847 
2848     /// Implements the `task.return` intrinsic, lifting the result for the
2849     /// current guest task.
2850     pub(crate) fn task_return(
2851         self,
2852         store: &mut dyn VMStore,
2853         ty: TypeTupleIndex,
2854         options: OptionsIndex,
2855         storage: &[ValRaw],
2856     ) -> Result<()> {
2857         let state = store.concurrent_state_mut();
2858         let guest_thread = state.unwrap_current_guest_thread();
2859         let lift = state
2860             .get_mut(guest_thread.task)?
2861             .lift_result
2862             .take()
2863             .ok_or_else(|| {
2864                 format_err!("`task.return` or `task.cancel` called more than once for current task")
2865             })?;
2866         assert!(state.get_mut(guest_thread.task)?.result.is_none());
2867 
2868         let CanonicalOptions {
2869             string_encoding,
2870             data_model,
2871             ..
2872         } = &self.id().get(store).component().env_component().options[options];
2873 
2874         let invalid = ty != lift.ty
2875             || string_encoding != &lift.string_encoding
2876             || match data_model {
2877                 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2878                     Some(memory) => {
2879                         let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2880                         let actual = self.id().get(store).runtime_memory(memory);
2881                         expected != actual.as_ptr()
2882                     }
2883                     // Memory not specified, meaning it didn't need to be
2884                     // specified per validation, so not invalid.
2885                     None => false,
2886                 },
2887                 // Always invalid as this isn't supported.
2888                 CanonicalOptionsDataModel::Gc { .. } => true,
2889             };
2890 
2891         if invalid {
2892             bail!("invalid `task.return` signature and/or options for current task");
2893         }
2894 
2895         log::trace!("task.return for {guest_thread:?}");
2896 
2897         let result = (lift.lift)(store, storage)?;
2898         self.task_complete(store, guest_thread.task, result, Status::Returned)
2899     }
2900 
2901     /// Implements the `task.cancel` intrinsic.
2902     pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
2903         let state = store.concurrent_state_mut();
2904         let guest_thread = state.unwrap_current_guest_thread();
2905         let task = state.get_mut(guest_thread.task)?;
2906         if !task.cancel_sent {
2907             bail!("`task.cancel` called by task which has not been cancelled")
2908         }
2909         _ = task.lift_result.take().ok_or_else(|| {
2910             format_err!("`task.return` or `task.cancel` called more than once for current task")
2911         })?;
2912 
2913         assert!(task.result.is_none());
2914 
2915         log::trace!("task.cancel for {guest_thread:?}");
2916 
2917         self.task_complete(
2918             store,
2919             guest_thread.task,
2920             Box::new(DummyResult),
2921             Status::ReturnCancelled,
2922         )
2923     }
2924 
2925     /// Complete the specified guest task (i.e. indicate that it has either
2926     /// returned a (possibly empty) result or cancelled itself).
2927     ///
2928     /// This will return any resource borrows and notify any current or future
2929     /// waiters that the task has completed.
2930     fn task_complete(
2931         self,
2932         store: &mut StoreOpaque,
2933         guest_task: TableId<GuestTask>,
2934         result: Box<dyn Any + Send + Sync>,
2935         status: Status,
2936     ) -> Result<()> {
2937         store
2938             .component_resource_tables(Some(self))
2939             .validate_scope_exit()?;
2940 
2941         let state = store.concurrent_state_mut();
2942         let task = state.get_mut(guest_task)?;
2943 
2944         if let Caller::Host { tx, .. } = &mut task.caller {
2945             if let Some(tx) = tx.take() {
2946                 _ = tx.send(result);
2947             }
2948         } else {
2949             task.result = Some(result);
2950             Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
2951         }
2952 
2953         Ok(())
2954     }
2955 
2956     /// Implements the `waitable-set.new` intrinsic.
2957     pub(crate) fn waitable_set_new(
2958         self,
2959         store: &mut StoreOpaque,
2960         caller_instance: RuntimeComponentInstanceIndex,
2961     ) -> Result<u32> {
2962         let set = store.concurrent_state_mut().push(WaitableSet::default())?;
2963         let handle = store
2964             .instance_state(RuntimeInstance {
2965                 instance: self.id().instance(),
2966                 index: caller_instance,
2967             })
2968             .handle_table()
2969             .waitable_set_insert(set.rep())?;
2970         log::trace!("new waitable set {set:?} (handle {handle})");
2971         Ok(handle)
2972     }
2973 
2974     /// Implements the `waitable-set.drop` intrinsic.
2975     pub(crate) fn waitable_set_drop(
2976         self,
2977         store: &mut StoreOpaque,
2978         caller_instance: RuntimeComponentInstanceIndex,
2979         set: u32,
2980     ) -> Result<()> {
2981         let rep = store
2982             .instance_state(RuntimeInstance {
2983                 instance: self.id().instance(),
2984                 index: caller_instance,
2985             })
2986             .handle_table()
2987             .waitable_set_remove(set)?;
2988 
2989         log::trace!("drop waitable set {rep} (handle {set})");
2990 
2991         let set = store
2992             .concurrent_state_mut()
2993             .delete(TableId::<WaitableSet>::new(rep))?;
2994 
2995         if !set.waiting.is_empty() {
2996             bail!("cannot drop waitable set with waiters");
2997         }
2998 
2999         Ok(())
3000     }
3001 
3002     /// Implements the `waitable.join` intrinsic.
3003     pub(crate) fn waitable_join(
3004         self,
3005         store: &mut StoreOpaque,
3006         caller_instance: RuntimeComponentInstanceIndex,
3007         waitable_handle: u32,
3008         set_handle: u32,
3009     ) -> Result<()> {
3010         let mut instance = self.id().get_mut(store);
3011         let waitable =
3012             Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3013 
3014         let set = if set_handle == 0 {
3015             None
3016         } else {
3017             let set = instance.instance_states().0[caller_instance]
3018                 .handle_table()
3019                 .waitable_set_rep(set_handle)?;
3020 
3021             Some(TableId::<WaitableSet>::new(set))
3022         };
3023 
3024         log::trace!(
3025             "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3026         );
3027 
3028         waitable.join(store.concurrent_state_mut(), set)
3029     }
3030 
3031     /// Implements the `subtask.drop` intrinsic.
3032     pub(crate) fn subtask_drop(
3033         self,
3034         store: &mut StoreOpaque,
3035         caller_instance: RuntimeComponentInstanceIndex,
3036         task_id: u32,
3037     ) -> Result<()> {
3038         self.waitable_join(store, caller_instance, task_id, 0)?;
3039 
3040         let (rep, is_host) = store
3041             .instance_state(RuntimeInstance {
3042                 instance: self.id().instance(),
3043                 index: caller_instance,
3044             })
3045             .handle_table()
3046             .subtask_remove(task_id)?;
3047 
3048         let concurrent_state = store.concurrent_state_mut();
3049         let (waitable, expected_caller, delete) = if is_host {
3050             let id = TableId::<HostTask>::new(rep);
3051             let task = concurrent_state.get_mut(id)?;
3052             match &task.state {
3053                 HostTaskState::CalleeRunning(_) => {
3054                     bail!("cannot drop a subtask which has not yet resolved");
3055                 }
3056                 HostTaskState::CalleeDone => {}
3057                 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => unreachable!(),
3058             }
3059             (Waitable::Host(id), task.caller, true)
3060         } else {
3061             let id = TableId::<GuestTask>::new(rep);
3062             let task = concurrent_state.get_mut(id)?;
3063             if task.lift_result.is_some() {
3064                 bail!("cannot drop a subtask which has not yet resolved");
3065             }
3066             if let Caller::Guest { thread } = task.caller {
3067                 (
3068                     Waitable::Guest(id),
3069                     thread,
3070                     concurrent_state.get_mut(id)?.exited,
3071                 )
3072             } else {
3073                 unreachable!()
3074             }
3075         };
3076 
3077         waitable.common(concurrent_state)?.handle = None;
3078 
3079         if waitable.take_event(concurrent_state)?.is_some() {
3080             bail!("cannot drop a subtask with an undelivered event");
3081         }
3082 
3083         if delete {
3084             waitable.delete_from(concurrent_state)?;
3085         }
3086 
3087         // Since waitables can neither be passed between instances nor forged,
3088         // this should never fail unless there's a bug in Wasmtime, but we check
3089         // here to be sure:
3090         assert_eq!(
3091             expected_caller,
3092             concurrent_state.unwrap_current_guest_thread(),
3093         );
3094         log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3095         Ok(())
3096     }
3097 
3098     /// Implements the `waitable-set.wait` intrinsic.
3099     pub(crate) fn waitable_set_wait(
3100         self,
3101         store: &mut StoreOpaque,
3102         options: OptionsIndex,
3103         set: u32,
3104         payload: u32,
3105     ) -> Result<u32> {
3106         if !self.options(store, options).async_ {
3107             // The caller may only call `waitable-set.wait` from an async task
3108             // (i.e. a task created via a call to an async export).
3109             // Otherwise, we'll trap.
3110             store.check_blocking()?;
3111         }
3112 
3113         let &CanonicalOptions {
3114             cancellable,
3115             instance: caller_instance,
3116             ..
3117         } = &self.id().get(store).component().env_component().options[options];
3118         let rep = store
3119             .instance_state(RuntimeInstance {
3120                 instance: self.id().instance(),
3121                 index: caller_instance,
3122             })
3123             .handle_table()
3124             .waitable_set_rep(set)?;
3125 
3126         self.waitable_check(
3127             store,
3128             cancellable,
3129             WaitableCheck::Wait,
3130             WaitableCheckParams {
3131                 set: TableId::new(rep),
3132                 options,
3133                 payload,
3134             },
3135         )
3136     }
3137 
3138     /// Implements the `waitable-set.poll` intrinsic.
3139     pub(crate) fn waitable_set_poll(
3140         self,
3141         store: &mut StoreOpaque,
3142         options: OptionsIndex,
3143         set: u32,
3144         payload: u32,
3145     ) -> Result<u32> {
3146         let &CanonicalOptions {
3147             cancellable,
3148             instance: caller_instance,
3149             ..
3150         } = &self.id().get(store).component().env_component().options[options];
3151         let rep = store
3152             .instance_state(RuntimeInstance {
3153                 instance: self.id().instance(),
3154                 index: caller_instance,
3155             })
3156             .handle_table()
3157             .waitable_set_rep(set)?;
3158 
3159         self.waitable_check(
3160             store,
3161             cancellable,
3162             WaitableCheck::Poll,
3163             WaitableCheckParams {
3164                 set: TableId::new(rep),
3165                 options,
3166                 payload,
3167             },
3168         )
3169     }
3170 
3171     /// Implements the `thread.index` intrinsic.
3172     pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3173         let thread_id = store
3174             .concurrent_state_mut()
3175             .unwrap_current_guest_thread()
3176             .thread;
3177         // The unwrap is safe because `instance_rep` must be `Some` by this point
3178         Ok(store
3179             .concurrent_state_mut()
3180             .get_mut(thread_id)?
3181             .instance_rep
3182             .unwrap())
3183     }
3184 
3185     /// Implements the `thread.new-indirect` intrinsic.
3186     pub(crate) fn thread_new_indirect<T: 'static>(
3187         self,
3188         mut store: StoreContextMut<T>,
3189         runtime_instance: RuntimeComponentInstanceIndex,
3190         _func_ty_idx: TypeFuncIndex, // currently unused
3191         start_func_table_idx: RuntimeTableIndex,
3192         start_func_idx: u32,
3193         context: i32,
3194     ) -> Result<u32> {
3195         log::trace!("creating new thread");
3196 
3197         let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3198         let (instance, registry) = self.id().get_mut_and_registry(store.0);
3199         let callee = instance
3200             .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3201             .ok_or_else(|| {
3202                 format_err!("the start function index points to an uninitialized function")
3203             })?;
3204         if callee.type_index(store.0) != start_func_ty.type_index() {
3205             bail!(
3206                 "start function does not match expected type (currently only `(i32) -> ()` is supported)"
3207             );
3208         }
3209 
3210         let token = StoreToken::new(store.as_context_mut());
3211         let start_func = Box::new(
3212             move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3213                 let old_thread = store.set_thread(guest_thread);
3214                 log::trace!(
3215                     "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3216                 );
3217 
3218                 let mut store = token.as_context_mut(store);
3219                 let mut params = [ValRaw::i32(context)];
3220                 // Use call_unchecked rather than call or call_async, as we don't want to run the function
3221                 // on a separate fiber if we're running in an async store.
3222                 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3223 
3224                 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3225                 log::trace!("explicit thread {guest_thread:?} completed");
3226                 let state = store.0.concurrent_state_mut();
3227                 let task = state.get_mut(guest_thread.task)?;
3228                 if task.threads.is_empty() && !task.returned_or_cancelled() {
3229                     bail!(Trap::NoAsyncResult);
3230                 }
3231                 store.0.set_thread(old_thread);
3232                 let state = store.0.concurrent_state_mut();
3233                 old_thread
3234                     .guest()
3235                     .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running);
3236                 if state.get_mut(guest_thread.task)?.ready_to_delete() {
3237                     Waitable::Guest(guest_thread.task).delete_from(state)?;
3238                 }
3239                 log::trace!("thread start: restored {old_thread:?} as current thread");
3240 
3241                 Ok(())
3242             },
3243         );
3244 
3245         let state = store.0.concurrent_state_mut();
3246         let current_thread = state.unwrap_current_guest_thread();
3247         let parent_task = current_thread.task;
3248 
3249         let new_thread = GuestThread::new_explicit(parent_task, start_func);
3250         let thread_id = state.push(new_thread)?;
3251         state.get_mut(parent_task)?.threads.insert(thread_id);
3252 
3253         log::trace!("new thread with id {thread_id:?} created");
3254 
3255         self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3256     }
3257 
3258     pub(crate) fn resume_thread(
3259         self,
3260         store: &mut StoreOpaque,
3261         runtime_instance: RuntimeComponentInstanceIndex,
3262         thread_idx: u32,
3263         high_priority: bool,
3264         allow_ready: bool,
3265     ) -> Result<()> {
3266         let thread_id =
3267             GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3268         let state = store.concurrent_state_mut();
3269         let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3270         let thread = state.get_mut(guest_thread.thread)?;
3271 
3272         match mem::replace(&mut thread.state, GuestThreadState::Running) {
3273             GuestThreadState::NotStartedExplicit(start_func) => {
3274                 log::trace!("starting thread {guest_thread:?}");
3275                 let guest_call = WorkItem::GuestCall(
3276                     runtime_instance,
3277                     GuestCall {
3278                         thread: guest_thread,
3279                         kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3280                             start_func(store, guest_thread)
3281                         })),
3282                     },
3283                 );
3284                 store
3285                     .concurrent_state_mut()
3286                     .push_work_item(guest_call, high_priority);
3287             }
3288             GuestThreadState::Suspended(fiber) => {
3289                 log::trace!("resuming thread {thread_id:?} that was suspended");
3290                 store
3291                     .concurrent_state_mut()
3292                     .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3293             }
3294             GuestThreadState::Ready(fiber) if allow_ready => {
3295                 log::trace!("resuming thread {thread_id:?} that was ready");
3296                 thread.state = GuestThreadState::Ready(fiber);
3297                 store
3298                     .concurrent_state_mut()
3299                     .promote_thread_work_item(guest_thread);
3300             }
3301             other => {
3302                 thread.state = other;
3303                 bail!("cannot resume thread which is not suspended");
3304             }
3305         }
3306         Ok(())
3307     }
3308 
3309     fn add_guest_thread_to_instance_table(
3310         self,
3311         thread_id: TableId<GuestThread>,
3312         store: &mut StoreOpaque,
3313         runtime_instance: RuntimeComponentInstanceIndex,
3314     ) -> Result<u32> {
3315         let guest_id = store
3316             .instance_state(RuntimeInstance {
3317                 instance: self.id().instance(),
3318                 index: runtime_instance,
3319             })
3320             .thread_handle_table()
3321             .guest_thread_insert(thread_id.rep())?;
3322         store
3323             .concurrent_state_mut()
3324             .get_mut(thread_id)?
3325             .instance_rep = Some(guest_id);
3326         Ok(guest_id)
3327     }
3328 
3329     /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`,
3330     /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics.
3331     pub(crate) fn suspension_intrinsic(
3332         self,
3333         store: &mut StoreOpaque,
3334         caller: RuntimeComponentInstanceIndex,
3335         cancellable: bool,
3336         yielding: bool,
3337         to_thread: SuspensionTarget,
3338     ) -> Result<WaitResult> {
3339         let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();
3340         if to_thread.is_none() {
3341             let state = store.concurrent_state_mut();
3342             if yielding {
3343                 // This is a `thread.yield` call
3344                 if !state.may_block(guest_thread.task) {
3345                     // In a non-blocking context, a `thread.yield` may trigger
3346                     // other threads in the same component instance to run.
3347                     if !state.promote_instance_local_thread_work_item(caller) {
3348                         // No other threads are runnable, so just return
3349                         return Ok(WaitResult::Completed);
3350                     }
3351                 }
3352             } else {
3353                 // The caller may only call `thread.suspend` from an async task
3354                 // (i.e. a task created via a call to an async export).
3355                 // Otherwise, we'll trap.
3356                 store.check_blocking()?;
3357             }
3358         }
3359 
3360         // There could be a pending cancellation from a previous uncancellable wait
3361         if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3362             return Ok(WaitResult::Cancelled);
3363         }
3364 
3365         match to_thread {
3366             SuspensionTarget::SomeSuspended(thread) => {
3367                 self.resume_thread(store, caller, thread, true, false)?
3368             }
3369             SuspensionTarget::Some(thread) => {
3370                 self.resume_thread(store, caller, thread, true, true)?
3371             }
3372             SuspensionTarget::None => { /* nothing to do */ }
3373         }
3374 
3375         let reason = if yielding {
3376             SuspendReason::Yielding {
3377                 thread: guest_thread,
3378                 // Tell `StoreOpaque::suspend` it's okay to suspend here since
3379                 // we're handling a `thread.yield-to-suspended` call; otherwise it would
3380                 // panic if we called it in a non-blocking context.
3381                 skip_may_block_check: to_thread.is_some(),
3382             }
3383         } else {
3384             SuspendReason::ExplicitlySuspending {
3385                 thread: guest_thread,
3386                 // Tell `StoreOpaque::suspend` it's okay to suspend here since
3387                 // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would
3388                 // panic if we called it in a non-blocking context.
3389                 skip_may_block_check: to_thread.is_some(),
3390             }
3391         };
3392 
3393         store.suspend(reason)?;
3394 
3395         if cancellable && store.concurrent_state_mut().take_pending_cancellation() {
3396             Ok(WaitResult::Cancelled)
3397         } else {
3398             Ok(WaitResult::Completed)
3399         }
3400     }
3401 
3402     /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
3403     fn waitable_check(
3404         self,
3405         store: &mut StoreOpaque,
3406         cancellable: bool,
3407         check: WaitableCheck,
3408         params: WaitableCheckParams,
3409     ) -> Result<u32> {
3410         let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread();
3411 
3412         log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3413 
3414         let state = store.concurrent_state_mut();
3415         let task = state.get_mut(guest_thread.task)?;
3416 
3417         // If we're waiting, and there are no events immediately available,
3418         // suspend the fiber until that changes.
3419         match &check {
3420             WaitableCheck::Wait => {
3421                 let set = params.set;
3422 
3423                 if (task.event.is_none()
3424                     || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3425                     && state.get_mut(set)?.ready.is_empty()
3426                 {
3427                     if cancellable {
3428                         let old = state
3429                             .get_mut(guest_thread.thread)?
3430                             .wake_on_cancel
3431                             .replace(set);
3432                         assert!(old.is_none());
3433                     }
3434 
3435                     store.suspend(SuspendReason::Waiting {
3436                         set,
3437                         thread: guest_thread,
3438                         skip_may_block_check: false,
3439                     })?;
3440                 }
3441             }
3442             WaitableCheck::Poll => {}
3443         }
3444 
3445         log::trace!(
3446             "waitable check for {guest_thread:?}; set {:?}, part two",
3447             params.set
3448         );
3449 
3450         // Deliver any pending events to the guest and return.
3451         let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3452 
3453         let (ordinal, handle, result) = match &check {
3454             WaitableCheck::Wait => {
3455                 let (event, waitable) = event.unwrap();
3456                 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3457                 let (ordinal, result) = event.parts();
3458                 (ordinal, handle, result)
3459             }
3460             WaitableCheck::Poll => {
3461                 if let Some((event, waitable)) = event {
3462                     let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3463                     let (ordinal, result) = event.parts();
3464                     (ordinal, handle, result)
3465                 } else {
3466                     log::trace!(
3467                         "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3468                         guest_thread.task,
3469                         params.set
3470                     );
3471                     let (ordinal, result) = Event::None.parts();
3472                     (ordinal, 0, result)
3473                 }
3474             }
3475         };
3476         let memory = self.options_memory_mut(store, params.options);
3477         let ptr = func::validate_inbounds_dynamic(
3478             &CanonicalAbiInfo::POINTER_PAIR,
3479             memory,
3480             &ValRaw::u32(params.payload),
3481         )?;
3482         memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3483         memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3484         Ok(ordinal)
3485     }
3486 
3487     /// Implements the `subtask.cancel` intrinsic.
3488     pub(crate) fn subtask_cancel(
3489         self,
3490         store: &mut StoreOpaque,
3491         caller_instance: RuntimeComponentInstanceIndex,
3492         async_: bool,
3493         task_id: u32,
3494     ) -> Result<u32> {
3495         if !async_ {
3496             // The caller may only sync call `subtask.cancel` from an async task
3497             // (i.e. a task created via a call to an async export).  Otherwise,
3498             // we'll trap.
3499             store.check_blocking()?;
3500         }
3501 
3502         let (rep, is_host) = store
3503             .instance_state(RuntimeInstance {
3504                 instance: self.id().instance(),
3505                 index: caller_instance,
3506             })
3507             .handle_table()
3508             .subtask_rep(task_id)?;
3509         let (waitable, expected_caller) = if is_host {
3510             let id = TableId::<HostTask>::new(rep);
3511             (
3512                 Waitable::Host(id),
3513                 store.concurrent_state_mut().get_mut(id)?.caller,
3514             )
3515         } else {
3516             let id = TableId::<GuestTask>::new(rep);
3517             if let Caller::Guest { thread } = store.concurrent_state_mut().get_mut(id)?.caller {
3518                 (Waitable::Guest(id), thread)
3519             } else {
3520                 unreachable!()
3521             }
3522         };
3523         // Since waitables can neither be passed between instances nor forged,
3524         // this should never fail unless there's a bug in Wasmtime, but we check
3525         // here to be sure:
3526         let concurrent_state = store.concurrent_state_mut();
3527         assert_eq!(
3528             expected_caller,
3529             concurrent_state.unwrap_current_guest_thread(),
3530         );
3531 
3532         log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3533 
3534         let needs_block;
3535         if let Waitable::Host(host_task) = waitable {
3536             let state = &mut concurrent_state.get_mut(host_task)?.state;
3537             match mem::replace(state, HostTaskState::CalleeDone) {
3538                 // If the callee is still running, signal an abort is requested.
3539                 // Then fall through to determine what to do next.
3540                 HostTaskState::CalleeRunning(handle) => handle.abort(),
3541 
3542                 // Cancellation was already requested, so fail as the task can't
3543                 // be cancelled twice.
3544                 HostTaskState::CalleeDone => {
3545                     bail!("`subtask.cancel` called after terminal status delivered");
3546                 }
3547 
3548                 // These states should not be possible for a subtask that's
3549                 // visible from the guest, so panic here.
3550                 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => unreachable!(),
3551             }
3552 
3553             // Cancelling host tasks always needs to block on them to await the
3554             // result of the completion set up in `first_poll`. This'll resolve
3555             // the race of `handle.abort()` above to see if it actually
3556             // cancelled something or if the future ended up finishing.
3557             needs_block = true;
3558         } else {
3559             let caller = concurrent_state.unwrap_current_guest_thread();
3560             let guest_task = TableId::<GuestTask>::new(rep);
3561             let task = concurrent_state.get_mut(guest_task)?;
3562             if !task.already_lowered_parameters() {
3563                 // The task is in a `starting` state, meaning it hasn't run at
3564                 // all yet.  Here we update its fields to indicate that it is
3565                 // ready to delete immediately once `subtask.drop` is called.
3566                 task.lower_params = None;
3567                 task.lift_result = None;
3568                 task.exited = true;
3569 
3570                 let instance = task.instance;
3571 
3572                 assert_eq!(1, task.threads.len());
3573                 let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
3574                 let concurrent_state = store.concurrent_state_mut();
3575                 concurrent_state.delete(thread)?;
3576                 assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());
3577 
3578                 // Not yet started; cancel and remove from pending
3579                 let pending = &mut store.instance_state(instance).concurrent_state().pending;
3580                 let pending_count = pending.len();
3581                 pending.retain(|thread, _| thread.task != guest_task);
3582                 // If there were no pending threads for this task, we're in an error state
3583                 if pending.len() == pending_count {
3584                     bail!("`subtask.cancel` called after terminal status delivered");
3585                 }
3586                 return Ok(Status::StartCancelled as u32);
3587             } else if !task.returned_or_cancelled() {
3588                 // Started, but not yet returned or cancelled; send the
3589                 // `CANCELLED` event
3590                 task.cancel_sent = true;
3591                 // Note that this might overwrite an event that was set earlier
3592                 // (e.g. `Event::None` if the task is yielding, or
3593                 // `Event::Cancelled` if it was already cancelled), but that's
3594                 // okay -- this should supersede the previous state.
3595                 task.event = Some(Event::Cancelled);
3596                 let runtime_instance = task.instance.index;
3597                 for thread in task.threads.clone() {
3598                     let thread = QualifiedThreadId {
3599                         task: guest_task,
3600                         thread,
3601                     };
3602                     if let Some(set) = concurrent_state
3603                         .get_mut(thread.thread)
3604                         .unwrap()
3605                         .wake_on_cancel
3606                         .take()
3607                     {
3608                         let item = match concurrent_state
3609                             .get_mut(set)?
3610                             .waiting
3611                             .remove(&thread)
3612                             .unwrap()
3613                         {
3614                             WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
3615                             WaitMode::Callback(instance) => WorkItem::GuestCall(
3616                                 runtime_instance,
3617                                 GuestCall {
3618                                     thread,
3619                                     kind: GuestCallKind::DeliverEvent {
3620                                         instance,
3621                                         set: None,
3622                                     },
3623                                 },
3624                             ),
3625                         };
3626                         concurrent_state.push_high_priority(item);
3627 
3628                         store.suspend(SuspendReason::Yielding {
3629                             thread: caller,
3630                             // `subtask.cancel` is not allowed to be called in a
3631                             // sync context, so we cannot skip the may-block check.
3632                             skip_may_block_check: false,
3633                         })?;
3634                         break;
3635                     }
3636                 }
3637 
3638                 // Guest tasks need to block if they have not yet returned or
3639                 // cancelled, even as a result of the event delivery above.
3640                 needs_block = !store
3641                     .concurrent_state_mut()
3642                     .get_mut(guest_task)?
3643                     .returned_or_cancelled()
3644             } else {
3645                 needs_block = false;
3646             }
3647         };
3648 
3649         // If we need to block waiting on the terminal status of this subtask
3650         // then return immediately in `async` mode, or otherwise wait for the
3651         // event to get signaled through the store.
3652         if needs_block {
3653             if async_ {
3654                 return Ok(BLOCKED);
3655             }
3656 
3657             // Wait for this waitable to get signaled with its terminal status
3658             // from the completion callback enqueued by `first_poll`. Once
3659             // that's done fall through to the sahred
3660             store.wait_for_event(waitable)?;
3661 
3662             // .. fall through to determine what event's in store for us.
3663         }
3664 
3665         let event = waitable.take_event(store.concurrent_state_mut())?;
3666         if let Some(Event::Subtask {
3667             status: status @ (Status::Returned | Status::ReturnCancelled),
3668         }) = event
3669         {
3670             Ok(status as u32)
3671         } else {
3672             bail!("`subtask.cancel` called after terminal status delivered");
3673         }
3674     }
3675 
3676     pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> {
3677         store.concurrent_state_mut().context_get(slot)
3678     }
3679 
3680     pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
3681         store.concurrent_state_mut().context_set(slot, value)
3682     }
3683 }
3684 
3685 /// Trait representing component model ABI async intrinsics and fused adapter
3686 /// helper functions.
3687 ///
3688 /// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3689 /// which must be valid for at least the duration of the call (and possibly for
3690 /// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3691 /// pointers used for async calls).
3692 pub trait VMComponentAsyncStore {
3693     /// A helper function for fused adapter modules involving calls where the
3694     /// one of the caller or callee is async.
3695     ///
3696     /// This helper is not used when the caller and callee both use the sync
3697     /// ABI, only when at least one is async is this used.
3698     unsafe fn prepare_call(
3699         &mut self,
3700         instance: Instance,
3701         memory: *mut VMMemoryDefinition,
3702         start: *mut VMFuncRef,
3703         return_: *mut VMFuncRef,
3704         caller_instance: RuntimeComponentInstanceIndex,
3705         callee_instance: RuntimeComponentInstanceIndex,
3706         task_return_type: TypeTupleIndex,
3707         callee_async: bool,
3708         string_encoding: u8,
3709         result_count: u32,
3710         storage: *mut ValRaw,
3711         storage_len: usize,
3712     ) -> Result<()>;
3713 
3714     /// A helper function for fused adapter modules involving calls where the
3715     /// caller is sync-lowered but the callee is async-lifted.
3716     unsafe fn sync_start(
3717         &mut self,
3718         instance: Instance,
3719         callback: *mut VMFuncRef,
3720         callee: *mut VMFuncRef,
3721         param_count: u32,
3722         storage: *mut MaybeUninit<ValRaw>,
3723         storage_len: usize,
3724     ) -> Result<()>;
3725 
3726     /// A helper function for fused adapter modules involving calls where the
3727     /// caller is async-lowered.
3728     unsafe fn async_start(
3729         &mut self,
3730         instance: Instance,
3731         callback: *mut VMFuncRef,
3732         post_return: *mut VMFuncRef,
3733         callee: *mut VMFuncRef,
3734         param_count: u32,
3735         result_count: u32,
3736         flags: u32,
3737     ) -> Result<u32>;
3738 
3739     /// The `future.write` intrinsic.
3740     fn future_write(
3741         &mut self,
3742         instance: Instance,
3743         caller: RuntimeComponentInstanceIndex,
3744         ty: TypeFutureTableIndex,
3745         options: OptionsIndex,
3746         future: u32,
3747         address: u32,
3748     ) -> Result<u32>;
3749 
3750     /// The `future.read` intrinsic.
3751     fn future_read(
3752         &mut self,
3753         instance: Instance,
3754         caller: RuntimeComponentInstanceIndex,
3755         ty: TypeFutureTableIndex,
3756         options: OptionsIndex,
3757         future: u32,
3758         address: u32,
3759     ) -> Result<u32>;
3760 
3761     /// The `future.drop-writable` intrinsic.
3762     fn future_drop_writable(
3763         &mut self,
3764         instance: Instance,
3765         ty: TypeFutureTableIndex,
3766         writer: u32,
3767     ) -> Result<()>;
3768 
3769     /// The `stream.write` intrinsic.
3770     fn stream_write(
3771         &mut self,
3772         instance: Instance,
3773         caller: RuntimeComponentInstanceIndex,
3774         ty: TypeStreamTableIndex,
3775         options: OptionsIndex,
3776         stream: u32,
3777         address: u32,
3778         count: u32,
3779     ) -> Result<u32>;
3780 
3781     /// The `stream.read` intrinsic.
3782     fn stream_read(
3783         &mut self,
3784         instance: Instance,
3785         caller: RuntimeComponentInstanceIndex,
3786         ty: TypeStreamTableIndex,
3787         options: OptionsIndex,
3788         stream: u32,
3789         address: u32,
3790         count: u32,
3791     ) -> Result<u32>;
3792 
3793     /// The "fast-path" implementation of the `stream.write` intrinsic for
3794     /// "flat" (i.e. memcpy-able) payloads.
3795     fn flat_stream_write(
3796         &mut self,
3797         instance: Instance,
3798         caller: RuntimeComponentInstanceIndex,
3799         ty: TypeStreamTableIndex,
3800         options: OptionsIndex,
3801         payload_size: u32,
3802         payload_align: u32,
3803         stream: u32,
3804         address: u32,
3805         count: u32,
3806     ) -> Result<u32>;
3807 
3808     /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3809     /// (i.e. memcpy-able) payloads.
3810     fn flat_stream_read(
3811         &mut self,
3812         instance: Instance,
3813         caller: RuntimeComponentInstanceIndex,
3814         ty: TypeStreamTableIndex,
3815         options: OptionsIndex,
3816         payload_size: u32,
3817         payload_align: u32,
3818         stream: u32,
3819         address: u32,
3820         count: u32,
3821     ) -> Result<u32>;
3822 
3823     /// The `stream.drop-writable` intrinsic.
3824     fn stream_drop_writable(
3825         &mut self,
3826         instance: Instance,
3827         ty: TypeStreamTableIndex,
3828         writer: u32,
3829     ) -> Result<()>;
3830 
3831     /// The `error-context.debug-message` intrinsic.
3832     fn error_context_debug_message(
3833         &mut self,
3834         instance: Instance,
3835         ty: TypeComponentLocalErrorContextTableIndex,
3836         options: OptionsIndex,
3837         err_ctx_handle: u32,
3838         debug_msg_address: u32,
3839     ) -> Result<()>;
3840 
3841     /// The `thread.new-indirect` intrinsic
3842     fn thread_new_indirect(
3843         &mut self,
3844         instance: Instance,
3845         caller: RuntimeComponentInstanceIndex,
3846         func_ty_idx: TypeFuncIndex,
3847         start_func_table_idx: RuntimeTableIndex,
3848         start_func_idx: u32,
3849         context: i32,
3850     ) -> Result<u32>;
3851 }
3852 
3853 /// SAFETY: See trait docs.
3854 impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
3855     unsafe fn prepare_call(
3856         &mut self,
3857         instance: Instance,
3858         memory: *mut VMMemoryDefinition,
3859         start: *mut VMFuncRef,
3860         return_: *mut VMFuncRef,
3861         caller_instance: RuntimeComponentInstanceIndex,
3862         callee_instance: RuntimeComponentInstanceIndex,
3863         task_return_type: TypeTupleIndex,
3864         callee_async: bool,
3865         string_encoding: u8,
3866         result_count_or_max_if_async: u32,
3867         storage: *mut ValRaw,
3868         storage_len: usize,
3869     ) -> Result<()> {
3870         // SAFETY: The `wasmtime_cranelift`-generated code that calls
3871         // this method will have ensured that `storage` is a valid
3872         // pointer containing at least `storage_len` items.
3873         let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3874 
3875         unsafe {
3876             instance.prepare_call(
3877                 StoreContextMut(self),
3878                 start,
3879                 return_,
3880                 caller_instance,
3881                 callee_instance,
3882                 task_return_type,
3883                 callee_async,
3884                 memory,
3885                 string_encoding,
3886                 match result_count_or_max_if_async {
3887                     PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3888                         params,
3889                         has_result: false,
3890                     },
3891                     PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3892                         params,
3893                         has_result: true,
3894                     },
3895                     result_count => CallerInfo::Sync {
3896                         params,
3897                         result_count,
3898                     },
3899                 },
3900             )
3901         }
3902     }
3903 
3904     unsafe fn sync_start(
3905         &mut self,
3906         instance: Instance,
3907         callback: *mut VMFuncRef,
3908         callee: *mut VMFuncRef,
3909         param_count: u32,
3910         storage: *mut MaybeUninit<ValRaw>,
3911         storage_len: usize,
3912     ) -> Result<()> {
3913         unsafe {
3914             instance
3915                 .start_call(
3916                     StoreContextMut(self),
3917                     callback,
3918                     ptr::null_mut(),
3919                     callee,
3920                     param_count,
3921                     1,
3922                     START_FLAG_ASYNC_CALLEE,
3923                     // SAFETY: The `wasmtime_cranelift`-generated code that calls
3924                     // this method will have ensured that `storage` is a valid
3925                     // pointer containing at least `storage_len` items.
3926                     Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3927                 )
3928                 .map(drop)
3929         }
3930     }
3931 
3932     unsafe fn async_start(
3933         &mut self,
3934         instance: Instance,
3935         callback: *mut VMFuncRef,
3936         post_return: *mut VMFuncRef,
3937         callee: *mut VMFuncRef,
3938         param_count: u32,
3939         result_count: u32,
3940         flags: u32,
3941     ) -> Result<u32> {
3942         unsafe {
3943             instance.start_call(
3944                 StoreContextMut(self),
3945                 callback,
3946                 post_return,
3947                 callee,
3948                 param_count,
3949                 result_count,
3950                 flags,
3951                 None,
3952             )
3953         }
3954     }
3955 
3956     fn future_write(
3957         &mut self,
3958         instance: Instance,
3959         caller: RuntimeComponentInstanceIndex,
3960         ty: TypeFutureTableIndex,
3961         options: OptionsIndex,
3962         future: u32,
3963         address: u32,
3964     ) -> Result<u32> {
3965         instance
3966             .guest_write(
3967                 StoreContextMut(self),
3968                 caller,
3969                 TransmitIndex::Future(ty),
3970                 options,
3971                 None,
3972                 future,
3973                 address,
3974                 1,
3975             )
3976             .map(|result| result.encode())
3977     }
3978 
3979     fn future_read(
3980         &mut self,
3981         instance: Instance,
3982         caller: RuntimeComponentInstanceIndex,
3983         ty: TypeFutureTableIndex,
3984         options: OptionsIndex,
3985         future: u32,
3986         address: u32,
3987     ) -> Result<u32> {
3988         instance
3989             .guest_read(
3990                 StoreContextMut(self),
3991                 caller,
3992                 TransmitIndex::Future(ty),
3993                 options,
3994                 None,
3995                 future,
3996                 address,
3997                 1,
3998             )
3999             .map(|result| result.encode())
4000     }
4001 
4002     fn stream_write(
4003         &mut self,
4004         instance: Instance,
4005         caller: RuntimeComponentInstanceIndex,
4006         ty: TypeStreamTableIndex,
4007         options: OptionsIndex,
4008         stream: u32,
4009         address: u32,
4010         count: u32,
4011     ) -> Result<u32> {
4012         instance
4013             .guest_write(
4014                 StoreContextMut(self),
4015                 caller,
4016                 TransmitIndex::Stream(ty),
4017                 options,
4018                 None,
4019                 stream,
4020                 address,
4021                 count,
4022             )
4023             .map(|result| result.encode())
4024     }
4025 
4026     fn stream_read(
4027         &mut self,
4028         instance: Instance,
4029         caller: RuntimeComponentInstanceIndex,
4030         ty: TypeStreamTableIndex,
4031         options: OptionsIndex,
4032         stream: u32,
4033         address: u32,
4034         count: u32,
4035     ) -> Result<u32> {
4036         instance
4037             .guest_read(
4038                 StoreContextMut(self),
4039                 caller,
4040                 TransmitIndex::Stream(ty),
4041                 options,
4042                 None,
4043                 stream,
4044                 address,
4045                 count,
4046             )
4047             .map(|result| result.encode())
4048     }
4049 
4050     fn future_drop_writable(
4051         &mut self,
4052         instance: Instance,
4053         ty: TypeFutureTableIndex,
4054         writer: u32,
4055     ) -> Result<()> {
4056         instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4057     }
4058 
4059     fn flat_stream_write(
4060         &mut self,
4061         instance: Instance,
4062         caller: RuntimeComponentInstanceIndex,
4063         ty: TypeStreamTableIndex,
4064         options: OptionsIndex,
4065         payload_size: u32,
4066         payload_align: u32,
4067         stream: u32,
4068         address: u32,
4069         count: u32,
4070     ) -> Result<u32> {
4071         instance
4072             .guest_write(
4073                 StoreContextMut(self),
4074                 caller,
4075                 TransmitIndex::Stream(ty),
4076                 options,
4077                 Some(FlatAbi {
4078                     size: payload_size,
4079                     align: payload_align,
4080                 }),
4081                 stream,
4082                 address,
4083                 count,
4084             )
4085             .map(|result| result.encode())
4086     }
4087 
4088     fn flat_stream_read(
4089         &mut self,
4090         instance: Instance,
4091         caller: RuntimeComponentInstanceIndex,
4092         ty: TypeStreamTableIndex,
4093         options: OptionsIndex,
4094         payload_size: u32,
4095         payload_align: u32,
4096         stream: u32,
4097         address: u32,
4098         count: u32,
4099     ) -> Result<u32> {
4100         instance
4101             .guest_read(
4102                 StoreContextMut(self),
4103                 caller,
4104                 TransmitIndex::Stream(ty),
4105                 options,
4106                 Some(FlatAbi {
4107                     size: payload_size,
4108                     align: payload_align,
4109                 }),
4110                 stream,
4111                 address,
4112                 count,
4113             )
4114             .map(|result| result.encode())
4115     }
4116 
4117     fn stream_drop_writable(
4118         &mut self,
4119         instance: Instance,
4120         ty: TypeStreamTableIndex,
4121         writer: u32,
4122     ) -> Result<()> {
4123         instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4124     }
4125 
4126     fn error_context_debug_message(
4127         &mut self,
4128         instance: Instance,
4129         ty: TypeComponentLocalErrorContextTableIndex,
4130         options: OptionsIndex,
4131         err_ctx_handle: u32,
4132         debug_msg_address: u32,
4133     ) -> Result<()> {
4134         instance.error_context_debug_message(
4135             StoreContextMut(self),
4136             ty,
4137             options,
4138             err_ctx_handle,
4139             debug_msg_address,
4140         )
4141     }
4142 
4143     fn thread_new_indirect(
4144         &mut self,
4145         instance: Instance,
4146         caller: RuntimeComponentInstanceIndex,
4147         func_ty_idx: TypeFuncIndex,
4148         start_func_table_idx: RuntimeTableIndex,
4149         start_func_idx: u32,
4150         context: i32,
4151     ) -> Result<u32> {
4152         instance.thread_new_indirect(
4153             StoreContextMut(self),
4154             caller,
4155             func_ty_idx,
4156             start_func_table_idx,
4157             start_func_idx,
4158             context,
4159         )
4160     }
4161 }
4162 
4163 type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4164 
4165 /// Represents the state of a pending host task.
4166 ///
4167 /// This is used to represent tasks when the guest calls into the host.
4168 struct HostTask {
4169     common: WaitableCommon,
4170 
4171     /// Guest thread which called the host.
4172     caller: QualifiedThreadId,
4173 
4174     /// State of borrows/etc the host needs to track. Used when the guest passes
4175     /// borrows to the host, for example.
4176     call_context: CallContext,
4177 
4178     state: HostTaskState,
4179 }
4180 
4181 enum HostTaskState {
4182     /// A host task has been created and it's considered "started".
4183     ///
4184     /// The host task has yet to enter `first_poll` or `poll_and_block` which
4185     /// is where this will get updated further.
4186     CalleeStarted,
4187 
4188     /// State used for tasks in `first_poll` meaning that the guest did an async
4189     /// lower of a host async function which is blocked. The specified handle is
4190     /// linked to the future in the main `FuturesUnordered` of a store which is
4191     /// used to cancel it if the guest requests cancellation.
4192     CalleeRunning(JoinHandle),
4193 
4194     /// Terminal state used for tasks in `poll_and_block` to store the result of
4195     /// their computation. Note that this state is not used for tasks in
4196     /// `first_poll`.
4197     CalleeFinished(LiftedResult),
4198 
4199     /// Terminal state for host tasks meaning that the task was cancelled or the
4200     /// result was taken.
4201     CalleeDone,
4202 }
4203 
4204 impl HostTask {
4205     fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4206         Self {
4207             common: WaitableCommon::default(),
4208             call_context: CallContext::default(),
4209             caller,
4210             state,
4211         }
4212     }
4213 }
4214 
4215 impl TableDebug for HostTask {
4216     fn type_name() -> &'static str {
4217         "HostTask"
4218     }
4219 }
4220 
4221 type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4222 
4223 /// Represents the caller of a given guest task.
4224 enum Caller {
4225     /// The host called the guest task.
4226     Host {
4227         /// If present, may be used to deliver the result.
4228         tx: Option<oneshot::Sender<LiftedResult>>,
4229         /// If true, there's a host future that must be dropped before the task
4230         /// can be deleted.
4231         host_future_present: bool,
4232         /// Represents the caller of the host function which called back into a
4233         /// guest. Note that this thread could belong to an entirely unrelated
4234         /// top-level component instance than the one the host called into.
4235         caller: CurrentThread,
4236     },
4237     /// Another guest thread called the guest task
4238     Guest {
4239         /// The id of the caller
4240         thread: QualifiedThreadId,
4241     },
4242 }
4243 
4244 /// Represents a closure and related canonical ABI parameters required to
4245 /// validate a `task.return` call at runtime and lift the result.
4246 struct LiftResult {
4247     lift: RawLift,
4248     ty: TypeTupleIndex,
4249     memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4250     string_encoding: StringEncoding,
4251 }
4252 
4253 /// The table ID for a guest thread, qualified by the task to which it belongs.
4254 ///
4255 /// This exists to minimize table lookups and the necessity to pass stores around mutably
4256 /// for the common case of identifying the task to which a thread belongs.
4257 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4258 struct QualifiedThreadId {
4259     task: TableId<GuestTask>,
4260     thread: TableId<GuestThread>,
4261 }
4262 
4263 impl QualifiedThreadId {
4264     fn qualify(
4265         state: &mut ConcurrentState,
4266         thread: TableId<GuestThread>,
4267     ) -> Result<QualifiedThreadId> {
4268         Ok(QualifiedThreadId {
4269             task: state.get_mut(thread)?.parent_task,
4270             thread,
4271         })
4272     }
4273 }
4274 
4275 impl fmt::Debug for QualifiedThreadId {
4276     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4277         f.debug_tuple("QualifiedThreadId")
4278             .field(&self.task.rep())
4279             .field(&self.thread.rep())
4280             .finish()
4281     }
4282 }
4283 
4284 enum GuestThreadState {
4285     NotStartedImplicit,
4286     NotStartedExplicit(
4287         Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4288     ),
4289     Running,
4290     Suspended(StoreFiber<'static>),
4291     Ready(StoreFiber<'static>),
4292     Completed,
4293 }
4294 pub struct GuestThread {
4295     /// Context-local state used to implement the `context.{get,set}`
4296     /// intrinsics.
4297     context: [u32; 2],
4298     /// The owning guest task.
4299     parent_task: TableId<GuestTask>,
4300     /// If present, indicates that the thread is currently waiting on the
4301     /// specified set but may be cancelled and woken immediately.
4302     wake_on_cancel: Option<TableId<WaitableSet>>,
4303     /// The execution state of this guest thread
4304     state: GuestThreadState,
4305     /// The index of this thread in the component instance's handle table.
4306     /// This must always be `Some` after initialization.
4307     instance_rep: Option<u32>,
4308 }
4309 
4310 impl GuestThread {
4311     /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4312     /// handle.
4313     fn from_instance(
4314         state: Pin<&mut ComponentInstance>,
4315         caller_instance: RuntimeComponentInstanceIndex,
4316         guest_thread: u32,
4317     ) -> Result<TableId<Self>> {
4318         let rep = state.instance_states().0[caller_instance]
4319             .thread_handle_table()
4320             .guest_thread_rep(guest_thread)?;
4321         Ok(TableId::new(rep))
4322     }
4323 
4324     fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
4325         Self {
4326             context: [0; 2],
4327             parent_task,
4328             wake_on_cancel: None,
4329             state: GuestThreadState::NotStartedImplicit,
4330             instance_rep: None,
4331         }
4332     }
4333 
4334     fn new_explicit(
4335         parent_task: TableId<GuestTask>,
4336         start_func: Box<
4337             dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4338         >,
4339     ) -> Self {
4340         Self {
4341             context: [0; 2],
4342             parent_task,
4343             wake_on_cancel: None,
4344             state: GuestThreadState::NotStartedExplicit(start_func),
4345             instance_rep: None,
4346         }
4347     }
4348 }
4349 
4350 impl TableDebug for GuestThread {
4351     fn type_name() -> &'static str {
4352         "GuestThread"
4353     }
4354 }
4355 
4356 enum SyncResult {
4357     NotProduced,
4358     Produced(Option<ValRaw>),
4359     Taken,
4360 }
4361 
4362 impl SyncResult {
4363     fn take(&mut self) -> Option<Option<ValRaw>> {
4364         match mem::replace(self, SyncResult::Taken) {
4365             SyncResult::NotProduced => None,
4366             SyncResult::Produced(val) => Some(val),
4367             SyncResult::Taken => {
4368                 panic!("attempted to take a synchronous result that was already taken")
4369             }
4370         }
4371     }
4372 }
4373 
4374 #[derive(Debug)]
4375 enum HostFutureState {
4376     NotApplicable,
4377     Live,
4378     Dropped,
4379 }
4380 
4381 /// Represents a pending guest task.
4382 pub(crate) struct GuestTask {
4383     /// See `WaitableCommon`
4384     common: WaitableCommon,
4385     /// Closure to lower the parameters passed to this task.
4386     lower_params: Option<RawLower>,
4387     /// See `LiftResult`
4388     lift_result: Option<LiftResult>,
4389     /// A place to stash the type-erased lifted result if it can't be delivered
4390     /// immediately.
4391     result: Option<LiftedResult>,
4392     /// Closure to call the callback function for an async-lifted export, if
4393     /// provided.
4394     callback: Option<CallbackFn>,
4395     /// See `Caller`
4396     caller: Caller,
4397     /// Borrow state for this task.
4398     ///
4399     /// Keeps track of `borrow<T>` received to this task to ensure that
4400     /// everything is dropped by the time it exits.
4401     call_context: CallContext,
4402     /// A place to stash the lowered result for a sync-to-async call until it
4403     /// can be returned to the caller.
4404     sync_result: SyncResult,
4405     /// Whether or not the task has been cancelled (i.e. whether the task is
4406     /// permitted to call `task.cancel`).
4407     cancel_sent: bool,
4408     /// Whether or not we've sent a `Status::Starting` event to any current or
4409     /// future waiters for this waitable.
4410     starting_sent: bool,
4411     /// Scratch waitable set used to watch subtasks during synchronous calls.
4412     sync_call_set: TableId<WaitableSet>,
4413     /// The runtime instance to which the exported function for this guest task
4414     /// belongs.
4415     ///
4416     /// Note that the task may do a sync->sync call via a fused adapter which
4417     /// results in that task executing code in a different instance, and it may
4418     /// call host functions and intrinsics from that other instance.
4419     instance: RuntimeInstance,
4420     /// If present, a pending `Event::None` or `Event::Cancelled` to be
4421     /// delivered to this task.
4422     event: Option<Event>,
4423     /// Whether or not the task has exited.
4424     exited: bool,
4425     /// Threads belonging to this task
4426     threads: HashSet<TableId<GuestThread>>,
4427     /// The state of the host future that represents an async task, which must
4428     /// be dropped before we can delete the task.
4429     host_future_state: HostFutureState,
4430     /// Indicates whether this task was created for a call to an async-lifted
4431     /// export.
4432     async_function: bool,
4433 }
4434 
4435 impl GuestTask {
4436     fn already_lowered_parameters(&self) -> bool {
4437         // We reset `lower_params` after we lower the parameters
4438         self.lower_params.is_none()
4439     }
4440 
4441     fn returned_or_cancelled(&self) -> bool {
4442         // We reset `lift_result` after we return or exit
4443         self.lift_result.is_none()
4444     }
4445 
4446     fn ready_to_delete(&self) -> bool {
4447         let threads_completed = self.threads.is_empty();
4448         let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4449         let pending_completion_event = matches!(
4450             self.common.event,
4451             Some(Event::Subtask {
4452                 status: Status::Returned | Status::ReturnCancelled
4453             })
4454         );
4455         let ready = threads_completed
4456             && !has_sync_result
4457             && !pending_completion_event
4458             && !matches!(self.host_future_state, HostFutureState::Live);
4459         log::trace!(
4460             "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4461             threads_completed,
4462             has_sync_result,
4463             pending_completion_event,
4464             self.host_future_state
4465         );
4466         ready
4467     }
4468 
4469     fn new(
4470         state: &mut ConcurrentState,
4471         lower_params: RawLower,
4472         lift_result: LiftResult,
4473         caller: Caller,
4474         callback: Option<CallbackFn>,
4475         instance: RuntimeInstance,
4476         async_function: bool,
4477     ) -> Result<Self> {
4478         let sync_call_set = state.push(WaitableSet::default())?;
4479         let host_future_state = match &caller {
4480             Caller::Guest { .. } => HostFutureState::NotApplicable,
4481             Caller::Host {
4482                 host_future_present,
4483                 ..
4484             } => {
4485                 if *host_future_present {
4486                     HostFutureState::Live
4487                 } else {
4488                     HostFutureState::NotApplicable
4489                 }
4490             }
4491         };
4492         Ok(Self {
4493             common: WaitableCommon::default(),
4494             lower_params: Some(lower_params),
4495             lift_result: Some(lift_result),
4496             result: None,
4497             callback,
4498             caller,
4499             call_context: CallContext::default(),
4500             sync_result: SyncResult::NotProduced,
4501             cancel_sent: false,
4502             starting_sent: false,
4503             sync_call_set,
4504             instance,
4505             event: None,
4506             exited: false,
4507             threads: HashSet::new(),
4508             host_future_state,
4509             async_function,
4510         })
4511     }
4512 
4513     /// Dispose of this guest task, reparenting any pending subtasks to the
4514     /// caller.
4515     fn dispose(self, state: &mut ConcurrentState) -> Result<()> {
4516         // If there are not-yet-delivered completion events for subtasks in
4517         // `self.sync_call_set`, recursively dispose of those subtasks as well.
4518         for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
4519             if let Some(Event::Subtask {
4520                 status: Status::Returned | Status::ReturnCancelled,
4521             }) = waitable.common(state)?.event
4522             {
4523                 waitable.delete_from(state)?;
4524             }
4525         }
4526 
4527         assert!(self.threads.is_empty());
4528 
4529         state.delete(self.sync_call_set)?;
4530 
4531         Ok(())
4532     }
4533 }
4534 
4535 impl TableDebug for GuestTask {
4536     fn type_name() -> &'static str {
4537         "GuestTask"
4538     }
4539 }
4540 
4541 /// Represents state common to all kinds of waitables.
4542 #[derive(Default)]
4543 struct WaitableCommon {
4544     /// The currently pending event for this waitable, if any.
4545     event: Option<Event>,
4546     /// The set to which this waitable belongs, if any.
4547     set: Option<TableId<WaitableSet>>,
4548     /// The handle with which the guest refers to this waitable, if any.
4549     handle: Option<u32>,
4550 }
4551 
4552 /// Represents a Component Model Async `waitable`.
4553 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4554 enum Waitable {
4555     /// A host task
4556     Host(TableId<HostTask>),
4557     /// A guest task
4558     Guest(TableId<GuestTask>),
4559     /// The read or write end of a stream or future
4560     Transmit(TableId<TransmitHandle>),
4561 }
4562 
4563 impl Waitable {
4564     /// Retrieve the `Waitable` corresponding to the specified guest-visible
4565     /// handle.
4566     fn from_instance(
4567         state: Pin<&mut ComponentInstance>,
4568         caller_instance: RuntimeComponentInstanceIndex,
4569         waitable: u32,
4570     ) -> Result<Self> {
4571         use crate::runtime::vm::component::Waitable;
4572 
4573         let (waitable, kind) = state.instance_states().0[caller_instance]
4574             .handle_table()
4575             .waitable_rep(waitable)?;
4576 
4577         Ok(match kind {
4578             Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4579             Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4580             Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4581         })
4582     }
4583 
4584     /// Retrieve the host-visible identifier for this `Waitable`.
4585     fn rep(&self) -> u32 {
4586         match self {
4587             Self::Host(id) => id.rep(),
4588             Self::Guest(id) => id.rep(),
4589             Self::Transmit(id) => id.rep(),
4590         }
4591     }
4592 
4593     /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4594     /// remove it from any set it may currently belong to (when `set` is
4595     /// `None`).
4596     fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4597         log::trace!("waitable {self:?} join set {set:?}",);
4598 
4599         let old = mem::replace(&mut self.common(state)?.set, set);
4600 
4601         if let Some(old) = old {
4602             match *self {
4603                 Waitable::Host(id) => state.remove_child(id, old),
4604                 Waitable::Guest(id) => state.remove_child(id, old),
4605                 Waitable::Transmit(id) => state.remove_child(id, old),
4606             }?;
4607 
4608             state.get_mut(old)?.ready.remove(self);
4609         }
4610 
4611         if let Some(set) = set {
4612             match *self {
4613                 Waitable::Host(id) => state.add_child(id, set),
4614                 Waitable::Guest(id) => state.add_child(id, set),
4615                 Waitable::Transmit(id) => state.add_child(id, set),
4616             }?;
4617 
4618             if self.common(state)?.event.is_some() {
4619                 self.mark_ready(state)?;
4620             }
4621         }
4622 
4623         Ok(())
4624     }
4625 
4626     /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
4627     fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4628         Ok(match self {
4629             Self::Host(id) => &mut state.get_mut(*id)?.common,
4630             Self::Guest(id) => &mut state.get_mut(*id)?.common,
4631             Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4632         })
4633     }
4634 
4635     /// Set or clear the pending event for this waitable and either deliver it
4636     /// to the first waiter, if any, or mark it as ready to be delivered to the
4637     /// next waiter that arrives.
4638     fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4639         log::trace!("set event for {self:?}: {event:?}");
4640         self.common(state)?.event = event;
4641         self.mark_ready(state)
4642     }
4643 
4644     /// Take the pending event from this waitable, leaving `None` in its place.
4645     fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4646         let common = self.common(state)?;
4647         let event = common.event.take();
4648         if let Some(set) = self.common(state)?.set {
4649             state.get_mut(set)?.ready.remove(self);
4650         }
4651 
4652         Ok(event)
4653     }
4654 
4655     /// Deliver the current event for this waitable to the first waiter, if any,
4656     /// or else mark it as ready to be delivered to the next waiter that
4657     /// arrives.
4658     fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4659         if let Some(set) = self.common(state)?.set {
4660             state.get_mut(set)?.ready.insert(*self);
4661             if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4662                 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4663                 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4664 
4665                 let item = match mode {
4666                     WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4667                     WaitMode::Callback(instance) => WorkItem::GuestCall(
4668                         state.get_mut(thread.task)?.instance.index,
4669                         GuestCall {
4670                             thread,
4671                             kind: GuestCallKind::DeliverEvent {
4672                                 instance,
4673                                 set: Some(set),
4674                             },
4675                         },
4676                     ),
4677                 };
4678                 state.push_high_priority(item);
4679             }
4680         }
4681         Ok(())
4682     }
4683 
4684     /// Remove this waitable from the instance's rep table.
4685     fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4686         match self {
4687             Self::Host(task) => {
4688                 log::trace!("delete host task {task:?}");
4689                 state.delete(*task)?;
4690             }
4691             Self::Guest(task) => {
4692                 log::trace!("delete guest task {task:?}");
4693                 state.delete(*task)?.dispose(state)?;
4694             }
4695             Self::Transmit(task) => {
4696                 state.delete(*task)?;
4697             }
4698         }
4699 
4700         Ok(())
4701     }
4702 }
4703 
4704 impl fmt::Debug for Waitable {
4705     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4706         match self {
4707             Self::Host(id) => write!(f, "{id:?}"),
4708             Self::Guest(id) => write!(f, "{id:?}"),
4709             Self::Transmit(id) => write!(f, "{id:?}"),
4710         }
4711     }
4712 }
4713 
4714 /// Represents a Component Model Async `waitable-set`.
4715 #[derive(Default)]
4716 struct WaitableSet {
4717     /// Which waitables in this set have pending events, if any.
4718     ready: BTreeSet<Waitable>,
4719     /// Which guest threads are currently waiting on this set, if any.
4720     waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4721 }
4722 
4723 impl TableDebug for WaitableSet {
4724     fn type_name() -> &'static str {
4725         "WaitableSet"
4726     }
4727 }
4728 
4729 /// Type-erased closure to lower the parameters for a guest task.
4730 type RawLower =
4731     Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4732 
4733 /// Type-erased closure to lift the result for a guest task.
4734 type RawLift = Box<
4735     dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4736 >;
4737 
4738 /// Type erased result of a guest task which may be downcast to the expected
4739 /// type by a host caller (or simply ignored in the case of a guest caller; see
4740 /// `DummyResult`).
4741 type LiftedResult = Box<dyn Any + Send + Sync>;
4742 
4743 /// Used to return a result from a `LiftFn` when the actual result has already
4744 /// been lowered to a guest task's stack and linear memory.
4745 struct DummyResult;
4746 
4747 /// Represents the Component Model Async state of a (sub-)component instance.
4748 #[derive(Default)]
4749 pub struct ConcurrentInstanceState {
4750     /// Whether backpressure is set for this instance (enabled if >0)
4751     backpressure: u16,
4752     /// Whether this instance can be entered
4753     do_not_enter: bool,
4754     /// Pending calls for this instance which require `Self::backpressure` to be
4755     /// `true` and/or `Self::do_not_enter` to be false before they can proceed.
4756     pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4757 }
4758 
4759 impl ConcurrentInstanceState {
4760     pub fn pending_is_empty(&self) -> bool {
4761         self.pending.is_empty()
4762     }
4763 }
4764 
4765 #[derive(Debug, Copy, Clone)]
4766 enum CurrentThread {
4767     Guest(QualifiedThreadId),
4768     Host(TableId<HostTask>),
4769     None,
4770 }
4771 
4772 impl CurrentThread {
4773     fn guest(&self) -> Option<&QualifiedThreadId> {
4774         match self {
4775             Self::Guest(id) => Some(id),
4776             _ => None,
4777         }
4778     }
4779 
4780     fn host(&self) -> Option<TableId<HostTask>> {
4781         match self {
4782             Self::Host(id) => Some(*id),
4783             _ => None,
4784         }
4785     }
4786 
4787     fn is_none(&self) -> bool {
4788         matches!(self, Self::None)
4789     }
4790 }
4791 
4792 impl From<QualifiedThreadId> for CurrentThread {
4793     fn from(id: QualifiedThreadId) -> Self {
4794         Self::Guest(id)
4795     }
4796 }
4797 
4798 impl From<TableId<HostTask>> for CurrentThread {
4799     fn from(id: TableId<HostTask>) -> Self {
4800         Self::Host(id)
4801     }
4802 }
4803 
4804 /// Represents the Component Model Async state of a store.
4805 pub struct ConcurrentState {
4806     /// The currently running thread, if any.
4807     current_thread: CurrentThread,
4808 
4809     /// The set of pending host and background tasks, if any.
4810     ///
4811     /// See `ComponentInstance::poll_until` for where we temporarily take this
4812     /// out, poll it, then put it back to avoid any mutable aliasing hazards.
4813     futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4814     /// The table of waitables, waitable sets, etc.
4815     table: AlwaysMut<ResourceTable>,
4816     /// The "high priority" work queue for this store's event loop.
4817     high_priority: Vec<WorkItem>,
4818     /// The "low priority" work queue for this store's event loop.
4819     low_priority: VecDeque<WorkItem>,
4820     /// A place to stash the reason a fiber is suspending so that the code which
4821     /// resumed it will know under what conditions the fiber should be resumed
4822     /// again.
4823     suspend_reason: Option<SuspendReason>,
4824     /// A cached fiber which is waiting for work to do.
4825     ///
4826     /// This helps us avoid creating a new fiber for each `GuestCall` work item.
4827     worker: Option<StoreFiber<'static>>,
4828     /// A place to stash the work item for which we're resuming a worker fiber.
4829     worker_item: Option<WorkerItem>,
4830 
4831     /// Reference counts for all component error contexts
4832     ///
4833     /// NOTE: it is possible the global ref count to be *greater* than the sum of
4834     /// (sub)component ref counts as tracked by `error_context_tables`, for
4835     /// example when the host holds one or more references to error contexts.
4836     ///
4837     /// The key of this primary map is often referred to as the "rep" (i.e. host-side
4838     /// component-wide representation) of the index into concurrent state for a given
4839     /// stored `ErrorContext`.
4840     ///
4841     /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4842     /// as a `TableId<ErrorContextState>`.
4843     global_error_context_ref_counts:
4844         BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4845 }
4846 
4847 impl Default for ConcurrentState {
4848     fn default() -> Self {
4849         Self {
4850             current_thread: CurrentThread::None,
4851             table: AlwaysMut::new(ResourceTable::new()),
4852             futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4853             high_priority: Vec::new(),
4854             low_priority: VecDeque::new(),
4855             suspend_reason: None,
4856             worker: None,
4857             worker_item: None,
4858             global_error_context_ref_counts: BTreeMap::new(),
4859         }
4860     }
4861 }
4862 
4863 impl ConcurrentState {
4864     /// Take ownership of any fibers and futures owned by this object.
4865     ///
4866     /// This should be used when disposing of the `Store` containing this object
4867     /// in order to gracefully resolve any and all fibers using
4868     /// `StoreFiber::dispose`.  This is necessary to avoid possible
4869     /// use-after-free bugs due to fibers which may still have access to the
4870     /// `Store`.
4871     ///
4872     /// Additionally, the futures collected with this function should be dropped
4873     /// within a `tls::set` call, which will ensure than any futures closing
4874     /// over an `&Accessor` will have access to the store when dropped, allowing
4875     /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4876     /// panicking.
4877     ///
4878     /// Note that this will leave the object in an inconsistent and unusable
4879     /// state, so it should only be used just prior to dropping it.
4880     pub(crate) fn take_fibers_and_futures(
4881         &mut self,
4882         fibers: &mut Vec<StoreFiber<'static>>,
4883         futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4884     ) {
4885         for entry in self.table.get_mut().iter_mut() {
4886             if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4887                 for mode in mem::take(&mut set.waiting).into_values() {
4888                     if let WaitMode::Fiber(fiber) = mode {
4889                         fibers.push(fiber);
4890                     }
4891                 }
4892             } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4893                 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) =
4894                     mem::replace(&mut thread.state, GuestThreadState::Completed)
4895                 {
4896                     fibers.push(fiber);
4897                 }
4898             }
4899         }
4900 
4901         if let Some(fiber) = self.worker.take() {
4902             fibers.push(fiber);
4903         }
4904 
4905         let mut handle_item = |item| match item {
4906             WorkItem::ResumeFiber(fiber) => {
4907                 fibers.push(fiber);
4908             }
4909             WorkItem::PushFuture(future) => {
4910                 self.futures
4911                     .get_mut()
4912                     .as_mut()
4913                     .unwrap()
4914                     .push(future.into_inner());
4915             }
4916             _ => {}
4917         };
4918 
4919         for item in mem::take(&mut self.high_priority) {
4920             handle_item(item);
4921         }
4922         for item in mem::take(&mut self.low_priority) {
4923             handle_item(item);
4924         }
4925 
4926         if let Some(them) = self.futures.get_mut().take() {
4927             futures.push(them);
4928         }
4929     }
4930 
4931     /// Collect the next set of work items to run. This will be either all
4932     /// high-priority items, or a single low-priority item if there are no
4933     /// high-priority items.
4934     fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
4935         let mut ready = mem::take(&mut self.high_priority);
4936         if ready.is_empty() {
4937             if let Some(item) = self.low_priority.pop_back() {
4938                 ready.push(item);
4939             }
4940         }
4941         ready
4942     }
4943 
4944     fn push<V: Send + Sync + 'static>(
4945         &mut self,
4946         value: V,
4947     ) -> Result<TableId<V>, ResourceTableError> {
4948         self.table.get_mut().push(value).map(TableId::from)
4949     }
4950 
4951     fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
4952         self.table.get_mut().get_mut(&Resource::from(id))
4953     }
4954 
4955     pub fn add_child<T: 'static, U: 'static>(
4956         &mut self,
4957         child: TableId<T>,
4958         parent: TableId<U>,
4959     ) -> Result<(), ResourceTableError> {
4960         self.table
4961             .get_mut()
4962             .add_child(Resource::from(child), Resource::from(parent))
4963     }
4964 
4965     pub fn remove_child<T: 'static, U: 'static>(
4966         &mut self,
4967         child: TableId<T>,
4968         parent: TableId<U>,
4969     ) -> Result<(), ResourceTableError> {
4970         self.table
4971             .get_mut()
4972             .remove_child(Resource::from(child), Resource::from(parent))
4973     }
4974 
4975     fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
4976         self.table.get_mut().delete(Resource::from(id))
4977     }
4978 
4979     fn push_future(&mut self, future: HostTaskFuture) {
4980         // Note that we can't directly push to `ConcurrentState::futures` here
4981         // since this may be called from a future that's being polled inside
4982         // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
4983         // so it has exclusive access while polling it.  Therefore, we push a
4984         // work item to the "high priority" queue, which will actually push to
4985         // `ConcurrentState::futures` later.
4986         self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
4987     }
4988 
4989     fn push_high_priority(&mut self, item: WorkItem) {
4990         log::trace!("push high priority: {item:?}");
4991         self.high_priority.push(item);
4992     }
4993 
4994     fn push_low_priority(&mut self, item: WorkItem) {
4995         log::trace!("push low priority: {item:?}");
4996         self.low_priority.push_front(item);
4997     }
4998 
4999     fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5000         if high_priority {
5001             self.push_high_priority(item);
5002         } else {
5003             self.push_low_priority(item);
5004         }
5005     }
5006 
5007     fn promote_instance_local_thread_work_item(
5008         &mut self,
5009         current_instance: RuntimeComponentInstanceIndex,
5010     ) -> bool {
5011         self.promote_work_items_matching(|item: &WorkItem| match item {
5012             WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5013                 *instance == current_instance
5014             }
5015             _ => false,
5016         })
5017     }
5018 
5019     fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5020         self.promote_work_items_matching(|item: &WorkItem| match item {
5021             WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5022                 *t == thread
5023             }
5024             _ => false,
5025         })
5026     }
5027 
5028     fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5029     where
5030         F: FnMut(&WorkItem) -> bool,
5031     {
5032         // If there's a high-priority work item to resume the current guest thread,
5033         // we don't need to promote anything, but we return true to indicate that
5034         // work is pending for the current instance.
5035         if self.high_priority.iter().any(&mut predicate) {
5036             true
5037         }
5038         // Otherwise, look for a low-priority work item that matches the current
5039         // instance and promote it to high-priority.
5040         else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5041             let item = self.low_priority.remove(idx).unwrap();
5042             self.push_high_priority(item);
5043             true
5044         } else {
5045             false
5046         }
5047     }
5048 
5049     /// Implements the `context.get` intrinsic.
5050     pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
5051         let thread = self.unwrap_current_guest_thread();
5052         let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
5053         log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
5054         Ok(val)
5055     }
5056 
5057     /// Implements the `context.set` intrinsic.
5058     pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
5059         let thread = self.unwrap_current_guest_thread();
5060         log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
5061         self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
5062         Ok(())
5063     }
5064 
5065     /// Returns whether there's a pending cancellation on the current guest thread,
5066     /// consuming the event if so.
5067     fn take_pending_cancellation(&mut self) -> bool {
5068         let thread = self.unwrap_current_guest_thread();
5069         if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
5070             assert!(matches!(event, Event::Cancelled));
5071             true
5072         } else {
5073             false
5074         }
5075     }
5076 
5077     fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5078         if self.may_block(task) {
5079             Ok(())
5080         } else {
5081             Err(Trap::CannotBlockSyncTask.into())
5082         }
5083     }
5084 
5085     fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
5086         let task = self.get_mut(task).unwrap();
5087         task.async_function || task.returned_or_cancelled()
5088     }
5089 
5090     /// Used by `ResourceTables` to acquire the current `CallContext` for the
5091     /// specified task.
5092     ///
5093     /// The `task` is bit-packed as returned by `current_call_context_scope_id`
5094     /// below.
5095     pub fn call_context(&mut self, task: u32) -> &mut CallContext {
5096         let (task, is_host) = (task >> 1, task & 1 == 1);
5097         if is_host {
5098             let task: TableId<HostTask> = TableId::new(task);
5099             &mut self.get_mut(task).unwrap().call_context
5100         } else {
5101             let task: TableId<GuestTask> = TableId::new(task);
5102             &mut self.get_mut(task).unwrap().call_context
5103         }
5104     }
5105 
5106     /// Used by `ResourceTables` to record the scope of a borrow to get undone
5107     /// in the future.
5108     pub fn current_call_context_scope_id(&self) -> u32 {
5109         let (bits, is_host) = match self.current_thread {
5110             CurrentThread::Guest(id) => (id.task.rep(), false),
5111             CurrentThread::Host(id) => (id.rep(), true),
5112             CurrentThread::None => unreachable!(),
5113         };
5114         assert_eq!((bits << 1) >> 1, bits);
5115         (bits << 1) | u32::from(is_host)
5116     }
5117 
5118     fn unwrap_current_guest_thread(&self) -> QualifiedThreadId {
5119         *self.current_thread.guest().unwrap()
5120     }
5121 
5122     fn unwrap_current_host_thread(&self) -> TableId<HostTask> {
5123         self.current_thread.host().unwrap()
5124     }
5125 }
5126 
5127 /// Provide a type hint to compiler about the shape of a parameter lower
5128 /// closure.
5129 fn for_any_lower<
5130     F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5131 >(
5132     fun: F,
5133 ) -> F {
5134     fun
5135 }
5136 
5137 /// Provide a type hint to compiler about the shape of a result lift closure.
5138 fn for_any_lift<
5139     F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5140 >(
5141     fun: F,
5142 ) -> F {
5143     fun
5144 }
5145 
5146 /// Wrap the specified future in a `poll_fn` which asserts that the future is
5147 /// only polled from the event loop of the specified `Store`.
5148 ///
5149 /// See `StoreContextMut::run_concurrent` for details.
5150 fn checked<F: Future + Send + 'static>(
5151     id: StoreId,
5152     fut: F,
5153 ) -> impl Future<Output = F::Output> + Send + 'static {
5154     async move {
5155         let mut fut = pin!(fut);
5156         future::poll_fn(move |cx| {
5157             let message = "\
5158                 `Future`s which depend on asynchronous component tasks, streams, or \
5159                 futures to complete may only be polled from the event loop of the \
5160                 store to which they belong.  Please use \
5161                 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5162             ";
5163             tls::try_get(|store| {
5164                 let matched = match store {
5165                     tls::TryGet::Some(store) => store.id() == id,
5166                     tls::TryGet::Taken | tls::TryGet::None => false,
5167                 };
5168 
5169                 if !matched {
5170                     panic!("{message}")
5171                 }
5172             });
5173             fut.as_mut().poll(cx)
5174         })
5175         .await
5176     }
5177 }
5178 
5179 /// Assert that `StoreContextMut::run_concurrent` has not been called from
5180 /// within an store's event loop.
5181 fn check_recursive_run() {
5182     tls::try_get(|store| {
5183         if !matches!(store, tls::TryGet::None) {
5184             panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5185         }
5186     });
5187 }
5188 
5189 fn unpack_callback_code(code: u32) -> (u32, u32) {
5190     (code & 0xF, code >> 4)
5191 }
5192 
5193 /// Helper struct for packaging parameters to be passed to
5194 /// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
5195 /// `waitable-set.poll`.
5196 struct WaitableCheckParams {
5197     set: TableId<WaitableSet>,
5198     options: OptionsIndex,
5199     payload: u32,
5200 }
5201 
5202 /// Indicates whether `ComponentInstance::waitable_check` is being called for
5203 /// `waitable-set.wait` or `waitable-set.poll`.
5204 enum WaitableCheck {
5205     Wait,
5206     Poll,
5207 }
5208 
5209 /// Represents a guest task called from the host, prepared using `prepare_call`.
5210 pub(crate) struct PreparedCall<R> {
5211     /// The guest export to be called
5212     handle: Func,
5213     /// The guest thread created by `prepare_call`
5214     thread: QualifiedThreadId,
5215     /// The number of lowered core Wasm parameters to pass to the call.
5216     param_count: usize,
5217     /// The `oneshot::Receiver` to which the result of the call will be
5218     /// delivered when it is available.
5219     rx: oneshot::Receiver<LiftedResult>,
5220     _phantom: PhantomData<R>,
5221 }
5222 
5223 impl<R> PreparedCall<R> {
5224     /// Get a copy of the `TaskId` for this `PreparedCall`.
5225     pub(crate) fn task_id(&self) -> TaskId {
5226         TaskId {
5227             task: self.thread.task,
5228         }
5229     }
5230 }
5231 
5232 /// Represents a task created by `prepare_call`.
5233 pub(crate) struct TaskId {
5234     task: TableId<GuestTask>,
5235 }
5236 
5237 impl TaskId {
5238     /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5239     /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5240     /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5241     /// and can be resumed by other tasks for this component, so we mark the future as dropped
5242     /// and delete the task when all threads are done.
5243     pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5244         let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5245         if !task.already_lowered_parameters() {
5246             Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5247         } else {
5248             task.host_future_state = HostFutureState::Dropped;
5249             if task.ready_to_delete() {
5250                 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5251             }
5252         }
5253         Ok(())
5254     }
5255 }
5256 
5257 /// Prepare a call to the specified exported Wasm function, providing functions
5258 /// for lowering the parameters and lifting the result.
5259 ///
5260 /// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
5261 /// loop, use `queue_call`.
5262 pub(crate) fn prepare_call<T, R>(
5263     mut store: StoreContextMut<T>,
5264     handle: Func,
5265     param_count: usize,
5266     host_future_present: bool,
5267     lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5268     + Send
5269     + Sync
5270     + 'static,
5271     lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5272     + Send
5273     + Sync
5274     + 'static,
5275 ) -> Result<PreparedCall<R>> {
5276     let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5277 
5278     let instance = handle.instance().id().get(store.0);
5279     let options = &instance.component().env_component().options[options];
5280     let ty = &instance.component().types()[ty];
5281     let async_function = ty.async_;
5282     let task_return_type = ty.results;
5283     let component_instance = raw_options.instance;
5284     let callback = options.callback.map(|i| instance.runtime_callback(i));
5285     let memory = options
5286         .memory()
5287         .map(|i| instance.runtime_memory(i))
5288         .map(SendSyncPtr::new);
5289     let string_encoding = options.string_encoding;
5290     let token = StoreToken::new(store.as_context_mut());
5291     let state = store.0.concurrent_state_mut();
5292 
5293     let (tx, rx) = oneshot::channel();
5294 
5295     let instance = RuntimeInstance {
5296         instance: handle.instance().id().instance(),
5297         index: component_instance,
5298     };
5299     let caller = state.current_thread;
5300     let task = GuestTask::new(
5301         state,
5302         Box::new(for_any_lower(move |store, params| {
5303             lower_params(handle, token.as_context_mut(store), params)
5304         })),
5305         LiftResult {
5306             lift: Box::new(for_any_lift(move |store, result| {
5307                 lift_result(handle, store, result)
5308             })),
5309             ty: task_return_type,
5310             memory,
5311             string_encoding,
5312         },
5313         Caller::Host {
5314             tx: Some(tx),
5315             host_future_present,
5316             caller,
5317         },
5318         callback.map(|callback| {
5319             let callback = SendSyncPtr::new(callback);
5320             let instance = handle.instance();
5321             Box::new(move |store: &mut dyn VMStore, event, handle| {
5322                 let store = token.as_context_mut(store);
5323                 // SAFETY: Per the contract of `prepare_call`, the callback
5324                 // will remain valid at least as long is this task exists.
5325                 unsafe { instance.call_callback(store, callback, event, handle) }
5326             }) as CallbackFn
5327         }),
5328         instance,
5329         async_function,
5330     )?;
5331 
5332     let task = state.push(task)?;
5333     let thread = state.push(GuestThread::new_implicit(task))?;
5334     state.get_mut(task)?.threads.insert(thread);
5335 
5336     if !store.0.may_enter(instance) {
5337         bail!(crate::Trap::CannotEnterComponent);
5338     }
5339 
5340     Ok(PreparedCall {
5341         handle,
5342         thread: QualifiedThreadId { task, thread },
5343         param_count,
5344         rx,
5345         _phantom: PhantomData,
5346     })
5347 }
5348 
5349 /// Queue a call previously prepared using `prepare_call` to be run as part of
5350 /// the associated `ComponentInstance`'s event loop.
5351 ///
5352 /// The returned future will resolve to the result once it is available, but
5353 /// must only be polled via the instance's event loop. See
5354 /// `StoreContextMut::run_concurrent` for details.
5355 pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5356     mut store: StoreContextMut<T>,
5357     prepared: PreparedCall<R>,
5358 ) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
5359     let PreparedCall {
5360         handle,
5361         thread,
5362         param_count,
5363         rx,
5364         ..
5365     } = prepared;
5366 
5367     queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5368 
5369     Ok(checked(
5370         store.0.id(),
5371         rx.map(move |result| {
5372             result
5373                 .map(|v| *v.downcast().unwrap())
5374                 .map_err(crate::Error::from)
5375         }),
5376     ))
5377 }
5378 
5379 /// Queue a call previously prepared using `prepare_call` to be run as part of
5380 /// the associated `ComponentInstance`'s event loop.
5381 fn queue_call0<T: 'static>(
5382     store: StoreContextMut<T>,
5383     handle: Func,
5384     guest_thread: QualifiedThreadId,
5385     param_count: usize,
5386 ) -> Result<()> {
5387     let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5388     let is_concurrent = raw_options.async_;
5389     let callback = raw_options.callback;
5390     let instance = handle.instance();
5391     let callee = handle.lifted_core_func(store.0);
5392     let post_return = handle.post_return_core_func(store.0);
5393     let callback = callback.map(|i| {
5394         let instance = instance.id().get(store.0);
5395         SendSyncPtr::new(instance.runtime_callback(i))
5396     });
5397 
5398     log::trace!("queueing call {guest_thread:?}");
5399 
5400     // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5401     // (with signatures appropriate for this call) and will remain valid as
5402     // long as this instance is valid.
5403     unsafe {
5404         instance.queue_call(
5405             store,
5406             guest_thread,
5407             SendSyncPtr::new(callee),
5408             param_count,
5409             1,
5410             is_concurrent,
5411             callback,
5412             post_return.map(SendSyncPtr::new),
5413         )
5414     }
5415 }
5416