//! Runtime support for the Component Model Async ABI. //! //! This module and its submodules provide host runtime support for Component //! Model Async features such as async-lifted exports, async-lowered imports, //! streams, futures, and related intrinsics. See [the Async //! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md) //! for a high-level overview. //! //! At the core of this support is an event loop which schedules and switches //! between guest tasks and any host tasks they create. Each //! `Store` will have at most one event loop running at any given //! time, and that loop may be suspended and resumed by the host embedder using //! e.g. `StoreContextMut::run_concurrent`. The `StoreContextMut::poll_until` //! function contains the loop itself, while the //! `StoreOpaque::concurrent_state` field holds its state. //! //! # Public API Overview //! //! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop) //! //! - `[Typed]Func::call_concurrent`: Start a host->guest call to an //! async-lifted or sync-lifted import, creating a guest task. //! //! - `StoreContextMut::run_concurrent`: Run the event loop for the specified //! instance, allowing any and all tasks belonging to that instance to make //! progress. //! //! - `StoreContextMut::spawn`: Run a background task as part of the event loop //! for the specified instance. //! //! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or //! `stream` which may be passed to the guest. This takes a //! `{Future,Stream}Producer` implementation which will be polled for items when //! the consumer requests them. //! //! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by //! connecting it to a `{Future,Stream}Consumer` which will consume any items //! produced by the write end. //! //! ## Host Task API (e.g. implementing concurrent host functions and background tasks) //! //! 
//! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host
//! function with the linker. That function will take an `Accessor` as its
//! first parameter, which provides access to the store between (but not across)
//! await points.
//!
//! - `Accessor::with`: Access the store and its associated data.
//!
//! - `Accessor::spawn`: Run a background task as part of the event loop for the
//! store. This is equivalent to `StoreContextMut::spawn` but more convenient to use
//! in host functions.

use crate::bail_bug;
use crate::component::func::{self, Func, call_post_return};
use crate::component::{
    HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
};
use crate::fiber::{self, StoreFiber, StoreFiberYield};
use crate::prelude::*;
use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
use crate::vm::component::{CallContext, ComponentInstance, InstanceState};
use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
use crate::{
    AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType,
    bail,
};
use error_contexts::GlobalErrorContextRefCount;
use futures::channel::oneshot;
use futures::future::{self, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt};
use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex};
use std::any::Any;
use std::borrow::ToOwned;
use std::boxed::Box;
use std::cell::UnsafeCell;
use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::mem::{self, ManuallyDrop, MaybeUninit};
use std::ops::DerefMut;
use std::pin::{Pin, pin};
use std::ptr::{self, NonNull};
use std::task::{Context, Poll, Waker};
use std::vec::Vec;
use table::{TableDebug, TableId};
use wasmtime_environ::Trap;
use wasmtime_environ::component::{
    CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS,
    MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT,
    RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding,
    TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex,
    TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex,
};
use wasmtime_environ::packed_option::ReservedValue;

pub use abort::JoinHandle;
pub use future_stream_any::{FutureAny, StreamAny};
pub use futures_and_streams::{
    Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer,
    FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer,
    StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer,
};
pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index};

mod abort;
mod error_contexts;
mod future_stream_any;
mod futures_and_streams;
pub(crate) mod table;
pub(crate) mod tls;

/// Constant defined in the Component Model spec to indicate that the async
/// intrinsic (e.g. `future.write`) has not yet completed.
const BLOCKED: u32 = 0xffff_ffff;

/// Corresponds to `CallState` in the upstream spec.
///
/// The explicit discriminants are the integers produced when a status is
/// lowered with `as u32` (see [`Status::pack`]).
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum Status {
    Starting = 0,
    Started = 1,
    Returned = 2,
    StartCancelled = 3,
    ReturnCancelled = 4,
}

impl Status {
    /// Packs this status and the optional `waitable` provided into a 32-bit
    /// result that the canonical ABI requires.
    ///
    /// The low 4 bits are reserved for the status while the upper 28 bits are
    /// the waitable, if present.
    ///
    /// # Panics
    ///
    /// Panics if `waitable` is `Some` while `self` is `Status::Returned`, or
    /// `None` for any other status, and also if the waitable does not fit in
    /// 28 bits.
    pub fn pack(self, waitable: Option) -> u32 {
        // A waitable must be absent exactly when the status is `Returned`.
        assert!(matches!(self, Status::Returned) == waitable.is_none());
        let waitable = waitable.unwrap_or(0);
        // The handle is shifted left by 4 below, so it must fit in 28 bits.
        assert!(waitable < (1 << 28));
        (waitable << 4) | (self as u32)
    }
}

/// Corresponds to `EventCode` in the Component Model spec, plus related payload
/// data.
#[derive(Clone, Copy, Debug)] enum Event { None, Subtask { status: Status, }, StreamRead { code: ReturnCode, pending: Option<(TypeStreamTableIndex, u32)>, }, StreamWrite { code: ReturnCode, pending: Option<(TypeStreamTableIndex, u32)>, }, FutureRead { code: ReturnCode, pending: Option<(TypeFutureTableIndex, u32)>, }, FutureWrite { code: ReturnCode, pending: Option<(TypeFutureTableIndex, u32)>, }, Cancelled, } impl Event { /// Lower this event to core Wasm integers for delivery to the guest. /// /// Note that the waitable handle, if any, is assumed to be lowered /// separately. fn parts(self) -> (u32, u32) { const EVENT_NONE: u32 = 0; const EVENT_SUBTASK: u32 = 1; const EVENT_STREAM_READ: u32 = 2; const EVENT_STREAM_WRITE: u32 = 3; const EVENT_FUTURE_READ: u32 = 4; const EVENT_FUTURE_WRITE: u32 = 5; const EVENT_CANCELLED: u32 = 6; match self { Event::None => (EVENT_NONE, 0), Event::Cancelled => (EVENT_CANCELLED, 0), Event::Subtask { status } => (EVENT_SUBTASK, status as u32), Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()), Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()), Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()), Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()), } } } /// Corresponds to `CallbackCode` in the spec. mod callback_code { pub const EXIT: u32 = 0; pub const YIELD: u32 = 1; pub const WAIT: u32 = 2; } /// A flag indicating that the callee is an async-lowered export. /// /// This may be passed to the `async-start` intrinsic from a fused adapter. const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32; /// Provides access to either store data (via the `get` method) or the store /// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component /// instance to which the current host task belongs. /// /// See [`Accessor::with`] for details. 
pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf> {
    // Borrowed handle to the store for the duration of this access.
    store: StoreContextMut<'a, T>,
    // Projection from the store's data `T` to the view `D::Data<'_>`.
    get_data: fn(&mut T) -> D::Data<'_>,
}

impl<'a, T, D> Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    /// Creates a new [`Access`] from its component parts.
    pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self {
        Self { store, get_data }
    }

    /// Get mutable access to the store data.
    pub fn data_mut(&mut self) -> &mut T {
        self.store.data_mut()
    }

    /// Get access to the projected data, i.e. the result of applying this
    /// access's getter to the store data.
    pub fn get(&mut self) -> D::Data<'_> {
        (self.get_data)(self.data_mut())
    }

    /// Spawn a background task.
    ///
    /// See [`Accessor::spawn`] for details.
    pub fn spawn(&mut self, task: impl AccessorTask) -> JoinHandle
    where
        T: 'static,
    {
        // Build a fresh `Accessor` for the task from this access's getter and
        // a new token for the same store.
        let accessor = Accessor {
            get_data: self.get_data,
            token: StoreToken::new(self.store.as_context_mut()),
        };
        self.store
            .as_context_mut()
            .spawn_with_accessor(accessor, task)
    }

    /// Returns the getter this accessor is using to project from `T` into
    /// `D::Data`.
    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
        self.get_data
    }
}

impl<'a, T, D> AsContext for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    type Data = T;

    fn as_context(&self) -> StoreContext<'_, T> {
        self.store.as_context()
    }
}

impl<'a, T, D> AsContextMut for Access<'a, T, D>
where
    D: HasData + ?Sized,
    T: 'static,
{
    fn as_context_mut(&mut self) -> StoreContextMut<'_, T> {
        self.store.as_context_mut()
    }
}

/// Provides scoped mutable access to store data in the context of a concurrent
/// host task future.
///
/// This allows multiple host task futures to execute concurrently and access
/// the store between (but not across) `await` points.
///
/// # Rationale
///
/// This structure is sort of like `&mut T` plus a projection from `&mut T` to
/// `D::Data<'_>`. The problem this is solving, however, is that it does not
/// literally store these values. The basic problem is that when a concurrent
/// host future is being polled it has access to `&mut T` (and the whole
/// `Store`) but when it's not being polled it does not have access to these
/// values. This reflects how the store is only ever polling one future at a
/// time so the store is effectively being passed between futures.
///
/// Rust's `Future` trait, however, has no means of passing a `Store`
/// temporarily between futures. The [`Context`](std::task::Context) type does
/// not have the ability to attach arbitrary information to it at this time.
/// This type, [`Accessor`], is used to bridge this expressivity gap.
///
/// The [`Accessor`] type here represents the ability to acquire, temporarily in
/// a synchronous manner, the current store. The [`Accessor::with`] function
/// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut
/// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does
/// not take an `async` closure as its argument, instead it's a synchronous
/// closure which must complete during one run of `Future::poll`. This reflects
/// how the store is temporarily made available while a host future is being
/// polled.
///
/// # Implementation
///
/// This type does not actually store `&mut T` nor `StoreContextMut`, and
/// this type additionally doesn't even have a lifetime parameter. This is
/// instead a representation of proof of the ability to acquire these while a
/// future is being polled. Wasmtime will, when it polls a host future,
/// configure ambient state such that the `Accessor` that a future closes over
/// will work and be able to access the store.
///
/// This has a number of implications for users such as:
///
/// * It's intentional that `Accessor` cannot be cloned, it needs to stay within
///   the lifetime of a single future.
/// * A future is expected to, however, close over an `Accessor` and keep it
///   alive probably for the duration of the entire future.
/// * Different host futures will be given different `Accessor`s, and that's
///   intentional.
/// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which
///   alleviates some otherwise required bounds to be written down.
///
/// # Using `Accessor` in `Drop`
///
/// The methods on `Accessor` are only expected to work in the context of
/// `Future::poll` and are not guaranteed to work in `Drop`. This is because a
/// host future can be dropped at any time throughout the system and Wasmtime
/// store context is not necessarily available at that time. It's recommended to
/// not use `Accessor` methods in anything connected to a `Drop` implementation
/// as they will panic and have unintended results. If you run into this though
/// feel free to file an issue on the Wasmtime repository.
pub struct Accessor>
where
    D: HasData + ?Sized,
{
    // Token from which a `StoreContextMut` is recovered inside `with`.
    token: StoreToken,
    // Projection from the store's data `T` to the view `D::Data<'_>`.
    get_data: fn(&mut T) -> D::Data<'_>,
}

/// A helper trait to take any type of accessor-with-data in functions.
///
/// This trait is similar to [`AsContextMut`] except that it's used when
/// working with an [`Accessor`] instead of a [`StoreContextMut`]. The
/// [`Accessor`] is the main type used in concurrent settings and is passed to
/// functions such as [`Func::call_concurrent`].
///
/// This trait is implemented for [`Accessor`] and `&T` where `T` implements
/// this trait. This effectively means that regardless of the `D` in
/// `Accessor` it can still be passed to a function which just needs a
/// store accessor.
///
/// Acquiring an [`Accessor`] can be done through
/// [`StoreContextMut::run_concurrent`] for example or in a host function
/// through
/// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent).
pub trait AsAccessor {
    /// The `T` in `Store` that this accessor refers to.
    type Data: 'static;

    /// The `D` in `Accessor`, or the projection out of
    /// `Self::Data`.
    type AccessorData: HasData + ?Sized;

    /// Returns the accessor that this is referring to.
    fn as_accessor(&self) -> &Accessor;
}

// References forward to the referent's implementation.
impl AsAccessor for &T {
    type Data = T::Data;
    type AccessorData = T::AccessorData;

    fn as_accessor(&self) -> &Accessor {
        T::as_accessor(self)
    }
}

// An `Accessor` is trivially its own accessor.
impl AsAccessor for Accessor {
    type Data = T;
    type AccessorData = D;

    fn as_accessor(&self) -> &Accessor {
        self
    }
}

// Note that it is intentional at this time that `Accessor` does not actually
// store `&mut T` or anything similar. This distinctly enables the `Accessor`
// structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for
// that matter). This is used to ergonomically simplify bindings where the
// majority of the time `Accessor` is closed over in a future which then needs
// to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as
// you already have to write `T: 'static`...) it helps to avoid this.
//
// Note as well that `Accessor` doesn't actually store its data at all. Instead
// it's more of a "proof" of what can be accessed from TLS. API design around
// `Accessor` and functions like `Linker::func_wrap_concurrent` are
// intentionally made to ensure that `Accessor` is ideally only used in the
// context that TLS variables are actually set. For example host functions are
// given `&Accessor`, not `Accessor`, and this prevents them from persisting
// the value outside of a future. Within the future the TLS variables are all
// guaranteed to be set while the future is being polled.
//
// Finally though this is not an ironclad guarantee, but nor does it need to be.
// The TLS APIs are designed to panic or otherwise model usage where they're
// called recursively or similar. It's hoped that code cannot be constructed to
// actually hit this at runtime but this is not a safety requirement at this
// time.
//
// NOTE(review): the anonymous const below looks like the compile-time check
// backing the `Send`/`Sync` claim above — confirm against the full file.
const _: () = {
    const fn assert() {}
    assert::>>();
};

impl Accessor {
    /// Creates a new `Accessor` backed by the specified functions.
///
    /// - `get`: used to retrieve the store
    ///
    /// - `get_data`: used to "project" from the store's associated data to
    /// another type (e.g. a field of that data or a wrapper around it).
    ///
    /// - `spawn`: used to queue spawned background tasks to be run later
    ///
    /// NOTE(review): the bullets above do not match the current signature,
    /// which takes only `token`; `get_data` is fixed to the identity
    /// projection (`|x| x`) here. Confirm and refresh this doc.
    pub(crate) fn new(token: StoreToken) -> Self {
        Self {
            token,
            get_data: |x| x,
        }
    }
}

impl Accessor
where
    D: HasData + ?Sized,
{
    /// Run the specified closure, passing it mutable access to the store.
    ///
    /// This function is one of the main building blocks of the [`Accessor`]
    /// type. This yields synchronous, blocking, access to the store via an
    /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to
    /// providing the ability to access `D` via [`Access::get`]. Note that the
    /// `fun` here is given only temporary access to the store and `T`/`D`
    /// meaning that the return value `R` here is not allowed to capture borrows
    /// into the two. If access is needed to data within `T` or `D` outside of
    /// this closure then it must be `clone`d out, for example.
    ///
    /// # Panics
    ///
    /// This function will panic if it is called recursively with any other
    /// accessor already in scope. For example if `with` is called within `fun`,
    /// then this function will panic. It is up to the embedder to ensure that
    /// this does not happen.
    pub fn with(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R {
        // The store itself comes from `tls::get`; this accessor only
        // contributes the token and the data projection.
        tls::get(|vmstore| {
            fun(Access {
                store: self.token.as_context_mut(vmstore),
                get_data: self.get_data,
            })
        })
    }

    /// Returns the getter this accessor is using to project from `T` into
    /// `D::Data`.
    pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> {
        self.get_data
    }

    /// Changes this accessor to access `D2` instead of the current type
    /// parameter `D`.
    ///
    /// This changes the underlying data access from `T` to `D2::Data<'_>`.
    ///
    /// # Panics
    ///
    /// When using this API the returned value is disconnected from `&self` and
    /// the lifetime binding the `self` argument. An `Accessor` only works
    /// within the context of the closure or async closure that it was
    /// originally given to, however. This means that due to the fact that the
    /// returned value has no lifetime connection it's possible to use the
    /// accessor outside of `&self`, the original accessor, and panic.
    ///
    /// The returned value should only be used within the scope of the original
    /// `Accessor` that `self` refers to.
    pub fn with_getter(
        &self,
        get_data: fn(&mut T) -> D2::Data<'_>,
    ) -> Accessor {
        // Same token, new projection.
        Accessor {
            token: self.token,
            get_data,
        }
    }

    /// Spawn a background task which will receive an `&Accessor` and
    /// run concurrently with any other tasks in progress for the current
    /// store.
    ///
    /// This is particularly useful for host functions which return a `stream`
    /// or `future` such that the code to write to the write end of that
    /// `stream` or `future` must run after the function returns.
    ///
    /// The returned [`JoinHandle`] may be used to cancel the task.
    ///
    /// # Panics
    ///
    /// Panics if called within a closure provided to the [`Accessor::with`]
    /// function. This can only be called outside an active invocation of
    /// [`Accessor::with`].
    pub fn spawn(&self, task: impl AccessorTask) -> JoinHandle
    where
        T: 'static,
    {
        // The spawned task gets its own copy of this accessor; the store is
        // reached through `with`, hence the panic condition documented above.
        let accessor = self.clone_for_spawn();
        self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task))
    }

    // Private duplicate of this accessor (same token and getter), used only to
    // hand an owned `Accessor` to a spawned task.
    fn clone_for_spawn(&self) -> Self {
        Self {
            token: self.token,
            get_data: self.get_data,
        }
    }
}

/// Represents a task which may be provided to `Accessor::spawn`,
/// `Accessor::forward`, or `StoreContextMut::spawn`.
// TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable
// option.
//
// As of this writing, it's not possible to specify e.g. `Send` and `Sync`
// bounds on the `Future` type returned by an `AsyncFnOnce`.  Also, using `F:
// Future> + Send + Sync, FN: FnOnce(&Accessor) -> F +
// Send + Sync + 'static` fails with a type mismatch error when we try to pass
// it an async closure (e.g. `async move |_| { ... }`).  So this seems to be the
// best we can do for the time being.
pub trait AccessorTask>: Send + 'static
where
    D: HasData + ?Sized,
{
    /// Run the task.
    fn run(self, accessor: &Accessor) -> impl Future> + Send;
}

/// Represents parameter and result metadata for the caller side of a
/// guest->guest call orchestrated by a fused adapter.
enum CallerInfo {
    /// Metadata for a call to an async-lowered import
    Async {
        params: Vec,
        has_result: bool,
    },
    /// Metadata for a call to a sync-lowered import
    Sync {
        params: Vec,
        result_count: u32,
    },
}

/// Indicates how a guest task is waiting on a waitable set.
enum WaitMode {
    /// The guest task is waiting using `task.wait`
    Fiber(StoreFiber<'static>),
    /// The guest task is waiting via a callback declared as part of an
    /// async-lifted export.
    Callback(Instance),
}

/// Represents the reason a fiber is suspending itself.
#[derive(Debug)]
enum SuspendReason {
    /// The fiber is waiting for an event to be delivered to the specified
    /// waitable set or task.
    Waiting {
        set: TableId,
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// The fiber has finished handling its most recent work item and is waiting
    /// for another (or to be dropped if it is no longer needed).
    NeedWork,
    /// The fiber is yielding and should be resumed once other tasks have had a
    /// chance to run.
    Yielding {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
    /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`.
    ExplicitlySuspending {
        thread: QualifiedThreadId,
        skip_may_block_check: bool,
    },
}

/// Represents a pending call into guest code for a given guest task.
enum GuestCallKind {
    /// Indicates there's an event to deliver to the task, possibly related to a
    /// waitable set the task has been waiting on or polling.
    DeliverEvent {
        /// The instance to which the task belongs.
        instance: Instance,
        /// The waitable set the event belongs to, if any.
///
        /// If this is `None` the event will be waiting in the
        /// `GuestTask::event` field for the task.
        set: Option>,
    },
    /// Indicates that a new guest task call is pending and may be executed
    /// using the specified closure.
    ///
    /// If the closure returns `Ok(Some(call))`, the `call` should be run
    /// immediately using `handle_guest_call`.
    StartImplicit(Box Result> + Send + Sync>),
    StartExplicit(Box Result<()> + Send + Sync>),
}

// Manual `Debug` impl: the closure payloads of the `Start*` variants are
// printed by variant name only.
impl fmt::Debug for GuestCallKind {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::DeliverEvent { instance, set } => f
                .debug_struct("DeliverEvent")
                .field("instance", instance)
                .field("set", set)
                .finish(),
            Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(),
            Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(),
        }
    }
}

/// The target of a suspension intrinsic.
#[derive(Copy, Clone, Debug)]
pub enum SuspensionTarget {
    SomeSuspended(u32),
    Some(u32),
    None,
}

impl SuspensionTarget {
    // True only for the `None` variant.
    fn is_none(&self) -> bool {
        matches!(self, SuspensionTarget::None)
    }
    // True for both `Some` and `SomeSuspended`.
    fn is_some(&self) -> bool {
        !self.is_none()
    }
}

/// Represents a pending call into guest code for a given guest thread.
#[derive(Debug)]
struct GuestCall {
    thread: QualifiedThreadId,
    kind: GuestCallKind,
}

impl GuestCall {
    /// Returns whether or not the call is ready to run.
    ///
    /// A call will not be ready to run if either:
    ///
    /// - the (sub-)component instance to be called has already been entered and
    /// cannot be reentered until an in-progress call completes
    ///
    /// - the call is for a not-yet started task and the (sub-)component
    /// instance to be called has backpressure enabled
    ///
    /// `StartExplicit` calls are always reported as ready.
    fn is_ready(&self, store: &mut StoreOpaque) -> Result {
        // Look up the instance of the task this call targets, then that
        // instance's concurrent state.
        let instance = store
            .concurrent_state_mut()
            .get_mut(self.thread.task)?
            .instance;
        let state = store.instance_state(instance).concurrent_state();

        let ready = match &self.kind {
            // Event delivery only needs the instance to be enterable.
            GuestCallKind::DeliverEvent { .. } => !state.do_not_enter,
            // Implicit starts are additionally held back by backpressure.
            GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0),
            GuestCallKind::StartExplicit(_) => true,
        };
        log::trace!(
            "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})",
            state.do_not_enter,
            state.backpressure
        );
        Ok(ready)
    }
}

/// Job to be run on a worker fiber.
enum WorkerItem {
    GuestCall(GuestCall),
    Function(AlwaysMut Result<()> + Send>>),
}

/// Represents a pending work item to be handled by the event loop for a given
/// component instance.
enum WorkItem {
    /// A host task to be pushed to `ConcurrentState::futures`.
    PushFuture(AlwaysMut),
    /// A fiber to resume.
    ResumeFiber(StoreFiber<'static>),
    /// A thread to resume.
    ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId),
    /// A pending call into guest code for a given guest task.
    GuestCall(RuntimeComponentInstanceIndex, GuestCall),
    /// A job to run on a worker fiber.
    WorkerFunction(AlwaysMut Result<()> + Send>>),
}

// Manual `Debug` impl: future, fiber and closure payloads are printed by
// variant name only.
impl fmt::Debug for WorkItem {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(),
            Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(),
            Self::ResumeThread(instance, thread) => f
                .debug_tuple("ResumeThread")
                .field(instance)
                .field(thread)
                .finish(),
            Self::GuestCall(instance, call) => f
                .debug_tuple("GuestCall")
                .field(instance)
                .field(call)
                .finish(),
            Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(),
        }
    }
}

/// Whether a suspension intrinsic was cancelled or completed.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) enum WaitResult {
    Cancelled,
    Completed,
}

/// Poll the specified future until it completes on behalf of a guest->host call
/// using a sync-lowered import.
///
/// This is similar to `Instance::first_poll` except it's for sync-lowered
/// imports, meaning we don't need to handle cancellation and we can block the
/// caller until the task completes, at which point the caller can handle
/// lowering the result to the guest's stack and linear memory.
pub(crate) fn poll_and_block(
    store: &mut dyn VMStore,
    future: impl Future> + Send + 'static,
) -> Result {
    let state = store.concurrent_state_mut();
    let task = state.current_host_thread()?;

    // Wrap the future in a closure which will take care of stashing the result
    // in the host task's `state` (as `HostTaskState::CalleeFinished`) and
    // setting a `Subtask`/`Returned` event on the task when the host task
    // completes.
    let mut future = Box::pin(async move {
        let result = future.await?;
        tls::get(move |store| {
            let state = store.concurrent_state_mut();
            let host_state = &mut state.get_mut(task)?.state;
            assert!(matches!(host_state, HostTaskState::CalleeStarted));
            *host_state = HostTaskState::CalleeFinished(Box::new(result));

            Waitable::Host(task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Returned,
                }),
            )?;

            Ok(())
        })
    }) as HostTaskFuture;

    // Finally, poll the future.  We can use a dummy `Waker` here because we'll
    // add the future to `ConcurrentState::futures` and poll it automatically
    // from the event loop if it doesn't complete immediately here.
    let poll = tls::set(store, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        // It completed immediately; check the result and delete the task.
        Poll::Ready(result) => result?,

        // It did not complete immediately; add it to
        // `ConcurrentState::futures` so it will be polled via the event loop;
        // then use `GuestThread::sync_call_set` to wait for the task to
        // complete, suspending the current fiber until it does so.
        Poll::Pending => {
            let state = store.concurrent_state_mut();
            state.push_future(future);

            let caller = state.get_mut(task)?.caller;
            let set = state.get_mut(caller.thread)?.sync_call_set;
            Waitable::Host(task).join(state, Some(set))?;

            store.suspend(SuspendReason::Waiting {
                set,
                thread: caller,
                skip_may_block_check: false,
            })?;

            // Remove the `task` from the `sync_call_set` to ensure that when
            // this function returns and the task is deleted that there are no
            // more lingering references to this host task.
            Waitable::Host(task).join(store.concurrent_state_mut(), None)?;
        }
    }

    // Retrieve and return the result.  The state is swapped for `CalleeDone`
    // so the boxed result can be moved out and downcast to the expected type.
    let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state;
    match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) {
        HostTaskState::CalleeFinished(result) => Ok(match result.downcast() {
            Ok(result) => *result,
            Err(_) => bail_bug!("host task finished with wrong type of result"),
        }),
        _ => bail_bug!("unexpected host task state after completion"),
    }
}

/// Execute the specified guest call.
///
/// Runs in a loop: a `DeliverEvent` or `StartImplicit` call may produce a
/// follow-up call, which is executed in the next iteration; `StartExplicit`
/// never does.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                let (event, waitable) =
                    match instance.get_event(store, call.thread.task, set, true)? {
                        Some(pair) => pair,
                        None => bail_bug!("delivering non-present event"),
                    };
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // A handle of 0 is passed when no waitable accompanies the event.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);

                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );

                let old_thread = store.set_thread(call.thread)?;
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );

                store.enter_instance(runtime_instance);

                // The callback is taken out of the task while it runs and put
                // back once it has returned successfully.
                let Some(callback) = store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback
                    .take()
                else {
                    bail_bug!("guest task callback field not present")
                };

                let code = callback(store, event, handle)?;

                store
                    .concurrent_state_mut()
                    .get_mut(call.thread.task)?
                    .callback = Some(callback);

                store.exit_instance(runtime_instance)?;

                store.set_thread(old_thread)?;

                // The callback's return code decides what, if anything, runs next.
                next = instance.handle_callback_code(
                    store,
                    call.thread,
                    runtime_instance.index,
                    code,
                )?;

                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            GuestCallKind::StartImplicit(fun) => {
                next = fun(store)?;
            }
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }

    Ok(())
}

impl Store {
    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
    ///
    /// Returns an error up front if `concurrency_support` is not enabled for
    /// this store.
    pub async fn run_concurrent(&mut self, fun: impl AsyncFnOnce(&Accessor) -> R) -> Result
    where
        T: Send + 'static,
    {
        ensure!(
            self.as_context().0.concurrency_support(),
            "cannot use `run_concurrent` when Config::concurrency_support disabled",
        );
        self.as_context_mut().run_concurrent(fun).await
    }

    /// Forwards to [`StoreContextMut::assert_concurrent_state_empty`].
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&mut self) {
        self.as_context_mut().assert_concurrent_state_empty();
    }

    /// Forwards to [`StoreContextMut::concurrent_state_table_size`].
    #[doc(hidden)]
    pub fn concurrent_state_table_size(&mut self) -> usize {
        self.as_context_mut().concurrent_state_table_size()
    }

    /// Convenience wrapper for [`StoreContextMut::spawn`].
    pub fn spawn(&mut self, task: impl AccessorTask>) -> JoinHandle
    where
        T: 'static,
    {
        self.as_context_mut().spawn(task)
    }
}

impl StoreContextMut<'_, T> {
    /// Assert that all the relevant tables and queues in the concurrent state
    /// for this store are empty.
    ///
    /// This is for sanity checking in integration tests
    /// (e.g. `component-async-tests`) that the relevant state has been cleared
    /// after each test concludes.  This should help us catch leaks, e.g. guest
    /// tasks which haven't been deleted despite having completed and having
    /// been dropped by their supertasks.
    ///
    /// Only intended for use in Wasmtime's own testing.
#[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        store
            .store_data_mut()
            .components
            .assert_instance_states_empty();
        let state = store.concurrent_state_mut();
        // Each check below panics if the corresponding piece of state is
        // non-empty; the table's contents are printed on failure.
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.current_thread.is_none());
        assert!(state.futures_mut().unwrap().is_empty());
        assert!(state.global_error_context_ref_counts.is_empty());
    }

    /// Helper function to perform tests over the size of the concurrent state
    /// table which can be useful for detecting leaks.
    ///
    /// Only intended for use in Wasmtime's own testing.
    #[doc(hidden)]
    pub fn concurrent_state_table_size(&mut self) -> usize {
        self.0
            .concurrent_state_mut()
            .table
            .get_mut()
            .iter_mut()
            .count()
    }

    /// Spawn a background task to run as part of this instance's event loop.
    ///
    /// The task will receive an `&Accessor` and run concurrently with
    /// any other tasks in progress for the instance.
    ///
    /// Note that the task will only make progress if and when the event loop
    /// for this instance is run.
    ///
    /// The returned [`JoinHandle`] may be used to cancel the task.
    pub fn spawn(mut self, task: impl AccessorTask) -> JoinHandle
    where
        T: 'static,
    {
        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
        self.spawn_with_accessor(accessor, task)
    }

    /// Internal implementation of `spawn` functions where a `store` is
    /// available along with an `Accessor`.
    fn spawn_with_accessor(
        self,
        accessor: Accessor,
        task: impl AccessorTask,
    ) -> JoinHandle
    where
        T: 'static,
        D: HasData + ?Sized,
    {
        // Create an "abortable future" here where internally the future will
        // hook calls to poll and possibly spawn more background tasks on each
        // iteration.
        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
        // Queue the wrapped future on the concurrent state. If it yields no
        // task result, that is reported as `Ok(())` rather than as an error.
        self.0
            .concurrent_state_mut()
            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
        handle
    }

    /// Run the specified closure `fun` to completion as part of this store's
    /// event loop.
    ///
    /// This will run `fun` as part of this store's event loop until it
    /// yields a result.  `fun` is provided an [`Accessor`], which provides
    /// controlled access to the store and its data.
    ///
    /// This function can be used to invoke [`Func::call_concurrent`] for
    /// example within the async closure provided here.
    ///
    /// This function will unconditionally return an error if
    /// [`Config::concurrency_support`] is disabled.
    ///
    /// [`Config::concurrency_support`]: crate::Config::concurrency_support
    ///
    /// # Store-blocking behavior
    ///
    /// At this time there are certain situations in which the `Future` returned
    /// by the `AsyncFnOnce` passed to this function will not be polled for an
    /// extended period of time, despite one or more `Waker::wake` events having
    /// occurred for the task to which it belongs.  This can manifest as the
    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
    /// the `Store` being held by e.g. a blocking host function, preventing the
    /// `Future` from being polled. A canonical example of this is when the
    /// `fun` provided to this function attempts to set a timeout for an
    /// invocation of a wasm function. In this situation the async closure is
    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
    /// to elapse. At this time this setup will not always work and the timeout
    /// may not reliably fire.
    ///
    /// This function will not block the current thread and as such is always
    /// suitable to run in an `async` context, but the current implementation of
    /// Wasmtime can lead to situations where a certain wasm computation is
    /// required to make progress for the closure to make progress.
This is an /// artifact of Wasmtime's historical implementation of `async` functions /// and is the topic of [#11869] and [#11870]. In the timeout example from /// above it means that Wasmtime can get "wedged" for a bit where (a) must /// progress for a readiness notification of (b) to get delivered. /// /// This effectively means that it's not possible to reliably perform a /// "select" operation within the `fun` closure, which timeouts for example /// are based on. Fixing this requires some relatively major refactoring /// work within Wasmtime itself. This is a known pitfall otherwise and one /// that is intended to be fixed one day. In the meantime it's recommended /// to apply timeouts or such to the entire `run_concurrent` call itself /// rather than internally. /// /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869 /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870 /// /// # Example /// /// ``` /// # use { /// # wasmtime::{ /// # error::{Result}, /// # component::{ Component, Linker, Resource, ResourceTable}, /// # Config, Engine, Store /// # }, /// # }; /// # /// # struct MyResource(u32); /// # struct Ctx { table: ResourceTable } /// # /// # async fn foo() -> Result<()> { /// # let mut config = Config::new(); /// # let engine = Engine::new(&config)?; /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() }); /// # let mut linker = Linker::new(&engine); /// # let component = Component::new(&engine, "")?; /// # let instance = linker.instantiate_async(&mut store, &component).await?; /// # let foo = instance.get_typed_func::<(Resource,), (Resource,)>(&mut store, "foo")?; /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?; /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> { /// let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?; /// let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?; /// let value = 
accessor.with(|mut access| access.get().table.delete(another_resource))?; /// bar.call_concurrent(accessor, (value.0,)).await?; /// Ok(()) /// }).await??; /// # Ok(()) /// # } /// ``` pub async fn run_concurrent(self, fun: impl AsyncFnOnce(&Accessor) -> R) -> Result where T: Send + 'static, { ensure!( self.0.concurrency_support(), "cannot use `run_concurrent` when Config::concurrency_support disabled", ); self.do_run_concurrent(fun, false).await } pub(super) async fn run_concurrent_trap_on_idle( self, fun: impl AsyncFnOnce(&Accessor) -> R, ) -> Result where T: Send + 'static, { self.do_run_concurrent(fun, true).await } async fn do_run_concurrent( mut self, fun: impl AsyncFnOnce(&Accessor) -> R, trap_on_idle: bool, ) -> Result where T: Send + 'static, { debug_assert!(self.0.concurrency_support()); check_recursive_run(); let token = StoreToken::new(self.as_context_mut()); struct Dropper<'a, T: 'static, V> { store: StoreContextMut<'a, T>, value: ManuallyDrop, } impl<'a, T, V> Drop for Dropper<'a, T, V> { fn drop(&mut self) { tls::set(self.store.0, || { // SAFETY: Here we drop the value without moving it for the // first and only time -- per the contract for `Drop::drop`, // this code won't run again, and the `value` field will no // longer be accessible. unsafe { ManuallyDrop::drop(&mut self.value) } }); } } let accessor = &Accessor::new(token); let dropper = &mut Dropper { store: self, value: ManuallyDrop::new(fun(accessor)), }; // SAFETY: We never move `dropper` nor its `value` field. let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) }; dropper .store .as_context_mut() .poll_until(future, trap_on_idle) .await } /// Run this store's event loop. /// /// The returned future will resolve when the specified future completes or, /// if `trap_on_idle` is true, when the event loop can't make further /// progress. 
async fn poll_until(
        mut self,
        mut future: Pin<&mut impl Future>,
        trap_on_idle: bool,
    ) -> Result
    where
        T: Send + 'static,
    {
        // Guard which puts `ConcurrentState::futures` back into the store when
        // dropped, provided it still holds them.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take `ConcurrentState::futures` out of the store so we can poll
            // it while also safely giving any of the futures inside access to
            // `self`.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = match reset.futures.as_mut() {
                Some(f) => pin!(f.next()),
                None => bail_bug!("concurrent state missing futures field"),
            };

            // Outcome of a single pass of the `poll_fn` below: either the
            // caller's future completed, or there is a (possibly empty) batch
            // of work items to handle before looping again.
            enum PollResult {
                Complete(R),
                ProcessWork(Vec),
            }

            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(PollResult::Complete(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                //
                // The resulting `Poll<bool>` is `Ready(true)` if one future
                // completed successfully, `Ready(false)` if there are no
                // futures left, and `Pending` otherwise.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Next, collect the next batch of work items to process, if any.
                // This will be either all of the high-priority work items, or if
                // there are none, a single low-priority work item.
                let state = reset.store.0.concurrent_state_mut();
                let ready = state.collect_work_items_to_run();
                if !ready.is_empty() {
                    return Poll::Ready(Ok(PollResult::ProcessWork(ready)));
                }

                // Finally, if we have nothing else to do right now, determine what to do
                // based on whether there are any pending futures in
                // `ConcurrentState::futures`.
                return match next {
                    Poll::Ready(true) => {
                        // In this case, one of the futures in
                        // `ConcurrentState::futures` completed
                        // successfully, so we return now and continue
                        // the outer loop in case there is another one
                        // ready to complete.
                        Poll::Ready(Ok(PollResult::ProcessWork(Vec::new())))
                    }
                    Poll::Ready(false) => {
                        // Poll the future we were passed one last time
                        // in case one of `ConcurrentState::futures` had
                        // the side effect of unblocking it.
                        if let Poll::Ready(value) =
                            tls::set(reset.store.0, || future.as_mut().poll(cx))
                        {
                            Poll::Ready(Ok(PollResult::Complete(value)))
                        } else {
                            // In this case, there are no more pending
                            // futures in `ConcurrentState::futures`,
                            // there are no remaining work items, _and_
                            // the future we were passed as an argument
                            // still hasn't completed.
                            if trap_on_idle {
                                // `trap_on_idle` is true, so we exit
                                // immediately.
                                Poll::Ready(Err(Trap::AsyncDeadlock.into()))
                            } else {
                                // `trap_on_idle` is false, so we assume
                                // that future will wake up and give us
                                // more work to do when it's ready to.
                                Poll::Pending
                            }
                        }
                    }
                    // There is at least one pending future in
                    // `ConcurrentState::futures` and we have nothing
                    // else to do but wait for now, so we return
                    // `Pending`.
                    Poll::Pending => Poll::Pending,
                };
            })
            .await;

            // Put the `ConcurrentState::futures` back into the store before we
            // return or handle any work items since one or more of those items
            // might append more futures.
            drop(reset);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                PollResult::Complete(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                PollResult::ProcessWork(ready) => {
                    // Guard which cleans up any work items that were not
                    // handled, e.g. because handling an earlier item returned
                    // an error via `?` below.
                    struct Dispose<'a, T: 'static, I: Iterator> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    // Fibers are explicitly disposed against
                                    // the store.
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    // Futures are dropped inside a `tls::set`
                                    // scope for the store.
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    // All other items are simply dropped.
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }

    /// Handle the specified work item, possibly resuming a fiber if applicable.
    async fn handle_work_item(self, item: WorkItem) -> Result<()>
    where
        T: Send,
    {
        log::trace!("handle work item {item:?}");

        match item {
            // Add the future to the set polled by the event loop.
            WorkItem::PushFuture(future) => {
                self.0
                    .concurrent_state_mut()
                    .futures_mut()?
                    .push(future.into_inner());
            }
            WorkItem::ResumeFiber(fiber) => {
                self.0.resume_fiber(fiber).await?;
            }
            // Mark the thread as running and resume its fiber; the thread
            // must currently be in the `Ready` state.
            WorkItem::ResumeThread(_, thread) => {
                if let GuestThreadState::Ready(fiber) = mem::replace(
                    &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state,
                    GuestThreadState::Running,
                ) {
                    self.0.resume_fiber(fiber).await?;
                } else {
                    bail_bug!("cannot resume non-pending thread {thread:?}");
                }
            }
            WorkItem::GuestCall(_, call) => {
                if call.is_ready(self.0)? {
                    self.run_on_worker(WorkerItem::GuestCall(call)).await?;
                } else {
                    // The call can't run yet.  The first time this happens for
                    // a task, record that and, for implicitly-started calls,
                    // publish a `Status::Starting` subtask event.
                    let state = self.0.concurrent_state_mut();
                    let task = state.get_mut(call.thread.task)?;
                    if !task.starting_sent {
                        task.starting_sent = true;
                        if let GuestCallKind::StartImplicit(_) = &call.kind {
                            Waitable::Guest(call.thread.task).set_event(
                                state,
                                Some(Event::Subtask {
                                    status: Status::Starting,
                                }),
                            )?;
                        }
                    }

                    // Park the call in the instance's `pending` map (see
                    // `partition_pending`).
                    let instance = state.get_mut(call.thread.task)?.instance;
                    self.0
                        .instance_state(instance)
                        .concurrent_state()
                        .pending
                        .insert(call.thread, call.kind);
                }
            }
            WorkItem::WorkerFunction(fun) => {
                self.run_on_worker(WorkerItem::Function(fun)).await?;
            }
        }

        Ok(())
    }

    /// Execute the specified guest call on a worker fiber.
    ///
    /// A cached worker fiber from `ConcurrentState::worker` is reused if one
    /// is present; otherwise a new fiber is created.  The item is handed to
    /// the fiber via `ConcurrentState::worker_item`.
    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
    where
        T: Send,
    {
        let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() {
            fiber
        } else {
            fiber::make_fiber(self.0, move |store| {
                // Each iteration takes the item stored in `worker_item`, runs
                // it, and then suspends with `SuspendReason::NeedWork`.
                loop {
                    let Some(item) = store.concurrent_state_mut().worker_item.take() else {
                        bail_bug!("worker_item not present when resuming fiber")
                    };
                    match item {
                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
                    }

                    store.suspend(SuspendReason::NeedWork)?;
                }
            })?
        };

        let worker_item = &mut self.0.concurrent_state_mut().worker_item;
        assert!(worker_item.is_none());
        *worker_item = Some(item);

        self.0.resume_fiber(worker).await
    }

    /// Wrap the specified host function in a future which will call it, passing
    /// it an `&Accessor`.
    ///
    /// See the `Accessor` documentation for details.
    pub(crate) fn wrap_call(self, closure: F) -> impl Future> + 'static
    where
        T: 'static,
        F: FnOnce(&Accessor) -> Pin> + Send + '_>> + Send + Sync + 'static,
        R: Send + Sync + 'static,
    {
        let token = StoreToken::new(self);
        async move {
            // The accessor is created lazily, when the returned future is
            // first polled.
            let mut accessor = Accessor::new(token);
            closure(&mut accessor).await
        }
    }
}

impl StoreOpaque {
    /// Push a `GuestTask` onto the task stack for either a sync-to-sync,
    /// guest-to-guest call or a sync host-to-guest call.
///
    /// The task pushed here is only used for handling calls to intrinsic
    /// functions; parameter lowering and result lifting are both assumed to
    /// be taken care of elsewhere.
    pub(crate) fn enter_guest_sync_call(
        &mut self,
        guest_caller: Option,
        callee_async: bool,
        callee: RuntimeInstance,
    ) -> Result<()> {
        log::trace!("enter sync call {callee:?}");
        if !self.concurrency_support() {
            self.enter_call_not_concurrent();
            return Ok(());
        }

        let state = self.concurrent_state_mut();
        let current = state.current_thread;

        // Sanity check (debug builds only): when the caller instance was
        // supplied, it must match the instance of the current guest task.
        let instance = match current.guest() {
            Some(guest) => Some(state.get_mut(guest.task)?.instance),
            None => None,
        };
        if guest_caller.is_some() {
            debug_assert_eq!(instance, guest_caller);
        }

        // The new task's caller is the current guest thread if there is one,
        // and otherwise the host.
        let caller = match current.guest() {
            Some(guest) => Caller::Guest { thread: *guest },
            None => Caller::Host {
                tx: None,
                host_future_present: false,
                caller: current,
            },
        };

        // Neither lowering nor lifting is expected to happen through this
        // task, so both hooks report a bug if they're ever invoked.
        let lift_result = LiftResult {
            lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")),
            ty: TypeTupleIndex::reserved_value(),
            memory: None,
            string_encoding: StringEncoding::Utf8,
        };
        let task = GuestTask::new(
            Box::new(move |_, _| bail_bug!("cannot lower params in sync call")),
            lift_result,
            caller,
            None,
            callee,
            callee_async,
        )?;

        // Register the task along with an implicit thread for it.
        let guest_task = state.push(task)?;
        let implicit_thread = GuestThread::new_implicit(state, guest_task)?;
        let guest_thread = state.push(implicit_thread)?;
        Instance::from_wasmtime(self, callee.instance)
            .add_guest_thread_to_instance_table(guest_thread, self, callee.index)?;

        self.concurrent_state_mut()
            .get_mut(guest_task)?
            .threads
            .insert(guest_thread);

        // Finally, make the new thread the current one.
        let new_current = QualifiedThreadId {
            task: guest_task,
            thread: guest_thread,
        };
        self.set_thread(new_current)?;

        Ok(())
    }

    /// Pop a `GuestTask` previously pushed using `enter_guest_sync_call`.
pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> { if !self.concurrency_support() { return Ok(self.exit_call_not_concurrent()); } let thread = match self.set_thread(CurrentThread::None)?.guest() { Some(t) => *t, None => bail_bug!("expected task when exiting"), }; let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance; log::trace!("exit sync call {instance:?}"); Instance::from_wasmtime(self, instance.instance).cleanup_thread( self, thread, instance.index, )?; let state = self.concurrent_state_mut(); let task = state.get_mut(thread.task)?; let caller = match &task.caller { &Caller::Guest { thread } => thread.into(), &Caller::Host { caller, .. } => caller, }; self.set_thread(caller)?; let state = self.concurrent_state_mut(); let task = state.get_mut(thread.task)?; if task.ready_to_delete() { state.delete(thread.task)?.dispose(state)?; } Ok(()) } /// Similar to `enter_guest_sync_call` except for when the guest makes a /// transition to the host. /// /// FIXME: this is called for all guest->host transitions and performs some /// relatively expensive table manipulations. This would ideally be /// optimized to avoid the full allocation of a `HostTask` in at least some /// situations. pub(crate) fn host_task_create(&mut self) -> Result>> { if !self.concurrency_support() { self.enter_call_not_concurrent(); return Ok(None); } let state = self.concurrent_state_mut(); let caller = state.current_guest_thread()?; let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?; log::trace!("new host task {task:?}"); self.set_thread(task)?; Ok(Some(task)) } /// Invoked before lowering the results of a host task to the guest. /// /// This is used to update the current thread annotations within the store /// to ensure that it reflects the guest task, not the host task, since /// lowering may execute guest code. 
pub fn host_task_reenter_caller(&mut self) -> Result<()> { if !self.concurrency_support() { return Ok(()); } let task = self.concurrent_state_mut().current_host_thread()?; let caller = self.concurrent_state_mut().get_mut(task)?.caller; self.set_thread(caller)?; Ok(()) } /// Dual of `host_task_create` and signifies that the host has finished and /// will be cleaned up. /// /// Note that this isn't invoked when the host is invoked asynchronously and /// the host isn't complete yet. In that situation the host task persists /// and will be cleaned up separately in `subtask_drop` pub(crate) fn host_task_delete(&mut self, task: Option>) -> Result<()> { match task { Some(task) => { log::trace!("delete host task {task:?}"); self.concurrent_state_mut().delete(task)?; } None => { self.exit_call_not_concurrent(); } } Ok(()) } /// Determine whether the specified instance may be entered from the host. /// /// We return `true` here only if all of the following hold: /// /// - The top-level instance is not already on the current task's call stack. /// - The instance is not in need of a post-return function call. /// - `self` has not been poisoned due to a trap. pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result { if self.trapped() { return Ok(false); } if !self.concurrency_support() { return Ok(true); } let state = self.concurrent_state_mut(); let mut cur = state.current_thread; loop { match cur { CurrentThread::None => break Ok(true), CurrentThread::Guest(thread) => { let task = state.get_mut(thread.task)?; // Note that we only compare top-level instance IDs here. // The idea is that the host is not allowed to recursively // enter a top-level instance even if the specific leaf // instance is not on the stack. This the behavior defined // in the spec, and it allows us to elide runtime checks in // guest-to-guest adapters. if task.instance.instance == instance.instance { break Ok(false); } cur = match task.caller { Caller::Host { caller, .. 
} => caller, Caller::Guest { thread } => thread.into(), }; } CurrentThread::Host(id) => { cur = state.get_mut(id)?.caller.into(); } } } } /// Helper function to retrieve the `InstanceState` for the /// specified instance. fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState { self.component_instance_mut(instance.instance) .instance_state(instance.index) } fn set_thread(&mut self, thread: impl Into) -> Result { // Each time we switch threads, we conservatively set `task_may_block` // to `false` for the component instance we're switching away from (if // any), meaning it will be `false` for any new thread created for that // instance unless explicitly set otherwise. let state = self.concurrent_state_mut(); let old_thread = mem::replace(&mut state.current_thread, thread.into()); if let Some(old_thread) = old_thread.guest() { let instance = state.get_mut(old_thread.task)?.instance.instance; self.component_instance_mut(instance) .set_task_may_block(false) } // If we're switching to a new thread, set its component instance's // `task_may_block` according to where it left off. if self.concurrent_state_mut().current_thread.guest().is_some() { self.set_task_may_block()?; } Ok(old_thread) } /// Set the global variable representing whether the current task may block /// prior to entering Wasm code. 
fn set_task_may_block(&mut self) -> Result<()> {
        let state = self.concurrent_state_mut();
        let guest_thread = state.current_guest_thread()?;
        let instance = state.get_mut(guest_thread.task)?.instance.instance;
        let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?;
        self.component_instance_mut(instance)
            .set_task_may_block(may_block);
        Ok(())
    }

    /// Check whether the current guest task is allowed to block.
    ///
    /// Always succeeds when concurrency support is disabled.  Otherwise this
    /// reads the `task_may_block` flag of the component instance to which the
    /// current guest thread's task belongs, returning
    /// `Trap::CannotBlockSyncTask` if it is `false`.
    pub(crate) fn check_blocking(&mut self) -> Result<()> {
        if !self.concurrency_support() {
            return Ok(());
        }
        let state = self.concurrent_state_mut();
        let task = state.current_guest_thread()?.task;
        let instance = state.get_mut(task)?.instance.instance;
        let task_may_block = self.component_instance(instance).get_task_may_block();

        if task_may_block {
            Ok(())
        } else {
            Err(Trap::CannotBlockSyncTask.into())
        }
    }

    /// Record that we're about to enter a (sub-)component instance which does
    /// not support more than one concurrent, stackful activation, meaning it
    /// cannot be entered again until the next call returns.
    fn enter_instance(&mut self, instance: RuntimeInstance) {
        log::trace!("enter {instance:?}");
        self.instance_state(instance)
            .concurrent_state()
            .do_not_enter = true;
    }

    /// Record that we've exited a (sub-)component instance previously entered
    /// with `Self::enter_instance`, then call `Self::partition_pending`.
    /// See the documentation for the latter for details.
    fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> {
        log::trace!("exit {instance:?}");
        self.instance_state(instance)
            .concurrent_state()
            .do_not_enter = false;
        self.partition_pending(instance)
    }

    /// Iterate over `InstanceState::pending`, moving any ready items into the
    /// "high priority" work item queue.
    ///
    /// Items which are still not ready are re-inserted into `pending`.
    ///
    /// See `GuestCall::is_ready` for details.
    fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> {
        // `mem::take` empties the map up front so that `self` can be borrowed
        // again inside the loop body.
        for (thread, kind) in
            mem::take(&mut self.instance_state(instance).concurrent_state().pending)
        {
            let call = GuestCall { thread, kind };
            if call.is_ready(self)? {
                self.concurrent_state_mut()
                    .push_high_priority(WorkItem::GuestCall(instance.index, call));
            } else {
                self.instance_state(instance)
                    .concurrent_state()
                    .pending
                    .insert(call.thread, call.kind);
            }
        }

        Ok(())
    }

    /// Implements the `backpressure.{inc,dec}` intrinsics.
    ///
    /// `modify` maps the current backpressure counter to its new value; if it
    /// returns `None` this fails with `Trap::BackpressureOverflow` and the
    /// counter is left unchanged.
    pub(crate) fn backpressure_modify(
        &mut self,
        caller_instance: RuntimeInstance,
        modify: impl FnOnce(u16) -> Option,
    ) -> Result<()> {
        let state = self.instance_state(caller_instance).concurrent_state();
        let old = state.backpressure;
        let new = modify(old).ok_or(Trap::BackpressureOverflow)?;
        state.backpressure = new;

        if old > 0 && new == 0 {
            // Backpressure was previously enabled and is now disabled; move any
            // newly-eligible guest calls to the "high priority" queue.
            self.partition_pending(caller_instance)?;
        }

        Ok(())
    }

    /// Resume the specified fiber, giving it exclusive access to the specified
    /// store.
    ///
    /// Once the fiber suspends or exits, the thread which was current before
    /// the call is restored.  If the fiber suspended, it is then stored
    /// according to the `SuspendReason` it recorded.
    async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> {
        let old_thread = self.concurrent_state_mut().current_thread;
        log::trace!("resume_fiber: save current thread {old_thread:?}");

        let fiber = fiber::resolve_or_release(self, fiber).await?;

        self.set_thread(old_thread)?;

        let state = self.concurrent_state_mut();

        if let Some(ot) = old_thread.guest() {
            state.get_mut(ot.thread)?.state = GuestThreadState::Running;
        }
        log::trace!("resume_fiber: restore current thread {old_thread:?}");

        if let Some(mut fiber) = fiber {
            log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason);
            // See the `SuspendReason` documentation for what each case means.
            let reason = match state.suspend_reason.take() {
                Some(r) => r,
                None => bail_bug!("suspend reason missing when resuming fiber"),
            };
            match reason {
                // Cache the worker fiber for reuse unless one is already
                // cached, in which case this one is disposed.
                SuspendReason::NeedWork => {
                    if state.worker.is_none() {
                        state.worker = Some(fiber);
                    } else {
                        fiber.dispose(self);
                    }
                }
                // Mark the thread as ready and queue it at low priority.
                SuspendReason::Yielding { thread, .. } => {
                    state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber);
                    let instance = state.get_mut(thread.task)?.instance.index;
                    state.push_low_priority(WorkItem::ResumeThread(instance, thread));
                }
                // Park the fiber in the thread's state without queueing it.
                SuspendReason::ExplicitlySuspending { thread, .. } => {
                    state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber);
                }
                // Register the fiber as waiting on the waitable set.
                SuspendReason::Waiting { set, thread, .. } => {
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(thread, WaitMode::Fiber(fiber));
                    assert!(old.is_none());
                }
            };
        } else {
            log::trace!("resume_fiber: fiber has exited");
        }

        Ok(())
    }

    /// Suspend the current fiber, storing the reason in
    /// `ConcurrentState::suspend_reason` to indicate the conditions under which
    /// it should be resumed.
    ///
    /// See the `SuspendReason` documentation for details.
    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
        log::trace!("suspend fiber: {reason:?}");

        // If we're yielding or waiting on behalf of a guest thread, we'll need to
        // pop the call context which manages resource borrows before suspending
        // and then push it again once we've resumed.
        let task = match &reason {
            SuspendReason::Yielding { thread, .. }
            | SuspendReason::Waiting { thread, .. }
            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
            SuspendReason::NeedWork => None,
        };

        // Remember the current thread so it can be restored after resuming.
        let old_guest_thread = if task.is_some() {
            self.concurrent_state_mut().current_thread
        } else {
            CurrentThread::None
        };

        // We should not have reached here unless either there's no current
        // task, or the current task is permitted to block.  In addition, we
        // special-case `thread.switch-to` and waiting for a subtask to go from
        // `starting` to `started`, both of which we consider non-blocking
        // operations despite requiring a suspend.
        debug_assert!(
            matches!(
                reason,
                SuspendReason::ExplicitlySuspending {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Waiting {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Yielding {
                    skip_may_block_check: true,
                    ..
                }
            ) || old_guest_thread
                .guest()
                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
                .transpose()?
                .unwrap_or(true)
        );

        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
        assert!(suspend_reason.is_none());
        *suspend_reason = Some(reason);

        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;

        if task.is_some() {
            self.set_thread(old_guest_thread)?;
        }

        Ok(())
    }

    /// Suspend the current guest thread until an event is available for
    /// `waitable`.
    ///
    /// The waitable is temporarily joined to the calling thread's
    /// `sync_call_set` for the duration of the wait and then re-joined to
    /// whichever set (if any) it belonged to beforehand.
    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
        let state = self.concurrent_state_mut();
        let caller = state.current_guest_thread()?;
        let old_set = waitable.common(state)?.set;
        let set = state.get_mut(caller.thread)?.sync_call_set;
        waitable.join(state, Some(set))?;
        self.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
            skip_may_block_check: false,
        })?;
        let state = self.concurrent_state_mut();
        waitable.join(state, old_set)
    }
}

impl Instance {
    /// Get the next pending event for the specified task and (optional)
    /// waitable set, along with the waitable handle if applicable.
fn get_event(
        self,
        store: &mut StoreOpaque,
        guest_task: TableId,
        set: Option>,
        cancellable: bool,
    ) -> Result)>> {
        let state = store.concurrent_state_mut();

        // An event pending on the task itself takes precedence, except that a
        // pending `Event::Cancelled` is only delivered to cancellable callers.
        let pending = &mut state.get_mut(guest_task)?.event;
        if let Some(ev) = *pending
            && (cancellable || !matches!(ev, Event::Cancelled))
        {
            log::trace!("deliver event {ev:?} to {guest_task:?}");
            *pending = None;
            return Ok(Some((ev, None)));
        }

        // Otherwise take the first ready waitable from the set, if a set was
        // given and it has one.
        let Some(set) = set else {
            return Ok(None);
        };
        let Some(waitable) = state.get_mut(set)?.ready.pop_first() else {
            return Ok(None);
        };

        // A ready waitable must have both a handle and an event.
        let common = waitable.common(state)?;
        let Some(handle) = common.handle else {
            bail_bug!("handle not set when delivering event")
        };
        let Some(event) = common.event.take() else {
            bail_bug!("event not set when delivering event")
        };

        log::trace!(
            "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
        );

        waitable.on_delivery(store, self, event)?;

        let delivered_from = Some((waitable, handle));
        Ok(Some((event, delivered_from)))
    }

    /// Handle the `CallbackCode` returned from an async-lifted export or its
    /// callback.
    ///
    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
    /// using `handle_guest_call`.
fn handle_callback_code( self, store: &mut StoreOpaque, guest_thread: QualifiedThreadId, runtime_instance: RuntimeComponentInstanceIndex, code: u32, ) -> Result> { let (code, set) = unpack_callback_code(code); log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})"); let state = store.concurrent_state_mut(); let get_set = |store: &mut StoreOpaque, handle| -> Result<_> { let set = store .instance_state(RuntimeInstance { instance: self.id().instance(), index: runtime_instance, }) .handle_table() .waitable_set_rep(handle)?; Ok(TableId::::new(set)) }; Ok(match code { callback_code::EXIT => { log::trace!("implicit thread {guest_thread:?} completed"); self.cleanup_thread(store, guest_thread, runtime_instance)?; let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; if task.threads.is_empty() && !task.returned_or_cancelled() { bail!(Trap::NoAsyncResult); } if let Caller::Guest { .. } = task.caller { task.exited = true; task.callback = None; } if task.ready_to_delete() { Waitable::Guest(guest_thread.task).delete_from(store.concurrent_state_mut())?; } None } callback_code::YIELD => { let task = state.get_mut(guest_thread.task)?; // If an `Event::Cancelled` is pending, we'll deliver that; // otherwise, we'll deliver `Event::None`. Note that // `GuestTask::event` is only ever set to one of those two // `Event` variants. if let Some(event) = task.event { assert!(matches!(event, Event::None | Event::Cancelled)); } else { task.event = Some(Event::None); } let call = GuestCall { thread: guest_thread, kind: GuestCallKind::DeliverEvent { instance: self, set: None, }, }; if state.may_block(guest_thread.task)? { // Push this thread onto the "low priority" queue so it runs // after any other threads have had a chance to run. 
state.push_low_priority(WorkItem::GuestCall(runtime_instance, call)); None } else { // Yielding in a non-blocking context is defined as a no-op // according to the spec, so we must run this thread // immediately without allowing any others to run. Some(call) } } callback_code::WAIT => { // The task may only return `WAIT` if it was created for a call // to an async export). Otherwise, we'll trap. state.check_blocking_for(guest_thread.task)?; let set = get_set(store, set)?; let state = store.concurrent_state_mut(); if state.get_mut(guest_thread.task)?.event.is_some() || !state.get_mut(set)?.ready.is_empty() { // An event is immediately available; deliver it ASAP. state.push_high_priority(WorkItem::GuestCall( runtime_instance, GuestCall { thread: guest_thread, kind: GuestCallKind::DeliverEvent { instance: self, set: Some(set), }, }, )); } else { // No event is immediately available. // // We're waiting, so register to be woken up when an event // is published for this waitable set. // // Here we also set `GuestTask::wake_on_cancel` which allows // `subtask.cancel` to interrupt the wait. let old = state .get_mut(guest_thread.thread)? .wake_on_cancel .replace(set); if !old.is_none() { bail_bug!("thread unexpectedly had wake_on_cancel set"); } let old = state .get_mut(set)? 
// NOTE(review): the statements down to the first blank line are the tail of a
// definition that begins before this excerpt; they are reproduced unchanged.
// NOTE(review): throughout this excerpt generic argument lists appear to have
// been stripped by extraction (e.g. `Option>`, `SendSyncPtr`, `Result {`);
// restore them from the upstream file before compiling.
.waiting
.insert(guest_thread, WaitMode::Callback(self));
                if !old.is_none() {
                    bail_bug!("set's waiting set already had this thread registered");
                }
            }
            None
        }
        _ => bail!(Trap::UnsupportedCallbackCode),
    })
}

/// Tear down the bookkeeping for a guest thread which has finished running.
///
/// This:
/// - deletes any waitables still queued as "ready" in the thread's
///   `sync_call_set` whose pending event is a `Returned` or
///   `ReturnCancelled` subtask event,
/// - removes the thread's handle from `runtime_instance`'s thread handle
///   table,
/// - deletes the thread and its `sync_call_set` from the concurrent state,
/// - and finally unlinks the thread from its owning task's `threads` set.
///
/// Returns an error if the thread has no `instance_rep` recorded or if any of
/// the table lookups/removals fail.
fn cleanup_thread(
    self,
    store: &mut StoreOpaque,
    guest_thread: QualifiedThreadId,
    runtime_instance: RuntimeComponentInstanceIndex,
) -> Result<()> {
    let state = store.concurrent_state_mut();
    let thread_data = state.get_mut(guest_thread.thread)?;
    let guest_id = match thread_data.instance_rep {
        Some(id) => id,
        None => bail_bug!("thread must have instance_rep set by now"),
    };
    let sync_call_set = thread_data.sync_call_set;

    // Clean up any pending subtasks in the sync_call_set
    for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) {
        if let Some(Event::Subtask {
            status: Status::Returned | Status::ReturnCancelled,
        }) = waitable.common(state)?.event
        {
            waitable.delete_from(state)?;
        }
    }

    store
        .instance_state(RuntimeInstance {
            instance: self.id().instance(),
            index: runtime_instance,
        })
        .thread_handle_table()
        .guest_thread_remove(guest_id)?;

    store.concurrent_state_mut().delete(guest_thread.thread)?;
    store.concurrent_state_mut().delete(sync_call_set)?;
    let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
    task.threads.remove(&guest_thread.thread);
    Ok(())
}

/// Add the specified guest call to the "high priority" work item queue, to
/// be started as soon as backpressure and/or reentrance rules allow.
///
/// Two flavours of closure are queued depending on `callback`:
/// - with a callback (stackless async-lifted export): the callee is invoked
///   and its `i32` return value is interpreted as a callback code;
/// - without a callback (sync-lifted or stackful async-lifted export): the
///   callee is invoked and, for sync-lifted exports, the result is lifted,
///   `post_return` is called, and the task is completed.
///
/// SAFETY: The raw pointer arguments must be valid references to guest
/// functions (with the appropriate signatures) when the closures queued by
/// this function are called.
unsafe fn queue_call(
    self,
    mut store: StoreContextMut,
    guest_thread: QualifiedThreadId,
    callee: SendSyncPtr,
    param_count: usize,
    result_count: usize,
    async_: bool,
    callback: Option>,
    post_return: Option>,
) -> Result<()> {
    /// Return a closure which will call the specified function in the scope
    /// of the specified task.
    ///
    /// This will use `GuestTask::lower_params` to lower the parameters, but
    /// will not lift the result; instead, it returns a
    /// `[MaybeUninit; MAX_FLAT_PARAMS]` from which the result, if
    /// any, may be lifted. Note that an async-lifted export will have
    /// returned its result using the `task.return` intrinsic (or not
    /// returned a result at all, in the case of `task.cancel`), in which
    /// case the "result" of this call will either be a callback code or
    /// nothing.
    ///
    /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when
    /// the returned closure is called.
    unsafe fn make_call(
        store: StoreContextMut,
        guest_thread: QualifiedThreadId,
        callee: SendSyncPtr,
        param_count: usize,
        result_count: usize,
    ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit; MAX_FLAT_PARAMS]>
    + Send
    + Sync
    + 'static
    + use {
        let token = StoreToken::new(store);
        move |store: &mut dyn VMStore| {
            let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS];

            store
                .concurrent_state_mut()
                .get_mut(guest_thread.thread)?
                .state = GuestThreadState::Running;
            // `lower_params` is consumed here, so it can only run once per task.
            let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
            let lower = match task.lower_params.take() {
                Some(l) => l,
                None => bail_bug!("lower_params missing"),
            };

            lower(store, &mut storage[..param_count])?;

            let mut store = token.as_context_mut(store);

            // SAFETY: Per the contract documented in `make_call`'s
            // documentation, `callee` must be a valid pointer.
            unsafe {
                // The same buffer carries parameters in and results out, so
                // the slice must be long enough for whichever is larger.
                crate::Func::call_unchecked_raw(
                    &mut store,
                    callee.as_non_null(),
                    NonNull::new(
                        &mut storage[..param_count.max(result_count)]
                            as *mut [MaybeUninit] as _,
                    )
                    .unwrap(),
                )?;
            }

            Ok(storage)
        }
    }

    // SAFETY: Per the contract described in this function documentation,
    // the `callee` pointer which `call` closes over must be valid when
    // called by the closure we queue below.
    let call = unsafe {
        make_call(
            store.as_context_mut(),
            guest_thread,
            callee,
            param_count,
            result_count,
        )
    };

    let callee_instance = store
        .0
        .concurrent_state_mut()
        .get_mut(guest_thread.task)?
        .instance;

    let fun = if callback.is_some() {
        // A callback is only meaningful for async-lifted exports.
        assert!(async_);

        Box::new(move |store: &mut dyn VMStore| {
            self.add_guest_thread_to_instance_table(
                guest_thread.thread,
                store,
                callee_instance.index,
            )?;
            let old_thread = store.set_thread(guest_thread)?;
            log::trace!(
                "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread"
            );

            store.enter_instance(callee_instance);

            // SAFETY: See the documentation for `make_call` to review the
            // contract we must uphold for `call` here.
            //
            // Per the contract described in the `queue_call`
            // documentation, the `callee` pointer which `call` closes
            // over must be valid.
            let storage = call(store)?;

            store.exit_instance(callee_instance)?;

            store.set_thread(old_thread)?;
            let state = store.concurrent_state_mut();
            if let Some(t) = old_thread.guest() {
                state.get_mut(t.thread)?.state = GuestThreadState::Running;
            }
            log::trace!("stackless call: restored {old_thread:?} as current thread");

            // SAFETY: `wasmparser` will have validated that the callback
            // function returns a `i32` result.
            let code = unsafe { storage[0].assume_init() }.get_i32() as u32;

            self.handle_callback_code(store, guest_thread, callee_instance.index, code)
        })
            as Box Result> + Send + Sync>
    } else {
        let token = StoreToken::new(store.as_context_mut());
        Box::new(move |store: &mut dyn VMStore| {
            self.add_guest_thread_to_instance_table(
                guest_thread.thread,
                store,
                callee_instance.index,
            )?;
            let old_thread = store.set_thread(guest_thread)?;
            log::trace!(
                "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread",
            );
            let flags = self.id().get(store).instance_flags(callee_instance.index);

            // Unless this is a callback-less (i.e. stackful)
            // async-lifted export, we need to record that the instance
            // cannot be entered until the call returns.
            if !async_ {
                store.enter_instance(callee_instance);
            }

            // SAFETY: See the documentation for `make_call` to review the
            // contract we must uphold for `call` here.
            //
            // Per the contract described in the `queue_call`
            // documentation, the `callee` pointer which `call` closes
            // over must be valid.
            let storage = call(store)?;

            if async_ {
                // A stackful async export must have called `task.return` or
                // `task.cancel` by the time its last thread finishes.
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                if task.threads.len() == 1 && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
            } else {
                // This is a sync-lifted export, so now is when we lift the
                // result, optionally call the post-return function, if any,
                // and finally notify any current or future waiters that the
                // subtask has returned.

                let lift = {
                    store.exit_instance(callee_instance)?;

                    let state = store.concurrent_state_mut();
                    // Invariant: the result slot must still be empty before
                    // the result is lifted below.
                    if !state.get_mut(guest_thread.task)?.result.is_none() {
                        bail_bug!("task has not yet produced a result");
                    }

                    match state.get_mut(guest_thread.task)?.lift_result.take() {
                        Some(lift) => lift,
                        None => bail_bug!("lift_result field is missing"),
                    }
                };

                // SAFETY: `result_count` represents the number of core Wasm
                // results returned, per `wasmparser`.
                let result = (lift.lift)(store, unsafe {
                    mem::transmute::<&[MaybeUninit], &[ValRaw]>(
                        &storage[..result_count],
                    )
                })?;

                let post_return_arg = match result_count {
                    0 => ValRaw::i32(0),
                    // SAFETY: `result_count` represents the number of
                    // core Wasm results returned, per `wasmparser`.
                    1 => unsafe { storage[0].assume_init() },
                    _ => unreachable!(),
                };

                // SAFETY: NOTE(review): presumably covered by `queue_call`'s
                // contract that `post_return`, if present, is a valid guest
                // function pointer when this closure runs; confirm.
                unsafe {
                    call_post_return(
                        token.as_context_mut(store),
                        post_return.map(|v| v.as_non_null()),
                        post_return_arg,
                        flags,
                    )?;
                }

                self.task_complete(store, guest_thread.task, result, Status::Returned)?;
            }

            // This is a callback-less call, so the implicit thread has now completed
            self.cleanup_thread(store, guest_thread, callee_instance.index)?;

            store.set_thread(old_thread)?;

            let state = store.concurrent_state_mut();
            let task = state.get_mut(guest_thread.task)?;

            match &task.caller {
                Caller::Host { .. } => {
                    if task.ready_to_delete() {
                        Waitable::Guest(guest_thread.task).delete_from(state)?;
                    }
                }
                Caller::Guest { .. } => {
                    // Guest callers delete the task later (see `start_call`
                    // and `subtask_drop`); just record that it has exited.
                    task.exited = true;
                }
            }

            Ok(None)
        })
    };

    store
        .0
        .concurrent_state_mut()
        .push_high_priority(WorkItem::GuestCall(
            callee_instance.index,
            GuestCall {
                thread: guest_thread,
                kind: GuestCallKind::StartImplicit(fun),
            },
        ));

    Ok(())
}

/// Prepare (but do not start) a guest->guest call.
///
/// This is called from fused adapter code generated in
/// `wasmtime_environ::fact::trampoline::Compiler`. `start` and `return_`
/// are synthesized Wasm functions which move the parameters from the caller
/// to the callee and the result from the callee to the caller,
/// respectively. The adapter will call `Self::start_call` immediately
/// after calling this function.
///
/// On success a new guest task and implicit thread have been pushed into the
/// concurrent state and made the current thread.
///
/// SAFETY: All the pointer arguments must be valid pointers to guest
/// entities (and with the expected signatures for the function references
/// -- see `wasmtime_environ::fact::trampoline::Compiler` for details).
unsafe fn prepare_call(
    self,
    mut store: StoreContextMut,
    start: NonNull,
    return_: NonNull,
    caller_instance: RuntimeComponentInstanceIndex,
    callee_instance: RuntimeComponentInstanceIndex,
    task_return_type: TypeTupleIndex,
    callee_async: bool,
    memory: *mut VMMemoryDefinition,
    string_encoding: StringEncoding,
    caller_info: CallerInfo,
) -> Result<()> {
    if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) {
        // A task may only call an async-typed function via a sync lower if
        // it was created by a call to an async export.  Otherwise, we'll
        // trap.
        store.0.check_blocking()?;
    }

    // Where the caller expects the result: behind a return pointer in linear
    // memory (`Heap`) or as flat core values (`Stack`).
    enum ResultInfo {
        Heap { results: u32 },
        Stack { result_count: u32 },
    }

    let result_info = match &caller_info {
        CallerInfo::Async {
            has_result: true,
            params,
        } => ResultInfo::Heap {
            results: match params.last() {
                Some(r) => r.get_u32(),
                None => bail_bug!("retptr missing"),
            },
        },
        CallerInfo::Async {
            has_result: false, ..
        } => ResultInfo::Stack { result_count: 0 },
        CallerInfo::Sync {
            result_count,
            params,
        } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap {
            results: match params.last() {
                Some(r) => r.get_u32(),
                None => bail_bug!("arg ptr missing"),
            },
        },
        CallerInfo::Sync { result_count, .. } => ResultInfo::Stack {
            result_count: *result_count,
        },
    };

    let sync_caller = matches!(caller_info, CallerInfo::Sync { .. });

    // Create a new guest task for the call, closing over the `start` and
    // `return_` functions to lift the parameters and lower the result,
    // respectively.
    let start = SendSyncPtr::new(start);
    let return_ = SendSyncPtr::new(return_);
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();
    let old_thread = state.current_guest_thread()?;

    debug_assert_eq!(
        state.get_mut(old_thread.task)?.instance,
        RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        }
    );

    let new_task = GuestTask::new(
        Box::new(move |store, dst| {
            let mut store = token.as_context_mut(store);
            assert!(dst.len() <= MAX_FLAT_PARAMS);
            // The `+ 1` here accounts for the return pointer, if any:
            let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1];
            let count = match caller_info {
                // Async callers, if they have a result, use the last
                // parameter as a return pointer so chop that off if
                // relevant here.
                CallerInfo::Async { params, has_result } => {
                    let params = &params[..params.len() - usize::from(has_result)];
                    for (param, src) in params.iter().zip(&mut src) {
                        src.write(*param);
                    }
                    params.len()
                }

                // Sync callers forward everything directly.
                CallerInfo::Sync { params, .. } => {
                    for (param, src) in params.iter().zip(&mut src) {
                        src.write(*param);
                    }
                    params.len()
                }
            };
            // SAFETY: `start` is a valid `*mut VMFuncRef` from
            // `wasmtime-cranelift`-generated fused adapter code.  Based on
            // how it was constructed (see
            // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter`
            // for details) we know it takes count parameters and returns
            // `dst.len()` results.
            unsafe {
                crate::Func::call_unchecked_raw(
                    &mut store,
                    start.as_non_null(),
                    NonNull::new(
                        &mut src[..count.max(dst.len())] as *mut [MaybeUninit] as _,
                    )
                    .unwrap(),
                )?;
            }
            dst.copy_from_slice(&src[..dst.len()]);
            // Parameters have been handed over, so publish `Started` on the
            // subtask's waitable.
            let state = store.0.concurrent_state_mut();
            Waitable::Guest(state.current_guest_thread()?.task).set_event(
                state,
                Some(Event::Subtask {
                    status: Status::Started,
                }),
            )?;
            Ok(())
        }),
        LiftResult {
            lift: Box::new(move |store, src| {
                // SAFETY: See comment in closure passed as `lower_params`
                // parameter above.
                let mut store = token.as_context_mut(store);
                let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation?
                if let ResultInfo::Heap { results } = &result_info {
                    my_src.push(ValRaw::u32(*results));
                }

                // Execute the `return_` hook, generated by Wasmtime's FACT
                // compiler, in the context of the old thread. The old
                // thread, this thread's caller, may have `realloc`
                // callbacks invoked for example and those need the correct
                // context set for the current thread.
                let prev = store.0.set_thread(old_thread)?;

                // SAFETY: `return_` is a valid `*mut VMFuncRef` from
                // `wasmtime-cranelift`-generated fused adapter code.  Based
                // on how it was constructed (see
                // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter`
                // for details) we know it takes `src.len()` parameters and
                // returns up to 1 result.
                unsafe {
                    crate::Func::call_unchecked_raw(
                        &mut store,
                        return_.as_non_null(),
                        my_src.as_mut_slice().into(),
                    )?;
                }

                // Restore the previous current thread after the
                // lifting/lowering has returned.
                store.0.set_thread(prev)?;

                let state = store.0.concurrent_state_mut();
                let thread = state.current_guest_thread()?;
                if sync_caller {
                    // Stash the (at most one) flat result so `start_call` can
                    // copy it into the sync caller's storage.
                    state.get_mut(thread.task)?.sync_result = SyncResult::Produced(
                        if let ResultInfo::Stack { result_count } = &result_info {
                            match result_count {
                                0 => None,
                                1 => Some(my_src[0]),
                                _ => unreachable!(),
                            }
                        } else {
                            None
                        },
                    );
                }
                Ok(Box::new(DummyResult) as Box)
            }),
            ty: task_return_type,
            memory: NonNull::new(memory).map(SendSyncPtr::new),
            string_encoding,
        },
        Caller::Guest { thread: old_thread },
        None,
        RuntimeInstance {
            instance: self.id().instance(),
            index: callee_instance,
        },
        callee_async,
    )?;

    let guest_task = state.push(new_task)?;
    let new_thread = GuestThread::new_implicit(state, guest_task)?;
    let guest_thread = state.push(new_thread)?;
    state.get_mut(guest_task)?.threads.insert(guest_thread);

    // Make the new thread the current one so that `Self::start_call` knows
    // which one to start.
    store.0.set_thread(QualifiedThreadId {
        task: guest_task,
        thread: guest_thread,
    })?;
    log::trace!(
        "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}"
    );

    Ok(())
}

/// Call the specified callback function for an async-lifted export.
///
/// The callback receives the event's ordinal, `handle`, and the event's
/// result payload; the value it leaves in the first slot is returned.
///
/// SAFETY: `function` must be a valid reference to a guest function of the
/// correct signature for a callback.
unsafe fn call_callback(
    self,
    mut store: StoreContextMut,
    function: SendSyncPtr,
    event: Event,
    handle: u32,
) -> Result {
    let (ordinal, result) = event.parts();
    let params = &mut [
        ValRaw::u32(ordinal),
        ValRaw::u32(handle),
        ValRaw::u32(result),
    ];
    // SAFETY: `func` is a valid `*mut VMFuncRef` from either
    // `wasmtime-cranelift`-generated fused adapter code or
    // `component::Options`.  Per `wasmparser` callback signature
    // validation, we know it takes three parameters and returns one.
    unsafe {
        crate::Func::call_unchecked_raw(
            &mut store,
            function.as_non_null(),
            params.as_mut_slice().into(),
        )?;
    }
    // The single result is written back over the first parameter slot.
    Ok(params[0].get_u32())
}

/// Start a guest->guest call previously prepared using
/// `Self::prepare_call`.
///
/// This is called from fused adapter code generated in
/// `wasmtime_environ::fact::trampoline::Compiler`.  The adapter will call
/// this function immediately after calling `Self::prepare_call`.
///
/// `storage` is `Some` for a sync-lowered caller (and receives the flat
/// result, if any) and `None` for an async-lowered caller. The return value
/// is the subtask status packed together with the subtask handle, if one was
/// allocated.
///
/// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest
/// functions with the appropriate signatures for the current guest task.
/// If this is a call to an async-lowered import, the actual call may be
/// deferred and run after this function returns, in which case the pointer
/// arguments must also be valid when the call happens.
unsafe fn start_call(
    self,
    mut store: StoreContextMut,
    callback: *mut VMFuncRef,
    post_return: *mut VMFuncRef,
    callee: NonNull,
    param_count: u32,
    result_count: u32,
    flags: u32,
    storage: Option<&mut [MaybeUninit]>,
) -> Result {
    let token = StoreToken::new(store.as_context_mut());
    let async_caller = storage.is_none();
    let state = store.0.concurrent_state_mut();
    let guest_thread = state.current_guest_thread()?;
    let callee_async = state.get_mut(guest_thread.task)?.async_function;
    let callee = SendSyncPtr::new(callee);
    let param_count = usize::try_from(param_count)?;
    assert!(param_count <= MAX_FLAT_PARAMS);
    let result_count = usize::try_from(result_count)?;
    assert!(result_count <= MAX_FLAT_RESULTS);

    let task = state.get_mut(guest_thread.task)?;
    if let Some(callback) = NonNull::new(callback) {
        // We're calling an async-lifted export with a callback, so store
        // the callback and related context as part of the task so we can
        // call it later when needed.
        let callback = SendSyncPtr::new(callback);
        task.callback = Some(Box::new(move |store, event, handle| {
            let store = token.as_context_mut(store);
            // SAFETY: relies on this function's contract that `callback`
            // remains a valid guest function pointer when the call happens.
            unsafe { self.call_callback::(store, callback, event, handle) }
        }));
    }

    let Caller::Guest { thread: caller } = &task.caller else {
        // As of this writing, `start_call` is only used for guest->guest
        // calls.
        bail_bug!("start_call unexpectedly invoked for host->guest call");
    };
    let caller = *caller;
    let caller_instance = state.get_mut(caller.task)?.instance;

    // Queue the call as a "high priority" work item.
    //
    // SAFETY: relies on this function's contract that the pointer arguments
    // remain valid when the queued call actually runs.
    unsafe {
        self.queue_call(
            store.as_context_mut(),
            guest_thread,
            callee,
            param_count,
            result_count,
            (flags & START_FLAG_ASYNC_CALLEE) != 0,
            NonNull::new(callback).map(SendSyncPtr::new),
            NonNull::new(post_return).map(SendSyncPtr::new),
        )?;
    }

    let state = store.0.concurrent_state_mut();

    // Use the caller's `GuestThread::sync_call_set` to register interest in
    // the subtask...
    let guest_waitable = Waitable::Guest(guest_thread.task);
    let old_set = guest_waitable.common(state)?.set;
    let set = state.get_mut(caller.thread)?.sync_call_set;
    guest_waitable.join(state, Some(set))?;

    // ... and suspend this fiber temporarily while we wait for it to start.
    //
    // Note that we _could_ call the callee directly using the current fiber
    // rather than suspend this one, but that would make reasoning about the
    // event loop more complicated and is probably only worth doing if
    // there's a measurable performance benefit.  In addition, it would mean
    // blocking the caller if the callee calls a blocking sync-lowered
    // import, and as of this writing the spec says we must not do that.
    //
    // Alternatively, the fused adapter code could be modified to call the
    // callee directly without calling a host-provided intrinsic at all (in
    // which case it would need to do its own, inline backpressure checks,
    // etc.).  Again, we'd want to see a measurable performance benefit
    // before committing to such an optimization.  And again, we'd need to
    // update the spec to allow that.
    let (status, waitable) = loop {
        store.0.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
            // Normally, `StoreOpaque::suspend` would assert it's being
            // called from a context where blocking is allowed.  However, if
            // `async_caller` is `true`, we'll only "block" long enough for
            // the callee to start, i.e. we won't repeat this loop, so we
            // tell `suspend` it's okay even if we're not allowed to block.
            // Alternatively, if the callee is not an async function, then
            // we know it won't block anyway.
            skip_may_block_check: async_caller || !callee_async,
        })?;

        let state = store.0.concurrent_state_mut();

        log::trace!("taking event for {:?}", guest_thread.task);
        let event = guest_waitable.take_event(state)?;
        let Some(Event::Subtask { status }) = event else {
            bail_bug!("subtasks should only get subtask events, got {event:?}")
        };

        log::trace!("status {status:?} for {:?}", guest_thread.task);

        if status == Status::Returned {
            // It returned, so we can stop waiting.
            break (status, None);
        } else if async_caller {
            // It hasn't returned yet, but the caller is calling via an
            // async-lowered import, so we generate a handle for the task
            // waitable and return the status.
            let handle = store
                .0
                .instance_state(caller_instance)
                .handle_table()
                .subtask_insert_guest(guest_thread.task.rep())?;
            store
                .0
                .concurrent_state_mut()
                .get_mut(guest_thread.task)?
                .common
                .handle = Some(handle);
            break (status, Some(handle));
        } else {
            // The callee hasn't returned yet, and the caller is calling via
            // a sync-lowered import, so we loop and keep waiting until the
            // callee returns.
        }
    };

    // Put the subtask's waitable back into whichever set it was in before.
    guest_waitable.join(store.0.concurrent_state_mut(), old_set)?;

    // Reset the current thread to point to the caller as it resumes control.
    store.0.set_thread(caller)?;
    store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running;
    log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

    if let Some(storage) = storage {
        // The caller used a sync-lowered import to call an async-lifted
        // export, in which case the result, if any, has been stashed in
        // `GuestTask::sync_result`.
        let state = store.0.concurrent_state_mut();
        let task = state.get_mut(guest_thread.task)?;
        if let Some(result) = task.sync_result.take()? {
            if let Some(result) = result {
                storage[0] = MaybeUninit::new(result);
            }

            if task.exited && task.ready_to_delete() {
                Waitable::Guest(guest_thread.task).delete_from(state)?;
            }
        }
    }

    Ok(status.pack(waitable))
}

/// Poll the specified future once on behalf of a guest->host call using an
/// async-lowered import.
///
/// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
/// `Pending`, add it to the set of futures to be polled as part of this
/// instance's event loop until it completes, and then return
/// `Ok(Some(handle))` where `handle` is the waitable handle to return.
///
/// Whether the future returns `Ready` immediately or later, the `lower`
/// function will be used to lower the result, if any, into the guest caller's
/// stack and linear memory. The `lower` function is invoked with `None` if
/// the future is cancelled.
pub(crate) fn first_poll(
    self,
    mut store: StoreContextMut<'_, T>,
    future: impl Future> + Send + 'static,
    lower: impl FnOnce(StoreContextMut, Option) -> Result<()> + Send + 'static,
) -> Result> {
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();
    let task = state.current_host_thread()?;

    // Create an abortable future which hooks calls to poll and manages call
    // context state for the future.
    let (join_handle, future) = JoinHandle::run(future);
    {
        let state = &mut state.get_mut(task)?.state;
        assert!(matches!(state, HostTaskState::CalleeStarted));
        *state = HostTaskState::CalleeRunning(join_handle);
    }

    let mut future = Box::pin(future);

    // Finally, poll the future.  We can use a dummy `Waker` here because
    // we'll add the future to `ConcurrentState::futures` and poll it
    // automatically from the event loop if it doesn't complete immediately
    // here.
    let poll = tls::set(store.0, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    match poll {
        // It finished immediately; lower the result and delete the task.
        Poll::Ready(result) => {
            let result = result.transpose()?;
            lower(store.as_context_mut(), result)?;
            return Ok(None);
        }

        // Future isn't ready yet, so fall through.
        Poll::Pending => {}
    }

    // It hasn't finished yet; add the future to
    // `ConcurrentState::futures` so it will be polled by the event
    // loop and allocate a waitable handle to return to the guest.

    // Wrap the future in a closure responsible for lowering the result into
    // the guest's stack and memory, as well as notifying any waiters that
    // the task returned.
    let future = Box::pin(async move {
        // `None` means the future was cancelled rather than completed.
        let result = match future.await {
            Some(result) => Some(result?),
            None => None,
        };
        let on_complete = move |store: &mut dyn VMStore| {
            // Restore the `current_thread` to be the host so `lower` knows
            // how to manipulate borrows and knows which scope of borrows
            // to check.
            let mut store = token.as_context_mut(store);
            let old = store.0.set_thread(task)?;

            let status = if result.is_some() {
                Status::Returned
            } else {
                Status::ReturnCancelled
            };

            lower(store.as_context_mut(), result)?;
            let state = store.0.concurrent_state_mut();
            match &mut state.get_mut(task)?.state {
                // The task is already flagged as finished because it was
                // cancelled. No need to transition further.
                HostTaskState::CalleeDone { .. } => {}

                // Otherwise transition this task to the done state.
                other => *other = HostTaskState::CalleeDone { cancelled: false },
            }
            Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?;

            store.0.set_thread(old)?;
            Ok(())
        };

        // Here we schedule a task to run on a worker fiber to do the
        // lowering since it may involve a call to the guest's realloc
        // function. This is necessary because calling the guest while
        // there are host embedder frames on the stack is unsound.
        tls::get(move |store| {
            store
                .concurrent_state_mut()
                .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new(
                    on_complete,
                ))));
            Ok(())
        })
    });

    // Make this task visible to the guest and then record what it
    // was made visible as.
    let state = store.0.concurrent_state_mut();
    state.push_future(future);
    let caller = state.get_mut(task)?.caller;
    let instance = state.get_mut(caller.task)?.instance;
    let handle = store
        .0
        .instance_state(instance)
        .handle_table()
        .subtask_insert_host(task.rep())?;
    store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
    log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}");

    // Restore the currently running thread to this host task's
    // caller. Note that the host task isn't deallocated as it's
    // within the store and will get deallocated later.
    store.0.set_thread(caller)?;
    Ok(Some(handle))
}

/// Implements the `task.return` intrinsic, lifting the result for the
/// current guest task.
///
/// Traps with `TaskCancelOrReturnTwice` if the task's `lift_result` has
/// already been consumed, and with `TaskReturnInvalid` if `ty` or the
/// canonical options named by `options` do not match what the task was
/// created with.
pub(crate) fn task_return(
    self,
    store: &mut dyn VMStore,
    ty: TypeTupleIndex,
    options: OptionsIndex,
    storage: &[ValRaw],
) -> Result<()> {
    let state = store.concurrent_state_mut();
    let guest_thread = state.current_guest_thread()?;
    let lift = state
        .get_mut(guest_thread.task)?
        .lift_result
        .take()
        .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;
    if !state.get_mut(guest_thread.task)?.result.is_none() {
        bail_bug!("task result unexpectedly already set");
    }

    let CanonicalOptions {
        string_encoding,
        data_model,
        ..
    } = &self.id().get(store).component().env_component().options[options];

    let invalid = ty != lift.ty
        || string_encoding != &lift.string_encoding
        || match data_model {
            CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
                Some(memory) => {
                    let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
                    let actual = self.id().get(store).runtime_memory(memory);
                    expected != actual.as_ptr()
                }
                // Memory not specified, meaning it didn't need to be
                // specified per validation, so not invalid.
                None => false,
            },
            // Always invalid as this isn't supported.
            CanonicalOptionsDataModel::Gc { .. } => true,
        };

    if invalid {
        bail!(Trap::TaskReturnInvalid);
    }

    log::trace!("task.return for {guest_thread:?}");

    let result = (lift.lift)(store, storage)?;
    self.task_complete(store, guest_thread.task, result, Status::Returned)
}

/// Implements the `task.cancel` intrinsic.
///
/// Traps with `TaskCancelNotCancelled` unless the current task has
/// `cancel_sent` set, and with `TaskCancelOrReturnTwice` if the task's
/// `lift_result` has already been consumed. On success the task is completed
/// with `Status::ReturnCancelled`.
pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> {
    let state = store.concurrent_state_mut();
    let guest_thread = state.current_guest_thread()?;
    let task = state.get_mut(guest_thread.task)?;
    if !task.cancel_sent {
        bail!(Trap::TaskCancelNotCancelled);
    }
    // Consume `lift_result` so a later `task.return`/`task.cancel` traps.
    _ = task
        .lift_result
        .take()
        .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?;

    if !task.result.is_none() {
        bail_bug!("task result should not be set yet");
    }

    log::trace!("task.cancel for {guest_thread:?}");

    self.task_complete(
        store,
        guest_thread.task,
        Box::new(DummyResult),
        Status::ReturnCancelled,
    )
}

/// Complete the specified guest task (i.e. indicate that it has either
/// returned a (possibly empty) result or cancelled itself).
///
/// This will return any resource borrows and notify any current or future
/// waiters that the task has completed.
fn task_complete(
    self,
    store: &mut StoreOpaque,
    guest_task: TableId,
    result: Box,
    status: Status,
) -> Result<()> {
    // NOTE(review): the type arguments of `TableId` and `Box` above look like
    // they were stripped by extraction; confirm against the upstream file.

    // Fail if the resource tables report an invalid scope exit.
    store
        .component_resource_tables(Some(self))
        .validate_scope_exit()?;

    let state = store.concurrent_state_mut();
    let task = state.get_mut(guest_task)?;

    if let Caller::Host { tx, .. } = &mut task.caller {
        // Host caller: hand the result over through the channel, at most
        // once. A failed send is deliberately ignored.
        if let Some(tx) = tx.take() {
            _ = tx.send(result);
        }
    } else {
        // Guest caller: store the result on the task and publish a subtask
        // event carrying `status` on the task's waitable.
        task.result = Some(result);
        Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?;
    }

    Ok(())
}

/// Implements the `waitable-set.new` intrinsic.
///
/// Pushes a default `WaitableSet` into the concurrent state, inserts it into
/// `caller_instance`'s handle table, and returns the resulting handle.
pub(crate) fn waitable_set_new(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
) -> Result {
    let set = store.concurrent_state_mut().push(WaitableSet::default())?;
    let handle = store
        .instance_state(RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        })
        .handle_table()
        .waitable_set_insert(set.rep())?;
    log::trace!("new waitable set {set:?} (handle {handle})");
    Ok(handle)
}

/// Implements the `waitable-set.drop` intrinsic.
///
/// Removes handle `set` from `caller_instance`'s handle table and deletes the
/// corresponding set from the concurrent state. Traps with
/// `WaitableSetDropHasWaiters` if the deleted set still had waiters.
pub(crate) fn waitable_set_drop(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    set: u32,
) -> Result<()> {
    let rep = store
        .instance_state(RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        })
        .handle_table()
        .waitable_set_remove(set)?;

    log::trace!("drop waitable set {rep} (handle {set})");

    // Note the set has already been removed from both tables by the time the
    // waiter check below runs.
    let set = store
        .concurrent_state_mut()
        .delete(TableId::::new(rep))?;

    if !set.waiting.is_empty() {
        bail!(Trap::WaitableSetDropHasWaiters);
    }

    Ok(())
}

/// Implements the `waitable.join` intrinsic.
pub(crate) fn waitable_join( self, store: &mut StoreOpaque, caller_instance: RuntimeComponentInstanceIndex, waitable_handle: u32, set_handle: u32, ) -> Result<()> { let mut instance = self.id().get_mut(store); let waitable = Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?; let set = if set_handle == 0 { None } else { let set = instance.instance_states().0[caller_instance] .handle_table() .waitable_set_rep(set_handle)?; Some(TableId::::new(set)) }; log::trace!( "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})", ); waitable.join(store.concurrent_state_mut(), set) } /// Implements the `subtask.drop` intrinsic. pub(crate) fn subtask_drop( self, store: &mut StoreOpaque, caller_instance: RuntimeComponentInstanceIndex, task_id: u32, ) -> Result<()> { self.waitable_join(store, caller_instance, task_id, 0)?; let (rep, is_host) = store .instance_state(RuntimeInstance { instance: self.id().instance(), index: caller_instance, }) .handle_table() .subtask_remove(task_id)?; let concurrent_state = store.concurrent_state_mut(); let (waitable, expected_caller, delete) = if is_host { let id = TableId::::new(rep); let task = concurrent_state.get_mut(id)?; match &task.state { HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved), HostTaskState::CalleeDone { .. 
} => {} HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => { bail_bug!("invalid state for callee in `subtask.drop`") } } (Waitable::Host(id), task.caller, true) } else { let id = TableId::::new(rep); let task = concurrent_state.get_mut(id)?; if task.lift_result.is_some() { bail!(Trap::SubtaskDropNotResolved); } if let Caller::Guest { thread } = task.caller { ( Waitable::Guest(id), thread, concurrent_state.get_mut(id)?.exited, ) } else { bail_bug!("expected guest caller for `subtask.drop`") } }; waitable.common(concurrent_state)?.handle = None; // If this subtask has an event that means that the terminal status of // this subtask wasn't yet received so it can't be dropped yet. if waitable.take_event(concurrent_state)?.is_some() { bail!(Trap::SubtaskDropNotResolved); } if delete { waitable.delete_from(concurrent_state)?; } // Since waitables can neither be passed between instances nor forged, // this should never fail unless there's a bug in Wasmtime, but we check // here to be sure: debug_assert_eq!(expected_caller, concurrent_state.current_guest_thread()?); log::trace!("subtask_drop {waitable:?} (handle {task_id})"); Ok(()) } /// Implements the `waitable-set.wait` intrinsic. pub(crate) fn waitable_set_wait( self, store: &mut StoreOpaque, options: OptionsIndex, set: u32, payload: u32, ) -> Result { if !self.options(store, options).async_ { // The caller may only call `waitable-set.wait` from an async task // (i.e. a task created via a call to an async export). // Otherwise, we'll trap. store.check_blocking()?; } let &CanonicalOptions { cancellable, instance: caller_instance, .. 
} = &self.id().get(store).component().env_component().options[options]; let rep = store .instance_state(RuntimeInstance { instance: self.id().instance(), index: caller_instance, }) .handle_table() .waitable_set_rep(set)?; self.waitable_check( store, cancellable, WaitableCheck::Wait, WaitableCheckParams { set: TableId::new(rep), options, payload, }, ) } /// Implements the `waitable-set.poll` intrinsic. pub(crate) fn waitable_set_poll( self, store: &mut StoreOpaque, options: OptionsIndex, set: u32, payload: u32, ) -> Result { let &CanonicalOptions { cancellable, instance: caller_instance, .. } = &self.id().get(store).component().env_component().options[options]; let rep = store .instance_state(RuntimeInstance { instance: self.id().instance(), index: caller_instance, }) .handle_table() .waitable_set_rep(set)?; self.waitable_check( store, cancellable, WaitableCheck::Poll, WaitableCheckParams { set: TableId::new(rep), options, payload, }, ) } /// Implements the `thread.index` intrinsic. pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result { let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread; match store .concurrent_state_mut() .get_mut(thread_id)? .instance_rep { Some(r) => Ok(r), None => bail_bug!("thread should have instance_rep by now"), } } /// Implements the `thread.new-indirect` intrinsic. pub(crate) fn thread_new_indirect( self, mut store: StoreContextMut, runtime_instance: RuntimeComponentInstanceIndex, _func_ty_idx: TypeFuncIndex, // currently unused start_func_table_idx: RuntimeTableIndex, start_func_idx: u32, context: i32, ) -> Result { log::trace!("creating new thread"); let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []); let (instance, registry) = self.id().get_mut_and_registry(store.0); let callee = instance .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)? 
.ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?;
        if callee.type_index(store.0) != start_func_ty.type_index() {
            bail!(Trap::ThreadNewIndirectInvalidType);
        }

        let token = StoreToken::new(store.as_context_mut());
        // Closure stored in the new thread and invoked when it is first
        // resumed (see the `NotStartedExplicit` arm of `resume_thread`).
        let start_func = Box::new(
            move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> {
                // Make the new thread current for the duration of the call,
                // remembering the previous one so it can be restored below.
                let old_thread = store.set_thread(guest_thread)?;
                log::trace!(
                    "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread"
                );

                let mut store = token.as_context_mut(store);
                let mut params = [ValRaw::i32(context)];
                // Use call_unchecked rather than call or call_async, as we don't want to run the function
                // on a separate fiber if we're running in an async store.
                unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? };

                self.cleanup_thread(store.0, guest_thread, runtime_instance)?;
                log::trace!("explicit thread {guest_thread:?} completed");
                let state = store.0.concurrent_state_mut();
                let task = state.get_mut(guest_thread.task)?;
                // If the task has no threads left but has neither returned nor
                // been cancelled, it can no longer produce a result: trap.
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
                store.0.set_thread(old_thread)?;
                let state = store.0.concurrent_state_mut();
                if let Some(t) = old_thread.guest() {
                    state.get_mut(t.thread)?.state = GuestThreadState::Running;
                }
                if state.get_mut(guest_thread.task)?.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
                log::trace!("thread start: restored {old_thread:?} as current thread");

                Ok(())
            },
        );

        // Register the new thread under the task of the thread that created it.
        let state = store.0.concurrent_state_mut();
        let current_thread = state.current_guest_thread()?;
        let parent_task = current_thread.task;

        let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?;
        let thread_id = state.push(new_thread)?;
        state.get_mut(parent_task)?.threads.insert(thread_id);

        log::trace!("new thread with id {thread_id:?} created");

        self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance)
    }

    /// Schedule the guest thread identified by the guest-visible handle
    /// `thread_idx`, looked up in `runtime_instance`'s thread handle table.
    ///
    /// What happens depends on the thread's current state:
    ///
    /// - `NotStartedExplicit`: the stored start closure is queued as a
    ///   `GuestCallKind::StartExplicit` work item.
    /// - `Suspended`: the thread's fiber is queued as a
    ///   `WorkItem::ResumeFiber`.
    /// - `Ready`: accepted only when `allow_ready` is set; the state is put
    ///   back to `Ready` and `promote_thread_work_item` is called for the
    ///   thread.
    /// - anything else: the state is restored unchanged and the call fails
    ///   with `Trap::CannotResumeThread`.
    ///
    /// `high_priority` is forwarded to `push_work_item`.
    pub(crate) fn resume_thread(
        self,
        store: &mut StoreOpaque,
        runtime_instance: RuntimeComponentInstanceIndex,
        thread_idx: u32,
        high_priority: bool,
        allow_ready: bool,
    ) -> Result<()> {
        let thread_id =
            GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?;
        let state = store.concurrent_state_mut();
        let guest_thread = QualifiedThreadId::qualify(state, thread_id)?;
        let thread = state.get_mut(guest_thread.thread)?;

        // Optimistically mark the thread as running; the arms that do not
        // actually schedule it write the appropriate state back.
        match mem::replace(&mut thread.state, GuestThreadState::Running) {
            GuestThreadState::NotStartedExplicit(start_func) => {
                log::trace!("starting thread {guest_thread:?}");
                let guest_call = WorkItem::GuestCall(
                    runtime_instance,
                    GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::StartExplicit(Box::new(move |store| {
                            start_func(store, guest_thread)
                        })),
                    },
                );
                store
                    .concurrent_state_mut()
                    .push_work_item(guest_call, high_priority);
            }
            GuestThreadState::Suspended(fiber) => {
                log::trace!("resuming thread {thread_id:?} that was suspended");
                store
                    .concurrent_state_mut()
                    .push_work_item(WorkItem::ResumeFiber(fiber), high_priority);
            }
            GuestThreadState::Ready(fiber) if allow_ready => {
                log::trace!("resuming thread {thread_id:?} that was ready");
                thread.state = GuestThreadState::Ready(fiber);
                store
                    .concurrent_state_mut()
                    .promote_thread_work_item(guest_thread);
            }
            other => {
                thread.state = other;
                bail!(Trap::CannotResumeThread);
            }
        }
        Ok(())
    }

    /// Insert `thread_id` into the thread handle table of `runtime_instance`,
    /// record the resulting guest-visible index in the thread's
    /// `instance_rep`, and return that index.
    fn add_guest_thread_to_instance_table(
        self,
        thread_id: TableId,
        store: &mut StoreOpaque,
        runtime_instance: RuntimeComponentInstanceIndex,
    ) -> Result {
        let guest_id = store
            .instance_state(RuntimeInstance {
                instance: self.id().instance(),
                index: runtime_instance,
            })
            .thread_handle_table()
            .guest_thread_insert(thread_id.rep())?;
        store
            .concurrent_state_mut()
            .get_mut(thread_id)?
            .instance_rep = Some(guest_id);
        Ok(guest_id)
    }

    /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`,
    /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics.
    ///
    /// `yielding` distinguishes the yield variants from the suspend variants,
    /// and `to_thread` names the thread (if any) to resume before the current
    /// thread gives up control. Returns `WaitResult::Cancelled` when
    /// `cancellable` is set and a pending cancellation was taken, and
    /// `WaitResult::Completed` otherwise.
pub(crate) fn suspension_intrinsic(
        self,
        store: &mut StoreOpaque,
        caller: RuntimeComponentInstanceIndex,
        cancellable: bool,
        yielding: bool,
        to_thread: SuspensionTarget,
    ) -> Result {
        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;
        // Only the variants without a target thread are subject to the
        // blocking rules below.
        if to_thread.is_none() {
            let state = store.concurrent_state_mut();
            if yielding {
                // This is a `thread.yield` call
                if !state.may_block(guest_thread.task)? {
                    // In a non-blocking context, a `thread.yield` may trigger
                    // other threads in the same component instance to run.
                    if !state.promote_instance_local_thread_work_item(caller) {
                        // No other threads are runnable, so just return
                        return Ok(WaitResult::Completed);
                    }
                }
            } else {
                // The caller may only call `thread.suspend` from an async task
                // (i.e. a task created via a call to an async export).
                // Otherwise, we'll trap.
                store.check_blocking()?;
            }
        }

        // There could be a pending cancellation from a previous uncancellable wait
        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
            return Ok(WaitResult::Cancelled);
        }

        // Schedule the target thread, if any, at high priority. Only the plain
        // `Some` target may already be in the `Ready` state.
        match to_thread {
            SuspensionTarget::SomeSuspended(thread) => {
                self.resume_thread(store, caller, thread, true, false)?
            }
            SuspensionTarget::Some(thread) => {
                self.resume_thread(store, caller, thread, true, true)?
            }
            SuspensionTarget::None => { /* nothing to do */ }
        }

        let reason = if yielding {
            SuspendReason::Yielding {
                thread: guest_thread,
                // Tell `StoreOpaque::suspend` it's okay to suspend here since
                // we're handling a `thread.yield-to-suspended` call; otherwise it would
                // panic if we called it in a non-blocking context.
                skip_may_block_check: to_thread.is_some(),
            }
        } else {
            SuspendReason::ExplicitlySuspending {
                thread: guest_thread,
                // Tell `StoreOpaque::suspend` it's okay to suspend here since
                // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would
                // panic if we called it in a non-blocking context.
                skip_may_block_check: to_thread.is_some(),
            }
        };

        store.suspend(reason)?;

        // A cancellation may have arrived while this thread was suspended.
        if cancellable && store.concurrent_state_mut().take_pending_cancellation()? {
            Ok(WaitResult::Cancelled)
        } else {
            Ok(WaitResult::Completed)
        }
    }

    /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics.
    ///
    /// In `WaitableCheck::Wait` mode the current thread is suspended when no
    /// event is immediately available; in `WaitableCheck::Poll` mode it is
    /// never suspended. The delivered event's handle and payload are written
    /// to guest memory at `params.payload`, and the event's ordinal is
    /// returned.
    fn waitable_check(
        self,
        store: &mut StoreOpaque,
        cancellable: bool,
        check: WaitableCheck,
        params: WaitableCheckParams,
    ) -> Result {
        let guest_thread = store.concurrent_state_mut().current_guest_thread()?;

        log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);

        let state = store.concurrent_state_mut();
        let task = state.get_mut(guest_thread.task)?;

        // If we're waiting, and there are no events immediately available,
        // suspend the fiber until that changes.
        match &check {
            WaitableCheck::Wait => {
                let set = params.set;

                // A pending `Event::Cancelled` does not count as available
                // when this wait is not `cancellable`.
                if (task.event.is_none()
                    || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
                    && state.get_mut(set)?.ready.is_empty()
                {
                    if cancellable {
                        // Remember which set this thread waits on so that
                        // `subtask.cancel` can wake it.
                        let old = state
                            .get_mut(guest_thread.thread)?
                            .wake_on_cancel
                            .replace(set);
                        if !old.is_none() {
                            bail_bug!("thread unexpectedly in a prior wake_on_cancel set");
                        }
                    }

                    store.suspend(SuspendReason::Waiting {
                        set,
                        thread: guest_thread,
                        skip_may_block_check: false,
                    })?;
                }
            }
            WaitableCheck::Poll => {}
        }

        log::trace!(
            "waitable check for {guest_thread:?}; set {:?}, part two",
            params.set
        );

        // Deliver any pending events to the guest and return.
let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;

        let (ordinal, handle, result) = match &check {
            WaitableCheck::Wait => {
                // After a wait there must be an event: either one was already
                // available or the thread was suspended above until one was.
                let (event, waitable) = match event {
                    Some(p) => p,
                    None => bail_bug!("event expected to be present"),
                };
                // Events not tied to a waitable report handle 0.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                let (ordinal, result) = event.parts();
                (ordinal, handle, result)
            }
            WaitableCheck::Poll => {
                if let Some((event, waitable)) = event {
                    let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                    let (ordinal, result) = event.parts();
                    (ordinal, handle, result)
                } else {
                    log::trace!(
                        "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
                        guest_thread.task,
                        params.set
                    );
                    let (ordinal, result) = Event::None.parts();
                    (ordinal, 0, result)
                }
            }
        };
        // Store `handle` and `result` as two consecutive 4-byte little-endian
        // values at the bounds-checked guest address `params.payload`.
        let memory = self.options_memory_mut(store, params.options);
        let ptr = func::validate_inbounds_dynamic(
            &CanonicalAbiInfo::POINTER_PAIR,
            memory,
            &ValRaw::u32(params.payload),
        )?;
        memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
        memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
        Ok(ordinal)
    }

    /// Implements the `subtask.cancel` intrinsic.
    ///
    /// `task_id` is the guest-visible subtask handle in `caller_instance`.
    /// On success the result is one of:
    ///
    /// - `Status::StartCancelled` if a guest subtask had not lowered its
    ///   parameters yet,
    /// - `BLOCKED` if the subtask has not reached a terminal state and
    ///   `async_` is set,
    /// - otherwise the subtask's terminal `Status::Returned` or
    ///   `Status::ReturnCancelled`.
    ///
    /// Traps with `Trap::SubtaskCancelAfterTerminal` when no such terminal
    /// event is available, e.g. because cancellation of a host subtask was
    /// already requested.
    pub(crate) fn subtask_cancel(
        self,
        store: &mut StoreOpaque,
        caller_instance: RuntimeComponentInstanceIndex,
        async_: bool,
        task_id: u32,
    ) -> Result {
        if !async_ {
            // The caller may only sync call `subtask.cancel` from an async task
            // (i.e. a task created via a call to an async export).  Otherwise,
            // we'll trap.
            store.check_blocking()?;
        }

        let (rep, is_host) = store
            .instance_state(RuntimeInstance {
                instance: self.id().instance(),
                index: caller_instance,
            })
            .handle_table()
            .subtask_rep(task_id)?;
        let (waitable, expected_caller) = if is_host {
            let id = TableId::::new(rep);
            (
                Waitable::Host(id),
                store.concurrent_state_mut().get_mut(id)?.caller,
            )
        } else {
            let id = TableId::::new(rep);
            if let Caller::Guest { thread } = store.concurrent_state_mut().get_mut(id)?.caller {
                (Waitable::Guest(id), thread)
            } else {
                bail_bug!("expected guest caller for `subtask.cancel`")
            }
        };
        // Since waitables can neither be passed between instances nor forged,
        // this should never fail unless there's a bug in Wasmtime, but we check
        // here to be sure:
        let concurrent_state = store.concurrent_state_mut();
        debug_assert_eq!(expected_caller, concurrent_state.current_guest_thread()?);

        log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

        // Whether the terminal status still has to be waited for; assigned
        // exactly once on every path that reaches the check below.
        let needs_block;
        if let Waitable::Host(host_task) = waitable {
            let state = &mut concurrent_state.get_mut(host_task)?.state;
            match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) {
                // If the callee is still running, signal an abort is requested.
                //
                // After cancelling this falls through to block waiting for the
                // host task to actually finish assuming that `async_` is false.
                // This blocking behavior resolves the race of `handle.abort()`
                // with the task actually getting cancelled or finishing.
                HostTaskState::CalleeRunning(handle) => {
                    handle.abort();
                    needs_block = true;
                }

                // Cancellation was already requested, so fail as the task can't
                // be cancelled twice.
                HostTaskState::CalleeDone { cancelled } => {
                    if cancelled {
                        bail!(Trap::SubtaskCancelAfterTerminal);
                    } else {
                        // The callee is already done so there's no need to
                        // block further for an event.
                        needs_block = false;
                    }
                }

                // These states should not be possible for a subtask that's
                // visible from the guest, so trap here.
                HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => {
                    bail_bug!("invalid states for host callee")
                }
            }
        } else {
            let caller = concurrent_state.current_guest_thread()?;
            let guest_task = TableId::::new(rep);
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.already_lowered_parameters() {
                // The task is in a `starting` state, meaning it hasn't run at
                // all yet.  Here we update its fields to indicate that it is
                // ready to delete immediately once `subtask.drop` is called.
                task.lower_params = None;
                task.lift_result = None;
                task.exited = true;
                let instance = task.instance;

                // A task that has not started is expected to own exactly one
                // thread; remove it along with the task's thread set.
                assert_eq!(1, task.threads.len());
                let thread = mem::take(&mut task.threads).into_iter().next().unwrap();
                let concurrent_state = store.concurrent_state_mut();
                concurrent_state.delete(thread)?;
                assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete());

                // Not yet started; cancel and remove from pending
                let pending = &mut store.instance_state(instance).concurrent_state().pending;
                let pending_count = pending.len();
                pending.retain(|thread, _| thread.task != guest_task);
                // If there were no pending threads for this task, we're in an error state
                if pending.len() == pending_count {
                    bail!(Trap::SubtaskCancelAfterTerminal);
                }
                return Ok(Status::StartCancelled as u32);
            } else if !task.returned_or_cancelled() {
                // Started, but not yet returned or cancelled; send the
                // `CANCELLED` event
                task.cancel_sent = true;
                // Note that this might overwrite an event that was set earlier
                // (e.g. `Event::None` if the task is yielding, or
                // `Event::Cancelled` if it was already cancelled), but that's
                // okay -- this should supersede the previous state.
                task.event = Some(Event::Cancelled);
                let runtime_instance = task.instance.index;
                // Wake the first thread found waiting cancellably, then yield
                // so it gets a chance to observe the cancellation.
                for thread in task.threads.clone() {
                    let thread = QualifiedThreadId {
                        task: guest_task,
                        thread,
                    };
                    if let Some(set) = concurrent_state
                        .get_mut(thread.thread)?
                        .wake_on_cancel
                        .take()
                    {
                        let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) {
                            Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber),
                            Some(WaitMode::Callback(instance)) => WorkItem::GuestCall(
                                runtime_instance,
                                GuestCall {
                                    thread,
                                    kind: GuestCallKind::DeliverEvent {
                                        instance,
                                        set: None,
                                    },
                                },
                            ),
                            None => bail_bug!("thread not present in wake_on_cancel set"),
                        };
                        concurrent_state.push_high_priority(item);

                        store.suspend(SuspendReason::Yielding {
                            thread: caller,
                            // `subtask.cancel` is not allowed to be called in a
                            // sync context, so we cannot skip the may-block check.
                            skip_may_block_check: false,
                        })?;
                        break;
                    }
                }

                // Guest tasks need to block if they have not yet returned or
                // cancelled, even as a result of the event delivery above.
                needs_block = !store
                    .concurrent_state_mut()
                    .get_mut(guest_task)?
                    .returned_or_cancelled()
            } else {
                needs_block = false;
            }
        };

        // If we need to block waiting on the terminal status of this subtask
        // then return immediately in `async` mode, or otherwise wait for the
        // event to get signaled through the store.
        if needs_block {
            if async_ {
                return Ok(BLOCKED);
            }

            // Wait for this waitable to get signaled with its terminal status
            // from the completion callback enqueued by `first_poll`. Once
            // that's done fall through to the shared event handling below.
            store.wait_for_event(waitable)?;

            // .. fall through to determine what event's in store for us.
        }

        // Only a terminal subtask status is an acceptable outcome here.
        let event = waitable.take_event(store.concurrent_state_mut())?;
        if let Some(Event::Subtask {
            status: status @ (Status::Returned | Status::ReturnCancelled),
        }) = event
        {
            Ok(status as u32)
        } else {
            bail!(Trap::SubtaskCancelAfterTerminal);
        }
    }

    /// Read context slot `slot` by delegating to `context_get` on the store's
    /// concurrent state (presumably the entry point for the `context.get`
    /// intrinsic; see `GuestThread::context`).
    pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result {
        store.concurrent_state_mut().context_get(slot)
    }

    /// Write `value` to context slot `slot` by delegating to `context_set` on
    /// the store's concurrent state (presumably the entry point for the
    /// `context.set` intrinsic; see `GuestThread::context`).
    pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> {
        store.concurrent_state_mut().context_set(slot, value)
    }
}

/// Trait representing component model ABI async intrinsics and fused adapter
/// helper functions.
///
/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
/// which must be valid for at least the duration of the call (and possibly for
/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
/// pointers used for async calls).
pub trait VMComponentAsyncStore {
    /// A helper function for fused adapter modules involving calls where
    /// at least one of the caller or callee is async.
    ///
    /// This helper is not used when the caller and callee both use the sync
    /// ABI; it is only used when at least one of them is async.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull,
        return_: NonNull,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is sync-lowered but the callee is async-lifted.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull,
        param_count: u32,
        storage: *mut MaybeUninit,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is async-lowered.
unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result;

    /// The `future.write` intrinsic.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result;

    /// The `future.read` intrinsic.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result;

    /// The `future.drop-writable` intrinsic.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `stream.write` intrinsic.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result;

    /// The `stream.read` intrinsic.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result;

    /// The "fast-path" implementation of the `stream.write` intrinsic for
    /// "flat" (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` describe the layout of a single
    /// payload item.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result;

    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
    /// (i.e. memcpy-able) payloads.
    ///
    /// `payload_size` and `payload_align` describe the layout of a single
    /// payload item.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result;

    /// The `stream.drop-writable` intrinsic.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `error-context.debug-message` intrinsic.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// The `thread.new-indirect` intrinsic.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result;
}

/// SAFETY: See trait docs.
impl VMComponentAsyncStore for StoreInner {
    /// Copies the `storage_len` values at `storage` into an owned vector and
    /// forwards to `Instance::prepare_call`.
    ///
    /// `result_count_or_max_if_async` doubles as a discriminant: the
    /// sentinels `PREPARE_ASYNC_NO_RESULT` and `PREPARE_ASYNC_WITH_RESULT`
    /// select an async caller without or with a result, and any other value
    /// is the result count of a sync caller.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: NonNull,
        return_: NonNull,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: StringEncoding,
        result_count_or_max_if_async: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()> {
        // SAFETY: The `wasmtime_cranelift`-generated code that calls
        // this method will have ensured that `storage` is a valid
        // pointer containing at least `storage_len` items.
let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();

        unsafe {
            instance.prepare_call(
                StoreContextMut(self),
                start,
                return_,
                caller_instance,
                callee_instance,
                task_return_type,
                callee_async,
                memory,
                string_encoding,
                // Decode the sentinel values; anything else is a plain
                // result count for a sync caller.
                match result_count_or_max_if_async {
                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
                        params,
                        has_result: false,
                    },
                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
                        params,
                        has_result: true,
                    },
                    result_count => CallerInfo::Sync {
                        params,
                        result_count,
                    },
                },
            )
        }
    }

    /// Forwards to `Instance::start_call` with a null post-return pointer, a
    /// result count of 1 and `START_FLAG_ASYNC_CALLEE`, handing it the
    /// `storage` slice; the value `start_call` returns is discarded.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: NonNull,
        param_count: u32,
        storage: *mut MaybeUninit,
        storage_len: usize,
    ) -> Result<()> {
        unsafe {
            instance
                .start_call(
                    StoreContextMut(self),
                    callback,
                    ptr::null_mut(),
                    callee,
                    param_count,
                    1,
                    START_FLAG_ASYNC_CALLEE,
                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
                    // this method will have ensured that `storage` is a valid
                    // pointer containing at least `storage_len` items.
                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
                )
                .map(drop)
        }
    }

    /// Forwards to `Instance::start_call` without a storage slice and returns
    /// its result unchanged.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: NonNull,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result {
        unsafe {
            instance.start_call(
                StoreContextMut(self),
                callback,
                post_return,
                callee,
                param_count,
                result_count,
                flags,
                None,
            )
        }
    }

    /// Forwards to `Instance::guest_write` for a future: no `FlatAbi` and an
    /// item count of 1. Returns the encoded result.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_read` for a future: no `FlatAbi` and an
    /// item count of 1. Returns the encoded result.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_write` for a stream with no `FlatAbi`.
    /// Returns the encoded result.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_read` for a stream with no `FlatAbi`.
    /// Returns the encoded result.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_drop_writable` for a future.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
    }

    /// Same as `stream_write`, but passes a `FlatAbi` built from
    /// `payload_size` and `payload_align`.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result {
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Same as `stream_read`, but passes a `FlatAbi` built from
    /// `payload_size` and `payload_align`.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result {
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    /// Forwards to `Instance::guest_drop_writable` for a stream.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
    }

    /// Forwards to `Instance::error_context_debug_message`.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()> {
        instance.error_context_debug_message(
            StoreContextMut(self),
            ty,
            options,
            err_ctx_handle,
            debug_msg_address,
        )
    }

    /// Forwards to `Instance::thread_new_indirect`.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result {
        instance.thread_new_indirect(
            StoreContextMut(self),
            caller,
            func_ty_idx,
            start_func_table_idx,
            start_func_idx,
            context,
        )
    }
}

type HostTaskFuture = Pin> + Send + 'static>>;

/// Represents the state of a pending host task.
///
/// This is used to represent tasks when the guest calls into the host.
pub(crate) struct HostTask {
    /// See `WaitableCommon`
    common: WaitableCommon,

    /// Guest thread which called the host.
    caller: QualifiedThreadId,

    /// State of borrows/etc the host needs to track. Used when the guest passes
    /// borrows to the host, for example.
    call_context: CallContext,

    /// Current lifecycle state of the host call; see `HostTaskState`.
    state: HostTaskState,
}

/// Lifecycle states of a `HostTask`.
enum HostTaskState {
    /// A host task has been created and it's considered "started".
    ///
    /// The host task has yet to enter `first_poll` or `poll_and_block` which
    /// is where this will get updated further.
    CalleeStarted,

    /// State used for tasks in `first_poll` meaning that the guest did an async
    /// lower of a host async function which is blocked. The specified handle is
    /// linked to the future in the main `FuturesUnordered` of a store which is
    /// used to cancel it if the guest requests cancellation.
    CalleeRunning(JoinHandle),

    /// Terminal state used for tasks in `poll_and_block` to store the result of
    /// their computation. Note that this state is not used for tasks in
    /// `first_poll`.
    CalleeFinished(LiftedResult),

    /// Terminal state for host tasks meaning that the task was cancelled or the
    /// result was taken.
CalleeDone { cancelled: bool },
}

impl HostTask {
    /// Create a host task called by `caller` in the initial `state`, with a
    /// default `WaitableCommon` and `CallContext`.
    fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self {
        Self {
            common: WaitableCommon::default(),
            call_context: CallContext::default(),
            caller,
            state,
        }
    }
}

impl TableDebug for HostTask {
    /// Returns the literal name `"HostTask"`.
    fn type_name() -> &'static str {
        "HostTask"
    }
}

type CallbackFn = Box Result + Send + Sync + 'static>;

/// Represents the caller of a given guest task.
enum Caller {
    /// The host called the guest task.
    Host {
        /// If present, may be used to deliver the result.
        tx: Option>,
        /// If true, there's a host future that must be dropped before the task
        /// can be deleted.
        host_future_present: bool,
        /// Represents the caller of the host function which called back into a
        /// guest. Note that this thread could belong to an entirely unrelated
        /// top-level component instance than the one the host called into.
        caller: CurrentThread,
    },
    /// Another guest thread called the guest task.
    Guest {
        /// The id of the caller.
        thread: QualifiedThreadId,
    },
}

/// Represents a closure and related canonical ABI parameters required to
/// validate a `task.return` call at runtime and lift the result.
struct LiftResult {
    lift: RawLift,
    ty: TypeTupleIndex,
    memory: Option>,
    string_encoding: StringEncoding,
}

/// The table ID for a guest thread, qualified by the task to which it belongs.
///
/// This exists to minimize table lookups and the necessity to pass stores around mutably
/// for the common case of identifying the task to which a thread belongs.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub(crate) struct QualifiedThreadId {
    /// The task that owns the thread.
    task: TableId,
    /// The thread itself.
    thread: TableId,
}

impl QualifiedThreadId {
    /// Build the qualified id for `thread` by looking up its `parent_task`
    /// in `state`.
    fn qualify(
        state: &mut ConcurrentState,
        thread: TableId,
    ) -> Result {
        Ok(QualifiedThreadId {
            task: state.get_mut(thread)?.parent_task,
            thread,
        })
    }
}

impl fmt::Debug for QualifiedThreadId {
    /// Formats as a tuple of the raw `rep()` values of the task and thread.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("QualifiedThreadId")
            .field(&self.task.rep())
            .field(&self.thread.rep())
            .finish()
    }
}

/// Execution state of a `GuestThread`.
enum GuestThreadState {
    /// Initial state of threads created by `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of threads created by `GuestThread::new_explicit`; holds
    /// the start closure that `resume_thread` queues when the thread is first
    /// resumed.
    NotStartedExplicit(
        Box Result<()> + Send + Sync>,
    ),
    /// Set by `resume_thread` when it schedules a thread.
    Running,
    /// The thread's fiber is parked; `resume_thread` queues it to be resumed.
    Suspended(StoreFiber<'static>),
    /// `resume_thread` accepts this state only when called with
    /// `allow_ready`.
    Ready(StoreFiber<'static>),
    Completed,
}

/// State tracked for a single guest thread.
pub struct GuestThread {
    /// Context-local state used to implement the `context.{get,set}`
    /// intrinsics.
    context: [u32; 2],
    /// The owning guest task.
    parent_task: TableId,
    /// If present, indicates that the thread is currently waiting on the
    /// specified set but may be cancelled and woken immediately.
    wake_on_cancel: Option>,
    /// The execution state of this guest thread.
    state: GuestThreadState,
    /// The index of this thread in the component instance's handle table.
    /// This must always be `Some` after initialization.
    instance_rep: Option,
    /// Scratch waitable set used to watch subtasks during synchronous calls.
    sync_call_set: TableId,
}

impl GuestThread {
    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
    /// handle.
    fn from_instance(
        state: Pin<&mut ComponentInstance>,
        caller_instance: RuntimeComponentInstanceIndex,
        guest_thread: u32,
    ) -> Result> {
        let rep = state.instance_states().0[caller_instance]
            .thread_handle_table()
            .guest_thread_rep(guest_thread)?;
        Ok(TableId::new(rep))
    }

    /// Create a thread in the `NotStartedImplicit` state belonging to
    /// `parent_task`, pushing a fresh `WaitableSet` into `state` to serve as
    /// its `sync_call_set`.
    fn new_implicit(state: &mut ConcurrentState, parent_task: TableId) -> Result {
        let sync_call_set = state.push(WaitableSet::default())?;
        Ok(Self {
            context: [0; 2],
            parent_task,
            wake_on_cancel: None,
            state: GuestThreadState::NotStartedImplicit,
            instance_rep: None,
            sync_call_set,
        })
    }

    /// Create a thread in the `NotStartedExplicit` state that will run
    /// `start_func` once started, pushing a fresh `WaitableSet` into `state`
    /// to serve as its `sync_call_set`.
    fn new_explicit(
        state: &mut ConcurrentState,
        parent_task: TableId,
        start_func: Box<
            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
        >,
    ) -> Result {
        let sync_call_set = state.push(WaitableSet::default())?;
        Ok(Self {
            context: [0; 2],
            parent_task,
            wake_on_cancel: None,
            state: GuestThreadState::NotStartedExplicit(start_func),
            instance_rep: None,
            sync_call_set,
        })
    }
}

impl TableDebug for GuestThread {
    /// Returns the literal name `"GuestThread"`.
    fn type_name() -> &'static str {
        "GuestThread"
    }
}

/// Tracks the result slot of a task (see `GuestTask::sync_result`).
enum SyncResult {
    /// No result has been produced yet.
    NotProduced,
    /// A result was produced and has not been taken yet.
    Produced(Option),
    /// The slot was already emptied by `SyncResult::take`.
    Taken,
}

impl SyncResult {
    /// Move the slot to `Taken` and return what it held: `None` if nothing
    /// had been produced, `Some(val)` if a result was waiting. Taking a
    /// second time is reported as an internal bug.
    fn take(&mut self) -> Result>> {
        Ok(match mem::replace(self, SyncResult::Taken) {
            SyncResult::NotProduced => None,
            SyncResult::Produced(val) => Some(val),
            SyncResult::Taken => {
                bail_bug!("attempted to take a synchronous result that was already taken")
            }
        })
    }
}

/// State of the host future associated with a `GuestTask`, if any.
#[derive(Debug)]
enum HostFutureState {
    /// Used by `GuestTask::new` for guest callers and for host callers
    /// without a host future.
    NotApplicable,
    /// A host future exists; `GuestTask::ready_to_delete` reports `false`
    /// while in this state.
    Live,
    Dropped,
}

/// Represents a pending guest task.
pub(crate) struct GuestTask {
    /// See `WaitableCommon`
    common: WaitableCommon,
    /// Closure to lower the parameters passed to this task.
    lower_params: Option,
    /// See `LiftResult`
    lift_result: Option,
    /// A place to stash the type-erased lifted result if it can't be delivered
    /// immediately.
    result: Option,
    /// Closure to call the callback function for an async-lifted export, if
    /// provided.
    callback: Option,
    /// See `Caller`
    caller: Caller,
    /// Borrow state for this task.
///
    /// Keeps track of `borrow`s received by this task to ensure that
    /// everything is dropped by the time it exits.
    call_context: CallContext,
    /// A place to stash the lowered result for a sync-to-async call until it
    /// can be returned to the caller.
    sync_result: SyncResult,
    /// Whether or not the task has been cancelled (i.e. whether the task is
    /// permitted to call `task.cancel`).
    cancel_sent: bool,
    /// Whether or not we've sent a `Status::Starting` event to any current or
    /// future waiters for this waitable.
    starting_sent: bool,
    /// The runtime instance to which the exported function for this guest task
    /// belongs.
    ///
    /// Note that the task may do a sync->sync call via a fused adapter which
    /// results in that task executing code in a different instance, and it may
    /// call host functions and intrinsics from that other instance.
    instance: RuntimeInstance,
    /// If present, a pending `Event::None` or `Event::Cancelled` to be
    /// delivered to this task.
    event: Option,
    /// Whether or not the task has exited.
    exited: bool,
    /// Threads belonging to this task.
    threads: HashSet>,
    /// The state of the host future that represents an async task, which must
    /// be dropped before we can delete the task.
    host_future_state: HostFutureState,
    /// Indicates whether this task was created for a call to an async-lifted
    /// export.
    async_function: bool,
}

impl GuestTask {
    /// Whether this task's parameters have already been lowered.
    fn already_lowered_parameters(&self) -> bool {
        // We reset `lower_params` after we lower the parameters
        self.lower_params.is_none()
    }

    /// Whether this task has already returned or been cancelled.
    fn returned_or_cancelled(&self) -> bool {
        // We reset `lift_result` after we return or exit
        self.lift_result.is_none()
    }

    /// Whether this task's table entry may be deleted.
    ///
    /// That is the case once the task has no threads left, holds no
    /// produced-but-untaken sync result, has no pending `Returned` or
    /// `ReturnCancelled` subtask event, and has no live host future. The
    /// individual conditions are logged at trace level.
    fn ready_to_delete(&self) -> bool {
        let threads_completed = self.threads.is_empty();
        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
        let pending_completion_event = matches!(
            self.common.event,
            Some(Event::Subtask {
                status: Status::Returned | Status::ReturnCancelled
            })
        );
        let ready = threads_completed
            && !has_sync_result
            && !pending_completion_event
            && !matches!(self.host_future_state, HostFutureState::Live);
        log::trace!(
            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
            threads_completed,
            has_sync_result,
            pending_completion_event,
            self.host_future_state
        );
        ready
    }

    /// Create a guest task that has not lowered its parameters, produced a
    /// result, exited, or spawned any threads yet.
    ///
    /// `host_future_state` starts as `Live` only for a host caller with
    /// `host_future_present` set, and as `NotApplicable` otherwise. The
    /// visible body always returns `Ok`.
    fn new(
        lower_params: RawLower,
        lift_result: LiftResult,
        caller: Caller,
        callback: Option,
        instance: RuntimeInstance,
        async_function: bool,
    ) -> Result {
        let host_future_state = match &caller {
            Caller::Guest { .. } => HostFutureState::NotApplicable,
            Caller::Host {
                host_future_present,
                ..
            } => {
                if *host_future_present {
                    HostFutureState::Live
                } else {
                    HostFutureState::NotApplicable
                }
            }
        };
        Ok(Self {
            common: WaitableCommon::default(),
            lower_params: Some(lower_params),
            lift_result: Some(lift_result),
            result: None,
            callback,
            caller,
            call_context: CallContext::default(),
            sync_result: SyncResult::NotProduced,
            cancel_sent: false,
            starting_sent: false,
            instance,
            event: None,
            exited: false,
            threads: HashSet::new(),
            host_future_state,
            async_function,
        })
    }

    /// Dispose of this guest task.
    ///
    /// Panics if the task still has threads.
fn dispose(self, _state: &mut ConcurrentState) -> Result<()> {
        // A task must not be disposed of while it still owns threads.
        assert!(self.threads.is_empty());
        Ok(())
    }
}

impl TableDebug for GuestTask {
    fn type_name() -> &'static str {
        "GuestTask"
    }
}

/// State shared by every kind of waitable.
#[derive(Default)]
struct WaitableCommon {
    /// The event currently pending for this waitable, if any.
    event: Option,
    /// The waitable set this waitable is a member of, if any.
    set: Option>,
    /// The handle the guest uses to refer to this waitable, if any.
    handle: Option,
}

/// A Component Model Async `waitable`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task
    Host(TableId),
    /// A guest task
    Guest(TableId),
    /// The read or write end of a stream or future
    Transmit(TableId),
}

impl Waitable {
    /// Look up the `Waitable` behind the guest-visible handle `waitable` in
    /// the handle table of `caller_instance`.
    fn from_instance(
        state: Pin<&mut ComponentInstance>,
        caller_instance: RuntimeComponentInstanceIndex,
        waitable: u32,
    ) -> Result {
        use crate::runtime::vm::component::Waitable as Kind;

        let (rep, kind) = state.instance_states().0[caller_instance]
            .handle_table()
            .waitable_rep(waitable)?;

        let found = match kind {
            Kind::Subtask { is_host: true } => Self::Host(TableId::new(rep)),
            Kind::Subtask { is_host: false } => Self::Guest(TableId::new(rep)),
            Kind::Stream | Kind::Future => Self::Transmit(TableId::new(rep)),
        };
        Ok(found)
    }

    /// Retrieve the host-visible identifier for this `Waitable`.
    fn rep(&self) -> u32 {
        match *self {
            Self::Host(id) => id.rep(),
            Self::Guest(id) => id.rep(),
            Self::Transmit(id) => id.rep(),
        }
    }

    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
    /// remove it from any set it may currently belong to (when `set` is
    /// `None`).
fn join(&self, state: &mut ConcurrentState, set: Option>) -> Result<()> { log::trace!("waitable {self:?} join set {set:?}",); let old = mem::replace(&mut self.common(state)?.set, set); if let Some(old) = old { match *self { Waitable::Host(id) => state.remove_child(id, old), Waitable::Guest(id) => state.remove_child(id, old), Waitable::Transmit(id) => state.remove_child(id, old), }?; state.get_mut(old)?.ready.remove(self); } if let Some(set) = set { match *self { Waitable::Host(id) => state.add_child(id, set), Waitable::Guest(id) => state.add_child(id, set), Waitable::Transmit(id) => state.add_child(id, set), }?; if self.common(state)?.event.is_some() { self.mark_ready(state)?; } } Ok(()) } /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`. fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> { Ok(match self { Self::Host(id) => &mut state.get_mut(*id)?.common, Self::Guest(id) => &mut state.get_mut(*id)?.common, Self::Transmit(id) => &mut state.get_mut(*id)?.common, }) } /// Set or clear the pending event for this waitable and either deliver it /// to the first waiter, if any, or mark it as ready to be delivered to the /// next waiter that arrives. fn set_event(&self, state: &mut ConcurrentState, event: Option) -> Result<()> { log::trace!("set event for {self:?}: {event:?}"); self.common(state)?.event = event; self.mark_ready(state) } /// Take the pending event from this waitable, leaving `None` in its place. fn take_event(&self, state: &mut ConcurrentState) -> Result> { let common = self.common(state)?; let event = common.event.take(); if let Some(set) = self.common(state)?.set { state.get_mut(set)?.ready.remove(self); } Ok(event) } /// Deliver the current event for this waitable to the first waiter, if any, /// or else mark it as ready to be delivered to the next waiter that /// arrives. 
fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> { if let Some(set) = self.common(state)?.set { state.get_mut(set)?.ready.insert(*self); if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() { let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take(); assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set)); let item = match mode { WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber), WaitMode::Callback(instance) => WorkItem::GuestCall( state.get_mut(thread.task)?.instance.index, GuestCall { thread, kind: GuestCallKind::DeliverEvent { instance, set: Some(set), }, }, ), }; state.push_high_priority(item); } } Ok(()) } /// Remove this waitable from the instance's rep table. fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> { match self { Self::Host(task) => { log::trace!("delete host task {task:?}"); state.delete(*task)?; } Self::Guest(task) => { log::trace!("delete guest task {task:?}"); state.delete(*task)?.dispose(state)?; } Self::Transmit(task) => { state.delete(*task)?; } } Ok(()) } } impl fmt::Debug for Waitable { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Self::Host(id) => write!(f, "{id:?}"), Self::Guest(id) => write!(f, "{id:?}"), Self::Transmit(id) => write!(f, "{id:?}"), } } } /// Represents a Component Model Async `waitable-set`. #[derive(Default)] struct WaitableSet { /// Which waitables in this set have pending events, if any. ready: BTreeSet, /// Which guest threads are currently waiting on this set, if any. waiting: BTreeMap, } impl TableDebug for WaitableSet { fn type_name() -> &'static str { "WaitableSet" } } /// Type-erased closure to lower the parameters for a guest task. type RawLower = Box]) -> Result<()> + Send + Sync>; /// Type-erased closure to lift the result for a guest task. 
type RawLift = Box< dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result> + Send + Sync, >; /// Type erased result of a guest task which may be downcast to the expected /// type by a host caller (or simply ignored in the case of a guest caller; see /// `DummyResult`). type LiftedResult = Box; /// Used to return a result from a `LiftFn` when the actual result has already /// been lowered to a guest task's stack and linear memory. struct DummyResult; /// Represents the Component Model Async state of a (sub-)component instance. #[derive(Default)] pub struct ConcurrentInstanceState { /// Whether backpressure is set for this instance (enabled if >0) backpressure: u16, /// Whether this instance can be entered do_not_enter: bool, /// Pending calls for this instance which require `Self::backpressure` to be /// `true` and/or `Self::do_not_enter` to be false before they can proceed. pending: BTreeMap, } impl ConcurrentInstanceState { pub fn pending_is_empty(&self) -> bool { self.pending.is_empty() } } #[derive(Debug, Copy, Clone)] pub(crate) enum CurrentThread { Guest(QualifiedThreadId), Host(TableId), None, } impl CurrentThread { fn guest(&self) -> Option<&QualifiedThreadId> { match self { Self::Guest(id) => Some(id), _ => None, } } fn host(&self) -> Option> { match self { Self::Host(id) => Some(*id), _ => None, } } fn is_none(&self) -> bool { matches!(self, Self::None) } } impl From for CurrentThread { fn from(id: QualifiedThreadId) -> Self { Self::Guest(id) } } impl From> for CurrentThread { fn from(id: TableId) -> Self { Self::Host(id) } } /// Represents the Component Model Async state of a store. pub struct ConcurrentState { /// The currently running thread, if any. current_thread: CurrentThread, /// The set of pending host and background tasks, if any. /// /// See `ComponentInstance::poll_until` for where we temporarily take this /// out, poll it, then put it back to avoid any mutable aliasing hazards. futures: AlwaysMut>>, /// The table of waitables, waitable sets, etc. 
table: AlwaysMut,
    /// The "high priority" work queue for this store's event loop.
    high_priority: Vec,
    /// The "low priority" work queue for this store's event loop.
    low_priority: VecDeque,
    /// A place to stash the reason a fiber is suspending so that the code which
    /// resumed it will know under what conditions the fiber should be resumed
    /// again.
    suspend_reason: Option,
    /// A cached fiber which is waiting for work to do.
    ///
    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
    worker: Option>,
    /// A place to stash the work item for which we're resuming a worker fiber.
    worker_item: Option,

    /// Reference counts for all component error contexts
    ///
    /// NOTE: it is possible for the global ref count to be *greater* than the
    /// sum of (sub)component ref counts as tracked by `error_context_tables`,
    /// for example when the host holds one or more references to error
    /// contexts.
    ///
    /// The key of this map is often referred to as the "rep" (i.e. host-side
    /// component-wide representation) of the index into concurrent state for a
    /// given stored `ErrorContext`.
    ///
    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is
    /// essentially the same as a `TableId`.
    global_error_context_ref_counts: BTreeMap,
}

impl Default for ConcurrentState {
    /// Creates an idle state: no current thread, an empty table, an empty
    /// (but present) `FuturesUnordered`, empty work queues, and no cached
    /// worker fiber.
    fn default() -> Self {
        Self {
            current_thread: CurrentThread::None,
            table: AlwaysMut::new(ResourceTable::new()),
            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
            high_priority: Vec::new(),
            low_priority: VecDeque::new(),
            suspend_reason: None,
            worker: None,
            worker_item: None,
            global_error_context_ref_counts: BTreeMap::new(),
        }
    }
}

impl ConcurrentState {
    /// Take ownership of any fibers and futures owned by this object.
    ///
    /// This should be used when disposing of the `Store` containing this object
    /// in order to gracefully resolve any and all fibers using
    /// `StoreFiber::dispose`.
/// This is necessary to avoid possible
    /// use-after-free bugs due to fibers which may still have access to the
    /// `Store`.
    ///
    /// Additionally, the futures collected with this function should be dropped
    /// within a `tls::set` call, which will ensure that any futures closing
    /// over an `&Accessor` will have access to the store when dropped, allowing
    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
    /// panicking.
    ///
    /// Fibers are gathered from waitable-set waiters, suspended or ready guest
    /// threads, the cached worker, and queued `ResumeFiber` work items; they
    /// are appended to `fibers`. Queued `PushFuture` items are folded into the
    /// store's future set, which is then moved into `futures`.
    ///
    /// Note that this will leave the object in an inconsistent and unusable
    /// state, so it should only be used just prior to dropping it.
    pub(crate) fn take_fibers_and_futures(
        &mut self,
        fibers: &mut Vec>,
        futures: &mut Vec>,
    ) {
        // Pass 1: walk every table entry and pull fibers out of the two entry
        // kinds that can own one.
        for entry in self.table.get_mut().iter_mut() {
            if let Some(set) = entry.downcast_mut::() {
                // Waiters parked on a set in fiber mode own a fiber; callback
                // mode waiters do not.
                for mode in mem::take(&mut set.waiting).into_values() {
                    if let WaitMode::Fiber(fiber) = mode {
                        fibers.push(fiber);
                    }
                }
            } else if let Some(thread) = entry.downcast_mut::() {
                // The thread is left in the `Completed` state regardless of
                // what state it was in before.
                if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) =
                    mem::replace(&mut thread.state, GuestThreadState::Completed)
                {
                    fibers.push(fiber);
                }
            }
        }

        // The cached worker fiber, if any.
        if let Some(fiber) = self.worker.take() {
            fibers.push(fiber);
        }

        // Pass 2: drain both work queues. This closure borrows `fibers` and
        // `self.futures`, so the queues themselves are taken by value below.
        let mut handle_item = |item| match item {
            WorkItem::ResumeFiber(fiber) => {
                fibers.push(fiber);
            }
            WorkItem::PushFuture(future) => {
                // `unwrap` relies on `self.futures` still being present here;
                // it is only taken at the very end of this function.
                self.futures
                    .get_mut()
                    .as_mut()
                    .unwrap()
                    .push(future.into_inner());
            }
            // These variants are simply dropped.
            WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => {
            }
        };

        for item in mem::take(&mut self.high_priority) {
            handle_item(item);
        }
        for item in mem::take(&mut self.low_priority) {
            handle_item(item);
        }

        // Finally hand over the whole future set, leaving `None` behind.
        if let Some(them) = self.futures.get_mut().take() {
            futures.push(them);
        }
    }

    /// Collect the next set of work items to run. This will be either all
    /// high-priority items, or a single low-priority item if there are no
    /// high-priority items.
fn collect_work_items_to_run(&mut self) -> Vec { let mut ready = mem::take(&mut self.high_priority); if ready.is_empty() { if let Some(item) = self.low_priority.pop_back() { ready.push(item); } } ready } fn push( &mut self, value: V, ) -> Result, ResourceTableError> { self.table.get_mut().push(value).map(TableId::from) } fn get_mut(&mut self, id: TableId) -> Result<&mut V, ResourceTableError> { self.table.get_mut().get_mut(&Resource::from(id)) } pub fn add_child( &mut self, child: TableId, parent: TableId, ) -> Result<(), ResourceTableError> { self.table .get_mut() .add_child(Resource::from(child), Resource::from(parent)) } pub fn remove_child( &mut self, child: TableId, parent: TableId, ) -> Result<(), ResourceTableError> { self.table .get_mut() .remove_child(Resource::from(child), Resource::from(parent)) } fn delete(&mut self, id: TableId) -> Result { self.table.get_mut().delete(Resource::from(id)) } fn push_future(&mut self, future: HostTaskFuture) { // Note that we can't directly push to `ConcurrentState::futures` here // since this may be called from a future that's being polled inside // `Self::poll_until`, which temporarily removes the `FuturesUnordered` // so it has exclusive access while polling it. Therefore, we push a // work item to the "high priority" queue, which will actually push to // `ConcurrentState::futures` later. 
self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future))); } fn push_high_priority(&mut self, item: WorkItem) { log::trace!("push high priority: {item:?}"); self.high_priority.push(item); } fn push_low_priority(&mut self, item: WorkItem) { log::trace!("push low priority: {item:?}"); self.low_priority.push_front(item); } fn push_work_item(&mut self, item: WorkItem, high_priority: bool) { if high_priority { self.push_high_priority(item); } else { self.push_low_priority(item); } } fn promote_instance_local_thread_work_item( &mut self, current_instance: RuntimeComponentInstanceIndex, ) -> bool { self.promote_work_items_matching(|item: &WorkItem| match item { WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => { *instance == current_instance } _ => false, }) } fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool { self.promote_work_items_matching(|item: &WorkItem| match item { WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => { *t == thread } _ => false, }) } fn promote_work_items_matching(&mut self, mut predicate: F) -> bool where F: FnMut(&WorkItem) -> bool, { // If there's a high-priority work item to resume the current guest thread, // we don't need to promote anything, but we return true to indicate that // work is pending for the current instance. if self.high_priority.iter().any(&mut predicate) { true } // Otherwise, look for a low-priority work item that matches the current // instance and promote it to high-priority. else if let Some(idx) = self.low_priority.iter().position(&mut predicate) { let item = self.low_priority.remove(idx).unwrap(); self.push_high_priority(item); true } else { false } } /// Implements the `context.get` intrinsic. 
pub(crate) fn context_get(&mut self, slot: u32) -> Result {
        let thread = self.current_guest_thread()?;
        let context = &self.get_mut(thread.thread)?.context;
        let val = context[usize::try_from(slot)?];
        log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
        Ok(val)
    }

    /// Implements the `context.set` intrinsic.
    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
        let thread = self.current_guest_thread()?;
        log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
        let context = &mut self.get_mut(thread.thread)?.context;
        context[usize::try_from(slot)?] = val;
        Ok(())
    }

    /// Returns whether there's a pending cancellation on the current guest
    /// thread's task, consuming the event if so.
    fn take_pending_cancellation(&mut self) -> Result {
        let thread = self.current_guest_thread()?;
        match self.get_mut(thread.task)?.event.take() {
            Some(event) => {
                // Any event found here is required to be a cancellation.
                assert!(matches!(event, Event::Cancelled));
                Ok(true)
            }
            None => Ok(false),
        }
    }

    /// Fail with `Trap::CannotBlockSyncTask` unless `task` is allowed to
    /// block (see `may_block`).
    fn check_blocking_for(&mut self, task: TableId) -> Result<()> {
        if !self.may_block(task)? {
            return Err(Trap::CannotBlockSyncTask.into());
        }
        Ok(())
    }

    /// A task may block if it was created for an async function or if it has
    /// already returned (or been cancelled).
    fn may_block(&mut self, task: TableId) -> Result {
        let entry = self.get_mut(task)?;
        Ok(entry.async_function || entry.returned_or_cancelled())
    }

    /// Used by `ResourceTables` to acquire the current `CallContext` for the
    /// specified task.
    ///
    /// The `task` is bit-packed as returned by `current_call_context_scope_id`
    /// below: the low bit selects host vs. guest and the remaining bits are
    /// the table rep.
    pub fn call_context(&mut self, task: u32) -> Result<&mut CallContext> {
        let is_host = task & 1 == 1;
        let rep = task >> 1;
        // NOTE(review): the two branches read identically in this view;
        // presumably the `TableId` annotations name different entry types
        // (host task vs. guest task) -- confirm before merging them.
        if is_host {
            let task: TableId = TableId::new(rep);
            Ok(&mut self.get_mut(task)?.call_context)
        } else {
            let task: TableId = TableId::new(rep);
            Ok(&mut self.get_mut(task)?.call_context)
        }
    }

    /// Used by `ResourceTables` to record the scope of a borrow to get undone
    /// in the future.
pub fn current_call_context_scope_id(&self) -> Result {
        let (bits, is_host) = match self.current_thread {
            CurrentThread::None => bail_bug!("current thread is not set"),
            CurrentThread::Host(id) => (id.rep(), true),
            CurrentThread::Guest(id) => (id.task.rep(), false),
        };
        // The rep is shifted left by one to make room for the host/guest flag
        // in the low bit; make sure no high bit is lost by doing so.
        let shifted = bits << 1;
        assert_eq!(shifted >> 1, bits);
        Ok(shifted | u32::from(is_host))
    }

    /// The current guest thread, or a bug error if the current thread is not
    /// a guest thread.
    fn current_guest_thread(&self) -> Result {
        match self.current_thread.guest() {
            None => bail_bug!("current thread is not a guest thread"),
            Some(id) => Ok(*id),
        }
    }

    /// The current host task, or a bug error if the current thread is not a
    /// host thread.
    fn current_host_thread(&self) -> Result> {
        match self.current_thread.host() {
            None => bail_bug!("current thread is not a host thread"),
            Some(id) => Ok(id),
        }
    }

    /// Borrow the store's future set, or report a bug error while it is
    /// taken.
    fn futures_mut(&mut self) -> Result<&mut FuturesUnordered> {
        match self.futures.get_mut().as_mut() {
            None => bail_bug!("futures field of concurrent state is currently taken"),
            Some(f) => Ok(f),
        }
    }

    /// Direct access to the underlying `ResourceTable`.
    pub(crate) fn table(&mut self) -> &mut ResourceTable {
        self.table.get_mut()
    }
}

/// Identity function whose bound gives the compiler a type hint about the
/// shape of a parameter-lowering closure.
fn for_any_lower<
    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit]) -> Result<()> + Send + Sync,
>(
    fun: F,
) -> F {
    fun
}

/// Identity function whose bound gives the compiler a type hint about the
/// shape of a result-lifting closure.
fn for_any_lift<
    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result> + Send + Sync,
>(
    fun: F,
) -> F {
    fun
}

/// Wrap the specified future in a `poll_fn` which asserts that the future is
/// only polled from the event loop of the specified `Store`.
///
/// Each poll panics unless the store currently installed in TLS has the given
/// `id`.
///
/// See `StoreContextMut::run_concurrent` for details.
fn checked(
    id: StoreId,
    fut: F,
) -> impl Future + Send + 'static {
    async move {
        let mut fut = pin!(fut);
        future::poll_fn(move |cx| {
            let message = "\
                `Future`s which depend on asynchronous component tasks, streams, or \
                futures to complete may only be polled from the event loop of the \
                store to which they belong. \
                Please use \
                `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
            ";
            tls::try_get(|store| {
                // Only a store that is present in TLS and carries our id
                // counts as a match; `Taken` and `None` do not.
                let matched = matches!(store, tls::TryGet::Some(store) if store.id() == id);
                if !matched {
                    panic!("{message}")
                }
            });
            fut.as_mut().poll(cx)
        })
        .await
    }
}

/// Assert that `StoreContextMut::run_concurrent` has not been called from
/// within a store's event loop.
fn check_recursive_run() {
    tls::try_get(|store| match store {
        tls::TryGet::None => {}
        _ => panic!("Recursive `StoreContextMut::run_concurrent` calls not supported"),
    });
}

/// Split a packed callback return code into its low four bits and the
/// remaining upper bits, in that order.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    let low = code & 0xF;
    let high = code >> 4;
    (low, high)
}

/// Helper struct for packaging parameters to be passed to
/// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or
/// `waitable-set.poll`.
struct WaitableCheckParams {
    set: TableId,
    options: OptionsIndex,
    payload: u32,
}

/// Indicates whether `ComponentInstance::waitable_check` is being called for
/// `waitable-set.wait` or `waitable-set.poll`.
enum WaitableCheck {
    Wait,
    Poll,
}

/// Represents a guest task called from the host, prepared using `prepare_call`.
pub(crate) struct PreparedCall {
    /// The guest export to be called
    handle: Func,
    /// The guest thread created by `prepare_call`
    thread: QualifiedThreadId,
    /// The number of lowered core Wasm parameters to pass to the call.
    param_count: usize,
    /// The `oneshot::Receiver` to which the result of the call will be
    /// delivered when it is available.
    rx: oneshot::Receiver,
    _phantom: PhantomData,
}

impl PreparedCall {
    /// Get a copy of the `TaskId` for this `PreparedCall`.
    pub(crate) fn task_id(&self) -> TaskId {
        let task = self.thread.task;
        TaskId { task }
    }
}

/// Represents a task created by `prepare_call`.
pub(crate) struct TaskId {
    task: TableId,
}

impl TaskId {
    /// The host future for an async task was dropped. If the parameters have not been lowered yet,
    /// it is no longer valid to do so, as the lowering closure would see a dangling pointer.
/// In this case, we delete the task eagerly. Otherwise, there may be
    /// running threads, or ones that are suspended and can be resumed by
    /// other tasks for this component, so we mark the future as dropped and
    /// delete the task when all threads are done.
    pub(crate) fn host_future_dropped(&self, store: StoreContextMut) -> Result<()> {
        let task = store.0.concurrent_state_mut().get_mut(self.task)?;
        if !task.already_lowered_parameters() {
            // Parameters never lowered: delete right away.
            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
        } else {
            // Record the drop; the task is deleted now only if nothing else
            // is keeping it alive (see `GuestTask::ready_to_delete`).
            task.host_future_state = HostFutureState::Dropped;
            if task.ready_to_delete() {
                Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?
            }
        }
        Ok(())
    }
}

/// Prepare a call to the specified exported Wasm function, providing functions
/// for lowering the parameters and lifting the result.
///
/// This creates a `GuestTask` with a host caller plus an implicit
/// `GuestThread` for it, and records both in the store's concurrent state.
/// `host_future_present` is forwarded to `Caller::Host`, and `param_count` is
/// stored unchanged in the returned `PreparedCall`.
///
/// Fails with `Trap::CannotEnterComponent` if the target instance may not be
/// entered.
///
/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
/// loop, use `queue_call`.
pub(crate) fn prepare_call(
    mut store: StoreContextMut,
    handle: Func,
    param_count: usize,
    host_future_present: bool,
    lower_params: impl FnOnce(Func, StoreContextMut, &mut [MaybeUninit]) -> Result<()>
    + Send
    + Sync
    + 'static,
    lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result> + Send + Sync + 'static,
) -> Result> {
    // Gather everything we need from the function's ABI info and canonical
    // options before borrowing the concurrent state.
    let (options, _flags, ty, raw_options) = handle.abi_info(store.0);

    let instance = handle.instance().id().get(store.0);
    let options = &instance.component().env_component().options[options];
    let ty = &instance.component().types()[ty];
    let async_function = ty.async_;
    let task_return_type = ty.results;
    let component_instance = raw_options.instance;
    let callback = options.callback.map(|i| instance.runtime_callback(i));
    let memory = options
        .memory()
        .map(|i| instance.runtime_memory(i))
        .map(SendSyncPtr::new);
    let string_encoding = options.string_encoding;
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();

    // The sending half goes into the task's `Caller::Host`; the receiving
    // half is returned in the `PreparedCall`.
    let (tx, rx) = oneshot::channel();

    let instance = RuntimeInstance {
        instance: handle.instance().id().instance(),
        index: component_instance,
    };
    // Whatever thread is current right now is recorded as the caller.
    let caller = state.current_thread;
    let task = GuestTask::new(
        // Adapt the typed lowering closure to the type-erased `RawLower` shape.
        Box::new(for_any_lower(move |store, params| {
            lower_params(handle, token.as_context_mut(store), params)
        })),
        LiftResult {
            lift: Box::new(for_any_lift(move |store, result| {
                lift_result(handle, store, result)
            })),
            ty: task_return_type,
            memory,
            string_encoding,
        },
        Caller::Host {
            tx: Some(tx),
            host_future_present,
            caller,
        },
        callback.map(|callback| {
            let callback = SendSyncPtr::new(callback);
            let instance = handle.instance();
            Box::new(move |store: &mut dyn VMStore, event, handle| {
                let store = token.as_context_mut(store);
                // SAFETY: Per the contract of `prepare_call`, the callback
                // will remain valid at least as long as this task exists.
                unsafe { instance.call_callback(store, callback, event, handle) }
            }) as CallbackFn
        }),
        instance,
        async_function,
    )?;

    let task = state.push(task)?;
    let new_thread = GuestThread::new_implicit(state, task)?;
    let thread = state.push(new_thread)?;
    state.get_mut(task)?.threads.insert(thread);

    // NOTE(review): the task and thread have already been pushed into the
    // table at this point and are not removed on this error path -- confirm
    // that is intended.
    if !store.0.may_enter(instance)? {
        bail!(Trap::CannotEnterComponent);
    }

    Ok(PreparedCall {
        handle,
        thread: QualifiedThreadId { task, thread },
        param_count,
        rx,
        _phantom: PhantomData,
    })
}

/// Queue a call previously prepared using `prepare_call` to be run as part of
/// the associated `ComponentInstance`'s event loop.
///
/// The returned future will resolve to the result once it is available, but
/// must only be polled via the instance's event loop (it is wrapped with
/// `checked`). See `StoreContextMut::run_concurrent` for details.
pub(crate) fn queue_call(
    mut store: StoreContextMut,
    prepared: PreparedCall,
) -> Result> + Send + 'static + use> {
    let PreparedCall {
        handle,
        thread,
        param_count,
        rx,
        ..
    } = prepared;

    queue_call0(store.as_context_mut(), handle, thread, param_count)?;

    Ok(checked(
        store.0.id(),
        rx.map(move |result| match result {
            // The task delivers a type-erased value; recover the concrete
            // type the caller asked for.
            Ok(r) => match r.downcast() {
                Ok(r) => Ok(*r),
                Err(_) => bail_bug!("wrong type of value produced"),
            },
            // The receive itself failed; surface that error to the caller.
            Err(e) => Err(e.into()),
        }),
    ))
}

/// Queue a call previously prepared using `prepare_call` to be run as part of
/// the associated `ComponentInstance`'s event loop.
///
/// This is the untyped half of `queue_call`: it resolves the core function
/// pointers for `handle` and hands them to `Instance::queue_call`.
fn queue_call0(
    store: StoreContextMut,
    handle: Func,
    guest_thread: QualifiedThreadId,
    param_count: usize,
) -> Result<()> {
    let (_options, _, _ty, raw_options) = handle.abi_info(store.0);
    let is_concurrent = raw_options.async_;
    let callback = raw_options.callback;
    let instance = handle.instance();
    let callee = handle.lifted_core_func(store.0);
    let post_return = handle.post_return_core_func(store.0);
    let callback = callback.map(|i| {
        let instance = instance.id().get(store.0);
        SendSyncPtr::new(instance.runtime_callback(i))
    });

    log::trace!("queueing call {guest_thread:?}");

    // SAFETY: `callee`, `callback`, and `post_return` are valid pointers
    // (with signatures appropriate for this call) and will remain valid as
    // long as this instance is valid.
    unsafe {
        instance.queue_call(
            store,
            guest_thread,
            SendSyncPtr::new(callee),
            param_count,
            // NOTE(review): the meaning of this literal `1` is not visible
            // from here; see the signature of `Instance::queue_call`.
            1,
            is_concurrent,
            callback,
            post_return.map(SendSyncPtr::new),
        )
    }
}