1 //! Runtime support for the Component Model Async ABI.
2 //!
3 //! This module and its submodules provide host runtime support for Component
4 //! Model Async features such as async-lifted exports, async-lowered imports,
5 //! streams, futures, and related intrinsics. See [the Async
6 //! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
7 //! for a high-level overview.
8 //!
9 //! At the core of this support is an event loop which schedules and switches
10 //! between guest tasks and any host tasks they create. Each
11 //! `Store` will have at most one event loop running at any given
12 //! time, and that loop may be suspended and resumed by the host embedder using
13 //! e.g. `StoreContextMut::run_concurrent`. The `StoreContextMut::poll_until`
14 //! function contains the loop itself, while the
15 //! `StoreOpaque::concurrent_state` field holds its state.
16 //!
17 //! # Public API Overview
18 //!
19 //! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
20 //!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//! async-lifted or sync-lifted export, creating a guest task.
23 //!
24 //! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
25 //! instance, allowing any and all tasks belonging to that instance to make
26 //! progress.
27 //!
28 //! - `StoreContextMut::spawn`: Run a background task as part of the event loop
29 //! for the specified instance.
30 //!
31 //! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
32 //! `stream` which may be passed to the guest. This takes a
33 //! `{Future,Stream}Producer` implementation which will be polled for items when
34 //! the consumer requests them.
35 //!
36 //! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
37 //! connecting it to a `{Future,Stream}Consumer` which will consume any items
38 //! produced by the write end.
39 //!
40 //! ## Host Task API (e.g. implementing concurrent host functions and background tasks)
41 //!
42 //! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
43 //! function with the linker. That function will take an `Accessor` as its
44 //! first parameter, which provides access to the store between (but not across)
45 //! await points.
46 //!
47 //! - `Accessor::with`: Access the store and its associated data.
48 //!
49 //! - `Accessor::spawn`: Run a background task as part of the event loop for the
50 //! store. This is equivalent to `StoreContextMut::spawn` but more convenient to use
51 //! in host functions.
52
53 use crate::bail_bug;
54 use crate::component::func::{self, Func, call_post_return};
55 use crate::component::{
56 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
57 };
58 use crate::fiber::{self, StoreFiber, StoreFiberYield};
59 use crate::prelude::*;
60 use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
61 use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
62 use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
63 use crate::{
64 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail,
65 };
66 use error_contexts::GlobalErrorContextRefCount;
67 use futures::channel::oneshot;
68 use futures::future::{self, FutureExt};
69 use futures::stream::{FuturesUnordered, StreamExt};
70 use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
71 use std::any::Any;
72 use std::borrow::ToOwned;
73 use std::boxed::Box;
74 use std::cell::UnsafeCell;
75 use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
76 use std::fmt;
77 use std::future::Future;
78 use std::marker::PhantomData;
79 use std::mem::{self, ManuallyDrop, MaybeUninit};
80 use std::ops::DerefMut;
81 use std::pin::{Pin, pin};
82 use std::ptr::{self, NonNull};
83 use std::task::{Context, Poll, Waker};
84 use std::vec::Vec;
85 use table::{TableDebug, TableId};
86 use wasmtime_environ::Trap;
87 use wasmtime_environ::component::{
88 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
89 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
90 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
91 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
92 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
93 };
94 use wasmtime_environ::packed_option::ReservedValue;
95
96 pub use abort::JoinHandle;
97 pub use future_stream_any::{FutureAny, StreamAny};
98 pub use futures_and_streams::{
99 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
100 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
101 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
102 };
103 pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};
104
105 mod abort;
106 mod error_contexts;
107 mod future_stream_any;
108 mod futures_and_streams;
109 pub(crate) mod table;
110 pub(crate) mod tls;
111
/// Sentinel defined by the Component Model spec: an async intrinsic (e.g.
/// `future.write`) reports this value when it has not yet completed.
const BLOCKED: u32 = u32::MAX;
115
/// Corresponds to `CallState` in the upstream spec.
///
/// The explicit discriminants are the integer values placed in the low bits
/// of the word produced by [`Status::pack`].
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Combines this status with an optional `waitable` handle into the single
    /// 32-bit value required by the canonical ABI.
    ///
    /// Layout: bits 0..4 hold the status discriminant, bits 4..32 hold the
    /// waitable handle (zero when there is none).
    ///
    /// # Panics
    ///
    /// Panics unless `waitable` is `None` exactly when `self` is
    /// `Status::Returned`, or if the handle does not fit in 28 bits.
    pub fn pack(self, waitable: Option<u32>) -> u32 {
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        // An absent handle is encoded as zero in the upper bits.
        let waitable = match waitable {
            Some(handle) => handle,
            None => 0,
        };
        assert!(waitable < (1 << 28));
        let status_bits = self as u32;
        status_bits | (waitable << 4)
    }
}
139
140 /// Corresponds to `EventCode` in the Component Model spec, plus related payload
141 /// data.
142 #[derive(Clone, Copy, Debug)]
143 enum Event {
144 None,
145 Subtask {
146 status: Status,
147 },
148 StreamRead {
149 code: ReturnCode,
150 pending: Option<(TypeStreamTableIndex, u32)>,
151 },
152 StreamWrite {
153 code: ReturnCode,
154 pending: Option<(TypeStreamTableIndex, u32)>,
155 },
156 FutureRead {
157 code: ReturnCode,
158 pending: Option<(TypeFutureTableIndex, u32)>,
159 },
160 FutureWrite {
161 code: ReturnCode,
162 pending: Option<(TypeFutureTableIndex, u32)>,
163 },
164 Cancelled,
165 }
166
167 impl Event {
168 /// Lower this event to core Wasm integers for delivery to the guest.
169 ///
170 /// Note that the waitable handle, if any, is assumed to be lowered
171 /// separately.
parts(self) -> (u32, u32)172 fn parts(self) -> (u32, u32) {
173 const EVENT_NONE: u32 = 0;
174 const EVENT_SUBTASK: u32 = 1;
175 const EVENT_STREAM_READ: u32 = 2;
176 const EVENT_STREAM_WRITE: u32 = 3;
177 const EVENT_FUTURE_READ: u32 = 4;
178 const EVENT_FUTURE_WRITE: u32 = 5;
179 const EVENT_CANCELLED: u32 = 6;
180 match self {
181 Event::None => (EVENT_NONE, 0),
182 Event::Cancelled => (EVENT_CANCELLED, 0),
183 Event::Subtask { status } => (EVENT_SUBTASK, status as u32),
184 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()),
185 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()),
186 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()),
187 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()),
188 }
189 }
190 }
191
/// Corresponds to `CallbackCode` in the spec.
///
/// The codes are plain `u32` constants rather than an `enum`.
// NOTE(review): the code that interprets these values is outside this part of
// the file; presumably they are matched against the value an async-lifted
// export's callback returns — confirm at the use sites.
mod callback_code {
    pub const EXIT: u32 = 0;
    pub const YIELD: u32 = 1;
    pub const WAIT: u32 = 2;
}
198
/// A flag indicating that the callee is an async-lowered export.
///
/// This may be passed to the `async-start` intrinsic from a fused adapter.
// The value is defined in `wasmtime_environ`; it is cast to `u32` once here so
// the rest of this module can use it as a `u32` flag directly.
const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32;
203
204 /// Provides access to either store data (via the `get` method) or the store
205 /// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component
206 /// instance to which the current host task belongs.
207 ///
208 /// See [`Accessor::with`] for details.
209 pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> {
210 store: StoreContextMut<'a, T>,
211 get_data: fn(&mut T) -> D::Data<'_>,
212 }
213
214 impl<'a, T, D> Access<'a, T, D>
215 where
216 D: HasData + ?Sized,
217 T: 'static,
218 {
219 /// Creates a new [`Access`] from its component parts.
new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self220 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
221 Self { store, get_data }
222 }
223
224 /// Get mutable access to the store data.
data_mut(&mut self) -> &mut T225 pub fn data_mut(&mut self) -> &mut T {
226 self.store.data_mut()
227 }
228
229 /// Get mutable access to the store data.
get(&mut self) -> D::Data<'_>230 pub fn get(&mut self) -> D::Data<'_> {
231 (self.get_data)(self.data_mut())
232 }
233
234 /// Spawn a background task.
235 ///
236 /// See [`Accessor::spawn`] for details.
spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle where T: 'static,237 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle
238 where
239 T: 'static,
240 {
241 let accessor = Accessor {
242 get_data: self.get_data,
243 token: StoreToken::new(self.store.as_context_mut()),
244 };
245 self.store
246 .as_context_mut()
247 .spawn_with_accessor(accessor, task)
248 }
249
250 /// Returns the getter this accessor is using to project from `T` into
251 /// `D::Data`.
getter(&self) -> fn(&mut T) -> D::Data<'_>252 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
253 self.get_data
254 }
255 }
256
257 impl<'a, T, D> AsContext for Access<'a, T, D>
258 where
259 D: HasData + ?Sized,
260 T: 'static,
261 {
262 type Data = T;
263
as_context(&self) -> StoreContext<'_, T>264 fn as_context(&self) -> StoreContext<'_, T> {
265 self.store.as_context()
266 }
267 }
268
269 impl<'a, T, D> AsContextMut for Access<'a, T, D>
270 where
271 D: HasData + ?Sized,
272 T: 'static,
273 {
as_context_mut(&mut self) -> StoreContextMut<'_, T>274 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
275 self.store.as_context_mut()
276 }
277 }
278
279 /// Provides scoped mutable access to store data in the context of a concurrent
280 /// host task future.
281 ///
282 /// This allows multiple host task futures to execute concurrently and access
283 /// the store between (but not across) `await` points.
284 ///
285 /// # Rationale
286 ///
287 /// This structure is sort of like `&mut T` plus a projection from `&mut T` to
288 /// `D::Data<'_>`. The problem this is solving, however, is that it does not
289 /// literally store these values. The basic problem is that when a concurrent
290 /// host future is being polled it has access to `&mut T` (and the whole
291 /// `Store`) but when it's not being polled it does not have access to these
292 /// values. This reflects how the store is only ever polling one future at a
293 /// time so the store is effectively being passed between futures.
294 ///
295 /// Rust's `Future` trait, however, has no means of passing a `Store`
296 /// temporarily between futures. The [`Context`](std::task::Context) type does
297 /// not have the ability to attach arbitrary information to it at this time.
298 /// This type, [`Accessor`], is used to bridge this expressivity gap.
299 ///
300 /// The [`Accessor`] type here represents the ability to acquire, temporarily in
301 /// a synchronous manner, the current store. The [`Accessor::with`] function
302 /// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
303 /// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
304 /// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
306 /// how the store is temporarily made available while a host future is being
307 /// polled.
308 ///
309 /// # Implementation
310 ///
311 /// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and
312 /// this type additionally doesn't even have a lifetime parameter. This is
313 /// instead a representation of proof of the ability to acquire these while a
314 /// future is being polled. Wasmtime will, when it polls a host future,
315 /// configure ambient state such that the `Accessor` that a future closes over
316 /// will work and be able to access the store.
317 ///
318 /// This has a number of implications for users such as:
319 ///
320 /// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
321 /// the lifetime of a single future.
322 /// * A future is expected to, however, close over an `Accessor` and keep it
323 /// alive probably for the duration of the entire future.
324 /// * Different host futures will be given different `Accessor`s, and that's
325 /// intentional.
326 /// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
327 /// alleviates some otherwise required bounds to be written down.
328 ///
329 /// # Using `Accessor` in `Drop`
330 ///
331 /// The methods on `Accessor` are only expected to work in the context of
332 /// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
333 /// host future can be dropped at any time throughout the system and Wasmtime
334 /// store context is not necessarily available at that time. It's recommended to
335 /// not use `Accessor` methods in anything connected to a `Drop` implementation
336 /// as they will panic and have unintended results. If you run into this though
337 /// feel free to file an issue on the Wasmtime repository.
pub struct Accessor<T: 'static, D = HasSelf<T>>
where
    D: HasData + ?Sized,
{
    /// Token for the store this accessor belongs to. `Accessor::with` combines
    /// it with the store obtained through the `tls` module to produce a
    /// `StoreContextMut<T>`.
    token: StoreToken<T>,
    /// Projection from the store's `T` to `D::Data<'_>`; copied into every
    /// `Access` created by `Accessor::with`.
    get_data: fn(&mut T) -> D::Data<'_>,
}
345
346 /// A helper trait to take any type of accessor-with-data in functions.
347 ///
348 /// This trait is similar to [`AsContextMut`] except that it's used when
349 /// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
350 /// [`Accessor`] is the main type used in concurrent settings and is passed to
351 /// functions such as [`Func::call_concurrent`].
352 ///
353 /// This trait is implemented for [`Accessor`] and `&T` where `T` implements
354 /// this trait. This effectively means that regardless of the `D` in
355 /// `Accessor<T, D>` it can still be passed to a function which just needs a
356 /// store accessor.
357 ///
358 /// Acquiring an [`Accessor`] can be done through
359 /// [`StoreContextMut::run_concurrent`] for example or in a host function
360 /// through
361 /// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
362 pub trait AsAccessor {
363 /// The `T` in `Store<T>` that this accessor refers to.
364 type Data: 'static;
365
366 /// The `D` in `Accessor<T, D>`, or the projection out of
367 /// `Self::Data`.
368 type AccessorData: HasData + ?Sized;
369
370 /// Returns the accessor that this is referring to.
as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>371 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>;
372 }
373
374 impl<T: AsAccessor + ?Sized> AsAccessor for &T {
375 type Data = T::Data;
376 type AccessorData = T::AccessorData;
377
as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>378 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> {
379 T::as_accessor(self)
380 }
381 }
382
383 impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> {
384 type Data = T;
385 type AccessorData = D;
386
as_accessor(&self) -> &Accessor<T, D>387 fn as_accessor(&self) -> &Accessor<T, D> {
388 self
389 }
390 }
391
392 // Note that it is intentional at this time that `Accessor` does not actually
393 // store `&mut T` or anything similar. This distinctly enables the `Accessor`
394 // structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
395 // that matter). This is used to ergonomically simplify bindings where the
396 // majority of the time `Accessor` is closed over in a future which then needs
397 // to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
398 // you already have to write `T: 'static`...) it helps to avoid this.
399 //
400 // Note as well that `Accessor` doesn't actually store its data at all. Instead
401 // it's more of a "proof" of what can be accessed from TLS. API design around
402 // `Accessor` and functions like `Linker::func_wrap_concurrent` are
403 // intentionally made to ensure that `Accessor` is ideally only used in the
404 // context that TLS variables are actually set. For example host functions are
405 // given `&Accessor`, not `Accessor`, and this prevents them from persisting
406 // the value outside of a future. Within the future the TLS variables are all
407 // guaranteed to be set while the future is being polled.
408 //
409 // Finally though this is not an ironclad guarantee, but nor does it need to be.
410 // The TLS APIs are designed to panic or otherwise model usage where they're
411 // called recursively or similar. It's hoped that code cannot be constructed to
412 // actually hit this at runtime but this is not a safety requirement at this
413 // time.
414 const _: () = {
assert<T: Send + Sync>()415 const fn assert<T: Send + Sync>() {}
416 assert::<Accessor<UnsafeCell<u32>>>();
417 };
418
419 impl<T> Accessor<T> {
420 /// Creates a new `Accessor` backed by the specified functions.
421 ///
422 /// - `get`: used to retrieve the store
423 ///
424 /// - `get_data`: used to "project" from the store's associated data to
425 /// another type (e.g. a field of that data or a wrapper around it).
426 ///
427 /// - `spawn`: used to queue spawned background tasks to be run later
new(token: StoreToken<T>) -> Self428 pub(crate) fn new(token: StoreToken<T>) -> Self {
429 Self {
430 token,
431 get_data: |x| x,
432 }
433 }
434 }
435
436 impl<T, D> Accessor<T, D>
437 where
438 D: HasData + ?Sized,
439 {
440 /// Run the specified closure, passing it mutable access to the store.
441 ///
442 /// This function is one of the main building blocks of the [`Accessor`]
443 /// type. This yields synchronous, blocking, access to the store via an
444 /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
445 /// providing the ability to access `D` via [`Access::get`]. Note that the
446 /// `fun` here is given only temporary access to the store and `T`/`D`
447 /// meaning that the return value `R` here is not allowed to capture borrows
448 /// into the two. If access is needed to data within `T` or `D` outside of
449 /// this closure then it must be `clone`d out, for example.
450 ///
451 /// # Panics
452 ///
453 /// This function will panic if it is call recursively with any other
454 /// accessor already in scope. For example if `with` is called within `fun`,
455 /// then this function will panic. It is up to the embedder to ensure that
456 /// this does not happen.
with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R457 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
458 tls::get(|vmstore| {
459 fun(Access {
460 store: self.token.as_context_mut(vmstore),
461 get_data: self.get_data,
462 })
463 })
464 }
465
466 /// Returns the getter this accessor is using to project from `T` into
467 /// `D::Data`.
getter(&self) -> fn(&mut T) -> D::Data<'_>468 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
469 self.get_data
470 }
471
472 /// Changes this accessor to access `D2` instead of the current type
473 /// parameter `D`.
474 ///
475 /// This changes the underlying data access from `T` to `D2::Data<'_>`.
476 ///
477 /// # Panics
478 ///
479 /// When using this API the returned value is disconnected from `&self` and
480 /// the lifetime binding the `self` argument. An `Accessor` only works
481 /// within the context of the closure or async closure that it was
482 /// originally given to, however. This means that due to the fact that the
483 /// returned value has no lifetime connection it's possible to use the
484 /// accessor outside of `&self`, the original accessor, and panic.
485 ///
486 /// The returned value should only be used within the scope of the original
487 /// `Accessor` that `self` refers to.
with_getter<D2: HasData>( &self, get_data: fn(&mut T) -> D2::Data<'_>, ) -> Accessor<T, D2>488 pub fn with_getter<D2: HasData>(
489 &self,
490 get_data: fn(&mut T) -> D2::Data<'_>,
491 ) -> Accessor<T, D2> {
492 Accessor {
493 token: self.token,
494 get_data,
495 }
496 }
497
498 /// Spawn a background task which will receive an `&Accessor<T, D>` and
499 /// run concurrently with any other tasks in progress for the current
500 /// store.
501 ///
502 /// This is particularly useful for host functions which return a `stream`
503 /// or `future` such that the code to write to the write end of that
504 /// `stream` or `future` must run after the function returns.
505 ///
506 /// The returned [`JoinHandle`] may be used to cancel the task.
507 ///
508 /// # Panics
509 ///
510 /// Panics if called within a closure provided to the [`Accessor::with`]
511 /// function. This can only be called outside an active invocation of
512 /// [`Accessor::with`].
spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle where T: 'static,513 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle
514 where
515 T: 'static,
516 {
517 let accessor = self.clone_for_spawn();
518 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
519 }
520
clone_for_spawn(&self) -> Self521 fn clone_for_spawn(&self) -> Self {
522 Self {
523 token: self.token,
524 get_data: self.get_data,
525 }
526 }
527 }
528
/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
// Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F +
// Send + Sync + 'static` fails with a type mismatch error when we try to pass
// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
// best we can do for the time being.
pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    ///
    /// Consumes the task and borrows the [`Accessor`] it should use for store
    /// access. The returned future must be `Send` and resolves to `Ok(())` on
    /// success or to an error on failure.
    fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send;
}
547
/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        /// Raw core Wasm values supplied by the caller.
        // NOTE(review): presumably the flattened parameters — confirm at the
        // use sites, which are outside this part of the file.
        params: Vec<ValRaw>,
        /// Whether the call has a result.
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        /// Raw core Wasm values supplied by the caller.
        // NOTE(review): presumably the flattened parameters — confirm at the
        // use sites, which are outside this part of the file.
        params: Vec<ValRaw>,
        /// Number of results of the call.
        // NOTE(review): presumably counted in flattened core values — confirm.
        result_count: u32,
    },
}
562
/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`
    // NOTE(review): the payload is presumably the suspended fiber to resume
    // once an event arrives — confirm where this variant is consumed.
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export.
    // NOTE(review): the payload is presumably the instance the callback
    // belongs to — confirm where this variant is consumed.
    Callback(Instance),
}
571
/// Represents the reason a fiber is suspending itself.
///
// NOTE(review): `skip_may_block_check` appears on every variant that names a
// thread; its exact effect is decided by the suspension code outside this part
// of the file — presumably it bypasses a "may this task block?" validation.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        /// The waitable set being waited on.
        set: TableId<WaitableSet>,
        /// The guest thread that is waiting.
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding {
        /// The guest thread that is yielding.
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
    ExplicitlySuspending {
        /// The guest thread that is being suspended.
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}
597
598 /// Represents a pending call into guest code for a given guest task.
599 enum GuestCallKind {
600 /// Indicates there's an event to deliver to the task, possibly related to a
601 /// waitable set the task has been waiting on or polling.
602 DeliverEvent {
603 /// The instance to which the task belongs.
604 instance: Instance,
605 /// The waitable set the event belongs to, if any.
606 ///
607 /// If this is `None` the event will be waiting in the
608 /// `GuestTask::event` field for the task.
609 set: Option<TableId<WaitableSet>>,
610 },
611 /// Indicates that a new guest task call is pending and may be executed
612 /// using the specified closure.
613 ///
614 /// If the closure returns `Ok(Some(call))`, the `call` should be run
615 /// immediately using `handle_guest_call`.
616 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>),
617 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>),
618 }
619
620 impl fmt::Debug for GuestCallKind {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result621 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
622 match self {
623 Self::DeliverEvent { instance, set } => f
624 .debug_struct("DeliverEvent")
625 .field("instance", instance)
626 .field("set", set)
627 .finish(),
628 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
629 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
630 }
631 }
632 }
633
/// The target of a suspension intrinsic.
///
/// Note that the variants `Some` and `None` belong to this enum and are
/// unrelated to `Option`.
#[derive(Copy, Clone, Debug)]
pub enum SuspensionTarget {
    SomeSuspended(u32),
    Some(u32),
    None,
}

impl SuspensionTarget {
    /// Returns `true` only for `SuspensionTarget::None`.
    fn is_none(&self) -> bool {
        match self {
            SuspensionTarget::None => true,
            SuspensionTarget::SomeSuspended(_) | SuspensionTarget::Some(_) => false,
        }
    }

    /// Returns `true` for every variant that names a target, i.e. anything
    /// other than `SuspensionTarget::None`.
    fn is_some(&self) -> bool {
        !matches!(self, SuspensionTarget::None)
    }
}
650
651 /// Represents a pending call into guest code for a given guest thread.
652 #[derive(Debug)]
653 struct GuestCall {
654 thread: QualifiedThreadId,
655 kind: GuestCallKind,
656 }
657
658 impl GuestCall {
659 /// Returns whether or not the call is ready to run.
660 ///
661 /// A call will not be ready to run if either:
662 ///
663 /// - the (sub-)component instance to be called has already been entered and
664 /// cannot be reentered until an in-progress call completes
665 ///
666 /// - the call is for a not-yet started task and the (sub-)component
667 /// instance to be called has backpressure enabled
is_ready(&self, store: &mut StoreOpaque) -> Result<bool>668 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> {
669 let instance = store
670 .concurrent_state_mut()
671 .get_mut(self.thread.task)?
672 .instance;
673 let state = store.instance_state(instance).concurrent_state();
674
675 let ready = match &self.kind {
676 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
677 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
678 GuestCallKind::StartExplicit(_) => true,
679 };
680 log::trace!(
681 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
682 state.do_not_enter,
683 state.backpressure
684 );
685 Ok(ready)
686 }
687 }
688
/// Job to be run on a worker fiber.
enum WorkerItem {
    /// A pending call into guest code.
    GuestCall(GuestCall),
    /// A one-shot host closure to run with access to the store.
    Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
}
694
695 /// Represents a pending work item to be handled by the event loop for a given
696 /// component instance.
697 enum WorkItem {
698 /// A host task to be pushed to `ConcurrentState::futures`.
699 PushFuture(AlwaysMut<HostTaskFuture>),
700 /// A fiber to resume.
701 ResumeFiber(StoreFiber<'static>),
702 /// A thread to resume.
703 ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
704 /// A pending call into guest code for a given guest task.
705 GuestCall(RuntimeComponentInstanceIndex, GuestCall),
706 /// A job to run on a worker fiber.
707 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>),
708 }
709
710 impl fmt::Debug for WorkItem {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result711 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
712 match self {
713 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
714 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
715 Self::ResumeThread(instance, thread) => f
716 .debug_tuple("ResumeThread")
717 .field(instance)
718 .field(thread)
719 .finish(),
720 Self::GuestCall(instance, call) => f
721 .debug_tuple("GuestCall")
722 .field(instance)
723 .field(call)
724 .finish(),
725 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
726 }
727 }
728 }
729
/// Whether a suspension intrinsic was cancelled or completed.
///
/// The derived ordering follows declaration order, so
/// `WaitResult::Cancelled < WaitResult::Completed`.
#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    /// The suspension ended because it was cancelled.
    Cancelled,
    /// The suspension ended normally.
    Completed,
}
736
/// Poll the specified future until it completes on behalf of a guest->host call
/// using a sync-lowered import.
///
/// This is similar to `Instance::first_poll` except it's for sync-lowered
/// imports, meaning we don't need to handle cancellation and we can block the
/// caller until the task completes, at which point the caller can handle
/// lowering the result to the guest's stack and linear memory.
///
/// The future is polled once inline. If it is still pending it is handed to
/// the event loop and the current fiber is suspended until the host task
/// reports completion.
///
/// # Errors
///
/// Returns an error if `future` itself fails, if any of the task/state lookups
/// or the suspension fail, or if the host task is not in the expected state
/// once it has completed.
pub(crate) fn poll_and_block<R: Send + Sync + 'static>(
    store: &mut dyn VMStore,
    future: impl Future<Output = Result<R>> + Send + 'static,
) -> Result<R> {
    let state = store.concurrent_state_mut();
    let task = state.current_host_thread()?;

    // Wrap the future so that, on success, it stashes the (type-erased) result
    // in the host task's state as `HostTaskState::CalleeFinished` and then
    // posts a `Subtask`/`Returned` event on the task's waitable, which is what
    // wakes this fiber if it had to suspend below. If `future` fails, the
    // error is propagated by `?` and nothing is stashed.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            let host_state = &mut state.get_mut(task)?.state;
            assert!(matches!(host_state, HostTaskState::CalleeStarted));
            *host_state = HostTaskState::CalleeFinished(Box::new(result));

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
    // add the future to `ConcurrentState::futures` and poll it automatically
    // from the event loop if it doesn't complete immediately here.
    //
    // `tls::set` makes the store available to the `tls::get` call inside the
    // wrapper for the duration of this poll.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        // It completed immediately; propagate any error. On success the
        // wrapper above has already stashed the result in the task state.
        Poll::Ready(result) => result?,

        // It did not complete immediately; add it to
        // `ConcurrentState::futures` so it will be polled via the event loop;
        // then use `GuestThread::sync_call_set` to wait for the task to
        // complete, suspending the current fiber until it does so.
        Poll::Pending => {
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let caller = state.get_mut(task)?.caller;
            let set = state.get_mut(caller.thread)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;

            // Remove the `task` from the `sync_call_set` to ensure that when
            // this function returns and the task is deleted that there are no
            // more lingering references to this host task.
            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
        }
    }

    // Retrieve and return the result, marking the host task as done (not
    // cancelled). The stashed value is type-erased, so it is downcast back to
    // `R`; any other state or type at this point is an internal bug.
    let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
    match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
        HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
            Ok(result) => *result,
            Err(_) => bail_bug!("host task finished with wrong type of result"),
        }),
        _ => bail_bug!("unexpected host task state after completion"),
    }
}
821
822 /// Execute the specified guest call.
handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()>823 fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
824 let mut next = Some(call);
825 while let Some(call) = next.take() {
826 match call.kind {
827 GuestCallKind::DeliverEvent { instance, set } => {
828 let (event, waitable) =
829 match instance.get_event(store, call.thread.task, set, true)? {
830 Some(pair) => pair,
831 None => bail_bug!("delivering non-present event"),
832 };
833 let state = store.concurrent_state_mut();
834 let task = state.get_mut(call.thread.task)?;
835 let runtime_instance = task.instance;
836 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
837
838 log::trace!(
839 "use callback to deliver event {event:?} to {:?} for {waitable:?}",
840 call.thread,
841 );
842
843 let old_thread = store.set_thread(call.thread)?;
844 log::trace!(
845 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
846 call.thread
847 );
848
849 store.enter_instance(runtime_instance);
850
851 let Some(callback) = store
852 .concurrent_state_mut()
853 .get_mut(call.thread.task)?
854 .callback
855 .take()
856 else {
857 bail_bug!("guest task callback field not present")
858 };
859
860 let code = callback(store, event, handle)?;
861
862 store
863 .concurrent_state_mut()
864 .get_mut(call.thread.task)?
865 .callback = Some(callback);
866
867 store.exit_instance(runtime_instance)?;
868
869 store.set_thread(old_thread)?;
870
871 next = instance.handle_callback_code(
872 store,
873 call.thread,
874 runtime_instance.index,
875 code,
876 )?;
877
878 log::trace!(
879 "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
880 );
881 }
882 GuestCallKind::StartImplicit(fun) => {
883 next = fun(store)?;
884 }
885 GuestCallKind::StartExplicit(fun) => {
886 fun(store)?;
887 }
888 }
889 }
890
891 Ok(())
892 }
893
impl<T> Store<T> {
    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
    ///
    /// Returns an error without running `fun` if concurrency support is
    /// disabled for this store; otherwise behaves exactly like the wrapped
    /// method, to which it delegates.
    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        ensure!(
            self.as_context().0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        self.as_context_mut().run_concurrent(fun).await
    }

    /// Convenience wrapper for `StoreContextMut::assert_concurrent_state_empty`.
    ///
    /// Only intended for use in Wasmtime's own testing.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&mut self) {
        self.as_context_mut().assert_concurrent_state_empty();
    }

    /// Convenience wrapper for `StoreContextMut::concurrent_state_table_size`.
    ///
    /// Only intended for use in Wasmtime's own testing.
    #[doc(hidden)]
    pub fn concurrent_state_table_size(&mut self) -> usize {
        self.as_context_mut().concurrent_state_table_size()
    }

    /// Convenience wrapper for [`StoreContextMut::spawn`].
    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
    where
        T: 'static,
    {
        self.as_context_mut().spawn(task)
    }
}
925
926 impl<T> StoreContextMut<'_, T> {
927 /// Assert that all the relevant tables and queues in the concurrent state
928 /// for this store are empty.
929 ///
930 /// This is for sanity checking in integration tests
931 /// (e.g. `component-async-tests`) that the relevant state has been cleared
932 /// after each test concludes. This should help us catch leaks, e.g. guest
933 /// tasks which haven't been deleted despite having completed and having
934 /// been dropped by their supertasks.
935 ///
936 /// Only intended for use in Wasmtime's own testing.
937 #[doc(hidden)]
assert_concurrent_state_empty(self)938 pub fn assert_concurrent_state_empty(self) {
939 let store = self.0;
940 store
941 .store_data_mut()
942 .components
943 .assert_instance_states_empty();
944 let state = store.concurrent_state_mut();
945 assert!(
946 state.table.get_mut().is_empty(),
947 "non-empty table: {:?}",
948 state.table.get_mut()
949 );
950 assert!(state.high_priority.is_empty());
951 assert!(state.low_priority.is_empty());
952 assert!(state.current_thread.is_none());
953 assert!(state.futures_mut().unwrap().is_empty());
954 assert!(state.global_error_context_ref_counts.is_empty());
955 }
956
957 /// Helper function to perform tests over the size of the concurrent state
958 /// table which can be useful for detecting leaks.
959 ///
960 /// Only intended for use in Wasmtime's own testing.
961 #[doc(hidden)]
concurrent_state_table_size(&mut self) -> usize962 pub fn concurrent_state_table_size(&mut self) -> usize {
963 self.0
964 .concurrent_state_mut()
965 .table
966 .get_mut()
967 .iter_mut()
968 .count()
969 }
970
971 /// Spawn a background task to run as part of this instance's event loop.
972 ///
973 /// The task will receive an `&Accessor<U>` and run concurrently with
974 /// any other tasks in progress for the instance.
975 ///
976 /// Note that the task will only make progress if and when the event loop
977 /// for this instance is run.
978 ///
979 /// The returned [`JoinHandle`] may be used to cancel the task.
spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle where T: 'static,980 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
981 where
982 T: 'static,
983 {
984 let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
985 self.spawn_with_accessor(accessor, task)
986 }
987
988 /// Internal implementation of `spawn` functions where a `store` is
989 /// available along with an `Accessor`.
spawn_with_accessor<D>( self, accessor: Accessor<T, D>, task: impl AccessorTask<T, D>, ) -> JoinHandle where T: 'static, D: HasData + ?Sized,990 fn spawn_with_accessor<D>(
991 self,
992 accessor: Accessor<T, D>,
993 task: impl AccessorTask<T, D>,
994 ) -> JoinHandle
995 where
996 T: 'static,
997 D: HasData + ?Sized,
998 {
999 // Create an "abortable future" here where internally the future will
1000 // hook calls to poll and possibly spawn more background tasks on each
1001 // iteration.
1002 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
1003 self.0
1004 .concurrent_state_mut()
1005 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
1006 handle
1007 }
1008
/// Run the specified closure `fun` to completion as part of this store's
/// event loop.
///
/// This will run `fun` as part of this store's event loop until it
/// yields a result. `fun` is provided an [`Accessor`], which provides
/// controlled access to the store and its data.
///
/// This function can be used to invoke [`Func::call_concurrent`] for
/// example within the async closure provided here.
///
/// This function will unconditionally return an error if
/// [`Config::concurrency_support`] is disabled.
///
/// [`Config::concurrency_support`]: crate::Config::concurrency_support
///
/// # Store-blocking behavior
///
/// At this time there are certain situations in which the `Future` returned
/// by the `AsyncFnOnce` passed to this function will not be polled for an
/// extended period of time, despite one or more `Waker::wake` events having
/// occurred for the task to which it belongs. This can manifest as the
/// `Future` seeming to be "blocked" or "locked up", but is actually due to
/// the `Store` being held by e.g. a blocking host function, preventing the
/// `Future` from being polled. A canonical example of this is when the
/// `fun` provided to this function attempts to set a timeout for an
/// invocation of a wasm function. In this situation the async closure is
/// waiting both on (a) the wasm computation to finish, and (b) the timeout
/// to elapse. At this time this setup will not always work and the timeout
/// may not reliably fire.
///
/// This function will not block the current thread and as such is always
/// suitable to run in an `async` context, but the current implementation of
/// Wasmtime can lead to situations where a certain wasm computation is
/// required to make progress for the closure to make progress. This is an
/// artifact of Wasmtime's historical implementation of `async` functions
/// and is the topic of [#11869] and [#11870]. In the timeout example from
/// above it means that Wasmtime can get "wedged" for a bit where (a) must
/// progress for a readiness notification of (b) to get delivered.
///
/// This effectively means that it's not possible to reliably perform a
/// "select" operation within the `fun` closure, which timeouts for example
/// are based on. Fixing this requires some relatively major refactoring
/// work within Wasmtime itself. This is a known pitfall otherwise and one
/// that is intended to be fixed one day. In the meantime it's recommended
/// to apply timeouts or such to the entire `run_concurrent` call itself
/// rather than internally.
///
/// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
/// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
///
/// # Example
///
/// ```
/// # use {
/// #   wasmtime::{
/// #     error::{Result},
/// #     component::{ Component, Linker, Resource, ResourceTable},
/// #     Config, Engine, Store
/// #   },
/// # };
/// #
/// # struct MyResource(u32);
/// # struct Ctx { table: ResourceTable }
/// #
/// # async fn foo() -> Result<()> {
/// # let mut config = Config::new();
/// # let engine = Engine::new(&config)?;
/// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
/// # let mut linker = Linker::new(&engine);
/// # let component = Component::new(&engine, "")?;
/// # let instance = linker.instantiate_async(&mut store, &component).await?;
/// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
/// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
/// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?;
///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
///    bar.call_concurrent(accessor, (value.0,)).await?;
///    Ok(())
/// }).await??;
/// # Ok(())
/// # }
/// ```
pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
where
    T: Send + 'static,
{
    // Reject the call up front, before `fun` is ever invoked, when the store
    // was not configured with concurrency support.
    ensure!(
        self.0.concurrency_support(),
        "cannot use `run_concurrent` when Config::concurrency_support disabled",
    );
    // `false`: an idle event loop keeps waiting for `fun`'s future instead of
    // trapping.
    self.do_run_concurrent(fun, false).await
}
1102
/// Variant of `run_concurrent` which runs the event loop with `trap_on_idle`
/// set to `true`.
///
/// With that flag set, the event loop returns `Trap::AsyncDeadlock` when it
/// has no pending futures and no work items left while the future returned by
/// `fun` is still incomplete, rather than continuing to wait.
///
/// Unlike `run_concurrent`, this performs no `ensure!` check of concurrency
/// support itself; `do_run_concurrent` only `debug_assert!`s it.
pub(super) async fn run_concurrent_trap_on_idle<R>(
    self,
    fun: impl AsyncFnOnce(&Accessor<T>) -> R,
) -> Result<R>
where
    T: Send + 'static,
{
    self.do_run_concurrent(fun, true).await
}
1112
/// Shared implementation of `run_concurrent` and
/// `run_concurrent_trap_on_idle`.
///
/// Calls `fun` with a fresh `Accessor` for this store and drives the
/// resulting future with `poll_until`, forwarding `trap_on_idle` unchanged.
async fn do_run_concurrent<R>(
    mut self,
    fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    trap_on_idle: bool,
) -> Result<R>
where
    T: Send + 'static,
{
    // Only checked in debug builds; `run_concurrent` performs the always-on
    // `ensure!` check before calling in here.
    debug_assert!(self.0.concurrency_support());
    // NOTE(review): defined elsewhere; presumably rejects running the event
    // loop recursively -- confirm against its definition.
    check_recursive_run();
    let token = StoreToken::new(self.as_context_mut());

    // Owns the future produced by `fun` together with the store context and
    // guarantees that whenever the future is dropped -- on completion, on
    // error, or when this `async fn`'s own future is dropped part-way
    // through -- the drop happens inside `tls::set` for this store.
    struct Dropper<'a, T: 'static, V> {
        store: StoreContextMut<'a, T>,
        value: ManuallyDrop<V>,
    }

    impl<'a, T, V> Drop for Dropper<'a, T, V> {
        fn drop(&mut self) {
            tls::set(self.store.0, || {
                // SAFETY: Here we drop the value without moving it for the
                // first and only time -- per the contract for `Drop::drop`,
                // this code won't run again, and the `value` field will no
                // longer be accessible.
                unsafe { ManuallyDrop::drop(&mut self.value) }
            });
        }
    }

    let accessor = &Accessor::new(token);
    // `fun(accessor)` only constructs the future here; it is first polled by
    // `poll_until` below.
    let dropper = &mut Dropper {
        store: self,
        value: ManuallyDrop::new(fun(accessor)),
    };
    // SAFETY: We never move `dropper` nor its `value` field.
    let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

    // The store context is reborrowed from `dropper` so that `dropper` keeps
    // ownership of it for use in its `Drop` implementation.
    dropper
        .store
        .as_context_mut()
        .poll_until(future, trap_on_idle)
        .await
}
1156
/// Run this store's event loop.
///
/// The returned future will resolve when the specified future completes or,
/// if `trap_on_idle` is true, when the event loop can't make further
/// progress.
///
/// # Errors
///
/// Returns an error if any future in `ConcurrentState::futures` resolves to
/// an error, if handling a work item fails, if the `futures` field is
/// unexpectedly missing, or -- only when `trap_on_idle` is true -- with
/// `Trap::AsyncDeadlock` when there is nothing left to run and `future` is
/// still pending.
async fn poll_until<R>(
    mut self,
    mut future: Pin<&mut impl Future<Output = R>>,
    trap_on_idle: bool,
) -> Result<R>
where
    T: Send + 'static,
{
    // Guard which puts the temporarily removed `ConcurrentState::futures`
    // back into the store when dropped, including when this function exits
    // early or its future is dropped while suspended.
    struct Reset<'a, T: 'static> {
        store: StoreContextMut<'a, T>,
        futures: Option<FuturesUnordered<HostTaskFuture>>,
    }

    impl<'a, T> Drop for Reset<'a, T> {
        fn drop(&mut self) {
            if let Some(futures) = self.futures.take() {
                *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
            }
        }
    }

    loop {
        // Take `ConcurrentState::futures` out of the store so we can poll
        // it while also safely giving any of the futures inside access to
        // `self`.
        let futures = self.0.concurrent_state_mut().futures.get_mut().take();
        let mut reset = Reset {
            store: self.as_context_mut(),
            futures,
        };
        let mut next = match reset.futures.as_mut() {
            Some(f) => pin!(f.next()),
            None => bail_bug!("concurrent state missing futures field"),
        };

        // Outcome of one round of polling: either the caller's future
        // finished, or there is a (possibly empty) batch of work items to
        // process before polling again.
        enum PollResult<R> {
            Complete(R),
            ProcessWork(Vec<WorkItem>),
        }
        let result = future::poll_fn(|cx| {
            // First, poll the future we were passed as an argument and
            // return immediately if it's ready.
            if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                return Poll::Ready(Ok(PollResult::Complete(value)));
            }

            // Next, poll `ConcurrentState::futures` (which includes any
            // pending host tasks and/or background tasks), returning
            // immediately if one of them fails.
            //
            // The result is reduced to: `Ready(true)` if one future
            // completed successfully, `Ready(false)` if there are no futures
            // left, and `Pending` otherwise.
            let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                Poll::Ready(Some(output)) => {
                    match output {
                        Err(e) => return Poll::Ready(Err(e)),
                        Ok(()) => {}
                    }
                    Poll::Ready(true)
                }
                Poll::Ready(None) => Poll::Ready(false),
                Poll::Pending => Poll::Pending,
            };

            // Next, collect the next batch of work items to process, if any.
            // This will be either all of the high-priority work items, or if
            // there are none, a single low-priority work item.
            let state = reset.store.0.concurrent_state_mut();
            let ready = state.collect_work_items_to_run();
            if !ready.is_empty() {
                return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
            }

            // Finally, if we have nothing else to do right now, determine what to do
            // based on whether there are any pending futures in
            // `ConcurrentState::futures`.
            return match next {
                Poll::Ready(true) => {
                    // In this case, one of the futures in
                    // `ConcurrentState::futures` completed
                    // successfully, so we return now and continue
                    // the outer loop in case there is another one
                    // ready to complete.
                    Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
                }
                Poll::Ready(false) => {
                    // Poll the future we were passed one last time
                    // in case one of `ConcurrentState::futures` had
                    // the side effect of unblocking it.
                    if let Poll::Ready(value) =
                        tls::set(reset.store.0, || future.as_mut().poll(cx))
                    {
                        Poll::Ready(Ok(PollResult::Complete(value)))
                    } else {
                        // In this case, there are no more pending
                        // futures in `ConcurrentState::futures`,
                        // there are no remaining work items, _and_
                        // the future we were passed as an argument
                        // still hasn't completed.
                        if trap_on_idle {
                            // `trap_on_idle` is true, so we exit
                            // immediately.
                            Poll::Ready(Err(Trap::AsyncDeadlock.into()))
                        } else {
                            // `trap_on_idle` is false, so we assume
                            // that the future will wake us up and give
                            // us more work to do when it's ready to.
                            Poll::Pending
                        }
                    }
                }
                // There is at least one pending future in
                // `ConcurrentState::futures` and we have nothing
                // else to do but wait for now, so we return
                // `Pending`.
                Poll::Pending => Poll::Pending,
            };
        })
        .await;

        // Put the `ConcurrentState::futures` back into the store before we
        // return or handle any work items since one or more of those items
        // might append more futures.
        drop(reset);

        match result? {
            // The future we were passed as an argument completed, so we
            // return the result.
            PollResult::Complete(value) => break Ok(value),
            // The future we were passed has not yet completed, so handle
            // any work items and then loop again.
            PollResult::ProcessWork(ready) => {
                // Guard which cleans up any work items from this batch that
                // were never handled (e.g. because an earlier item failed or
                // this future was dropped): unresumed fibers are disposed
                // and unpushed futures are dropped inside `tls::set`; all
                // other item kinds are simply dropped.
                struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                    store: StoreContextMut<'a, T>,
                    ready: I,
                }

                impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                    fn drop(&mut self) {
                        while let Some(item) = self.ready.next() {
                            match item {
                                WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                WorkItem::PushFuture(future) => {
                                    tls::set(self.store.0, move || drop(future))
                                }
                                _ => {}
                            }
                        }
                    }
                }

                let mut dispose = Dispose {
                    store: self.as_context_mut(),
                    ready: ready.into_iter(),
                };

                // Items are pulled from the guard one at a time so that, if
                // handling one of them fails, the remainder are still in
                // `dispose` for its `Drop` implementation to clean up.
                while let Some(item) = dispose.ready.next() {
                    dispose
                        .store
                        .as_context_mut()
                        .handle_work_item(item)
                        .await?;
                }
            }
        }
    }
}
1326
1327 /// Handle the specified work item, possibly resuming a fiber if applicable.
handle_work_item(self, item: WorkItem) -> Result<()> where T: Send,1328 async fn handle_work_item(self, item: WorkItem) -> Result<()>
1329 where
1330 T: Send,
1331 {
1332 log::trace!("handle work item {item:?}");
1333 match item {
1334 WorkItem::PushFuture(future) => {
1335 self.0
1336 .concurrent_state_mut()
1337 .futures_mut()?
1338 .push(future.into_inner());
1339 }
1340 WorkItem::ResumeFiber(fiber) => {
1341 self.0.resume_fiber(fiber).await?;
1342 }
1343 WorkItem::ResumeThread(_, thread) => {
1344 if let GuestThreadState::Ready(fiber) = mem::replace(
1345 &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
1346 GuestThreadState::Running,
1347 ) {
1348 self.0.resume_fiber(fiber).await?;
1349 } else {
1350 bail_bug!("cannot resume non-pending thread {thread:?}");
1351 }
1352 }
1353 WorkItem::GuestCall(_, call) => {
1354 if call.is_ready(self.0)? {
1355 self.run_on_worker(WorkerItem::GuestCall(call)).await?;
1356 } else {
1357 let state = self.0.concurrent_state_mut();
1358 let task = state.get_mut(call.thread.task)?;
1359 if !task.starting_sent {
1360 task.starting_sent = true;
1361 if let GuestCallKind::StartImplicit(_) = &call.kind {
1362 Waitable::Guest(call.thread.task).set_event(
1363 state,
1364 Some(Event::Subtask {
1365 status: Status::Starting,
1366 }),
1367 )?;
1368 }
1369 }
1370
1371 let instance = state.get_mut(call.thread.task)?.instance;
1372 self.0
1373 .instance_state(instance)
1374 .concurrent_state()
1375 .pending
1376 .insert(call.thread, call.kind);
1377 }
1378 }
1379 WorkItem::WorkerFunction(fun) => {
1380 self.run_on_worker(WorkerItem::Function(fun)).await?;
1381 }
1382 }
1383
1384 Ok(())
1385 }
1386
1387 /// Execute the specified guest call on a worker fiber.
run_on_worker(self, item: WorkerItem) -> Result<()> where T: Send,1388 async fn run_on_worker(self, item: WorkerItem) -> Result<()>
1389 where
1390 T: Send,
1391 {
1392 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
1393 fiber
1394 } else {
1395 fiber::make_fiber(self.0, move |store| {
1396 loop {
1397 let Some(item) = store.concurrent_state_mut().worker_item.take() else {
1398 bail_bug!("worker_item not present when resuming fiber")
1399 };
1400 match item {
1401 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
1402 WorkerItem::Function(fun) => fun.into_inner()(store)?,
1403 }
1404
1405 store.suspend(SuspendReason::NeedWork)?;
1406 }
1407 })?
1408 };
1409
1410 let worker_item = &mut self.0.concurrent_state_mut().worker_item;
1411 assert!(worker_item.is_none());
1412 *worker_item = Some(item);
1413
1414 self.0.resume_fiber(worker).await
1415 }
1416
1417 /// Wrap the specified host function in a future which will call it, passing
1418 /// it an `&Accessor<T>`.
1419 ///
1420 /// See the `Accessor` documentation for details.
wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static where T: 'static, F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>> + Send + Sync + 'static, R: Send + Sync + 'static,1421 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
1422 where
1423 T: 'static,
1424 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
1425 + Send
1426 + Sync
1427 + 'static,
1428 R: Send + Sync + 'static,
1429 {
1430 let token = StoreToken::new(self);
1431 async move {
1432 let mut accessor = Accessor::new(token);
1433 closure(&mut accessor).await
1434 }
1435 }
1436 }
1437
1438 impl StoreOpaque {
/// Push a `GuestTask` onto the task stack for either a sync-to-sync,
/// guest-to-guest call or a sync host-to-guest call.
///
/// This task will only be used for the purpose of handling calls to
/// intrinsic functions; both parameter lowering and result lifting are
/// assumed to be taken care of elsewhere.
///
/// On success the new task's implicit thread has been registered with the
/// `callee` instance and installed as the store's current thread. When
/// concurrency support is disabled this only calls
/// `enter_call_not_concurrent`.
///
/// The matching cleanup is performed by `exit_guest_sync_call`.
pub(crate) fn enter_guest_sync_call(
    &mut self,
    guest_caller: Option<RuntimeInstance>,
    callee_async: bool,
    callee: RuntimeInstance,
) -> Result<()> {
    log::trace!("enter sync call {callee:?}");
    if !self.concurrency_support() {
        return Ok(self.enter_call_not_concurrent());
    }

    let state = self.concurrent_state_mut();
    let thread = state.current_thread;
    // Instance of the guest task which is currently running, if any; only
    // used for the sanity check below.
    let instance = if let Some(thread) = thread.guest() {
        Some(state.get_mut(thread.task)?.instance)
    } else {
        None
    };
    // When the caller is said to be a guest, it must be the instance of the
    // current thread (checked in debug builds only).
    if guest_caller.is_some() {
        debug_assert_eq!(instance, guest_caller);
    }
    let task = GuestTask::new(
        // Lowering and lifting are handled elsewhere for sync calls, so
        // reaching either of these closures is an internal bug.
        Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
        LiftResult {
            lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
            ty: TypeTupleIndex::reserved_value(),
            memory: None,
            string_encoding: StringEncoding::Utf8,
        },
        // Record who is calling so that `exit_guest_sync_call` can restore
        // the caller as the current thread.
        if let Some(thread) = thread.guest() {
            Caller::Guest { thread: *thread }
        } else {
            Caller::Host {
                tx: None,
                host_future_present: false,
                caller: thread,
            }
        },
        None,
        callee,
        callee_async,
    )?;

    // Allocate the task and its implicit thread, then register the thread
    // with the callee instance and with the task itself.
    let guest_task = state.push(task)?;
    let new_thread = GuestThread::new_implicit(state, guest_task)?;
    let guest_thread = state.push(new_thread)?;
    Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table(
        guest_thread,
        self,
        callee.index,
    )?;

    let state = self.concurrent_state_mut();
    state.get_mut(guest_task)?.threads.insert(guest_thread);

    // Finally, make the new thread the current one.
    self.set_thread(QualifiedThreadId {
        task: guest_task,
        thread: guest_thread,
    })?;

    Ok(())
}
1507
1508 /// Pop a `GuestTask` previously pushed using `enter_sync_call`.
exit_guest_sync_call(&mut self) -> Result<()>1509 pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> {
1510 if !self.concurrency_support() {
1511 return Ok(self.exit_call_not_concurrent());
1512 }
1513 let thread = match self.set_thread(CurrentThread::None)?.guest() {
1514 Some(t) => *t,
1515 None => bail_bug!("expected task when exiting"),
1516 };
1517 let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance;
1518 log::trace!("exit sync call {instance:?}");
1519 Instance::from_wasmtime(self, instance.instance).cleanup_thread(
1520 self,
1521 thread,
1522 instance.index,
1523 )?;
1524
1525 let state = self.concurrent_state_mut();
1526 let task = state.get_mut(thread.task)?;
1527 let caller = match &task.caller {
1528 &Caller::Guest { thread } => thread.into(),
1529 &Caller::Host { caller, .. } => caller,
1530 };
1531 self.set_thread(caller)?;
1532
1533 let state = self.concurrent_state_mut();
1534 let task = state.get_mut(thread.task)?;
1535 if task.ready_to_delete() {
1536 state.delete(thread.task)?.dispose(state)?;
1537 }
1538
1539 Ok(())
1540 }
1541
1542 /// Similar to `enter_guest_sync_call` except for when the guest makes a
1543 /// transition to the host.
1544 ///
1545 /// FIXME: this is called for all guest->host transitions and performs some
1546 /// relatively expensive table manipulations. This would ideally be
1547 /// optimized to avoid the full allocation of a `HostTask` in at least some
1548 /// situations.
host_task_create(&mut self) -> Result<Option<TableId<HostTask>>>1549 pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> {
1550 if !self.concurrency_support() {
1551 self.enter_call_not_concurrent();
1552 return Ok(None);
1553 }
1554 let state = self.concurrent_state_mut();
1555 let caller = state.current_guest_thread()?;
1556 let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?;
1557 log::trace!("new host task {task:?}");
1558 self.set_thread(task)?;
1559 Ok(Some(task))
1560 }
1561
1562 /// Invoked before lowering the results of a host task to the guest.
1563 ///
1564 /// This is used to update the current thread annotations within the store
1565 /// to ensure that it reflects the guest task, not the host task, since
1566 /// lowering may execute guest code.
host_task_reenter_caller(&mut self) -> Result<()>1567 pub fn host_task_reenter_caller(&mut self) -> Result<()> {
1568 if !self.concurrency_support() {
1569 return Ok(());
1570 }
1571 let task = self.concurrent_state_mut().current_host_thread()?;
1572 let caller = self.concurrent_state_mut().get_mut(task)?.caller;
1573 self.set_thread(caller)?;
1574 Ok(())
1575 }
1576
1577 /// Dual of `host_task_create` and signifies that the host has finished and
1578 /// will be cleaned up.
1579 ///
1580 /// Note that this isn't invoked when the host is invoked asynchronously and
1581 /// the host isn't complete yet. In that situation the host task persists
1582 /// and will be cleaned up separately in `subtask_drop`
host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()>1583 pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> {
1584 match task {
1585 Some(task) => {
1586 log::trace!("delete host task {task:?}");
1587 self.concurrent_state_mut().delete(task)?;
1588 }
1589 None => {
1590 self.exit_call_not_concurrent();
1591 }
1592 }
1593 Ok(())
1594 }
1595
1596 /// Determine whether the specified instance may be entered from the host.
1597 ///
1598 /// We return `true` here only if all of the following hold:
1599 ///
1600 /// - The top-level instance is not already on the current task's call stack.
1601 /// - The instance is not in need of a post-return function call.
1602 /// - `self` has not been poisoned due to a trap.
may_enter(&mut self, instance: RuntimeInstance) -> Result<bool>1603 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> {
1604 if self.trapped() {
1605 return Ok(false);
1606 }
1607 if !self.concurrency_support() {
1608 return Ok(true);
1609 }
1610 let state = self.concurrent_state_mut();
1611 let mut cur = state.current_thread;
1612 loop {
1613 match cur {
1614 CurrentThread::None => break Ok(true),
1615 CurrentThread::Guest(thread) => {
1616 let task = state.get_mut(thread.task)?;
1617
1618 // Note that we only compare top-level instance IDs here.
1619 // The idea is that the host is not allowed to recursively
1620 // enter a top-level instance even if the specific leaf
1621 // instance is not on the stack. This the behavior defined
1622 // in the spec, and it allows us to elide runtime checks in
1623 // guest-to-guest adapters.
1624 if task.instance.instance == instance.instance {
1625 break Ok(false);
1626 }
1627 cur = match task.caller {
1628 Caller::Host { caller, .. } => caller,
1629 Caller::Guest { thread } => thread.into(),
1630 };
1631 }
1632 CurrentThread::Host(id) => {
1633 cur = state.get_mut(id)?.caller.into();
1634 }
1635 }
1636 }
1637 }
1638
/// Helper function to retrieve the `InstanceState` for the
/// specified instance.
///
/// `instance.instance` selects the component instance within this store and
/// `instance.index` selects the state within that component instance.
fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState {
    self.component_instance_mut(instance.instance)
        .instance_state(instance.index)
}
1645
set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread>1646 fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> {
1647 // Each time we switch threads, we conservatively set `task_may_block`
1648 // to `false` for the component instance we're switching away from (if
1649 // any), meaning it will be `false` for any new thread created for that
1650 // instance unless explicitly set otherwise.
1651 let state = self.concurrent_state_mut();
1652 let old_thread = mem::replace(&mut state.current_thread, thread.into());
1653 if let Some(old_thread) = old_thread.guest() {
1654 let instance = state.get_mut(old_thread.task)?.instance.instance;
1655 self.component_instance_mut(instance)
1656 .set_task_may_block(false)
1657 }
1658
1659 // If we're switching to a new thread, set its component instance's
1660 // `task_may_block` according to where it left off.
1661 if self.concurrent_state_mut().current_thread.guest().is_some() {
1662 self.set_task_may_block()?;
1663 }
1664
1665 Ok(old_thread)
1666 }
1667
1668 /// Set the global variable representing whether the current task may block
1669 /// prior to entering Wasm code.
set_task_may_block(&mut self) -> Result<()>1670 fn set_task_may_block(&mut self) -> Result<()> {
1671 let state = self.concurrent_state_mut();
1672 let guest_thread = state.current_guest_thread()?;
1673 let instance = state.get_mut(guest_thread.task)?.instance.instance;
1674 let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
1675 self.component_instance_mut(instance)
1676 .set_task_may_block(may_block);
1677 Ok(())
1678 }
1679
check_blocking(&mut self) -> Result<()>1680 pub(crate) fn check_blocking(&mut self) -> Result<()> {
1681 if !self.concurrency_support() {
1682 return Ok(());
1683 }
1684 let state = self.concurrent_state_mut();
1685 let task = state.current_guest_thread()?.task;
1686 let instance = state.get_mut(task)?.instance.instance;
1687 let task_may_block = self.component_instance(instance).get_task_may_block();
1688
1689 if task_may_block {
1690 Ok(())
1691 } else {
1692 Err(Trap::CannotBlockSyncTask.into())
1693 }
1694 }
1695
1696 /// Record that we're about to enter a (sub-)component instance which does
1697 /// not support more than one concurrent, stackful activation, meaning it
1698 /// cannot be entered again until the next call returns.
enter_instance(&mut self, instance: RuntimeInstance)1699 fn enter_instance(&mut self, instance: RuntimeInstance) {
1700 log::trace!("enter {instance:?}");
1701 self.instance_state(instance)
1702 .concurrent_state()
1703 .do_not_enter = true;
1704 }
1705
1706 /// Record that we've exited a (sub-)component instance previously entered
1707 /// with `Self::enter_instance` and then calls `Self::partition_pending`.
1708 /// See the documentation for the latter for details.
exit_instance(&mut self, instance: RuntimeInstance) -> Result<()>1709 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
1710 log::trace!("exit {instance:?}");
1711 self.instance_state(instance)
1712 .concurrent_state()
1713 .do_not_enter = false;
1714 self.partition_pending(instance)
1715 }
1716
1717 /// Iterate over `InstanceState::pending`, moving any ready items into the
1718 /// "high priority" work item queue.
1719 ///
1720 /// See `GuestCall::is_ready` for details.
partition_pending(&mut self, instance: RuntimeInstance) -> Result<()>1721 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
1722 for (thread, kind) in
1723 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter()
1724 {
1725 let call = GuestCall { thread, kind };
1726 if call.is_ready(self)? {
1727 self.concurrent_state_mut()
1728 .push_high_priority(WorkItem::GuestCall(instance.index, call));
1729 } else {
1730 self.instance_state(instance)
1731 .concurrent_state()
1732 .pending
1733 .insert(call.thread, call.kind);
1734 }
1735 }
1736
1737 Ok(())
1738 }
1739
1740 /// Implements the `backpressure.{inc,dec}` intrinsics.
backpressure_modify( &mut self, caller_instance: RuntimeInstance, modify: impl FnOnce(u16) -> Option<u16>, ) -> Result<()>1741 pub(crate) fn backpressure_modify(
1742 &mut self,
1743 caller_instance: RuntimeInstance,
1744 modify: impl FnOnce(u16) -> Option<u16>,
1745 ) -> Result<()> {
1746 let state = self.instance_state(caller_instance).concurrent_state();
1747 let old = state.backpressure;
1748 let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?;
1749 state.backpressure = new;
1750
1751 if old > 0 && new == 0 {
1752 // Backpressure was previously enabled and is now disabled; move any
1753 // newly-eligible guest calls to the "high priority" queue.
1754 self.partition_pending(caller_instance)?;
1755 }
1756
1757 Ok(())
1758 }
1759
1760 /// Resume the specified fiber, giving it exclusive access to the specified
1761 /// store.
resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()>1762 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
1763 let old_thread = self.concurrent_state_mut().current_thread;
1764 log::trace!("resume_fiber: save current thread {old_thread:?}");
1765
1766 let fiber = fiber::resolve_or_release(self, fiber).await?;
1767
1768 self.set_thread(old_thread)?;
1769
1770 let state = self.concurrent_state_mut();
1771
1772 if let Some(ot) = old_thread.guest() {
1773 state.get_mut(ot.thread)?.state = GuestThreadState::Running;
1774 }
1775 log::trace!("resume_fiber: restore current thread {old_thread:?}");
1776
1777 if let Some(mut fiber) = fiber {
1778 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
1779 // See the `SuspendReason` documentation for what each case means.
1780 let reason = match state.suspend_reason.take() {
1781 Some(r) => r,
1782 None => bail_bug!("suspend reason missing when resuming fiber"),
1783 };
1784 match reason {
1785 SuspendReason::NeedWork => {
1786 if state.worker.is_none() {
1787 state.worker = Some(fiber);
1788 } else {
1789 fiber.dispose(self);
1790 }
1791 }
1792 SuspendReason::Yielding { thread, .. } => {
1793 state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber);
1794 let instance = state.get_mut(thread.task)?.instance.index;
1795 state.push_low_priority(WorkItem::ResumeThread(instance, thread));
1796 }
1797 SuspendReason::ExplicitlySuspending { thread, .. } => {
1798 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
1799 }
1800 SuspendReason::Waiting { set, thread, .. } => {
1801 let old = state
1802 .get_mut(set)?
1803 .waiting
1804 .insert(thread, WaitMode::Fiber(fiber));
1805 assert!(old.is_none());
1806 }
1807 };
1808 } else {
1809 log::trace!("resume_fiber: fiber has exited");
1810 }
1811
1812 Ok(())
1813 }
1814
1815 /// Suspend the current fiber, storing the reason in
1816 /// `ConcurrentState::suspend_reason` to indicate the conditions under which
1817 /// it should be resumed.
1818 ///
1819 /// See the `SuspendReason` documentation for details.
suspend(&mut self, reason: SuspendReason) -> Result<()>1820 fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
1821 log::trace!("suspend fiber: {reason:?}");
1822
1823 // If we're yielding or waiting on behalf of a guest thread, we'll need to
1824 // pop the call context which manages resource borrows before suspending
1825 // and then push it again once we've resumed.
1826 let task = match &reason {
1827 SuspendReason::Yielding { thread, .. }
1828 | SuspendReason::Waiting { thread, .. }
1829 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
1830 SuspendReason::NeedWork => None,
1831 };
1832
1833 let old_guest_thread = if task.is_some() {
1834 self.concurrent_state_mut().current_thread
1835 } else {
1836 CurrentThread::None
1837 };
1838
1839 // We should not have reached here unless either there's no current
1840 // task, or the current task is permitted to block. In addition, we
1841 // special-case `thread.switch-to` and waiting for a subtask to go from
1842 // `starting` to `started`, both of which we consider non-blocking
1843 // operations despite requiring a suspend.
1844 debug_assert!(
1845 matches!(
1846 reason,
1847 SuspendReason::ExplicitlySuspending {
1848 skip_may_block_check: true,
1849 ..
1850 } | SuspendReason::Waiting {
1851 skip_may_block_check: true,
1852 ..
1853 } | SuspendReason::Yielding {
1854 skip_may_block_check: true,
1855 ..
1856 }
1857 ) || old_guest_thread
1858 .guest()
1859 .map(|thread| self.concurrent_state_mut().may_block(thread.task))
1860 .transpose()?
1861 .unwrap_or(true)
1862 );
1863
1864 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
1865 assert!(suspend_reason.is_none());
1866 *suspend_reason = Some(reason);
1867
1868 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;
1869
1870 if task.is_some() {
1871 self.set_thread(old_guest_thread)?;
1872 }
1873
1874 Ok(())
1875 }
1876
wait_for_event(&mut self, waitable: Waitable) -> Result<()>1877 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
1878 let state = self.concurrent_state_mut();
1879 let caller = state.current_guest_thread()?;
1880 let old_set = waitable.common(state)?.set;
1881 let set = state.get_mut(caller.thread)?.sync_call_set;
1882 waitable.join(state, Some(set))?;
1883 self.suspend(SuspendReason::Waiting {
1884 set,
1885 thread: caller,
1886 skip_may_block_check: false,
1887 })?;
1888 let state = self.concurrent_state_mut();
1889 waitable.join(state, old_set)
1890 }
1891 }
1892
1893 impl Instance {
1894 /// Get the next pending event for the specified task and (optional)
1895 /// waitable set, along with the waitable handle if applicable.
get_event( self, store: &mut StoreOpaque, guest_task: TableId<GuestTask>, set: Option<TableId<WaitableSet>>, cancellable: bool, ) -> Result<Option<(Event, Option<(Waitable, u32)>)>>1896 fn get_event(
1897 self,
1898 store: &mut StoreOpaque,
1899 guest_task: TableId<GuestTask>,
1900 set: Option<TableId<WaitableSet>>,
1901 cancellable: bool,
1902 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
1903 let state = store.concurrent_state_mut();
1904
1905 let event = &mut state.get_mut(guest_task)?.event;
1906 if let Some(ev) = event
1907 && (cancellable || !matches!(ev, Event::Cancelled))
1908 {
1909 log::trace!("deliver event {ev:?} to {guest_task:?}");
1910 let ev = *ev;
1911 *event = None;
1912 return Ok(Some((ev, None)));
1913 }
1914
1915 let set = match set {
1916 Some(set) => set,
1917 None => return Ok(None),
1918 };
1919 let waitable = match state.get_mut(set)?.ready.pop_first() {
1920 Some(v) => v,
1921 None => return Ok(None),
1922 };
1923
1924 let common = waitable.common(state)?;
1925 let handle = match common.handle {
1926 Some(h) => h,
1927 None => bail_bug!("handle not set when delivering event"),
1928 };
1929 let event = match common.event.take() {
1930 Some(e) => e,
1931 None => bail_bug!("event not set when delivering event"),
1932 };
1933
1934 log::trace!(
1935 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
1936 );
1937
1938 waitable.on_delivery(store, self, event)?;
1939
1940 Ok(Some((event, Some((waitable, handle)))))
1941 }
1942
1943 /// Handle the `CallbackCode` returned from an async-lifted export or its
1944 /// callback.
1945 ///
1946 /// If this returns `Ok(Some(call))`, then `call` should be run immediately
1947 /// using `handle_guest_call`.
handle_callback_code( self, store: &mut StoreOpaque, guest_thread: QualifiedThreadId, runtime_instance: RuntimeComponentInstanceIndex, code: u32, ) -> Result<Option<GuestCall>>1948 fn handle_callback_code(
1949 self,
1950 store: &mut StoreOpaque,
1951 guest_thread: QualifiedThreadId,
1952 runtime_instance: RuntimeComponentInstanceIndex,
1953 code: u32,
1954 ) -> Result<Option<GuestCall>> {
1955 let (code, set) = unpack_callback_code(code);
1956
1957 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");
1958
1959 let state = store.concurrent_state_mut();
1960
1961 let get_set = |store: &mut StoreOpaque, handle| -> Result<_> {
1962 let set = store
1963 .instance_state(RuntimeInstance {
1964 instance: self.id().instance(),
1965 index: runtime_instance,
1966 })
1967 .handle_table()
1968 .waitable_set_rep(handle)?;
1969
1970 Ok(TableId::<WaitableSet>::new(set))
1971 };
1972
1973 Ok(match code {
1974 callback_code::EXIT => {
1975 log::trace!("implicit thread {guest_thread:?} completed");
1976 self.cleanup_thread(store, guest_thread, runtime_instance)?;
1977 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
1978 if task.threads.is_empty() && !task.returned_or_cancelled() {
1979 bail!(Trap::NoAsyncResult);
1980 }
1981 if let Caller::Guest { .. } = task.caller {
1982 task.exited = true;
1983 task.callback = None;
1984 }
1985 if task.ready_to_delete() {
1986 Waitable::Guest(guest_thread.task).delete_from(store.concurrent_state_mut())?;
1987 }
1988 None
1989 }
1990 callback_code::YIELD => {
1991 let task = state.get_mut(guest_thread.task)?;
1992 // If an `Event::Cancelled` is pending, we'll deliver that;
1993 // otherwise, we'll deliver `Event::None`. Note that
1994 // `GuestTask::event` is only ever set to one of those two
1995 // `Event` variants.
1996 if let Some(event) = task.event {
1997 assert!(matches!(event, Event::None | Event::Cancelled));
1998 } else {
1999 task.event = Some(Event::None);
2000 }
2001 let call = GuestCall {
2002 thread: guest_thread,
2003 kind: GuestCallKind::DeliverEvent {
2004 instance: self,
2005 set: None,
2006 },
2007 };
2008 if state.may_block(guest_thread.task)? {
2009 // Push this thread onto the "low priority" queue so it runs
2010 // after any other threads have had a chance to run.
2011 state.push_low_priority(WorkItem::GuestCall(runtime_instance, call));
2012 None
2013 } else {
2014 // Yielding in a non-blocking context is defined as a no-op
2015 // according to the spec, so we must run this thread
2016 // immediately without allowing any others to run.
2017 Some(call)
2018 }
2019 }
2020 callback_code::WAIT => {
2021 // The task may only return `WAIT` if it was created for a call
2022 // to an async export). Otherwise, we'll trap.
2023 state.check_blocking_for(guest_thread.task)?;
2024
2025 let set = get_set(store, set)?;
2026 let state = store.concurrent_state_mut();
2027
2028 if state.get_mut(guest_thread.task)?.event.is_some()
2029 || !state.get_mut(set)?.ready.is_empty()
2030 {
2031 // An event is immediately available; deliver it ASAP.
2032 state.push_high_priority(WorkItem::GuestCall(
2033 runtime_instance,
2034 GuestCall {
2035 thread: guest_thread,
2036 kind: GuestCallKind::DeliverEvent {
2037 instance: self,
2038 set: Some(set),
2039 },
2040 },
2041 ));
2042 } else {
2043 // No event is immediately available.
2044 //
2045 // We're waiting, so register to be woken up when an event
2046 // is published for this waitable set.
2047 //
2048 // Here we also set `GuestTask::wake_on_cancel` which allows
2049 // `subtask.cancel` to interrupt the wait.
2050 let old = state
2051 .get_mut(guest_thread.thread)?
2052 .wake_on_cancel
2053 .replace(set);
2054 if !old.is_none() {
2055 bail_bug!("thread unexpectedly had wake_on_cancel set");
2056 }
2057 let old = state
2058 .get_mut(set)?
2059 .waiting
2060 .insert(guest_thread, WaitMode::Callback(self));
2061 if !old.is_none() {
2062 bail_bug!("set's waiting set already had this thread registered");
2063 }
2064 }
2065 None
2066 }
2067 _ => bail!(Trap::UnsupportedCallbackCode),
2068 })
2069 }
2070
cleanup_thread( self, store: &mut StoreOpaque, guest_thread: QualifiedThreadId, runtime_instance: RuntimeComponentInstanceIndex, ) -> Result<()>2071 fn cleanup_thread(
2072 self,
2073 store: &mut StoreOpaque,
2074 guest_thread: QualifiedThreadId,
2075 runtime_instance: RuntimeComponentInstanceIndex,
2076 ) -> Result<()> {
2077 let state = store.concurrent_state_mut();
2078 let thread_data = state.get_mut(guest_thread.thread)?;
2079 let guest_id = match thread_data.instance_rep {
2080 Some(id) => id,
2081 None => bail_bug!("thread must have instance_rep set by now"),
2082 };
2083 let sync_call_set = thread_data.sync_call_set;
2084
2085 // Clean up any pending subtasks in the sync_call_set
2086 for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
2087 if let Some(Event::Subtask {
2088 status: Status::Returned | Status::ReturnCancelled,
2089 }) = waitable.common(state)?.event
2090 {
2091 waitable.delete_from(state)?;
2092 }
2093 }
2094
2095 store
2096 .instance_state(RuntimeInstance {
2097 instance: self.id().instance(),
2098 index: runtime_instance,
2099 })
2100 .thread_handle_table()
2101 .guest_thread_remove(guest_id)?;
2102
2103 store.concurrent_state_mut().delete(guest_thread.thread)?;
2104 store.concurrent_state_mut().delete(sync_call_set)?;
2105 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
2106 task.threads.remove(&guest_thread.thread);
2107 Ok(())
2108 }
2109
/// Add the specified guest call to the "high priority" work item queue, to
/// be started as soon as backpressure and/or reentrance rules allow.
///
/// The work is packaged as a `GuestCallKind::StartImplicit` closure for
/// `guest_thread`. Two closure shapes exist, selected by `callback`:
///
/// - `callback` present (requires `async_`): the callee is invoked once
///   with the instance marked as entered, and the `i32` it returns is
///   interpreted by `handle_callback_code`.
/// - `callback` absent: the callee is invoked as a sync-lifted export
///   (`async_ == false`) or as a callback-less async-lifted export
///   (`async_ == true`). Only in the sync case is the result lifted here,
///   `post_return` invoked, and the task completed.
///
/// `param_count` and `result_count` are the numbers of flat core Wasm
/// parameters and results of `callee`.
///
/// SAFETY: The raw pointer arguments must be valid references to guest
/// functions (with the appropriate signatures) when the closures queued by
/// this function are called.
unsafe fn queue_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    guest_thread: QualifiedThreadId,
    callee: SendSyncPtr<VMFuncRef>,
    param_count: usize,
    result_count: usize,
    async_: bool,
    callback: Option<SendSyncPtr<VMFuncRef>>,
    post_return: Option<SendSyncPtr<VMFuncRef>>,
) -> Result<()> {
    /// Return a closure which will call the specified function in the scope
    /// of the specified task.
    ///
    /// This will use `GuestTask::lower_params` to lower the parameters, but
    /// will not lift the result; instead, it returns a
    /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if
    /// any, may be lifted. Note that an async-lifted export will have
    /// returned its result using the `task.return` intrinsic (or not
    /// returned a result at all, in the case of `task.cancel`), in which
    /// case the "result" of this call will either be a callback code or
    /// nothing.
    ///
    /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
    /// the returned closure is called.
    unsafe fn make_call<T: 'static>(
        store: StoreContextMut<T>,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr<VMFuncRef>,
        param_count: usize,
        result_count: usize,
    ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]>
    + Send
    + Sync
    + 'static
    + use<T> {
        let token = StoreToken::new(store);
        move |store: &mut dyn VMStore| {
            // Scratch space shared by the lowered parameters and, after the
            // call, the raw results.
            let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];

            store
                .concurrent_state_mut()
                .get_mut(guest_thread.thread)?
                .state = GuestThreadState::Running;
            let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
            // `lower_params` is taken out of the task, so it can be used
            // at most once.
            let lower = match task.lower_params.take() {
                Some(l) => l,
                None => bail_bug!("lower_params missing"),
            };

            lower(store, &mut storage[..param_count])?;

            let mut store = token.as_context_mut(store);

            // SAFETY: Per the contract documented in `make_call's`
            // documentation, `callee` must be a valid pointer.
            unsafe {
                crate::Func::call_unchecked_raw(
                    &mut store,
                    callee.as_non_null(),
                    NonNull::new(
                        &mut storage[..param_count.max(result_count)]
                            as *mut [MaybeUninit<ValRaw>] as _,
                    )
                    .unwrap(),
                )?;
            }

            Ok(storage)
        }
    }

    // SAFETY: Per the contract described in this function documentation,
    // the `callee` pointer which `call` closes over must be valid when
    // called by the closure we queue below.
    let call = unsafe {
        make_call(
            store.as_context_mut(),
            guest_thread,
            callee,
            param_count,
            result_count,
        )
    };

    let callee_instance = store
        .0
        .concurrent_state_mut()
        .get_mut(guest_thread.task)?
        .instance;

    let fun = if callback.is_some() {
        // Callback-based ("stackless") async-lifted export.
        assert!(async_);

        Box::new(move |store: &mut dyn VMStore| {
            self.add_guest_thread_to_instance_table(
                guest_thread.thread,
                store,
                callee_instance.index,
            )?;
            let old_thread = store.set_thread(guest_thread)?;
            log::trace!(
                "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
            );

            store.enter_instance(callee_instance);

            // SAFETY: See the documentation for `make_call` to review the
            // contract we must uphold for `call` here.
            //
            // Per the contract described in the `queue_call`
            // documentation, the `callee` pointer which `call` closes
            // over must be valid.
            let storage = call(store)?;

            store.exit_instance(callee_instance)?;

            // Reinstate the thread which was current before the call and
            // mark it as running again.
            store.set_thread(old_thread)?;
            let state = store.concurrent_state_mut();
            if let Some(t) = old_thread.guest() {
                state.get_mut(t.thread)?.state = GuestThreadState::Running;
            }
            log::trace!("stackless call: restored {old_thread:?} as current thread");

            // SAFETY: `wasmparser` will have validated that the callback
            // function returns a `i32` result.
            let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

            self.handle_callback_code(store, guest_thread, callee_instance.index, code)
        })
            as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>
    } else {
        // Sync-lifted export, or callback-less (stackful) async-lifted
        // export.
        let token = StoreToken::new(store.as_context_mut());
        Box::new(move |store: &mut dyn VMStore| {
            self.add_guest_thread_to_instance_table(
                guest_thread.thread,
                store,
                callee_instance.index,
            )?;
            let old_thread = store.set_thread(guest_thread)?;
            log::trace!(
                "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
            );
            let flags = self.id().get(store).instance_flags(callee_instance.index);

            // Unless this is a callback-less (i.e. stackful)
            // async-lifted export, we need to record that the instance
            // cannot be entered until the call returns.
            if !async_ {
                store.enter_instance(callee_instance);
            }

            // SAFETY: See the documentation for `make_call` to review the
            // contract we must uphold for `call` here.
            //
            // Per the contract described in the `queue_call`
            // documentation, the `callee` pointer which `call` closes
            // over must be valid.
            let storage = call(store)?;

            if async_ {
                // `cleanup_thread` has not run yet at this point, so a
                // length of one means this is the task's only remaining
                // thread; the task must have returned or been cancelled.
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                if task.threads.len() == 1 && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
            } else {
                // This is a sync-lifted export, so now is when we lift the
                // result, optionally call the post-return function, if any,
                // and finally notify any current or future waiters that the
                // subtask has returned.

                let lift = {
                    store.exit_instance(callee_instance)?;

                    let state = store.concurrent_state_mut();
                    // NOTE(review): this bails when `result` is already
                    // `Some`, while the message reads as if a result were
                    // missing -- confirm the intended wording.
                    if !state.get_mut(guest_thread.task)?.result.is_none() {
                        bail_bug!("task has not yet produced a result");
                    }

                    match state.get_mut(guest_thread.task)?.lift_result.take() {
                        Some(lift) => lift,
                        None => bail_bug!("lift_result field is missing"),
                    }
                };

                // SAFETY: `result_count` represents the number of core Wasm
                // results returned, per `wasmparser`.
                let result = (lift.lift)(store, unsafe {
                    mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>(
                        &storage[..result_count],
                    )
                })?;

                // The post-return function receives the single core result,
                // or a zero placeholder when there is none.
                let post_return_arg = match result_count {
                    0 => ValRaw::i32(0),
                    // SAFETY: `result_count` represents the number of
                    // core Wasm results returned, per `wasmparser`.
                    1 => unsafe { storage[0].assume_init() },
                    _ => unreachable!(),
                };

                // SAFETY: Per the contract described in the `queue_call`
                // documentation, `post_return` (if any) must be a valid
                // guest function reference when this closure runs.
                unsafe {
                    call_post_return(
                        token.as_context_mut(store),
                        post_return.map(|v| v.as_non_null()),
                        post_return_arg,
                        flags,
                    )?;
                }

                self.task_complete(store, guest_thread.task, result, Status::Returned)?;
            }

            // This is a callback-less call, so the implicit thread has now completed
            self.cleanup_thread(store, guest_thread, callee_instance.index)?;

            store.set_thread(old_thread)?;

            let state = store.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;

            // Host-called tasks are deleted here once nothing else needs
            // them; guest-called tasks are only flagged as exited.
            match &task.caller {
                Caller::Host { .. } => {
                    if task.ready_to_delete() {
                        Waitable::Guest(guest_thread.task).delete_from(state)?;
                    }
                }
                Caller::Guest { .. } => {
                    task.exited = true;
                }
            }

            Ok(None)
        })
    };

    // Queue the closure; it runs once the event loop picks the work item up.
    store
        .0
        .concurrent_state_mut()
        .push_high_priority(WorkItem::GuestCall(
            callee_instance.index,
            GuestCall {
                thread: guest_thread,
                kind: GuestCallKind::StartImplicit(fun),
            },
        ));

    Ok(())
}
2365
2366 /// Prepare (but do not start) a guest->guest call.
2367 ///
2368 /// This is called from fused adapter code generated in
2369 /// `wasmtime_environ::fact::trampoline::Compiler`. `start` and `return_`
2370 /// are synthesized Wasm functions which move the parameters from the caller
2371 /// to the callee and the result from the callee to the caller,
2372 /// respectively. The adapter will call `Self::start_call` immediately
2373 /// after calling this function.
2374 ///
2375 /// SAFETY: All the pointer arguments must be valid pointers to guest
2376 /// entities (and with the expected signatures for the function references
2377 /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
prepare_call<T: 'static>( self, mut store: StoreContextMut<T>, start: NonNull<VMFuncRef>, return_: NonNull<VMFuncRef>, caller_instance: RuntimeComponentInstanceIndex, callee_instance: RuntimeComponentInstanceIndex, task_return_type: TypeTupleIndex, callee_async: bool, memory: *mut VMMemoryDefinition, string_encoding: StringEncoding, caller_info: CallerInfo, ) -> Result<()>2378 unsafe fn prepare_call<T: 'static>(
2379 self,
2380 mut store: StoreContextMut<T>,
2381 start: NonNull<VMFuncRef>,
2382 return_: NonNull<VMFuncRef>,
2383 caller_instance: RuntimeComponentInstanceIndex,
2384 callee_instance: RuntimeComponentInstanceIndex,
2385 task_return_type: TypeTupleIndex,
2386 callee_async: bool,
2387 memory: *mut VMMemoryDefinition,
2388 string_encoding: StringEncoding,
2389 caller_info: CallerInfo,
2390 ) -> Result<()> {
2391 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
2392 // A task may only call an async-typed function via a sync lower if
2393 // it was created by a call to an async export. Otherwise, we'll
2394 // trap.
2395 store.0.check_blocking()?;
2396 }
2397
2398 enum ResultInfo {
2399 Heap { results: u32 },
2400 Stack { result_count: u32 },
2401 }
2402
2403 let result_info = match &caller_info {
2404 CallerInfo::Async {
2405 has_result: true,
2406 params,
2407 } => ResultInfo::Heap {
2408 results: match params.last() {
2409 Some(r) => r.get_u32(),
2410 None => bail_bug!("retptr missing"),
2411 },
2412 },
2413 CallerInfo::Async {
2414 has_result: false, ..
2415 } => ResultInfo::Stack { result_count: 0 },
2416 CallerInfo::Sync {
2417 result_count,
2418 params,
2419 } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
2420 results: match params.last() {
2421 Some(r) => r.get_u32(),
2422 None => bail_bug!("arg ptr missing"),
2423 },
2424 },
2425 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
2426 result_count: *result_count,
2427 },
2428 };
2429
2430 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });
2431
2432 // Create a new guest task for the call, closing over the `start` and
2433 // `return_` functions to lift the parameters and lower the result,
2434 // respectively.
2435 let start = SendSyncPtr::new(start);
2436 let return_ = SendSyncPtr::new(return_);
2437 let token = StoreToken::new(store.as_context_mut());
2438 let state = store.0.concurrent_state_mut();
2439 let old_thread = state.current_guest_thread()?;
2440
2441 debug_assert_eq!(
2442 state.get_mut(old_thread.task)?.instance,
2443 RuntimeInstance {
2444 instance: self.id().instance(),
2445 index: caller_instance,
2446 }
2447 );
2448
2449 let new_task = GuestTask::new(
2450 Box::new(move |store, dst| {
2451 let mut store = token.as_context_mut(store);
2452 assert!(dst.len() <= MAX_FLAT_PARAMS);
2453 // The `+ 1` here accounts for the return pointer, if any:
2454 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
2455 let count = match caller_info {
2456 // Async callers, if they have a result, use the last
2457 // parameter as a return pointer so chop that off if
2458 // relevant here.
2459 CallerInfo::Async { params, has_result } => {
2460 let params = ¶ms[..params.len() - usize::from(has_result)];
2461 for (param, src) in params.iter().zip(&mut src) {
2462 src.write(*param);
2463 }
2464 params.len()
2465 }
2466
2467 // Sync callers forward everything directly.
2468 CallerInfo::Sync { params, .. } => {
2469 for (param, src) in params.iter().zip(&mut src) {
2470 src.write(*param);
2471 }
2472 params.len()
2473 }
2474 };
2475 // SAFETY: `start` is a valid `*mut VMFuncRef` from
2476 // `wasmtime-cranelift`-generated fused adapter code. Based on
2477 // how it was constructed (see
2478 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
2479 // for details) we know it takes count parameters and returns
2480 // `dst.len()` results.
2481 unsafe {
2482 crate::Func::call_unchecked_raw(
2483 &mut store,
2484 start.as_non_null(),
2485 NonNull::new(
2486 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _,
2487 )
2488 .unwrap(),
2489 )?;
2490 }
2491 dst.copy_from_slice(&src[..dst.len()]);
2492 let state = store.0.concurrent_state_mut();
2493 Waitable::Guest(state.current_guest_thread()?.task).set_event(
2494 state,
2495 Some(Event::Subtask {
2496 status: Status::Started,
2497 }),
2498 )?;
2499 Ok(())
2500 }),
2501 LiftResult {
2502 lift: Box::new(move |store, src| {
2503 // SAFETY: See comment in closure passed as `lower_params`
2504 // parameter above.
2505 let mut store = token.as_context_mut(store);
2506 let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
2507 if let ResultInfo::Heap { results } = &result_info {
2508 my_src.push(ValRaw::u32(*results));
2509 }
2510
2511 // Execute the `return_` hook, generated by Wasmtime's FACT
2512 // compiler, in the context of the old thread. The old
2513 // thread, this thread's caller, may have `realloc`
2514 // callbacks invoked for example and those need the correct
2515 // context set for the current thread.
2516 let prev = store.0.set_thread(old_thread)?;
2517
2518 // SAFETY: `return_` is a valid `*mut VMFuncRef` from
2519 // `wasmtime-cranelift`-generated fused adapter code. Based
2520 // on how it was constructed (see
2521 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
2522 // for details) we know it takes `src.len()` parameters and
2523 // returns up to 1 result.
2524 unsafe {
2525 crate::Func::call_unchecked_raw(
2526 &mut store,
2527 return_.as_non_null(),
2528 my_src.as_mut_slice().into(),
2529 )?;
2530 }
2531
2532 // Restore the previous current thread after the
2533 // lifting/lowering has returned.
2534 store.0.set_thread(prev)?;
2535
2536 let state = store.0.concurrent_state_mut();
2537 let thread = state.current_guest_thread()?;
2538 if sync_caller {
2539 state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
2540 if let ResultInfo::Stack { result_count } = &result_info {
2541 match result_count {
2542 0 => None,
2543 1 => Some(my_src[0]),
2544 _ => unreachable!(),
2545 }
2546 } else {
2547 None
2548 },
2549 );
2550 }
2551 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>)
2552 }),
2553 ty: task_return_type,
2554 memory: NonNull::new(memory).map(SendSyncPtr::new),
2555 string_encoding,
2556 },
2557 Caller::Guest { thread: old_thread },
2558 None,
2559 RuntimeInstance {
2560 instance: self.id().instance(),
2561 index: callee_instance,
2562 },
2563 callee_async,
2564 )?;
2565
2566 let guest_task = state.push(new_task)?;
2567 let new_thread = GuestThread::new_implicit(state, guest_task)?;
2568 let guest_thread = state.push(new_thread)?;
2569 state.get_mut(guest_task)?.threads.insert(guest_thread);
2570
2571 // Make the new thread the current one so that `Self::start_call` knows
2572 // which one to start.
2573 store.0.set_thread(QualifiedThreadId {
2574 task: guest_task,
2575 thread: guest_thread,
2576 })?;
2577 log::trace!(
2578 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
2579 );
2580
2581 Ok(())
2582 }
2583
2584 /// Call the specified callback function for an async-lifted export.
2585 ///
2586 /// SAFETY: `function` must be a valid reference to a guest function of the
2587 /// correct signature for a callback.
call_callback<T>( self, mut store: StoreContextMut<T>, function: SendSyncPtr<VMFuncRef>, event: Event, handle: u32, ) -> Result<u32>2588 unsafe fn call_callback<T>(
2589 self,
2590 mut store: StoreContextMut<T>,
2591 function: SendSyncPtr<VMFuncRef>,
2592 event: Event,
2593 handle: u32,
2594 ) -> Result<u32> {
2595 let (ordinal, result) = event.parts();
2596 let params = &mut [
2597 ValRaw::u32(ordinal),
2598 ValRaw::u32(handle),
2599 ValRaw::u32(result),
2600 ];
2601 // SAFETY: `func` is a valid `*mut VMFuncRef` from either
2602 // `wasmtime-cranelift`-generated fused adapter code or
2603 // `component::Options`. Per `wasmparser` callback signature
2604 // validation, we know it takes three parameters and returns one.
2605 unsafe {
2606 crate::Func::call_unchecked_raw(
2607 &mut store,
2608 function.as_non_null(),
2609 params.as_mut_slice().into(),
2610 )?;
2611 }
2612 Ok(params[0].get_u32())
2613 }
2614
/// Start a guest->guest call previously prepared using
/// `Self::prepare_call`.
///
/// This is called from fused adapter code generated in
/// `wasmtime_environ::fact::trampoline::Compiler`. The adapter will call
/// this function immediately after calling `Self::prepare_call`.
///
/// Parameters, as used by this function:
///
/// - `callback`: if non-null, wrapped in a closure that calls
///   `Self::call_callback` and stored on the current guest task.
/// - `post_return`: forwarded (if non-null) to `Self::queue_call`.
/// - `callee`: the function to call; forwarded to `Self::queue_call`.
/// - `param_count` / `result_count`: flat parameter and result counts; this
///   function panics if they exceed `MAX_FLAT_PARAMS` / `MAX_FLAT_RESULTS`.
/// - `flags`: only the `START_FLAG_ASYNC_CALLEE` bit is inspected here.
/// - `storage`: `None` is treated as a call via an async-lowered import;
///   `Some` as a call via a sync-lowered import, in which case `storage[0]`
///   receives the callee's stashed result value, if there is one.
///
/// Returns `status.pack(waitable)`, where `waitable` is the subtask handle
/// allocated for an async caller whose callee has not yet returned, and
/// `None` otherwise.
///
/// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
/// functions with the appropriate signatures for the current guest task.
/// If this is a call to an async-lowered import, the actual call may be
/// deferred and run after this function returns, in which case the pointer
/// arguments must also be valid when the call happens.
unsafe fn start_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    callback: *mut VMFuncRef,
    post_return: *mut VMFuncRef,
    callee: NonNull<VMFuncRef>,
    param_count: u32,
    result_count: u32,
    flags: u32,
    storage: Option<&mut [MaybeUninit<ValRaw>]>,
) -> Result<u32> {
    let token = StoreToken::new(store.as_context_mut());
    // No result storage means the caller used an async-lowered import.
    let async_caller = storage.is_none();
    let state = store.0.concurrent_state_mut();
    // The current thread is the callee's thread (the adapter calls this
    // right after `Self::prepare_call`).
    let guest_thread = state.current_guest_thread()?;
    let callee_async = state.get_mut(guest_thread.task)?.async_function;
    let callee = SendSyncPtr::new(callee);
    let param_count = usize::try_from(param_count)?;
    assert!(param_count <= MAX_FLAT_PARAMS);
    let result_count = usize::try_from(result_count)?;
    assert!(result_count <= MAX_FLAT_RESULTS);

    let task = state.get_mut(guest_thread.task)?;
    if let Some(callback) = NonNull::new(callback) {
        // We're calling an async-lifted export with a callback, so store
        // the callback and related context as part of the task so we can
        // call it later when needed.
        let callback = SendSyncPtr::new(callback);
        task.callback = Some(Box::new(move |store, event, handle| {
            let store = token.as_context_mut(store);
            unsafe { self.call_callback::<T>(store, callback, event, handle) }
        }));
    }

    let Caller::Guest { thread: caller } = &task.caller else {
        // As of this writing, `start_call` is only used for guest->guest
        // calls.
        bail_bug!("start_call unexpectedly invoked for host->guest call");
    };
    // Copy the caller's id out so the borrow of `task` ends here.
    let caller = *caller;
    let caller_instance = state.get_mut(caller.task)?.instance;

    // Queue the call as a "high priority" work item.
    unsafe {
        self.queue_call(
            store.as_context_mut(),
            guest_thread,
            callee,
            param_count,
            result_count,
            (flags & START_FLAG_ASYNC_CALLEE) != 0,
            NonNull::new(callback).map(SendSyncPtr::new),
            NonNull::new(post_return).map(SendSyncPtr::new),
        )?;
    }

    let state = store.0.concurrent_state_mut();

    // Use the caller's `GuestThread::sync_call_set` to register interest in
    // the subtask...
    let guest_waitable = Waitable::Guest(guest_thread.task);
    // Remember the set the subtask was in so it can be restored after the
    // wait below.
    let old_set = guest_waitable.common(state)?.set;
    let set = state.get_mut(caller.thread)?.sync_call_set;
    guest_waitable.join(state, Some(set))?;

    // ... and suspend this fiber temporarily while we wait for it to start.
    //
    // Note that we _could_ call the callee directly using the current fiber
    // rather than suspend this one, but that would make reasoning about the
    // event loop more complicated and is probably only worth doing if
    // there's a measurable performance benefit. In addition, it would mean
    // blocking the caller if the callee calls a blocking sync-lowered
    // import, and as of this writing the spec says we must not do that.
    //
    // Alternatively, the fused adapter code could be modified to call the
    // callee directly without calling a host-provided intrinsic at all (in
    // which case it would need to do its own, inline backpressure checks,
    // etc.). Again, we'd want to see a measurable performance benefit
    // before committing to such an optimization. And again, we'd need to
    // update the spec to allow that.
    let (status, waitable) = loop {
        store.0.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
            // Normally, `StoreOpaque::suspend` would assert it's being
            // called from a context where blocking is allowed. However, if
            // `async_caller` is `true`, we'll only "block" long enough for
            // the callee to start, i.e. we won't repeat this loop, so we
            // tell `suspend` it's okay even if we're not allowed to block.
            // Alternatively, if the callee is not an async function, then
            // we know it won't block anyway.
            skip_may_block_check: async_caller || !callee_async,
        })?;

        let state = store.0.concurrent_state_mut();

        log::trace!("taking event for {:?}", guest_thread.task);
        let event = guest_waitable.take_event(state)?;
        // Anything other than a subtask event here is treated as an
        // internal bug.
        let Some(Event::Subtask { status }) = event else {
            bail_bug!("subtasks should only get subtask events, got {event:?}")
        };

        log::trace!("status {status:?} for {:?}", guest_thread.task);

        if status == Status::Returned {
            // It returned, so we can stop waiting.
            break (status, None);
        } else if async_caller {
            // It hasn't returned yet, but the caller is calling via an
            // async-lowered import, so we generate a handle for the task
            // waitable and return the status.
            let handle = store
                .0
                .instance_state(caller_instance)
                .handle_table()
                .subtask_insert_guest(guest_thread.task.rep())?;
            // Record the handle on the task's waitable state as well.
            store
                .0
                .concurrent_state_mut()
                .get_mut(guest_thread.task)?
                .common
                .handle = Some(handle);
            break (status, Some(handle));
        } else {
            // The callee hasn't returned yet, and the caller is calling via
            // a sync-lowered import, so we loop and keep waiting until the
            // callee returns.
        }
    };

    // Put the subtask back into whichever set (if any) it belonged to
    // before we temporarily joined it to the caller's `sync_call_set`.
    guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;

    // Reset the current thread to point to the caller as it resumes control.
    store.0.set_thread(caller)?;
    store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
    log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

    if let Some(storage) = storage {
        // The caller used a sync-lowered import to call an async-lifted
        // export, in which case the result, if any, has been stashed in
        // `GuestTask::sync_result`.
        let state = store.0.concurrent_state_mut();
        let task = state.get_mut(guest_thread.task)?;
        if let Some(result) = task.sync_result.take()? {
            // The inner `Option` is `None` when there is no value to copy
            // into the caller's result slot.
            if let Some(result) = result {
                storage[0] = MaybeUninit::new(result);
            }

            // The result has been consumed; delete the callee task if it
            // has exited and reports itself ready to delete.
            if task.exited && task.ready_to_delete() {
                Waitable::Guest(guest_thread.task).delete_from(state)?;
            }
        }
    }

    Ok(status.pack(waitable))
}
2783
/// Poll the specified future once on behalf of a guest->host call using an
/// async-lowered import.
///
/// If it returns `Ready`, return `Ok(None)`. Otherwise, if it returns
/// `Pending`, add it to the set of futures to be polled as part of this
/// instance's event loop until it completes, and then return
/// `Ok(Some(handle))` where `handle` is the waitable handle to return.
///
/// Whether the future returns `Ready` immediately or later, the `lower`
/// function will be used to lower the result, if any, into the guest caller's
/// stack and linear memory. The `lower` function is invoked with `None` if
/// the future is cancelled.
///
/// This function panics if the current host task is not in the
/// `HostTaskState::CalleeStarted` state when called.
pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
    self,
    mut store: StoreContextMut<'_, T>,
    future: impl Future<Output = Result<R>> + Send + 'static,
    lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static,
) -> Result<Option<u32>> {
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();
    let task = state.current_host_thread()?;

    // Create an abortable future which hooks calls to poll and manages call
    // context state for the future.
    let (join_handle, future) = JoinHandle::run(future);
    {
        // Stash the join handle in the task's state, moving it from
        // `CalleeStarted` to `CalleeRunning`.
        let state = &mut state.get_mut(task)?.state;
        assert!(matches!(state, HostTaskState::CalleeStarted));
        *state = HostTaskState::CalleeRunning(join_handle);
    }

    let mut future = Box::pin(future);

    // Finally, poll the future. We can use a dummy `Waker` here because
    // we'll add the future to `ConcurrentState::futures` and poll it
    // automatically from the event loop if it doesn't complete immediately
    // here.
    let poll = tls::set(store.0, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        // It finished immediately; lower the result now and return `None`
        // since no waitable handle is needed.
        Poll::Ready(result) => {
            // `result` is an `Option<Result<R>>`; propagate the error, if
            // any, and hand the remaining `Option<R>` to `lower`.
            let result = result.transpose()?;
            lower(store.as_context_mut(), result)?;
            return Ok(None);
        }

        // Future isn't ready yet, so fall through.
        Poll::Pending => {}
    }

    // It hasn't finished yet; add the future to
    // `ConcurrentState::futures` so it will be polled by the event
    // loop and allocate a waitable handle to return to the guest.

    // Wrap the future in a closure responsible for lowering the result into
    // the guest's stack and memory, as well as notifying any waiters that
    // the task returned.
    let future = Box::pin(async move {
        // `None` is the cancelled case, which is reported below as
        // `Status::ReturnCancelled`.
        let result = match future.await {
            Some(result) => Some(result?),
            None => None,
        };
        let on_complete = move |store: &mut dyn VMStore| {
            // Restore the `current_thread` to be the host so `lower` knows
            // how to manipulate borrows and knows which scope of borrows
            // to check.
            let mut store = token.as_context_mut(store);
            let old = store.0.set_thread(task)?;

            // Compute the status before `result` is moved into `lower`.
            let status = if result.is_some() {
                Status::Returned
            } else {
                Status::ReturnCancelled
            };

            lower(store.as_context_mut(), result)?;
            let state = store.0.concurrent_state_mut();
            match &mut state.get_mut(task)?.state {
                // The task is already flagged as finished because it was
                // cancelled. No need to transition further.
                HostTaskState::CalleeDone { .. } => {}

                // Otherwise transition this task to the done state.
                other => *other = HostTaskState::CalleeDone { cancelled: false },
            }
            // Deliver the terminal subtask event on the host task's
            // waitable.
            Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;

            // Put back whichever thread was current before this closure
            // ran.
            store.0.set_thread(old)?;
            Ok(())
        };

        // Here we schedule a task to run on a worker fiber to do the
        // lowering since it may involve a call to the guest's realloc
        // function. This is necessary because calling the guest while
        // there are host embedder frames on the stack is unsound.
        tls::get(move |store| {
            store
                .concurrent_state_mut()
                .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                    on_complete,
                ))));
            Ok(())
        })
    });

    // Make this task visible to the guest and then record what it
    // was made visible as.
    let state = store.0.concurrent_state_mut();
    state.push_future(future);
    let caller = state.get_mut(task)?.caller;
    // The subtask handle is allocated in the handle table of the
    // instance that the caller's task belongs to.
    let instance = state.get_mut(caller.task)?.instance;
    let handle = store
        .0
        .instance_state(instance)
        .handle_table()
        .subtask_insert_host(task.rep())?;
    store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
    log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");

    // Restore the currently running thread to this host task's
    // caller. Note that the host task isn't deallocated as it's
    // within the store and will get deallocated later.
    store.0.set_thread(caller)?;
    Ok(Some(handle))
}
2914
2915 /// Implements the `task.return` intrinsic, lifting the result for the
2916 /// current guest task.
task_return( self, store: &mut dyn VMStore, ty: TypeTupleIndex, options: OptionsIndex, storage: &[ValRaw], ) -> Result<()>2917 pub(crate) fn task_return(
2918 self,
2919 store: &mut dyn VMStore,
2920 ty: TypeTupleIndex,
2921 options: OptionsIndex,
2922 storage: &[ValRaw],
2923 ) -> Result<()> {
2924 let state = store.concurrent_state_mut();
2925 let guest_thread = state.current_guest_thread()?;
2926 let lift = state
2927 .get_mut(guest_thread.task)?
2928 .lift_result
2929 .take()
2930 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
2931 if !state.get_mut(guest_thread.task)?.result.is_none() {
2932 bail_bug!("task result unexpectedly already set");
2933 }
2934
2935 let CanonicalOptions {
2936 string_encoding,
2937 data_model,
2938 ..
2939 } = &self.id().get(store).component().env_component().options[options];
2940
2941 let invalid = ty != lift.ty
2942 || string_encoding != &lift.string_encoding
2943 || match data_model {
2944 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
2945 Some(memory) => {
2946 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
2947 let actual = self.id().get(store).runtime_memory(memory);
2948 expected != actual.as_ptr()
2949 }
2950 // Memory not specified, meaning it didn't need to be
2951 // specified per validation, so not invalid.
2952 None => false,
2953 },
2954 // Always invalid as this isn't supported.
2955 CanonicalOptionsDataModel::Gc { .. } => true,
2956 };
2957
2958 if invalid {
2959 bail!(Trap::TaskReturnInvalid);
2960 }
2961
2962 log::trace!("task.return for {guest_thread:?}");
2963
2964 let result = (lift.lift)(store, storage)?;
2965 self.task_complete(store, guest_thread.task, result, Status::Returned)
2966 }
2967
2968 /// Implements the `task.cancel` intrinsic.
task_cancel(self, store: &mut StoreOpaque) -> Result<()>2969 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
2970 let state = store.concurrent_state_mut();
2971 let guest_thread = state.current_guest_thread()?;
2972 let task = state.get_mut(guest_thread.task)?;
2973 if !task.cancel_sent {
2974 bail!(Trap::TaskCancelNotCancelled);
2975 }
2976 _ = task
2977 .lift_result
2978 .take()
2979 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
2980
2981 if !task.result.is_none() {
2982 bail_bug!("task result should not bet set yet");
2983 }
2984
2985 log::trace!("task.cancel for {guest_thread:?}");
2986
2987 self.task_complete(
2988 store,
2989 guest_thread.task,
2990 Box::new(DummyResult),
2991 Status::ReturnCancelled,
2992 )
2993 }
2994
2995 /// Complete the specified guest task (i.e. indicate that it has either
2996 /// returned a (possibly empty) result or cancelled itself).
2997 ///
2998 /// This will return any resource borrows and notify any current or future
2999 /// waiters that the task has completed.
task_complete( self, store: &mut StoreOpaque, guest_task: TableId<GuestTask>, result: Box<dyn Any + Send + Sync>, status: Status, ) -> Result<()>3000 fn task_complete(
3001 self,
3002 store: &mut StoreOpaque,
3003 guest_task: TableId<GuestTask>,
3004 result: Box<dyn Any + Send + Sync>,
3005 status: Status,
3006 ) -> Result<()> {
3007 store
3008 .component_resource_tables(Some(self))
3009 .validate_scope_exit()?;
3010
3011 let state = store.concurrent_state_mut();
3012 let task = state.get_mut(guest_task)?;
3013
3014 if let Caller::Host { tx, .. } = &mut task.caller {
3015 if let Some(tx) = tx.take() {
3016 _ = tx.send(result);
3017 }
3018 } else {
3019 task.result = Some(result);
3020 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
3021 }
3022
3023 Ok(())
3024 }
3025
3026 /// Implements the `waitable-set.new` intrinsic.
waitable_set_new( self, store: &mut StoreOpaque, caller_instance: RuntimeComponentInstanceIndex, ) -> Result<u32>3027 pub(crate) fn waitable_set_new(
3028 self,
3029 store: &mut StoreOpaque,
3030 caller_instance: RuntimeComponentInstanceIndex,
3031 ) -> Result<u32> {
3032 let set = store.concurrent_state_mut().push(WaitableSet::default())?;
3033 let handle = store
3034 .instance_state(RuntimeInstance {
3035 instance: self.id().instance(),
3036 index: caller_instance,
3037 })
3038 .handle_table()
3039 .waitable_set_insert(set.rep())?;
3040 log::trace!("new waitable set {set:?} (handle {handle})");
3041 Ok(handle)
3042 }
3043
3044 /// Implements the `waitable-set.drop` intrinsic.
waitable_set_drop( self, store: &mut StoreOpaque, caller_instance: RuntimeComponentInstanceIndex, set: u32, ) -> Result<()>3045 pub(crate) fn waitable_set_drop(
3046 self,
3047 store: &mut StoreOpaque,
3048 caller_instance: RuntimeComponentInstanceIndex,
3049 set: u32,
3050 ) -> Result<()> {
3051 let rep = store
3052 .instance_state(RuntimeInstance {
3053 instance: self.id().instance(),
3054 index: caller_instance,
3055 })
3056 .handle_table()
3057 .waitable_set_remove(set)?;
3058
3059 log::trace!("drop waitable set {rep} (handle {set})");
3060
3061 let set = store
3062 .concurrent_state_mut()
3063 .delete(TableId::<WaitableSet>::new(rep))?;
3064
3065 if !set.waiting.is_empty() {
3066 bail!(Trap::WaitableSetDropHasWaiters);
3067 }
3068
3069 Ok(())
3070 }
3071
3072 /// Implements the `waitable.join` intrinsic.
waitable_join( self, store: &mut StoreOpaque, caller_instance: RuntimeComponentInstanceIndex, waitable_handle: u32, set_handle: u32, ) -> Result<()>3073 pub(crate) fn waitable_join(
3074 self,
3075 store: &mut StoreOpaque,
3076 caller_instance: RuntimeComponentInstanceIndex,
3077 waitable_handle: u32,
3078 set_handle: u32,
3079 ) -> Result<()> {
3080 let mut instance = self.id().get_mut(store);
3081 let waitable =
3082 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?;
3083
3084 let set = if set_handle == 0 {
3085 None
3086 } else {
3087 let set = instance.instance_states().0[caller_instance]
3088 .handle_table()
3089 .waitable_set_rep(set_handle)?;
3090
3091 Some(TableId::<WaitableSet>::new(set))
3092 };
3093
3094 log::trace!(
3095 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})",
3096 );
3097
3098 waitable.join(store.concurrent_state_mut(), set)
3099 }
3100
/// Implements the `subtask.drop` intrinsic.
///
/// Removes the subtask handle `task_id` from the handle table of
/// `caller_instance` and, where appropriate, deletes the underlying task.
///
/// Traps with `Trap::SubtaskDropNotResolved` if:
///
/// - a host subtask is still in `HostTaskState::CalleeRunning`,
/// - a guest subtask still has its `lift_result`, or
/// - the subtask still has an event that has not been taken.
pub(crate) fn subtask_drop(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    task_id: u32,
) -> Result<()> {
    // Joining with set handle 0 passes `None` as the set; see
    // `Self::waitable_join`.
    self.waitable_join(store, caller_instance, task_id, 0)?;

    let (rep, is_host) = store
        .instance_state(RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        })
        .handle_table()
        .subtask_remove(task_id)?;

    let concurrent_state = store.concurrent_state_mut();
    // Work out which waitable this is, which thread is expected to be
    // dropping it, and whether the task should be deleted right away.
    let (waitable, expected_caller, delete) = if is_host {
        let id = TableId::<HostTask>::new(rep);
        let task = concurrent_state.get_mut(id)?;
        match &task.state {
            HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved),
            HostTaskState::CalleeDone { .. } => {}
            HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
                bail_bug!("invalid state for callee in `subtask.drop`")
            }
        }
        // Host tasks that reach this point are always deleted.
        (Waitable::Host(id), task.caller, true)
    } else {
        let id = TableId::<GuestTask>::new(rep);
        let task = concurrent_state.get_mut(id)?;
        // A `lift_result` that is still present is treated as "not yet
        // resolved".
        if task.lift_result.is_some() {
            bail!(Trap::SubtaskDropNotResolved);
        }
        if let Caller::Guest { thread } = task.caller {
            // Guest tasks are only deleted here if they have already
            // exited.
            (
                Waitable::Guest(id),
                thread,
                concurrent_state.get_mut(id)?.exited,
            )
        } else {
            bail_bug!("expected guest caller for `subtask.drop`")
        }
    };

    // The handle was removed from the table above, so clear the waitable's
    // record of it.
    waitable.common(concurrent_state)?.handle = None;

    // If this subtask has an event that means that the terminal status of
    // this subtask wasn't yet received so it can't be dropped yet.
    if waitable.take_event(concurrent_state)?.is_some() {
        bail!(Trap::SubtaskDropNotResolved);
    }

    if delete {
        waitable.delete_from(concurrent_state)?;
    }

    // Since waitables can neither be passed between instances nor forged,
    // this should never fail unless there's a bug in Wasmtime, but we check
    // here to be sure:
    debug_assert_eq!(expected_caller, concurrent_state.current_guest_thread()?);
    log::trace!("subtask_drop {waitable:?} (handle {task_id})");
    Ok(())
}
3166
3167 /// Implements the `waitable-set.wait` intrinsic.
waitable_set_wait( self, store: &mut StoreOpaque, options: OptionsIndex, set: u32, payload: u32, ) -> Result<u32>3168 pub(crate) fn waitable_set_wait(
3169 self,
3170 store: &mut StoreOpaque,
3171 options: OptionsIndex,
3172 set: u32,
3173 payload: u32,
3174 ) -> Result<u32> {
3175 if !self.options(store, options).async_ {
3176 // The caller may only call `waitable-set.wait` from an async task
3177 // (i.e. a task created via a call to an async export).
3178 // Otherwise, we'll trap.
3179 store.check_blocking()?;
3180 }
3181
3182 let &CanonicalOptions {
3183 cancellable,
3184 instance: caller_instance,
3185 ..
3186 } = &self.id().get(store).component().env_component().options[options];
3187 let rep = store
3188 .instance_state(RuntimeInstance {
3189 instance: self.id().instance(),
3190 index: caller_instance,
3191 })
3192 .handle_table()
3193 .waitable_set_rep(set)?;
3194
3195 self.waitable_check(
3196 store,
3197 cancellable,
3198 WaitableCheck::Wait,
3199 WaitableCheckParams {
3200 set: TableId::new(rep),
3201 options,
3202 payload,
3203 },
3204 )
3205 }
3206
3207 /// Implements the `waitable-set.poll` intrinsic.
waitable_set_poll( self, store: &mut StoreOpaque, options: OptionsIndex, set: u32, payload: u32, ) -> Result<u32>3208 pub(crate) fn waitable_set_poll(
3209 self,
3210 store: &mut StoreOpaque,
3211 options: OptionsIndex,
3212 set: u32,
3213 payload: u32,
3214 ) -> Result<u32> {
3215 let &CanonicalOptions {
3216 cancellable,
3217 instance: caller_instance,
3218 ..
3219 } = &self.id().get(store).component().env_component().options[options];
3220 let rep = store
3221 .instance_state(RuntimeInstance {
3222 instance: self.id().instance(),
3223 index: caller_instance,
3224 })
3225 .handle_table()
3226 .waitable_set_rep(set)?;
3227
3228 self.waitable_check(
3229 store,
3230 cancellable,
3231 WaitableCheck::Poll,
3232 WaitableCheckParams {
3233 set: TableId::new(rep),
3234 options,
3235 payload,
3236 },
3237 )
3238 }
3239
3240 /// Implements the `thread.index` intrinsic.
thread_index(&self, store: &mut dyn VMStore) -> Result<u32>3241 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> {
3242 let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread;
3243 match store
3244 .concurrent_state_mut()
3245 .get_mut(thread_id)?
3246 .instance_rep
3247 {
3248 Some(r) => Ok(r),
3249 None => bail_bug!("thread should have instance_rep by now"),
3250 }
3251 }
3252
/// Implements the `thread.new-indirect` intrinsic.
///
/// Looks up entry `start_func_idx` of table `start_func_table_idx`, checks
/// that it has type `(i32) -> ()`, and creates a new explicit guest thread
/// in the current thread's task which will call it with `context`. The
/// new thread is registered in the thread handle table of
/// `runtime_instance` and the resulting handle is returned.
///
/// This function only creates the thread; it does not queue it to run
/// (see the `NotStartedExplicit` case of `Self::resume_thread`).
///
/// Traps with `Trap::ThreadNewIndirectUninitialized` if the table entry is
/// empty and with `Trap::ThreadNewIndirectInvalidType` if its type does not
/// match.
pub(crate) fn thread_new_indirect<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    runtime_instance: RuntimeComponentInstanceIndex,
    _func_ty_idx: TypeFuncIndex, // currently unused
    start_func_table_idx: RuntimeTableIndex,
    start_func_idx: u32,
    context: i32,
) -> Result<u32> {
    log::trace!("creating new thread");

    // The only start function signature accepted here is `(i32) -> ()`.
    let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []);
    let (instance, registry) = self.id().get_mut_and_registry(store.0);
    let callee = instance
        .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)?
        .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
    if callee.type_index(store.0) != start_func_ty.type_index() {
        bail!(Trap::ThreadNewIndirectInvalidType);
    }

    let token = StoreToken::new(store.as_context_mut());
    // Closure run when the new thread is started; it receives the id of
    // the thread it is running as.
    let start_func = Box::new(
        move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
            // Make the new thread current, remembering the previous one so
            // it can be restored once the start function returns.
            let old_thread = store.set_thread(guest_thread)?;
            log::trace!(
                "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
            );

            let mut store = token.as_context_mut(store);
            let mut params = [ValRaw::i32(context)];
            // Use call_unchecked rather than call or call_async, as we don't want to run the function
            // on a separate fiber if we're running in an async store.
            unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };

            self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
            log::trace!("explicit thread {guest_thread:?} completed");
            let state = store.0.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;
            // If the task has no threads left and has neither returned nor
            // cancelled, it can never produce a result, so trap.
            if task.threads.is_empty() && !task.returned_or_cancelled() {
                bail!(Trap::NoAsyncResult);
            }
            store.0.set_thread(old_thread)?;
            let state = store.0.concurrent_state_mut();
            // If the previous thread was a guest thread, mark it as
            // running again.
            if let Some(t) = old_thread.guest() {
                state.get_mut(t.thread)?.state = GuestThreadState::Running;
            }
            if state.get_mut(guest_thread.task)?.ready_to_delete() {
                Waitable::Guest(guest_thread.task).delete_from(state)?;
            }
            log::trace!("thread start: restored {old_thread:?} as current thread");

            Ok(())
        },
    );

    let state = store.0.concurrent_state_mut();
    let current_thread = state.current_guest_thread()?;
    // The new thread belongs to the same task as the thread creating it.
    let parent_task = current_thread.task;

    let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
    let thread_id = state.push(new_thread)?;
    state.get_mut(parent_task)?.threads.insert(thread_id);

    log::trace!("new thread with id {thread_id:?} created");

    self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
}
3321
resume_thread( self, store: &mut StoreOpaque, runtime_instance: RuntimeComponentInstanceIndex, thread_idx: u32, high_priority: bool, allow_ready: bool, ) -> Result<()>3322 pub(crate) fn resume_thread(
3323 self,
3324 store: &mut StoreOpaque,
3325 runtime_instance: RuntimeComponentInstanceIndex,
3326 thread_idx: u32,
3327 high_priority: bool,
3328 allow_ready: bool,
3329 ) -> Result<()> {
3330 let thread_id =
3331 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
3332 let state = store.concurrent_state_mut();
3333 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
3334 let thread = state.get_mut(guest_thread.thread)?;
3335
3336 match mem::replace(&mut thread.state, GuestThreadState::Running) {
3337 GuestThreadState::NotStartedExplicit(start_func) => {
3338 log::trace!("starting thread {guest_thread:?}");
3339 let guest_call = WorkItem::GuestCall(
3340 runtime_instance,
3341 GuestCall {
3342 thread: guest_thread,
3343 kind: GuestCallKind::StartExplicit(Box::new(move |store| {
3344 start_func(store, guest_thread)
3345 })),
3346 },
3347 );
3348 store
3349 .concurrent_state_mut()
3350 .push_work_item(guest_call, high_priority);
3351 }
3352 GuestThreadState::Suspended(fiber) => {
3353 log::trace!("resuming thread {thread_id:?} that was suspended");
3354 store
3355 .concurrent_state_mut()
3356 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
3357 }
3358 GuestThreadState::Ready(fiber) if allow_ready => {
3359 log::trace!("resuming thread {thread_id:?} that was ready");
3360 thread.state = GuestThreadState::Ready(fiber);
3361 store
3362 .concurrent_state_mut()
3363 .promote_thread_work_item(guest_thread);
3364 }
3365 other => {
3366 thread.state = other;
3367 bail!(Trap::CannotResumeThread);
3368 }
3369 }
3370 Ok(())
3371 }
3372
add_guest_thread_to_instance_table( self, thread_id: TableId<GuestThread>, store: &mut StoreOpaque, runtime_instance: RuntimeComponentInstanceIndex, ) -> Result<u32>3373 fn add_guest_thread_to_instance_table(
3374 self,
3375 thread_id: TableId<GuestThread>,
3376 store: &mut StoreOpaque,
3377 runtime_instance: RuntimeComponentInstanceIndex,
3378 ) -> Result<u32> {
3379 let guest_id = store
3380 .instance_state(RuntimeInstance {
3381 instance: self.id().instance(),
3382 index: runtime_instance,
3383 })
3384 .thread_handle_table()
3385 .guest_thread_insert(thread_id.rep())?;
3386 store
3387 .concurrent_state_mut()
3388 .get_mut(thread_id)?
3389 .instance_rep = Some(guest_id);
3390 Ok(guest_id)
3391 }
3392
3393 /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`,
3394 /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics.
suspension_intrinsic( self, store: &mut StoreOpaque, caller: RuntimeComponentInstanceIndex, cancellable: bool, yielding: bool, to_thread: SuspensionTarget, ) -> Result<WaitResult>3395 pub(crate) fn suspension_intrinsic(
3396 self,
3397 store: &mut StoreOpaque,
3398 caller: RuntimeComponentInstanceIndex,
3399 cancellable: bool,
3400 yielding: bool,
3401 to_thread: SuspensionTarget,
3402 ) -> Result<WaitResult> {
3403 let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3404 if to_thread.is_none() {
3405 let state = store.concurrent_state_mut();
3406 if yielding {
3407 // This is a `thread.yield` call
3408 if !state.may_block(guest_thread.task)? {
3409 // In a non-blocking context, a `thread.yield` may trigger
3410 // other threads in the same component instance to run.
3411 if !state.promote_instance_local_thread_work_item(caller) {
3412 // No other threads are runnable, so just return
3413 return Ok(WaitResult::Completed);
3414 }
3415 }
3416 } else {
3417 // The caller may only call `thread.suspend` from an async task
3418 // (i.e. a task created via a call to an async export).
3419 // Otherwise, we'll trap.
3420 store.check_blocking()?;
3421 }
3422 }
3423
3424 // There could be a pending cancellation from a previous uncancellable wait
3425 if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3426 return Ok(WaitResult::Cancelled);
3427 }
3428
3429 match to_thread {
3430 SuspensionTarget::SomeSuspended(thread) => {
3431 self.resume_thread(store, caller, thread, true, false)?
3432 }
3433 SuspensionTarget::Some(thread) => {
3434 self.resume_thread(store, caller, thread, true, true)?
3435 }
3436 SuspensionTarget::None => { /* nothing to do */ }
3437 }
3438
3439 let reason = if yielding {
3440 SuspendReason::Yielding {
3441 thread: guest_thread,
3442 // Tell `StoreOpaque::suspend` it's okay to suspend here since
3443 // we're handling a `thread.yield-to-suspended` call; otherwise it would
3444 // panic if we called it in a non-blocking context.
3445 skip_may_block_check: to_thread.is_some(),
3446 }
3447 } else {
3448 SuspendReason::ExplicitlySuspending {
3449 thread: guest_thread,
3450 // Tell `StoreOpaque::suspend` it's okay to suspend here since
3451 // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would
3452 // panic if we called it in a non-blocking context.
3453 skip_may_block_check: to_thread.is_some(),
3454 }
3455 };
3456
3457 store.suspend(reason)?;
3458
3459 if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
3460 Ok(WaitResult::Cancelled)
3461 } else {
3462 Ok(WaitResult::Completed)
3463 }
3464 }
3465
3466 /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
waitable_check( self, store: &mut StoreOpaque, cancellable: bool, check: WaitableCheck, params: WaitableCheckParams, ) -> Result<u32>3467 fn waitable_check(
3468 self,
3469 store: &mut StoreOpaque,
3470 cancellable: bool,
3471 check: WaitableCheck,
3472 params: WaitableCheckParams,
3473 ) -> Result<u32> {
3474 let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
3475
3476 log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);
3477
3478 let state = store.concurrent_state_mut();
3479 let task = state.get_mut(guest_thread.task)?;
3480
3481 // If we're waiting, and there are no events immediately available,
3482 // suspend the fiber until that changes.
3483 match &check {
3484 WaitableCheck::Wait => {
3485 let set = params.set;
3486
3487 if (task.event.is_none()
3488 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
3489 && state.get_mut(set)?.ready.is_empty()
3490 {
3491 if cancellable {
3492 let old = state
3493 .get_mut(guest_thread.thread)?
3494 .wake_on_cancel
3495 .replace(set);
3496 if !old.is_none() {
3497 bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
3498 }
3499 }
3500
3501 store.suspend(SuspendReason::Waiting {
3502 set,
3503 thread: guest_thread,
3504 skip_may_block_check: false,
3505 })?;
3506 }
3507 }
3508 WaitableCheck::Poll => {}
3509 }
3510
3511 log::trace!(
3512 "waitable check for {guest_thread:?}; set {:?}, part two",
3513 params.set
3514 );
3515
3516 // Deliver any pending events to the guest and return.
3517 let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;
3518
3519 let (ordinal, handle, result) = match &check {
3520 WaitableCheck::Wait => {
3521 let (event, waitable) = match event {
3522 Some(p) => p,
3523 None => bail_bug!("event expected to be present"),
3524 };
3525 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3526 let (ordinal, result) = event.parts();
3527 (ordinal, handle, result)
3528 }
3529 WaitableCheck::Poll => {
3530 if let Some((event, waitable)) = event {
3531 let handle = waitable.map(|(_, v)| v).unwrap_or(0);
3532 let (ordinal, result) = event.parts();
3533 (ordinal, handle, result)
3534 } else {
3535 log::trace!(
3536 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
3537 guest_thread.task,
3538 params.set
3539 );
3540 let (ordinal, result) = Event::None.parts();
3541 (ordinal, 0, result)
3542 }
3543 }
3544 };
3545 let memory = self.options_memory_mut(store, params.options);
3546 let ptr = func::validate_inbounds_dynamic(
3547 &CanonicalAbiInfo::POINTER_PAIR,
3548 memory,
3549 &ValRaw::u32(params.payload),
3550 )?;
3551 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
3552 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
3553 Ok(ordinal)
3554 }
3555
/// Implements the `subtask.cancel` intrinsic.
///
/// Looks up the subtask identified by `task_id` in `caller_instance`'s handle
/// table and requests its cancellation.
///
/// * `async_` - when `false`, the call may block until the subtask reaches a
///   terminal state (and therefore must pass `check_blocking`); when `true`,
///   `BLOCKED` is returned instead of waiting.
///
/// Returns:
/// * `Status::StartCancelled` if a guest subtask had not yet lowered its
///   parameters (i.e. had not started running),
/// * `BLOCKED` if `async_` is set and the subtask has not yet reached a
///   terminal state,
/// * otherwise the subtask's `Status::Returned` or `Status::ReturnCancelled`
///   status.
///
/// Fails with `Trap::SubtaskCancelAfterTerminal` if a host subtask was
/// already cancelled, if an unstarted guest subtask has no pending thread to
/// remove, or if the event finally taken from the waitable is not a
/// `Returned`/`ReturnCancelled` subtask event.
pub(crate) fn subtask_cancel(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    async_: bool,
    task_id: u32,
) -> Result<u32> {
    if !async_ {
        // The caller may only sync call `subtask.cancel` from an async task
        // (i.e. a task created via a call to an async export).  Otherwise,
        // we'll trap.
        store.check_blocking()?;
    }

    // Resolve the guest-visible handle to a table rep, and whether it names
    // a host task or a guest task.
    let (rep, is_host) = store
        .instance_state(RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        })
        .handle_table()
        .subtask_rep(task_id)?;
    let (waitable, expected_caller) = if is_host {
        let id = TableId::<HostTask>::new(rep);
        (
            Waitable::Host(id),
            store.concurrent_state_mut().get_mut(id)?.caller,
        )
    } else {
        let id = TableId::<GuestTask>::new(rep);
        if let Caller::Guest { thread } = store.concurrent_state_mut().get_mut(id)?.caller {
            (Waitable::Guest(id), thread)
        } else {
            bail_bug!("expected guest caller for `subtask.cancel`")
        }
    };
    // Since waitables can neither be passed between instances nor forged,
    // this should never fail unless there's a bug in Wasmtime, but we check
    // here to be sure.
    //
    // NOTE(review): `debug_assert_eq!` is only evaluated when debug
    // assertions are enabled, so this check is skipped in release builds.
    let concurrent_state = store.concurrent_state_mut();
    debug_assert_eq!(expected_caller, concurrent_state.current_guest_thread()?);

    log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

    // Whether the subtask still has to reach a terminal state before its
    // final status can be reported.
    let needs_block;
    if let Waitable::Host(host_task) = waitable {
        let state = &mut concurrent_state.get_mut(host_task)?.state;
        match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
            // If the callee is still running, signal an abort is requested.
            //
            // After cancelling this falls through to block waiting for the
            // host task to actually finish assuming that `async_` is false.
            // This blocking behavior resolves the race of `handle.abort()`
            // with the task actually getting cancelled or finishing.
            HostTaskState::CalleeRunning(handle) => {
                handle.abort();
                needs_block = true;
            }

            // Cancellation was already requested, so fail as the task can't
            // be cancelled twice.
            HostTaskState::CalleeDone { cancelled } => {
                if cancelled {
                    bail!(Trap::SubtaskCancelAfterTerminal);
                } else {
                    // The callee is already done so there's no need to
                    // block further for an event.
                    needs_block = false;
                }
            }

            // These states should not be possible for a subtask that's
            // visible from the guest, so trap here.
            HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
                bail_bug!("invalid states for host callee")
            }
        }
    } else {
        let caller = concurrent_state.current_guest_thread()?;
        let guest_task = TableId::<GuestTask>::new(rep);
        let task = concurrent_state.get_mut(guest_task)?;
        if !task.already_lowered_parameters() {
            // The task is in a `starting` state, meaning it hasn't run at
            // all yet.  Here we update its fields to indicate that it is
            // ready to delete immediately once `subtask.drop` is called.
            task.lower_params = None;
            task.lift_result = None;
            task.exited = true;

            let instance = task.instance;

            // An unstarted task is expected to own exactly one thread;
            // delete it.
            assert_eq!(1, task.threads.len());
            let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
            let concurrent_state = store.concurrent_state_mut();
            concurrent_state.delete(thread)?;
            assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());

            // Not yet started; cancel and remove from pending
            let pending = &mut store.instance_state(instance).concurrent_state().pending;
            let pending_count = pending.len();
            pending.retain(|thread, _| thread.task != guest_task);
            // If there were no pending threads for this task, we're in an error state
            if pending.len() == pending_count {
                bail!(Trap::SubtaskCancelAfterTerminal);
            }
            return Ok(Status::StartCancelled as u32);
        } else if !task.returned_or_cancelled() {
            // Started, but not yet returned or cancelled; send the
            // `CANCELLED` event
            task.cancel_sent = true;
            // Note that this might overwrite an event that was set earlier
            // (e.g. `Event::None` if the task is yielding, or
            // `Event::Cancelled` if it was already cancelled), but that's
            // okay -- this should supersede the previous state.
            task.event = Some(Event::Cancelled);
            let runtime_instance = task.instance.index;
            // Find the first thread of the task that is waiting in a
            // `wake_on_cancel` set, schedule it at high priority, and yield
            // so it gets a chance to observe the cancellation.
            for thread in task.threads.clone() {
                let thread = QualifiedThreadId {
                    task: guest_task,
                    thread,
                };
                if let Some(set) = concurrent_state
                    .get_mut(thread.thread)?
                    .wake_on_cancel
                    .take()
                {
                    let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
                        Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
                        Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
                            runtime_instance,
                            GuestCall {
                                thread,
                                kind: GuestCallKind::DeliverEvent {
                                    instance,
                                    set: None,
                                },
                            },
                        ),
                        None => bail_bug!("thread not present in wake_on_cancel set"),
                    };
                    concurrent_state.push_high_priority(item);

                    store.suspend(SuspendReason::Yielding {
                        thread: caller,
                        // `subtask.cancel` is not allowed to be called in a
                        // sync context, so we cannot skip the may-block check.
                        skip_may_block_check: false,
                    })?;
                    break;
                }
            }

            // Guest tasks need to block if they have not yet returned or
            // cancelled, even as a result of the event delivery above.
            needs_block = !store
                .concurrent_state_mut()
                .get_mut(guest_task)?
                .returned_or_cancelled()
        } else {
            // The task already returned or was cancelled, so its terminal
            // event can be taken without waiting.
            needs_block = false;
        }
    };

    // If we need to block waiting on the terminal status of this subtask
    // then return immediately in `async` mode, or otherwise wait for the
    // event to get signaled through the store.
    if needs_block {
        if async_ {
            return Ok(BLOCKED);
        }

        // Wait for this waitable to get signaled with its terminal status
        // from the completion callback enqueued by `first_poll`. Once
        // that's done, fall through to the shared event handling below.
        store.wait_for_event(waitable)?;

        // .. fall through to determine what event's in store for us.
    }

    // Only a `Returned` or `ReturnCancelled` subtask event is an acceptable
    // outcome; anything else is reported as a trap.
    let event = waitable.take_event(store.concurrent_state_mut())?;
    if let Some(Event::Subtask {
        status: status @ (Status::Returned | Status::ReturnCancelled),
    }) = event
    {
        Ok(status as u32)
    } else {
        bail!(Trap::SubtaskCancelAfterTerminal);
    }
}
3745
context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32>3746 pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> {
3747 store.concurrent_state_mut().context_get(slot)
3748 }
3749
context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()>3750 pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
3751 store.concurrent_state_mut().context_set(slot, value)
3752 }
3753 }
3754
/// Trait representing component model ABI async intrinsics and fused adapter
/// helper functions.
///
/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
/// which must be valid for at least the duration of the call (and possibly for
/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
/// pointers used for async calls).
pub trait VMComponentAsyncStore {
    /// A helper function for fused adapter modules involving calls where at
    /// least one of the caller or callee is async.
    ///
    /// This helper is not used when the caller and callee both use the sync
    /// ABI; it is only used when at least one of them is async.
    ///
    /// `storage` must point to at least `storage_len` valid `ValRaw` items.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull<VMFuncRef>,
        return_: NonNull<VMFuncRef>,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is sync-lowered but the callee is async-lifted.
    ///
    /// `storage` must point to at least `storage_len` items.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is async-lowered.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull<VMFuncRef>,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// The `future.write` intrinsic.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.read` intrinsic.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.drop-writable` intrinsic.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `stream.write` intrinsic.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.read` intrinsic.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.write` intrinsic for
    /// "flat" (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` describe the flat layout of a
    /// single payload item.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
    /// (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` describe the flat layout of a
    /// single payload item.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.drop-writable` intrinsic.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `error-context.debug-message` intrinsic.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// The `thread.new-indirect` intrinsic.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}
3922
3923 /// SAFETY: See trait docs.
3924 impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
prepare_call( &mut self, instance: Instance, memory: *mut VMMemoryDefinition, start: NonNull<VMFuncRef>, return_: NonNull<VMFuncRef>, caller_instance: RuntimeComponentInstanceIndex, callee_instance: RuntimeComponentInstanceIndex, task_return_type: TypeTupleIndex, callee_async: bool, string_encoding: StringEncoding, result_count_or_max_if_async: u32, storage: *mut ValRaw, storage_len: usize, ) -> Result<()>3925 unsafe fn prepare_call(
3926 &mut self,
3927 instance: Instance,
3928 memory: *mut VMMemoryDefinition,
3929 start: NonNull<VMFuncRef>,
3930 return_: NonNull<VMFuncRef>,
3931 caller_instance: RuntimeComponentInstanceIndex,
3932 callee_instance: RuntimeComponentInstanceIndex,
3933 task_return_type: TypeTupleIndex,
3934 callee_async: bool,
3935 string_encoding: StringEncoding,
3936 result_count_or_max_if_async: u32,
3937 storage: *mut ValRaw,
3938 storage_len: usize,
3939 ) -> Result<()> {
3940 // SAFETY: The `wasmtime_cranelift`-generated code that calls
3941 // this method will have ensured that `storage` is a valid
3942 // pointer containing at least `storage_len` items.
3943 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();
3944
3945 unsafe {
3946 instance.prepare_call(
3947 StoreContextMut(self),
3948 start,
3949 return_,
3950 caller_instance,
3951 callee_instance,
3952 task_return_type,
3953 callee_async,
3954 memory,
3955 string_encoding,
3956 match result_count_or_max_if_async {
3957 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
3958 params,
3959 has_result: false,
3960 },
3961 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
3962 params,
3963 has_result: true,
3964 },
3965 result_count => CallerInfo::Sync {
3966 params,
3967 result_count,
3968 },
3969 },
3970 )
3971 }
3972 }
3973
sync_start( &mut self, instance: Instance, callback: *mut VMFuncRef, callee: NonNull<VMFuncRef>, param_count: u32, storage: *mut MaybeUninit<ValRaw>, storage_len: usize, ) -> Result<()>3974 unsafe fn sync_start(
3975 &mut self,
3976 instance: Instance,
3977 callback: *mut VMFuncRef,
3978 callee: NonNull<VMFuncRef>,
3979 param_count: u32,
3980 storage: *mut MaybeUninit<ValRaw>,
3981 storage_len: usize,
3982 ) -> Result<()> {
3983 unsafe {
3984 instance
3985 .start_call(
3986 StoreContextMut(self),
3987 callback,
3988 ptr::null_mut(),
3989 callee,
3990 param_count,
3991 1,
3992 START_FLAG_ASYNC_CALLEE,
3993 // SAFETY: The `wasmtime_cranelift`-generated code that calls
3994 // this method will have ensured that `storage` is a valid
3995 // pointer containing at least `storage_len` items.
3996 Some(std::slice::from_raw_parts_mut(storage, storage_len)),
3997 )
3998 .map(drop)
3999 }
4000 }
4001
async_start( &mut self, instance: Instance, callback: *mut VMFuncRef, post_return: *mut VMFuncRef, callee: NonNull<VMFuncRef>, param_count: u32, result_count: u32, flags: u32, ) -> Result<u32>4002 unsafe fn async_start(
4003 &mut self,
4004 instance: Instance,
4005 callback: *mut VMFuncRef,
4006 post_return: *mut VMFuncRef,
4007 callee: NonNull<VMFuncRef>,
4008 param_count: u32,
4009 result_count: u32,
4010 flags: u32,
4011 ) -> Result<u32> {
4012 unsafe {
4013 instance.start_call(
4014 StoreContextMut(self),
4015 callback,
4016 post_return,
4017 callee,
4018 param_count,
4019 result_count,
4020 flags,
4021 None,
4022 )
4023 }
4024 }
4025
future_write( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeFutureTableIndex, options: OptionsIndex, future: u32, address: u32, ) -> Result<u32>4026 fn future_write(
4027 &mut self,
4028 instance: Instance,
4029 caller: RuntimeComponentInstanceIndex,
4030 ty: TypeFutureTableIndex,
4031 options: OptionsIndex,
4032 future: u32,
4033 address: u32,
4034 ) -> Result<u32> {
4035 instance
4036 .guest_write(
4037 StoreContextMut(self),
4038 caller,
4039 TransmitIndex::Future(ty),
4040 options,
4041 None,
4042 future,
4043 address,
4044 1,
4045 )
4046 .map(|result| result.encode())
4047 }
4048
future_read( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeFutureTableIndex, options: OptionsIndex, future: u32, address: u32, ) -> Result<u32>4049 fn future_read(
4050 &mut self,
4051 instance: Instance,
4052 caller: RuntimeComponentInstanceIndex,
4053 ty: TypeFutureTableIndex,
4054 options: OptionsIndex,
4055 future: u32,
4056 address: u32,
4057 ) -> Result<u32> {
4058 instance
4059 .guest_read(
4060 StoreContextMut(self),
4061 caller,
4062 TransmitIndex::Future(ty),
4063 options,
4064 None,
4065 future,
4066 address,
4067 1,
4068 )
4069 .map(|result| result.encode())
4070 }
4071
stream_write( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeStreamTableIndex, options: OptionsIndex, stream: u32, address: u32, count: u32, ) -> Result<u32>4072 fn stream_write(
4073 &mut self,
4074 instance: Instance,
4075 caller: RuntimeComponentInstanceIndex,
4076 ty: TypeStreamTableIndex,
4077 options: OptionsIndex,
4078 stream: u32,
4079 address: u32,
4080 count: u32,
4081 ) -> Result<u32> {
4082 instance
4083 .guest_write(
4084 StoreContextMut(self),
4085 caller,
4086 TransmitIndex::Stream(ty),
4087 options,
4088 None,
4089 stream,
4090 address,
4091 count,
4092 )
4093 .map(|result| result.encode())
4094 }
4095
stream_read( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeStreamTableIndex, options: OptionsIndex, stream: u32, address: u32, count: u32, ) -> Result<u32>4096 fn stream_read(
4097 &mut self,
4098 instance: Instance,
4099 caller: RuntimeComponentInstanceIndex,
4100 ty: TypeStreamTableIndex,
4101 options: OptionsIndex,
4102 stream: u32,
4103 address: u32,
4104 count: u32,
4105 ) -> Result<u32> {
4106 instance
4107 .guest_read(
4108 StoreContextMut(self),
4109 caller,
4110 TransmitIndex::Stream(ty),
4111 options,
4112 None,
4113 stream,
4114 address,
4115 count,
4116 )
4117 .map(|result| result.encode())
4118 }
4119
future_drop_writable( &mut self, instance: Instance, ty: TypeFutureTableIndex, writer: u32, ) -> Result<()>4120 fn future_drop_writable(
4121 &mut self,
4122 instance: Instance,
4123 ty: TypeFutureTableIndex,
4124 writer: u32,
4125 ) -> Result<()> {
4126 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
4127 }
4128
flat_stream_write( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeStreamTableIndex, options: OptionsIndex, payload_size: u32, payload_align: u32, stream: u32, address: u32, count: u32, ) -> Result<u32>4129 fn flat_stream_write(
4130 &mut self,
4131 instance: Instance,
4132 caller: RuntimeComponentInstanceIndex,
4133 ty: TypeStreamTableIndex,
4134 options: OptionsIndex,
4135 payload_size: u32,
4136 payload_align: u32,
4137 stream: u32,
4138 address: u32,
4139 count: u32,
4140 ) -> Result<u32> {
4141 instance
4142 .guest_write(
4143 StoreContextMut(self),
4144 caller,
4145 TransmitIndex::Stream(ty),
4146 options,
4147 Some(FlatAbi {
4148 size: payload_size,
4149 align: payload_align,
4150 }),
4151 stream,
4152 address,
4153 count,
4154 )
4155 .map(|result| result.encode())
4156 }
4157
flat_stream_read( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, ty: TypeStreamTableIndex, options: OptionsIndex, payload_size: u32, payload_align: u32, stream: u32, address: u32, count: u32, ) -> Result<u32>4158 fn flat_stream_read(
4159 &mut self,
4160 instance: Instance,
4161 caller: RuntimeComponentInstanceIndex,
4162 ty: TypeStreamTableIndex,
4163 options: OptionsIndex,
4164 payload_size: u32,
4165 payload_align: u32,
4166 stream: u32,
4167 address: u32,
4168 count: u32,
4169 ) -> Result<u32> {
4170 instance
4171 .guest_read(
4172 StoreContextMut(self),
4173 caller,
4174 TransmitIndex::Stream(ty),
4175 options,
4176 Some(FlatAbi {
4177 size: payload_size,
4178 align: payload_align,
4179 }),
4180 stream,
4181 address,
4182 count,
4183 )
4184 .map(|result| result.encode())
4185 }
4186
stream_drop_writable( &mut self, instance: Instance, ty: TypeStreamTableIndex, writer: u32, ) -> Result<()>4187 fn stream_drop_writable(
4188 &mut self,
4189 instance: Instance,
4190 ty: TypeStreamTableIndex,
4191 writer: u32,
4192 ) -> Result<()> {
4193 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
4194 }
4195
error_context_debug_message( &mut self, instance: Instance, ty: TypeComponentLocalErrorContextTableIndex, options: OptionsIndex, err_ctx_handle: u32, debug_msg_address: u32, ) -> Result<()>4196 fn error_context_debug_message(
4197 &mut self,
4198 instance: Instance,
4199 ty: TypeComponentLocalErrorContextTableIndex,
4200 options: OptionsIndex,
4201 err_ctx_handle: u32,
4202 debug_msg_address: u32,
4203 ) -> Result<()> {
4204 instance.error_context_debug_message(
4205 StoreContextMut(self),
4206 ty,
4207 options,
4208 err_ctx_handle,
4209 debug_msg_address,
4210 )
4211 }
4212
thread_new_indirect( &mut self, instance: Instance, caller: RuntimeComponentInstanceIndex, func_ty_idx: TypeFuncIndex, start_func_table_idx: RuntimeTableIndex, start_func_idx: u32, context: i32, ) -> Result<u32>4213 fn thread_new_indirect(
4214 &mut self,
4215 instance: Instance,
4216 caller: RuntimeComponentInstanceIndex,
4217 func_ty_idx: TypeFuncIndex,
4218 start_func_table_idx: RuntimeTableIndex,
4219 start_func_idx: u32,
4220 context: i32,
4221 ) -> Result<u32> {
4222 instance.thread_new_indirect(
4223 StoreContextMut(self),
4224 caller,
4225 func_ty_idx,
4226 start_func_table_idx,
4227 start_func_idx,
4228 context,
4229 )
4230 }
4231 }
4232
/// A pinned, boxed, `Send + 'static` future resolving to `Result<()>`.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;
4234
/// Represents the state of a pending host task.
///
/// This is used to represent tasks when the guest calls into the host.
pub(crate) struct HostTask {
    /// State shared by all waitables.
    common: WaitableCommon,

    /// Guest thread which called the host.
    caller: QualifiedThreadId,

    /// State of borrows/etc the host needs to track. Used when the guest passes
    /// borrows to the host, for example.
    call_context: CallContext,

    /// Current lifecycle state of this host task; see `HostTaskState`.
    state: HostTaskState,
}
4250
/// Lifecycle states of a `HostTask`.
enum HostTaskState {
    /// A host task has been created and it's considered "started".
    ///
    /// The host task has yet to enter `first_poll` or `poll_and_block` which
    /// is where this will get updated further.
    CalleeStarted,

    /// State used for tasks in `first_poll` meaning that the guest did an async
    /// lower of a host async function which is blocked. The specified handle is
    /// linked to the future in the main `FuturesUnordered` of a store which is
    /// used to cancel it if the guest requests cancellation.
    CalleeRunning(JoinHandle),

    /// Terminal state used for tasks in `poll_and_block` to store the result of
    /// their computation. Note that this state is not used for tasks in
    /// `first_poll`.
    CalleeFinished(LiftedResult),

    /// Terminal state for host tasks meaning that the task was cancelled or the
    /// result was taken.
    ///
    /// `cancelled` is `true` once cancellation has been requested for the
    /// task.
    CalleeDone { cancelled: bool },
}
4273
4274 impl HostTask {
new(caller: QualifiedThreadId, state: HostTaskState) -> Self4275 fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
4276 Self {
4277 common: WaitableCommon::default(),
4278 call_context: CallContext::default(),
4279 caller,
4280 state,
4281 }
4282 }
4283 }
4284
4285 impl TableDebug for HostTask {
type_name() -> &'static str4286 fn type_name() -> &'static str {
4287 "HostTask"
4288 }
4289 }
4290
/// Boxed, thread-safe closure invoked with the store, an `Event`, and a `u32`
/// argument, producing a `u32` result.
// NOTE(review): presumably wraps a guest task's callback function; the exact
// meaning of the `u32` argument and return value is not shown here — confirm.
type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>;
4292
/// Represents the caller of a given guest task.
enum Caller {
    /// The host called the guest task.
    Host {
        /// If present, may be used to deliver the result.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// If true, there's a host future that must be dropped before the task
        /// can be deleted.
        host_future_present: bool,
        /// Represents the caller of the host function which called back into a
        /// guest. Note that this thread could belong to a different top-level
        /// component instance than the one the host called into.
        caller: CurrentThread,
    },
    /// Another guest thread called the guest task.
    Guest {
        /// The id of the calling thread.
        thread: QualifiedThreadId,
    },
}
4313
/// Represents a closure and related canonical ABI parameters required to
/// validate a `task.return` call at runtime and lift the result.
struct LiftResult {
    /// The closure used to lift the result.
    lift: RawLift,
    /// Tuple type index for the result.
    // NOTE(review): presumably the type a `task.return` call is validated
    // against — confirm at the use site.
    ty: TypeTupleIndex,
    /// Linear memory to use when lifting, if any.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding to use when lifting.
    string_encoding: StringEncoding,
}
4322
/// The table ID for a guest thread, qualified by the task to which it belongs.
///
/// This exists to minimize table lookups and the necessity to pass stores around mutably
/// for the common case of identifying the task to which a thread belongs.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) struct QualifiedThreadId {
    /// The task to which `thread` belongs.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}
4332
4333 impl QualifiedThreadId {
qualify( state: &mut ConcurrentState, thread: TableId<GuestThread>, ) -> Result<QualifiedThreadId>4334 fn qualify(
4335 state: &mut ConcurrentState,
4336 thread: TableId<GuestThread>,
4337 ) -> Result<QualifiedThreadId> {
4338 Ok(QualifiedThreadId {
4339 task: state.get_mut(thread)?.parent_task,
4340 thread,
4341 })
4342 }
4343 }
4344
4345 impl fmt::Debug for QualifiedThreadId {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result4346 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4347 f.debug_tuple("QualifiedThreadId")
4348 .field(&self.task.rep())
4349 .field(&self.thread.rep())
4350 .finish()
4351 }
4352 }
4353
/// The execution state of a guest thread (see `GuestThread::state`).
enum GuestThreadState {
    /// Initial state of a thread created with `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created with `GuestThread::new_explicit`,
    /// holding the start function supplied at creation.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    // NOTE(review): presumably the thread is currently executing — confirm.
    Running,
    /// Holds the thread's fiber.
    // NOTE(review): presumably parked until explicitly resumed — confirm.
    Suspended(StoreFiber<'static>),
    /// Holds the thread's fiber.
    // NOTE(review): presumably runnable and awaiting scheduling — confirm.
    Ready(StoreFiber<'static>),
    /// Terminal state; also the state left behind by
    /// `ConcurrentState::take_fibers_and_futures` after it takes a thread's
    /// fiber.
    Completed,
}
/// Represents a guest thread belonging to a guest task.
pub struct GuestThread {
    /// Context-local state used to implement the `context.{get,set}`
    /// intrinsics.
    context: [u32; 2],
    /// The owning guest task.
    parent_task: TableId<GuestTask>,
    /// If present, indicates that the thread is currently waiting on the
    /// specified set but may be cancelled and woken immediately.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// The execution state of this guest thread
    state: GuestThreadState,
    /// The index of this thread in the component instance's handle table.
    /// This must always be `Some` after initialization.
    ///
    /// Both constructors leave this as `None`; it is filled in elsewhere.
    instance_rep: Option<u32>,
    /// Scratch waitable set used to watch subtasks during synchronous calls.
    ///
    /// A fresh set is pushed to the table for every thread at construction.
    sync_call_set: TableId<WaitableSet>,
}
4381
4382 impl GuestThread {
4383 /// Retrieve the `GuestThread` corresponding to the specified guest-visible
4384 /// handle.
from_instance( state: Pin<&mut ComponentInstance>, caller_instance: RuntimeComponentInstanceIndex, guest_thread: u32, ) -> Result<TableId<Self>>4385 fn from_instance(
4386 state: Pin<&mut ComponentInstance>,
4387 caller_instance: RuntimeComponentInstanceIndex,
4388 guest_thread: u32,
4389 ) -> Result<TableId<Self>> {
4390 let rep = state.instance_states().0[caller_instance]
4391 .thread_handle_table()
4392 .guest_thread_rep(guest_thread)?;
4393 Ok(TableId::new(rep))
4394 }
4395
new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self>4396 fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> {
4397 let sync_call_set = state.push(WaitableSet::default())?;
4398 Ok(Self {
4399 context: [0; 2],
4400 parent_task,
4401 wake_on_cancel: None,
4402 state: GuestThreadState::NotStartedImplicit,
4403 instance_rep: None,
4404 sync_call_set,
4405 })
4406 }
4407
new_explicit( state: &mut ConcurrentState, parent_task: TableId<GuestTask>, start_func: Box< dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync, >, ) -> Result<Self>4408 fn new_explicit(
4409 state: &mut ConcurrentState,
4410 parent_task: TableId<GuestTask>,
4411 start_func: Box<
4412 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
4413 >,
4414 ) -> Result<Self> {
4415 let sync_call_set = state.push(WaitableSet::default())?;
4416 Ok(Self {
4417 context: [0; 2],
4418 parent_task,
4419 wake_on_cancel: None,
4420 state: GuestThreadState::NotStartedExplicit(start_func),
4421 instance_rep: None,
4422 sync_call_set,
4423 })
4424 }
4425 }
4426
impl TableDebug for GuestThread {
    /// Returns the fixed name `"GuestThread"` identifying this entry type.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}
4432
/// Tracks the lowered result of a sync-to-async call (see
/// `GuestTask::sync_result`).
enum SyncResult {
    /// No result has been produced yet; this is the state a new `GuestTask`
    /// starts in.
    NotProduced,
    /// A result has been produced and is waiting to be taken.
    Produced(Option<ValRaw>),
    /// The result slot has been consumed by `SyncResult::take`.
    Taken,
}
4438
4439 impl SyncResult {
take(&mut self) -> Result<Option<Option<ValRaw>>>4440 fn take(&mut self) -> Result<Option<Option<ValRaw>>> {
4441 Ok(match mem::replace(self, SyncResult::Taken) {
4442 SyncResult::NotProduced => None,
4443 SyncResult::Produced(val) => Some(val),
4444 SyncResult::Taken => {
4445 bail_bug!("attempted to take a synchronous result that was already taken")
4446 }
4447 })
4448 }
4449 }
4450
/// State of the host future associated with a guest task (see
/// `GuestTask::host_future_state`).
#[derive(Debug)]
enum HostFutureState {
    /// There is no host future to wait for: the task was called by a guest, or
    /// by the host without a host future present.
    NotApplicable,
    /// A host future exists; `GuestTask::ready_to_delete` returns `false`
    /// while the task is in this state.
    Live,
    // NOTE(review): presumably set once the host future has been dropped —
    // the transition is not visible in this section.
    Dropped,
}
4457
/// Represents a pending guest task.
pub(crate) struct GuestTask {
    /// See `WaitableCommon`
    common: WaitableCommon,
    /// Closure to lower the parameters passed to this task.
    ///
    /// Reset to `None` once the parameters have been lowered (see
    /// `GuestTask::already_lowered_parameters`).
    lower_params: Option<RawLower>,
    /// See `LiftResult`
    ///
    /// Reset to `None` once the task has returned or been cancelled (see
    /// `GuestTask::returned_or_cancelled`).
    lift_result: Option<LiftResult>,
    /// A place to stash the type-erased lifted result if it can't be delivered
    /// immediately.
    result: Option<LiftedResult>,
    /// Closure to call the callback function for an async-lifted export, if
    /// provided.
    callback: Option<CallbackFn>,
    /// See `Caller`
    caller: Caller,
    /// Borrow state for this task.
    ///
    /// Keeps track of `borrow<T>` handles received by this task to ensure that
    /// everything is dropped by the time it exits.
    call_context: CallContext,
    /// A place to stash the lowered result for a sync-to-async call until it
    /// can be returned to the caller.
    sync_result: SyncResult,
    /// Whether or not the task has been cancelled (i.e. whether the task is
    /// permitted to call `task.cancel`).
    cancel_sent: bool,
    /// Whether or not we've sent a `Status::Starting` event to any current or
    /// future waiters for this waitable.
    starting_sent: bool,
    /// The runtime instance to which the exported function for this guest task
    /// belongs.
    ///
    /// Note that the task may do a sync->sync call via a fused adapter which
    /// results in that task executing code in a different instance, and it may
    /// call host functions and intrinsics from that other instance.
    instance: RuntimeInstance,
    /// If present, a pending `Event::None` or `Event::Cancelled` to be
    /// delivered to this task.
    event: Option<Event>,
    /// Whether or not the task has exited.
    exited: bool,
    /// Threads belonging to this task
    threads: HashSet<TableId<GuestThread>>,
    /// The state of the host future that represents an async task, which must
    /// be dropped before we can delete the task.
    host_future_state: HostFutureState,
    /// Indicates whether this task was created for a call to an async-lifted
    /// export.
    async_function: bool,
}
4509
4510 impl GuestTask {
already_lowered_parameters(&self) -> bool4511 fn already_lowered_parameters(&self) -> bool {
4512 // We reset `lower_params` after we lower the parameters
4513 self.lower_params.is_none()
4514 }
4515
returned_or_cancelled(&self) -> bool4516 fn returned_or_cancelled(&self) -> bool {
4517 // We reset `lift_result` after we return or exit
4518 self.lift_result.is_none()
4519 }
4520
ready_to_delete(&self) -> bool4521 fn ready_to_delete(&self) -> bool {
4522 let threads_completed = self.threads.is_empty();
4523 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
4524 let pending_completion_event = matches!(
4525 self.common.event,
4526 Some(Event::Subtask {
4527 status: Status::Returned | Status::ReturnCancelled
4528 })
4529 );
4530 let ready = threads_completed
4531 && !has_sync_result
4532 && !pending_completion_event
4533 && !matches!(self.host_future_state, HostFutureState::Live);
4534 log::trace!(
4535 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
4536 threads_completed,
4537 has_sync_result,
4538 pending_completion_event,
4539 self.host_future_state
4540 );
4541 ready
4542 }
4543
new( lower_params: RawLower, lift_result: LiftResult, caller: Caller, callback: Option<CallbackFn>, instance: RuntimeInstance, async_function: bool, ) -> Result<Self>4544 fn new(
4545 lower_params: RawLower,
4546 lift_result: LiftResult,
4547 caller: Caller,
4548 callback: Option<CallbackFn>,
4549 instance: RuntimeInstance,
4550 async_function: bool,
4551 ) -> Result<Self> {
4552 let host_future_state = match &caller {
4553 Caller::Guest { .. } => HostFutureState::NotApplicable,
4554 Caller::Host {
4555 host_future_present,
4556 ..
4557 } => {
4558 if *host_future_present {
4559 HostFutureState::Live
4560 } else {
4561 HostFutureState::NotApplicable
4562 }
4563 }
4564 };
4565 Ok(Self {
4566 common: WaitableCommon::default(),
4567 lower_params: Some(lower_params),
4568 lift_result: Some(lift_result),
4569 result: None,
4570 callback,
4571 caller,
4572 call_context: CallContext::default(),
4573 sync_result: SyncResult::NotProduced,
4574 cancel_sent: false,
4575 starting_sent: false,
4576 instance,
4577 event: None,
4578 exited: false,
4579 threads: HashSet::new(),
4580 host_future_state,
4581 async_function,
4582 })
4583 }
4584
4585 /// Dispose of this guest task.
dispose(self, _state: &mut ConcurrentState) -> Result<()>4586 fn dispose(self, _state: &mut ConcurrentState) -> Result<()> {
4587 assert!(self.threads.is_empty());
4588 Ok(())
4589 }
4590 }
4591
impl TableDebug for GuestTask {
    /// Returns the fixed name `"GuestTask"` identifying this entry type.
    fn type_name() -> &'static str {
        "GuestTask"
    }
}
4597
/// Represents state common to all kinds of waitables.
///
/// Host tasks, guest tasks and transmit handles each embed one of these in a
/// `common` field, which `Waitable::common` exposes uniformly.
#[derive(Default)]
struct WaitableCommon {
    /// The currently pending event for this waitable, if any.
    event: Option<Event>,
    /// The set to which this waitable belongs, if any.
    ///
    /// Updated by `Waitable::join`.
    set: Option<TableId<WaitableSet>>,
    /// The handle with which the guest refers to this waitable, if any.
    handle: Option<u32>,
}
4608
/// Represents a Component Model Async `waitable`.
///
/// The derived ordering (variant order first, then id) determines iteration
/// order wherever waitables are kept in ordered collections such as
/// `WaitableSet::ready`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task
    Host(TableId<HostTask>),
    /// A guest task
    Guest(TableId<GuestTask>),
    /// The read or write end of a stream or future
    Transmit(TableId<TransmitHandle>),
}
4619
4620 impl Waitable {
4621 /// Retrieve the `Waitable` corresponding to the specified guest-visible
4622 /// handle.
from_instance( state: Pin<&mut ComponentInstance>, caller_instance: RuntimeComponentInstanceIndex, waitable: u32, ) -> Result<Self>4623 fn from_instance(
4624 state: Pin<&mut ComponentInstance>,
4625 caller_instance: RuntimeComponentInstanceIndex,
4626 waitable: u32,
4627 ) -> Result<Self> {
4628 use crate::runtime::vm::component::Waitable;
4629
4630 let (waitable, kind) = state.instance_states().0[caller_instance]
4631 .handle_table()
4632 .waitable_rep(waitable)?;
4633
4634 Ok(match kind {
4635 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
4636 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
4637 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
4638 })
4639 }
4640
4641 /// Retrieve the host-visible identifier for this `Waitable`.
rep(&self) -> u324642 fn rep(&self) -> u32 {
4643 match self {
4644 Self::Host(id) => id.rep(),
4645 Self::Guest(id) => id.rep(),
4646 Self::Transmit(id) => id.rep(),
4647 }
4648 }
4649
4650 /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
4651 /// remove it from any set it may currently belong to (when `set` is
4652 /// `None`).
join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()>4653 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
4654 log::trace!("waitable {self:?} join set {set:?}",);
4655
4656 let old = mem::replace(&mut self.common(state)?.set, set);
4657
4658 if let Some(old) = old {
4659 match *self {
4660 Waitable::Host(id) => state.remove_child(id, old),
4661 Waitable::Guest(id) => state.remove_child(id, old),
4662 Waitable::Transmit(id) => state.remove_child(id, old),
4663 }?;
4664
4665 state.get_mut(old)?.ready.remove(self);
4666 }
4667
4668 if let Some(set) = set {
4669 match *self {
4670 Waitable::Host(id) => state.add_child(id, set),
4671 Waitable::Guest(id) => state.add_child(id, set),
4672 Waitable::Transmit(id) => state.add_child(id, set),
4673 }?;
4674
4675 if self.common(state)?.event.is_some() {
4676 self.mark_ready(state)?;
4677 }
4678 }
4679
4680 Ok(())
4681 }
4682
4683 /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon>4684 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
4685 Ok(match self {
4686 Self::Host(id) => &mut state.get_mut(*id)?.common,
4687 Self::Guest(id) => &mut state.get_mut(*id)?.common,
4688 Self::Transmit(id) => &mut state.get_mut(*id)?.common,
4689 })
4690 }
4691
4692 /// Set or clear the pending event for this waitable and either deliver it
4693 /// to the first waiter, if any, or mark it as ready to be delivered to the
4694 /// next waiter that arrives.
set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()>4695 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
4696 log::trace!("set event for {self:?}: {event:?}");
4697 self.common(state)?.event = event;
4698 self.mark_ready(state)
4699 }
4700
4701 /// Take the pending event from this waitable, leaving `None` in its place.
take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>>4702 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
4703 let common = self.common(state)?;
4704 let event = common.event.take();
4705 if let Some(set) = self.common(state)?.set {
4706 state.get_mut(set)?.ready.remove(self);
4707 }
4708
4709 Ok(event)
4710 }
4711
4712 /// Deliver the current event for this waitable to the first waiter, if any,
4713 /// or else mark it as ready to be delivered to the next waiter that
4714 /// arrives.
mark_ready(&self, state: &mut ConcurrentState) -> Result<()>4715 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
4716 if let Some(set) = self.common(state)?.set {
4717 state.get_mut(set)?.ready.insert(*self);
4718 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
4719 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
4720 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));
4721
4722 let item = match mode {
4723 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
4724 WaitMode::Callback(instance) => WorkItem::GuestCall(
4725 state.get_mut(thread.task)?.instance.index,
4726 GuestCall {
4727 thread,
4728 kind: GuestCallKind::DeliverEvent {
4729 instance,
4730 set: Some(set),
4731 },
4732 },
4733 ),
4734 };
4735 state.push_high_priority(item);
4736 }
4737 }
4738 Ok(())
4739 }
4740
4741 /// Remove this waitable from the instance's rep table.
delete_from(&self, state: &mut ConcurrentState) -> Result<()>4742 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
4743 match self {
4744 Self::Host(task) => {
4745 log::trace!("delete host task {task:?}");
4746 state.delete(*task)?;
4747 }
4748 Self::Guest(task) => {
4749 log::trace!("delete guest task {task:?}");
4750 state.delete(*task)?.dispose(state)?;
4751 }
4752 Self::Transmit(task) => {
4753 state.delete(*task)?;
4754 }
4755 }
4756
4757 Ok(())
4758 }
4759 }
4760
4761 impl fmt::Debug for Waitable {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result4762 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4763 match self {
4764 Self::Host(id) => write!(f, "{id:?}"),
4765 Self::Guest(id) => write!(f, "{id:?}"),
4766 Self::Transmit(id) => write!(f, "{id:?}"),
4767 }
4768 }
4769 }
4770
/// Represents a Component Model Async `waitable-set`.
#[derive(Default)]
struct WaitableSet {
    /// Which waitables in this set have pending events, if any.
    ready: BTreeSet<Waitable>,
    /// Which guest threads are currently waiting on this set, if any.
    ///
    /// `Waitable::mark_ready` wakes the first entry in key order.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}
4779
impl TableDebug for WaitableSet {
    /// Returns the fixed name `"WaitableSet"` identifying this entry type.
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}
4785
/// Type-erased closure to lower the parameters for a guest task.
///
/// The closure is given the store and a slice of possibly-uninitialized
/// `ValRaw` slots.
// NOTE(review): presumably the closure initializes the slots with the lowered
// parameters — confirm at the call site.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;
4789
/// Type-erased closure to lift the result for a guest task.
///
/// The closure is given the store and the raw values, and yields a boxed,
/// type-erased result (the same shape as `LiftedResult`).
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;
4794
/// Type-erased result of a guest task which may be downcast to the expected
/// type by a host caller (or simply ignored in the case of a guest caller; see
/// `DummyResult`).
///
/// This is the payload type delivered through `Caller::Host::tx` and stashed
/// in `GuestTask::result`.
type LiftedResult = Box<dyn Any + Send + Sync>;
4799
/// Used to return a result from a `LiftFn` when the actual result has already
/// been lowered to a guest task's stack and linear memory.
// NOTE(review): no `LiftFn` is defined in this section of the file; this may
// be a stale name for `RawLift` — confirm.
struct DummyResult;
4803
/// Represents the Component Model Async state of a (sub-)component instance.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// Whether backpressure is set for this instance (enabled if >0)
    backpressure: u16,
    /// Whether this instance can be entered
    do_not_enter: bool,
    /// Pending calls for this instance which require backpressure to be
    /// disabled (i.e. `Self::backpressure` to be zero) and/or
    /// `Self::do_not_enter` to be false before they can proceed.
    // NOTE(review): this comment previously said backpressure must be `true`,
    // which does not fit a counter where >0 means "enabled" — confirm the
    // intended condition against the code that drains `pending`.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}
4815
impl ConcurrentInstanceState {
    /// Returns `true` if no calls are currently queued in `pending` for this
    /// instance.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}
4821
/// Identifies the thread recorded as currently running in a
/// `ConcurrentState`, if any.
#[derive(Debug, Copy, Clone)]
pub(crate) enum CurrentThread {
    /// A guest thread, qualified by its owning task.
    Guest(QualifiedThreadId),
    /// A host task.
    Host(TableId<HostTask>),
    /// No thread is set; this is the value a default `ConcurrentState` starts
    /// with.
    None,
}
4828
4829 impl CurrentThread {
guest(&self) -> Option<&QualifiedThreadId>4830 fn guest(&self) -> Option<&QualifiedThreadId> {
4831 match self {
4832 Self::Guest(id) => Some(id),
4833 _ => None,
4834 }
4835 }
4836
host(&self) -> Option<TableId<HostTask>>4837 fn host(&self) -> Option<TableId<HostTask>> {
4838 match self {
4839 Self::Host(id) => Some(*id),
4840 _ => None,
4841 }
4842 }
4843
is_none(&self) -> bool4844 fn is_none(&self) -> bool {
4845 matches!(self, Self::None)
4846 }
4847 }
4848
4849 impl From<QualifiedThreadId> for CurrentThread {
from(id: QualifiedThreadId) -> Self4850 fn from(id: QualifiedThreadId) -> Self {
4851 Self::Guest(id)
4852 }
4853 }
4854
4855 impl From<TableId<HostTask>> for CurrentThread {
from(id: TableId<HostTask>) -> Self4856 fn from(id: TableId<HostTask>) -> Self {
4857 Self::Host(id)
4858 }
4859 }
4860
/// Represents the Component Model Async state of a store.
pub struct ConcurrentState {
    /// The currently running thread, if any.
    current_thread: CurrentThread,

    /// The set of pending host and background tasks, if any.
    ///
    /// See `ComponentInstance::poll_until` for where we temporarily take this
    /// out, poll it, then put it back to avoid any mutable aliasing hazards.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// The table of waitables, waitable sets, etc.
    table: AlwaysMut<ResourceTable>,
    /// The "high priority" work queue for this store's event loop.
    high_priority: Vec<WorkItem>,
    /// The "low priority" work queue for this store's event loop.
    ///
    /// Items are pushed at the front and popped from the back, so they run in
    /// the order they were queued.
    low_priority: VecDeque<WorkItem>,
    /// A place to stash the reason a fiber is suspending so that the code which
    /// resumed it will know under what conditions the fiber should be resumed
    /// again.
    suspend_reason: Option<SuspendReason>,
    /// A cached fiber which is waiting for work to do.
    ///
    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
    worker: Option<StoreFiber<'static>>,
    /// A place to stash the work item for which we're resuming a worker fiber.
    worker_item: Option<WorkerItem>,

    /// Reference counts for all component error contexts
    ///
    /// NOTE: it is possible for the global ref count to be *greater* than the
    /// sum of (sub)component ref counts as tracked by `error_context_tables`,
    /// for example when the host holds one or more references to error
    /// contexts.
    ///
    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
    /// component-wide representation) of the index into concurrent state for a given
    /// stored `ErrorContext`.
    ///
    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
    /// as a `TableId<ErrorContextState>`.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}
4903
4904 impl Default for ConcurrentState {
default() -> Self4905 fn default() -> Self {
4906 Self {
4907 current_thread: CurrentThread::None,
4908 table: AlwaysMut::new(ResourceTable::new()),
4909 futures: AlwaysMut::new(Some(FuturesUnordered::new())),
4910 high_priority: Vec::new(),
4911 low_priority: VecDeque::new(),
4912 suspend_reason: None,
4913 worker: None,
4914 worker_item: None,
4915 global_error_context_ref_counts: BTreeMap::new(),
4916 }
4917 }
4918 }
4919
4920 impl ConcurrentState {
4921 /// Take ownership of any fibers and futures owned by this object.
4922 ///
4923 /// This should be used when disposing of the `Store` containing this object
4924 /// in order to gracefully resolve any and all fibers using
4925 /// `StoreFiber::dispose`. This is necessary to avoid possible
4926 /// use-after-free bugs due to fibers which may still have access to the
4927 /// `Store`.
4928 ///
4929 /// Additionally, the futures collected with this function should be dropped
4930 /// within a `tls::set` call, which will ensure than any futures closing
4931 /// over an `&Accessor` will have access to the store when dropped, allowing
4932 /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
4933 /// panicking.
4934 ///
4935 /// Note that this will leave the object in an inconsistent and unusable
4936 /// state, so it should only be used just prior to dropping it.
take_fibers_and_futures( &mut self, fibers: &mut Vec<StoreFiber<'static>>, futures: &mut Vec<FuturesUnordered<HostTaskFuture>>, )4937 pub(crate) fn take_fibers_and_futures(
4938 &mut self,
4939 fibers: &mut Vec<StoreFiber<'static>>,
4940 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
4941 ) {
4942 for entry in self.table.get_mut().iter_mut() {
4943 if let Some(set) = entry.downcast_mut::<WaitableSet>() {
4944 for mode in mem::take(&mut set.waiting).into_values() {
4945 if let WaitMode::Fiber(fiber) = mode {
4946 fibers.push(fiber);
4947 }
4948 }
4949 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
4950 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) =
4951 mem::replace(&mut thread.state, GuestThreadState::Completed)
4952 {
4953 fibers.push(fiber);
4954 }
4955 }
4956 }
4957
4958 if let Some(fiber) = self.worker.take() {
4959 fibers.push(fiber);
4960 }
4961
4962 let mut handle_item = |item| match item {
4963 WorkItem::ResumeFiber(fiber) => {
4964 fibers.push(fiber);
4965 }
4966 WorkItem::PushFuture(future) => {
4967 self.futures
4968 .get_mut()
4969 .as_mut()
4970 .unwrap()
4971 .push(future.into_inner());
4972 }
4973 WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
4974 }
4975 };
4976
4977 for item in mem::take(&mut self.high_priority) {
4978 handle_item(item);
4979 }
4980 for item in mem::take(&mut self.low_priority) {
4981 handle_item(item);
4982 }
4983
4984 if let Some(them) = self.futures.get_mut().take() {
4985 futures.push(them);
4986 }
4987 }
4988
4989 /// Collect the next set of work items to run. This will be either all
4990 /// high-priority items, or a single low-priority item if there are no
4991 /// high-priority items.
collect_work_items_to_run(&mut self) -> Vec<WorkItem>4992 fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> {
4993 let mut ready = mem::take(&mut self.high_priority);
4994 if ready.is_empty() {
4995 if let Some(item) = self.low_priority.pop_back() {
4996 ready.push(item);
4997 }
4998 }
4999 ready
5000 }
5001
push<V: Send + Sync + 'static>( &mut self, value: V, ) -> Result<TableId<V>, ResourceTableError>5002 fn push<V: Send + Sync + 'static>(
5003 &mut self,
5004 value: V,
5005 ) -> Result<TableId<V>, ResourceTableError> {
5006 self.table.get_mut().push(value).map(TableId::from)
5007 }
5008
get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError>5009 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
5010 self.table.get_mut().get_mut(&Resource::from(id))
5011 }
5012
add_child<T: 'static, U: 'static>( &mut self, child: TableId<T>, parent: TableId<U>, ) -> Result<(), ResourceTableError>5013 pub fn add_child<T: 'static, U: 'static>(
5014 &mut self,
5015 child: TableId<T>,
5016 parent: TableId<U>,
5017 ) -> Result<(), ResourceTableError> {
5018 self.table
5019 .get_mut()
5020 .add_child(Resource::from(child), Resource::from(parent))
5021 }
5022
remove_child<T: 'static, U: 'static>( &mut self, child: TableId<T>, parent: TableId<U>, ) -> Result<(), ResourceTableError>5023 pub fn remove_child<T: 'static, U: 'static>(
5024 &mut self,
5025 child: TableId<T>,
5026 parent: TableId<U>,
5027 ) -> Result<(), ResourceTableError> {
5028 self.table
5029 .get_mut()
5030 .remove_child(Resource::from(child), Resource::from(parent))
5031 }
5032
delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError>5033 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
5034 self.table.get_mut().delete(Resource::from(id))
5035 }
5036
push_future(&mut self, future: HostTaskFuture)5037 fn push_future(&mut self, future: HostTaskFuture) {
5038 // Note that we can't directly push to `ConcurrentState::futures` here
5039 // since this may be called from a future that's being polled inside
5040 // `Self::poll_until`, which temporarily removes the `FuturesUnordered`
5041 // so it has exclusive access while polling it. Therefore, we push a
5042 // work item to the "high priority" queue, which will actually push to
5043 // `ConcurrentState::futures` later.
5044 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
5045 }
5046
push_high_priority(&mut self, item: WorkItem)5047 fn push_high_priority(&mut self, item: WorkItem) {
5048 log::trace!("push high priority: {item:?}");
5049 self.high_priority.push(item);
5050 }
5051
push_low_priority(&mut self, item: WorkItem)5052 fn push_low_priority(&mut self, item: WorkItem) {
5053 log::trace!("push low priority: {item:?}");
5054 self.low_priority.push_front(item);
5055 }
5056
push_work_item(&mut self, item: WorkItem, high_priority: bool)5057 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
5058 if high_priority {
5059 self.push_high_priority(item);
5060 } else {
5061 self.push_low_priority(item);
5062 }
5063 }
5064
promote_instance_local_thread_work_item( &mut self, current_instance: RuntimeComponentInstanceIndex, ) -> bool5065 fn promote_instance_local_thread_work_item(
5066 &mut self,
5067 current_instance: RuntimeComponentInstanceIndex,
5068 ) -> bool {
5069 self.promote_work_items_matching(|item: &WorkItem| match item {
5070 WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => {
5071 *instance == current_instance
5072 }
5073 _ => false,
5074 })
5075 }
5076
promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool5077 fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool {
5078 self.promote_work_items_matching(|item: &WorkItem| match item {
5079 WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => {
5080 *t == thread
5081 }
5082 _ => false,
5083 })
5084 }
5085
promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool where F: FnMut(&WorkItem) -> bool,5086 fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool
5087 where
5088 F: FnMut(&WorkItem) -> bool,
5089 {
5090 // If there's a high-priority work item to resume the current guest thread,
5091 // we don't need to promote anything, but we return true to indicate that
5092 // work is pending for the current instance.
5093 if self.high_priority.iter().any(&mut predicate) {
5094 true
5095 }
5096 // Otherwise, look for a low-priority work item that matches the current
5097 // instance and promote it to high-priority.
5098 else if let Some(idx) = self.low_priority.iter().position(&mut predicate) {
5099 let item = self.low_priority.remove(idx).unwrap();
5100 self.push_high_priority(item);
5101 true
5102 } else {
5103 false
5104 }
5105 }
5106
5107 /// Implements the `context.get` intrinsic.
context_get(&mut self, slot: u32) -> Result<u32>5108 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
5109 let thread = self.current_guest_thread()?;
5110 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot)?];
5111 log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
5112 Ok(val)
5113 }
5114
5115 /// Implements the `context.set` intrinsic.
context_set(&mut self, slot: u32, val: u32) -> Result<()>5116 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
5117 let thread = self.current_guest_thread()?;
5118 log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
5119 self.get_mut(thread.thread)?.context[usize::try_from(slot)?] = val;
5120 Ok(())
5121 }
5122
5123 /// Returns whether there's a pending cancellation on the current guest thread,
5124 /// consuming the event if so.
take_pending_cancellation(&mut self) -> Result<bool>5125 fn take_pending_cancellation(&mut self) -> Result<bool> {
5126 let thread = self.current_guest_thread()?;
5127 if let Some(event) = self.get_mut(thread.task)?.event.take() {
5128 assert!(matches!(event, Event::Cancelled));
5129 Ok(true)
5130 } else {
5131 Ok(false)
5132 }
5133 }
5134
check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()>5135 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
5136 if self.may_block(task)? {
5137 Ok(())
5138 } else {
5139 Err(Trap::CannotBlockSyncTask.into())
5140 }
5141 }
5142
may_block(&mut self, task: TableId<GuestTask>) -> Result<bool>5143 fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> {
5144 let task = self.get_mut(task)?;
5145 Ok(task.async_function || task.returned_or_cancelled())
5146 }
5147
5148 /// Used by `ResourceTables` to acquire the current `CallContext` for the
5149 /// specified task.
5150 ///
5151 /// The `task` is bit-packed as returned by `current_call_context_scope_id`
5152 /// below.
call_context(&mut self, task: u32) -> Result<&mut CallContext>5153 pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
5154 let (task, is_host) = (task >> 1, task & 1 == 1);
5155 if is_host {
5156 let task: TableId<HostTask> = TableId::new(task);
5157 Ok(&mut self.get_mut(task)?.call_context)
5158 } else {
5159 let task: TableId<GuestTask> = TableId::new(task);
5160 Ok(&mut self.get_mut(task)?.call_context)
5161 }
5162 }
5163
5164 /// Used by `ResourceTables` to record the scope of a borrow to get undone
5165 /// in the future.
current_call_context_scope_id(&self) -> Result<u32>5166 pub fn current_call_context_scope_id(&self) -> Result<u32> {
5167 let (bits, is_host) = match self.current_thread {
5168 CurrentThread::Guest(id) => (id.task.rep(), false),
5169 CurrentThread::Host(id) => (id.rep(), true),
5170 CurrentThread::None => bail_bug!("current thread is not set"),
5171 };
5172 assert_eq!((bits << 1) >> 1, bits);
5173 Ok((bits << 1) | u32::from(is_host))
5174 }
5175
current_guest_thread(&self) -> Result<QualifiedThreadId>5176 fn current_guest_thread(&self) -> Result<QualifiedThreadId> {
5177 match self.current_thread.guest() {
5178 Some(id) => Ok(*id),
5179 None => bail_bug!("current thread is not a guest thread"),
5180 }
5181 }
5182
current_host_thread(&self) -> Result<TableId<HostTask>>5183 fn current_host_thread(&self) -> Result<TableId<HostTask>> {
5184 match self.current_thread.host() {
5185 Some(id) => Ok(id),
5186 None => bail_bug!("current thread is not a host thread"),
5187 }
5188 }
5189
futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>>5190 fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> {
5191 match self.futures.get_mut().as_mut() {
5192 Some(f) => Ok(f),
5193 None => bail_bug!("futures field of concurrent state is currently taken"),
5194 }
5195 }
5196
table(&mut self) -> &mut ResourceTable5197 pub(crate) fn table(&mut self) -> &mut ResourceTable {
5198 self.table.get_mut()
5199 }
5200 }
5201
5202 /// Provide a type hint to compiler about the shape of a parameter lower
5203 /// closure.
for_any_lower< F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync, >( fun: F, ) -> F5204 fn for_any_lower<
5205 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
5206 >(
5207 fun: F,
5208 ) -> F {
5209 fun
5210 }
5211
5212 /// Provide a type hint to compiler about the shape of a result lift closure.
for_any_lift< F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync, >( fun: F, ) -> F5213 fn for_any_lift<
5214 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
5215 >(
5216 fun: F,
5217 ) -> F {
5218 fun
5219 }
5220
5221 /// Wrap the specified future in a `poll_fn` which asserts that the future is
5222 /// only polled from the event loop of the specified `Store`.
5223 ///
5224 /// See `StoreContextMut::run_concurrent` for details.
checked<F: Future + Send + 'static>( id: StoreId, fut: F, ) -> impl Future<Output = F::Output> + Send + 'static5225 fn checked<F: Future + Send + 'static>(
5226 id: StoreId,
5227 fut: F,
5228 ) -> impl Future<Output = F::Output> + Send + 'static {
5229 async move {
5230 let mut fut = pin!(fut);
5231 future::poll_fn(move |cx| {
5232 let message = "\
5233 `Future`s which depend on asynchronous component tasks, streams, or \
5234 futures to complete may only be polled from the event loop of the \
5235 store to which they belong. Please use \
5236 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
5237 ";
5238 tls::try_get(|store| {
5239 let matched = match store {
5240 tls::TryGet::Some(store) => store.id() == id,
5241 tls::TryGet::Taken | tls::TryGet::None => false,
5242 };
5243
5244 if !matched {
5245 panic!("{message}")
5246 }
5247 });
5248 fut.as_mut().poll(cx)
5249 })
5250 .await
5251 }
5252 }
5253
5254 /// Assert that `StoreContextMut::run_concurrent` has not been called from
5255 /// within an store's event loop.
check_recursive_run()5256 fn check_recursive_run() {
5257 tls::try_get(|store| {
5258 if !matches!(store, tls::TryGet::None) {
5259 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
5260 }
5261 });
5262 }
5263
/// Splits a packed callback code into its two components.
///
/// Returns `(low, high)` where `low` is the lowest four bits of `code` and
/// `high` is everything above them, shifted down.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_BITS: u32 = 4;
    const LOW_MASK: u32 = (1 << LOW_BITS) - 1;

    let low = code & LOW_MASK;
    let high = code >> LOW_BITS;
    (low, high)
}
5267
/// Helper struct for packaging parameters to be passed to
/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
/// `waitable-set.poll`.
struct WaitableCheckParams {
    /// The waitable set the call applies to.
    set: TableId<WaitableSet>,
    /// Index of the options associated with the call.
    options: OptionsIndex,
    /// NOTE(review): presumably the guest-provided location for the event
    /// payload -- confirm against `waitable_check`.
    payload: u32,
}
5276
/// Indicates whether `ComponentInstance::waitable_check` is being called for
/// `waitable-set.wait` or `waitable-set.poll`.
enum WaitableCheck {
    /// The check is being performed for `waitable-set.wait`.
    Wait,
    /// The check is being performed for `waitable-set.poll`.
    Poll,
}
5283
/// Represents a guest task called from the host, prepared using `prepare_call`.
///
/// The type parameter `R` is the result type the call is expected to
/// produce; it is only tracked via `PhantomData` here.
pub(crate) struct PreparedCall<R> {
    /// The guest export to be called
    handle: Func,
    /// The guest thread created by `prepare_call`
    thread: QualifiedThreadId,
    /// The number of lowered core Wasm parameters to pass to the call.
    param_count: usize,
    /// The `oneshot::Receiver` to which the result of the call will be
    /// delivered when it is available.
    rx: oneshot::Receiver<LiftedResult>,
    /// Ties the otherwise-unused result type `R` to this value.
    _phantom: PhantomData<R>,
}
5297
5298 impl<R> PreparedCall<R> {
5299 /// Get a copy of the `TaskId` for this `PreparedCall`.
task_id(&self) -> TaskId5300 pub(crate) fn task_id(&self) -> TaskId {
5301 TaskId {
5302 task: self.thread.task,
5303 }
5304 }
5305 }
5306
/// Represents a task created by `prepare_call`.
pub(crate) struct TaskId {
    /// Table id of the guest task this value refers to.
    task: TableId<GuestTask>,
}
5311
5312 impl TaskId {
5313 /// The host future for an async task was dropped. If the parameters have not been lowered yet,
5314 /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case,
5315 /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended
5316 /// and can be resumed by other tasks for this component, so we mark the future as dropped
5317 /// and delete the task when all threads are done.
host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()>5318 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
5319 let task = store.0.concurrent_state_mut().get_mut(self.task)?;
5320 if !task.already_lowered_parameters() {
5321 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5322 } else {
5323 task.host_future_state = HostFutureState::Dropped;
5324 if task.ready_to_delete() {
5325 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
5326 }
5327 }
5328 Ok(())
5329 }
5330 }
5331
/// Prepare a call to the specified exported Wasm function, providing functions
/// for lowering the parameters and lifting the result.
///
/// This creates a new `GuestTask` together with an implicit `GuestThread`,
/// registers both in the store's concurrent state, and returns a
/// `PreparedCall` carrying the receiving end of the channel on which the
/// lifted result will be delivered.
///
/// `param_count` is stored unchanged in the returned `PreparedCall`, and
/// `host_future_present` is recorded in the task's `Caller::Host` entry.
///
/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
/// loop, use `queue_call`.
///
/// # Errors
///
/// Fails with `Trap::CannotEnterComponent` if `may_enter` reports that the
/// target instance may not be entered, and propagates any error from
/// creating or registering the task and thread.
pub(crate) fn prepare_call<T, R>(
    mut store: StoreContextMut<T>,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>>
    + Send
    + Sync
    + 'static,
) -> Result<PreparedCall<R>> {
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    // Copy everything needed out of the instance's metadata up front, before
    // the store is borrowed again below.
    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // The sender goes into the task; the receiver is handed back to the
    // caller inside the `PreparedCall`.
    let (tx, rx) = oneshot::channel();

    let instance = RuntimeInstance {
        instance: handle.instance().id().instance(),
        index: component_instance,
    };
    // Whatever thread is current at preparation time is recorded as the
    // caller of the new task.
    let caller = state.current_thread;
    let task = GuestTask::new(
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            host_future_present,
            caller,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: Per the contract of `prepare_call`, the callback
                // will remain valid at least as long as this task exists.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        instance,
        async_function,
    )?;

    // Register the task, create its implicit thread, and link the two.
    let task = state.push(task)?;
    let new_thread = GuestThread::new_implicit(state, task)?;
    let thread = state.push(new_thread)?;
    state.get_mut(task)?.threads.insert(thread);

    // NOTE(review): this check runs after the task and thread have been
    // pushed, and they are not removed here when it fails -- confirm that
    // this is intended or handled elsewhere.
    if !store.0.may_enter(instance)? {
        bail!(Trap::CannotEnterComponent);
    }

    Ok(PreparedCall {
        handle,
        thread: QualifiedThreadId { task, thread },
        param_count,
        rx,
        _phantom: PhantomData,
    })
}
5423
5424 /// Queue a call previously prepared using `prepare_call` to be run as part of
5425 /// the associated `ComponentInstance`'s event loop.
5426 ///
5427 /// The returned future will resolve to the result once it is available, but
5428 /// must only be polled via the instance's event loop. See
5429 /// `StoreContextMut::run_concurrent` for details.
queue_call<T: 'static, R: Send + 'static>( mut store: StoreContextMut<T>, prepared: PreparedCall<R>, ) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>>5430 pub(crate) fn queue_call<T: 'static, R: Send + 'static>(
5431 mut store: StoreContextMut<T>,
5432 prepared: PreparedCall<R>,
5433 ) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> {
5434 let PreparedCall {
5435 handle,
5436 thread,
5437 param_count,
5438 rx,
5439 ..
5440 } = prepared;
5441
5442 queue_call0(store.as_context_mut(), handle, thread, param_count)?;
5443
5444 Ok(checked(
5445 store.0.id(),
5446 rx.map(move |result| match result {
5447 Ok(r) => match r.downcast() {
5448 Ok(r) => Ok(*r),
5449 Err(_) => bail_bug!("wrong type of value produced"),
5450 },
5451 Err(e) => Err(e.into()),
5452 }),
5453 ))
5454 }
5455
5456 /// Queue a call previously prepared using `prepare_call` to be run as part of
5457 /// the associated `ComponentInstance`'s event loop.
queue_call0<T: 'static>( store: StoreContextMut<T>, handle: Func, guest_thread: QualifiedThreadId, param_count: usize, ) -> Result<()>5458 fn queue_call0<T: 'static>(
5459 store: StoreContextMut<T>,
5460 handle: Func,
5461 guest_thread: QualifiedThreadId,
5462 param_count: usize,
5463 ) -> Result<()> {
5464 let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
5465 let is_concurrent = raw_options.async_;
5466 let callback = raw_options.callback;
5467 let instance = handle.instance();
5468 let callee = handle.lifted_core_func(store.0);
5469 let post_return = handle.post_return_core_func(store.0);
5470 let callback = callback.map(|i| {
5471 let instance = instance.id().get(store.0);
5472 SendSyncPtr::new(instance.runtime_callback(i))
5473 });
5474
5475 log::trace!("queueing call {guest_thread:?}");
5476
5477 // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
5478 // (with signatures appropriate for this call) and will remain valid as
5479 // long as this instance is valid.
5480 unsafe {
5481 instance.queue_call(
5482 store,
5483 guest_thread,
5484 SendSyncPtr::new(callee),
5485 param_count,
5486 1,
5487 is_concurrent,
5488 callback,
5489 post_return.map(SendSyncPtr::new),
5490 )
5491 }
5492 }
5493