1 //! Runtime support for the Component Model Async ABI.
2 //!
3 //! This module and its submodules provide host runtime support for Component
4 //! Model Async features such as async-lifted exports, async-lowered imports,
5 //! streams, futures, and related intrinsics.  See [the Async
6 //! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7 //! for a high-level overview.
8 //!
9 //! At the core of this support is an event loop which schedules and switches
10 //! between guest tasks and any host tasks they create.  Each
11 //! `Store` will have at most one event loop running at any given
12 //! time, and that loop may be suspended and resumed by the host embedder using
13 //! e.g. `StoreContextMut::run_concurrent`.  The `StoreContextMut::poll_until`
14 //! function contains the loop itself, while the
15 //! `StoreOpaque::concurrent_state` field holds its state.
16 //!
17 //! # Public API Overview
18 //!
19 //! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20 //!
21 //! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
22 //! async-lifted or sync-lifted import, creating a guest task.
23 //!
24 //! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25 //! instance, allowing any and all tasks belonging to that instance to make
26 //! progress.
27 //!
28 //! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29 //! for the specified instance.
30 //!
31 //! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32 //! `stream` which may be passed to the guest.  This takes a
33 //! `{Future,Stream}Producer` implementation which will be polled for items when
34 //! the consumer requests them.
35 //!
36 //! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37 //! connecting it to a `{Future,Stream}Consumer` which will consume any items
38 //! produced by the write end.
39 //!
40 //! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41 //!
42 //! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43 //! function with the linker.  That function will take an `Accessor` as its
44 //! first parameter, which provides access to the store between (but not across)
45 //! await points.
46 //!
47 //! - `Accessor::with`: Access the store and its associated data.
48 //!
49 //! - `Accessor::spawn`: Run a background task as part of the event loop for the
50 //! store.  This is equivalent to `StoreContextMut::spawn` but more convenient to use
51 //! in host functions.
52 
53 use crate::bail_bug;
54 use crate::component::func::{self, Func, call_post_return};
55 use crate::component::{
56     HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
57 };
58 use crate::fiber::{self, StoreFiber, StoreFiberYield};
59 use crate::prelude::*;
60 use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
61 use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
62 use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
63 use crate::{
64     AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
65 };
66 use error_contexts::GlobalErrorContextRefCount;
67 use futures::channel::oneshot;
68 use futures::future::{self, FutureExt};
69 use futures::stream::{FuturesUnordered, StreamExt};
70 use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71 use std::any::Any;
72 use std::borrow::ToOwned;
73 use std::boxed::Box;
74 use std::cell::UnsafeCell;
75 use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76 use std::fmt;
77 use std::future::Future;
78 use std::marker::PhantomData;
79 use std::mem::{self, ManuallyDrop, MaybeUninit};
80 use std::ops::DerefMut;
81 use std::pin::{Pin, pin};
82 use std::ptr::{self, NonNull};
83 use std::task::{Context, Poll, Waker};
84 use std::vec::Vec;
85 use table::{TableDebug, TableId};
86 use wasmtime_environ::Trap;
87 use wasmtime_environ::component::{
88     CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
89     MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
90     RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
91     TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
92     TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
93 };
94 use wasmtime_environ::packed_option::ReservedValue;
95 
96 pub use abort::JoinHandle;
97 pub use future_stream_any::{FutureAny, StreamAny};
98 pub use futures_and_streams::{
99     Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
100     FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
101     StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
102 };
103 pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
104 
105 mod abort;
106 mod error_contexts;
107 mod future_stream_any;
108 mod futures_and_streams;
109 pub(crate) mod table;
110 pub(crate) mod tls;
111 
112 /// Constant defined in the Component Model spec to indicate that the async
113 /// intrinsic (e.g. `future.write`) has not yet completed.
114 const BLOCKED: u32 = 0xffff_ffff;
115 
116 /// Corresponds to `CallState` in the upstream spec.
117 #[derive(Clone, Copy, Eq, PartialEq, Debug)]
118 pub enum Status {
119     Starting = 0,
120     Started = 1,
121     Returned = 2,
122     StartCancelled = 3,
123     ReturnCancelled = 4,
124 }
125 
126 impl Status {
127     /// Packs this status and the optional `waitable` provided into a 32-bit
128     /// result that the canonical ABI requires.
129     ///
130     /// The low 4 bits are reserved for the status while the upper 28 bits are
131     /// the waitable, if present.
pack(self, waitable: Option<u32>) -> u32132     pub fn pack(self, waitable: Option<u32>) -> u32 {
133         assert!(matches!(self, Status::Returned) == waitable.is_none());
134         let waitable = waitable.unwrap_or(0);
135         assert!(waitable < (1 << 28));
136         (waitable << 4) | (self as u32)
137     }
138 }
139 
140 /// Corresponds to `EventCode` in the Component Model spec, plus related payload
141 /// data.
142 #[derive(Clone, Copy, Debug)]
143 enum Event {
144     None,
145     Subtask {
146         status: Status,
147     },
148     StreamRead {
149         code: ReturnCode,
150         pending: Option<(TypeStreamTableIndex, u32)>,
151     },
152     StreamWrite {
153         code: ReturnCode,
154         pending: Option<(TypeStreamTableIndex, u32)>,
155     },
156     FutureRead {
157         code: ReturnCode,
158         pending: Option<(TypeFutureTableIndex, u32)>,
159     },
160     FutureWrite {
161         code: ReturnCode,
162         pending: Option<(TypeFutureTableIndex, u32)>,
163     },
164     Cancelled,
165 }
166 
167 impl Event {
168     /// Lower this event to core Wasm integers for delivery to the guest.
169     ///
170     /// Note that the waitable handle, if any, is assumed to be lowered
171     /// separately.
parts(self) -> (u32, u32)172     fn parts(self) -> (u32, u32) {
173         const EVENT_NONE: u32 = 0;
174         const EVENT_SUBTASK: u32 = 1;
175         const EVENT_STREAM_READ: u32 = 2;
176         const EVENT_STREAM_WRITE: u32 = 3;
177         const EVENT_FUTURE_READ: u32 = 4;
178         const EVENT_FUTURE_WRITE: u32 = 5;
179         const EVENT_CANCELLED: u32 = 6;
180         match self {
181             Event::None => (EVENT_NONE, 0),
182             Event::Cancelled => (EVENT_CANCELLED, 0),
183             Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
184             Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
185             Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
186             Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
187             Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
188         }
189     }
190 }
191 
192 /// Corresponds to `CallbackCode` in the spec.
193 mod callback_code {
194     pub const EXIT: u32 = 0;
195     pub const YIELD: u32 = 1;
196     pub const WAIT: u32 = 2;
197 }
198 
199 /// A flag indicating that the callee is an async-lowered export.
200 ///
201 /// This may be passed to the `async-start` intrinsic from a fused adapter.
202 const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
203 
204 /// Provides access to either store data (via the `get` method) or the store
205 /// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
206 /// instance to which the current host task belongs.
207 ///
208 /// See [`Accessor::with`] for details.
209 pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
210     store: StoreContextMut<'a, T>,
211     get_data: fn(&mut T) -> D::Data<'_>,
212 }
213 
214 impl<'a, T, D> Access<'a, T, D>
215 where
216     D: HasData + ?Sized,
217     T: 'static,
218 {
219     /// Creates a new [`Access`] from its component parts.
new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self220     pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
221         Self { store, get_data }
222     }
223 
224     /// Get mutable access to the store data.
data_mut(&mut self) -> &mut T225     pub fn data_mut(&mut self) -> &mut T {
226         self.store.data_mut()
227     }
228 
229     /// Get mutable access to the store data.
get(&mut self) -> D::Data<'_>230     pub fn get(&mut self) -> D::Data<'_> {
231         (self.get_data)(self.data_mut())
232     }
233 
234     /// Spawn a background task.
235     ///
236     /// See [`Accessor::spawn`] for details.
spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle where T: 'static,237     pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
238     where
239         T: 'static,
240     {
241         let accessor = Accessor {
242             get_data: self.get_data,
243             token: StoreToken::new(self.store.as_context_mut()),
244         };
245         self.store
246             .as_context_mut()
247             .spawn_with_accessor(accessor, task)
248     }
249 
250     /// Returns the getter this accessor is using to project from `T` into
251     /// `D::Data`.
getter(&self) -> fn(&mut T) -> D::Data<'_>252     pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
253         self.get_data
254     }
255 }
256 
257 impl<'a, T, D> AsContext for Access<'a, T, D>
258 where
259     D: HasData + ?Sized,
260     T: 'static,
261 {
262     type Data = T;
263 
as_context(&self) -> StoreContext<'_, T>264     fn as_context(&self) -> StoreContext<'_, T> {
265         self.store.as_context()
266     }
267 }
268 
269 impl<'a, T, D> AsContextMut for Access<'a, T, D>
270 where
271     D: HasData + ?Sized,
272     T: 'static,
273 {
as_context_mut(&mut self) -> StoreContextMut<'_, T>274     fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
275         self.store.as_context_mut()
276     }
277 }
278 
279 /// Provides scoped mutable access to store data in the context of a concurrent
280 /// host task future.
281 ///
282 /// This allows multiple host task futures to execute concurrently and access
283 /// the store between (but not across) `await` points.
284 ///
285 /// # Rationale
286 ///
287 /// This structure is sort of like `&mut T` plus a projection from `&mut T` to
288 /// `D::Data<'_>`. The problem this is solving, however, is that it does not
289 /// literally store these values. The basic problem is that when a concurrent
290 /// host future is being polled it has access to `&mut T` (and the whole
291 /// `Store`) but when it's not being polled it does not have access to these
292 /// values. This reflects how the store is only ever polling one future at a
293 /// time so the store is effectively being passed between futures.
294 ///
295 /// Rust's `Future` trait, however, has no means of passing a `Store`
296 /// temporarily between futures. The [`Context`](std::task::Context) type does
297 /// not have the ability to attach arbitrary information to it at this time.
298 /// This type, [`Accessor`], is used to bridge this expressivity gap.
299 ///
300 /// The [`Accessor`] type here represents the ability to acquire, temporarily in
301 /// a synchronous manner, the current store. The [`Accessor::with`] function
302 /// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
303 /// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
304 /// not take an `async` closure as its argument, instead it's a synchronous
305 /// closure which must complete during on run of `Future::poll`. This reflects
306 /// how the store is temporarily made available while a host future is being
307 /// polled.
308 ///
309 /// # Implementation
310 ///
311 /// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
312 /// this type additionally doesn't even have a lifetime parameter. This is
313 /// instead a representation of proof of the ability to acquire these while a
314 /// future is being polled. Wasmtime will, when it polls a host future,
315 /// configure ambient state such that the `Accessor` that a future closes over
316 /// will work and be able to access the store.
317 ///
318 /// This has a number of implications for users such as:
319 ///
320 /// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
321 ///   the lifetime of a single future.
322 /// * A future is expected to, however, close over an `Accessor` and keep it
323 ///   alive probably for the duration of the entire future.
324 /// * Different host futures will be given different `Accessor`s, and that's
325 ///   intentional.
326 /// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
327 ///   alleviates some otherwise required bounds to be written down.
328 ///
329 /// # Using `Accessor` in `Drop`
330 ///
331 /// The methods on `Accessor` are only expected to work in the context of
332 /// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
333 /// host future can be dropped at any time throughout the system and Wasmtime
334 /// store context is not necessarily available at that time. It's recommended to
335 /// not use `Accessor` methods in anything connected to a `Drop` implementation
336 /// as they will panic and have unintended results. If you run into this though
337 /// feel free to file an issue on the Wasmtime repository.
338 pub struct Accessor<T: 'static, D = HasSelf<T>>
339 where
340     D: HasData + ?Sized,
341 {
342     token: StoreToken<T>,
343     get_data: fn(&mut T) -> D::Data<'_>,
344 }
345 
346 /// A helper trait to take any type of accessor-with-data in functions.
347 ///
348 /// This trait is similar to [`AsContextMut`] except that it's used when
349 /// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
350 /// [`Accessor`] is the main type used in concurrent settings and is passed to
351 /// functions such as [`Func::call_concurrent`].
352 ///
353 /// This trait is implemented for [`Accessor`] and `&T` where `T` implements
354 /// this trait. This effectively means that regardless of the `D` in
355 /// `Accessor<T, D>` it can still be passed to a function which just needs a
356 /// store accessor.
357 ///
358 /// Acquiring an [`Accessor`] can be done through
359 /// [`StoreContextMut::run_concurrent`] for example or in a host function
360 /// through
361 /// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
362 pub trait AsAccessor {
363     /// The `T` in `Store<T>` that this accessor refers to.
364     type Data: 'static;
365 
366     /// The `D` in `Accessor<T, D>`, or the projection out of
367     /// `Self::Data`.
368     type AccessorData: HasData + ?Sized;
369 
370     /// Returns the accessor that this is referring to.
as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>371     fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
372 }
373 
374 impl<T: AsAccessor + ?Sized> AsAccessor for &T {
375     type Data = T::Data;
376     type AccessorData = T::AccessorData;
377 
as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>378     fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
379         T::as_accessor(self)
380     }
381 }
382 
383 impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
384     type Data = T;
385     type AccessorData = D;
386 
as_accessor(&self) -> &Accessor<T, D>387     fn as_accessor(&self) -> &Accessor<T, D> {
388         self
389     }
390 }
391 
392 // Note that it is intentional at this time that `Accessor` does not actually
393 // store `&mut T` or anything similar. This distinctly enables the `Accessor`
394 // structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
395 // that matter). This is used to ergonomically simplify bindings where the
396 // majority of the time `Accessor` is closed over in a future which then needs
397 // to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
398 // you already have to write `T: 'static`...) it helps to avoid this.
399 //
400 // Note as well that `Accessor` doesn't actually store its data at all. Instead
401 // it's more of a "proof" of what can be accessed from TLS. API design around
402 // `Accessor` and functions like `Linker::func_wrap_concurrent` are
403 // intentionally made to ensure that `Accessor` is ideally only used in the
404 // context that TLS variables are actually set. For example host functions are
405 // given `&Accessor`, not `Accessor`, and this prevents them from persisting
406 // the value outside of a future. Within the future the TLS variables are all
407 // guaranteed to be set while the future is being polled.
408 //
409 // Finally though this is not an ironclad guarantee, but nor does it need to be.
410 // The TLS APIs are designed to panic or otherwise model usage where they're
411 // called recursively or similar. It's hoped that code cannot be constructed to
412 // actually hit this at runtime but this is not a safety requirement at this
413 // time.
414 const _: () = {
assert<T: Send + Sync>()415     const fn assert<T: Send + Sync>() {}
416     assert::<Accessor<UnsafeCell<u32>>>();
417 };
418 
419 impl<T> Accessor<T> {
420     /// Creates a new `Accessor` backed by the specified functions.
421     ///
422     /// - `get`: used to retrieve the store
423     ///
424     /// - `get_data`: used to "project" from the store's associated data to
425     /// another type (e.g. a field of that data or a wrapper around it).
426     ///
427     /// - `spawn`: used to queue spawned background tasks to be run later
new(token: StoreToken<T>) -> Self428     pub(crate) fn new(token: StoreToken<T>) -> Self {
429         Self {
430             token,
431             get_data: |x| x,
432         }
433     }
434 }
435 
436 impl<T, D> Accessor<T, D>
437 where
438     D: HasData + ?Sized,
439 {
440     /// Run the specified closure, passing it mutable access to the store.
441     ///
442     /// This function is one of the main building blocks of the [`Accessor`]
443     /// type. This yields synchronous, blocking, access to the store via an
444     /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
445     /// providing the ability to access `D` via [`Access::get`]. Note that the
446     /// `fun` here is given only temporary access to the store and `T`/`D`
447     /// meaning that the return value `R` here is not allowed to capture borrows
448     /// into the two. If access is needed to data within `T` or `D` outside of
449     /// this closure then it must be `clone`d out, for example.
450     ///
451     /// # Panics
452     ///
453     /// This function will panic if it is call recursively with any other
454     /// accessor already in scope. For example if `with` is called within `fun`,
455     /// then this function will panic. It is up to the embedder to ensure that
456     /// this does not happen.
with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R457     pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
458         tls::get(|vmstore| {
459             fun(Access {
460                 store: self.token.as_context_mut(vmstore),
461                 get_data: self.get_data,
462             })
463         })
464     }
465 
466     /// Returns the getter this accessor is using to project from `T` into
467     /// `D::Data`.
getter(&self) -> fn(&mut T) -> D::Data<'_>468     pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
469         self.get_data
470     }
471 
472     /// Changes this accessor to access `D2` instead of the current type
473     /// parameter `D`.
474     ///
475     /// This changes the underlying data access from `T` to `D2::Data<'_>`.
476     ///
477     /// # Panics
478     ///
479     /// When using this API the returned value is disconnected from `&self` and
480     /// the lifetime binding the `self` argument. An `Accessor` only works
481     /// within the context of the closure or async closure that it was
482     /// originally given to, however. This means that due to the fact that the
483     /// returned value has no lifetime connection it's possible to use the
484     /// accessor outside of `&self`, the original accessor, and panic.
485     ///
486     /// The returned value should only be used within the scope of the original
487     /// `Accessor` that `self` refers to.
with_getter<D2: HasData>( &self, get_data: fn(&mut T) -> D2::Data<'_>, ) -> Accessor<T, D2>488     pub fn with_getter<D2: HasData>(
489         &self,
490         get_data: fn(&mut T) -> D2::Data<'_>,
491     ) -> Accessor<T, D2> {
492         Accessor {
493             token: self.token,
494             get_data,
495         }
496     }
497 
498     /// Spawn a background task which will receive an `&Accessor<T, D>` and
499     /// run concurrently with any other tasks in progress for the current
500     /// store.
501     ///
502     /// This is particularly useful for host functions which return a `stream`
503     /// or `future` such that the code to write to the write end of that
504     /// `stream` or `future` must run after the function returns.
505     ///
506     /// The returned [`JoinHandle`] may be used to cancel the task.
507     ///
508     /// # Panics
509     ///
510     /// Panics if called within a closure provided to the [`Accessor::with`]
511     /// function. This can only be called outside an active invocation of
512     /// [`Accessor::with`].
spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle where T: 'static,513     pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
514     where
515         T: 'static,
516     {
517         let accessor = self.clone_for_spawn();
518         self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
519     }
520 
clone_for_spawn(&self) -> Self521     fn clone_for_spawn(&self) -> Self {
522         Self {
523             token: self.token,
524             get_data: self.get_data,
525         }
526     }
527 }
528 
529 /// Represents a task which may be provided to `Accessor::spawn`,
530 /// `Accessor::forward`, or `StorecContextMut::spawn`.
531 // TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
532 // option.
533 //
534 // As of this writing, it's not possible to specify e.g. `Send` and `Sync`
535 // bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
536 // Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
537 // Send + Sync + 'static` fails with a type mismatch error when we try to pass
538 // it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
539 // best we can do for the time being.
540 pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
541 where
542     D: HasData + ?Sized,
543 {
544     /// Run the task.
run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send545     fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
546 }
547 
548 /// Represents parameter and result metadata for the caller side of a
549 /// guest->guest call orchestrated by a fused adapter.
550 enum CallerInfo {
551     /// Metadata for a call to an async-lowered import
552     Async {
553         params: Vec<ValRaw>,
554         has_result: bool,
555     },
556     /// Metadata for a call to an sync-lowered import
557     Sync {
558         params: Vec<ValRaw>,
559         result_count: u32,
560     },
561 }
562 
563 /// Indicates how a guest task is waiting on a waitable set.
564 enum WaitMode {
565     /// The guest task is waiting using `task.wait`
566     Fiber(StoreFiber<'static>),
567     /// The guest task is waiting via a callback declared as part of an
568     /// async-lifted export.
569     Callback(Instance),
570 }
571 
572 /// Represents the reason a fiber is suspending itself.
573 #[derive(Debug)]
574 enum SuspendReason {
575     /// The fiber is waiting for an event to be delivered to the specified
576     /// waitable set or task.
577     Waiting {
578         set: TableId<WaitableSet>,
579         thread: QualifiedThreadId,
580         skip_may_block_check: bool,
581     },
582     /// The fiber has finished handling its most recent work item and is waiting
583     /// for another (or to be dropped if it is no longer needed).
584     NeedWork,
585     /// The fiber is yielding and should be resumed once other tasks have had a
586     /// chance to run.
587     Yielding {
588         thread: QualifiedThreadId,
589         skip_may_block_check: bool,
590     },
591     /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
592     ExplicitlySuspending {
593         thread: QualifiedThreadId,
594         skip_may_block_check: bool,
595     },
596 }
597 
598 /// Represents a pending call into guest code for a given guest task.
599 enum GuestCallKind {
600     /// Indicates there's an event to deliver to the task, possibly related to a
601     /// waitable set the task has been waiting on or polling.
602     DeliverEvent {
603         /// The instance to which the task belongs.
604         instance: Instance,
605         /// The waitable set the event belongs to, if any.
606         ///
607         /// If this is `None` the event will be waiting in the
608         /// `GuestTask::event` field for the task.
609         set: Option<TableId<WaitableSet>>,
610     },
611     /// Indicates that a new guest task call is pending and may be executed
612     /// using the specified closure.
613     ///
614     /// If the closure returns `Ok(Some(call))`, the `call` should be run
615     /// immediately using `handle_guest_call`.
616     StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
617     StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
618 }
619 
620 impl fmt::Debug for GuestCallKind {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result621     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
622         match self {
623             Self::DeliverEvent { instance, set } => f
624                 .debug_struct("DeliverEvent")
625                 .field("instance", instance)
626                 .field("set", set)
627                 .finish(),
628             Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
629             Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
630         }
631     }
632 }
633 
634 /// The target of a suspension intrinsic.
635 #[derive(Copy, Clone, Debug)]
636 pub enum SuspensionTarget {
637     SomeSuspended(u32),
638     Some(u32),
639     None,
640 }
641 
642 impl SuspensionTarget {
is_none(&self) -> bool643     fn is_none(&self) -> bool {
644         matches!(self, SuspensionTarget::None)
645     }
is_some(&self) -> bool646     fn is_some(&self) -> bool {
647         !self.is_none()
648     }
649 }
650 
651 /// Represents a pending call into guest code for a given guest thread.
652 #[derive(Debug)]
653 struct GuestCall {
654     thread: QualifiedThreadId,
655     kind: GuestCallKind,
656 }
657 
658 impl GuestCall {
659     /// Returns whether or not the call is ready to run.
660     ///
661     /// A call will not be ready to run if either:
662     ///
663     /// - the (sub-)component instance to be called has already been entered and
664     /// cannot be reentered until an in-progress call completes
665     ///
666     /// - the call is for a not-yet started task and the (sub-)component
667     /// instance to be called has backpressure enabled
is_ready(&self, store: &mut StoreOpaque) -> Result<bool>668     fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
669         let instance = store
670             .concurrent_state_mut()
671             .get_mut(self.thread.task)?
672             .instance;
673         let state = store.instance_state(instance).concurrent_state();
674 
675         let ready = match &self.kind {
676             GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
677             GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
678             GuestCallKind::StartExplicit(_) => true,
679         };
680         log::trace!(
681             "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
682             state.do_not_enter,
683             state.backpressure
684         );
685         Ok(ready)
686     }
687 }
688 
689 /// Job to be run on a worker fiber.
690 enum WorkerItem {
691     GuestCall(GuestCall),
692     Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
693 }
694 
695 /// Represents a pending work item to be handled by the event loop for a given
696 /// component instance.
697 enum WorkItem {
698     /// A host task to be pushed to `ConcurrentState::futures`.
699     PushFuture(AlwaysMut<HostTaskFuture>),
700     /// A fiber to resume.
701     ResumeFiber(StoreFiber<'static>),
702     /// A thread to resume.
703     ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
704     /// A pending call into guest code for a given guest task.
705     GuestCall(RuntimeComponentInstanceIndex, GuestCall),
706     /// A job to run on a worker fiber.
707     WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
708 }
709 
710 impl fmt::Debug for WorkItem {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result711     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
712         match self {
713             Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
714             Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
715             Self::ResumeThread(instance, thread) => f
716                 .debug_tuple("ResumeThread")
717                 .field(instance)
718                 .field(thread)
719                 .finish(),
720             Self::GuestCall(instance, call) => f
721                 .debug_tuple("GuestCall")
722                 .field(instance)
723                 .field(call)
724                 .finish(),
725             Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
726         }
727     }
728 }
729 
730 /// Whether a suspension intrinsic was cancelled or completed
731 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
732 pub(crate) enum WaitResult {
733     Cancelled,
734     Completed,
735 }
736 
737 /// Poll the specified future until it completes on behalf of a guest->host call
738 /// using a sync-lowered import.
739 ///
740 /// This is similar to `Instance::first_poll` except it's for sync-lowered
741 /// imports, meaning we don't need to handle cancellation and we can block the
742 /// caller until the task completes, at which point the caller can handle
743 /// lowering the result to the guest's stack and linear memory.
poll_and_block<R: Send + Sync + 'static>( store: &mut dyn VMStore, future: impl Future<Output = Result<R>> + Send + 'static, ) -> Result<R>744 pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
745     store: &mut dyn VMStore,
746     future: impl Future<Output = Result<R>> + Send + 'static,
747 ) -> Result<R> {
748     let state = store.concurrent_state_mut();
749     let task = state.current_host_thread()?;
750 
751     // Wrap the future in a closure which will take care of stashing the result
752     // in `GuestTask::result` and resuming this fiber when the host task
753     // completes.
754     let mut future = Box::pin(async move {
755         let result = future.await?;
756         tls::get(move |store| {
757             let state = store.concurrent_state_mut();
758             let host_state = &mut state.get_mut(task)?.state;
759             assert!(matches!(host_state, HostTaskState::CalleeStarted));
760             *host_state = HostTaskState::CalleeFinished(Box::new(result));
761 
762             Waitable::Host(task).set_event(
763                 state,
764                 Some(Event::Subtask {
765                     status: Status::Returned,
766                 }),
767             )?;
768 
769             Ok(())
770         })
771     }) as HostTaskFuture;
772 
773     // Finally, poll the future.  We can use a dummy `Waker` here because we'll
774     // add the future to `ConcurrentState::futures` and poll it automatically
775     // from the event loop if it doesn't complete immediately here.
776     let poll = tls::set(store, || {
777         future
778             .as_mut()
779             .poll(&mut Context::from_waker(&Waker::noop()))
780     });
781 
782     match poll {
783         // It completed immediately; check the result and delete the task.
784         Poll::Ready(result) => result?,
785 
786         // It did not complete immediately; add it to
787         // `ConcurrentState::futures` so it will be polled via the event loop;
788         // then use `GuestThread::sync_call_set` to wait for the task to
789         // complete, suspending the current fiber until it does so.
790         Poll::Pending => {
791             let state = store.concurrent_state_mut();
792             state.push_future(future);
793 
794             let caller = state.get_mut(task)?.caller;
795             let set = state.get_mut(caller.thread)?.sync_call_set;
796             Waitable::Host(task).join(state, Some(set))?;
797 
798             store.suspend(SuspendReason::Waiting {
799                 set,
800                 thread: caller,
801                 skip_may_block_check: false,
802             })?;
803 
804             // Remove the `task` from the `sync_call_set` to ensure that when
805             // this function returns and the task is deleted that there are no
806             // more lingering references to this host task.
807             Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
808         }
809     }
810 
811     // Retrieve and return the result.
812     let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
813     match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
814         HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
815             Ok(result) => *result,
816             Err(_) => bail_bug!("host task finished with wrong type of result"),
817         }),
818         _ => bail_bug!("unexpected host task state after completion"),
819     }
820 }
821 
822 /// Execute the specified guest call.
handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()>823 fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
824     let mut next = Some(call);
825     while let Some(call) = next.take() {
826         match call.kind {
827             GuestCallKind::DeliverEvent { instance, set } => {
828                 let (event, waitable) =
829                     match instance.get_event(store, call.thread.task, set, true)? {
830                         Some(pair) => pair,
831                         None => bail_bug!("delivering non-present event"),
832                     };
833                 let state = store.concurrent_state_mut();
834                 let task = state.get_mut(call.thread.task)?;
835                 let runtime_instance = task.instance;
836                 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
837 
838                 log::trace!(
839                     "use callback to deliver event {event:?} to {:?} for {waitable:?}",
840                     call.thread,
841                 );
842 
843                 let old_thread = store.set_thread(call.thread)?;
844                 log::trace!(
845                     "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
846                     call.thread
847                 );
848 
849                 store.enter_instance(runtime_instance);
850 
851                 let Some(callback) = store
852                     .concurrent_state_mut()
853                     .get_mut(call.thread.task)?
854                     .callback
855                     .take()
856                 else {
857                     bail_bug!("guest task callback field not present")
858                 };
859 
860                 let code = callback(store, event, handle)?;
861 
862                 store
863                     .concurrent_state_mut()
864                     .get_mut(call.thread.task)?
865                     .callback = Some(callback);
866 
867                 store.exit_instance(runtime_instance)?;
868 
869                 store.set_thread(old_thread)?;
870 
871                 next = instance.handle_callback_code(
872                     store,
873                     call.thread,
874                     runtime_instance.index,
875                     code,
876                 )?;
877 
878                 log::trace!(
879                     "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
880                 );
881             }
882             GuestCallKind::StartImplicit(fun) => {
883                 next = fun(store)?;
884             }
885             GuestCallKind::StartExplicit(fun) => {
886                 fun(store)?;
887             }
888         }
889     }
890 
891     Ok(())
892 }
893 
894 impl<T> Store<T> {
895     /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R> where T: Send + 'static,896     pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
897     where
898         T: Send + 'static,
899     {
900         ensure!(
901             self.as_context().0.concurrency_support(),
902             "cannot use `run_concurrent` when Config::concurrency_support disabled",
903         );
904         self.as_context_mut().run_concurrent(fun).await
905     }
906 
907     #[doc(hidden)]
assert_concurrent_state_empty(&mut self)908     pub fn assert_concurrent_state_empty(&mut self) {
909         self.as_context_mut().assert_concurrent_state_empty();
910     }
911 
912     #[doc(hidden)]
concurrent_state_table_size(&mut self) -> usize913     pub fn concurrent_state_table_size(&mut self) -> usize {
914         self.as_context_mut().concurrent_state_table_size()
915     }
916 
917     /// Convenience wrapper for [`StoreContextMut::spawn`].
spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle where T: 'static,918     pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
919     where
920         T: 'static,
921     {
922         self.as_context_mut().spawn(task)
923     }
924 }
925 
926 impl<T> StoreContextMut<'_, T> {
927     /// Assert that all the relevant tables and queues in the concurrent state
928     /// for this store are empty.
929     ///
930     /// This is for sanity checking in integration tests
931     /// (e.g. `component-async-tests`) that the relevant state has been cleared
932     /// after each test concludes.  This should help us catch leaks, e.g. guest
933     /// tasks which haven't been deleted despite having completed and having
934     /// been dropped by their supertasks.
935     ///
936     /// Only intended for use in Wasmtime's own testing.
937     #[doc(hidden)]
assert_concurrent_state_empty(self)938     pub fn assert_concurrent_state_empty(self) {
939         let store = self.0;
940         store
941             .store_data_mut()
942             .components
943             .assert_instance_states_empty();
944         let state = store.concurrent_state_mut();
945         assert!(
946             state.table.get_mut().is_empty(),
947             "non-empty table: {:?}",
948             state.table.get_mut()
949         );
950         assert!(state.high_priority.is_empty());
951         assert!(state.low_priority.is_empty());
952         assert!(state.current_thread.is_none());
953         assert!(state.futures_mut().unwrap().is_empty());
954         assert!(state.global_error_context_ref_counts.is_empty());
955     }
956 
957     /// Helper function to perform tests over the size of the concurrent state
958     /// table which can be useful for detecting leaks.
959     ///
960     /// Only intended for use in Wasmtime's own testing.
961     #[doc(hidden)]
concurrent_state_table_size(&mut self) -> usize962     pub fn concurrent_state_table_size(&mut self) -> usize {
963         self.0
964             .concurrent_state_mut()
965             .table
966             .get_mut()
967             .iter_mut()
968             .count()
969     }
970 
971     /// Spawn a background task to run as part of this instance's event loop.
972     ///
973     /// The task will receive an `&Accessor<U>` and run concurrently with
974     /// any other tasks in progress for the instance.
975     ///
976     /// Note that the task will only make progress if and when the event loop
977     /// for this instance is run.
978     ///
979     /// The returned [`JoinHandle`] may be used to cancel the task.
spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle where T: 'static,980     pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
981     where
982         T: 'static,
983     {
984         let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
985         self.spawn_with_accessor(accessor, task)
986     }
987 
988     /// Internal implementation of `spawn` functions where a `store` is
989     /// available along with an `Accessor`.
spawn_with_accessor<D>( self, accessor: Accessor<T, D>, task: impl AccessorTask<T, D>, ) -> JoinHandle where T: 'static, D: HasData + ?Sized,990     fn spawn_with_accessor<D>(
991         self,
992         accessor: Accessor<T, D>,
993         task: impl AccessorTask<T, D>,
994     ) -> JoinHandle
995     where
996         T: 'static,
997         D: HasData + ?Sized,
998     {
999         // Create an "abortable future" here where internally the future will
1000         // hook calls to poll and possibly spawn more background tasks on each
1001         // iteration.
1002         let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1003         self.0
1004             .concurrent_state_mut()
1005             .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1006         handle
1007     }
1008 
1009     /// Run the specified closure `fun` to completion as part of this store's
1010     /// event loop.
1011     ///
1012     /// This will run `fun` as part of this store's event loop until it
1013     /// yields a result.  `fun` is provided an [`Accessor`], which provides
1014     /// controlled access to the store and its data.
1015     ///
1016     /// This function can be used to invoke [`Func::call_concurrent`] for
1017     /// example within the async closure provided here.
1018     ///
1019     /// This function will unconditionally return an error if
1020     /// [`Config::concurrency_support`] is disabled.
1021     ///
1022     /// [`Config::concurrency_support`]: crate::Config::concurrency_support
1023     ///
1024     /// # Store-blocking behavior
1025     ///
1026     /// At this time there are certain situations in which the `Future` returned
1027     /// by the `AsyncFnOnce` passed to this function will not be polled for an
1028     /// extended period of time, despite one or more `Waker::wake` events having
1029     /// occurred for the task to which it belongs.  This can manifest as the
1030     /// `Future` seeming to be "blocked" or "locked up", but is actually due to
1031     /// the `Store` being held by e.g. a blocking host function, preventing the
1032     /// `Future` from being polled. A canonical example of this is when the
1033     /// `fun` provided to this function attempts to set a timeout for an
1034     /// invocation of a wasm function. In this situation the async closure is
1035     /// waiting both on (a) the wasm computation to finish, and (b) the timeout
1036     /// to elapse. At this time this setup will not always work and the timeout
1037     /// may not reliably fire.
1038     ///
1039     /// This function will not block the current thread and as such is always
1040     /// suitable to run in an `async` context, but the current implementation of
1041     /// Wasmtime can lead to situations where a certain wasm computation is
1042     /// required to make progress the closure to make progress. This is an
1043     /// artifact of Wasmtime's historical implementation of `async` functions
1044     /// and is the topic of [#11869] and [#11870]. In the timeout example from
1045     /// above it means that Wasmtime can get "wedged" for a bit where (a) must
1046     /// progress for a readiness notification of (b) to get delivered.
1047     ///
1048     /// This effectively means that it's not possible to reliably perform a
1049     /// "select" operation within the `fun` closure, which timeouts for example
1050     /// are based on. Fixing this requires some relatively major refactoring
1051     /// work within Wasmtime itself. This is a known pitfall otherwise and one
1052     /// that is intended to be fixed one day. In the meantime it's recommended
1053     /// to apply timeouts or such to the entire `run_concurrent` call itself
1054     /// rather than internally.
1055     ///
1056     /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
1057     /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
1058     ///
1059     /// # Example
1060     ///
1061     /// ```
1062     /// # use {
1063     /// #   wasmtime::{
1064     /// #     error::{Result},
1065     /// #     component::{ Component, Linker, Resource, ResourceTable},
1066     /// #     Config, Engine, Store
1067     /// #   },
1068     /// # };
1069     /// #
1070     /// # struct MyResource(u32);
1071     /// # struct Ctx { table: ResourceTable }
1072     /// #
1073     /// # async fn foo() -> Result<()> {
1074     /// # let mut config = Config::new();
1075     /// # let engine = Engine::new(&config)?;
1076     /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
1077     /// # let mut linker = Linker::new(&engine);
1078     /// # let component = Component::new(&engine, "")?;
1079     /// # let instance = linker.instantiate_async(&mut store, &component).await?;
1080     /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
1081     /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
1082     /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
1083     ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
1084     ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
1085     ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
1086     ///    bar.call_concurrent(accessor, (value.0,)).await?;
1087     ///    Ok(())
1088     /// }).await??;
1089     /// # Ok(())
1090     /// # }
1091     /// ```
run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R> where T: Send + 'static,1092     pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
1093     where
1094         T: Send + 'static,
1095     {
1096         ensure!(
1097             self.0.concurrency_support(),
1098             "cannot use `run_concurrent` when Config::concurrency_support disabled",
1099         );
1100         self.do_run_concurrent(fun, false).await
1101     }
1102 
run_concurrent_trap_on_idle<R>( self, fun: impl AsyncFnOnce(&Accessor<T>) -> R, ) -> Result<R> where T: Send + 'static,1103     pub(super) async fn run_concurrent_trap_on_idle<R>(
1104         self,
1105         fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1106     ) -> Result<R>
1107     where
1108         T: Send + 'static,
1109     {
1110         self.do_run_concurrent(fun, true).await
1111     }
1112 
do_run_concurrent<R>( mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R, trap_on_idle: bool, ) -> Result<R> where T: Send + 'static,1113     async fn do_run_concurrent<R>(
1114         mut self,
1115         fun: impl AsyncFnOnce(&Accessor<T>) -> R,
1116         trap_on_idle: bool,
1117     ) -> Result<R>
1118     where
1119         T: Send + 'static,
1120     {
1121         debug_assert!(self.0.concurrency_support());
1122         check_recursive_run();
1123         let token = StoreToken::new(self.as_context_mut());
1124 
1125         struct Dropper<'a, T: 'static, V> {
1126             store: StoreContextMut<'a, T>,
1127             value: ManuallyDrop<V>,
1128         }
1129 
1130         impl<'a, T, V> Drop for Dropper<'a, T, V> {
1131             fn drop(&mut self) {
1132                 tls::set(self.store.0, || {
1133                     // SAFETY: Here we drop the value without moving it for the
1134                     // first and only time -- per the contract for `Drop::drop`,
1135                     // this code won't run again, and the `value` field will no
1136                     // longer be accessible.
1137                     unsafe { ManuallyDrop::drop(&mut self.value) }
1138                 });
1139             }
1140         }
1141 
1142         let accessor = &Accessor::new(token);
1143         let dropper = &mut Dropper {
1144             store: self,
1145             value: ManuallyDrop::new(fun(accessor)),
1146         };
1147         // SAFETY: We never move `dropper` nor its `value` field.
1148         let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };
1149 
1150         dropper
1151             .store
1152             .as_context_mut()
1153             .poll_until(future, trap_on_idle)
1154             .await
1155     }
1156 
1157     /// Run this store's event loop.
1158     ///
1159     /// The returned future will resolve when the specified future completes or,
1160     /// if `trap_on_idle` is true, when the event loop can't make further
1161     /// progress.
poll_until<R>( mut self, mut future: Pin<&mut impl Future<Output = R>>, trap_on_idle: bool, ) -> Result<R> where T: Send + 'static,1162     async fn poll_until<R>(
1163         mut self,
1164         mut future: Pin<&mut impl Future<Output = R>>,
1165         trap_on_idle: bool,
1166     ) -> Result<R>
1167     where
1168         T: Send + 'static,
1169     {
1170         struct Reset<'a, T: 'static> {
1171             store: StoreContextMut<'a, T>,
1172             futures: Option<FuturesUnordered<HostTaskFuture>>,
1173         }
1174 
1175         impl<'a, T> Drop for Reset<'a, T> {
1176             fn drop(&mut self) {
1177                 if let Some(futures) = self.futures.take() {
1178                     *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
1179                 }
1180             }
1181         }
1182 
1183         loop {
1184             // Take `ConcurrentState::futures` out of the store so we can poll
1185             // it while also safely giving any of the futures inside access to
1186             // `self`.
1187             let futures = self.0.concurrent_state_mut().futures.get_mut().take();
1188             let mut reset = Reset {
1189                 store: self.as_context_mut(),
1190                 futures,
1191             };
1192             let mut next = match reset.futures.as_mut() {
1193                 Some(f) => pin!(f.next()),
1194                 None => bail_bug!("concurrent state missing futures field"),
1195             };
1196 
1197             enum PollResult<R> {
1198                 Complete(R),
1199                 ProcessWork(Vec<WorkItem>),
1200             }
1201             let result = future::poll_fn(|cx| {
1202                 // First, poll the future we were passed as an argument and
1203                 // return immediately if it's ready.
1204                 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
1205                     return Poll::Ready(Ok(PollResult::Complete(value)));
1206                 }
1207 
1208                 // Next, poll `ConcurrentState::futures` (which includes any
1209                 // pending host tasks and/or background tasks), returning
1210                 // immediately if one of them fails.
1211                 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
1212                     Poll::Ready(Some(output)) => {
1213                         match output {
1214                             Err(e) => return Poll::Ready(Err(e)),
1215                             Ok(()) => {}
1216                         }
1217                         Poll::Ready(true)
1218                     }
1219                     Poll::Ready(None) => Poll::Ready(false),
1220                     Poll::Pending => Poll::Pending,
1221                 };
1222 
1223                 // Next, collect the next batch of work items to process, if any.
1224                 // This will be either all of the high-priority work items, or if
1225                 // there are none, a single low-priority work item.
1226                 let state = reset.store.0.concurrent_state_mut();
1227                 let ready = state.collect_work_items_to_run();
1228                 if !ready.is_empty() {
1229                     return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
1230                 }
1231 
1232                 // Finally, if we have nothing else to do right now, determine what to do
1233                 // based on whether there are any pending futures in
1234                 // `ConcurrentState::futures`.
1235                 return match next {
1236                     Poll::Ready(true) => {
1237                         // In this case, one of the futures in
1238                         // `ConcurrentState::futures` completed
1239                         // successfully, so we return now and continue
1240                         // the outer loop in case there is another one
1241                         // ready to complete.
1242                         Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
1243                     }
1244                     Poll::Ready(false) => {
1245                         // Poll the future we were passed one last time
1246                         // in case one of `ConcurrentState::futures` had
1247                         // the side effect of unblocking it.
1248                         if let Poll::Ready(value) =
1249                             tls::set(reset.store.0, || future.as_mut().poll(cx))
1250                         {
1251                             Poll::Ready(Ok(PollResult::Complete(value)))
1252                         } else {
1253                             // In this case, there are no more pending
1254                             // futures in `ConcurrentState::futures`,
1255                             // there are no remaining work items, _and_
1256                             // the future we were passed as an argument
1257                             // still hasn't completed.
1258                             if trap_on_idle {
1259                                 // `trap_on_idle` is true, so we exit
1260                                 // immediately.
1261                                 Poll::Ready(Err(Trap::AsyncDeadlock.into()))
1262                             } else {
1263                                 // `trap_on_idle` is false, so we assume
1264                                 // that future will wake up and give us
1265                                 // more work to do when it's ready to.
1266                                 Poll::Pending
1267                             }
1268                         }
1269                     }
1270                     // There is at least one pending future in
1271                     // `ConcurrentState::futures` and we have nothing
1272                     // else to do but wait for now, so we return
1273                     // `Pending`.
1274                     Poll::Pending => Poll::Pending,
1275                 };
1276             })
1277             .await;
1278 
1279             // Put the `ConcurrentState::futures` back into the store before we
1280             // return or handle any work items since one or more of those items
1281             // might append more futures.
1282             drop(reset);
1283 
1284             match result? {
1285                 // The future we were passed as an argument completed, so we
1286                 // return the result.
1287                 PollResult::Complete(value) => break Ok(value),
1288                 // The future we were passed has not yet completed, so handle
1289                 // any work items and then loop again.
1290                 PollResult::ProcessWork(ready) => {
1291                     struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
1292                         store: StoreContextMut<'a, T>,
1293                         ready: I,
1294                     }
1295 
1296                     impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
1297                         fn drop(&mut self) {
1298                             while let Some(item) = self.ready.next() {
1299                                 match item {
1300                                     WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
1301                                     WorkItem::PushFuture(future) => {
1302                                         tls::set(self.store.0, move || drop(future))
1303                                     }
1304                                     _ => {}
1305                                 }
1306                             }
1307                         }
1308                     }
1309 
1310                     let mut dispose = Dispose {
1311                         store: self.as_context_mut(),
1312                         ready: ready.into_iter(),
1313                     };
1314 
1315                     while let Some(item) = dispose.ready.next() {
1316                         dispose
1317                             .store
1318                             .as_context_mut()
1319                             .handle_work_item(item)
1320                             .await?;
1321                     }
1322                 }
1323             }
1324         }
1325     }
1326 
1327     /// Handle the specified work item, possibly resuming a fiber if applicable.
handle_work_item(self, item: WorkItem) -> Result<()> where T: Send,1328     async fn handle_work_item(self, item: WorkItem) -> Result<()>
1329     where
1330         T: Send,
1331     {
1332         log::trace!("handle work item {item:?}");
1333         match item {
1334             WorkItem::PushFuture(future) => {
1335                 self.0
1336                     .concurrent_state_mut()
1337                     .futures_mut()?
1338                     .push(future.into_inner());
1339             }
1340             WorkItem::ResumeFiber(fiber) => {
1341                 self.0.resume_fiber(fiber).await?;
1342             }
1343             WorkItem::ResumeThread(_, thread) => {
1344                 if let GuestThreadState::Ready(fiber) = mem::replace(
1345                     &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1346                     GuestThreadState::Running,
1347                 ) {
1348                     self.0.resume_fiber(fiber).await?;
1349                 } else {
1350                     bail_bug!("cannot resume non-pending thread {thread:?}");
1351                 }
1352             }
1353             WorkItem::GuestCall(_, call) => {
1354                 if call.is_ready(self.0)? {
1355                     self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1356                 } else {
1357                     let state = self.0.concurrent_state_mut();
1358                     let task = state.get_mut(call.thread.task)?;
1359                     if !task.starting_sent {
1360                         task.starting_sent = true;
1361                         if let GuestCallKind::StartImplicit(_) = &call.kind {
1362                             Waitable::Guest(call.thread.task).set_event(
1363                                 state,
1364                                 Some(Event::Subtask {
1365                                     status: Status::Starting,
1366                                 }),
1367                             )?;
1368                         }
1369                     }
1370 
1371                     let instance = state.get_mut(call.thread.task)?.instance;
1372                     self.0
1373                         .instance_state(instance)
1374                         .concurrent_state()
1375                         .pending
1376                         .insert(call.thread, call.kind);
1377                 }
1378             }
1379             WorkItem::WorkerFunction(fun) => {
1380                 self.run_on_worker(WorkerItem::Function(fun)).await?;
1381             }
1382         }
1383 
1384         Ok(())
1385     }
1386 
1387     /// Execute the specified guest call on a worker fiber.
run_on_worker(self, item: WorkerItem) -> Result<()> where T: Send,1388     async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1389     where
1390         T: Send,
1391     {
1392         let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1393             fiber
1394         } else {
1395             fiber::make_fiber(self.0, move |store| {
1396                 loop {
1397                     let Some(item) = store.concurrent_state_mut().worker_item.take() else {
1398                         bail_bug!("worker_item not present when resuming fiber")
1399                     };
1400                     match item {
1401                         WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1402                         WorkerItem::Function(fun) => fun.into_inner()(store)?,
1403                     }
1404 
1405                     store.suspend(SuspendReason::NeedWork)?;
1406                 }
1407             })?
1408         };
1409 
1410         let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1411         assert!(worker_item.is_none());
1412         *worker_item = Some(item);
1413 
1414         self.0.resume_fiber(worker).await
1415     }
1416 
1417     /// Wrap the specified host function in a future which will call it, passing
1418     /// it an `&Accessor<T>`.
1419     ///
1420     /// See the `Accessor` documentation for details.
wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static where T: 'static, F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>> + Send + Sync + 'static, R: Send + Sync + 'static,1421     pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1422     where
1423         T: 'static,
1424         F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1425             + Send
1426             + Sync
1427             + 'static,
1428         R: Send + Sync + 'static,
1429     {
1430         let token = StoreToken::new(self);
1431         async move {
1432             let mut accessor = Accessor::new(token);
1433             closure(&mut accessor).await
1434         }
1435     }
1436 }
1437 
1438 impl StoreOpaque {
1439     /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
1440     /// guest-to-guest call or a sync host-to-guest call.
1441     ///
1442     /// This task will only be used for the purpose of handling calls to
1443     /// intrinsic functions; both parameter lowering and result lifting are
1444     /// assumed to be taken care of elsewhere.
enter_guest_sync_call( &mut self, guest_caller: Option<RuntimeInstance>, callee_async: bool, callee: RuntimeInstance, ) -> Result<()>1445     pub(crate) fn enter_guest_sync_call(
1446         &mut self,
1447         guest_caller: Option<RuntimeInstance>,
1448         callee_async: bool,
1449         callee: RuntimeInstance,
1450     ) -> Result<()> {
1451         log::trace!("enter sync call {callee:?}");
1452         if !self.concurrency_support() {
1453             return Ok(self.enter_call_not_concurrent());
1454         }
1455 
1456         let state = self.concurrent_state_mut();
1457         let thread = state.current_thread;
1458         let instance = if let Some(thread) = thread.guest() {
1459             Some(state.get_mut(thread.task)?.instance)
1460         } else {
1461             None
1462         };
1463         if guest_caller.is_some() {
1464             debug_assert_eq!(instance, guest_caller);
1465         }
1466         let task = GuestTask::new(
1467             Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
1468             LiftResult {
1469                 lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
1470                 ty: TypeTupleIndex::reserved_value(),
1471                 memory: None,
1472                 string_encoding: StringEncoding::Utf8,
1473             },
1474             if let Some(thread) = thread.guest() {
1475                 Caller::Guest { thread: *thread }
1476             } else {
1477                 Caller::Host {
1478                     tx: None,
1479                     host_future_present: false,
1480                     caller: thread,
1481                 }
1482             },
1483             None,
1484             callee,
1485             callee_async,
1486         )?;
1487 
1488         let guest_task = state.push(task)?;
1489         let new_thread = GuestThread::new_implicit(state, guest_task)?;
1490         let guest_thread = state.push(new_thread)?;
1491         Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
1492             guest_thread,
1493             self,
1494             callee.index,
1495         )?;
1496 
1497         let state = self.concurrent_state_mut();
1498         state.get_mut(guest_task)?.threads.insert(guest_thread);
1499 
1500         self.set_thread(QualifiedThreadId {
1501             task: guest_task,
1502             thread: guest_thread,
1503         })?;
1504 
1505         Ok(())
1506     }
1507 
1508     /// Pop a `GuestTask` previously pushed using `enter_sync_call`.
exit_guest_sync_call(&mut self) -> Result<()>1509     pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
1510         if !self.concurrency_support() {
1511             return Ok(self.exit_call_not_concurrent());
1512         }
1513         let thread = match self.set_thread(CurrentThread::None)?.guest() {
1514             Some(t) => *t,
1515             None => bail_bug!("expected task when exiting"),
1516         };
1517         let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
1518         log::trace!("exit sync call {instance:?}");
1519         Instance::from_wasmtime(self, instance.instance).cleanup_thread(
1520             self,
1521             thread,
1522             instance.index,
1523         )?;
1524 
1525         let state = self.concurrent_state_mut();
1526         let task = state.get_mut(thread.task)?;
1527         let caller = match &task.caller {
1528             &Caller::Guest { thread } => thread.into(),
1529             &Caller::Host { caller, .. } => caller,
1530         };
1531         self.set_thread(caller)?;
1532 
1533         let state = self.concurrent_state_mut();
1534         let task = state.get_mut(thread.task)?;
1535         if task.ready_to_delete() {
1536             state.delete(thread.task)?.dispose(state)?;
1537         }
1538 
1539         Ok(())
1540     }
1541 
1542     /// Similar to `enter_guest_sync_call` except for when the guest makes a
1543     /// transition to the host.
1544     ///
1545     /// FIXME: this is called for all guest->host transitions and performs some
1546     /// relatively expensive table manipulations. This would ideally be
1547     /// optimized to avoid the full allocation of a `HostTask` in at least some
1548     /// situations.
host_task_create(&mut self) -> Result<Option<TableId<HostTask>>>1549     pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1550         if !self.concurrency_support() {
1551             self.enter_call_not_concurrent();
1552             return Ok(None);
1553         }
1554         let state = self.concurrent_state_mut();
1555         let caller = state.current_guest_thread()?;
1556         let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1557         log::trace!("new host task {task:?}");
1558         self.set_thread(task)?;
1559         Ok(Some(task))
1560     }
1561 
1562     /// Invoked before lowering the results of a host task to the guest.
1563     ///
1564     /// This is used to update the current thread annotations within the store
1565     /// to ensure that it reflects the guest task, not the host task, since
1566     /// lowering may execute guest code.
host_task_reenter_caller(&mut self) -> Result<()>1567     pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1568         if !self.concurrency_support() {
1569             return Ok(());
1570         }
1571         let task = self.concurrent_state_mut().current_host_thread()?;
1572         let caller = self.concurrent_state_mut().get_mut(task)?.caller;
1573         self.set_thread(caller)?;
1574         Ok(())
1575     }
1576 
1577     /// Dual of `host_task_create` and signifies that the host has finished and
1578     /// will be cleaned up.
1579     ///
1580     /// Note that this isn't invoked when the host is invoked asynchronously and
1581     /// the host isn't complete yet. In that situation the host task persists
1582     /// and will be cleaned up separately in `subtask_drop`
host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()>1583     pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1584         match task {
1585             Some(task) => {
1586                 log::trace!("delete host task {task:?}");
1587                 self.concurrent_state_mut().delete(task)?;
1588             }
1589             None => {
1590                 self.exit_call_not_concurrent();
1591             }
1592         }
1593         Ok(())
1594     }
1595 
1596     /// Determine whether the specified instance may be entered from the host.
1597     ///
1598     /// We return `true` here only if all of the following hold:
1599     ///
1600     /// - The top-level instance is not already on the current task's call stack.
1601     /// - The instance is not in need of a post-return function call.
1602     /// - `self` has not been poisoned due to a trap.
may_enter(&mut self, instance: RuntimeInstance) -> Result<bool>1603     pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1604         if self.trapped() {
1605             return Ok(false);
1606         }
1607         if !self.concurrency_support() {
1608             return Ok(true);
1609         }
1610         let state = self.concurrent_state_mut();
1611         let mut cur = state.current_thread;
1612         loop {
1613             match cur {
1614                 CurrentThread::None => break Ok(true),
1615                 CurrentThread::Guest(thread) => {
1616                     let task = state.get_mut(thread.task)?;
1617 
1618                     // Note that we only compare top-level instance IDs here.
1619                     // The idea is that the host is not allowed to recursively
1620                     // enter a top-level instance even if the specific leaf
1621                     // instance is not on the stack. This the behavior defined
1622                     // in the spec, and it allows us to elide runtime checks in
1623                     // guest-to-guest adapters.
1624                     if task.instance.instance == instance.instance {
1625                         break Ok(false);
1626                     }
1627                     cur = match task.caller {
1628                         Caller::Host { caller, .. } => caller,
1629                         Caller::Guest { thread } => thread.into(),
1630                     };
1631                 }
1632                 CurrentThread::Host(id) => {
1633                     cur = state.get_mut(id)?.caller.into();
1634                 }
1635             }
1636         }
1637     }
1638 
1639     /// Helper function to retrieve the `InstanceState` for the
1640     /// specified instance.
instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState1641     fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
1642         self.component_instance_mut(instance.instance)
1643             .instance_state(instance.index)
1644     }
1645 
set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread>1646     fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
1647         // Each time we switch threads, we conservatively set `task_may_block`
1648         // to `false` for the component instance we're switching away from (if
1649         // any), meaning it will be `false` for any new thread created for that
1650         // instance unless explicitly set otherwise.
1651         let state = self.concurrent_state_mut();
1652         let old_thread = mem::replace(&mut state.current_thread, thread.into());
1653         if let Some(old_thread) = old_thread.guest() {
1654             let instance = state.get_mut(old_thread.task)?.instance.instance;
1655             self.component_instance_mut(instance)
1656                 .set_task_may_block(false)
1657         }
1658 
1659         // If we're switching to a new thread, set its component instance's
1660         // `task_may_block` according to where it left off.
1661         if self.concurrent_state_mut().current_thread.guest().is_some() {
1662             self.set_task_may_block()?;
1663         }
1664 
1665         Ok(old_thread)
1666     }
1667 
1668     /// Set the global variable representing whether the current task may block
1669     /// prior to entering Wasm code.
set_task_may_block(&mut self) -> Result<()>1670     fn set_task_may_block(&mut self) -> Result<()> {
1671         let state = self.concurrent_state_mut();
1672         let guest_thread = state.current_guest_thread()?;
1673         let instance = state.get_mut(guest_thread.task)?.instance.instance;
1674         let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
1675         self.component_instance_mut(instance)
1676             .set_task_may_block(may_block);
1677         Ok(())
1678     }
1679 
check_blocking(&mut self) -> Result<()>1680     pub(crate) fn check_blocking(&mut self) -> Result<()> {
1681         if !self.concurrency_support() {
1682             return Ok(());
1683         }
1684         let state = self.concurrent_state_mut();
1685         let task = state.current_guest_thread()?.task;
1686         let instance = state.get_mut(task)?.instance.instance;
1687         let task_may_block = self.component_instance(instance).get_task_may_block();
1688 
1689         if task_may_block {
1690             Ok(())
1691         } else {
1692             Err(Trap::CannotBlockSyncTask.into())
1693         }
1694     }
1695 
1696     /// Record that we're about to enter a (sub-)component instance which does
1697     /// not support more than one concurrent, stackful activation, meaning it
1698     /// cannot be entered again until the next call returns.
enter_instance(&mut self, instance: RuntimeInstance)1699     fn enter_instance(&mut self, instance: RuntimeInstance) {
1700         log::trace!("enter {instance:?}");
1701         self.instance_state(instance)
1702             .concurrent_state()
1703             .do_not_enter = true;
1704     }
1705 
1706     /// Record that we've exited a (sub-)component instance previously entered
1707     /// with `Self::enter_instance` and then calls `Self::partition_pending`.
1708     /// See the documentation for the latter for details.
exit_instance(&mut self, instance: RuntimeInstance) -> Result<()>1709     fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1710         log::trace!("exit {instance:?}");
1711         self.instance_state(instance)
1712             .concurrent_state()
1713             .do_not_enter = false;
1714         self.partition_pending(instance)
1715     }
1716 
1717     /// Iterate over `InstanceState::pending`, moving any ready items into the
1718     /// "high priority" work item queue.
1719     ///
1720     /// See `GuestCall::is_ready` for details.
partition_pending(&mut self, instance: RuntimeInstance) -> Result<()>1721     fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1722         for (thread, kind) in
1723             mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1724         {
1725             let call = GuestCall { thread, kind };
1726             if call.is_ready(self)? {
1727                 self.concurrent_state_mut()
1728                     .push_high_priority(WorkItem::GuestCall(instance.index, call));
1729             } else {
1730                 self.instance_state(instance)
1731                     .concurrent_state()
1732                     .pending
1733                     .insert(call.thread, call.kind);
1734             }
1735         }
1736 
1737         Ok(())
1738     }
1739 
1740     /// Implements the `backpressure.{inc,dec}` intrinsics.
backpressure_modify( &mut self, caller_instance: RuntimeInstance, modify: impl FnOnce(u16) -> Option<u16>, ) -> Result<()>1741     pub(crate) fn backpressure_modify(
1742         &mut self,
1743         caller_instance: RuntimeInstance,
1744         modify: impl FnOnce(u16) -> Option<u16>,
1745     ) -> Result<()> {
1746         let state = self.instance_state(caller_instance).concurrent_state();
1747         let old = state.backpressure;
1748         let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1749         state.backpressure = new;
1750 
1751         if old > 0 && new == 0 {
1752             // Backpressure was previously enabled and is now disabled; move any
1753             // newly-eligible guest calls to the "high priority" queue.
1754             self.partition_pending(caller_instance)?;
1755         }
1756 
1757         Ok(())
1758     }
1759 
1760     /// Resume the specified fiber, giving it exclusive access to the specified
1761     /// store.
resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()>1762     async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1763         let old_thread = self.concurrent_state_mut().current_thread;
1764         log::trace!("resume_fiber: save current thread {old_thread:?}");
1765 
1766         let fiber = fiber::resolve_or_release(self, fiber).await?;
1767 
1768         self.set_thread(old_thread)?;
1769 
1770         let state = self.concurrent_state_mut();
1771 
1772         if let Some(ot) = old_thread.guest() {
1773             state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1774         }
1775         log::trace!("resume_fiber: restore current thread {old_thread:?}");
1776 
1777         if let Some(mut fiber) = fiber {
1778             log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1779             // See the `SuspendReason` documentation for what each case means.
1780             let reason = match state.suspend_reason.take() {
1781                 Some(r) => r,
1782                 None => bail_bug!("suspend reason missing when resuming fiber"),
1783             };
1784             match reason {
1785                 SuspendReason::NeedWork => {
1786                     if state.worker.is_none() {
1787                         state.worker = Some(fiber);
1788                     } else {
1789                         fiber.dispose(self);
1790                     }
1791                 }
1792                 SuspendReason::Yielding { thread, .. } => {
1793                     state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber);
1794                     let instance = state.get_mut(thread.task)?.instance.index;
1795                     state.push_low_priority(WorkItem::ResumeThread(instance, thread));
1796                 }
1797                 SuspendReason::ExplicitlySuspending { thread, .. } => {
1798                     state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1799                 }
1800                 SuspendReason::Waiting { set, thread, .. } => {
1801                     let old = state
1802                         .get_mut(set)?
1803                         .waiting
1804                         .insert(thread, WaitMode::Fiber(fiber));
1805                     assert!(old.is_none());
1806                 }
1807             };
1808         } else {
1809             log::trace!("resume_fiber: fiber has exited");
1810         }
1811 
1812         Ok(())
1813     }
1814 
1815     /// Suspend the current fiber, storing the reason in
1816     /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1817     /// it should be resumed.
1818     ///
1819     /// See the `SuspendReason` documentation for details.
suspend(&mut self, reason: SuspendReason) -> Result<()>1820     fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1821         log::trace!("suspend fiber: {reason:?}");
1822 
1823         // If we're yielding or waiting on behalf of a guest thread, we'll need to
1824         // pop the call context which manages resource borrows before suspending
1825         // and then push it again once we've resumed.
1826         let task = match &reason {
1827             SuspendReason::Yielding { thread, .. }
1828             | SuspendReason::Waiting { thread, .. }
1829             | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1830             SuspendReason::NeedWork => None,
1831         };
1832 
1833         let old_guest_thread = if task.is_some() {
1834             self.concurrent_state_mut().current_thread
1835         } else {
1836             CurrentThread::None
1837         };
1838 
1839         // We should not have reached here unless either there's no current
1840         // task, or the current task is permitted to block.  In addition, we
1841         // special-case `thread.switch-to` and waiting for a subtask to go from
1842         // `starting` to `started`, both of which we consider non-blocking
1843         // operations despite requiring a suspend.
1844         debug_assert!(
1845             matches!(
1846                 reason,
1847                 SuspendReason::ExplicitlySuspending {
1848                     skip_may_block_check: true,
1849                     ..
1850                 } | SuspendReason::Waiting {
1851                     skip_may_block_check: true,
1852                     ..
1853                 } | SuspendReason::Yielding {
1854                     skip_may_block_check: true,
1855                     ..
1856                 }
1857             ) || old_guest_thread
1858                 .guest()
1859                 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1860                 .transpose()?
1861                 .unwrap_or(true)
1862         );
1863 
1864         let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1865         assert!(suspend_reason.is_none());
1866         *suspend_reason = Some(reason);
1867 
1868         self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1869 
1870         if task.is_some() {
1871             self.set_thread(old_guest_thread)?;
1872         }
1873 
1874         Ok(())
1875     }
1876 
wait_for_event(&mut self, waitable: Waitable) -> Result<()>1877     fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1878         let state = self.concurrent_state_mut();
1879         let caller = state.current_guest_thread()?;
1880         let old_set = waitable.common(state)?.set;
1881         let set = state.get_mut(caller.thread)?.sync_call_set;
1882         waitable.join(state, Some(set))?;
1883         self.suspend(SuspendReason::Waiting {
1884             set,
1885             thread: caller,
1886             skip_may_block_check: false,
1887         })?;
1888         let state = self.concurrent_state_mut();
1889         waitable.join(state, old_set)
1890     }
1891 }
1892 
1893 impl Instance {
1894     /// Get the next pending event for the specified task and (optional)
1895     /// waitable set, along with the waitable handle if applicable.
get_event( self, store: &mut StoreOpaque, guest_task: TableId<GuestTask>, set: Option<TableId<WaitableSet>>, cancellable: bool, ) -> Result<Option<(Event, Option<(Waitable, u32)>)>>1896     fn get_event(
1897         self,
1898         store: &mut StoreOpaque,
1899         guest_task: TableId<GuestTask>,
1900         set: Option<TableId<WaitableSet>>,
1901         cancellable: bool,
1902     ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1903         let state = store.concurrent_state_mut();
1904 
1905         let event = &mut state.get_mut(guest_task)?.event;
1906         if let Some(ev) = event
1907             && (cancellable || !matches!(ev, Event::Cancelled))
1908         {
1909             log::trace!("deliver event {ev:?} to {guest_task:?}");
1910             let ev = *ev;
1911             *event = None;
1912             return Ok(Some((ev, None)));
1913         }
1914 
1915         let set = match set {
1916             Some(set) => set,
1917             None => return Ok(None),
1918         };
1919         let waitable = match state.get_mut(set)?.ready.pop_first() {
1920             Some(v) => v,
1921             None => return Ok(None),
1922         };
1923 
1924         let common = waitable.common(state)?;
1925         let handle = match common.handle {
1926             Some(h) => h,
1927             None => bail_bug!("handle not set when delivering event"),
1928         };
1929         let event = match common.event.take() {
1930             Some(e) => e,
1931             None => bail_bug!("event not set when delivering event"),
1932         };
1933 
1934         log::trace!(
1935             "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1936         );
1937 
1938         waitable.on_delivery(store, self, event)?;
1939 
1940         Ok(Some((event, Some((waitable, handle)))))
1941     }
1942 
1943     /// Handle the `CallbackCode` returned from an async-lifted export or its
1944     /// callback.
1945     ///
1946     /// If this returns `Ok(Some(call))`, then `call` should be run immediately
1947     /// using `handle_guest_call`.
handle_callback_code( self, store: &mut StoreOpaque, guest_thread: QualifiedThreadId, runtime_instance: RuntimeComponentInstanceIndex, code: u32, ) -> Result<Option<GuestCall>>1948     fn handle_callback_code(
1949         self,
1950         store: &mut StoreOpaque,
1951         guest_thread: QualifiedThreadId,
1952         runtime_instance: RuntimeComponentInstanceIndex,
1953         code: u32,
1954     ) -> Result<Option<GuestCall>> {
1955         let (code, set) = unpack_callback_code(code);
1956 
1957         log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1958 
1959         let state = store.concurrent_state_mut();
1960 
1961         let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
1962             let set = store
1963                 .instance_state(RuntimeInstance {
1964                     instance: self.id().instance(),
1965                     index: runtime_instance,
1966                 })
1967                 .handle_table()
1968                 .waitable_set_rep(handle)?;
1969 
1970             Ok(TableId::<WaitableSet>::new(set))
1971         };
1972 
1973         Ok(match code {
1974             callback_code::EXIT => {
1975                 log::trace!("implicit thread {guest_thread:?} completed");
1976                 self.cleanup_thread(store, guest_thread, runtime_instance)?;
1977                 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1978                 if task.threads.is_empty() && !task.returned_or_cancelled() {
1979                     bail!(Trap::NoAsyncResult);
1980                 }
1981                 if let Caller::Guest { .. } = task.caller {
1982                     task.exited = true;
1983                     task.callback = None;
1984                 }
1985                 if task.ready_to_delete() {
1986                     Waitable::Guest(guest_thread.task).delete_from(store.concurrent_state_mut())?;
1987                 }
1988                 None
1989             }
1990             callback_code::YIELD => {
1991                 let task = state.get_mut(guest_thread.task)?;
1992                 // If an `Event::Cancelled` is pending, we'll deliver that;
1993                 // otherwise, we'll deliver `Event::None`.  Note that
1994                 // `GuestTask::event` is only ever set to one of those two
1995                 // `Event` variants.
1996                 if let Some(event) = task.event {
1997                     assert!(matches!(event, Event::None | Event::Cancelled));
1998                 } else {
1999                     task.event = Some(Event::None);
2000                 }
2001                 let call = GuestCall {
2002                     thread: guest_thread,
2003                     kind: GuestCallKind::DeliverEvent {
2004                         instance: self,
2005                         set: None,
2006                     },
2007                 };
2008                 if state.may_block(guest_thread.task)? {
2009                     // Push this thread onto the "low priority" queue so it runs
2010                     // after any other threads have had a chance to run.
2011                     state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2012                     None
2013                 } else {
2014                     // Yielding in a non-blocking context is defined as a no-op
2015                     // according to the spec, so we must run this thread
2016                     // immediately without allowing any others to run.
2017                     Some(call)
2018                 }
2019             }
2020             callback_code::WAIT => {
2021                 // The task may only return `WAIT` if it was created for a call
2022                 // to an async export).  Otherwise, we'll trap.
2023                 state.check_blocking_for(guest_thread.task)?;
2024 
2025                 let set = get_set(store, set)?;
2026                 let state = store.concurrent_state_mut();
2027 
2028                 if state.get_mut(guest_thread.task)?.event.is_some()
2029                     || !state.get_mut(set)?.ready.is_empty()
2030                 {
2031                     // An event is immediately available; deliver it ASAP.
2032                     state.push_high_priority(WorkItem::GuestCall(
2033                         runtime_instance,
2034                         GuestCall {
2035                             thread: guest_thread,
2036                             kind: GuestCallKind::DeliverEvent {
2037                                 instance: self,
2038                                 set: Some(set),
2039                             },
2040                         },
2041                     ));
2042                 } else {
2043                     // No event is immediately available.
2044                     //
2045                     // We're waiting, so register to be woken up when an event
2046                     // is published for this waitable set.
2047                     //
2048                     // Here we also set `GuestTask::wake_on_cancel` which allows
2049                     // `subtask.cancel` to interrupt the wait.
2050                     let old = state
2051                         .get_mut(guest_thread.thread)?
2052                         .wake_on_cancel
2053                         .replace(set);
2054                     if !old.is_none() {
2055                         bail_bug!("thread unexpectedly had wake_on_cancel set");
2056                     }
2057                     let old = state
2058                         .get_mut(set)?
2059                         .waiting
2060                         .insert(guest_thread, WaitMode::Callback(self));
2061                     if !old.is_none() {
2062                         bail_bug!("set's waiting set already had this thread registered");
2063                     }
2064                 }
2065                 None
2066             }
2067             _ => bail!(Trap::UnsupportedCallbackCode),
2068         })
2069     }
2070 
cleanup_thread( self, store: &mut StoreOpaque, guest_thread: QualifiedThreadId, runtime_instance: RuntimeComponentInstanceIndex, ) -> Result<()>2071     fn cleanup_thread(
2072         self,
2073         store: &mut StoreOpaque,
2074         guest_thread: QualifiedThreadId,
2075         runtime_instance: RuntimeComponentInstanceIndex,
2076     ) -> Result<()> {
2077         let state = store.concurrent_state_mut();
2078         let thread_data = state.get_mut(guest_thread.thread)?;
2079         let guest_id = match thread_data.instance_rep {
2080             Some(id) => id,
2081             None => bail_bug!("thread must have instance_rep set by now"),
2082         };
2083         let sync_call_set = thread_data.sync_call_set;
2084 
2085         // Clean up any pending subtasks in the sync_call_set
2086         for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
2087             if let Some(Event::Subtask {
2088                 status: Status::Returned | Status::ReturnCancelled,
2089             }) = waitable.common(state)?.event
2090             {
2091                 waitable.delete_from(state)?;
2092             }
2093         }
2094 
2095         store
2096             .instance_state(RuntimeInstance {
2097                 instance: self.id().instance(),
2098                 index: runtime_instance,
2099             })
2100             .thread_handle_table()
2101             .guest_thread_remove(guest_id)?;
2102 
2103         store.concurrent_state_mut().delete(guest_thread.thread)?;
2104         store.concurrent_state_mut().delete(sync_call_set)?;
2105         let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2106         task.threads.remove(&guest_thread.thread);
2107         Ok(())
2108     }
2109 
2110     /// Add the specified guest call to the "high priority" work item queue, to
2111     /// be started as soon as backpressure and/or reentrance rules allow.
2112     ///
2113     /// SAFETY: The raw pointer arguments must be valid references to guest
2114     /// functions (with the appropriate signatures) when the closures queued by
2115     /// this function are called.
queue_call<T: 'static>( self, mut store: StoreContextMut<T>, guest_thread: QualifiedThreadId, callee: SendSyncPtr<VMFuncRef>, param_count: usize, result_count: usize, async_: bool, callback: Option<SendSyncPtr<VMFuncRef>>, post_return: Option<SendSyncPtr<VMFuncRef>>, ) -> Result<()>2116     unsafe fn queue_call<T: 'static>(
2117         self,
2118         mut store: StoreContextMut<T>,
2119         guest_thread: QualifiedThreadId,
2120         callee: SendSyncPtr<VMFuncRef>,
2121         param_count: usize,
2122         result_count: usize,
2123         async_: bool,
2124         callback: Option<SendSyncPtr<VMFuncRef>>,
2125         post_return: Option<SendSyncPtr<VMFuncRef>>,
2126     ) -> Result<()> {
2127         /// Return a closure which will call the specified function in the scope
2128         /// of the specified task.
2129         ///
2130         /// This will use `GuestTask::lower_params` to lower the parameters, but
2131         /// will not lift the result; instead, it returns a
2132         /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
2133         /// any, may be lifted.  Note that an async-lifted export will have
2134         /// returned its result using the `task.return` intrinsic (or not
2135         /// returned a result at all, in the case of `task.cancel`), in which
2136         /// case the "result" of this call will either be a callback code or
2137         /// nothing.
2138         ///
2139         /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
2140         /// the returned closure is called.
2141         unsafe fn make_call<T: 'static>(
2142             store: StoreContextMut<T>,
2143             guest_thread: QualifiedThreadId,
2144             callee: SendSyncPtr<VMFuncRef>,
2145             param_count: usize,
2146             result_count: usize,
2147         ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
2148         + Send
2149         + Sync
2150         + 'static
2151         + use<T> {
2152             let token = StoreToken::new(store);
2153             move |store: &mut dyn VMStore| {
2154                 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];
2155 
2156                 store
2157                     .concurrent_state_mut()
2158                     .get_mut(guest_thread.thread)?
2159                     .state = GuestThreadState::Running;
2160                 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2161                 let lower = match task.lower_params.take() {
2162                     Some(l) => l,
2163                     None => bail_bug!("lower_params missing"),
2164                 };
2165 
2166                 lower(store, &mut storage[..param_count])?;
2167 
2168                 let mut store = token.as_context_mut(store);
2169 
2170                 // SAFETY: Per the contract documented in `make_call's`
2171                 // documentation, `callee` must be a valid pointer.
2172                 unsafe {
2173                     crate::Func::call_unchecked_raw(
2174                         &mut store,
2175                         callee.as_non_null(),
2176                         NonNull::new(
2177                             &mut storage[..param_count.max(result_count)]
2178                                 as *mut [MaybeUninit<ValRaw>] as _,
2179                         )
2180                         .unwrap(),
2181                     )?;
2182                 }
2183 
2184                 Ok(storage)
2185             }
2186         }
2187 
2188         // SAFETY: Per the contract described in this function documentation,
2189         // the `callee` pointer which `call` closes over must be valid when
2190         // called by the closure we queue below.
2191         let call = unsafe {
2192             make_call(
2193                 store.as_context_mut(),
2194                 guest_thread,
2195                 callee,
2196                 param_count,
2197                 result_count,
2198             )
2199         };
2200 
2201         let callee_instance = store
2202             .0
2203             .concurrent_state_mut()
2204             .get_mut(guest_thread.task)?
2205             .instance;
2206 
2207         let fun = if callback.is_some() {
2208             assert!(async_);
2209 
2210             Box::new(move |store: &mut dyn VMStore| {
2211                 self.add_guest_thread_to_instance_table(
2212                     guest_thread.thread,
2213                     store,
2214                     callee_instance.index,
2215                 )?;
2216                 let old_thread = store.set_thread(guest_thread)?;
2217                 log::trace!(
2218                     "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
2219                 );
2220 
2221                 store.enter_instance(callee_instance);
2222 
2223                 // SAFETY: See the documentation for `make_call` to review the
2224                 // contract we must uphold for `call` here.
2225                 //
2226                 // Per the contract described in the `queue_call`
2227                 // documentation, the `callee` pointer which `call` closes
2228                 // over must be valid.
2229                 let storage = call(store)?;
2230 
2231                 store.exit_instance(callee_instance)?;
2232 
2233                 store.set_thread(old_thread)?;
2234                 let state = store.concurrent_state_mut();
2235                 if let Some(t) = old_thread.guest() {
2236                     state.get_mut(t.thread)?.state = GuestThreadState::Running;
2237                 }
2238                 log::trace!("stackless call: restored {old_thread:?} as current thread");
2239 
2240                 // SAFETY: `wasmparser` will have validated that the callback
2241                 // function returns a `i32` result.
2242                 let code = unsafe { storage[0].assume_init() }.get_i32() as u32;
2243 
2244                 self.handle_callback_code(store, guest_thread, callee_instance.index, code)
2245             })
2246                 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
2247         } else {
2248             let token = StoreToken::new(store.as_context_mut());
2249             Box::new(move |store: &mut dyn VMStore| {
2250                 self.add_guest_thread_to_instance_table(
2251                     guest_thread.thread,
2252                     store,
2253                     callee_instance.index,
2254                 )?;
2255                 let old_thread = store.set_thread(guest_thread)?;
2256                 log::trace!(
2257                     "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
2258                 );
2259                 let flags = self.id().get(store).instance_flags(callee_instance.index);
2260 
2261                 // Unless this is a callback-less (i.e. stackful)
2262                 // async-lifted export, we need to record that the instance
2263                 // cannot be entered until the call returns.
2264                 if !async_ {
2265                     store.enter_instance(callee_instance);
2266                 }
2267 
2268                 // SAFETY: See the documentation for `make_call` to review the
2269                 // contract we must uphold for `call` here.
2270                 //
2271                 // Per the contract described in the `queue_call`
2272                 // documentation, the `callee` pointer which `call` closes
2273                 // over must be valid.
2274                 let storage = call(store)?;
2275 
2276                 if async_ {
2277                     let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2278                     if task.threads.len() == 1 && !task.returned_or_cancelled() {
2279                         bail!(Trap::NoAsyncResult);
2280                     }
2281                 } else {
2282                     // This is a sync-lifted export, so now is when we lift the
2283                     // result, optionally call the post-return function, if any,
2284                     // and finally notify any current or future waiters that the
2285                     // subtask has returned.
2286 
2287                     let lift = {
2288                         store.exit_instance(callee_instance)?;
2289 
2290                         let state = store.concurrent_state_mut();
2291                         if !state.get_mut(guest_thread.task)?.result.is_none() {
2292                             bail_bug!("task has not yet produced a result");
2293                         }
2294 
2295                         match state.get_mut(guest_thread.task)?.lift_result.take() {
2296                             Some(lift) => lift,
2297                             None => bail_bug!("lift_result field is missing"),
2298                         }
2299                     };
2300 
2301                     // SAFETY: `result_count` represents the number of core Wasm
2302                     // results returned, per `wasmparser`.
2303                     let result = (lift.lift)(store, unsafe {
2304                         mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
2305                             &storage[..result_count],
2306                         )
2307                     })?;
2308 
2309                     let post_return_arg = match result_count {
2310                         0 => ValRaw::i32(0),
2311                         // SAFETY: `result_count` represents the number of
2312                         // core Wasm results returned, per `wasmparser`.
2313                         1 => unsafe { storage[0].assume_init() },
2314                         _ => unreachable!(),
2315                     };
2316 
2317                     unsafe {
2318                         call_post_return(
2319                             token.as_context_mut(store),
2320                             post_return.map(|v| v.as_non_null()),
2321                             post_return_arg,
2322                             flags,
2323                         )?;
2324                     }
2325 
2326                     self.task_complete(store, guest_thread.task, result, Status::Returned)?;
2327                 }
2328 
2329                 // This is a callback-less call, so the implicit thread has now completed
2330                 self.cleanup_thread(store, guest_thread, callee_instance.index)?;
2331 
2332                 store.set_thread(old_thread)?;
2333 
2334                 let state = store.concurrent_state_mut();
2335                 let task = state.get_mut(guest_thread.task)?;
2336 
2337                 match &task.caller {
2338                     Caller::Host { .. } => {
2339                         if task.ready_to_delete() {
2340                             Waitable::Guest(guest_thread.task).delete_from(state)?;
2341                         }
2342                     }
2343                     Caller::Guest { .. } => {
2344                         task.exited = true;
2345                     }
2346                 }
2347 
2348                 Ok(None)
2349             })
2350         };
2351 
2352         store
2353             .0
2354             .concurrent_state_mut()
2355             .push_high_priority(WorkItem::GuestCall(
2356                 callee_instance.index,
2357                 GuestCall {
2358                     thread: guest_thread,
2359                     kind: GuestCallKind::StartImplicit(fun),
2360                 },
2361             ));
2362 
2363         Ok(())
2364     }
2365 
2366     /// Prepare (but do not start) a guest->guest call.
2367     ///
2368     /// This is called from fused adapter code generated in
2369     /// `wasmtime_environ::fact::trampoline::Compiler`.  `start` and `return_`
2370     /// are synthesized Wasm functions which move the parameters from the caller
2371     /// to the callee and the result from the callee to the caller,
2372     /// respectively.  The adapter will call `Self::start_call` immediately
2373     /// after calling this function.
2374     ///
2375     /// SAFETY: All the pointer arguments must be valid pointers to guest
2376     /// entities (and with the expected signatures for the function references
2377     /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
prepare_call<T: 'static>( self, mut store: StoreContextMut<T>, start: NonNull<VMFuncRef>, return_: NonNull<VMFuncRef>, caller_instance: RuntimeComponentInstanceIndex, callee_instance: RuntimeComponentInstanceIndex, task_return_type: TypeTupleIndex, callee_async: bool, memory: *mut VMMemoryDefinition, string_encoding: StringEncoding, caller_info: CallerInfo, ) -> Result<()>2378     unsafe fn prepare_call<T: 'static>(
2379         self,
2380         mut store: StoreContextMut<T>,
2381         start: NonNull<VMFuncRef>,
2382         return_: NonNull<VMFuncRef>,
2383         caller_instance: RuntimeComponentInstanceIndex,
2384         callee_instance: RuntimeComponentInstanceIndex,
2385         task_return_type: TypeTupleIndex,
2386         callee_async: bool,
2387         memory: *mut VMMemoryDefinition,
2388         string_encoding: StringEncoding,
2389         caller_info: CallerInfo,
2390     ) -> Result<()> {
2391         if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2392             // A task may only call an async-typed function via a sync lower if
2393             // it was created by a call to an async export.  Otherwise, we'll
2394             // trap.
2395             store.0.check_blocking()?;
2396         }
2397 
2398         enum ResultInfo {
2399             Heap { results: u32 },
2400             Stack { result_count: u32 },
2401         }
2402 
2403         let result_info = match &caller_info {
2404             CallerInfo::Async {
2405                 has_result: true,
2406                 params,
2407             } => ResultInfo::Heap {
2408                 results: match params.last() {
2409                     Some(r) => r.get_u32(),
2410                     None => bail_bug!("retptr missing"),
2411                 },
2412             },
2413             CallerInfo::Async {
2414                 has_result: false, ..
2415             } => ResultInfo::Stack { result_count: 0 },
2416             CallerInfo::Sync {
2417                 result_count,
2418                 params,
2419             } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
2420                 results: match params.last() {
2421                     Some(r) => r.get_u32(),
2422                     None => bail_bug!("arg ptr missing"),
2423                 },
2424             },
2425             CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2426                 result_count: *result_count,
2427             },
2428         };
2429 
2430         let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2431 
2432         // Create a new guest task for the call, closing over the `start` and
2433         // `return_` functions to lift the parameters and lower the result,
2434         // respectively.
2435         let start = SendSyncPtr::new(start);
2436         let return_ = SendSyncPtr::new(return_);
2437         let token = StoreToken::new(store.as_context_mut());
2438         let state = store.0.concurrent_state_mut();
2439         let old_thread = state.current_guest_thread()?;
2440 
2441         debug_assert_eq!(
2442             state.get_mut(old_thread.task)?.instance,
2443             RuntimeInstance {
2444                 instance: self.id().instance(),
2445                 index: caller_instance,
2446             }
2447         );
2448 
2449         let new_task = GuestTask::new(
2450             Box::new(move |store, dst| {
2451                 let mut store = token.as_context_mut(store);
2452                 assert!(dst.len() <= MAX_FLAT_PARAMS);
2453                 // The `+ 1` here accounts for the return pointer, if any:
2454                 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2455                 let count = match caller_info {
2456                     // Async callers, if they have a result, use the last
2457                     // parameter as a return pointer so chop that off if
2458                     // relevant here.
2459                     CallerInfo::Async { params, has_result } => {
2460                         let params = &params[..params.len() - usize::from(has_result)];
2461                         for (param, src) in params.iter().zip(&mut src) {
2462                             src.write(*param);
2463                         }
2464                         params.len()
2465                     }
2466 
2467                     // Sync callers forward everything directly.
2468                     CallerInfo::Sync { params, .. } => {
2469                         for (param, src) in params.iter().zip(&mut src) {
2470                             src.write(*param);
2471                         }
2472                         params.len()
2473                     }
2474                 };
2475                 // SAFETY: `start` is a valid `*mut VMFuncRef` from
2476                 // `wasmtime-cranelift`-generated fused adapter code.  Based on
2477                 // how it was constructed (see
2478                 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2479                 // for details) we know it takes count parameters and returns
2480                 // `dst.len()` results.
2481                 unsafe {
2482                     crate::Func::call_unchecked_raw(
2483                         &mut store,
2484                         start.as_non_null(),
2485                         NonNull::new(
2486                             &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2487                         )
2488                         .unwrap(),
2489                     )?;
2490                 }
2491                 dst.copy_from_slice(&src[..dst.len()]);
2492                 let state = store.0.concurrent_state_mut();
2493                 Waitable::Guest(state.current_guest_thread()?.task).set_event(
2494                     state,
2495                     Some(Event::Subtask {
2496                         status: Status::Started,
2497                     }),
2498                 )?;
2499                 Ok(())
2500             }),
2501             LiftResult {
2502                 lift: Box::new(move |store, src| {
2503                     // SAFETY: See comment in closure passed as `lower_params`
2504                     // parameter above.
2505                     let mut store = token.as_context_mut(store);
2506                     let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2507                     if let ResultInfo::Heap { results } = &result_info {
2508                         my_src.push(ValRaw::u32(*results));
2509                     }
2510 
2511                     // Execute the `return_` hook, generated by Wasmtime's FACT
2512                     // compiler, in the context of the old thread. The old
2513                     // thread, this thread's caller, may have `realloc`
2514                     // callbacks invoked for example and those need the correct
2515                     // context set for the current thread.
2516                     let prev = store.0.set_thread(old_thread)?;
2517 
2518                     // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2519                     // `wasmtime-cranelift`-generated fused adapter code.  Based
2520                     // on how it was constructed (see
2521                     // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2522                     // for details) we know it takes `src.len()` parameters and
2523                     // returns up to 1 result.
2524                     unsafe {
2525                         crate::Func::call_unchecked_raw(
2526                             &mut store,
2527                             return_.as_non_null(),
2528                             my_src.as_mut_slice().into(),
2529                         )?;
2530                     }
2531 
2532                     // Restore the previous current thread after the
2533                     // lifting/lowering has returned.
2534                     store.0.set_thread(prev)?;
2535 
2536                     let state = store.0.concurrent_state_mut();
2537                     let thread = state.current_guest_thread()?;
2538                     if sync_caller {
2539                         state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2540                             if let ResultInfo::Stack { result_count } = &result_info {
2541                                 match result_count {
2542                                     0 => None,
2543                                     1 => Some(my_src[0]),
2544                                     _ => unreachable!(),
2545                                 }
2546                             } else {
2547                                 None
2548                             },
2549                         );
2550                     }
2551                     Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2552                 }),
2553                 ty: task_return_type,
2554                 memory: NonNull::new(memory).map(SendSyncPtr::new),
2555                 string_encoding,
2556             },
2557             Caller::Guest { thread: old_thread },
2558             None,
2559             RuntimeInstance {
2560                 instance: self.id().instance(),
2561                 index: callee_instance,
2562             },
2563             callee_async,
2564         )?;
2565 
2566         let guest_task = state.push(new_task)?;
2567         let new_thread = GuestThread::new_implicit(state, guest_task)?;
2568         let guest_thread = state.push(new_thread)?;
2569         state.get_mut(guest_task)?.threads.insert(guest_thread);
2570 
2571         // Make the new thread the current one so that `Self::start_call` knows
2572         // which one to start.
2573         store.0.set_thread(QualifiedThreadId {
2574             task: guest_task,
2575             thread: guest_thread,
2576         })?;
2577         log::trace!(
2578             "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2579         );
2580 
2581         Ok(())
2582     }
2583 
2584     /// Call the specified callback function for an async-lifted export.
2585     ///
2586     /// SAFETY: `function` must be a valid reference to a guest function of the
2587     /// correct signature for a callback.
call_callback<T>( self, mut store: StoreContextMut<T>, function: SendSyncPtr<VMFuncRef>, event: Event, handle: u32, ) -> Result<u32>2588     unsafe fn call_callback<T>(
2589         self,
2590         mut store: StoreContextMut<T>,
2591         function: SendSyncPtr<VMFuncRef>,
2592         event: Event,
2593         handle: u32,
2594     ) -> Result<u32> {
2595         let (ordinal, result) = event.parts();
2596         let params = &mut [
2597             ValRaw::u32(ordinal),
2598             ValRaw::u32(handle),
2599             ValRaw::u32(result),
2600         ];
2601         // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2602         // `wasmtime-cranelift`-generated fused adapter code or
2603         // `component::Options`.  Per `wasmparser` callback signature
2604         // validation, we know it takes three parameters and returns one.
2605         unsafe {
2606             crate::Func::call_unchecked_raw(
2607                 &mut store,
2608                 function.as_non_null(),
2609                 params.as_mut_slice().into(),
2610             )?;
2611         }
2612         Ok(params[0].get_u32())
2613     }
2614 
2615     /// Start a guest->guest call previously prepared using
2616     /// `Self::prepare_call`.
2617     ///
2618     /// This is called from fused adapter code generated in
2619     /// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
2620     /// this function immediately after calling `Self::prepare_call`.
2621     ///
2622     /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
2623     /// functions with the appropriate signatures for the current guest task.
2624     /// If this is a call to an async-lowered import, the actual call may be
2625     /// deferred and run after this function returns, in which case the pointer
2626     /// arguments must also be valid when the call happens.
start_call<T: 'static>( self, mut store: StoreContextMut<T>, callback: *mut VMFuncRef, post_return: *mut VMFuncRef, callee: NonNull<VMFuncRef>, param_count: u32, result_count: u32, flags: u32, storage: Option<&mut [MaybeUninit<ValRaw>]>, ) -> Result<u32>2627     unsafe fn start_call<T: 'static>(
2628         self,
2629         mut store: StoreContextMut<T>,
2630         callback: *mut VMFuncRef,
2631         post_return: *mut VMFuncRef,
2632         callee: NonNull<VMFuncRef>,
2633         param_count: u32,
2634         result_count: u32,
2635         flags: u32,
2636         storage: Option<&mut [MaybeUninit<ValRaw>]>,
2637     ) -> Result<u32> {
2638         let token = StoreToken::new(store.as_context_mut());
2639         let async_caller = storage.is_none();
2640         let state = store.0.concurrent_state_mut();
2641         let guest_thread = state.current_guest_thread()?;
2642         let callee_async = state.get_mut(guest_thread.task)?.async_function;
2643         let callee = SendSyncPtr::new(callee);
2644         let param_count = usize::try_from(param_count)?;
2645         assert!(param_count <= MAX_FLAT_PARAMS);
2646         let result_count = usize::try_from(result_count)?;
2647         assert!(result_count <= MAX_FLAT_RESULTS);
2648 
2649         let task = state.get_mut(guest_thread.task)?;
2650         if let Some(callback) = NonNull::new(callback) {
2651             // We're calling an async-lifted export with a callback, so store
2652             // the callback and related context as part of the task so we can
2653             // call it later when needed.
2654             let callback = SendSyncPtr::new(callback);
2655             task.callback = Some(Box::new(move |store, event, handle| {
2656                 let store = token.as_context_mut(store);
2657                 unsafe { self.call_callback::<T>(store, callback, event, handle) }
2658             }));
2659         }
2660 
2661         let Caller::Guest { thread: caller } = &task.caller else {
2662             // As of this writing, `start_call` is only used for guest->guest
2663             // calls.
2664             bail_bug!("start_call unexpectedly invoked for host->guest call");
2665         };
2666         let caller = *caller;
2667         let caller_instance = state.get_mut(caller.task)?.instance;
2668 
2669         // Queue the call as a "high priority" work item.
2670         unsafe {
2671             self.queue_call(
2672                 store.as_context_mut(),
2673                 guest_thread,
2674                 callee,
2675                 param_count,
2676                 result_count,
2677                 (flags & START_FLAG_ASYNC_CALLEE) != 0,
2678                 NonNull::new(callback).map(SendSyncPtr::new),
2679                 NonNull::new(post_return).map(SendSyncPtr::new),
2680             )?;
2681         }
2682 
2683         let state = store.0.concurrent_state_mut();
2684 
2685         // Use the caller's `GuestThread::sync_call_set` to register interest in
2686         // the subtask...
2687         let guest_waitable = Waitable::Guest(guest_thread.task);
2688         let old_set = guest_waitable.common(state)?.set;
2689         let set = state.get_mut(caller.thread)?.sync_call_set;
2690         guest_waitable.join(state, Some(set))?;
2691 
2692         // ... and suspend this fiber temporarily while we wait for it to start.
2693         //
2694         // Note that we _could_ call the callee directly using the current fiber
2695         // rather than suspend this one, but that would make reasoning about the
2696         // event loop more complicated and is probably only worth doing if
2697         // there's a measurable performance benefit.  In addition, it would mean
2698         // blocking the caller if the callee calls a blocking sync-lowered
2699         // import, and as of this writing the spec says we must not do that.
2700         //
2701         // Alternatively, the fused adapter code could be modified to call the
2702         // callee directly without calling a host-provided intrinsic at all (in
2703         // which case it would need to do its own, inline backpressure checks,
2704         // etc.).  Again, we'd want to see a measurable performance benefit
2705         // before committing to such an optimization.  And again, we'd need to
2706         // update the spec to allow that.
2707         let (status, waitable) = loop {
2708             store.0.suspend(SuspendReason::Waiting {
2709                 set,
2710                 thread: caller,
2711                 // Normally, `StoreOpaque::suspend` would assert it's being
2712                 // called from a context where blocking is allowed.  However, if
2713                 // `async_caller` is `true`, we'll only "block" long enough for
2714                 // the callee to start, i.e. we won't repeat this loop, so we
2715                 // tell `suspend` it's okay even if we're not allowed to block.
2716                 // Alternatively, if the callee is not an async function, then
2717                 // we know it won't block anyway.
2718                 skip_may_block_check: async_caller || !callee_async,
2719             })?;
2720 
2721             let state = store.0.concurrent_state_mut();
2722 
2723             log::trace!("taking event for {:?}", guest_thread.task);
2724             let event = guest_waitable.take_event(state)?;
2725             let Some(Event::Subtask { status }) = event else {
2726                 bail_bug!("subtasks should only get subtask events, got {event:?}")
2727             };
2728 
2729             log::trace!("status {status:?} for {:?}", guest_thread.task);
2730 
2731             if status == Status::Returned {
2732                 // It returned, so we can stop waiting.
2733                 break (status, None);
2734             } else if async_caller {
2735                 // It hasn't returned yet, but the caller is calling via an
2736                 // async-lowered import, so we generate a handle for the task
2737                 // waitable and return the status.
2738                 let handle = store
2739                     .0
2740                     .instance_state(caller_instance)
2741                     .handle_table()
2742                     .subtask_insert_guest(guest_thread.task.rep())?;
2743                 store
2744                     .0
2745                     .concurrent_state_mut()
2746                     .get_mut(guest_thread.task)?
2747                     .common
2748                     .handle = Some(handle);
2749                 break (status, Some(handle));
2750             } else {
2751                 // The callee hasn't returned yet, and the caller is calling via
2752                 // a sync-lowered import, so we loop and keep waiting until the
2753                 // callee returns.
2754             }
2755         };
2756 
2757         guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;
2758 
2759         // Reset the current thread to point to the caller as it resumes control.
2760         store.0.set_thread(caller)?;
2761         store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
2762         log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");
2763 
2764         if let Some(storage) = storage {
2765             // The caller used a sync-lowered import to call an async-lifted
2766             // export, in which case the result, if any, has been stashed in
2767             // `GuestTask::sync_result`.
2768             let state = store.0.concurrent_state_mut();
2769             let task = state.get_mut(guest_thread.task)?;
2770             if let Some(result) = task.sync_result.take()? {
2771                 if let Some(result) = result {
2772                     storage[0] = MaybeUninit::new(result);
2773                 }
2774 
2775                 if task.exited && task.ready_to_delete() {
2776                     Waitable::Guest(guest_thread.task).delete_from(state)?;
2777                 }
2778             }
2779         }
2780 
2781         Ok(status.pack(waitable))
2782     }
2783 
2784     /// Poll the specified future once on behalf of a guest->host call using an
2785     /// async-lowered import.
2786     ///
2787     /// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
2788     /// `Pending`, add it to the set of futures to be polled as part of this
2789     /// instance's event loop until it completes, and then return
2790     /// `Ok(Some(handle))` where `handle` is the waitable handle to return.
2791     ///
2792     /// Whether the future returns `Ready` immediately or later, the `lower`
2793     /// function will be used to lower the result, if any, into the guest caller's
2794     /// stack and linear memory. The `lower` function is invoked with `None` if
2795     /// the future is cancelled.
first_poll<T: 'static, R: Send + 'static>( self, mut store: StoreContextMut<'_, T>, future: impl Future<Output = Result<R>> + Send + 'static, lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static, ) -> Result<Option<u32>>2796     pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
2797         self,
2798         mut store: StoreContextMut<'_, T>,
2799         future: impl Future<Output = Result<R>> + Send + 'static,
2800         lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
2801     ) -> Result<Option<u32>> {
2802         let token = StoreToken::new(store.as_context_mut());
2803         let state = store.0.concurrent_state_mut();
2804         let task = state.current_host_thread()?;
2805 
2806         // Create an abortable future which hooks calls to poll and manages call
2807         // context state for the future.
2808         let (join_handle, future) = JoinHandle::run(future);
2809         {
2810             let state = &mut state.get_mut(task)?.state;
2811             assert!(matches!(state, HostTaskState::CalleeStarted));
2812             *state = HostTaskState::CalleeRunning(join_handle);
2813         }
2814 
2815         let mut future = Box::pin(future);
2816 
2817         // Finally, poll the future.  We can use a dummy `Waker` here because
2818         // we'll add the future to `ConcurrentState::futures` and poll it
2819         // automatically from the event loop if it doesn't complete immediately
2820         // here.
2821         let poll = tls::set(store.0, || {
2822             future
2823                 .as_mut()
2824                 .poll(&mut Context::from_waker(&Waker::noop()))
2825         });
2826 
2827         match poll {
2828             // It finished immediately; lower the result and delete the task.
2829             Poll::Ready(result) => {
2830                 let result = result.transpose()?;
2831                 lower(store.as_context_mut(), result)?;
2832                 return Ok(None);
2833             }
2834 
2835             // Future isn't ready yet, so fall through.
2836             Poll::Pending => {}
2837         }
2838 
2839         // It hasn't finished yet; add the future to
2840         // `ConcurrentState::futures` so it will be polled by the event
2841         // loop and allocate a waitable handle to return to the guest.
2842 
2843         // Wrap the future in a closure responsible for lowering the result into
2844         // the guest's stack and memory, as well as notifying any waiters that
2845         // the task returned.
2846         let future = Box::pin(async move {
2847             let result = match future.await {
2848                 Some(result) => Some(result?),
2849                 None => None,
2850             };
2851             let on_complete = move |store: &mut dyn VMStore| {
2852                 // Restore the `current_thread` to be the host so `lower` knows
2853                 // how to manipulate borrows and knows which scope of borrows
2854                 // to check.
2855                 let mut store = token.as_context_mut(store);
2856                 let old = store.0.set_thread(task)?;
2857 
2858                 let status = if result.is_some() {
2859                     Status::Returned
2860                 } else {
2861                     Status::ReturnCancelled
2862                 };
2863 
2864                 lower(store.as_context_mut(), result)?;
2865                 let state = store.0.concurrent_state_mut();
2866                 match &mut state.get_mut(task)?.state {
2867                     // The task is already flagged as finished because it was
2868                     // cancelled. No need to transition further.
2869                     HostTaskState::CalleeDone { .. } => {}
2870 
2871                     // Otherwise transition this task to the done state.
2872                     other => *other = HostTaskState::CalleeDone { cancelled: false },
2873                 }
2874                 Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;
2875 
2876                 store.0.set_thread(old)?;
2877                 Ok(())
2878             };
2879 
2880             // Here we schedule a task to run on a worker fiber to do the
2881             // lowering since it may involve a call to the guest's realloc
2882             // function. This is necessary because calling the guest while
2883             // there are host embedder frames on the stack is unsound.
2884             tls::get(move |store| {
2885                 store
2886                     .concurrent_state_mut()
2887                     .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
2888                         on_complete,
2889                     ))));
2890                 Ok(())
2891             })
2892         });
2893 
2894         // Make this task visible to the guest and then record what it
2895         // was made visible as.
2896         let state = store.0.concurrent_state_mut();
2897         state.push_future(future);
2898         let caller = state.get_mut(task)?.caller;
2899         let instance = state.get_mut(caller.task)?.instance;
2900         let handle = store
2901             .0
2902             .instance_state(instance)
2903             .handle_table()
2904             .subtask_insert_host(task.rep())?;
2905         store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
2906         log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");
2907 
2908         // Restore the currently running thread to this host task's
2909         // caller. Note that the host task isn't deallocated as it's
2910         // within the store and will get deallocated later.
2911         store.0.set_thread(caller)?;
2912         Ok(Some(handle))
2913     }
2914 
2915     /// Implements the `task.return` intrinsic, lifting the result for the
2916     /// current guest task.
task_return( self, store: &mut dyn VMStore, ty: TypeTupleIndex, options: OptionsIndex, storage: &[ValRaw], ) -> Result<()>2917     pub(crate) fn task_return(
2918         self,
2919         store: &mut dyn VMStore,
2920         ty: TypeTupleIndex,
2921         options: OptionsIndex,
2922         storage: &[ValRaw],
2923     ) -> Result<()> {
2924         let state = store.concurrent_state_mut();
2925         let guest_thread = state.current_guest_thread()?;
2926         let lift = state
2927             .get_mut(guest_thread.task)?
2928             .lift_result
2929             .take()
2930             .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
2931         if !state.get_mut(guest_thread.task)?.result.is_none() {
2932             bail_bug!("task result unexpectedly already set");
2933         }
2934 
2935         let CanonicalOptions {
2936             string_encoding,
2937             data_model,
2938             ..
2939         } = &self.id().get(store).component().env_component().options[options];
2940 
2941         let invalid = ty != lift.ty
2942             || string_encoding != &lift.string_encoding
2943             || match data_model {
2944                 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2945                     Some(memory) => {
2946                         let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2947                         let actual = self.id().get(store).runtime_memory(memory);
2948                         expected != actual.as_ptr()
2949                     }
2950                     // Memory not specified, meaning it didn't need to be
2951                     // specified per validation, so not invalid.
2952                     None => false,
2953                 },
2954                 // Always invalid as this isn't supported.
2955                 CanonicalOptionsDataModel::Gc { .. } => true,
2956             };
2957 
2958         if invalid {
2959             bail!(Trap::TaskReturnInvalid);
2960         }
2961 
2962         log::trace!("task.return for {guest_thread:?}");
2963 
2964         let result = (lift.lift)(store, storage)?;
2965         self.task_complete(store, guest_thread.task, result, Status::Returned)
2966     }
2967 
2968     /// Implements the `task.cancel` intrinsic.
task_cancel(self, store: &mut StoreOpaque) -> Result<()>2969     pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
2970         let state = store.concurrent_state_mut();
2971         let guest_thread = state.current_guest_thread()?;
2972         let task = state.get_mut(guest_thread.task)?;
2973         if !task.cancel_sent {
2974             bail!(Trap::TaskCancelNotCancelled);
2975         }
2976         _ = task
2977             .lift_result
2978             .take()
2979             .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
2980 
2981         if !task.result.is_none() {
2982             bail_bug!("task result should not bet set yet");
2983         }
2984 
2985         log::trace!("task.cancel for {guest_thread:?}");
2986 
2987         self.task_complete(
2988             store,
2989             guest_thread.task,
2990             Box::new(DummyResult),
2991             Status::ReturnCancelled,
2992         )
2993     }
2994 
2995     /// Complete the specified guest task (i.e. indicate that it has either
2996     /// returned a (possibly empty) result or cancelled itself).
2997     ///
2998     /// This will return any resource borrows and notify any current or future
2999     /// waiters that the task has completed.
task_complete( self, store: &mut StoreOpaque, guest_task: TableId<GuestTask>, result: Box<dyn Any + Send + Sync>, status: Status, ) -> Result<()>3000     fn task_complete(
3001         self,
3002         store: &mut StoreOpaque,
3003         guest_task: TableId<GuestTask>,
3004         result: Box<dyn Any + Send + Sync>,
3005         status: Status,
3006     ) -> Result<()> {
3007         store
3008             .component_resource_tables(Some(self))
3009             .validate_scope_exit()?;
3010 
3011         let state = store.concurrent_state_mut();
3012         let task = state.get_mut(guest_task)?;
3013 
3014         if let Caller::Host { tx, .. } = &mut task.caller {
3015             if let Some(tx) = tx.take() {
3016                 _ = tx.send(result);
3017             }
3018         } else {
3019             task.result = Some(result);
3020             Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3021         }
3022 
3023         Ok(())
3024     }
3025 
3026     /// Implements the `waitable-set.new` intrinsic.
waitable_set_new( self, store: &mut StoreOpaque, caller_instance: RuntimeComponentInstanceIndex, ) -> Result<u32>3027     pub(crate) fn waitable_set_new(
3028         self,
3029         store: &mut StoreOpaque,
3030         caller_instance: RuntimeComponentInstanceIndex,
3031     ) -> Result<u32> {
3032         let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3033         let handle = store
3034             .instance_state(RuntimeInstance {
3035                 instance: self.id().instance(),
3036                 index: caller_instance,
3037             })
3038             .handle_table()
3039             .waitable_set_insert(set.rep())?;
3040         log::trace!("new waitable set {set:?} (handle {handle})");
3041         Ok(handle)
3042     }
3043 
3044     /// Implements the `waitable-set.drop` intrinsic.
waitable_set_drop( self, store: &mut StoreOpaque, caller_instance: RuntimeComponentInstanceIndex, set: u32, ) -> Result<()>3045     pub(crate) fn waitable_set_drop(
3046         self,
3047         store: &mut StoreOpaque,
3048         caller_instance: RuntimeComponentInstanceIndex,
3049         set: u32,
3050     ) -> Result<()> {
3051         let rep = store
3052             .instance_state(RuntimeInstance {
3053                 instance: self.id().instance(),
3054                 index: caller_instance,
3055             })
3056             .handle_table()
3057             .waitable_set_remove(set)?;
3058 
3059         log::trace!("drop waitable set {rep} (handle {set})");
3060 
3061         let set = store
3062             .concurrent_state_mut()
3063             .delete(TableId::<WaitableSet>::new(rep))?;
3064 
3065         if !set.waiting.is_empty() {
3066             bail!(Trap::WaitableSetDropHasWaiters);
3067         }
3068 
3069         Ok(())
3070     }
3071 
3072     /// Implements the `waitable.join` intrinsic.
waitable_join( self, store: &mut StoreOpaque, caller_instance: RuntimeComponentInstanceIndex, waitable_handle: u32, set_handle: u32, ) -> Result<()>3073     pub(crate) fn waitable_join(
3074         self,
3075         store: &mut StoreOpaque,
3076         caller_instance: RuntimeComponentInstanceIndex,
3077         waitable_handle: u32,
3078         set_handle: u32,
3079     ) -> Result<()> {
3080         let mut instance = self.id().get_mut(store);
3081         let waitable =
3082             Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3083 
3084         let set = if set_handle == 0 {
3085             None
3086         } else {
3087             let set = instance.instance_states().0[caller_instance]
3088                 .handle_table()
3089                 .waitable_set_rep(set_handle)?;
3090 
3091             Some(TableId::<WaitableSet>::new(set))
3092         };
3093 
3094         log::trace!(
3095             "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3096         );
3097 
3098         waitable.join(store.concurrent_state_mut(), set)
3099     }
3100 
3101     /// Implements the `subtask.drop` intrinsic.
subtask_drop( self, store: &mut StoreOpaque, caller_instance: RuntimeComponentInstanceIndex, task_id: u32, ) -> Result<()>3102     pub(crate) fn subtask_drop(
3103         self,
3104         store: &mut StoreOpaque,
3105         caller_instance: RuntimeComponentInstanceIndex,
3106         task_id: u32,
3107     ) -> Result<()> {
3108         self.waitable_join(store, caller_instance, task_id, 0)?;
3109 
3110         let (rep, is_host) = store
3111             .instance_state(RuntimeInstance {
3112                 instance: self.id().instance(),
3113                 index: caller_instance,
3114             })
3115             .handle_table()
3116             .subtask_remove(task_id)?;
3117 
3118         let concurrent_state = store.concurrent_state_mut();
3119         let (waitable, expected_caller, delete) = if is_host {
3120             let id = TableId::<HostTask>::new(rep);
3121             let task = concurrent_state.get_mut(id)?;
3122             match &task.state {
3123                 HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
3124                 HostTaskState::CalleeDone { .. } => {}
3125                 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3126                     bail_bug!("invalid state for callee in `subtask.drop`")
3127                 }
3128             }
3129             (Waitable::Host(id), task.caller, true)
3130         } else {
3131             let id = TableId::<GuestTask>::new(rep);
3132             let task = concurrent_state.get_mut(id)?;
3133             if task.lift_result.is_some() {
3134                 bail!(Trap::SubtaskDropNotResolved);
3135             }
3136             if let Caller::Guest { thread } = task.caller {
3137                 (
3138                     Waitable::Guest(id),
3139                     thread,
3140                     concurrent_state.get_mut(id)?.exited,
3141                 )
3142             } else {
3143                 bail_bug!("expected guest caller for `subtask.drop`")
3144             }
3145         };
3146 
3147         waitable.common(concurrent_state)?.handle = None;
3148 
3149         // If this subtask has an event that means that the terminal status of
3150         // this subtask wasn't yet received so it can't be dropped yet.
3151         if waitable.take_event(concurrent_state)?.is_some() {
3152             bail!(Trap::SubtaskDropNotResolved);
3153         }
3154 
3155         if delete {
3156             waitable.delete_from(concurrent_state)?;
3157         }
3158 
3159         // Since waitables can neither be passed between instances nor forged,
3160         // this should never fail unless there's a bug in Wasmtime, but we check
3161         // here to be sure:
3162         debug_assert_eq!(expected_caller, concurrent_state.current_guest_thread()?);
3163         log::trace!("subtask_drop {waitable:?} (handle {task_id})");
3164         Ok(())
3165     }
3166 
3167     /// Implements the `waitable-set.wait` intrinsic.
waitable_set_wait( self, store: &mut StoreOpaque, options: OptionsIndex, set: u32, payload: u32, ) -> Result<u32>3168     pub(crate) fn waitable_set_wait(
3169         self,
3170         store: &mut StoreOpaque,
3171         options: OptionsIndex,
3172         set: u32,
3173         payload: u32,
3174     ) -> Result<u32> {
3175         if !self.options(store, options).async_ {
3176             // The caller may only call `waitable-set.wait` from an async task
3177             // (i.e. a task created via a call to an async export).
3178             // Otherwise, we'll trap.
3179             store.check_blocking()?;
3180         }
3181 
3182         let &CanonicalOptions {
3183             cancellable,
3184             instance: caller_instance,
3185             ..
3186         } = &self.id().get(store).component().env_component().options[options];
3187         let rep = store
3188             .instance_state(RuntimeInstance {
3189                 instance: self.id().instance(),
3190                 index: caller_instance,
3191             })
3192             .handle_table()
3193             .waitable_set_rep(set)?;
3194 
3195         self.waitable_check(
3196             store,
3197             cancellable,
3198             WaitableCheck::Wait,
3199             WaitableCheckParams {
3200                 set: TableId::new(rep),
3201                 options,
3202                 payload,
3203             },
3204         )
3205     }
3206 
3207     /// Implements the `waitable-set.poll` intrinsic.
waitable_set_poll( self, store: &mut StoreOpaque, options: OptionsIndex, set: u32, payload: u32, ) -> Result<u32>3208     pub(crate) fn waitable_set_poll(
3209         self,
3210         store: &mut StoreOpaque,
3211         options: OptionsIndex,
3212         set: u32,
3213         payload: u32,
3214     ) -> Result<u32> {
3215         let &CanonicalOptions {
3216             cancellable,
3217             instance: caller_instance,
3218             ..
3219         } = &self.id().get(store).component().env_component().options[options];
3220         let rep = store
3221             .instance_state(RuntimeInstance {
3222                 instance: self.id().instance(),
3223                 index: caller_instance,
3224             })
3225             .handle_table()
3226             .waitable_set_rep(set)?;
3227 
3228         self.waitable_check(
3229             store,
3230             cancellable,
3231             WaitableCheck::Poll,
3232             WaitableCheckParams {
3233                 set: TableId::new(rep),
3234                 options,
3235                 payload,
3236             },
3237         )
3238     }
3239 
3240     /// Implements the `thread.index` intrinsic.
thread_index(&self, store: &mut dyn VMStore) -> Result<u32>3241     pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3242         let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3243         match store
3244             .concurrent_state_mut()
3245             .get_mut(thread_id)?
3246             .instance_rep
3247         {
3248             Some(r) => Ok(r),
3249             None => bail_bug!("thread should have instance_rep by now"),
3250         }
3251     }
3252 
3253     /// Implements the `thread.new-indirect` intrinsic.
thread_new_indirect<T: 'static>( self, mut store: StoreContextMut<T>, runtime_instance: RuntimeComponentInstanceIndex, _func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex, start_func_idx: u32, context: i32, ) -> Result<u32>3254     pub(crate) fn thread_new_indirect<T: 'static>(
3255         self,
3256         mut store: StoreContextMut<T>,
3257         runtime_instance: RuntimeComponentInstanceIndex,
3258         _func_ty_idx: TypeFuncIndex, // currently unused
3259         start_func_table_idx: RuntimeTableIndex,
3260         start_func_idx: u32,
3261         context: i32,
3262     ) -> Result<u32> {
3263         log::trace!("creating new thread");
3264 
3265         let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
3266         let (instance, registry) = self.id().get_mut_and_registry(store.0);
3267         let callee = instance
3268             .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
3269             .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
3270         if callee.type_index(store.0) != start_func_ty.type_index() {
3271             bail!(Trap::ThreadNewIndirectInvalidType);
3272         }
3273 
3274         let token = StoreToken::new(store.as_context_mut());
3275         let start_func = Box::new(
3276             move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
3277                 let old_thread = store.set_thread(guest_thread)?;
3278                 log::trace!(
3279                     "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
3280                 );
3281 
3282                 let mut store = token.as_context_mut(store);
3283                 let mut params = [ValRaw::i32(context)];
3284                 // Use call_unchecked rather than call or call_async, as we don't want to run the function
3285                 // on a separate fiber if we're running in an async store.
3286                 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };
3287 
3288                 self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
3289                 log::trace!("explicit thread {guest_thread:?} completed");
3290                 let state = store.0.concurrent_state_mut();
3291                 let task = state.get_mut(guest_thread.task)?;
3292                 if task.threads.is_empty() && !task.returned_or_cancelled() {
3293                     bail!(Trap::NoAsyncResult);
3294                 }
3295                 store.0.set_thread(old_thread)?;
3296                 let state = store.0.concurrent_state_mut();
3297                 if let Some(t) = old_thread.guest() {
3298                     state.get_mut(t.thread)?.state = GuestThreadState::Running;
3299                 }
3300                 if state.get_mut(guest_thread.task)?.ready_to_delete() {
3301                     Waitable::Guest(guest_thread.task).delete_from(state)?;
3302                 }
3303                 log::trace!("thread start: restored {old_thread:?} as current thread");
3304 
3305                 Ok(())
3306             },
3307         );
3308 
3309         let state = store.0.concurrent_state_mut();
3310         let current_thread = state.current_guest_thread()?;
3311         let parent_task = current_thread.task;
3312 
3313         let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
3314         let thread_id = state.push(new_thread)?;
3315         state.get_mut(parent_task)?.threads.insert(thread_id);
3316 
3317         log::trace!("new thread with id {thread_id:?} created");
3318 
3319         self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
3320     }
3321 
resume_thread( self, store: &mut StoreOpaque, runtime_instance: RuntimeComponentInstanceIndex, thread_idx: u32, high_priority: bool, allow_ready: bool, ) -> Result<()>3322     pub(crate) fn resume_thread(
3323         self,
3324         store: &mut StoreOpaque,
3325         runtime_instance: RuntimeComponentInstanceIndex,
3326         thread_idx: u32,
3327         high_priority: bool,
3328         allow_ready: bool,
3329     ) -> Result<()> {
3330         let thread_id =
3331             GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3332         let state = store.concurrent_state_mut();
3333         let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3334         let thread = state.get_mut(guest_thread.thread)?;
3335 
3336         match mem::replace(&mut thread.state, GuestThreadState::Running) {
3337             GuestThreadState::NotStartedExplicit(start_func) => {
3338                 log::trace!("starting thread {guest_thread:?}");
3339                 let guest_call = WorkItem::GuestCall(
3340                     runtime_instance,
3341                     GuestCall {
3342                         thread: guest_thread,
3343                         kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3344                             start_func(store, guest_thread)
3345                         })),
3346                     },
3347                 );
3348                 store
3349                     .concurrent_state_mut()
3350                     .push_work_item(guest_call, high_priority);
3351             }
3352             GuestThreadState::Suspended(fiber) => {
3353                 log::trace!("resuming thread {thread_id:?} that was suspended");
3354                 store
3355                     .concurrent_state_mut()
3356                     .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3357             }
3358             GuestThreadState::Ready(fiber) if allow_ready => {
3359                 log::trace!("resuming thread {thread_id:?} that was ready");
3360                 thread.state = GuestThreadState::Ready(fiber);
3361                 store
3362                     .concurrent_state_mut()
3363                     .promote_thread_work_item(guest_thread);
3364             }
3365             other => {
3366                 thread.state = other;
3367                 bail!(Trap::CannotResumeThread);
3368             }
3369         }
3370         Ok(())
3371     }
3372 
add_guest_thread_to_instance_table( self, thread_id: TableId<GuestThread>, store: &mut StoreOpaque, runtime_instance: RuntimeComponentInstanceIndex, ) -> Result<u32>3373     fn add_guest_thread_to_instance_table(
3374         self,
3375         thread_id: TableId<GuestThread>,
3376         store: &mut StoreOpaque,
3377         runtime_instance: RuntimeComponentInstanceIndex,
3378     ) -> Result<u32> {
3379         let guest_id = store
3380             .instance_state(RuntimeInstance {
3381                 instance: self.id().instance(),
3382                 index: runtime_instance,
3383             })
3384             .thread_handle_table()
3385             .guest_thread_insert(thread_id.rep())?;
3386         store
3387             .concurrent_state_mut()
3388             .get_mut(thread_id)?
3389             .instance_rep = Some(guest_id);
3390         Ok(guest_id)
3391     }
3392 
3393     /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`,
3394     /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics.
suspension_intrinsic( self, store: &mut StoreOpaque, caller: RuntimeComponentInstanceIndex, cancellable: bool, yielding: bool, to_thread: SuspensionTarget, ) -> Result<WaitResult>3395     pub(crate) fn suspension_intrinsic(
3396         self,
3397         store: &mut StoreOpaque,
3398         caller: RuntimeComponentInstanceIndex,
3399         cancellable: bool,
3400         yielding: bool,
3401         to_thread: SuspensionTarget,
3402     ) -> Result<WaitResult> {
3403         let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3404         if to_thread.is_none() {
3405             let state = store.concurrent_state_mut();
3406             if yielding {
3407                 // This is a `thread.yield` call
3408                 if !state.may_block(guest_thread.task)? {
3409                     // In a non-blocking context, a `thread.yield` may trigger
3410                     // other threads in the same component instance to run.
3411                     if !state.promote_instance_local_thread_work_item(caller) {
3412                         // No other threads are runnable, so just return
3413                         return Ok(WaitResult::Completed);
3414                     }
3415                 }
3416             } else {
3417                 // The caller may only call `thread.suspend` from an async task
3418                 // (i.e. a task created via a call to an async export).
3419                 // Otherwise, we'll trap.
3420                 store.check_blocking()?;
3421             }
3422         }
3423 
3424         // There could be a pending cancellation from a previous uncancellable wait
3425         if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3426             return Ok(WaitResult::Cancelled);
3427         }
3428 
3429         match to_thread {
3430             SuspensionTarget::SomeSuspended(thread) => {
3431                 self.resume_thread(store, caller, thread, true, false)?
3432             }
3433             SuspensionTarget::Some(thread) => {
3434                 self.resume_thread(store, caller, thread, true, true)?
3435             }
3436             SuspensionTarget::None => { /* nothing to do */ }
3437         }
3438 
3439         let reason = if yielding {
3440             SuspendReason::Yielding {
3441                 thread: guest_thread,
3442                 // Tell `StoreOpaque::suspend` it's okay to suspend here since
3443                 // we're handling a `thread.yield-to-suspended` call; otherwise it would
3444                 // panic if we called it in a non-blocking context.
3445                 skip_may_block_check: to_thread.is_some(),
3446             }
3447         } else {
3448             SuspendReason::ExplicitlySuspending {
3449                 thread: guest_thread,
3450                 // Tell `StoreOpaque::suspend` it's okay to suspend here since
3451                 // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would
3452                 // panic if we called it in a non-blocking context.
3453                 skip_may_block_check: to_thread.is_some(),
3454             }
3455         };
3456 
3457         store.suspend(reason)?;
3458 
3459         if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3460             Ok(WaitResult::Cancelled)
3461         } else {
3462             Ok(WaitResult::Completed)
3463         }
3464     }
3465 
3466     /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
waitable_check( self, store: &mut StoreOpaque, cancellable: bool, check: WaitableCheck, params: WaitableCheckParams, ) -> Result<u32>3467     fn waitable_check(
3468         self,
3469         store: &mut StoreOpaque,
3470         cancellable: bool,
3471         check: WaitableCheck,
3472         params: WaitableCheckParams,
3473     ) -> Result<u32> {
3474         let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3475 
3476         log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3477 
3478         let state = store.concurrent_state_mut();
3479         let task = state.get_mut(guest_thread.task)?;
3480 
3481         // If we're waiting, and there are no events immediately available,
3482         // suspend the fiber until that changes.
3483         match &check {
3484             WaitableCheck::Wait => {
3485                 let set = params.set;
3486 
3487                 if (task.event.is_none()
3488                     || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3489                     && state.get_mut(set)?.ready.is_empty()
3490                 {
3491                     if cancellable {
3492                         let old = state
3493                             .get_mut(guest_thread.thread)?
3494                             .wake_on_cancel
3495                             .replace(set);
3496                         if !old.is_none() {
3497                             bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3498                         }
3499                     }
3500 
3501                     store.suspend(SuspendReason::Waiting {
3502                         set,
3503                         thread: guest_thread,
3504                         skip_may_block_check: false,
3505                     })?;
3506                 }
3507             }
3508             WaitableCheck::Poll => {}
3509         }
3510 
3511         log::trace!(
3512             "waitable check for {guest_thread:?}; set {:?}, part two",
3513             params.set
3514         );
3515 
3516         // Deliver any pending events to the guest and return.
3517         let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3518 
3519         let (ordinal, handle, result) = match &check {
3520             WaitableCheck::Wait => {
3521                 let (event, waitable) = match event {
3522                     Some(p) => p,
3523                     None => bail_bug!("event expected to be present"),
3524                 };
3525                 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3526                 let (ordinal, result) = event.parts();
3527                 (ordinal, handle, result)
3528             }
3529             WaitableCheck::Poll => {
3530                 if let Some((event, waitable)) = event {
3531                     let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3532                     let (ordinal, result) = event.parts();
3533                     (ordinal, handle, result)
3534                 } else {
3535                     log::trace!(
3536                         "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3537                         guest_thread.task,
3538                         params.set
3539                     );
3540                     let (ordinal, result) = Event::None.parts();
3541                     (ordinal, 0, result)
3542                 }
3543             }
3544         };
3545         let memory = self.options_memory_mut(store, params.options);
3546         let ptr = func::validate_inbounds_dynamic(
3547             &CanonicalAbiInfo::POINTER_PAIR,
3548             memory,
3549             &ValRaw::u32(params.payload),
3550         )?;
3551         memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3552         memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3553         Ok(ordinal)
3554     }
3555 
3556     /// Implements the `subtask.cancel` intrinsic.
subtask_cancel( self, store: &mut StoreOpaque, caller_instance: RuntimeComponentInstanceIndex, async_: bool, task_id: u32, ) -> Result<u32>3557     pub(crate) fn subtask_cancel(
3558         self,
3559         store: &mut StoreOpaque,
3560         caller_instance: RuntimeComponentInstanceIndex,
3561         async_: bool,
3562         task_id: u32,
3563     ) -> Result<u32> {
3564         if !async_ {
3565             // The caller may only sync call `subtask.cancel` from an async task
3566             // (i.e. a task created via a call to an async export).  Otherwise,
3567             // we'll trap.
3568             store.check_blocking()?;
3569         }
3570 
3571         let (rep, is_host) = store
3572             .instance_state(RuntimeInstance {
3573                 instance: self.id().instance(),
3574                 index: caller_instance,
3575             })
3576             .handle_table()
3577             .subtask_rep(task_id)?;
3578         let (waitable, expected_caller) = if is_host {
3579             let id = TableId::<HostTask>::new(rep);
3580             (
3581                 Waitable::Host(id),
3582                 store.concurrent_state_mut().get_mut(id)?.caller,
3583             )
3584         } else {
3585             let id = TableId::<GuestTask>::new(rep);
3586             if let Caller::Guest { thread } = store.concurrent_state_mut().get_mut(id)?.caller {
3587                 (Waitable::Guest(id), thread)
3588             } else {
3589                 bail_bug!("expected guest caller for `subtask.cancel`")
3590             }
3591         };
3592         // Since waitables can neither be passed between instances nor forged,
3593         // this should never fail unless there's a bug in Wasmtime, but we check
3594         // here to be sure:
3595         let concurrent_state = store.concurrent_state_mut();
3596         debug_assert_eq!(expected_caller, concurrent_state.current_guest_thread()?);
3597 
3598         log::trace!("subtask_cancel {waitable:?} (handle {task_id})");
3599 
3600         let needs_block;
3601         if let Waitable::Host(host_task) = waitable {
3602             let state = &mut concurrent_state.get_mut(host_task)?.state;
3603             match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
3604                 // If the callee is still running, signal an abort is requested.
3605                 //
3606                 // After cancelling this falls through to block waiting for the
3607                 // host task to actually finish assuming that `async_` is false.
3608                 // This blocking behavior resolves the race of `handle.abort()`
3609                 // with the task actually getting cancelled or finishing.
3610                 HostTaskState::CalleeRunning(handle) => {
3611                     handle.abort();
3612                     needs_block = true;
3613                 }
3614 
3615                 // Cancellation was already requested, so fail as the task can't
3616                 // be cancelled twice.
3617                 HostTaskState::CalleeDone { cancelled } => {
3618                     if cancelled {
3619                         bail!(Trap::SubtaskCancelAfterTerminal);
3620                     } else {
3621                         // The callee is already done so there's no need to
3622                         // block further for an event.
3623                         needs_block = false;
3624                     }
3625                 }
3626 
3627                 // These states should not be possible for a subtask that's
3628                 // visible from the guest, so trap here.
3629                 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
3630                     bail_bug!("invalid states for host callee")
3631                 }
3632             }
3633         } else {
3634             let caller = concurrent_state.current_guest_thread()?;
3635             let guest_task = TableId::<GuestTask>::new(rep);
3636             let task = concurrent_state.get_mut(guest_task)?;
3637             if !task.already_lowered_parameters() {
3638                 // The task is in a `starting` state, meaning it hasn't run at
3639                 // all yet.  Here we update its fields to indicate that it is
3640                 // ready to delete immediately once `subtask.drop` is called.
3641                 task.lower_params = None;
3642                 task.lift_result = None;
3643                 task.exited = true;
3644 
3645                 let instance = task.instance;
3646 
3647                 assert_eq!(1, task.threads.len());
3648                 let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
3649                 let concurrent_state = store.concurrent_state_mut();
3650                 concurrent_state.delete(thread)?;
3651                 assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());
3652 
3653                 // Not yet started; cancel and remove from pending
3654                 let pending = &mut store.instance_state(instance).concurrent_state().pending;
3655                 let pending_count = pending.len();
3656                 pending.retain(|thread, _| thread.task != guest_task);
3657                 // If there were no pending threads for this task, we're in an error state
3658                 if pending.len() == pending_count {
3659                     bail!(Trap::SubtaskCancelAfterTerminal);
3660                 }
3661                 return Ok(Status::StartCancelled as u32);
3662             } else if !task.returned_or_cancelled() {
3663                 // Started, but not yet returned or cancelled; send the
3664                 // `CANCELLED` event
3665                 task.cancel_sent = true;
3666                 // Note that this might overwrite an event that was set earlier
3667                 // (e.g. `Event::None` if the task is yielding, or
3668                 // `Event::Cancelled` if it was already cancelled), but that's
3669                 // okay -- this should supersede the previous state.
3670                 task.event = Some(Event::Cancelled);
3671                 let runtime_instance = task.instance.index;
3672                 for thread in task.threads.clone() {
3673                     let thread = QualifiedThreadId {
3674                         task: guest_task,
3675                         thread,
3676                     };
3677                     if let Some(set) = concurrent_state
3678                         .get_mut(thread.thread)?
3679                         .wake_on_cancel
3680                         .take()
3681                     {
3682                         let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
3683                             Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
3684                             Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
3685                                 runtime_instance,
3686                                 GuestCall {
3687                                     thread,
3688                                     kind: GuestCallKind::DeliverEvent {
3689                                         instance,
3690                                         set: None,
3691                                     },
3692                                 },
3693                             ),
3694                             None => bail_bug!("thread not present in wake_on_cancel set"),
3695                         };
3696                         concurrent_state.push_high_priority(item);
3697 
3698                         store.suspend(SuspendReason::Yielding {
3699                             thread: caller,
3700                             // `subtask.cancel` is not allowed to be called in a
3701                             // sync context, so we cannot skip the may-block check.
3702                             skip_may_block_check: false,
3703                         })?;
3704                         break;
3705                     }
3706                 }
3707 
3708                 // Guest tasks need to block if they have not yet returned or
3709                 // cancelled, even as a result of the event delivery above.
3710                 needs_block = !store
3711                     .concurrent_state_mut()
3712                     .get_mut(guest_task)?
3713                     .returned_or_cancelled()
3714             } else {
3715                 needs_block = false;
3716             }
3717         };
3718 
3719         // If we need to block waiting on the terminal status of this subtask
3720         // then return immediately in `async` mode, or otherwise wait for the
3721         // event to get signaled through the store.
3722         if needs_block {
3723             if async_ {
3724                 return Ok(BLOCKED);
3725             }
3726 
3727             // Wait for this waitable to get signaled with its terminal status
3728             // from the completion callback enqueued by `first_poll`. Once
3729             // that's done fall through to the sahred
3730             store.wait_for_event(waitable)?;
3731 
3732             // .. fall through to determine what event's in store for us.
3733         }
3734 
3735         let event = waitable.take_event(store.concurrent_state_mut())?;
3736         if let Some(Event::Subtask {
3737             status: status @ (Status::Returned | Status::ReturnCancelled),
3738         }) = event
3739         {
3740             Ok(status as u32)
3741         } else {
3742             bail!(Trap::SubtaskCancelAfterTerminal);
3743         }
3744     }
3745 
context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32>3746     pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> {
3747         store.concurrent_state_mut().context_get(slot)
3748     }
3749 
context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()>3750     pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
3751         store.concurrent_state_mut().context_set(slot, value)
3752     }
3753 }
3754 
3755 /// Trait representing component model ABI async intrinsics and fused adapter
3756 /// helper functions.
3757 ///
3758 /// SAFETY (callers): Most of the methods in this trait accept raw pointers,
3759 /// which must be valid for at least the duration of the call (and possibly for
3760 /// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
3761 /// pointers used for async calls).
3762 pub trait VMComponentAsyncStore {
3763     /// A helper function for fused adapter modules involving calls where the
3764     /// one of the caller or callee is async.
3765     ///
3766     /// This helper is not used when the caller and callee both use the sync
3767     /// ABI, only when at least one is async is this used.
prepare_call( &mut self, instance: Instance, memory: *mut VMMemoryDefinition, start: NonNull<VMFuncRef>, return_: NonNull<VMFuncRef>, caller_instance: RuntimeComponentInstanceIndex, callee_instance: RuntimeComponentInstanceIndex, task_return_type: TypeTupleIndex, callee_async: bool, string_encoding: StringEncoding, result_count: u32, storage: *mut ValRaw, storage_len: usize, ) -> Result<()>3768     unsafe fn prepare_call(
3769         &mut self,
3770         instance: Instance,
3771         memory: *mut VMMemoryDefinition,
3772         start: NonNull<VMFuncRef>,
3773         return_: NonNull<VMFuncRef>,
3774         caller_instance: RuntimeComponentInstanceIndex,
3775         callee_instance: RuntimeComponentInstanceIndex,
3776         task_return_type: TypeTupleIndex,
3777         callee_async: bool,
3778         string_encoding: StringEncoding,
3779         result_count: u32,
3780         storage: *mut ValRaw,
3781         storage_len: usize,
3782     ) -> Result<()>;
3783 
3784     /// A helper function for fused adapter modules involving calls where the
3785     /// caller is sync-lowered but the callee is async-lifted.
sync_start( &mut self, instance: Instance, callback: *mut VMFuncRef, callee: NonNull<VMFuncRef>, param_count: u32, storage: *mut MaybeUninit<ValRaw>, storage_len: usize, ) -> Result<()>3786     unsafe fn sync_start(
3787         &mut self,
3788         instance: Instance,
3789         callback: *mut VMFuncRef,
3790         callee: NonNull<VMFuncRef>,
3791         param_count: u32,
3792         storage: *mut MaybeUninit<ValRaw>,
3793         storage_len: usize,
3794     ) -> Result<()>;
3795 
3796     /// A helper function for fused adapter modules involving calls where the
3797     /// caller is async-lowered.
async_start( &mut self, instance: Instance, callback: *mut VMFuncRef, post_return: *mut VMFuncRef, callee: NonNull<VMFuncRef>, param_count: u32, result_count: u32, flags: u32, ) -> Result<u32>3798     unsafe fn async_start(
3799         &mut self,
3800         instance: Instance,
3801         callback: *mut VMFuncRef,
3802         post_return: *mut VMFuncRef,
3803         callee: NonNull<VMFuncRef>,
3804         param_count: u32,
3805         result_count: u32,
3806         flags: u32,
3807     ) -> Result<u32>;
3808 
3809     /// The `future.write` intrinsic.
future_write( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeFutureTableIndex, options: OptionsIndex, future: u32, address: u32, ) -> Result<u32>3810     fn future_write(
3811         &mut self,
3812         instance: Instance,
3813         caller: RuntimeComponentInstanceIndex,
3814         ty: TypeFutureTableIndex,
3815         options: OptionsIndex,
3816         future: u32,
3817         address: u32,
3818     ) -> Result<u32>;
3819 
3820     /// The `future.read` intrinsic.
future_read( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeFutureTableIndex, options: OptionsIndex, future: u32, address: u32, ) -> Result<u32>3821     fn future_read(
3822         &mut self,
3823         instance: Instance,
3824         caller: RuntimeComponentInstanceIndex,
3825         ty: TypeFutureTableIndex,
3826         options: OptionsIndex,
3827         future: u32,
3828         address: u32,
3829     ) -> Result<u32>;
3830 
3831     /// The `future.drop-writable` intrinsic.
future_drop_writable( &mut self, instance: Instance, ty: TypeFutureTableIndex, writer: u32, ) -> Result<()>3832     fn future_drop_writable(
3833         &mut self,
3834         instance: Instance,
3835         ty: TypeFutureTableIndex,
3836         writer: u32,
3837     ) -> Result<()>;
3838 
3839     /// The `stream.write` intrinsic.
stream_write( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeStreamTableIndex, options: OptionsIndex, stream: u32, address: u32, count: u32, ) -> Result<u32>3840     fn stream_write(
3841         &mut self,
3842         instance: Instance,
3843         caller: RuntimeComponentInstanceIndex,
3844         ty: TypeStreamTableIndex,
3845         options: OptionsIndex,
3846         stream: u32,
3847         address: u32,
3848         count: u32,
3849     ) -> Result<u32>;
3850 
3851     /// The `stream.read` intrinsic.
stream_read( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeStreamTableIndex, options: OptionsIndex, stream: u32, address: u32, count: u32, ) -> Result<u32>3852     fn stream_read(
3853         &mut self,
3854         instance: Instance,
3855         caller: RuntimeComponentInstanceIndex,
3856         ty: TypeStreamTableIndex,
3857         options: OptionsIndex,
3858         stream: u32,
3859         address: u32,
3860         count: u32,
3861     ) -> Result<u32>;
3862 
3863     /// The "fast-path" implementation of the `stream.write` intrinsic for
3864     /// "flat" (i.e. memcpy-able) payloads.
flat_stream_write( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeStreamTableIndex, options: OptionsIndex, payload_size: u32, payload_align: u32, stream: u32, address: u32, count: u32, ) -> Result<u32>3865     fn flat_stream_write(
3866         &mut self,
3867         instance: Instance,
3868         caller: RuntimeComponentInstanceIndex,
3869         ty: TypeStreamTableIndex,
3870         options: OptionsIndex,
3871         payload_size: u32,
3872         payload_align: u32,
3873         stream: u32,
3874         address: u32,
3875         count: u32,
3876     ) -> Result<u32>;
3877 
3878     /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
3879     /// (i.e. memcpy-able) payloads.
flat_stream_read( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeStreamTableIndex, options: OptionsIndex, payload_size: u32, payload_align: u32, stream: u32, address: u32, count: u32, ) -> Result<u32>3880     fn flat_stream_read(
3881         &mut self,
3882         instance: Instance,
3883         caller: RuntimeComponentInstanceIndex,
3884         ty: TypeStreamTableIndex,
3885         options: OptionsIndex,
3886         payload_size: u32,
3887         payload_align: u32,
3888         stream: u32,
3889         address: u32,
3890         count: u32,
3891     ) -> Result<u32>;
3892 
3893     /// The `stream.drop-writable` intrinsic.
stream_drop_writable( &mut self, instance: Instance, ty: TypeStreamTableIndex, writer: u32, ) -> Result<()>3894     fn stream_drop_writable(
3895         &mut self,
3896         instance: Instance,
3897         ty: TypeStreamTableIndex,
3898         writer: u32,
3899     ) -> Result<()>;
3900 
3901     /// The `error-context.debug-message` intrinsic.
error_context_debug_message( &mut self, instance: Instance, ty: TypeComponentLocalErrorContextTableIndex, options: OptionsIndex, err_ctx_handle: u32, debug_msg_address: u32, ) -> Result<()>3902     fn error_context_debug_message(
3903         &mut self,
3904         instance: Instance,
3905         ty: TypeComponentLocalErrorContextTableIndex,
3906         options: OptionsIndex,
3907         err_ctx_handle: u32,
3908         debug_msg_address: u32,
3909     ) -> Result<()>;
3910 
3911     /// The `thread.new-indirect` intrinsic
thread_new_indirect( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex, start_func_idx: u32, context: i32, ) -> Result<u32>3912     fn thread_new_indirect(
3913         &mut self,
3914         instance: Instance,
3915         caller: RuntimeComponentInstanceIndex,
3916         func_ty_idx: TypeFuncIndex,
3917         start_func_table_idx: RuntimeTableIndex,
3918         start_func_idx: u32,
3919         context: i32,
3920     ) -> Result<u32>;
3921 }
3922 
3923 /// SAFETY: See trait docs.
3924 impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
prepare_call( &mut self, instance: Instance, memory: *mut VMMemoryDefinition, start: NonNull<VMFuncRef>, return_: NonNull<VMFuncRef>, caller_instance: RuntimeComponentInstanceIndex, callee_instance: RuntimeComponentInstanceIndex, task_return_type: TypeTupleIndex, callee_async: bool, string_encoding: StringEncoding, result_count_or_max_if_async: u32, storage: *mut ValRaw, storage_len: usize, ) -> Result<()>3925     unsafe fn prepare_call(
3926         &mut self,
3927         instance: Instance,
3928         memory: *mut VMMemoryDefinition,
3929         start: NonNull<VMFuncRef>,
3930         return_: NonNull<VMFuncRef>,
3931         caller_instance: RuntimeComponentInstanceIndex,
3932         callee_instance: RuntimeComponentInstanceIndex,
3933         task_return_type: TypeTupleIndex,
3934         callee_async: bool,
3935         string_encoding: StringEncoding,
3936         result_count_or_max_if_async: u32,
3937         storage: *mut ValRaw,
3938         storage_len: usize,
3939     ) -> Result<()> {
3940         // SAFETY: The `wasmtime_cranelift`-generated code that calls
3941         // this method will have ensured that `storage` is a valid
3942         // pointer containing at least `storage_len` items.
3943         let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3944 
3945         unsafe {
3946             instance.prepare_call(
3947                 StoreContextMut(self),
3948                 start,
3949                 return_,
3950                 caller_instance,
3951                 callee_instance,
3952                 task_return_type,
3953                 callee_async,
3954                 memory,
3955                 string_encoding,
3956                 match result_count_or_max_if_async {
3957                     PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3958                         params,
3959                         has_result: false,
3960                     },
3961                     PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3962                         params,
3963                         has_result: true,
3964                     },
3965                     result_count => CallerInfo::Sync {
3966                         params,
3967                         result_count,
3968                     },
3969                 },
3970             )
3971         }
3972     }
3973 
sync_start( &mut self, instance: Instance, callback: *mut VMFuncRef, callee: NonNull<VMFuncRef>, param_count: u32, storage: *mut MaybeUninit<ValRaw>, storage_len: usize, ) -> Result<()>3974     unsafe fn sync_start(
3975         &mut self,
3976         instance: Instance,
3977         callback: *mut VMFuncRef,
3978         callee: NonNull<VMFuncRef>,
3979         param_count: u32,
3980         storage: *mut MaybeUninit<ValRaw>,
3981         storage_len: usize,
3982     ) -> Result<()> {
3983         unsafe {
3984             instance
3985                 .start_call(
3986                     StoreContextMut(self),
3987                     callback,
3988                     ptr::null_mut(),
3989                     callee,
3990                     param_count,
3991                     1,
3992                     START_FLAG_ASYNC_CALLEE,
3993                     // SAFETY: The `wasmtime_cranelift`-generated code that calls
3994                     // this method will have ensured that `storage` is a valid
3995                     // pointer containing at least `storage_len` items.
3996                     Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3997                 )
3998                 .map(drop)
3999         }
4000     }
4001 
async_start( &mut self, instance: Instance, callback: *mut VMFuncRef, post_return: *mut VMFuncRef, callee: NonNull<VMFuncRef>, param_count: u32, result_count: u32, flags: u32, ) -> Result<u32>4002     unsafe fn async_start(
4003         &mut self,
4004         instance: Instance,
4005         callback: *mut VMFuncRef,
4006         post_return: *mut VMFuncRef,
4007         callee: NonNull<VMFuncRef>,
4008         param_count: u32,
4009         result_count: u32,
4010         flags: u32,
4011     ) -> Result<u32> {
4012         unsafe {
4013             instance.start_call(
4014                 StoreContextMut(self),
4015                 callback,
4016                 post_return,
4017                 callee,
4018                 param_count,
4019                 result_count,
4020                 flags,
4021                 None,
4022             )
4023         }
4024     }
4025 
future_write( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeFutureTableIndex, options: OptionsIndex, future: u32, address: u32, ) -> Result<u32>4026     fn future_write(
4027         &mut self,
4028         instance: Instance,
4029         caller: RuntimeComponentInstanceIndex,
4030         ty: TypeFutureTableIndex,
4031         options: OptionsIndex,
4032         future: u32,
4033         address: u32,
4034     ) -> Result<u32> {
4035         instance
4036             .guest_write(
4037                 StoreContextMut(self),
4038                 caller,
4039                 TransmitIndex::Future(ty),
4040                 options,
4041                 None,
4042                 future,
4043                 address,
4044                 1,
4045             )
4046             .map(|result| result.encode())
4047     }
4048 
future_read( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeFutureTableIndex, options: OptionsIndex, future: u32, address: u32, ) -> Result<u32>4049     fn future_read(
4050         &mut self,
4051         instance: Instance,
4052         caller: RuntimeComponentInstanceIndex,
4053         ty: TypeFutureTableIndex,
4054         options: OptionsIndex,
4055         future: u32,
4056         address: u32,
4057     ) -> Result<u32> {
4058         instance
4059             .guest_read(
4060                 StoreContextMut(self),
4061                 caller,
4062                 TransmitIndex::Future(ty),
4063                 options,
4064                 None,
4065                 future,
4066                 address,
4067                 1,
4068             )
4069             .map(|result| result.encode())
4070     }
4071 
stream_write( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeStreamTableIndex, options: OptionsIndex, stream: u32, address: u32, count: u32, ) -> Result<u32>4072     fn stream_write(
4073         &mut self,
4074         instance: Instance,
4075         caller: RuntimeComponentInstanceIndex,
4076         ty: TypeStreamTableIndex,
4077         options: OptionsIndex,
4078         stream: u32,
4079         address: u32,
4080         count: u32,
4081     ) -> Result<u32> {
4082         instance
4083             .guest_write(
4084                 StoreContextMut(self),
4085                 caller,
4086                 TransmitIndex::Stream(ty),
4087                 options,
4088                 None,
4089                 stream,
4090                 address,
4091                 count,
4092             )
4093             .map(|result| result.encode())
4094     }
4095 
stream_read( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeStreamTableIndex, options: OptionsIndex, stream: u32, address: u32, count: u32, ) -> Result<u32>4096     fn stream_read(
4097         &mut self,
4098         instance: Instance,
4099         caller: RuntimeComponentInstanceIndex,
4100         ty: TypeStreamTableIndex,
4101         options: OptionsIndex,
4102         stream: u32,
4103         address: u32,
4104         count: u32,
4105     ) -> Result<u32> {
4106         instance
4107             .guest_read(
4108                 StoreContextMut(self),
4109                 caller,
4110                 TransmitIndex::Stream(ty),
4111                 options,
4112                 None,
4113                 stream,
4114                 address,
4115                 count,
4116             )
4117             .map(|result| result.encode())
4118     }
4119 
future_drop_writable( &mut self, instance: Instance, ty: TypeFutureTableIndex, writer: u32, ) -> Result<()>4120     fn future_drop_writable(
4121         &mut self,
4122         instance: Instance,
4123         ty: TypeFutureTableIndex,
4124         writer: u32,
4125     ) -> Result<()> {
4126         instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4127     }
4128 
flat_stream_write( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeStreamTableIndex, options: OptionsIndex, payload_size: u32, payload_align: u32, stream: u32, address: u32, count: u32, ) -> Result<u32>4129     fn flat_stream_write(
4130         &mut self,
4131         instance: Instance,
4132         caller: RuntimeComponentInstanceIndex,
4133         ty: TypeStreamTableIndex,
4134         options: OptionsIndex,
4135         payload_size: u32,
4136         payload_align: u32,
4137         stream: u32,
4138         address: u32,
4139         count: u32,
4140     ) -> Result<u32> {
4141         instance
4142             .guest_write(
4143                 StoreContextMut(self),
4144                 caller,
4145                 TransmitIndex::Stream(ty),
4146                 options,
4147                 Some(FlatAbi {
4148                     size: payload_size,
4149                     align: payload_align,
4150                 }),
4151                 stream,
4152                 address,
4153                 count,
4154             )
4155             .map(|result| result.encode())
4156     }
4157 
flat_stream_read( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeStreamTableIndex, options: OptionsIndex, payload_size: u32, payload_align: u32, stream: u32, address: u32, count: u32, ) -> Result<u32>4158     fn flat_stream_read(
4159         &mut self,
4160         instance: Instance,
4161         caller: RuntimeComponentInstanceIndex,
4162         ty: TypeStreamTableIndex,
4163         options: OptionsIndex,
4164         payload_size: u32,
4165         payload_align: u32,
4166         stream: u32,
4167         address: u32,
4168         count: u32,
4169     ) -> Result<u32> {
4170         instance
4171             .guest_read(
4172                 StoreContextMut(self),
4173                 caller,
4174                 TransmitIndex::Stream(ty),
4175                 options,
4176                 Some(FlatAbi {
4177                     size: payload_size,
4178                     align: payload_align,
4179                 }),
4180                 stream,
4181                 address,
4182                 count,
4183             )
4184             .map(|result| result.encode())
4185     }
4186 
stream_drop_writable( &mut self, instance: Instance, ty: TypeStreamTableIndex, writer: u32, ) -> Result<()>4187     fn stream_drop_writable(
4188         &mut self,
4189         instance: Instance,
4190         ty: TypeStreamTableIndex,
4191         writer: u32,
4192     ) -> Result<()> {
4193         instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4194     }
4195 
error_context_debug_message( &mut self, instance: Instance, ty: TypeComponentLocalErrorContextTableIndex, options: OptionsIndex, err_ctx_handle: u32, debug_msg_address: u32, ) -> Result<()>4196     fn error_context_debug_message(
4197         &mut self,
4198         instance: Instance,
4199         ty: TypeComponentLocalErrorContextTableIndex,
4200         options: OptionsIndex,
4201         err_ctx_handle: u32,
4202         debug_msg_address: u32,
4203     ) -> Result<()> {
4204         instance.error_context_debug_message(
4205             StoreContextMut(self),
4206             ty,
4207             options,
4208             err_ctx_handle,
4209             debug_msg_address,
4210         )
4211     }
4212 
thread_new_indirect( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex, start_func_idx: u32, context: i32, ) -> Result<u32>4213     fn thread_new_indirect(
4214         &mut self,
4215         instance: Instance,
4216         caller: RuntimeComponentInstanceIndex,
4217         func_ty_idx: TypeFuncIndex,
4218         start_func_table_idx: RuntimeTableIndex,
4219         start_func_idx: u32,
4220         context: i32,
4221     ) -> Result<u32> {
4222         instance.thread_new_indirect(
4223             StoreContextMut(self),
4224             caller,
4225             func_ty_idx,
4226             start_func_table_idx,
4227             start_func_idx,
4228             context,
4229         )
4230     }
4231 }
4232 
4233 type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4234 
4235 /// Represents the state of a pending host task.
4236 ///
4237 /// This is used to represent tasks when the guest calls into the host.
4238 pub(crate) struct HostTask {
4239     common: WaitableCommon,
4240 
4241     /// Guest thread which called the host.
4242     caller: QualifiedThreadId,
4243 
4244     /// State of borrows/etc the host needs to track. Used when the guest passes
4245     /// borrows to the host, for example.
4246     call_context: CallContext,
4247 
4248     state: HostTaskState,
4249 }
4250 
4251 enum HostTaskState {
4252     /// A host task has been created and it's considered "started".
4253     ///
4254     /// The host task has yet to enter `first_poll` or `poll_and_block` which
4255     /// is where this will get updated further.
4256     CalleeStarted,
4257 
4258     /// State used for tasks in `first_poll` meaning that the guest did an async
4259     /// lower of a host async function which is blocked. The specified handle is
4260     /// linked to the future in the main `FuturesUnordered` of a store which is
4261     /// used to cancel it if the guest requests cancellation.
4262     CalleeRunning(JoinHandle),
4263 
4264     /// Terminal state used for tasks in `poll_and_block` to store the result of
4265     /// their computation. Note that this state is not used for tasks in
4266     /// `first_poll`.
4267     CalleeFinished(LiftedResult),
4268 
4269     /// Terminal state for host tasks meaning that the task was cancelled or the
4270     /// result was taken.
4271     CalleeDone { cancelled: bool },
4272 }
4273 
4274 impl HostTask {
new(caller: QualifiedThreadId, state: HostTaskState) -> Self4275     fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4276         Self {
4277             common: WaitableCommon::default(),
4278             call_context: CallContext::default(),
4279             caller,
4280             state,
4281         }
4282     }
4283 }
4284 
4285 impl TableDebug for HostTask {
type_name() -> &'static str4286     fn type_name() -> &'static str {
4287         "HostTask"
4288     }
4289 }
4290 
4291 type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4292 
4293 /// Represents the caller of a given guest task.
4294 enum Caller {
4295     /// The host called the guest task.
4296     Host {
4297         /// If present, may be used to deliver the result.
4298         tx: Option<oneshot::Sender<LiftedResult>>,
4299         /// If true, there's a host future that must be dropped before the task
4300         /// can be deleted.
4301         host_future_present: bool,
4302         /// Represents the caller of the host function which called back into a
4303         /// guest. Note that this thread could belong to an entirely unrelated
4304         /// top-level component instance than the one the host called into.
4305         caller: CurrentThread,
4306     },
4307     /// Another guest thread called the guest task
4308     Guest {
4309         /// The id of the caller
4310         thread: QualifiedThreadId,
4311     },
4312 }
4313 
4314 /// Represents a closure and related canonical ABI parameters required to
4315 /// validate a `task.return` call at runtime and lift the result.
4316 struct LiftResult {
4317     lift: RawLift,
4318     ty: TypeTupleIndex,
4319     memory: Option<SendSyncPtr<VMMemoryDefinition>>,
4320     string_encoding: StringEncoding,
4321 }
4322 
4323 /// The table ID for a guest thread, qualified by the task to which it belongs.
4324 ///
4325 /// This exists to minimize table lookups and the necessity to pass stores around mutably
4326 /// for the common case of identifying the task to which a thread belongs.
4327 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4328 pub(crate) struct QualifiedThreadId {
4329     task: TableId<GuestTask>,
4330     thread: TableId<GuestThread>,
4331 }
4332 
4333 impl QualifiedThreadId {
qualify( state: &mut ConcurrentState, thread: TableId<GuestThread>, ) -> Result<QualifiedThreadId>4334     fn qualify(
4335         state: &mut ConcurrentState,
4336         thread: TableId<GuestThread>,
4337     ) -> Result<QualifiedThreadId> {
4338         Ok(QualifiedThreadId {
4339             task: state.get_mut(thread)?.parent_task,
4340             thread,
4341         })
4342     }
4343 }
4344 
4345 impl fmt::Debug for QualifiedThreadId {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result4346     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4347         f.debug_tuple("QualifiedThreadId")
4348             .field(&self.task.rep())
4349             .field(&self.thread.rep())
4350             .finish()
4351     }
4352 }
4353 
4354 enum GuestThreadState {
4355     NotStartedImplicit,
4356     NotStartedExplicit(
4357         Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
4358     ),
4359     Running,
4360     Suspended(StoreFiber<'static>),
4361     Ready(StoreFiber<'static>),
4362     Completed,
4363 }
4364 pub struct GuestThread {
4365     /// Context-local state used to implement the `context.{get,set}`
4366     /// intrinsics.
4367     context: [u32; 2],
4368     /// The owning guest task.
4369     parent_task: TableId<GuestTask>,
4370     /// If present, indicates that the thread is currently waiting on the
4371     /// specified set but may be cancelled and woken immediately.
4372     wake_on_cancel: Option<TableId<WaitableSet>>,
4373     /// The execution state of this guest thread
4374     state: GuestThreadState,
4375     /// The index of this thread in the component instance's handle table.
4376     /// This must always be `Some` after initialization.
4377     instance_rep: Option<u32>,
4378     /// Scratch waitable set used to watch subtasks during synchronous calls.
4379     sync_call_set: TableId<WaitableSet>,
4380 }
4381 
4382 impl GuestThread {
4383     /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4384     /// handle.
from_instance( state: Pin<&mut ComponentInstance>, caller_instance: RuntimeComponentInstanceIndex, guest_thread: u32, ) -> Result<TableId<Self>>4385     fn from_instance(
4386         state: Pin<&mut ComponentInstance>,
4387         caller_instance: RuntimeComponentInstanceIndex,
4388         guest_thread: u32,
4389     ) -> Result<TableId<Self>> {
4390         let rep = state.instance_states().0[caller_instance]
4391             .thread_handle_table()
4392             .guest_thread_rep(guest_thread)?;
4393         Ok(TableId::new(rep))
4394     }
4395 
new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self>4396     fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4397         let sync_call_set = state.push(WaitableSet::default())?;
4398         Ok(Self {
4399             context: [0; 2],
4400             parent_task,
4401             wake_on_cancel: None,
4402             state: GuestThreadState::NotStartedImplicit,
4403             instance_rep: None,
4404             sync_call_set,
4405         })
4406     }
4407 
new_explicit( state: &mut ConcurrentState, parent_task: TableId<GuestTask>, start_func: Box< dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync, >, ) -> Result<Self>4408     fn new_explicit(
4409         state: &mut ConcurrentState,
4410         parent_task: TableId<GuestTask>,
4411         start_func: Box<
4412             dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4413         >,
4414     ) -> Result<Self> {
4415         let sync_call_set = state.push(WaitableSet::default())?;
4416         Ok(Self {
4417             context: [0; 2],
4418             parent_task,
4419             wake_on_cancel: None,
4420             state: GuestThreadState::NotStartedExplicit(start_func),
4421             instance_rep: None,
4422             sync_call_set,
4423         })
4424     }
4425 }
4426 
4427 impl TableDebug for GuestThread {
type_name() -> &'static str4428     fn type_name() -> &'static str {
4429         "GuestThread"
4430     }
4431 }
4432 
4433 enum SyncResult {
4434     NotProduced,
4435     Produced(Option<ValRaw>),
4436     Taken,
4437 }
4438 
4439 impl SyncResult {
take(&mut self) -> Result<Option<Option<ValRaw>>>4440     fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4441         Ok(match mem::replace(self, SyncResult::Taken) {
4442             SyncResult::NotProduced => None,
4443             SyncResult::Produced(val) => Some(val),
4444             SyncResult::Taken => {
4445                 bail_bug!("attempted to take a synchronous result that was already taken")
4446             }
4447         })
4448     }
4449 }
4450 
4451 #[derive(Debug)]
4452 enum HostFutureState {
4453     NotApplicable,
4454     Live,
4455     Dropped,
4456 }
4457 
4458 /// Represents a pending guest task.
4459 pub(crate) struct GuestTask {
4460     /// See `WaitableCommon`
4461     common: WaitableCommon,
4462     /// Closure to lower the parameters passed to this task.
4463     lower_params: Option<RawLower>,
4464     /// See `LiftResult`
4465     lift_result: Option<LiftResult>,
4466     /// A place to stash the type-erased lifted result if it can't be delivered
4467     /// immediately.
4468     result: Option<LiftedResult>,
4469     /// Closure to call the callback function for an async-lifted export, if
4470     /// provided.
4471     callback: Option<CallbackFn>,
4472     /// See `Caller`
4473     caller: Caller,
4474     /// Borrow state for this task.
4475     ///
4476     /// Keeps track of `borrow<T>` received to this task to ensure that
4477     /// everything is dropped by the time it exits.
4478     call_context: CallContext,
4479     /// A place to stash the lowered result for a sync-to-async call until it
4480     /// can be returned to the caller.
4481     sync_result: SyncResult,
4482     /// Whether or not the task has been cancelled (i.e. whether the task is
4483     /// permitted to call `task.cancel`).
4484     cancel_sent: bool,
4485     /// Whether or not we've sent a `Status::Starting` event to any current or
4486     /// future waiters for this waitable.
4487     starting_sent: bool,
4488     /// The runtime instance to which the exported function for this guest task
4489     /// belongs.
4490     ///
4491     /// Note that the task may do a sync->sync call via a fused adapter which
4492     /// results in that task executing code in a different instance, and it may
4493     /// call host functions and intrinsics from that other instance.
4494     instance: RuntimeInstance,
4495     /// If present, a pending `Event::None` or `Event::Cancelled` to be
4496     /// delivered to this task.
4497     event: Option<Event>,
4498     /// Whether or not the task has exited.
4499     exited: bool,
4500     /// Threads belonging to this task
4501     threads: HashSet<TableId<GuestThread>>,
4502     /// The state of the host future that represents an async task, which must
4503     /// be dropped before we can delete the task.
4504     host_future_state: HostFutureState,
4505     /// Indicates whether this task was created for a call to an async-lifted
4506     /// export.
4507     async_function: bool,
4508 }
4509 
4510 impl GuestTask {
already_lowered_parameters(&self) -> bool4511     fn already_lowered_parameters(&self) -> bool {
4512         // We reset `lower_params` after we lower the parameters
4513         self.lower_params.is_none()
4514     }
4515 
returned_or_cancelled(&self) -> bool4516     fn returned_or_cancelled(&self) -> bool {
4517         // We reset `lift_result` after we return or exit
4518         self.lift_result.is_none()
4519     }
4520 
ready_to_delete(&self) -> bool4521     fn ready_to_delete(&self) -> bool {
4522         let threads_completed = self.threads.is_empty();
4523         let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4524         let pending_completion_event = matches!(
4525             self.common.event,
4526             Some(Event::Subtask {
4527                 status: Status::Returned | Status::ReturnCancelled
4528             })
4529         );
4530         let ready = threads_completed
4531             && !has_sync_result
4532             && !pending_completion_event
4533             && !matches!(self.host_future_state, HostFutureState::Live);
4534         log::trace!(
4535             "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4536             threads_completed,
4537             has_sync_result,
4538             pending_completion_event,
4539             self.host_future_state
4540         );
4541         ready
4542     }
4543 
new( lower_params: RawLower, lift_result: LiftResult, caller: Caller, callback: Option<CallbackFn>, instance: RuntimeInstance, async_function: bool, ) -> Result<Self>4544     fn new(
4545         lower_params: RawLower,
4546         lift_result: LiftResult,
4547         caller: Caller,
4548         callback: Option<CallbackFn>,
4549         instance: RuntimeInstance,
4550         async_function: bool,
4551     ) -> Result<Self> {
4552         let host_future_state = match &caller {
4553             Caller::Guest { .. } => HostFutureState::NotApplicable,
4554             Caller::Host {
4555                 host_future_present,
4556                 ..
4557             } => {
4558                 if *host_future_present {
4559                     HostFutureState::Live
4560                 } else {
4561                     HostFutureState::NotApplicable
4562                 }
4563             }
4564         };
4565         Ok(Self {
4566             common: WaitableCommon::default(),
4567             lower_params: Some(lower_params),
4568             lift_result: Some(lift_result),
4569             result: None,
4570             callback,
4571             caller,
4572             call_context: CallContext::default(),
4573             sync_result: SyncResult::NotProduced,
4574             cancel_sent: false,
4575             starting_sent: false,
4576             instance,
4577             event: None,
4578             exited: false,
4579             threads: HashSet::new(),
4580             host_future_state,
4581             async_function,
4582         })
4583     }
4584 
4585     /// Dispose of this guest task.
dispose(self, _state: &mut ConcurrentState) -> Result<()>4586     fn dispose(self, _state: &mut ConcurrentState) -> Result<()> {
4587         assert!(self.threads.is_empty());
4588         Ok(())
4589     }
4590 }
4591 
4592 impl TableDebug for GuestTask {
type_name() -> &'static str4593     fn type_name() -> &'static str {
4594         "GuestTask"
4595     }
4596 }
4597 
4598 /// Represents state common to all kinds of waitables.
4599 #[derive(Default)]
4600 struct WaitableCommon {
4601     /// The currently pending event for this waitable, if any.
4602     event: Option<Event>,
4603     /// The set to which this waitable belongs, if any.
4604     set: Option<TableId<WaitableSet>>,
4605     /// The handle with which the guest refers to this waitable, if any.
4606     handle: Option<u32>,
4607 }
4608 
4609 /// Represents a Component Model Async `waitable`.
4610 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
4611 enum Waitable {
4612     /// A host task
4613     Host(TableId<HostTask>),
4614     /// A guest task
4615     Guest(TableId<GuestTask>),
4616     /// The read or write end of a stream or future
4617     Transmit(TableId<TransmitHandle>),
4618 }
4619 
4620 impl Waitable {
4621     /// Retrieve the `Waitable` corresponding to the specified guest-visible
4622     /// handle.
from_instance( state: Pin<&mut ComponentInstance>, caller_instance: RuntimeComponentInstanceIndex, waitable: u32, ) -> Result<Self>4623     fn from_instance(
4624         state: Pin<&mut ComponentInstance>,
4625         caller_instance: RuntimeComponentInstanceIndex,
4626         waitable: u32,
4627     ) -> Result<Self> {
4628         use crate::runtime::vm::component::Waitable;
4629 
4630         let (waitable, kind) = state.instance_states().0[caller_instance]
4631             .handle_table()
4632             .waitable_rep(waitable)?;
4633 
4634         Ok(match kind {
4635             Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4636             Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4637             Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4638         })
4639     }
4640 
4641     /// Retrieve the host-visible identifier for this `Waitable`.
rep(&self) -> u324642     fn rep(&self) -> u32 {
4643         match self {
4644             Self::Host(id) => id.rep(),
4645             Self::Guest(id) => id.rep(),
4646             Self::Transmit(id) => id.rep(),
4647         }
4648     }
4649 
4650     /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4651     /// remove it from any set it may currently belong to (when `set` is
4652     /// `None`).
join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()>4653     fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4654         log::trace!("waitable {self:?} join set {set:?}",);
4655 
4656         let old = mem::replace(&mut self.common(state)?.set, set);
4657 
4658         if let Some(old) = old {
4659             match *self {
4660                 Waitable::Host(id) => state.remove_child(id, old),
4661                 Waitable::Guest(id) => state.remove_child(id, old),
4662                 Waitable::Transmit(id) => state.remove_child(id, old),
4663             }?;
4664 
4665             state.get_mut(old)?.ready.remove(self);
4666         }
4667 
4668         if let Some(set) = set {
4669             match *self {
4670                 Waitable::Host(id) => state.add_child(id, set),
4671                 Waitable::Guest(id) => state.add_child(id, set),
4672                 Waitable::Transmit(id) => state.add_child(id, set),
4673             }?;
4674 
4675             if self.common(state)?.event.is_some() {
4676                 self.mark_ready(state)?;
4677             }
4678         }
4679 
4680         Ok(())
4681     }
4682 
4683     /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon>4684     fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4685         Ok(match self {
4686             Self::Host(id) => &mut state.get_mut(*id)?.common,
4687             Self::Guest(id) => &mut state.get_mut(*id)?.common,
4688             Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4689         })
4690     }
4691 
4692     /// Set or clear the pending event for this waitable and either deliver it
4693     /// to the first waiter, if any, or mark it as ready to be delivered to the
4694     /// next waiter that arrives.
set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()>4695     fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4696         log::trace!("set event for {self:?}: {event:?}");
4697         self.common(state)?.event = event;
4698         self.mark_ready(state)
4699     }
4700 
4701     /// Take the pending event from this waitable, leaving `None` in its place.
take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>>4702     fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4703         let common = self.common(state)?;
4704         let event = common.event.take();
4705         if let Some(set) = self.common(state)?.set {
4706             state.get_mut(set)?.ready.remove(self);
4707         }
4708 
4709         Ok(event)
4710     }
4711 
4712     /// Deliver the current event for this waitable to the first waiter, if any,
4713     /// or else mark it as ready to be delivered to the next waiter that
4714     /// arrives.
mark_ready(&self, state: &mut ConcurrentState) -> Result<()>4715     fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4716         if let Some(set) = self.common(state)?.set {
4717             state.get_mut(set)?.ready.insert(*self);
4718             if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4719                 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4720                 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4721 
4722                 let item = match mode {
4723                     WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4724                     WaitMode::Callback(instance) => WorkItem::GuestCall(
4725                         state.get_mut(thread.task)?.instance.index,
4726                         GuestCall {
4727                             thread,
4728                             kind: GuestCallKind::DeliverEvent {
4729                                 instance,
4730                                 set: Some(set),
4731                             },
4732                         },
4733                     ),
4734                 };
4735                 state.push_high_priority(item);
4736             }
4737         }
4738         Ok(())
4739     }
4740 
4741     /// Remove this waitable from the instance's rep table.
delete_from(&self, state: &mut ConcurrentState) -> Result<()>4742     fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4743         match self {
4744             Self::Host(task) => {
4745                 log::trace!("delete host task {task:?}");
4746                 state.delete(*task)?;
4747             }
4748             Self::Guest(task) => {
4749                 log::trace!("delete guest task {task:?}");
4750                 state.delete(*task)?.dispose(state)?;
4751             }
4752             Self::Transmit(task) => {
4753                 state.delete(*task)?;
4754             }
4755         }
4756 
4757         Ok(())
4758     }
4759 }
4760 
4761 impl fmt::Debug for Waitable {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result4762     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4763         match self {
4764             Self::Host(id) => write!(f, "{id:?}"),
4765             Self::Guest(id) => write!(f, "{id:?}"),
4766             Self::Transmit(id) => write!(f, "{id:?}"),
4767         }
4768     }
4769 }
4770 
4771 /// Represents a Component Model Async `waitable-set`.
4772 #[derive(Default)]
4773 struct WaitableSet {
4774     /// Which waitables in this set have pending events, if any.
4775     ready: BTreeSet<Waitable>,
4776     /// Which guest threads are currently waiting on this set, if any.
4777     waiting: BTreeMap<QualifiedThreadId, WaitMode>,
4778 }
4779 
4780 impl TableDebug for WaitableSet {
type_name() -> &'static str4781     fn type_name() -> &'static str {
4782         "WaitableSet"
4783     }
4784 }
4785 
4786 /// Type-erased closure to lower the parameters for a guest task.
4787 type RawLower =
4788     Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4789 
4790 /// Type-erased closure to lift the result for a guest task.
4791 type RawLift = Box<
4792     dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
4793 >;
4794 
4795 /// Type erased result of a guest task which may be downcast to the expected
4796 /// type by a host caller (or simply ignored in the case of a guest caller; see
4797 /// `DummyResult`).
4798 type LiftedResult = Box<dyn Any + Send + Sync>;
4799 
4800 /// Used to return a result from a `LiftFn` when the actual result has already
4801 /// been lowered to a guest task's stack and linear memory.
4802 struct DummyResult;
4803 
4804 /// Represents the Component Model Async state of a (sub-)component instance.
4805 #[derive(Default)]
4806 pub struct ConcurrentInstanceState {
4807     /// Whether backpressure is set for this instance (enabled if >0)
4808     backpressure: u16,
4809     /// Whether this instance can be entered
4810     do_not_enter: bool,
4811     /// Pending calls for this instance which require `Self::backpressure` to be
4812     /// `true` and/or `Self::do_not_enter` to be false before they can proceed.
4813     pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
4814 }
4815 
4816 impl ConcurrentInstanceState {
pending_is_empty(&self) -> bool4817     pub fn pending_is_empty(&self) -> bool {
4818         self.pending.is_empty()
4819     }
4820 }
4821 
4822 #[derive(Debug, Copy, Clone)]
4823 pub(crate) enum CurrentThread {
4824     Guest(QualifiedThreadId),
4825     Host(TableId<HostTask>),
4826     None,
4827 }
4828 
4829 impl CurrentThread {
guest(&self) -> Option<&QualifiedThreadId>4830     fn guest(&self) -> Option<&QualifiedThreadId> {
4831         match self {
4832             Self::Guest(id) => Some(id),
4833             _ => None,
4834         }
4835     }
4836 
host(&self) -> Option<TableId<HostTask>>4837     fn host(&self) -> Option<TableId<HostTask>> {
4838         match self {
4839             Self::Host(id) => Some(*id),
4840             _ => None,
4841         }
4842     }
4843 
is_none(&self) -> bool4844     fn is_none(&self) -> bool {
4845         matches!(self, Self::None)
4846     }
4847 }
4848 
4849 impl From<QualifiedThreadId> for CurrentThread {
from(id: QualifiedThreadId) -> Self4850     fn from(id: QualifiedThreadId) -> Self {
4851         Self::Guest(id)
4852     }
4853 }
4854 
4855 impl From<TableId<HostTask>> for CurrentThread {
from(id: TableId<HostTask>) -> Self4856     fn from(id: TableId<HostTask>) -> Self {
4857         Self::Host(id)
4858     }
4859 }
4860 
4861 /// Represents the Component Model Async state of a store.
4862 pub struct ConcurrentState {
4863     /// The currently running thread, if any.
4864     current_thread: CurrentThread,
4865 
4866     /// The set of pending host and background tasks, if any.
4867     ///
4868     /// See `ComponentInstance::poll_until` for where we temporarily take this
4869     /// out, poll it, then put it back to avoid any mutable aliasing hazards.
4870     futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
4871     /// The table of waitables, waitable sets, etc.
4872     table: AlwaysMut<ResourceTable>,
4873     /// The "high priority" work queue for this store's event loop.
4874     high_priority: Vec<WorkItem>,
4875     /// The "low priority" work queue for this store's event loop.
4876     low_priority: VecDeque<WorkItem>,
4877     /// A place to stash the reason a fiber is suspending so that the code which
4878     /// resumed it will know under what conditions the fiber should be resumed
4879     /// again.
4880     suspend_reason: Option<SuspendReason>,
4881     /// A cached fiber which is waiting for work to do.
4882     ///
4883     /// This helps us avoid creating a new fiber for each `GuestCall` work item.
4884     worker: Option<StoreFiber<'static>>,
4885     /// A place to stash the work item for which we're resuming a worker fiber.
4886     worker_item: Option<WorkerItem>,
4887 
4888     /// Reference counts for all component error contexts
4889     ///
4890     /// NOTE: it is possible the global ref count to be *greater* than the sum of
4891     /// (sub)component ref counts as tracked by `error_context_tables`, for
4892     /// example when the host holds one or more references to error contexts.
4893     ///
4894     /// The key of this primary map is often referred to as the "rep" (i.e. host-side
4895     /// component-wide representation) of the index into concurrent state for a given
4896     /// stored `ErrorContext`.
4897     ///
4898     /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
4899     /// as a `TableId<ErrorContextState>`.
4900     global_error_context_ref_counts:
4901         BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
4902 }
4903 
4904 impl Default for ConcurrentState {
default() -> Self4905     fn default() -> Self {
4906         Self {
4907             current_thread: CurrentThread::None,
4908             table: AlwaysMut::new(ResourceTable::new()),
4909             futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4910             high_priority: Vec::new(),
4911             low_priority: VecDeque::new(),
4912             suspend_reason: None,
4913             worker: None,
4914             worker_item: None,
4915             global_error_context_ref_counts: BTreeMap::new(),
4916         }
4917     }
4918 }
4919 
4920 impl ConcurrentState {
4921     /// Take ownership of any fibers and futures owned by this object.
4922     ///
4923     /// This should be used when disposing of the `Store` containing this object
4924     /// in order to gracefully resolve any and all fibers using
4925     /// `StoreFiber::dispose`.  This is necessary to avoid possible
4926     /// use-after-free bugs due to fibers which may still have access to the
4927     /// `Store`.
4928     ///
4929     /// Additionally, the futures collected with this function should be dropped
4930     /// within a `tls::set` call, which will ensure than any futures closing
4931     /// over an `&Accessor` will have access to the store when dropped, allowing
4932     /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4933     /// panicking.
4934     ///
4935     /// Note that this will leave the object in an inconsistent and unusable
4936     /// state, so it should only be used just prior to dropping it.
take_fibers_and_futures( &mut self, fibers: &mut Vec<StoreFiber<'static>>, futures: &mut Vec<FuturesUnordered<HostTaskFuture>>, )4937     pub(crate) fn take_fibers_and_futures(
4938         &mut self,
4939         fibers: &mut Vec<StoreFiber<'static>>,
4940         futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4941     ) {
4942         for entry in self.table.get_mut().iter_mut() {
4943             if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4944                 for mode in mem::take(&mut set.waiting).into_values() {
4945                     if let WaitMode::Fiber(fiber) = mode {
4946                         fibers.push(fiber);
4947                     }
4948                 }
4949             } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4950                 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) =
4951                     mem::replace(&mut thread.state, GuestThreadState::Completed)
4952                 {
4953                     fibers.push(fiber);
4954                 }
4955             }
4956         }
4957 
4958         if let Some(fiber) = self.worker.take() {
4959             fibers.push(fiber);
4960         }
4961 
4962         let mut handle_item = |item| match item {
4963             WorkItem::ResumeFiber(fiber) => {
4964                 fibers.push(fiber);
4965             }
4966             WorkItem::PushFuture(future) => {
4967                 self.futures
4968                     .get_mut()
4969                     .as_mut()
4970                     .unwrap()
4971                     .push(future.into_inner());
4972             }
4973             WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
4974             }
4975         };
4976 
4977         for item in mem::take(&mut self.high_priority) {
4978             handle_item(item);
4979         }
4980         for item in mem::take(&mut self.low_priority) {
4981             handle_item(item);
4982         }
4983 
4984         if let Some(them) = self.futures.get_mut().take() {
4985             futures.push(them);
4986         }
4987     }
4988 
4989     /// Collect the next set of work items to run. This will be either all
4990     /// high-priority items, or a single low-priority item if there are no
4991     /// high-priority items.
collect_work_items_to_run(&mut self) -> Vec<WorkItem>4992     fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
4993         let mut ready = mem::take(&mut self.high_priority);
4994         if ready.is_empty() {
4995             if let Some(item) = self.low_priority.pop_back() {
4996                 ready.push(item);
4997             }
4998         }
4999         ready
5000     }
5001 
push<V: Send + Sync + 'static>( &mut self, value: V, ) -> Result<TableId<V>, ResourceTableError>5002     fn push<V: Send + Sync + 'static>(
5003         &mut self,
5004         value: V,
5005     ) -> Result<TableId<V>, ResourceTableError> {
5006         self.table.get_mut().push(value).map(TableId::from)
5007     }
5008 
get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError>5009     fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5010         self.table.get_mut().get_mut(&Resource::from(id))
5011     }
5012 
add_child<T: 'static, U: 'static>( &mut self, child: TableId<T>, parent: TableId<U>, ) -> Result<(), ResourceTableError>5013     pub fn add_child<T: 'static, U: 'static>(
5014         &mut self,
5015         child: TableId<T>,
5016         parent: TableId<U>,
5017     ) -> Result<(), ResourceTableError> {
5018         self.table
5019             .get_mut()
5020             .add_child(Resource::from(child), Resource::from(parent))
5021     }
5022 
remove_child<T: 'static, U: 'static>( &mut self, child: TableId<T>, parent: TableId<U>, ) -> Result<(), ResourceTableError>5023     pub fn remove_child<T: 'static, U: 'static>(
5024         &mut self,
5025         child: TableId<T>,
5026         parent: TableId<U>,
5027     ) -> Result<(), ResourceTableError> {
5028         self.table
5029             .get_mut()
5030             .remove_child(Resource::from(child), Resource::from(parent))
5031     }
5032 
delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError>5033     fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5034         self.table.get_mut().delete(Resource::from(id))
5035     }
5036 
push_future(&mut self, future: HostTaskFuture)5037     fn push_future(&mut self, future: HostTaskFuture) {
5038         // Note that we can't directly push to `ConcurrentState::futures` here
5039         // since this may be called from a future that's being polled inside
5040         // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
5041         // so it has exclusive access while polling it.  Therefore, we push a
5042         // work item to the "high priority" queue, which will actually push to
5043         // `ConcurrentState::futures` later.
5044         self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5045     }
5046 
push_high_priority(&mut self, item: WorkItem)5047     fn push_high_priority(&mut self, item: WorkItem) {
5048         log::trace!("push high priority: {item:?}");
5049         self.high_priority.push(item);
5050     }
5051 
push_low_priority(&mut self, item: WorkItem)5052     fn push_low_priority(&mut self, item: WorkItem) {
5053         log::trace!("push low priority: {item:?}");
5054         self.low_priority.push_front(item);
5055     }
5056 
push_work_item(&mut self, item: WorkItem, high_priority: bool)5057     fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5058         if high_priority {
5059             self.push_high_priority(item);
5060         } else {
5061             self.push_low_priority(item);
5062         }
5063     }
5064 
promote_instance_local_thread_work_item( &mut self, current_instance: RuntimeComponentInstanceIndex, ) -> bool5065     fn promote_instance_local_thread_work_item(
5066         &mut self,
5067         current_instance: RuntimeComponentInstanceIndex,
5068     ) -> bool {
5069         self.promote_work_items_matching(|item: &WorkItem| match item {
5070             WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5071                 *instance == current_instance
5072             }
5073             _ => false,
5074         })
5075     }
5076 
promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool5077     fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5078         self.promote_work_items_matching(|item: &WorkItem| match item {
5079             WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5080                 *t == thread
5081             }
5082             _ => false,
5083         })
5084     }
5085 
promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool where F: FnMut(&WorkItem) -> bool,5086     fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5087     where
5088         F: FnMut(&WorkItem) -> bool,
5089     {
5090         // If there's a high-priority work item to resume the current guest thread,
5091         // we don't need to promote anything, but we return true to indicate that
5092         // work is pending for the current instance.
5093         if self.high_priority.iter().any(&mut predicate) {
5094             true
5095         }
5096         // Otherwise, look for a low-priority work item that matches the current
5097         // instance and promote it to high-priority.
5098         else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5099             let item = self.low_priority.remove(idx).unwrap();
5100             self.push_high_priority(item);
5101             true
5102         } else {
5103             false
5104         }
5105     }
5106 
5107     /// Implements the `context.get` intrinsic.
context_get(&mut self, slot: u32) -> Result<u32>5108     pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
5109         let thread = self.current_guest_thread()?;
5110         let val = self.get_mut(thread.thread)?.context[usize::try_from(slot)?];
5111         log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
5112         Ok(val)
5113     }
5114 
5115     /// Implements the `context.set` intrinsic.
context_set(&mut self, slot: u32, val: u32) -> Result<()>5116     pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
5117         let thread = self.current_guest_thread()?;
5118         log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
5119         self.get_mut(thread.thread)?.context[usize::try_from(slot)?] = val;
5120         Ok(())
5121     }
5122 
5123     /// Returns whether there's a pending cancellation on the current guest thread,
5124     /// consuming the event if so.
take_pending_cancellation(&mut self) -> Result<bool>5125     fn take_pending_cancellation(&mut self) -> Result<bool> {
5126         let thread = self.current_guest_thread()?;
5127         if let Some(event) = self.get_mut(thread.task)?.event.take() {
5128             assert!(matches!(event, Event::Cancelled));
5129             Ok(true)
5130         } else {
5131             Ok(false)
5132         }
5133     }
5134 
check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()>5135     fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5136         if self.may_block(task)? {
5137             Ok(())
5138         } else {
5139             Err(Trap::CannotBlockSyncTask.into())
5140         }
5141     }
5142 
may_block(&mut self, task: TableId<GuestTask>) -> Result<bool>5143     fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5144         let task = self.get_mut(task)?;
5145         Ok(task.async_function || task.returned_or_cancelled())
5146     }
5147 
5148     /// Used by `ResourceTables` to acquire the current `CallContext` for the
5149     /// specified task.
5150     ///
5151     /// The `task` is bit-packed as returned by `current_call_context_scope_id`
5152     /// below.
call_context(&mut self, task: u32) -> Result<&mut CallContext>5153     pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5154         let (task, is_host) = (task >> 1, task & 1 == 1);
5155         if is_host {
5156             let task: TableId<HostTask> = TableId::new(task);
5157             Ok(&mut self.get_mut(task)?.call_context)
5158         } else {
5159             let task: TableId<GuestTask> = TableId::new(task);
5160             Ok(&mut self.get_mut(task)?.call_context)
5161         }
5162     }
5163 
5164     /// Used by `ResourceTables` to record the scope of a borrow to get undone
5165     /// in the future.
current_call_context_scope_id(&self) -> Result<u32>5166     pub fn current_call_context_scope_id(&self) -> Result<u32> {
5167         let (bits, is_host) = match self.current_thread {
5168             CurrentThread::Guest(id) => (id.task.rep(), false),
5169             CurrentThread::Host(id) => (id.rep(), true),
5170             CurrentThread::None => bail_bug!("current thread is not set"),
5171         };
5172         assert_eq!((bits << 1) >> 1, bits);
5173         Ok((bits << 1) | u32::from(is_host))
5174     }
5175 
current_guest_thread(&self) -> Result<QualifiedThreadId>5176     fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
5177         match self.current_thread.guest() {
5178             Some(id) => Ok(*id),
5179             None => bail_bug!("current thread is not a guest thread"),
5180         }
5181     }
5182 
current_host_thread(&self) -> Result<TableId<HostTask>>5183     fn current_host_thread(&self) -> Result<TableId<HostTask>> {
5184         match self.current_thread.host() {
5185             Some(id) => Ok(id),
5186             None => bail_bug!("current thread is not a host thread"),
5187         }
5188     }
5189 
futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>>5190     fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5191         match self.futures.get_mut().as_mut() {
5192             Some(f) => Ok(f),
5193             None => bail_bug!("futures field of concurrent state is currently taken"),
5194         }
5195     }
5196 
table(&mut self) -> &mut ResourceTable5197     pub(crate) fn table(&mut self) -> &mut ResourceTable {
5198         self.table.get_mut()
5199     }
5200 }
5201 
5202 /// Provide a type hint to compiler about the shape of a parameter lower
5203 /// closure.
for_any_lower< F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync, >( fun: F, ) -> F5204 fn for_any_lower<
5205     F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5206 >(
5207     fun: F,
5208 ) -> F {
5209     fun
5210 }
5211 
5212 /// Provide a type hint to compiler about the shape of a result lift closure.
for_any_lift< F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync, >( fun: F, ) -> F5213 fn for_any_lift<
5214     F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5215 >(
5216     fun: F,
5217 ) -> F {
5218     fun
5219 }
5220 
5221 /// Wrap the specified future in a `poll_fn` which asserts that the future is
5222 /// only polled from the event loop of the specified `Store`.
5223 ///
5224 /// See `StoreContextMut::run_concurrent` for details.
checked<F: Future + Send + 'static>( id: StoreId, fut: F, ) -> impl Future<Output = F::Output> + Send + 'static5225 fn checked<F: Future + Send + 'static>(
5226     id: StoreId,
5227     fut: F,
5228 ) -> impl Future<Output = F::Output> + Send + 'static {
5229     async move {
5230         let mut fut = pin!(fut);
5231         future::poll_fn(move |cx| {
5232             let message = "\
5233                 `Future`s which depend on asynchronous component tasks, streams, or \
5234                 futures to complete may only be polled from the event loop of the \
5235                 store to which they belong.  Please use \
5236                 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5237             ";
5238             tls::try_get(|store| {
5239                 let matched = match store {
5240                     tls::TryGet::Some(store) => store.id() == id,
5241                     tls::TryGet::Taken | tls::TryGet::None => false,
5242                 };
5243 
5244                 if !matched {
5245                     panic!("{message}")
5246                 }
5247             });
5248             fut.as_mut().poll(cx)
5249         })
5250         .await
5251     }
5252 }
5253 
5254 /// Assert that `StoreContextMut::run_concurrent` has not been called from
5255 /// within an store's event loop.
check_recursive_run()5256 fn check_recursive_run() {
5257     tls::try_get(|store| {
5258         if !matches!(store, tls::TryGet::None) {
5259             panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5260         }
5261     });
5262 }
5263 
unpack_callback_code(code: u32) -> (u32, u32)5264 fn unpack_callback_code(code: u32) -> (u32, u32) {
5265     (code & 0xF, code >> 4)
5266 }
5267 
5268 /// Helper struct for packaging parameters to be passed to
5269 /// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
5270 /// `waitable-set.poll`.
5271 struct WaitableCheckParams {
5272     set: TableId<WaitableSet>,
5273     options: OptionsIndex,
5274     payload: u32,
5275 }
5276 
5277 /// Indicates whether `ComponentInstance::waitable_check` is being called for
5278 /// `waitable-set.wait` or `waitable-set.poll`.
5279 enum WaitableCheck {
5280     Wait,
5281     Poll,
5282 }
5283 
5284 /// Represents a guest task called from the host, prepared using `prepare_call`.
5285 pub(crate) struct PreparedCall<R> {
5286     /// The guest export to be called
5287     handle: Func,
5288     /// The guest thread created by `prepare_call`
5289     thread: QualifiedThreadId,
5290     /// The number of lowered core Wasm parameters to pass to the call.
5291     param_count: usize,
5292     /// The `oneshot::Receiver` to which the result of the call will be
5293     /// delivered when it is available.
5294     rx: oneshot::Receiver<LiftedResult>,
5295     _phantom: PhantomData<R>,
5296 }
5297 
5298 impl<R> PreparedCall<R> {
5299     /// Get a copy of the `TaskId` for this `PreparedCall`.
task_id(&self) -> TaskId5300     pub(crate) fn task_id(&self) -> TaskId {
5301         TaskId {
5302             task: self.thread.task,
5303         }
5304     }
5305 }
5306 
5307 /// Represents a task created by `prepare_call`.
5308 pub(crate) struct TaskId {
5309     task: TableId<GuestTask>,
5310 }
5311 
5312 impl TaskId {
5313     /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5314     /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5315     /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5316     /// and can be resumed by other tasks for this component, so we mark the future as dropped
5317     /// and delete the task when all threads are done.
host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()>5318     pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5319         let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5320         if !task.already_lowered_parameters() {
5321             Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5322         } else {
5323             task.host_future_state = HostFutureState::Dropped;
5324             if task.ready_to_delete() {
5325                 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5326             }
5327         }
5328         Ok(())
5329     }
5330 }
5331 
5332 /// Prepare a call to the specified exported Wasm function, providing functions
5333 /// for lowering the parameters and lifting the result.
5334 ///
5335 /// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
5336 /// loop, use `queue_call`.
prepare_call<T, R>( mut store: StoreContextMut<T>, handle: Func, param_count: usize, host_future_present: bool, lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync + 'static, lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync + 'static, ) -> Result<PreparedCall<R>>5337 pub(crate) fn prepare_call<T, R>(
5338     mut store: StoreContextMut<T>,
5339     handle: Func,
5340     param_count: usize,
5341     host_future_present: bool,
5342     lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
5343     + Send
5344     + Sync
5345     + 'static,
5346     lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
5347     + Send
5348     + Sync
5349     + 'static,
5350 ) -> Result<PreparedCall<R>> {
5351     let (options, _flags, ty, raw_options) = handle.abi_info(store.0);
5352 
5353     let instance = handle.instance().id().get(store.0);
5354     let options = &instance.component().env_component().options[options];
5355     let ty = &instance.component().types()[ty];
5356     let async_function = ty.async_;
5357     let task_return_type = ty.results;
5358     let component_instance = raw_options.instance;
5359     let callback = options.callback.map(|i| instance.runtime_callback(i));
5360     let memory = options
5361         .memory()
5362         .map(|i| instance.runtime_memory(i))
5363         .map(SendSyncPtr::new);
5364     let string_encoding = options.string_encoding;
5365     let token = StoreToken::new(store.as_context_mut());
5366     let state = store.0.concurrent_state_mut();
5367 
5368     let (tx, rx) = oneshot::channel();
5369 
5370     let instance = RuntimeInstance {
5371         instance: handle.instance().id().instance(),
5372         index: component_instance,
5373     };
5374     let caller = state.current_thread;
5375     let task = GuestTask::new(
5376         Box::new(for_any_lower(move |store, params| {
5377             lower_params(handle, token.as_context_mut(store), params)
5378         })),
5379         LiftResult {
5380             lift: Box::new(for_any_lift(move |store, result| {
5381                 lift_result(handle, store, result)
5382             })),
5383             ty: task_return_type,
5384             memory,
5385             string_encoding,
5386         },
5387         Caller::Host {
5388             tx: Some(tx),
5389             host_future_present,
5390             caller,
5391         },
5392         callback.map(|callback| {
5393             let callback = SendSyncPtr::new(callback);
5394             let instance = handle.instance();
5395             Box::new(move |store: &mut dyn VMStore, event, handle| {
5396                 let store = token.as_context_mut(store);
5397                 // SAFETY: Per the contract of `prepare_call`, the callback
5398                 // will remain valid at least as long is this task exists.
5399                 unsafe { instance.call_callback(store, callback, event, handle) }
5400             }) as CallbackFn
5401         }),
5402         instance,
5403         async_function,
5404     )?;
5405 
5406     let task = state.push(task)?;
5407     let new_thread = GuestThread::new_implicit(state, task)?;
5408     let thread = state.push(new_thread)?;
5409     state.get_mut(task)?.threads.insert(thread);
5410 
5411     if !store.0.may_enter(instance)? {
5412         bail!(Trap::CannotEnterComponent);
5413     }
5414 
5415     Ok(PreparedCall {
5416         handle,
5417         thread: QualifiedThreadId { task, thread },
5418         param_count,
5419         rx,
5420         _phantom: PhantomData,
5421     })
5422 }
5423 
5424 /// Queue a call previously prepared using `prepare_call` to be run as part of
5425 /// the associated `ComponentInstance`'s event loop.
5426 ///
5427 /// The returned future will resolve to the result once it is available, but
5428 /// must only be polled via the instance's event loop. See
5429 /// `StoreContextMut::run_concurrent` for details.
queue_call<T: 'static, R: Send + 'static>( mut store: StoreContextMut<T>, prepared: PreparedCall<R>, ) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>>5430 pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5431     mut store: StoreContextMut<T>,
5432     prepared: PreparedCall<R>,
5433 ) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
5434     let PreparedCall {
5435         handle,
5436         thread,
5437         param_count,
5438         rx,
5439         ..
5440     } = prepared;
5441 
5442     queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5443 
5444     Ok(checked(
5445         store.0.id(),
5446         rx.map(move |result| match result {
5447             Ok(r) => match r.downcast() {
5448                 Ok(r) => Ok(*r),
5449                 Err(_) => bail_bug!("wrong type of value produced"),
5450             },
5451             Err(e) => Err(e.into()),
5452         }),
5453     ))
5454 }
5455 
5456 /// Queue a call previously prepared using `prepare_call` to be run as part of
5457 /// the associated `ComponentInstance`'s event loop.
queue_call0<T: 'static>( store: StoreContextMut<T>, handle: Func, guest_thread: QualifiedThreadId, param_count: usize, ) -> Result<()>5458 fn queue_call0<T: 'static>(
5459     store: StoreContextMut<T>,
5460     handle: Func,
5461     guest_thread: QualifiedThreadId,
5462     param_count: usize,
5463 ) -> Result<()> {
5464     let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5465     let is_concurrent = raw_options.async_;
5466     let callback = raw_options.callback;
5467     let instance = handle.instance();
5468     let callee = handle.lifted_core_func(store.0);
5469     let post_return = handle.post_return_core_func(store.0);
5470     let callback = callback.map(|i| {
5471         let instance = instance.id().get(store.0);
5472         SendSyncPtr::new(instance.runtime_callback(i))
5473     });
5474 
5475     log::trace!("queueing call {guest_thread:?}");
5476 
5477     // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5478     // (with signatures appropriate for this call) and will remain valid as
5479     // long as this instance is valid.
5480     unsafe {
5481         instance.queue_call(
5482             store,
5483             guest_thread,
5484             SendSyncPtr::new(callee),
5485             param_count,
5486             1,
5487             is_concurrent,
5488             callback,
5489             post_return.map(SendSyncPtr::new),
5490         )
5491     }
5492 }
5493