1 //! Runtime support for the Component Model Async ABI. 2 //! 3 //! This module and its submodules provide host runtime support for Component 4 //! Model Async features such as async-lifted exports, async-lowered imports, 5 //! streams, futures, and related intrinsics. See [the Async 6 //! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md) 7 //! for a high-level overview. 8 //! 9 //! At the core of this support is an event loop which schedules and switches 10 //! between guest tasks and any host tasks they create. Each 11 //! `Store` will have at most one event loop running at any given 12 //! time, and that loop may be suspended and resumed by the host embedder using 13 //! e.g. `StoreContextMut::run_concurrent`. The `StoreContextMut::poll_until` 14 //! function contains the loop itself, while the 15 //! `StoreOpaque::concurrent_state` field holds its state. 16 //! 17 //! # Public API Overview 18 //! 19 //! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop) 20 //! 21 //! - `[Typed]Func::call_concurrent`: Start a host->guest call to an 22 //! async-lifted or sync-lifted import, creating a guest task. 23 //! 24 //! - `StoreContextMut::run_concurrent`: Run the event loop for the specified 25 //! instance, allowing any and all tasks belonging to that instance to make 26 //! progress. 27 //! 28 //! - `StoreContextMut::spawn`: Run a background task as part of the event loop 29 //! for the specified instance. 30 //! 31 //! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or 32 //! `stream` which may be passed to the guest. This takes a 33 //! `{Future,Stream}Producer` implementation which will be polled for items when 34 //! the consumer requests them. 35 //! 36 //! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by 37 //! connecting it to a `{Future,Stream}Consumer` which will consume any items 38 //! produced by the write end. 39 //! 40 //! 
## Host Task API (e.g. implementing concurrent host functions and background tasks) 41 //! 42 //! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host 43 //! function with the linker. That function will take an `Accessor` as its 44 //! first parameter, which provides access to the store between (but not across) 45 //! await points. 46 //! 47 //! - `Accessor::with`: Access the store and its associated data. 48 //! 49 //! - `Accessor::spawn`: Run a background task as part of the event loop for the 50 //! store. This is equivalent to `StoreContextMut::spawn` but more convenient to use 51 //! in host functions. 52 53 use crate::component::func::{self, Func}; 54 use crate::component::{HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError}; 55 use crate::fiber::{self, StoreFiber, StoreFiberYield}; 56 use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken}; 57 use crate::vm::component::{CallContext, ComponentInstance, InstanceFlags, ResourceTables}; 58 use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore}; 59 use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType}; 60 use anyhow::{Context as _, Result, anyhow, bail}; 61 use error_contexts::GlobalErrorContextRefCount; 62 use futures::channel::oneshot; 63 use futures::future::{self, Either, FutureExt}; 64 use futures::stream::{FuturesUnordered, StreamExt}; 65 use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex}; 66 use std::any::Any; 67 use std::borrow::ToOwned; 68 use std::boxed::Box; 69 use std::cell::UnsafeCell; 70 use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; 71 use std::fmt; 72 use std::future::Future; 73 use std::marker::PhantomData; 74 use std::mem::{self, ManuallyDrop, MaybeUninit}; 75 use std::ops::DerefMut; 76 use std::pin::{Pin, pin}; 77 use std::ptr::{self, NonNull}; 78 use std::slice; 79 use std::sync::Arc; 80 use std::task::{Context, Poll, Waker}; 81 use 
std::vec::Vec; 82 use table::{TableDebug, TableId}; 83 use wasmtime_environ::Trap; 84 use wasmtime_environ::component::{ 85 CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, MAX_FLAT_RESULTS, 86 OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT, 87 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding, 88 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex, 89 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex, 90 }; 91 92 pub use abort::JoinHandle; 93 pub use future_stream_any::{FutureAny, StreamAny}; 94 pub use futures_and_streams::{ 95 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer, 96 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer, 97 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer, 98 }; 99 pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index}; 100 101 mod abort; 102 mod error_contexts; 103 mod future_stream_any; 104 mod futures_and_streams; 105 pub(crate) mod table; 106 pub(crate) mod tls; 107 108 /// Constant defined in the Component Model spec to indicate that the async 109 /// intrinsic (e.g. `future.write`) has not yet completed. 110 const BLOCKED: u32 = 0xffff_ffff; 111 112 /// Corresponds to `CallState` in the upstream spec. 113 #[derive(Clone, Copy, Eq, PartialEq, Debug)] 114 pub enum Status { 115 Starting = 0, 116 Started = 1, 117 Returned = 2, 118 StartCancelled = 3, 119 ReturnCancelled = 4, 120 } 121 122 impl Status { 123 /// Packs this status and the optional `waitable` provided into a 32-bit 124 /// result that the canonical ABI requires. 125 /// 126 /// The low 4 bits are reserved for the status while the upper 28 bits are 127 /// the waitable, if present. 
128 pub fn pack(self, waitable: Option<u32>) -> u32 { 129 assert!(matches!(self, Status::Returned) == waitable.is_none()); 130 let waitable = waitable.unwrap_or(0); 131 assert!(waitable < (1 << 28)); 132 (waitable << 4) | (self as u32) 133 } 134 } 135 136 /// Corresponds to `EventCode` in the Component Model spec, plus related payload 137 /// data. 138 #[derive(Clone, Copy, Debug)] 139 enum Event { 140 None, 141 Cancelled, 142 Subtask { 143 status: Status, 144 }, 145 StreamRead { 146 code: ReturnCode, 147 pending: Option<(TypeStreamTableIndex, u32)>, 148 }, 149 StreamWrite { 150 code: ReturnCode, 151 pending: Option<(TypeStreamTableIndex, u32)>, 152 }, 153 FutureRead { 154 code: ReturnCode, 155 pending: Option<(TypeFutureTableIndex, u32)>, 156 }, 157 FutureWrite { 158 code: ReturnCode, 159 pending: Option<(TypeFutureTableIndex, u32)>, 160 }, 161 } 162 163 impl Event { 164 /// Lower this event to core Wasm integers for delivery to the guest. 165 /// 166 /// Note that the waitable handle, if any, is assumed to be lowered 167 /// separately. 168 fn parts(self) -> (u32, u32) { 169 const EVENT_NONE: u32 = 0; 170 const EVENT_SUBTASK: u32 = 1; 171 const EVENT_STREAM_READ: u32 = 2; 172 const EVENT_STREAM_WRITE: u32 = 3; 173 const EVENT_FUTURE_READ: u32 = 4; 174 const EVENT_FUTURE_WRITE: u32 = 5; 175 const EVENT_CANCELLED: u32 = 6; 176 match self { 177 Event::None => (EVENT_NONE, 0), 178 Event::Cancelled => (EVENT_CANCELLED, 0), 179 Event::Subtask { status } => (EVENT_SUBTASK, status as u32), 180 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()), 181 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()), 182 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()), 183 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()), 184 } 185 } 186 } 187 188 /// Corresponds to `CallbackCode` in the spec. 
189 mod callback_code { 190 pub const EXIT: u32 = 0; 191 pub const YIELD: u32 = 1; 192 pub const WAIT: u32 = 2; 193 pub const POLL: u32 = 3; 194 } 195 196 /// A flag indicating that the callee is an async-lowered export. 197 /// 198 /// This may be passed to the `async-start` intrinsic from a fused adapter. 199 const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32; 200 201 /// Provides access to either store data (via the `get` method) or the store 202 /// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component 203 /// instance to which the current host task belongs. 204 /// 205 /// See [`Accessor::with`] for details. 206 pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> { 207 store: StoreContextMut<'a, T>, 208 get_data: fn(&mut T) -> D::Data<'_>, 209 } 210 211 impl<'a, T, D> Access<'a, T, D> 212 where 213 D: HasData + ?Sized, 214 T: 'static, 215 { 216 /// Creates a new [`Access`] from its component parts. 217 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self { 218 Self { store, get_data } 219 } 220 221 /// Get mutable access to the store data. 222 pub fn data_mut(&mut self) -> &mut T { 223 self.store.data_mut() 224 } 225 226 /// Get mutable access to the store data. 227 pub fn get(&mut self) -> D::Data<'_> { 228 (self.get_data)(self.data_mut()) 229 } 230 231 /// Spawn a background task. 232 /// 233 /// See [`Accessor::spawn`] for details. 234 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle 235 where 236 T: 'static, 237 { 238 let accessor = Accessor { 239 get_data: self.get_data, 240 token: StoreToken::new(self.store.as_context_mut()), 241 }; 242 self.store 243 .as_context_mut() 244 .spawn_with_accessor(accessor, task) 245 } 246 247 /// Returns the getter this accessor is using to project from `T` into 248 /// `D::Data`. 
249 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> { 250 self.get_data 251 } 252 } 253 254 impl<'a, T, D> AsContext for Access<'a, T, D> 255 where 256 D: HasData + ?Sized, 257 T: 'static, 258 { 259 type Data = T; 260 261 fn as_context(&self) -> StoreContext<'_, T> { 262 self.store.as_context() 263 } 264 } 265 266 impl<'a, T, D> AsContextMut for Access<'a, T, D> 267 where 268 D: HasData + ?Sized, 269 T: 'static, 270 { 271 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> { 272 self.store.as_context_mut() 273 } 274 } 275 276 /// Provides scoped mutable access to store data in the context of a concurrent 277 /// host task future. 278 /// 279 /// This allows multiple host task futures to execute concurrently and access 280 /// the store between (but not across) `await` points. 281 /// 282 /// # Rationale 283 /// 284 /// This structure is sort of like `&mut T` plus a projection from `&mut T` to 285 /// `D::Data<'_>`. The problem this is solving, however, is that it does not 286 /// literally store these values. The basic problem is that when a concurrent 287 /// host future is being polled it has access to `&mut T` (and the whole 288 /// `Store`) but when it's not being polled it does not have access to these 289 /// values. This reflects how the store is only ever polling one future at a 290 /// time so the store is effectively being passed between futures. 291 /// 292 /// Rust's `Future` trait, however, has no means of passing a `Store` 293 /// temporarily between futures. The [`Context`](std::task::Context) type does 294 /// not have the ability to attach arbitrary information to it at this time. 295 /// This type, [`Accessor`], is used to bridge this expressivity gap. 296 /// 297 /// The [`Accessor`] type here represents the ability to acquire, temporarily in 298 /// a synchronous manner, the current store. The [`Accessor::with`] function 299 /// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut 300 /// T`, or `D::Data<'_>`. 
Note though that [`Accessor::with`] intentionally does 301 /// not take an `async` closure as its argument, instead it's a synchronous 302 /// closure which must complete during on run of `Future::poll`. This reflects 303 /// how the store is temporarily made available while a host future is being 304 /// polled. 305 /// 306 /// # Implementation 307 /// 308 /// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and 309 /// this type additionally doesn't even have a lifetime parameter. This is 310 /// instead a representation of proof of the ability to acquire these while a 311 /// future is being polled. Wasmtime will, when it polls a host future, 312 /// configure ambient state such that the `Accessor` that a future closes over 313 /// will work and be able to access the store. 314 /// 315 /// This has a number of implications for users such as: 316 /// 317 /// * It's intentional that `Accessor` cannot be cloned, it needs to stay within 318 /// the lifetime of a single future. 319 /// * A future is expected to, however, close over an `Accessor` and keep it 320 /// alive probably for the duration of the entire future. 321 /// * Different host futures will be given different `Accessor`s, and that's 322 /// intentional. 323 /// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which 324 /// alleviates some otherwise required bounds to be written down. 325 /// 326 /// # Using `Accessor` in `Drop` 327 /// 328 /// The methods on `Accessor` are only expected to work in the context of 329 /// `Future::poll` and are not guaranteed to work in `Drop`. This is because a 330 /// host future can be dropped at any time throughout the system and Wasmtime 331 /// store context is not necessarily available at that time. It's recommended to 332 /// not use `Accessor` methods in anything connected to a `Drop` implementation 333 /// as they will panic and have unintended results. 
If you run into this though 334 /// feel free to file an issue on the Wasmtime repository. 335 pub struct Accessor<T: 'static, D = HasSelf<T>> 336 where 337 D: HasData + ?Sized, 338 { 339 token: StoreToken<T>, 340 get_data: fn(&mut T) -> D::Data<'_>, 341 } 342 343 /// A helper trait to take any type of accessor-with-data in functions. 344 /// 345 /// This trait is similar to [`AsContextMut`] except that it's used when 346 /// working with an [`Accessor`] instead of a [`StoreContextMut`]. The 347 /// [`Accessor`] is the main type used in concurrent settings and is passed to 348 /// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`]. 349 /// 350 /// This trait is implemented for [`Accessor`] and `&T` where `T` implements 351 /// this trait. This effectively means that regardless of the `D` in 352 /// `Accessor<T, D>` it can still be passed to a function which just needs a 353 /// store accessor. 354 /// 355 /// Acquiring an [`Accessor`] can be done through 356 /// [`StoreContextMut::run_concurrent`] for example or in a host function 357 /// through 358 /// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent). 359 pub trait AsAccessor { 360 /// The `T` in `Store<T>` that this accessor refers to. 361 type Data: 'static; 362 363 /// The `D` in `Accessor<T, D>`, or the projection out of 364 /// `Self::Data`. 365 type AccessorData: HasData + ?Sized; 366 367 /// Returns the accessor that this is referring to. 
368 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>; 369 } 370 371 impl<T: AsAccessor + ?Sized> AsAccessor for &T { 372 type Data = T::Data; 373 type AccessorData = T::AccessorData; 374 375 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> { 376 T::as_accessor(self) 377 } 378 } 379 380 impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> { 381 type Data = T; 382 type AccessorData = D; 383 384 fn as_accessor(&self) -> &Accessor<T, D> { 385 self 386 } 387 } 388 389 // Note that it is intentional at this time that `Accessor` does not actually 390 // store `&mut T` or anything similar. This distinctly enables the `Accessor` 391 // structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for 392 // that matter). This is used to ergonomically simplify bindings where the 393 // majority of the time `Accessor` is closed over in a future which then needs 394 // to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as 395 // you already have to write `T: 'static`...) it helps to avoid this. 396 // 397 // Note as well that `Accessor` doesn't actually store its data at all. Instead 398 // it's more of a "proof" of what can be accessed from TLS. API design around 399 // `Accessor` and functions like `Linker::func_wrap_concurrent` are 400 // intentionally made to ensure that `Accessor` is ideally only used in the 401 // context that TLS variables are actually set. For example host functions are 402 // given `&Accessor`, not `Accessor`, and this prevents them from persisting 403 // the value outside of a future. Within the future the TLS variables are all 404 // guaranteed to be set while the future is being polled. 405 // 406 // Finally though this is not an ironclad guarantee, but nor does it need to be. 407 // The TLS APIs are designed to panic or otherwise model usage where they're 408 // called recursively or similar. 
It's hoped that code cannot be constructed to 409 // actually hit this at runtime but this is not a safety requirement at this 410 // time. 411 const _: () = { 412 const fn assert<T: Send + Sync>() {} 413 assert::<Accessor<UnsafeCell<u32>>>(); 414 }; 415 416 impl<T> Accessor<T> { 417 /// Creates a new `Accessor` backed by the specified functions. 418 /// 419 /// - `get`: used to retrieve the store 420 /// 421 /// - `get_data`: used to "project" from the store's associated data to 422 /// another type (e.g. a field of that data or a wrapper around it). 423 /// 424 /// - `spawn`: used to queue spawned background tasks to be run later 425 pub(crate) fn new(token: StoreToken<T>) -> Self { 426 Self { 427 token, 428 get_data: |x| x, 429 } 430 } 431 } 432 433 impl<T, D> Accessor<T, D> 434 where 435 D: HasData + ?Sized, 436 { 437 /// Run the specified closure, passing it mutable access to the store. 438 /// 439 /// This function is one of the main building blocks of the [`Accessor`] 440 /// type. This yields synchronous, blocking, access to store via an 441 /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to 442 /// providing the ability to access `D` via [`Access::get`]. Note that the 443 /// `fun` here is given only temporary access to the store and `T`/`D` 444 /// meaning that the return value `R` here is not allowed to capture borrows 445 /// into the two. If access is needed to data within `T` or `D` outside of 446 /// this closure then it must be `clone`d out, for example. 447 /// 448 /// # Panics 449 /// 450 /// This function will panic if it is call recursively with any other 451 /// accessor already in scope. For example if `with` is called within `fun`, 452 /// then this function will panic. It is up to the embedder to ensure that 453 /// this does not happen. 
454 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R { 455 tls::get(|vmstore| { 456 fun(Access { 457 store: self.token.as_context_mut(vmstore), 458 get_data: self.get_data, 459 }) 460 }) 461 } 462 463 /// Returns the getter this accessor is using to project from `T` into 464 /// `D::Data`. 465 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> { 466 self.get_data 467 } 468 469 /// Changes this accessor to access `D2` instead of the current type 470 /// parameter `D`. 471 /// 472 /// This changes the underlying data access from `T` to `D2::Data<'_>`. 473 /// 474 /// # Panics 475 /// 476 /// When using this API the returned value is disconnected from `&self` and 477 /// the lifetime binding the `self` argument. An `Accessor` only works 478 /// within the context of the closure or async closure that it was 479 /// originally given to, however. This means that due to the fact that the 480 /// returned value has no lifetime connection it's possible to use the 481 /// accessor outside of `&self`, the original accessor, and panic. 482 /// 483 /// The returned value should only be used within the scope of the original 484 /// `Accessor` that `self` refers to. 485 pub fn with_getter<D2: HasData>( 486 &self, 487 get_data: fn(&mut T) -> D2::Data<'_>, 488 ) -> Accessor<T, D2> { 489 Accessor { 490 token: self.token, 491 get_data, 492 } 493 } 494 495 /// Spawn a background task which will receive an `&Accessor<T, D>` and 496 /// run concurrently with any other tasks in progress for the current 497 /// store. 498 /// 499 /// This is particularly useful for host functions which return a `stream` 500 /// or `future` such that the code to write to the write end of that 501 /// `stream` or `future` must run after the function returns. 502 /// 503 /// The returned [`JoinHandle`] may be used to cancel the task. 504 /// 505 /// # Panics 506 /// 507 /// Panics if called within a closure provided to the [`Accessor::with`] 508 /// function. 
This can only be called outside an active invocation of 509 /// [`Accessor::with`]. 510 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle 511 where 512 T: 'static, 513 { 514 let accessor = self.clone_for_spawn(); 515 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task)) 516 } 517 518 fn clone_for_spawn(&self) -> Self { 519 Self { 520 token: self.token, 521 get_data: self.get_data, 522 } 523 } 524 } 525 526 /// Represents a task which may be provided to `Accessor::spawn`, 527 /// `Accessor::forward`, or `StorecContextMut::spawn`. 528 // TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable 529 // option. 530 // 531 // As of this writing, it's not possible to specify e.g. `Send` and `Sync` 532 // bounds on the `Future` type returned by an `AsyncFnOnce`. Also, using `F: 533 // Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F + 534 // Send + Sync + 'static` fails with a type mismatch error when we try to pass 535 // it an async closure (e.g. `async move |_| { ... }`). So this seems to be the 536 // best we can do for the time being. 537 pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static 538 where 539 D: HasData + ?Sized, 540 { 541 /// Run the task. 542 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send; 543 } 544 545 /// Represents parameter and result metadata for the caller side of a 546 /// guest->guest call orchestrated by a fused adapter. 547 enum CallerInfo { 548 /// Metadata for a call to an async-lowered import 549 Async { 550 params: Vec<ValRaw>, 551 has_result: bool, 552 }, 553 /// Metadata for a call to an sync-lowered import 554 Sync { 555 params: Vec<ValRaw>, 556 result_count: u32, 557 }, 558 } 559 560 /// Indicates how a guest task is waiting on a waitable set. 
561 enum WaitMode { 562 /// The guest task is waiting using `task.wait` 563 Fiber(StoreFiber<'static>), 564 /// The guest task is waiting via a callback declared as part of an 565 /// async-lifted export. 566 Callback(Instance), 567 } 568 569 /// Represents the reason a fiber is suspending itself. 570 #[derive(Debug)] 571 enum SuspendReason { 572 /// The fiber is waiting for an event to be delivered to the specified 573 /// waitable set or task. 574 Waiting { 575 set: TableId<WaitableSet>, 576 thread: QualifiedThreadId, 577 skip_may_block_check: bool, 578 }, 579 /// The fiber has finished handling its most recent work item and is waiting 580 /// for another (or to be dropped if it is no longer needed). 581 NeedWork, 582 /// The fiber is yielding and should be resumed once other tasks have had a 583 /// chance to run. 584 Yielding { thread: QualifiedThreadId }, 585 /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`. 586 ExplicitlySuspending { 587 thread: QualifiedThreadId, 588 skip_may_block_check: bool, 589 }, 590 } 591 592 /// Represents a pending call into guest code for a given guest task. 593 enum GuestCallKind { 594 /// Indicates there's an event to deliver to the task, possibly related to a 595 /// waitable set the task has been waiting on or polling. 596 DeliverEvent { 597 /// The instance to which the task belongs. 598 instance: Instance, 599 /// The waitable set the event belongs to, if any. 600 /// 601 /// If this is `None` the event will be waiting in the 602 /// `GuestTask::event` field for the task. 603 set: Option<TableId<WaitableSet>>, 604 }, 605 /// Indicates that a new guest task call is pending and may be executed 606 /// using the specified closure. 607 /// 608 /// If the closure returns `Ok(Some(call))`, the `call` should be run 609 /// immediately using `handle_guest_call`. 
610 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>), 611 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>), 612 } 613 614 impl fmt::Debug for GuestCallKind { 615 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 616 match self { 617 Self::DeliverEvent { instance, set } => f 618 .debug_struct("DeliverEvent") 619 .field("instance", instance) 620 .field("set", set) 621 .finish(), 622 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(), 623 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(), 624 } 625 } 626 } 627 628 /// Represents a pending call into guest code for a given guest thread. 629 #[derive(Debug)] 630 struct GuestCall { 631 thread: QualifiedThreadId, 632 kind: GuestCallKind, 633 } 634 635 impl GuestCall { 636 /// Returns whether or not the call is ready to run. 637 /// 638 /// A call will not be ready to run if either: 639 /// 640 /// - the (sub-)component instance to be called has already been entered and 641 /// cannot be reentered until an in-progress call completes 642 /// 643 /// - the call is for a not-yet started task and the (sub-)component 644 /// instance to be called has backpressure enabled 645 fn is_ready(&self, state: &mut ConcurrentState) -> Result<bool> { 646 let task_instance = state.get_mut(self.thread.task)?.instance; 647 let state = state.instance_state(task_instance); 648 649 let ready = match &self.kind { 650 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter, 651 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0), 652 GuestCallKind::StartExplicit(_) => true, 653 }; 654 log::trace!( 655 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})", 656 state.do_not_enter, 657 state.backpressure 658 ); 659 Ok(ready) 660 } 661 } 662 663 /// Job to be run on a worker fiber. 
664 enum WorkerItem { 665 GuestCall(GuestCall), 666 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>), 667 } 668 669 /// Represents state related to an in-progress poll operation (e.g. `task.poll` 670 /// or `CallbackCode.POLL`). 671 #[derive(Debug)] 672 struct PollParams { 673 /// The instance to which the polling thread belongs. 674 instance: Instance, 675 /// The polling thread. 676 thread: QualifiedThreadId, 677 /// The waitable set being polled. 678 set: TableId<WaitableSet>, 679 } 680 681 /// Represents a pending work item to be handled by the event loop for a given 682 /// component instance. 683 enum WorkItem { 684 /// A host task to be pushed to `ConcurrentState::futures`. 685 PushFuture(AlwaysMut<HostTaskFuture>), 686 /// A fiber to resume. 687 ResumeFiber(StoreFiber<'static>), 688 /// A pending call into guest code for a given guest task. 689 GuestCall(GuestCall), 690 /// A pending `task.poll` or `CallbackCode.POLL` operation. 691 Poll(PollParams), 692 /// A job to run on a worker fiber. 693 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>), 694 } 695 696 impl fmt::Debug for WorkItem { 697 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 698 match self { 699 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(), 700 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(), 701 Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(), 702 Self::Poll(params) => f.debug_tuple("Poll").field(params).finish(), 703 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(), 704 } 705 } 706 } 707 708 /// Whether a suspension intrinsic was cancelled or completed 709 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] 710 pub(crate) enum WaitResult { 711 Cancelled, 712 Completed, 713 } 714 715 /// Raise a trap if the calling task is synchronous and trying to block prior to 716 /// returning a value. 
717 pub(crate) fn check_blocking(store: &mut dyn VMStore) -> Result<()> { 718 store.concurrent_state_mut().check_blocking() 719 } 720 721 /// Poll the specified future until it completes on behalf of a guest->host call 722 /// using a sync-lowered import. 723 /// 724 /// This is similar to `Instance::first_poll` except it's for sync-lowered 725 /// imports, meaning we don't need to handle cancellation and we can block the 726 /// caller until the task completes, at which point the caller can handle 727 /// lowering the result to the guest's stack and linear memory. 728 pub(crate) fn poll_and_block<R: Send + Sync + 'static>( 729 store: &mut dyn VMStore, 730 future: impl Future<Output = Result<R>> + Send + 'static, 731 caller_instance: RuntimeComponentInstanceIndex, 732 ) -> Result<R> { 733 let state = store.concurrent_state_mut(); 734 735 // If there is no current guest thread set, that means the host function was 736 // registered using e.g. `LinkerInstance::func_wrap`, in which case it 737 // should complete immediately. 738 let Some(caller) = state.guest_thread else { 739 return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) { 740 Poll::Ready(result) => result, 741 Poll::Pending => { 742 unreachable!() 743 } 744 }; 745 }; 746 747 // Save any existing result stashed in `GuestTask::result` so we can replace 748 // it with the new result. 749 let old_result = state 750 .get_mut(caller.task) 751 .with_context(|| format!("bad handle: {caller:?}"))? 752 .result 753 .take(); 754 755 // Add a temporary host task into the table so we can track its progress. 756 // Note that we'll never allocate a waitable handle for the guest since 757 // we're being called synchronously. 
758 let task = state.push(HostTask::new(caller_instance, None))?; 759 760 log::trace!("new host task child of {caller:?}: {task:?}"); 761 762 // Wrap the future in a closure which will take care of stashing the result 763 // in `GuestTask::result` and resuming this fiber when the host task 764 // completes. 765 let mut future = Box::pin(async move { 766 let result = future.await?; 767 tls::get(move |store| { 768 let state = store.concurrent_state_mut(); 769 state.get_mut(caller.task)?.result = Some(Box::new(result) as _); 770 771 Waitable::Host(task).set_event( 772 state, 773 Some(Event::Subtask { 774 status: Status::Returned, 775 }), 776 )?; 777 778 Ok(()) 779 }) 780 }) as HostTaskFuture; 781 782 // Finally, poll the future. We can use a dummy `Waker` here because we'll 783 // add the future to `ConcurrentState::futures` and poll it automatically 784 // from the event loop if it doesn't complete immediately here. 785 let poll = tls::set(store, || { 786 future 787 .as_mut() 788 .poll(&mut Context::from_waker(&Waker::noop())) 789 }); 790 791 match poll { 792 Poll::Ready(result) => { 793 // It completed immediately; check the result and delete the task. 794 result?; 795 log::trace!("delete host task {task:?} (already ready)"); 796 store.concurrent_state_mut().delete(task)?; 797 } 798 Poll::Pending => { 799 // It did not complete immediately; add it to 800 // `ConcurrentState::futures` so it will be polled via the event 801 // loop; then use `GuestTask::sync_call_set` to wait for the task to 802 // complete, suspending the current fiber until it does so. 803 let state = store.concurrent_state_mut(); 804 state.push_future(future); 805 806 let set = state.get_mut(caller.task)?.sync_call_set; 807 Waitable::Host(task).join(state, Some(set))?; 808 809 store.suspend(SuspendReason::Waiting { 810 set, 811 thread: caller, 812 skip_may_block_check: false, 813 })?; 814 } 815 } 816 817 // Retrieve and return the result. 
Ok(*mem::replace(
        &mut store.concurrent_state_mut().get_mut(caller.task)?.result,
        old_result,
    )
    .unwrap()
    .downcast()
    .unwrap())
}

/// Execute the specified guest call.
///
/// Besides `call` itself, this also runs any follow-up call produced while
/// handling it (either by `Instance::handle_callback_code` or by a
/// `GuestCallKind::StartImplicit` closure), looping until no further call is
/// returned.
fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> {
    let mut next = Some(call);
    while let Some(call) = next.take() {
        match call.kind {
            GuestCallKind::DeliverEvent { instance, set } => {
                // An event is expected to be available at this point; this
                // panics if `get_event` returns `None`.
                let (event, waitable) = instance
                    .get_event(store, call.thread.task, set, true)?
                    .unwrap();
                let state = store.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let runtime_instance = task.instance;
                // A handle of zero is passed to the callback when the event is
                // not associated with a waitable.
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);

                log::trace!(
                    "use callback to deliver event {event:?} to {:?} for {waitable:?}",
                    call.thread,
                );

                // Make `call.thread` the current thread for the duration of
                // the callback, remembering the previous one so it can be
                // restored below.
                let old_thread = state.guest_thread.replace(call.thread);
                log::trace!(
                    "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread",
                    call.thread
                );

                store.maybe_push_call_context(call.thread.task)?;

                let state = store.concurrent_state_mut();
                state.enter_instance(runtime_instance);

                // Temporarily move the callback out of the task so that it can
                // be invoked with `store`; it is put back once it returns.
                let callback = state.get_mut(call.thread.task)?.callback.take().unwrap();

                let code = callback(store, runtime_instance, event, handle)?;

                let state = store.concurrent_state_mut();

                state.get_mut(call.thread.task)?.callback = Some(callback);
                state.exit_instance(runtime_instance)?;

                store.maybe_pop_call_context(call.thread.task)?;

                // The callback code may request that another call be run
                // immediately, in which case the loop continues with it.
                next = instance.handle_callback_code(store, call.thread, runtime_instance, code)?;

                store.concurrent_state_mut().guest_thread = old_thread;
                log::trace!(
                    "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread"
                );
            }
            GuestCallKind::StartImplicit(fun) => {
                // May itself yield a follow-up call to run immediately.
                next = fun(store)?;
            }
            GuestCallKind::StartExplicit(fun) => {
                fun(store)?;
            }
        }
    }

    Ok(())
}

impl<T> Store<T> {
    /// Convenience wrapper for [`StoreContextMut::run_concurrent`].
    pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        self.as_context_mut().run_concurrent(fun).await
    }

    /// Convenience wrapper for `StoreContextMut::assert_concurrent_state_empty`.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&mut self) {
        self.as_context_mut().assert_concurrent_state_empty();
    }

    /// Convenience wrapper for [`StoreContextMut::spawn`].
    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
    where
        T: 'static,
    {
        self.as_context_mut().spawn(task)
    }
}

impl<T> StoreContextMut<'_, T> {
    /// Assert that all the relevant tables and queues in the concurrent state
    /// for this store are empty.
    ///
    /// This is for sanity checking in integration tests
    /// (e.g. `component-async-tests`) that the relevant state has been cleared
    /// after each test concludes.  This should help us catch leaks, e.g. guest
    /// tasks which haven't been deleted despite having completed and having
    /// been dropped by their supertasks.
    ///
    /// # Panics
    ///
    /// Panics if any of the checked tables, queues, or maps is non-empty, or
    /// if a current guest thread is still recorded.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        store
            .store_data_mut()
            .components
            .assert_guest_tables_empty();
        let state = store.concurrent_state_mut();
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.guest_thread.is_none());
        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
        assert!(
            state
                .instance_states
                .iter()
                .all(|(_, state)| state.pending.is_empty())
        );
        assert!(state.global_error_context_ref_counts.is_empty());
    }

    /// Spawn a background task to run as part of this instance's event loop.
///
    /// The task will receive an `&Accessor<T>` and run concurrently with
    /// any other tasks in progress for the instance.
    ///
    /// Note that the task will only make progress if and when the event loop
    /// for this instance is run.
    ///
    /// The returned [`JoinHandle`] may be used to cancel the task.
    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
    where
        T: 'static,
    {
        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
        self.spawn_with_accessor(accessor, task)
    }

    /// Internal implementation of `spawn` functions where a `store` is
    /// available along with an `Accessor`.
    ///
    /// The task is wrapped via `JoinHandle::run` and then pushed onto the
    /// store's concurrent state as a future; the resulting handle is returned.
    fn spawn_with_accessor<D>(
        self,
        accessor: Accessor<T, D>,
        task: impl AccessorTask<T, D>,
    ) -> JoinHandle
    where
        T: 'static,
        D: HasData + ?Sized,
    {
        // Create an "abortable future" here where internally the future will
        // hook calls to poll and possibly spawn more background tasks on each
        // iteration.
        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
        // If the wrapped future yields no output (presumably because it was
        // aborted through the handle -- TODO confirm), treat that as success.
        self.0
            .concurrent_state_mut()
            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
        handle
    }

    /// Run the specified closure `fun` to completion as part of this store's
    /// event loop.
    ///
    /// This will run `fun` as part of this store's event loop until it
    /// yields a result.  `fun` is provided an [`Accessor`], which provides
    /// controlled access to the store and its data.
    ///
    /// This function can be used to invoke [`Func::call_concurrent`] for
    /// example within the async closure provided here.
    ///
    /// # Store-blocking behavior
    ///
    /// At this time there are certain situations in which the `Future` returned
    /// by the `AsyncFnOnce` passed to this function will not be polled for an
    /// extended period of time, despite one or more `Waker::wake` events having
    /// occurred for the task to which it belongs.  This can manifest as the
    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
    /// the `Store` being held by e.g. a blocking host function, preventing the
    /// `Future` from being polled. A canonical example of this is when the
    /// `fun` provided to this function attempts to set a timeout for an
    /// invocation of a wasm function. In this situation the async closure is
    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
    /// to elapse. At this time this setup will not always work and the timeout
    /// may not reliably fire.
    ///
    /// This function will not block the current thread and as such is always
    /// suitable to run in an `async` context, but the current implementation of
    /// Wasmtime can lead to situations where a certain wasm computation is
    /// required to make progress for the closure to make progress. This is an
    /// artifact of Wasmtime's historical implementation of `async` functions
    /// and is the topic of [#11869] and [#11870]. In the timeout example from
    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
    /// progress for a readiness notification of (b) to get delivered.
    ///
    /// This effectively means that it's not possible to reliably perform a
    /// "select" operation within the `fun` closure, which timeouts for example
    /// are based on. Fixing this requires some relatively major refactoring
    /// work within Wasmtime itself. This is a known pitfall otherwise and one
    /// that is intended to be fixed one day. In the meantime it's recommended
    /// to apply timeouts or such to the entire `run_concurrent` call itself
    /// rather than internally.
    ///
    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
    ///
    /// # Example
    ///
    /// ```
    /// # use {
    /// #   anyhow::{Result},
    /// #   wasmtime::{
    /// #     component::{ Component, Linker, Resource, ResourceTable},
    /// #     Config, Engine, Store
    /// #   },
    /// # };
    /// #
    /// # struct MyResource(u32);
    /// # struct Ctx { table: ResourceTable }
    /// #
    /// # async fn foo() -> Result<()> {
    /// # let mut config = Config::new();
    /// # let engine = Engine::new(&config)?;
    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
    /// # let mut linker = Linker::new(&engine);
    /// # let component = Component::new(&engine, "")?;
    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
    ///    bar.call_concurrent(accessor, (value.0,)).await?;
    ///    Ok(())
    /// }).await??;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, false).await
    }

    /// Same as `run_concurrent`, except that `trap_on_idle` is set, so the
    /// event loop returns an error instead of waiting when it can't make
    /// further progress.
    pub(super) async fn run_concurrent_trap_on_idle<R>(
        self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, true).await
    }

    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Creates the future for `fun` and drives it with `poll_until`, forwarding
    /// `trap_on_idle`.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // NOTE(review): presumably rejects nested `run_concurrent` calls; the
        // helper is defined elsewhere -- confirm.
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Guard which owns both the store and the future created by `fun`, and
        // which drops that future from inside `tls::set` so the store is
        // installed while the future's destructor runs.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }

    /// Run this store's event loop.
    ///
    /// The returned future will resolve when the specified future completes or,
    /// if `trap_on_idle` is true, when the event loop can't make further
    /// progress.
async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Guard which puts `ConcurrentState::futures` back into the store when
        // dropped, so the set is restored even if this function's future is
        // dropped while the set is checked out.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take `ConcurrentState::futures` out of the store so we can poll
            // it while also safely giving any of the futures inside access to
            // `self`.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = pin!(reset.futures.as_mut().unwrap().next());

            // The closure resolves to `Either::Left(value)` once `future` is
            // done, or to `Either::Right(items)` with a (possibly empty) batch
            // of work items to handle before polling again.
            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(Either::Left(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                //
                // `Ready(true)` means one future completed successfully,
                // `Ready(false)` means the set is empty, and `Pending` means
                // at least one future is still outstanding.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Next, check the "high priority" work queue and return
                // immediately if it has at least one item.
                let state = reset.store.0.concurrent_state_mut();
                let ready = mem::take(&mut state.high_priority);
                let ready = if ready.is_empty() {
                    // Next, check the "low priority" work queue and return
                    // immediately if it has at least one item.
                    let ready = mem::take(&mut state.low_priority);
                    if ready.is_empty() {
                        return match next {
                            Poll::Ready(true) => {
                                // In this case, one of the futures in
                                // `ConcurrentState::futures` completed
                                // successfully, so we return now and continue
                                // the outer loop in case there is another one
                                // ready to complete.
                                Poll::Ready(Ok(Either::Right(Vec::new())))
                            }
                            Poll::Ready(false) => {
                                // Poll the future we were passed one last time
                                // in case one of `ConcurrentState::futures` had
                                // the side effect of unblocking it.
                                if let Poll::Ready(value) =
                                    tls::set(reset.store.0, || future.as_mut().poll(cx))
                                {
                                    Poll::Ready(Ok(Either::Left(value)))
                                } else {
                                    // In this case, there are no more pending
                                    // futures in `ConcurrentState::futures`,
                                    // there are no remaining work items, _and_
                                    // the future we were passed as an argument
                                    // still hasn't completed.
                                    if trap_on_idle {
                                        // `trap_on_idle` is true, so we exit
                                        // immediately.
                                        Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
                                    } else {
                                        // `trap_on_idle` is false, so we assume
                                        // that future will wake up and give us
                                        // more work to do when it's ready to.
                                        Poll::Pending
                                    }
                                }
                            }
                            // There is at least one pending future in
                            // `ConcurrentState::futures` and we have nothing
                            // else to do but wait for now, so we return
                            // `Pending`.
                            Poll::Pending => Poll::Pending,
                        };
                    } else {
                        ready
                    }
                } else {
                    ready
                };

                Poll::Ready(Ok(Either::Right(ready)))
            })
            .await;

            // Put the `ConcurrentState::futures` back into the store before we
            // return or handle any work items since one or more of those items
            // might append more futures.
            drop(reset);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                Either::Left(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                Either::Right(ready) => {
                    // Guard which cleans up any work items that were taken off
                    // the queues but not handled (e.g. because an earlier item
                    // failed): unhandled fibers are disposed and unhandled
                    // futures are dropped inside `tls::set`.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    // Deliberately `while let` rather than `for`: the iterator
                    // must stay inside `dispose` so that its `Drop` impl can
                    // see whatever items remain if we exit early via `?`.
                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }

    /// Handle the specified work item, possibly resuming a fiber if applicable.
1292 async fn handle_work_item(self, item: WorkItem) -> Result<()> 1293 where 1294 T: Send, 1295 { 1296 log::trace!("handle work item {item:?}"); 1297 match item { 1298 WorkItem::PushFuture(future) => { 1299 self.0 1300 .concurrent_state_mut() 1301 .futures 1302 .get_mut() 1303 .as_mut() 1304 .unwrap() 1305 .push(future.into_inner()); 1306 } 1307 WorkItem::ResumeFiber(fiber) => { 1308 self.0.resume_fiber(fiber).await?; 1309 } 1310 WorkItem::GuestCall(call) => { 1311 let state = self.0.concurrent_state_mut(); 1312 if call.is_ready(state)? { 1313 self.run_on_worker(WorkerItem::GuestCall(call)).await?; 1314 } else { 1315 let task = state.get_mut(call.thread.task)?; 1316 if !task.starting_sent { 1317 task.starting_sent = true; 1318 if let GuestCallKind::StartImplicit(_) = &call.kind { 1319 Waitable::Guest(call.thread.task).set_event( 1320 state, 1321 Some(Event::Subtask { 1322 status: Status::Starting, 1323 }), 1324 )?; 1325 } 1326 } 1327 1328 let runtime_instance = state.get_mut(call.thread.task)?.instance; 1329 state 1330 .instance_state(runtime_instance) 1331 .pending 1332 .insert(call.thread, call.kind); 1333 } 1334 } 1335 WorkItem::Poll(params) => { 1336 let state = self.0.concurrent_state_mut(); 1337 if state.get_mut(params.thread.task)?.event.is_some() 1338 || !state.get_mut(params.set)?.ready.is_empty() 1339 { 1340 // There's at least one event immediately available; deliver 1341 // it to the guest ASAP. 1342 state.push_high_priority(WorkItem::GuestCall(GuestCall { 1343 thread: params.thread, 1344 kind: GuestCallKind::DeliverEvent { 1345 instance: params.instance, 1346 set: Some(params.set), 1347 }, 1348 })); 1349 } else { 1350 // There are no events immediately available; deliver 1351 // `Event::None` to the guest. 
1352 state.get_mut(params.thread.task)?.event = Some(Event::None); 1353 state.push_high_priority(WorkItem::GuestCall(GuestCall { 1354 thread: params.thread, 1355 kind: GuestCallKind::DeliverEvent { 1356 instance: params.instance, 1357 set: Some(params.set), 1358 }, 1359 })); 1360 } 1361 } 1362 WorkItem::WorkerFunction(fun) => { 1363 self.run_on_worker(WorkerItem::Function(fun)).await?; 1364 } 1365 } 1366 1367 Ok(()) 1368 } 1369 1370 /// Execute the specified guest call on a worker fiber. 1371 async fn run_on_worker(self, item: WorkerItem) -> Result<()> 1372 where 1373 T: Send, 1374 { 1375 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() { 1376 fiber 1377 } else { 1378 fiber::make_fiber(self.0, move |store| { 1379 loop { 1380 match store.concurrent_state_mut().worker_item.take().unwrap() { 1381 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?, 1382 WorkerItem::Function(fun) => fun.into_inner()(store)?, 1383 } 1384 1385 store.suspend(SuspendReason::NeedWork)?; 1386 } 1387 })? 1388 }; 1389 1390 let worker_item = &mut self.0.concurrent_state_mut().worker_item; 1391 assert!(worker_item.is_none()); 1392 *worker_item = Some(item); 1393 1394 self.0.resume_fiber(worker).await 1395 } 1396 1397 /// Wrap the specified host function in a future which will call it, passing 1398 /// it an `&Accessor<T>`. 1399 /// 1400 /// See the `Accessor` documentation for details. 1401 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static 1402 where 1403 T: 'static, 1404 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>> 1405 + Send 1406 + Sync 1407 + 'static, 1408 R: Send + Sync + 'static, 1409 { 1410 let token = StoreToken::new(self); 1411 async move { 1412 let mut accessor = Accessor::new(token); 1413 closure(&mut accessor).await 1414 } 1415 } 1416 } 1417 1418 impl StoreOpaque { 1419 /// Resume the specified fiber, giving it exclusive access to the specified 1420 /// store. 
1421 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> { 1422 let old_thread = self.concurrent_state_mut().guest_thread; 1423 log::trace!("resume_fiber: save current thread {old_thread:?}"); 1424 1425 let fiber = fiber::resolve_or_release(self, fiber).await?; 1426 1427 let state = self.concurrent_state_mut(); 1428 1429 state.guest_thread = old_thread; 1430 if let Some(ref ot) = old_thread { 1431 state.get_mut(ot.thread)?.state = GuestThreadState::Running; 1432 } 1433 log::trace!("resume_fiber: restore current thread {old_thread:?}"); 1434 1435 if let Some(mut fiber) = fiber { 1436 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason); 1437 // See the `SuspendReason` documentation for what each case means. 1438 match state.suspend_reason.take().unwrap() { 1439 SuspendReason::NeedWork => { 1440 if state.worker.is_none() { 1441 state.worker = Some(fiber); 1442 } else { 1443 fiber.dispose(self); 1444 } 1445 } 1446 SuspendReason::Yielding { thread, .. } => { 1447 state.get_mut(thread.thread)?.state = GuestThreadState::Pending; 1448 state.push_low_priority(WorkItem::ResumeFiber(fiber)); 1449 } 1450 SuspendReason::ExplicitlySuspending { thread, .. } => { 1451 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber); 1452 } 1453 SuspendReason::Waiting { set, thread, .. } => { 1454 let old = state 1455 .get_mut(set)? 1456 .waiting 1457 .insert(thread, WaitMode::Fiber(fiber)); 1458 assert!(old.is_none()); 1459 } 1460 }; 1461 } else { 1462 log::trace!("resume_fiber: fiber has exited"); 1463 } 1464 1465 Ok(()) 1466 } 1467 1468 /// Suspend the current fiber, storing the reason in 1469 /// `ConcurrentState::suspend_reason` to indicate the conditions under which 1470 /// it should be resumed. 1471 /// 1472 /// See the `SuspendReason` documentation for details. 
fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
        log::trace!("suspend fiber: {reason:?}");

        // If we're yielding or waiting on behalf of a guest thread, we'll need to
        // pop the call context which manages resource borrows before suspending
        // and then push it again once we've resumed.
        let task = match &reason {
            SuspendReason::Yielding { thread, .. }
            | SuspendReason::Waiting { thread, .. }
            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
            SuspendReason::NeedWork => None,
        };

        // Also remember the current guest thread so it can be reinstated after
        // this fiber is resumed.
        let old_guest_thread = if let Some(task) = task {
            self.maybe_pop_call_context(task)?;
            self.concurrent_state_mut().guest_thread
        } else {
            None
        };

        // We should not have reached here unless either there's no current
        // task, or the current task is permitted to block.  In addition, we
        // special-case `thread.switch-to` and waiting for a subtask to go from
        // `starting` to `started`, both of which we consider non-blocking
        // operations despite requiring a suspend.
        assert!(
            matches!(
                reason,
                SuspendReason::ExplicitlySuspending {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Waiting {
                    skip_may_block_check: true,
                    ..
                }
            ) || old_guest_thread
                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
                .unwrap_or(true)
        );

        // Publish the reason where `resume_fiber` will pick it up; there must
        // not already be one outstanding.
        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
        assert!(suspend_reason.is_none());
        *suspend_reason = Some(reason);

        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;

        // We've been resumed: restore the state saved above.
        if let Some(task) = task {
            self.concurrent_state_mut().guest_thread = old_guest_thread;
            self.maybe_push_call_context(task)?;
        }

        Ok(())
    }

    /// Push the call context for managing resource borrows for the specified
    /// guest task if it has not yet either returned a result or cancelled
    /// itself.
    ///
    /// The context is taken out of the task's `call_context` field; this
    /// panics if that field is empty.
    fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
        let task = self.concurrent_state_mut().get_mut(guest_task)?;

        if !task.returned_or_cancelled() {
            log::trace!("push call context for {guest_task:?}");
            let call_context = task.call_context.take().unwrap();
            self.component_resource_state().0.push(call_context);
        }
        Ok(())
    }

    /// Pop the call context for managing resource borrows for the specified
    /// guest task if it has not yet either returned a result or cancelled
    /// itself.
    ///
    /// The popped context is stored back in the task's `call_context` field;
    /// this panics if there is no context to pop.
    fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
        if !self
            .concurrent_state_mut()
            .get_mut(guest_task)?
            .returned_or_cancelled()
        {
            log::trace!("pop call context for {guest_task:?}");
            let call_context = Some(self.component_resource_state().0.pop().unwrap());
            self.concurrent_state_mut()
                .get_mut(guest_task)?
                .call_context = call_context;
        }
        Ok(())
    }

    /// Suspend the current guest thread until an event arrives for `waitable`.
    ///
    /// The waitable is temporarily joined to the calling task's
    /// `sync_call_set` for the duration of the wait and rejoined to its
    /// original set (if any) afterwards.  Panics if there is no current guest
    /// thread.
    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
        let state = self.concurrent_state_mut();
        let caller = state.guest_thread.unwrap();
        let old_set = waitable.common(state)?.set;
        let set = state.get_mut(caller.task)?.sync_call_set;
        waitable.join(state, Some(set))?;
        self.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
            skip_may_block_check: false,
        })?;
        let state = self.concurrent_state_mut();
        waitable.join(state, old_set)
    }
}

impl Instance {
    /// Get the next pending event for the specified task and (optional)
    /// waitable set, along with the waitable handle if applicable.
    ///
    /// An event stored directly on the task takes precedence over events from
    /// the waitable set, except that an `Event::Cancelled` is left in place
    /// (and not returned) when `cancellable` is false.  Returns `Ok(None)` if
    /// no event is available.
    fn get_event(
        self,
        store: &mut StoreOpaque,
        guest_task: TableId<GuestTask>,
        set: Option<TableId<WaitableSet>>,
        cancellable: bool,
    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
        let state = store.concurrent_state_mut();

        if let Some(event) = state.get_mut(guest_task)?.event.take() {
            log::trace!("deliver event {event:?} to {guest_task:?}");

            if cancellable || !matches!(event, Event::Cancelled) {
                return Ok(Some((event, None)));
            } else {
                // Not deliverable right now; put it back for later.
                state.get_mut(guest_task)?.event = Some(event);
            }
        }

        Ok(
            // Pop the first ready waitable from the set, if a set was given
            // and it has one.
            if let Some((set, waitable)) = set
                .and_then(|set| {
                    state
                        .get_mut(set)
                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
                        .transpose()
                })
                .transpose()?
            {
                let common = waitable.common(state)?;
                let handle = common.handle.unwrap();
                let event = common.event.take().unwrap();

                log::trace!(
                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
                );

                waitable.on_delivery(store, self, event);

                Some((event, Some((waitable, handle))))
            } else {
                None
            },
        )
    }

    /// Handle the `CallbackCode` returned from an async-lifted export or its
    /// callback.
    ///
    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
    /// using `handle_guest_call`.
    fn handle_callback_code(
        self,
        store: &mut StoreOpaque,
        guest_thread: QualifiedThreadId,
        runtime_instance: RuntimeComponentInstanceIndex,
        code: u32,
    ) -> Result<Option<GuestCall>> {
        let (code, set) = unpack_callback_code(code);

        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");

        let state = store.concurrent_state_mut();

        // Translate a guest waitable-set handle into a table id; a handle of
        // zero is rejected.
        let get_set = |store, handle| {
            if handle == 0 {
                bail!("invalid waitable-set handle");
            }

            let set = self.id().get_mut(store).guest_tables().0[runtime_instance]
                .waitable_set_rep(handle)?;

            Ok(TableId::<WaitableSet>::new(set))
        };

        Ok(match code {
            callback_code::EXIT => {
                log::trace!("implicit thread {guest_thread:?} completed");
                self.cleanup_thread(store, guest_thread, runtime_instance)?;
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                // Exiting the last thread without having returned a result or
                // cancelled is an error.
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
                match &task.caller {
                    Caller::Host { .. } => {
                        if task.ready_to_delete() {
                            Waitable::Guest(guest_thread.task)
                                .delete_from(store.concurrent_state_mut())?;
                        }
                    }
                    Caller::Guest { .. } => {
                        task.exited = true;
                        task.callback = None;
                    }
                }
                None
            }
            callback_code::YIELD => {
                let task = state.get_mut(guest_thread.task)?;
                assert!(task.event.is_none());
                task.event = Some(Event::None);
                let call = GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::DeliverEvent {
                        instance: self,
                        set: None,
                    },
                };
                if state.may_block(guest_thread.task) {
                    // Push this thread onto the "low priority" queue so it runs
                    // after any other threads have had a chance to run.
                    state.push_low_priority(WorkItem::GuestCall(call));
                    None
                } else {
                    // Yielding in a non-blocking context is defined as a no-op
                    // according to the spec, so we must run this thread
                    // immediately without allowing any others to run.
                    Some(call)
                }
            }
            callback_code::WAIT | callback_code::POLL => {
                // The task may only return `WAIT` or `POLL` if it was created
                // for a call to an async export.  Otherwise, we'll trap.
                state.check_blocking_for(guest_thread.task)?;

                let set = get_set(store, set)?;
                let state = store.concurrent_state_mut();

                if state.get_mut(guest_thread.task)?.event.is_some()
                    || !state.get_mut(set)?.ready.is_empty()
                {
                    // An event is immediately available; deliver it ASAP.
                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::DeliverEvent {
                            instance: self,
                            set: Some(set),
                        },
                    }));
                } else {
                    // No event is immediately available.
                    match code {
                        callback_code::POLL => {
                            // We're polling, so just yield and check whether an
                            // event has arrived after that.
                            state.push_low_priority(WorkItem::Poll(PollParams {
                                instance: self,
                                thread: guest_thread,
                                set,
                            }));
                        }
                        callback_code::WAIT => {
                            // We're waiting, so register to be woken up when an
                            // event is published for this waitable set.
                            //
                            // Here we also set `GuestTask::wake_on_cancel`
                            // which allows `subtask.cancel` to interrupt the
                            // wait.
                            //
                            // NOTE(review): the field is written on the entry
                            // looked up via `guest_thread.thread`, not
                            // `guest_thread.task` -- confirm the type named
                            // above is still accurate.
                            let old = state
                                .get_mut(guest_thread.thread)?
                                .wake_on_cancel
                                .replace(set);
                            assert!(old.is_none());
                            let old = state
                                .get_mut(set)?
                                .waiting
                                .insert(guest_thread, WaitMode::Callback(self));
                            assert!(old.is_none());
                        }
                        _ => unreachable!(),
                    }
                }
                None
            }
            _ => bail!("unsupported callback code: {code}"),
        })
    }

    /// Remove all bookkeeping for the specified guest thread.
    ///
    /// This removes the thread from the instance's guest table, deletes its
    /// entry from the concurrent state, and removes it from the owning task's
    /// `threads` set.  Panics if the thread has no `instance_rep`.
    fn cleanup_thread(
        self,
        store: &mut StoreOpaque,
        guest_thread: QualifiedThreadId,
        runtime_instance: RuntimeComponentInstanceIndex,
    ) -> Result<()> {
        let guest_id = store
            .concurrent_state_mut()
            .get_mut(guest_thread.thread)?
            .instance_rep;
        self.id().get_mut(store).guest_tables().0[runtime_instance]
            .guest_thread_remove(guest_id.unwrap())?;

        store.concurrent_state_mut().delete(guest_thread.thread)?;
        let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
        task.threads.remove(&guest_thread.thread);
        Ok(())
    }

    /// Add the specified guest call to the "high priority" work item queue, to
    /// be started as soon as backpressure and/or reentrance rules allow.
    ///
    /// SAFETY: The raw pointer arguments must be valid references to guest
    /// functions (with the appropriate signatures) when the closures queued by
    /// this function are called.
1781 unsafe fn queue_call<T: 'static>( 1782 self, 1783 mut store: StoreContextMut<T>, 1784 guest_thread: QualifiedThreadId, 1785 callee: SendSyncPtr<VMFuncRef>, 1786 param_count: usize, 1787 result_count: usize, 1788 flags: Option<InstanceFlags>, 1789 async_: bool, 1790 callback: Option<SendSyncPtr<VMFuncRef>>, 1791 post_return: Option<SendSyncPtr<VMFuncRef>>, 1792 ) -> Result<()> { 1793 /// Return a closure which will call the specified function in the scope 1794 /// of the specified task. 1795 /// 1796 /// This will use `GuestTask::lower_params` to lower the parameters, but 1797 /// will not lift the result; instead, it returns a 1798 /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if 1799 /// any, may be lifted. Note that an async-lifted export will have 1800 /// returned its result using the `task.return` intrinsic (or not 1801 /// returned a result at all, in the case of `task.cancel`), in which 1802 /// case the "result" of this call will either be a callback code or 1803 /// nothing. 1804 /// 1805 /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when 1806 /// the returned closure is called. 1807 unsafe fn make_call<T: 'static>( 1808 store: StoreContextMut<T>, 1809 guest_thread: QualifiedThreadId, 1810 callee: SendSyncPtr<VMFuncRef>, 1811 param_count: usize, 1812 result_count: usize, 1813 flags: Option<InstanceFlags>, 1814 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]> 1815 + Send 1816 + Sync 1817 + 'static 1818 + use<T> { 1819 let token = StoreToken::new(store); 1820 move |store: &mut dyn VMStore| { 1821 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS]; 1822 1823 store 1824 .concurrent_state_mut() 1825 .get_mut(guest_thread.thread)? 
1826 .state = GuestThreadState::Running; 1827 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 1828 let may_enter_after_call = task.call_post_return_automatically(); 1829 let lower = task.lower_params.take().unwrap(); 1830 1831 lower(store, &mut storage[..param_count])?; 1832 1833 let mut store = token.as_context_mut(store); 1834 1835 // SAFETY: Per the contract documented in `make_call's` 1836 // documentation, `callee` must be a valid pointer. 1837 unsafe { 1838 if let Some(mut flags) = flags { 1839 flags.set_may_enter(false); 1840 } 1841 crate::Func::call_unchecked_raw( 1842 &mut store, 1843 callee.as_non_null(), 1844 NonNull::new( 1845 &mut storage[..param_count.max(result_count)] 1846 as *mut [MaybeUninit<ValRaw>] as _, 1847 ) 1848 .unwrap(), 1849 )?; 1850 if let Some(mut flags) = flags { 1851 flags.set_may_enter(may_enter_after_call); 1852 } 1853 } 1854 1855 Ok(storage) 1856 } 1857 } 1858 1859 // SAFETY: Per the contract described in this function documentation, 1860 // the `callee` pointer which `call` closes over must be valid when 1861 // called by the closure we queue below. 1862 let call = unsafe { 1863 make_call( 1864 store.as_context_mut(), 1865 guest_thread, 1866 callee, 1867 param_count, 1868 result_count, 1869 flags, 1870 ) 1871 }; 1872 1873 let callee_instance = store 1874 .0 1875 .concurrent_state_mut() 1876 .get_mut(guest_thread.task)? 
1877 .instance; 1878 let fun = if callback.is_some() { 1879 assert!(async_); 1880 1881 Box::new(move |store: &mut dyn VMStore| { 1882 self.add_guest_thread_to_instance_table( 1883 guest_thread.thread, 1884 store, 1885 callee_instance, 1886 )?; 1887 let old_thread = store 1888 .concurrent_state_mut() 1889 .guest_thread 1890 .replace(guest_thread); 1891 log::trace!( 1892 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread" 1893 ); 1894 1895 store.maybe_push_call_context(guest_thread.task)?; 1896 1897 store.concurrent_state_mut().enter_instance(callee_instance); 1898 1899 // SAFETY: See the documentation for `make_call` to review the 1900 // contract we must uphold for `call` here. 1901 // 1902 // Per the contract described in the `queue_call` 1903 // documentation, the `callee` pointer which `call` closes 1904 // over must be valid. 1905 let storage = call(store)?; 1906 1907 store 1908 .concurrent_state_mut() 1909 .exit_instance(callee_instance)?; 1910 1911 store.maybe_pop_call_context(guest_thread.task)?; 1912 1913 let state = store.concurrent_state_mut(); 1914 state.guest_thread = old_thread; 1915 old_thread 1916 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running); 1917 log::trace!("stackless call: restored {old_thread:?} as current thread"); 1918 1919 // SAFETY: `wasmparser` will have validated that the callback 1920 // function returns a `i32` result. 
1921 let code = unsafe { storage[0].assume_init() }.get_i32() as u32; 1922 1923 self.handle_callback_code(store, guest_thread, callee_instance, code) 1924 }) 1925 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync> 1926 } else { 1927 let token = StoreToken::new(store.as_context_mut()); 1928 Box::new(move |store: &mut dyn VMStore| { 1929 self.add_guest_thread_to_instance_table( 1930 guest_thread.thread, 1931 store, 1932 callee_instance, 1933 )?; 1934 let old_thread = store 1935 .concurrent_state_mut() 1936 .guest_thread 1937 .replace(guest_thread); 1938 log::trace!( 1939 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread", 1940 ); 1941 let mut flags = self.id().get(store).instance_flags(callee_instance); 1942 1943 store.maybe_push_call_context(guest_thread.task)?; 1944 1945 // Unless this is a callback-less (i.e. stackful) 1946 // async-lifted export, we need to record that the instance 1947 // cannot be entered until the call returns. 1948 if !async_ { 1949 store.concurrent_state_mut().enter_instance(callee_instance); 1950 } 1951 1952 // SAFETY: See the documentation for `make_call` to review the 1953 // contract we must uphold for `call` here. 1954 // 1955 // Per the contract described in the `queue_call` 1956 // documentation, the `callee` pointer which `call` closes 1957 // over must be valid. 
1958 let storage = call(store)?; 1959 1960 // This is a callback-less call, so the implicit thread has now completed 1961 self.cleanup_thread(store, guest_thread, callee_instance)?; 1962 1963 if async_ { 1964 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 1965 if task.threads.is_empty() && !task.returned_or_cancelled() { 1966 bail!(Trap::NoAsyncResult); 1967 } 1968 } else { 1969 // This is a sync-lifted export, so now is when we lift the 1970 // result, optionally call the post-return function, if any, 1971 // and finally notify any current or future waiters that the 1972 // subtask has returned. 1973 1974 let lift = { 1975 let state = store.concurrent_state_mut(); 1976 state.exit_instance(callee_instance)?; 1977 1978 assert!(state.get_mut(guest_thread.task)?.result.is_none()); 1979 1980 state 1981 .get_mut(guest_thread.task)? 1982 .lift_result 1983 .take() 1984 .unwrap() 1985 }; 1986 1987 // SAFETY: `result_count` represents the number of core Wasm 1988 // results returned, per `wasmparser`. 1989 let result = (lift.lift)(store, unsafe { 1990 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>( 1991 &storage[..result_count], 1992 ) 1993 })?; 1994 1995 let post_return_arg = match result_count { 1996 0 => ValRaw::i32(0), 1997 // SAFETY: `result_count` represents the number of 1998 // core Wasm results returned, per `wasmparser`. 1999 1 => unsafe { storage[0].assume_init() }, 2000 _ => unreachable!(), 2001 }; 2002 2003 if store 2004 .concurrent_state_mut() 2005 .get_mut(guest_thread.task)? 2006 .call_post_return_automatically() 2007 { 2008 unsafe { 2009 flags.set_may_leave(false); 2010 flags.set_needs_post_return(false); 2011 } 2012 2013 if let Some(func) = post_return { 2014 let mut store = token.as_context_mut(store); 2015 2016 // SAFETY: `func` is a valid `*mut VMFuncRef` from 2017 // either `wasmtime-cranelift`-generated fused adapter 2018 // code or `component::Options`. 
Per `wasmparser` 2019 // post-return signature validation, we know it takes a 2020 // single parameter. 2021 unsafe { 2022 crate::Func::call_unchecked_raw( 2023 &mut store, 2024 func.as_non_null(), 2025 slice::from_ref(&post_return_arg).into(), 2026 )?; 2027 } 2028 } 2029 2030 unsafe { 2031 flags.set_may_leave(true); 2032 flags.set_may_enter(true); 2033 } 2034 } 2035 2036 self.task_complete( 2037 store, 2038 guest_thread.task, 2039 result, 2040 Status::Returned, 2041 post_return_arg, 2042 )?; 2043 } 2044 2045 store.maybe_pop_call_context(guest_thread.task)?; 2046 2047 let state = store.concurrent_state_mut(); 2048 let task = state.get_mut(guest_thread.task)?; 2049 2050 match &task.caller { 2051 Caller::Host { .. } => { 2052 if task.ready_to_delete() { 2053 Waitable::Guest(guest_thread.task).delete_from(state)?; 2054 } 2055 } 2056 Caller::Guest { .. } => { 2057 task.exited = true; 2058 } 2059 } 2060 2061 Ok(None) 2062 }) 2063 }; 2064 2065 store 2066 .0 2067 .concurrent_state_mut() 2068 .push_high_priority(WorkItem::GuestCall(GuestCall { 2069 thread: guest_thread, 2070 kind: GuestCallKind::StartImplicit(fun), 2071 })); 2072 2073 Ok(()) 2074 } 2075 2076 /// Prepare (but do not start) a guest->guest call. 2077 /// 2078 /// This is called from fused adapter code generated in 2079 /// `wasmtime_environ::fact::trampoline::Compiler`. `start` and `return_` 2080 /// are synthesized Wasm functions which move the parameters from the caller 2081 /// to the callee and the result from the callee to the caller, 2082 /// respectively. The adapter will call `Self::start_call` immediately 2083 /// after calling this function. 2084 /// 2085 /// SAFETY: All the pointer arguments must be valid pointers to guest 2086 /// entities (and with the expected signatures for the function references 2087 /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details). 
2088 unsafe fn prepare_call<T: 'static>( 2089 self, 2090 mut store: StoreContextMut<T>, 2091 start: *mut VMFuncRef, 2092 return_: *mut VMFuncRef, 2093 caller_instance: RuntimeComponentInstanceIndex, 2094 callee_instance: RuntimeComponentInstanceIndex, 2095 task_return_type: TypeTupleIndex, 2096 callee_async: bool, 2097 memory: *mut VMMemoryDefinition, 2098 string_encoding: u8, 2099 caller_info: CallerInfo, 2100 ) -> Result<()> { 2101 self.id().get(store.0).check_may_leave(caller_instance)?; 2102 2103 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) { 2104 // A task may only call an async-typed function via a sync lower if 2105 // it was created by a call to an async export. Otherwise, we'll 2106 // trap. 2107 store.0.concurrent_state_mut().check_blocking()?; 2108 } 2109 2110 enum ResultInfo { 2111 Heap { results: u32 }, 2112 Stack { result_count: u32 }, 2113 } 2114 2115 let result_info = match &caller_info { 2116 CallerInfo::Async { 2117 has_result: true, 2118 params, 2119 } => ResultInfo::Heap { 2120 results: params.last().unwrap().get_u32(), 2121 }, 2122 CallerInfo::Async { 2123 has_result: false, .. 2124 } => ResultInfo::Stack { result_count: 0 }, 2125 CallerInfo::Sync { 2126 result_count, 2127 params, 2128 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap { 2129 results: params.last().unwrap().get_u32(), 2130 }, 2131 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack { 2132 result_count: *result_count, 2133 }, 2134 }; 2135 2136 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. }); 2137 2138 // Create a new guest task for the call, closing over the `start` and 2139 // `return_` functions to lift the parameters and lower the result, 2140 // respectively. 
2141 let start = SendSyncPtr::new(NonNull::new(start).unwrap()); 2142 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap()); 2143 let token = StoreToken::new(store.as_context_mut()); 2144 let state = store.0.concurrent_state_mut(); 2145 let old_thread = state.guest_thread.take(); 2146 let new_task = GuestTask::new( 2147 state, 2148 Box::new(move |store, dst| { 2149 let mut store = token.as_context_mut(store); 2150 assert!(dst.len() <= MAX_FLAT_PARAMS); 2151 // The `+ 1` here accounts for the return pointer, if any: 2152 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1]; 2153 let count = match caller_info { 2154 // Async callers, if they have a result, use the last 2155 // parameter as a return pointer so chop that off if 2156 // relevant here. 2157 CallerInfo::Async { params, has_result } => { 2158 let params = ¶ms[..params.len() - usize::from(has_result)]; 2159 for (param, src) in params.iter().zip(&mut src) { 2160 src.write(*param); 2161 } 2162 params.len() 2163 } 2164 2165 // Sync callers forward everything directly. 2166 CallerInfo::Sync { params, .. } => { 2167 for (param, src) in params.iter().zip(&mut src) { 2168 src.write(*param); 2169 } 2170 params.len() 2171 } 2172 }; 2173 // SAFETY: `start` is a valid `*mut VMFuncRef` from 2174 // `wasmtime-cranelift`-generated fused adapter code. Based on 2175 // how it was constructed (see 2176 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter` 2177 // for details) we know it takes count parameters and returns 2178 // `dst.len()` results. 
2179 unsafe { 2180 crate::Func::call_unchecked_raw( 2181 &mut store, 2182 start.as_non_null(), 2183 NonNull::new( 2184 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _, 2185 ) 2186 .unwrap(), 2187 )?; 2188 } 2189 dst.copy_from_slice(&src[..dst.len()]); 2190 let state = store.0.concurrent_state_mut(); 2191 Waitable::Guest(state.guest_thread.unwrap().task).set_event( 2192 state, 2193 Some(Event::Subtask { 2194 status: Status::Started, 2195 }), 2196 )?; 2197 Ok(()) 2198 }), 2199 LiftResult { 2200 lift: Box::new(move |store, src| { 2201 // SAFETY: See comment in closure passed as `lower_params` 2202 // parameter above. 2203 let mut store = token.as_context_mut(store); 2204 let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation? 2205 if let ResultInfo::Heap { results } = &result_info { 2206 my_src.push(ValRaw::u32(*results)); 2207 } 2208 // SAFETY: `return_` is a valid `*mut VMFuncRef` from 2209 // `wasmtime-cranelift`-generated fused adapter code. Based 2210 // on how it was constructed (see 2211 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter` 2212 // for details) we know it takes `src.len()` parameters and 2213 // returns up to 1 result. 
2214 unsafe { 2215 crate::Func::call_unchecked_raw( 2216 &mut store, 2217 return_.as_non_null(), 2218 my_src.as_mut_slice().into(), 2219 )?; 2220 } 2221 let state = store.0.concurrent_state_mut(); 2222 let thread = state.guest_thread.unwrap(); 2223 if sync_caller { 2224 state.get_mut(thread.task)?.sync_result = SyncResult::Produced( 2225 if let ResultInfo::Stack { result_count } = &result_info { 2226 match result_count { 2227 0 => None, 2228 1 => Some(my_src[0]), 2229 _ => unreachable!(), 2230 } 2231 } else { 2232 None 2233 }, 2234 ); 2235 } 2236 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>) 2237 }), 2238 ty: task_return_type, 2239 memory: NonNull::new(memory).map(SendSyncPtr::new), 2240 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(), 2241 }, 2242 Caller::Guest { 2243 thread: old_thread.unwrap(), 2244 instance: caller_instance, 2245 }, 2246 None, 2247 callee_instance, 2248 callee_async, 2249 )?; 2250 2251 let guest_task = state.push(new_task)?; 2252 let new_thread = GuestThread::new_implicit(guest_task); 2253 let guest_thread = state.push(new_thread)?; 2254 state.get_mut(guest_task)?.threads.insert(guest_thread); 2255 2256 let state = store.0.concurrent_state_mut(); 2257 if let Some(old_thread) = old_thread { 2258 if !state.may_enter(guest_task) { 2259 bail!(crate::Trap::CannotEnterComponent); 2260 } 2261 2262 state.get_mut(old_thread.task)?.subtasks.insert(guest_task); 2263 }; 2264 2265 // Make the new thread the current one so that `Self::start_call` knows 2266 // which one to start. 2267 state.guest_thread = Some(QualifiedThreadId { 2268 task: guest_task, 2269 thread: guest_thread, 2270 }); 2271 log::trace!( 2272 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}" 2273 ); 2274 2275 Ok(()) 2276 } 2277 2278 /// Call the specified callback function for an async-lifted export. 
2279 /// 2280 /// SAFETY: `function` must be a valid reference to a guest function of the 2281 /// correct signature for a callback. 2282 unsafe fn call_callback<T>( 2283 self, 2284 mut store: StoreContextMut<T>, 2285 callee_instance: RuntimeComponentInstanceIndex, 2286 function: SendSyncPtr<VMFuncRef>, 2287 event: Event, 2288 handle: u32, 2289 may_enter_after_call: bool, 2290 ) -> Result<u32> { 2291 let mut flags = self.id().get(store.0).instance_flags(callee_instance); 2292 2293 let (ordinal, result) = event.parts(); 2294 let params = &mut [ 2295 ValRaw::u32(ordinal), 2296 ValRaw::u32(handle), 2297 ValRaw::u32(result), 2298 ]; 2299 // SAFETY: `func` is a valid `*mut VMFuncRef` from either 2300 // `wasmtime-cranelift`-generated fused adapter code or 2301 // `component::Options`. Per `wasmparser` callback signature 2302 // validation, we know it takes three parameters and returns one. 2303 unsafe { 2304 flags.set_may_enter(false); 2305 crate::Func::call_unchecked_raw( 2306 &mut store, 2307 function.as_non_null(), 2308 params.as_mut_slice().into(), 2309 )?; 2310 flags.set_may_enter(may_enter_after_call); 2311 } 2312 Ok(params[0].get_u32()) 2313 } 2314 2315 /// Start a guest->guest call previously prepared using 2316 /// `Self::prepare_call`. 2317 /// 2318 /// This is called from fused adapter code generated in 2319 /// `wasmtime_environ::fact::trampoline::Compiler`. The adapter will call 2320 /// this function immediately after calling `Self::prepare_call`. 2321 /// 2322 /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest 2323 /// functions with the appropriate signatures for the current guest task. 2324 /// If this is a call to an async-lowered import, the actual call may be 2325 /// deferred and run after this function returns, in which case the pointer 2326 /// arguments must also be valid when the call happens. 
2327 unsafe fn start_call<T: 'static>( 2328 self, 2329 mut store: StoreContextMut<T>, 2330 callback: *mut VMFuncRef, 2331 post_return: *mut VMFuncRef, 2332 callee: *mut VMFuncRef, 2333 param_count: u32, 2334 result_count: u32, 2335 flags: u32, 2336 storage: Option<&mut [MaybeUninit<ValRaw>]>, 2337 ) -> Result<u32> { 2338 let token = StoreToken::new(store.as_context_mut()); 2339 let async_caller = storage.is_none(); 2340 let state = store.0.concurrent_state_mut(); 2341 let guest_thread = state.guest_thread.unwrap(); 2342 let callee_async = state.get_mut(guest_thread.task)?.async_function; 2343 let may_enter_after_call = state 2344 .get_mut(guest_thread.task)? 2345 .call_post_return_automatically(); 2346 let callee = SendSyncPtr::new(NonNull::new(callee).unwrap()); 2347 let param_count = usize::try_from(param_count).unwrap(); 2348 assert!(param_count <= MAX_FLAT_PARAMS); 2349 let result_count = usize::try_from(result_count).unwrap(); 2350 assert!(result_count <= MAX_FLAT_RESULTS); 2351 2352 let task = state.get_mut(guest_thread.task)?; 2353 if !callback.is_null() { 2354 // We're calling an async-lifted export with a callback, so store 2355 // the callback and related context as part of the task so we can 2356 // call it later when needed. 2357 let callback = SendSyncPtr::new(NonNull::new(callback).unwrap()); 2358 task.callback = Some(Box::new(move |store, runtime_instance, event, handle| { 2359 let store = token.as_context_mut(store); 2360 unsafe { 2361 self.call_callback::<T>( 2362 store, 2363 runtime_instance, 2364 callback, 2365 event, 2366 handle, 2367 may_enter_after_call, 2368 ) 2369 } 2370 })); 2371 } 2372 2373 let Caller::Guest { 2374 thread: caller, 2375 instance: runtime_instance, 2376 } = &task.caller 2377 else { 2378 // As of this writing, `start_call` is only used for guest->guest 2379 // calls. 
2380 unreachable!() 2381 }; 2382 let caller = *caller; 2383 let caller_instance = *runtime_instance; 2384 2385 let callee_instance = task.instance; 2386 2387 let instance_flags = if callback.is_null() { 2388 None 2389 } else { 2390 Some(self.id().get(store.0).instance_flags(callee_instance)) 2391 }; 2392 2393 // Queue the call as a "high priority" work item. 2394 unsafe { 2395 self.queue_call( 2396 store.as_context_mut(), 2397 guest_thread, 2398 callee, 2399 param_count, 2400 result_count, 2401 instance_flags, 2402 (flags & START_FLAG_ASYNC_CALLEE) != 0, 2403 NonNull::new(callback).map(SendSyncPtr::new), 2404 NonNull::new(post_return).map(SendSyncPtr::new), 2405 )?; 2406 } 2407 2408 let state = store.0.concurrent_state_mut(); 2409 2410 // Use the caller's `GuestTask::sync_call_set` to register interest in 2411 // the subtask... 2412 let guest_waitable = Waitable::Guest(guest_thread.task); 2413 let old_set = guest_waitable.common(state)?.set; 2414 let set = state.get_mut(caller.task)?.sync_call_set; 2415 guest_waitable.join(state, Some(set))?; 2416 2417 // ... and suspend this fiber temporarily while we wait for it to start. 2418 // 2419 // Note that we _could_ call the callee directly using the current fiber 2420 // rather than suspend this one, but that would make reasoning about the 2421 // event loop more complicated and is probably only worth doing if 2422 // there's a measurable performance benefit. In addition, it would mean 2423 // blocking the caller if the callee calls a blocking sync-lowered 2424 // import, and as of this writing the spec says we must not do that. 2425 // 2426 // Alternatively, the fused adapter code could be modified to call the 2427 // callee directly without calling a host-provided intrinsic at all (in 2428 // which case it would need to do its own, inline backpressure checks, 2429 // etc.). Again, we'd want to see a measurable performance benefit 2430 // before committing to such an optimization. 
And again, we'd need to 2431 // update the spec to allow that. 2432 let (status, waitable) = loop { 2433 store.0.suspend(SuspendReason::Waiting { 2434 set, 2435 thread: caller, 2436 // Normally, `StoreOpaque::suspend` would assert it's being 2437 // called from a context where blocking is allowed. However, if 2438 // `async_caller` is `true`, we'll only "block" long enough for 2439 // the callee to start, i.e. we won't repeat this loop, so we 2440 // tell `suspend` it's okay even if we're not allowed to block. 2441 // Alternatively, if the callee is not an async function, then 2442 // we know it won't block anyway. 2443 skip_may_block_check: async_caller || !callee_async, 2444 })?; 2445 2446 let state = store.0.concurrent_state_mut(); 2447 2448 log::trace!("taking event for {:?}", guest_thread.task); 2449 let event = guest_waitable.take_event(state)?; 2450 let Some(Event::Subtask { status }) = event else { 2451 unreachable!(); 2452 }; 2453 2454 log::trace!("status {status:?} for {:?}", guest_thread.task); 2455 2456 if status == Status::Returned { 2457 // It returned, so we can stop waiting. 2458 break (status, None); 2459 } else if async_caller { 2460 // It hasn't returned yet, but the caller is calling via an 2461 // async-lowered import, so we generate a handle for the task 2462 // waitable and return the status. 2463 let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance] 2464 .subtask_insert_guest(guest_thread.task.rep())?; 2465 store 2466 .0 2467 .concurrent_state_mut() 2468 .get_mut(guest_thread.task)? 2469 .common 2470 .handle = Some(handle); 2471 break (status, Some(handle)); 2472 } else { 2473 // The callee hasn't returned yet, and the caller is calling via 2474 // a sync-lowered import, so we loop and keep waiting until the 2475 // callee returns. 
2476 } 2477 }; 2478 2479 let state = store.0.concurrent_state_mut(); 2480 2481 guest_waitable.join(state, old_set)?; 2482 2483 if let Some(storage) = storage { 2484 // The caller used a sync-lowered import to call an async-lifted 2485 // export, in which case the result, if any, has been stashed in 2486 // `GuestTask::sync_result`. 2487 let task = state.get_mut(guest_thread.task)?; 2488 if let Some(result) = task.sync_result.take() { 2489 if let Some(result) = result { 2490 storage[0] = MaybeUninit::new(result); 2491 } 2492 2493 if task.exited { 2494 if task.ready_to_delete() { 2495 Waitable::Guest(guest_thread.task).delete_from(state)?; 2496 } 2497 } 2498 } 2499 } 2500 2501 // Reset the current thread to point to the caller as it resumes control. 2502 state.guest_thread = Some(caller); 2503 state.get_mut(caller.thread)?.state = GuestThreadState::Running; 2504 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}"); 2505 2506 Ok(status.pack(waitable)) 2507 } 2508 2509 /// Poll the specified future once on behalf of a guest->host call using an 2510 /// async-lowered import. 2511 /// 2512 /// If it returns `Ready`, return `Ok(None)`. Otherwise, if it returns 2513 /// `Pending`, add it to the set of futures to be polled as part of this 2514 /// instance's event loop until it completes, and then return 2515 /// `Ok(Some(handle))` where `handle` is the waitable handle to return. 2516 /// 2517 /// Whether the future returns `Ready` immediately or later, the `lower` 2518 /// function will be used to lower the result, if any, into the guest caller's 2519 /// stack and linear memory unless the task has been cancelled. 
2520 pub(crate) fn first_poll<T: 'static, R: Send + 'static>( 2521 self, 2522 mut store: StoreContextMut<T>, 2523 future: impl Future<Output = Result<R>> + Send + 'static, 2524 caller_instance: RuntimeComponentInstanceIndex, 2525 lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static, 2526 ) -> Result<Option<u32>> { 2527 let token = StoreToken::new(store.as_context_mut()); 2528 let state = store.0.concurrent_state_mut(); 2529 let caller = state.guest_thread.unwrap(); 2530 2531 // Create an abortable future which hooks calls to poll and manages call 2532 // context state for the future. 2533 let (join_handle, future) = JoinHandle::run(async move { 2534 let mut future = pin!(future); 2535 let mut call_context = None; 2536 future::poll_fn(move |cx| { 2537 // Push the call context for managing any resource borrows 2538 // for the task. 2539 tls::get(|store| { 2540 if let Some(call_context) = call_context.take() { 2541 token 2542 .as_context_mut(store) 2543 .0 2544 .component_resource_state() 2545 .0 2546 .push(call_context); 2547 } 2548 }); 2549 2550 let result = future.as_mut().poll(cx); 2551 2552 if result.is_pending() { 2553 // Pop the call context for managing any resource 2554 // borrows for the task. 2555 tls::get(|store| { 2556 call_context = Some( 2557 token 2558 .as_context_mut(store) 2559 .0 2560 .component_resource_state() 2561 .0 2562 .pop() 2563 .unwrap(), 2564 ); 2565 }); 2566 } 2567 result 2568 }) 2569 .await 2570 }); 2571 2572 // We create a new host task even though it might complete immediately 2573 // (in which case we won't need to pass a waitable back to the guest). 2574 // If it does complete immediately, we'll remove it before we return. 2575 let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?; 2576 2577 log::trace!("new host task child of {caller:?}: {task:?}"); 2578 2579 let mut future = Box::pin(future); 2580 2581 // Finally, poll the future. 
We can use a dummy `Waker` here because 2582 // we'll add the future to `ConcurrentState::futures` and poll it 2583 // automatically from the event loop if it doesn't complete immediately 2584 // here. 2585 let poll = tls::set(store.0, || { 2586 future 2587 .as_mut() 2588 .poll(&mut Context::from_waker(&Waker::noop())) 2589 }); 2590 2591 Ok(match poll { 2592 Poll::Ready(None) => unreachable!(), 2593 Poll::Ready(Some(result)) => { 2594 // It finished immediately; lower the result and delete the 2595 // task. 2596 lower(store.as_context_mut(), result?)?; 2597 log::trace!("delete host task {task:?} (already ready)"); 2598 store.0.concurrent_state_mut().delete(task)?; 2599 None 2600 } 2601 Poll::Pending => { 2602 // It hasn't finished yet; add the future to 2603 // `ConcurrentState::futures` so it will be polled by the event 2604 // loop and allocate a waitable handle to return to the guest. 2605 2606 // Wrap the future in a closure responsible for lowering the result into 2607 // the guest's stack and memory, as well as notifying any waiters that 2608 // the task returned. 2609 let future = 2610 Box::pin(async move { 2611 let result = match future.await { 2612 Some(result) => result?, 2613 // Task was cancelled; nothing left to do. 2614 None => return Ok(()), 2615 }; 2616 tls::get(move |store| { 2617 // Here we schedule a task to run on a worker fiber to do 2618 // the lowering since it may involve a call to the guest's 2619 // realloc function. This is necessary because calling the 2620 // guest while there are host embedder frames on the stack 2621 // is unsound. 
2622 store.concurrent_state_mut().push_high_priority( 2623 WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| { 2624 lower(token.as_context_mut(store), result)?; 2625 let state = store.concurrent_state_mut(); 2626 state.get_mut(task)?.join_handle.take(); 2627 Waitable::Host(task).set_event( 2628 state, 2629 Some(Event::Subtask { 2630 status: Status::Returned, 2631 }), 2632 ) 2633 }))), 2634 ); 2635 Ok(()) 2636 }) 2637 }); 2638 2639 store.0.concurrent_state_mut().push_future(future); 2640 let handle = self.id().get_mut(store.0).guest_tables().0[caller_instance] 2641 .subtask_insert_host(task.rep())?; 2642 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle); 2643 log::trace!( 2644 "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}" 2645 ); 2646 Some(handle) 2647 } 2648 }) 2649 } 2650 2651 /// Implements the `task.return` intrinsic, lifting the result for the 2652 /// current guest task. 2653 pub(crate) fn task_return( 2654 self, 2655 store: &mut dyn VMStore, 2656 caller: RuntimeComponentInstanceIndex, 2657 ty: TypeTupleIndex, 2658 options: OptionsIndex, 2659 storage: &[ValRaw], 2660 ) -> Result<()> { 2661 self.id().get(store).check_may_leave(caller)?; 2662 let state = store.concurrent_state_mut(); 2663 let guest_thread = state.guest_thread.unwrap(); 2664 let lift = state 2665 .get_mut(guest_thread.task)? 2666 .lift_result 2667 .take() 2668 .ok_or_else(|| { 2669 anyhow!("`task.return` or `task.cancel` called more than once for current task") 2670 })?; 2671 assert!(state.get_mut(guest_thread.task)?.result.is_none()); 2672 2673 let CanonicalOptions { 2674 string_encoding, 2675 data_model, 2676 .. 
2677 } = &self.id().get(store).component().env_component().options[options]; 2678 2679 let invalid = ty != lift.ty 2680 || string_encoding != &lift.string_encoding 2681 || match data_model { 2682 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory { 2683 Some(memory) => { 2684 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut()); 2685 let actual = self.id().get(store).runtime_memory(memory); 2686 expected != actual.as_ptr() 2687 } 2688 // Memory not specified, meaning it didn't need to be 2689 // specified per validation, so not invalid. 2690 None => false, 2691 }, 2692 // Always invalid as this isn't supported. 2693 CanonicalOptionsDataModel::Gc { .. } => true, 2694 }; 2695 2696 if invalid { 2697 bail!("invalid `task.return` signature and/or options for current task"); 2698 } 2699 2700 log::trace!("task.return for {guest_thread:?}"); 2701 2702 let result = (lift.lift)(store, storage)?; 2703 self.task_complete( 2704 store, 2705 guest_thread.task, 2706 result, 2707 Status::Returned, 2708 ValRaw::i32(0), 2709 ) 2710 } 2711 2712 /// Implements the `task.cancel` intrinsic. 
2713 pub(crate) fn task_cancel( 2714 self, 2715 store: &mut StoreOpaque, 2716 caller: RuntimeComponentInstanceIndex, 2717 ) -> Result<()> { 2718 self.id().get(store).check_may_leave(caller)?; 2719 let state = store.concurrent_state_mut(); 2720 let guest_thread = state.guest_thread.unwrap(); 2721 let task = state.get_mut(guest_thread.task)?; 2722 if !task.cancel_sent { 2723 bail!("`task.cancel` called by task which has not been cancelled") 2724 } 2725 _ = task.lift_result.take().ok_or_else(|| { 2726 anyhow!("`task.return` or `task.cancel` called more than once for current task") 2727 })?; 2728 2729 assert!(task.result.is_none()); 2730 2731 log::trace!("task.cancel for {guest_thread:?}"); 2732 2733 self.task_complete( 2734 store, 2735 guest_thread.task, 2736 Box::new(DummyResult), 2737 Status::ReturnCancelled, 2738 ValRaw::i32(0), 2739 ) 2740 } 2741 2742 /// Complete the specified guest task (i.e. indicate that it has either 2743 /// returned a (possibly empty) result or cancelled itself). 2744 /// 2745 /// This will return any resource borrows and notify any current or future 2746 /// waiters that the task has completed. 2747 fn task_complete( 2748 self, 2749 store: &mut StoreOpaque, 2750 guest_task: TableId<GuestTask>, 2751 result: Box<dyn Any + Send + Sync>, 2752 status: Status, 2753 post_return_arg: ValRaw, 2754 ) -> Result<()> { 2755 if store 2756 .concurrent_state_mut() 2757 .get_mut(guest_task)? 2758 .call_post_return_automatically() 2759 { 2760 let (calls, host_table, _, instance) = 2761 store.component_resource_state_with_instance(self); 2762 ResourceTables { 2763 calls, 2764 host_table: Some(host_table), 2765 guest: Some(instance.guest_tables()), 2766 } 2767 .exit_call()?; 2768 } else { 2769 // As of this writing, the only scenario where `call_post_return_automatically` 2770 // would be false for a `GuestTask` is for host-to-guest calls using 2771 // `[Typed]Func::call_async`, in which case the `function_index` 2772 // should be a non-`None` value. 
2773 let function_index = store 2774 .concurrent_state_mut() 2775 .get_mut(guest_task)? 2776 .function_index 2777 .unwrap(); 2778 self.id() 2779 .get_mut(store) 2780 .post_return_arg_set(function_index, post_return_arg); 2781 } 2782 2783 let state = store.concurrent_state_mut(); 2784 let task = state.get_mut(guest_task)?; 2785 2786 if let Caller::Host { tx, .. } = &mut task.caller { 2787 if let Some(tx) = tx.take() { 2788 _ = tx.send(result); 2789 } 2790 } else { 2791 task.result = Some(result); 2792 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?; 2793 } 2794 2795 Ok(()) 2796 } 2797 2798 /// Implements the `waitable-set.new` intrinsic. 2799 pub(crate) fn waitable_set_new( 2800 self, 2801 store: &mut StoreOpaque, 2802 caller_instance: RuntimeComponentInstanceIndex, 2803 ) -> Result<u32> { 2804 self.id().get_mut(store).check_may_leave(caller_instance)?; 2805 let set = store.concurrent_state_mut().push(WaitableSet::default())?; 2806 let handle = self.id().get_mut(store).guest_tables().0[caller_instance] 2807 .waitable_set_insert(set.rep())?; 2808 log::trace!("new waitable set {set:?} (handle {handle})"); 2809 Ok(handle) 2810 } 2811 2812 /// Implements the `waitable-set.drop` intrinsic. 2813 pub(crate) fn waitable_set_drop( 2814 self, 2815 store: &mut StoreOpaque, 2816 caller_instance: RuntimeComponentInstanceIndex, 2817 set: u32, 2818 ) -> Result<()> { 2819 self.id().get_mut(store).check_may_leave(caller_instance)?; 2820 let rep = 2821 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_remove(set)?; 2822 2823 log::trace!("drop waitable set {rep} (handle {set})"); 2824 2825 let set = store 2826 .concurrent_state_mut() 2827 .delete(TableId::<WaitableSet>::new(rep))?; 2828 2829 if !set.waiting.is_empty() { 2830 bail!("cannot drop waitable set with waiters"); 2831 } 2832 2833 Ok(()) 2834 } 2835 2836 /// Implements the `waitable.join` intrinsic. 
2837 pub(crate) fn waitable_join( 2838 self, 2839 store: &mut StoreOpaque, 2840 caller_instance: RuntimeComponentInstanceIndex, 2841 waitable_handle: u32, 2842 set_handle: u32, 2843 ) -> Result<()> { 2844 let mut instance = self.id().get_mut(store); 2845 instance.check_may_leave(caller_instance)?; 2846 let waitable = 2847 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?; 2848 2849 let set = if set_handle == 0 { 2850 None 2851 } else { 2852 let set = instance.guest_tables().0[caller_instance].waitable_set_rep(set_handle)?; 2853 2854 Some(TableId::<WaitableSet>::new(set)) 2855 }; 2856 2857 log::trace!( 2858 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})", 2859 ); 2860 2861 waitable.join(store.concurrent_state_mut(), set) 2862 } 2863 2864 /// Implements the `subtask.drop` intrinsic. 2865 pub(crate) fn subtask_drop( 2866 self, 2867 store: &mut StoreOpaque, 2868 caller_instance: RuntimeComponentInstanceIndex, 2869 task_id: u32, 2870 ) -> Result<()> { 2871 self.id().get_mut(store).check_may_leave(caller_instance)?; 2872 self.waitable_join(store, caller_instance, task_id, 0)?; 2873 2874 let (rep, is_host) = 2875 self.id().get_mut(store).guest_tables().0[caller_instance].subtask_remove(task_id)?; 2876 2877 let concurrent_state = store.concurrent_state_mut(); 2878 let (waitable, expected_caller_instance, delete) = if is_host { 2879 let id = TableId::<HostTask>::new(rep); 2880 let task = concurrent_state.get_mut(id)?; 2881 if task.join_handle.is_some() { 2882 bail!("cannot drop a subtask which has not yet resolved"); 2883 } 2884 (Waitable::Host(id), task.caller_instance, true) 2885 } else { 2886 let id = TableId::<GuestTask>::new(rep); 2887 let task = concurrent_state.get_mut(id)?; 2888 if task.lift_result.is_some() { 2889 bail!("cannot drop a subtask which has not yet resolved"); 2890 } 2891 if let Caller::Guest { instance, .. 
} = &task.caller { 2892 (Waitable::Guest(id), *instance, task.exited) 2893 } else { 2894 unreachable!() 2895 } 2896 }; 2897 2898 waitable.common(concurrent_state)?.handle = None; 2899 2900 if waitable.take_event(concurrent_state)?.is_some() { 2901 bail!("cannot drop a subtask with an undelivered event"); 2902 } 2903 2904 if delete { 2905 waitable.delete_from(concurrent_state)?; 2906 } 2907 2908 // Since waitables can neither be passed between instances nor forged, 2909 // this should never fail unless there's a bug in Wasmtime, but we check 2910 // here to be sure: 2911 assert_eq!(expected_caller_instance, caller_instance); 2912 log::trace!("subtask_drop {waitable:?} (handle {task_id})"); 2913 Ok(()) 2914 } 2915 2916 /// Implements the `waitable-set.wait` intrinsic. 2917 pub(crate) fn waitable_set_wait( 2918 self, 2919 store: &mut StoreOpaque, 2920 caller: RuntimeComponentInstanceIndex, 2921 options: OptionsIndex, 2922 set: u32, 2923 payload: u32, 2924 ) -> Result<u32> { 2925 self.id().get(store).check_may_leave(caller)?; 2926 2927 if !self.options(store, options).async_ { 2928 // The caller may only call `waitable-set.wait` from an async task 2929 // (i.e. a task created via a call to an async export). 2930 // Otherwise, we'll trap. 2931 store.concurrent_state_mut().check_blocking()?; 2932 } 2933 2934 let &CanonicalOptions { 2935 cancellable, 2936 instance: caller_instance, 2937 .. 2938 } = &self.id().get(store).component().env_component().options[options]; 2939 let rep = 2940 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?; 2941 2942 self.waitable_check( 2943 store, 2944 cancellable, 2945 WaitableCheck::Wait(WaitableCheckParams { 2946 set: TableId::new(rep), 2947 options, 2948 payload, 2949 }), 2950 ) 2951 } 2952 2953 /// Implements the `waitable-set.poll` intrinsic. 
2954 pub(crate) fn waitable_set_poll( 2955 self, 2956 store: &mut StoreOpaque, 2957 caller: RuntimeComponentInstanceIndex, 2958 options: OptionsIndex, 2959 set: u32, 2960 payload: u32, 2961 ) -> Result<u32> { 2962 self.id().get(store).check_may_leave(caller)?; 2963 2964 if !self.options(store, options).async_ { 2965 // The caller may only call `waitable-set.poll` from an async task 2966 // (i.e. a task created via a call to an async export). 2967 // Otherwise, we'll trap. 2968 store.concurrent_state_mut().check_blocking()?; 2969 } 2970 2971 let &CanonicalOptions { 2972 cancellable, 2973 instance: caller_instance, 2974 .. 2975 } = &self.id().get(store).component().env_component().options[options]; 2976 let rep = 2977 self.id().get_mut(store).guest_tables().0[caller_instance].waitable_set_rep(set)?; 2978 2979 self.waitable_check( 2980 store, 2981 cancellable, 2982 WaitableCheck::Poll(WaitableCheckParams { 2983 set: TableId::new(rep), 2984 options, 2985 payload, 2986 }), 2987 ) 2988 } 2989 2990 /// Implements the `thread.index` intrinsic. 2991 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> { 2992 let thread_id = store 2993 .concurrent_state_mut() 2994 .guest_thread 2995 .ok_or_else(|| anyhow!("no current thread"))? 2996 .thread; 2997 // The unwrap is safe because `instance_rep` must be `Some` by this point 2998 Ok(store 2999 .concurrent_state_mut() 3000 .get_mut(thread_id)? 3001 .instance_rep 3002 .unwrap()) 3003 } 3004 3005 /// Implements the `thread.new-indirect` intrinsic. 
3006 pub(crate) fn thread_new_indirect<T: 'static>( 3007 self, 3008 mut store: StoreContextMut<T>, 3009 runtime_instance: RuntimeComponentInstanceIndex, 3010 _func_ty_idx: TypeFuncIndex, // currently unused 3011 start_func_table_idx: RuntimeTableIndex, 3012 start_func_idx: u32, 3013 context: i32, 3014 ) -> Result<u32> { 3015 self.id().get(store.0).check_may_leave(runtime_instance)?; 3016 3017 log::trace!("creating new thread"); 3018 3019 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []); 3020 let (instance, registry) = self.id().get_mut_and_registry(store.0); 3021 let callee = instance 3022 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)? 3023 .ok_or_else(|| { 3024 anyhow!("the start function index points to an uninitialized function") 3025 })?; 3026 if callee.type_index(store.0) != start_func_ty.type_index() { 3027 bail!( 3028 "start function does not match expected type (currently only `(i32) -> ()` is supported)" 3029 ); 3030 } 3031 3032 let token = StoreToken::new(store.as_context_mut()); 3033 let start_func = Box::new( 3034 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> { 3035 let old_thread = store 3036 .concurrent_state_mut() 3037 .guest_thread 3038 .replace(guest_thread); 3039 log::trace!( 3040 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread" 3041 ); 3042 3043 store.maybe_push_call_context(guest_thread.task)?; 3044 3045 let mut store = token.as_context_mut(store); 3046 let mut params = [ValRaw::i32(context)]; 3047 // Use call_unchecked rather than call or call_async, as we don't want to run the function 3048 // on a separate fiber if we're running in an async store. 3049 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? 
}; 3050 3051 store.0.maybe_pop_call_context(guest_thread.task)?; 3052 3053 self.cleanup_thread(store.0, guest_thread, runtime_instance)?; 3054 log::trace!("explicit thread {guest_thread:?} completed"); 3055 let state = store.0.concurrent_state_mut(); 3056 let task = state.get_mut(guest_thread.task)?; 3057 if task.threads.is_empty() && !task.returned_or_cancelled() { 3058 bail!(Trap::NoAsyncResult); 3059 } 3060 state.guest_thread = old_thread; 3061 old_thread 3062 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running); 3063 if state.get_mut(guest_thread.task)?.ready_to_delete() { 3064 Waitable::Guest(guest_thread.task).delete_from(state)?; 3065 } 3066 log::trace!("thread start: restored {old_thread:?} as current thread"); 3067 3068 Ok(()) 3069 }, 3070 ); 3071 3072 let state = store.0.concurrent_state_mut(); 3073 let current_thread = state.guest_thread.unwrap(); 3074 let parent_task = current_thread.task; 3075 3076 let new_thread = GuestThread::new_explicit(parent_task, start_func); 3077 let thread_id = state.push(new_thread)?; 3078 state.get_mut(parent_task)?.threads.insert(thread_id); 3079 3080 log::trace!("new thread with id {thread_id:?} created"); 3081 3082 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance) 3083 } 3084 3085 pub(crate) fn resume_suspended_thread( 3086 self, 3087 store: &mut StoreOpaque, 3088 runtime_instance: RuntimeComponentInstanceIndex, 3089 thread_idx: u32, 3090 high_priority: bool, 3091 ) -> Result<()> { 3092 let thread_id = 3093 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?; 3094 let state = store.concurrent_state_mut(); 3095 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?; 3096 let thread = state.get_mut(guest_thread.thread)?; 3097 3098 match mem::replace(&mut thread.state, GuestThreadState::Running) { 3099 GuestThreadState::NotStartedExplicit(start_func) => { 3100 log::trace!("starting thread {guest_thread:?}"); 3101 let guest_call = 
WorkItem::GuestCall(GuestCall { 3102 thread: guest_thread, 3103 kind: GuestCallKind::StartExplicit(Box::new(move |store| { 3104 start_func(store, guest_thread) 3105 })), 3106 }); 3107 store 3108 .concurrent_state_mut() 3109 .push_work_item(guest_call, high_priority); 3110 } 3111 GuestThreadState::Suspended(fiber) => { 3112 log::trace!("resuming thread {thread_id:?} that was suspended"); 3113 store 3114 .concurrent_state_mut() 3115 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority); 3116 } 3117 _ => { 3118 bail!("cannot resume thread which is not suspended"); 3119 } 3120 } 3121 Ok(()) 3122 } 3123 3124 fn add_guest_thread_to_instance_table( 3125 self, 3126 thread_id: TableId<GuestThread>, 3127 store: &mut StoreOpaque, 3128 runtime_instance: RuntimeComponentInstanceIndex, 3129 ) -> Result<u32> { 3130 let guest_id = self.id().get_mut(store).guest_tables().0[runtime_instance] 3131 .guest_thread_insert(thread_id.rep())?; 3132 store 3133 .concurrent_state_mut() 3134 .get_mut(thread_id)? 3135 .instance_rep = Some(guest_id); 3136 Ok(guest_id) 3137 } 3138 3139 /// Helper function for the `thread.yield`, `thread.yield-to`, `thread.suspend`, 3140 /// and `thread.switch-to` intrinsics. 3141 pub(crate) fn suspension_intrinsic( 3142 self, 3143 store: &mut StoreOpaque, 3144 caller: RuntimeComponentInstanceIndex, 3145 cancellable: bool, 3146 yielding: bool, 3147 to_thread: Option<u32>, 3148 ) -> Result<WaitResult> { 3149 self.id().get(store).check_may_leave(caller)?; 3150 3151 if to_thread.is_none() { 3152 let state = store.concurrent_state_mut(); 3153 if yielding { 3154 // This is a `thread.yield` call 3155 if !state.may_block(state.guest_thread.unwrap().task) { 3156 // The spec defines `thread.yield` to be a no-op in a 3157 // non-blocking context, so we return immediately without giving 3158 // any other thread a chance to run. 3159 return Ok(WaitResult::Completed); 3160 } 3161 } else { 3162 // The caller may only call `thread.suspend` from an async task 3163 // (i.e. 
a task created via a call to an async export). 3164 // Otherwise, we'll trap. 3165 state.check_blocking()?; 3166 } 3167 } 3168 3169 // There could be a pending cancellation from a previous uncancellable wait 3170 if cancellable && store.concurrent_state_mut().take_pending_cancellation() { 3171 return Ok(WaitResult::Cancelled); 3172 } 3173 3174 if let Some(thread) = to_thread { 3175 self.resume_suspended_thread(store, caller, thread, true)?; 3176 } 3177 3178 let state = store.concurrent_state_mut(); 3179 let guest_thread = state.guest_thread.unwrap(); 3180 let reason = if yielding { 3181 SuspendReason::Yielding { 3182 thread: guest_thread, 3183 } 3184 } else { 3185 SuspendReason::ExplicitlySuspending { 3186 thread: guest_thread, 3187 // Tell `StoreOpaque::suspend` it's okay to suspend here since 3188 // we're handling a `thread.switch-to` call; otherwise it would 3189 // panic if we called it in a non-blocking context. 3190 skip_may_block_check: to_thread.is_some(), 3191 } 3192 }; 3193 3194 store.suspend(reason)?; 3195 3196 if cancellable && store.concurrent_state_mut().take_pending_cancellation() { 3197 Ok(WaitResult::Cancelled) 3198 } else { 3199 Ok(WaitResult::Completed) 3200 } 3201 } 3202 3203 /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics. 3204 fn waitable_check( 3205 self, 3206 store: &mut StoreOpaque, 3207 cancellable: bool, 3208 check: WaitableCheck, 3209 ) -> Result<u32> { 3210 let guest_thread = store.concurrent_state_mut().guest_thread.unwrap(); 3211 3212 let (wait, set) = match &check { 3213 WaitableCheck::Wait(params) => (true, Some(params.set)), 3214 WaitableCheck::Poll(params) => (false, Some(params.set)), 3215 }; 3216 3217 log::trace!("waitable check for {guest_thread:?}; set {set:?}"); 3218 // First, suspend this fiber, allowing any other threads to run. 
3219 store.suspend(SuspendReason::Yielding { 3220 thread: guest_thread, 3221 })?; 3222 3223 log::trace!("waitable check for {guest_thread:?}; set {set:?}"); 3224 3225 let state = store.concurrent_state_mut(); 3226 let task = state.get_mut(guest_thread.task)?; 3227 3228 // If we're waiting, and there are no events immediately available, 3229 // suspend the fiber until that changes. 3230 if wait { 3231 let set = set.unwrap(); 3232 3233 if (task.event.is_none() 3234 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable)) 3235 && state.get_mut(set)?.ready.is_empty() 3236 { 3237 if cancellable { 3238 let old = state 3239 .get_mut(guest_thread.thread)? 3240 .wake_on_cancel 3241 .replace(set); 3242 assert!(old.is_none()); 3243 } 3244 3245 store.suspend(SuspendReason::Waiting { 3246 set, 3247 thread: guest_thread, 3248 skip_may_block_check: false, 3249 })?; 3250 } 3251 } 3252 3253 log::trace!("waitable check for {guest_thread:?}; set {set:?}, part two"); 3254 3255 let result = match check { 3256 // Deliver any pending events to the guest and return. 
3257 WaitableCheck::Wait(params) | WaitableCheck::Poll(params) => { 3258 let event = 3259 self.get_event(store, guest_thread.task, Some(params.set), cancellable)?; 3260 3261 let (ordinal, handle, result) = if wait { 3262 let (event, waitable) = event.unwrap(); 3263 let handle = waitable.map(|(_, v)| v).unwrap_or(0); 3264 let (ordinal, result) = event.parts(); 3265 (ordinal, handle, result) 3266 } else { 3267 if let Some((event, waitable)) = event { 3268 let handle = waitable.map(|(_, v)| v).unwrap_or(0); 3269 let (ordinal, result) = event.parts(); 3270 (ordinal, handle, result) 3271 } else { 3272 log::trace!( 3273 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}", 3274 guest_thread.task, 3275 params.set 3276 ); 3277 let (ordinal, result) = Event::None.parts(); 3278 (ordinal, 0, result) 3279 } 3280 }; 3281 let memory = self.options_memory_mut(store, params.options); 3282 let ptr = 3283 func::validate_inbounds::<(u32, u32)>(memory, &ValRaw::u32(params.payload))?; 3284 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes()); 3285 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes()); 3286 Ok(ordinal) 3287 } 3288 }; 3289 3290 result 3291 } 3292 3293 /// Implements the `subtask.cancel` intrinsic. 3294 pub(crate) fn subtask_cancel( 3295 self, 3296 store: &mut StoreOpaque, 3297 caller_instance: RuntimeComponentInstanceIndex, 3298 async_: bool, 3299 task_id: u32, 3300 ) -> Result<u32> { 3301 self.id().get(store).check_may_leave(caller_instance)?; 3302 3303 if !async_ { 3304 // The caller may only sync call `subtask.cancel` from an async task 3305 // (i.e. a task created via a call to an async export). Otherwise, 3306 // we'll trap. 
3307 store.concurrent_state_mut().check_blocking()?; 3308 } 3309 3310 let (rep, is_host) = 3311 self.id().get_mut(store).guest_tables().0[caller_instance].subtask_rep(task_id)?; 3312 let (waitable, expected_caller_instance) = if is_host { 3313 let id = TableId::<HostTask>::new(rep); 3314 ( 3315 Waitable::Host(id), 3316 store.concurrent_state_mut().get_mut(id)?.caller_instance, 3317 ) 3318 } else { 3319 let id = TableId::<GuestTask>::new(rep); 3320 if let Caller::Guest { instance, .. } = 3321 &store.concurrent_state_mut().get_mut(id)?.caller 3322 { 3323 (Waitable::Guest(id), *instance) 3324 } else { 3325 unreachable!() 3326 } 3327 }; 3328 // Since waitables can neither be passed between instances nor forged, 3329 // this should never fail unless there's a bug in Wasmtime, but we check 3330 // here to be sure: 3331 assert_eq!(expected_caller_instance, caller_instance); 3332 3333 log::trace!("subtask_cancel {waitable:?} (handle {task_id})"); 3334 3335 let concurrent_state = store.concurrent_state_mut(); 3336 if let Waitable::Host(host_task) = waitable { 3337 if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() { 3338 handle.abort(); 3339 return Ok(Status::ReturnCancelled as u32); 3340 } 3341 } else { 3342 let caller = concurrent_state.guest_thread.unwrap(); 3343 let guest_task = TableId::<GuestTask>::new(rep); 3344 let task = concurrent_state.get_mut(guest_task)?; 3345 if !task.already_lowered_parameters() { 3346 task.lower_params = None; 3347 task.lift_result = None; 3348 3349 // Not yet started; cancel and remove from pending 3350 let callee_instance = task.instance; 3351 3352 let pending = &mut concurrent_state.instance_state(callee_instance).pending; 3353 let pending_count = pending.len(); 3354 pending.retain(|thread, _| thread.task != guest_task); 3355 // If there were no pending threads for this task, we're in an error state 3356 if pending.len() == pending_count { 3357 bail!("`subtask.cancel` called after terminal status delivered"); 
3358 } 3359 return Ok(Status::StartCancelled as u32); 3360 } else if !task.returned_or_cancelled() { 3361 // Started, but not yet returned or cancelled; send the 3362 // `CANCELLED` event 3363 task.cancel_sent = true; 3364 // Note that this might overwrite an event that was set earlier 3365 // (e.g. `Event::None` if the task is yielding, or 3366 // `Event::Cancelled` if it was already cancelled), but that's 3367 // okay -- this should supersede the previous state. 3368 task.event = Some(Event::Cancelled); 3369 for thread in task.threads.clone() { 3370 let thread = QualifiedThreadId { 3371 task: guest_task, 3372 thread, 3373 }; 3374 if let Some(set) = concurrent_state 3375 .get_mut(thread.thread) 3376 .unwrap() 3377 .wake_on_cancel 3378 .take() 3379 { 3380 let item = match concurrent_state 3381 .get_mut(set)? 3382 .waiting 3383 .remove(&thread) 3384 .unwrap() 3385 { 3386 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber), 3387 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall { 3388 thread, 3389 kind: GuestCallKind::DeliverEvent { 3390 instance, 3391 set: None, 3392 }, 3393 }), 3394 }; 3395 concurrent_state.push_high_priority(item); 3396 3397 store.suspend(SuspendReason::Yielding { thread: caller })?; 3398 break; 3399 } 3400 } 3401 3402 let concurrent_state = store.concurrent_state_mut(); 3403 let task = concurrent_state.get_mut(guest_task)?; 3404 if !task.returned_or_cancelled() { 3405 if async_ { 3406 return Ok(BLOCKED); 3407 } else { 3408 store.wait_for_event(Waitable::Guest(guest_task))?; 3409 } 3410 } 3411 } 3412 } 3413 3414 let event = waitable.take_event(store.concurrent_state_mut())?; 3415 if let Some(Event::Subtask { 3416 status: status @ (Status::Returned | Status::ReturnCancelled), 3417 }) = event 3418 { 3419 Ok(status as u32) 3420 } else { 3421 bail!("`subtask.cancel` called after terminal status delivered"); 3422 } 3423 } 3424 3425 pub(crate) fn context_get( 3426 self, 3427 store: &mut StoreOpaque, 3428 caller: 
RuntimeComponentInstanceIndex, 3429 slot: u32, 3430 ) -> Result<u32> { 3431 self.id().get(store).check_may_leave(caller)?; 3432 store.concurrent_state_mut().context_get(slot) 3433 } 3434 3435 pub(crate) fn context_set( 3436 self, 3437 store: &mut StoreOpaque, 3438 caller: RuntimeComponentInstanceIndex, 3439 slot: u32, 3440 value: u32, 3441 ) -> Result<()> { 3442 self.id().get(store).check_may_leave(caller)?; 3443 store.concurrent_state_mut().context_set(slot, value) 3444 } 3445 } 3446 3447 /// Trait representing component model ABI async intrinsics and fused adapter 3448 /// helper functions. 3449 /// 3450 /// SAFETY (callers): Most of the methods in this trait accept raw pointers, 3451 /// which must be valid for at least the duration of the call (and possibly for 3452 /// as long as the relevant guest task exists, in the case of `*mut VMFuncRef` 3453 /// pointers used for async calls). 3454 pub trait VMComponentAsyncStore { 3455 /// A helper function for fused adapter modules involving calls where the 3456 /// one of the caller or callee is async. 3457 /// 3458 /// This helper is not used when the caller and callee both use the sync 3459 /// ABI, only when at least one is async is this used. 3460 unsafe fn prepare_call( 3461 &mut self, 3462 instance: Instance, 3463 memory: *mut VMMemoryDefinition, 3464 start: *mut VMFuncRef, 3465 return_: *mut VMFuncRef, 3466 caller_instance: RuntimeComponentInstanceIndex, 3467 callee_instance: RuntimeComponentInstanceIndex, 3468 task_return_type: TypeTupleIndex, 3469 callee_async: bool, 3470 string_encoding: u8, 3471 result_count: u32, 3472 storage: *mut ValRaw, 3473 storage_len: usize, 3474 ) -> Result<()>; 3475 3476 /// A helper function for fused adapter modules involving calls where the 3477 /// caller is sync-lowered but the callee is async-lifted. 
3478 unsafe fn sync_start( 3479 &mut self, 3480 instance: Instance, 3481 callback: *mut VMFuncRef, 3482 callee: *mut VMFuncRef, 3483 param_count: u32, 3484 storage: *mut MaybeUninit<ValRaw>, 3485 storage_len: usize, 3486 ) -> Result<()>; 3487 3488 /// A helper function for fused adapter modules involving calls where the 3489 /// caller is async-lowered. 3490 unsafe fn async_start( 3491 &mut self, 3492 instance: Instance, 3493 callback: *mut VMFuncRef, 3494 post_return: *mut VMFuncRef, 3495 callee: *mut VMFuncRef, 3496 param_count: u32, 3497 result_count: u32, 3498 flags: u32, 3499 ) -> Result<u32>; 3500 3501 /// The `future.write` intrinsic. 3502 fn future_write( 3503 &mut self, 3504 instance: Instance, 3505 caller: RuntimeComponentInstanceIndex, 3506 ty: TypeFutureTableIndex, 3507 options: OptionsIndex, 3508 future: u32, 3509 address: u32, 3510 ) -> Result<u32>; 3511 3512 /// The `future.read` intrinsic. 3513 fn future_read( 3514 &mut self, 3515 instance: Instance, 3516 caller: RuntimeComponentInstanceIndex, 3517 ty: TypeFutureTableIndex, 3518 options: OptionsIndex, 3519 future: u32, 3520 address: u32, 3521 ) -> Result<u32>; 3522 3523 /// The `future.drop-writable` intrinsic. 3524 fn future_drop_writable( 3525 &mut self, 3526 instance: Instance, 3527 caller: RuntimeComponentInstanceIndex, 3528 ty: TypeFutureTableIndex, 3529 writer: u32, 3530 ) -> Result<()>; 3531 3532 /// The `stream.write` intrinsic. 3533 fn stream_write( 3534 &mut self, 3535 instance: Instance, 3536 caller: RuntimeComponentInstanceIndex, 3537 ty: TypeStreamTableIndex, 3538 options: OptionsIndex, 3539 stream: u32, 3540 address: u32, 3541 count: u32, 3542 ) -> Result<u32>; 3543 3544 /// The `stream.read` intrinsic. 
3545 fn stream_read( 3546 &mut self, 3547 instance: Instance, 3548 caller: RuntimeComponentInstanceIndex, 3549 ty: TypeStreamTableIndex, 3550 options: OptionsIndex, 3551 stream: u32, 3552 address: u32, 3553 count: u32, 3554 ) -> Result<u32>; 3555 3556 /// The "fast-path" implementation of the `stream.write` intrinsic for 3557 /// "flat" (i.e. memcpy-able) payloads. 3558 fn flat_stream_write( 3559 &mut self, 3560 instance: Instance, 3561 caller: RuntimeComponentInstanceIndex, 3562 ty: TypeStreamTableIndex, 3563 options: OptionsIndex, 3564 payload_size: u32, 3565 payload_align: u32, 3566 stream: u32, 3567 address: u32, 3568 count: u32, 3569 ) -> Result<u32>; 3570 3571 /// The "fast-path" implementation of the `stream.read` intrinsic for "flat" 3572 /// (i.e. memcpy-able) payloads. 3573 fn flat_stream_read( 3574 &mut self, 3575 instance: Instance, 3576 caller: RuntimeComponentInstanceIndex, 3577 ty: TypeStreamTableIndex, 3578 options: OptionsIndex, 3579 payload_size: u32, 3580 payload_align: u32, 3581 stream: u32, 3582 address: u32, 3583 count: u32, 3584 ) -> Result<u32>; 3585 3586 /// The `stream.drop-writable` intrinsic. 3587 fn stream_drop_writable( 3588 &mut self, 3589 instance: Instance, 3590 caller: RuntimeComponentInstanceIndex, 3591 ty: TypeStreamTableIndex, 3592 writer: u32, 3593 ) -> Result<()>; 3594 3595 /// The `error-context.debug-message` intrinsic. 
3596 fn error_context_debug_message( 3597 &mut self, 3598 instance: Instance, 3599 caller: RuntimeComponentInstanceIndex, 3600 ty: TypeComponentLocalErrorContextTableIndex, 3601 options: OptionsIndex, 3602 err_ctx_handle: u32, 3603 debug_msg_address: u32, 3604 ) -> Result<()>; 3605 3606 /// The `thread.new-indirect` intrinsic 3607 fn thread_new_indirect( 3608 &mut self, 3609 instance: Instance, 3610 caller: RuntimeComponentInstanceIndex, 3611 func_ty_idx: TypeFuncIndex, 3612 start_func_table_idx: RuntimeTableIndex, 3613 start_func_idx: u32, 3614 context: i32, 3615 ) -> Result<u32>; 3616 } 3617 3618 /// SAFETY: See trait docs. 3619 impl<T: 'static> VMComponentAsyncStore for StoreInner<T> { 3620 unsafe fn prepare_call( 3621 &mut self, 3622 instance: Instance, 3623 memory: *mut VMMemoryDefinition, 3624 start: *mut VMFuncRef, 3625 return_: *mut VMFuncRef, 3626 caller_instance: RuntimeComponentInstanceIndex, 3627 callee_instance: RuntimeComponentInstanceIndex, 3628 task_return_type: TypeTupleIndex, 3629 callee_async: bool, 3630 string_encoding: u8, 3631 result_count_or_max_if_async: u32, 3632 storage: *mut ValRaw, 3633 storage_len: usize, 3634 ) -> Result<()> { 3635 // SAFETY: The `wasmtime_cranelift`-generated code that calls 3636 // this method will have ensured that `storage` is a valid 3637 // pointer containing at least `storage_len` items. 
3638 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec(); 3639 3640 unsafe { 3641 instance.prepare_call( 3642 StoreContextMut(self), 3643 start, 3644 return_, 3645 caller_instance, 3646 callee_instance, 3647 task_return_type, 3648 callee_async, 3649 memory, 3650 string_encoding, 3651 match result_count_or_max_if_async { 3652 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async { 3653 params, 3654 has_result: false, 3655 }, 3656 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async { 3657 params, 3658 has_result: true, 3659 }, 3660 result_count => CallerInfo::Sync { 3661 params, 3662 result_count, 3663 }, 3664 }, 3665 ) 3666 } 3667 } 3668 3669 unsafe fn sync_start( 3670 &mut self, 3671 instance: Instance, 3672 callback: *mut VMFuncRef, 3673 callee: *mut VMFuncRef, 3674 param_count: u32, 3675 storage: *mut MaybeUninit<ValRaw>, 3676 storage_len: usize, 3677 ) -> Result<()> { 3678 unsafe { 3679 instance 3680 .start_call( 3681 StoreContextMut(self), 3682 callback, 3683 ptr::null_mut(), 3684 callee, 3685 param_count, 3686 1, 3687 START_FLAG_ASYNC_CALLEE, 3688 // SAFETY: The `wasmtime_cranelift`-generated code that calls 3689 // this method will have ensured that `storage` is a valid 3690 // pointer containing at least `storage_len` items. 
3691 Some(std::slice::from_raw_parts_mut(storage, storage_len)), 3692 ) 3693 .map(drop) 3694 } 3695 } 3696 3697 unsafe fn async_start( 3698 &mut self, 3699 instance: Instance, 3700 callback: *mut VMFuncRef, 3701 post_return: *mut VMFuncRef, 3702 callee: *mut VMFuncRef, 3703 param_count: u32, 3704 result_count: u32, 3705 flags: u32, 3706 ) -> Result<u32> { 3707 unsafe { 3708 instance.start_call( 3709 StoreContextMut(self), 3710 callback, 3711 post_return, 3712 callee, 3713 param_count, 3714 result_count, 3715 flags, 3716 None, 3717 ) 3718 } 3719 } 3720 3721 fn future_write( 3722 &mut self, 3723 instance: Instance, 3724 caller: RuntimeComponentInstanceIndex, 3725 ty: TypeFutureTableIndex, 3726 options: OptionsIndex, 3727 future: u32, 3728 address: u32, 3729 ) -> Result<u32> { 3730 instance.id().get(self).check_may_leave(caller)?; 3731 instance 3732 .guest_write( 3733 StoreContextMut(self), 3734 caller, 3735 TransmitIndex::Future(ty), 3736 options, 3737 None, 3738 future, 3739 address, 3740 1, 3741 ) 3742 .map(|result| result.encode()) 3743 } 3744 3745 fn future_read( 3746 &mut self, 3747 instance: Instance, 3748 caller: RuntimeComponentInstanceIndex, 3749 ty: TypeFutureTableIndex, 3750 options: OptionsIndex, 3751 future: u32, 3752 address: u32, 3753 ) -> Result<u32> { 3754 instance.id().get(self).check_may_leave(caller)?; 3755 instance 3756 .guest_read( 3757 StoreContextMut(self), 3758 caller, 3759 TransmitIndex::Future(ty), 3760 options, 3761 None, 3762 future, 3763 address, 3764 1, 3765 ) 3766 .map(|result| result.encode()) 3767 } 3768 3769 fn stream_write( 3770 &mut self, 3771 instance: Instance, 3772 caller: RuntimeComponentInstanceIndex, 3773 ty: TypeStreamTableIndex, 3774 options: OptionsIndex, 3775 stream: u32, 3776 address: u32, 3777 count: u32, 3778 ) -> Result<u32> { 3779 instance.id().get(self).check_may_leave(caller)?; 3780 instance 3781 .guest_write( 3782 StoreContextMut(self), 3783 caller, 3784 TransmitIndex::Stream(ty), 3785 options, 3786 None, 3787 
stream, 3788 address, 3789 count, 3790 ) 3791 .map(|result| result.encode()) 3792 } 3793 3794 fn stream_read( 3795 &mut self, 3796 instance: Instance, 3797 caller: RuntimeComponentInstanceIndex, 3798 ty: TypeStreamTableIndex, 3799 options: OptionsIndex, 3800 stream: u32, 3801 address: u32, 3802 count: u32, 3803 ) -> Result<u32> { 3804 instance.id().get(self).check_may_leave(caller)?; 3805 instance 3806 .guest_read( 3807 StoreContextMut(self), 3808 caller, 3809 TransmitIndex::Stream(ty), 3810 options, 3811 None, 3812 stream, 3813 address, 3814 count, 3815 ) 3816 .map(|result| result.encode()) 3817 } 3818 3819 fn future_drop_writable( 3820 &mut self, 3821 instance: Instance, 3822 caller: RuntimeComponentInstanceIndex, 3823 ty: TypeFutureTableIndex, 3824 writer: u32, 3825 ) -> Result<()> { 3826 instance.id().get(self).check_may_leave(caller)?; 3827 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer) 3828 } 3829 3830 fn flat_stream_write( 3831 &mut self, 3832 instance: Instance, 3833 caller: RuntimeComponentInstanceIndex, 3834 ty: TypeStreamTableIndex, 3835 options: OptionsIndex, 3836 payload_size: u32, 3837 payload_align: u32, 3838 stream: u32, 3839 address: u32, 3840 count: u32, 3841 ) -> Result<u32> { 3842 instance.id().get(self).check_may_leave(caller)?; 3843 instance 3844 .guest_write( 3845 StoreContextMut(self), 3846 caller, 3847 TransmitIndex::Stream(ty), 3848 options, 3849 Some(FlatAbi { 3850 size: payload_size, 3851 align: payload_align, 3852 }), 3853 stream, 3854 address, 3855 count, 3856 ) 3857 .map(|result| result.encode()) 3858 } 3859 3860 fn flat_stream_read( 3861 &mut self, 3862 instance: Instance, 3863 caller: RuntimeComponentInstanceIndex, 3864 ty: TypeStreamTableIndex, 3865 options: OptionsIndex, 3866 payload_size: u32, 3867 payload_align: u32, 3868 stream: u32, 3869 address: u32, 3870 count: u32, 3871 ) -> Result<u32> { 3872 instance.id().get(self).check_may_leave(caller)?; 3873 instance 3874 .guest_read( 3875 StoreContextMut(self), 
3876 caller, 3877 TransmitIndex::Stream(ty), 3878 options, 3879 Some(FlatAbi { 3880 size: payload_size, 3881 align: payload_align, 3882 }), 3883 stream, 3884 address, 3885 count, 3886 ) 3887 .map(|result| result.encode()) 3888 } 3889 3890 fn stream_drop_writable( 3891 &mut self, 3892 instance: Instance, 3893 caller: RuntimeComponentInstanceIndex, 3894 ty: TypeStreamTableIndex, 3895 writer: u32, 3896 ) -> Result<()> { 3897 instance.id().get(self).check_may_leave(caller)?; 3898 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer) 3899 } 3900 3901 fn error_context_debug_message( 3902 &mut self, 3903 instance: Instance, 3904 caller: RuntimeComponentInstanceIndex, 3905 ty: TypeComponentLocalErrorContextTableIndex, 3906 options: OptionsIndex, 3907 err_ctx_handle: u32, 3908 debug_msg_address: u32, 3909 ) -> Result<()> { 3910 instance.id().get(self).check_may_leave(caller)?; 3911 instance.error_context_debug_message( 3912 StoreContextMut(self), 3913 ty, 3914 options, 3915 err_ctx_handle, 3916 debug_msg_address, 3917 ) 3918 } 3919 3920 fn thread_new_indirect( 3921 &mut self, 3922 instance: Instance, 3923 caller: RuntimeComponentInstanceIndex, 3924 func_ty_idx: TypeFuncIndex, 3925 start_func_table_idx: RuntimeTableIndex, 3926 start_func_idx: u32, 3927 context: i32, 3928 ) -> Result<u32> { 3929 instance.thread_new_indirect( 3930 StoreContextMut(self), 3931 caller, 3932 func_ty_idx, 3933 start_func_table_idx, 3934 start_func_idx, 3935 context, 3936 ) 3937 } 3938 } 3939 3940 type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>; 3941 3942 /// Represents the state of a pending host task. 
3943 struct HostTask { 3944 common: WaitableCommon, 3945 caller_instance: RuntimeComponentInstanceIndex, 3946 join_handle: Option<JoinHandle>, 3947 } 3948 3949 impl HostTask { 3950 fn new( 3951 caller_instance: RuntimeComponentInstanceIndex, 3952 join_handle: Option<JoinHandle>, 3953 ) -> Self { 3954 Self { 3955 common: WaitableCommon::default(), 3956 caller_instance, 3957 join_handle, 3958 } 3959 } 3960 } 3961 3962 impl TableDebug for HostTask { 3963 fn type_name() -> &'static str { 3964 "HostTask" 3965 } 3966 } 3967 3968 type CallbackFn = Box< 3969 dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32> 3970 + Send 3971 + Sync 3972 + 'static, 3973 >; 3974 3975 /// Represents the caller of a given guest task. 3976 enum Caller { 3977 /// The host called the guest task. 3978 Host { 3979 /// If present, may be used to deliver the result. 3980 tx: Option<oneshot::Sender<LiftedResult>>, 3981 /// Channel to notify once all subtasks spawned by this caller have 3982 /// completed. 3983 /// 3984 /// Note that we'll never actually send anything to this channel; 3985 /// dropping it when the refcount goes to zero is sufficient to notify 3986 /// the receiver. 3987 exit_tx: Arc<oneshot::Sender<()>>, 3988 /// If true, there's a host future that must be dropped before the task 3989 /// can be deleted. 3990 host_future_present: bool, 3991 /// If true, call `post-return` function (if any) automatically. 3992 call_post_return_automatically: bool, 3993 }, 3994 /// Another guest thread called the guest task 3995 Guest { 3996 /// The id of the caller 3997 thread: QualifiedThreadId, 3998 /// The instance to use to enforce reentrance rules. 3999 /// 4000 /// Note that this might not be the same as the instance the caller task 4001 /// started executing in given that one or more synchronous guest->guest 4002 /// calls may have occurred involving multiple instances. 
4003 instance: RuntimeComponentInstanceIndex, 4004 }, 4005 } 4006 4007 /// Represents a closure and related canonical ABI parameters required to 4008 /// validate a `task.return` call at runtime and lift the result. 4009 struct LiftResult { 4010 lift: RawLift, 4011 ty: TypeTupleIndex, 4012 memory: Option<SendSyncPtr<VMMemoryDefinition>>, 4013 string_encoding: StringEncoding, 4014 } 4015 4016 /// The table ID for a guest thread, qualified by the task to which it belongs. 4017 /// 4018 /// This exists to minimize table lookups and the necessity to pass stores around mutably 4019 /// for the common case of identifying the task to which a thread belongs. 4020 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] 4021 struct QualifiedThreadId { 4022 task: TableId<GuestTask>, 4023 thread: TableId<GuestThread>, 4024 } 4025 4026 impl QualifiedThreadId { 4027 fn qualify( 4028 state: &mut ConcurrentState, 4029 thread: TableId<GuestThread>, 4030 ) -> Result<QualifiedThreadId> { 4031 Ok(QualifiedThreadId { 4032 task: state.get_mut(thread)?.parent_task, 4033 thread, 4034 }) 4035 } 4036 } 4037 4038 impl fmt::Debug for QualifiedThreadId { 4039 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 4040 f.debug_tuple("QualifiedThreadId") 4041 .field(&self.task.rep()) 4042 .field(&self.thread.rep()) 4043 .finish() 4044 } 4045 } 4046 4047 enum GuestThreadState { 4048 NotStartedImplicit, 4049 NotStartedExplicit( 4050 Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>, 4051 ), 4052 Running, 4053 Suspended(StoreFiber<'static>), 4054 Pending, 4055 Completed, 4056 } 4057 pub struct GuestThread { 4058 /// Context-local state used to implement the `context.{get,set}` 4059 /// intrinsics. 4060 context: [u32; 2], 4061 /// The owning guest task. 4062 parent_task: TableId<GuestTask>, 4063 /// If present, indicates that the thread is currently waiting on the 4064 /// specified set but may be cancelled and woken immediately. 
4065 wake_on_cancel: Option<TableId<WaitableSet>>, 4066 /// The execution state of this guest thread 4067 state: GuestThreadState, 4068 /// The index of this thread in the component instance's handle table. 4069 /// This must always be `Some` after initialization. 4070 instance_rep: Option<u32>, 4071 } 4072 4073 impl GuestThread { 4074 /// Retrieve the `GuestThread` corresponding to the specified guest-visible 4075 /// handle. 4076 fn from_instance( 4077 state: Pin<&mut ComponentInstance>, 4078 caller_instance: RuntimeComponentInstanceIndex, 4079 guest_thread: u32, 4080 ) -> Result<TableId<Self>> { 4081 let rep = state.guest_tables().0[caller_instance].guest_thread_rep(guest_thread)?; 4082 Ok(TableId::new(rep)) 4083 } 4084 4085 fn new_implicit(parent_task: TableId<GuestTask>) -> Self { 4086 Self { 4087 context: [0; 2], 4088 parent_task, 4089 wake_on_cancel: None, 4090 state: GuestThreadState::NotStartedImplicit, 4091 instance_rep: None, 4092 } 4093 } 4094 4095 fn new_explicit( 4096 parent_task: TableId<GuestTask>, 4097 start_func: Box< 4098 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync, 4099 >, 4100 ) -> Self { 4101 Self { 4102 context: [0; 2], 4103 parent_task, 4104 wake_on_cancel: None, 4105 state: GuestThreadState::NotStartedExplicit(start_func), 4106 instance_rep: None, 4107 } 4108 } 4109 } 4110 4111 impl TableDebug for GuestThread { 4112 fn type_name() -> &'static str { 4113 "GuestThread" 4114 } 4115 } 4116 4117 enum SyncResult { 4118 NotProduced, 4119 Produced(Option<ValRaw>), 4120 Taken, 4121 } 4122 4123 impl SyncResult { 4124 fn take(&mut self) -> Option<Option<ValRaw>> { 4125 match mem::replace(self, SyncResult::Taken) { 4126 SyncResult::NotProduced => None, 4127 SyncResult::Produced(val) => Some(val), 4128 SyncResult::Taken => { 4129 panic!("attempted to take a synchronous result that was already taken") 4130 } 4131 } 4132 } 4133 } 4134 4135 #[derive(Debug)] 4136 enum HostFutureState { 4137 NotApplicable, 4138 Live, 4139 
Dropped, 4140 } 4141 4142 /// Represents a pending guest task. 4143 pub(crate) struct GuestTask { 4144 /// See `WaitableCommon` 4145 common: WaitableCommon, 4146 /// Closure to lower the parameters passed to this task. 4147 lower_params: Option<RawLower>, 4148 /// See `LiftResult` 4149 lift_result: Option<LiftResult>, 4150 /// A place to stash the type-erased lifted result if it can't be delivered 4151 /// immediately. 4152 result: Option<LiftedResult>, 4153 /// Closure to call the callback function for an async-lifted export, if 4154 /// provided. 4155 callback: Option<CallbackFn>, 4156 /// See `Caller` 4157 caller: Caller, 4158 /// A place to stash the call context for managing resource borrows while 4159 /// switching between guest tasks. 4160 call_context: Option<CallContext>, 4161 /// A place to stash the lowered result for a sync-to-async call until it 4162 /// can be returned to the caller. 4163 sync_result: SyncResult, 4164 /// Whether or not the task has been cancelled (i.e. whether the task is 4165 /// permitted to call `task.cancel`). 4166 cancel_sent: bool, 4167 /// Whether or not we've sent a `Status::Starting` event to any current or 4168 /// future waiters for this waitable. 4169 starting_sent: bool, 4170 /// Pending guest subtasks created by this task (directly or indirectly). 4171 /// 4172 /// This is used to re-parent subtasks which are still running when their 4173 /// parent task is disposed. 4174 subtasks: HashSet<TableId<GuestTask>>, 4175 /// Scratch waitable set used to watch subtasks during synchronous calls. 4176 sync_call_set: TableId<WaitableSet>, 4177 /// The instance to which the exported function for this guest task belongs. 4178 /// 4179 /// Note that the task may do a sync->sync call via a fused adapter which 4180 /// results in that task executing code in a different instance, and it may 4181 /// call host functions and intrinsics from that other instance. 
4182 instance: RuntimeComponentInstanceIndex, 4183 /// If present, a pending `Event::None` or `Event::Cancelled` to be 4184 /// delivered to this task. 4185 event: Option<Event>, 4186 /// The `ExportIndex` of the guest function being called, if known. 4187 function_index: Option<ExportIndex>, 4188 /// Whether or not the task has exited. 4189 exited: bool, 4190 /// Threads belonging to this task 4191 threads: HashSet<TableId<GuestThread>>, 4192 /// The state of the host future that represents an async task, which must 4193 /// be dropped before we can delete the task. 4194 host_future_state: HostFutureState, 4195 /// Indicates whether this task was created for a call to an async-lifted 4196 /// export. 4197 async_function: bool, 4198 } 4199 4200 impl GuestTask { 4201 fn already_lowered_parameters(&self) -> bool { 4202 // We reset `lower_params` after we lower the parameters 4203 self.lower_params.is_none() 4204 } 4205 fn returned_or_cancelled(&self) -> bool { 4206 // We reset `lift_result` after we return or exit 4207 self.lift_result.is_none() 4208 } 4209 fn ready_to_delete(&self) -> bool { 4210 let threads_completed = self.threads.is_empty(); 4211 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_)); 4212 let pending_completion_event = matches!( 4213 self.common.event, 4214 Some(Event::Subtask { 4215 status: Status::Returned | Status::ReturnCancelled 4216 }) 4217 ); 4218 let ready = threads_completed 4219 && !has_sync_result 4220 && !pending_completion_event 4221 && !matches!(self.host_future_state, HostFutureState::Live); 4222 log::trace!( 4223 "ready to delete? 
{ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})", 4224 threads_completed, 4225 has_sync_result, 4226 pending_completion_event, 4227 self.host_future_state 4228 ); 4229 ready 4230 } 4231 fn new( 4232 state: &mut ConcurrentState, 4233 lower_params: RawLower, 4234 lift_result: LiftResult, 4235 caller: Caller, 4236 callback: Option<CallbackFn>, 4237 component_instance: RuntimeComponentInstanceIndex, 4238 async_function: bool, 4239 ) -> Result<Self> { 4240 let sync_call_set = state.push(WaitableSet::default())?; 4241 let host_future_state = match &caller { 4242 Caller::Guest { .. } => HostFutureState::NotApplicable, 4243 Caller::Host { 4244 host_future_present, 4245 .. 4246 } => { 4247 if *host_future_present { 4248 HostFutureState::Live 4249 } else { 4250 HostFutureState::NotApplicable 4251 } 4252 } 4253 }; 4254 Ok(Self { 4255 common: WaitableCommon::default(), 4256 lower_params: Some(lower_params), 4257 lift_result: Some(lift_result), 4258 result: None, 4259 callback, 4260 caller, 4261 call_context: Some(CallContext::default()), 4262 sync_result: SyncResult::NotProduced, 4263 cancel_sent: false, 4264 starting_sent: false, 4265 subtasks: HashSet::new(), 4266 sync_call_set, 4267 instance: component_instance, 4268 event: None, 4269 function_index: None, 4270 exited: false, 4271 threads: HashSet::new(), 4272 host_future_state, 4273 async_function, 4274 }) 4275 } 4276 4277 /// Dispose of this guest task, reparenting any pending subtasks to the 4278 /// caller. 4279 fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> { 4280 // If there are not-yet-delivered completion events for subtasks in 4281 // `self.sync_call_set`, recursively dispose of those subtasks as well. 
4282 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) { 4283 if let Some(Event::Subtask { 4284 status: Status::Returned | Status::ReturnCancelled, 4285 }) = waitable.common(state)?.event 4286 { 4287 waitable.delete_from(state)?; 4288 } 4289 } 4290 4291 assert!(self.threads.is_empty()); 4292 4293 state.delete(self.sync_call_set)?; 4294 4295 // Reparent any pending subtasks to the caller. 4296 match &self.caller { 4297 Caller::Guest { 4298 thread, 4299 instance: runtime_instance, 4300 } => { 4301 let task_mut = state.get_mut(thread.task)?; 4302 let present = task_mut.subtasks.remove(&me); 4303 assert!(present); 4304 4305 for subtask in &self.subtasks { 4306 task_mut.subtasks.insert(*subtask); 4307 } 4308 4309 for subtask in &self.subtasks { 4310 state.get_mut(*subtask)?.caller = Caller::Guest { 4311 thread: *thread, 4312 instance: *runtime_instance, 4313 }; 4314 } 4315 } 4316 Caller::Host { exit_tx, .. } => { 4317 for subtask in &self.subtasks { 4318 state.get_mut(*subtask)?.caller = Caller::Host { 4319 tx: None, 4320 // Clone `exit_tx` to ensure that it is only dropped 4321 // once all transitive subtasks of the host call have 4322 // exited: 4323 exit_tx: exit_tx.clone(), 4324 host_future_present: false, 4325 call_post_return_automatically: true, 4326 }; 4327 } 4328 } 4329 } 4330 4331 for subtask in self.subtasks { 4332 if state.get_mut(subtask)?.exited { 4333 Waitable::Guest(subtask).delete_from(state)?; 4334 } 4335 } 4336 4337 Ok(()) 4338 } 4339 4340 fn call_post_return_automatically(&self) -> bool { 4341 matches!( 4342 self.caller, 4343 Caller::Guest { .. } 4344 | Caller::Host { 4345 call_post_return_automatically: true, 4346 .. 4347 } 4348 ) 4349 } 4350 } 4351 4352 impl TableDebug for GuestTask { 4353 fn type_name() -> &'static str { 4354 "GuestTask" 4355 } 4356 } 4357 4358 /// Represents state common to all kinds of waitables. 
#[derive(Default)]
struct WaitableCommon {
    /// Event currently pending for this waitable, if any.
    event: Option<Event>,
    /// Set this waitable currently belongs to, if any.
    set: Option<TableId<WaitableSet>>,
    /// Handle by which the guest refers to this waitable, if any.
    handle: Option<u32>,
}

/// A Component Model Async `waitable`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task.
    Host(TableId<HostTask>),
    /// A guest task.
    Guest(TableId<GuestTask>),
    /// The read or write end of a stream or future.
    Transmit(TableId<TransmitHandle>),
}

impl Waitable {
    /// Resolve a guest-visible waitable handle to the `Waitable` it names.
    fn from_instance(
        state: Pin<&mut ComponentInstance>,
        caller_instance: RuntimeComponentInstanceIndex,
        waitable: u32,
    ) -> Result<Self> {
        use crate::runtime::vm::component::Waitable as Kind;

        let (rep, kind) = state.guest_tables().0[caller_instance].waitable_rep(waitable)?;

        let resolved = match kind {
            Kind::Subtask { is_host } => {
                if is_host {
                    Self::Host(TableId::new(rep))
                } else {
                    Self::Guest(TableId::new(rep))
                }
            }
            Kind::Stream | Kind::Future => Self::Transmit(TableId::new(rep)),
        };
        Ok(resolved)
    }

    /// The host-visible identifier of this `Waitable`.
    fn rep(&self) -> u32 {
        match *self {
            Self::Host(id) => id.rep(),
            Self::Guest(id) => id.rep(),
            Self::Transmit(id) => id.rep(),
        }
    }

    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
    /// remove it from any set it may currently belong to (when `set` is
    /// `None`).
4411 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> { 4412 log::trace!("waitable {self:?} join set {set:?}",); 4413 4414 let old = mem::replace(&mut self.common(state)?.set, set); 4415 4416 if let Some(old) = old { 4417 match *self { 4418 Waitable::Host(id) => state.remove_child(id, old), 4419 Waitable::Guest(id) => state.remove_child(id, old), 4420 Waitable::Transmit(id) => state.remove_child(id, old), 4421 }?; 4422 4423 state.get_mut(old)?.ready.remove(self); 4424 } 4425 4426 if let Some(set) = set { 4427 match *self { 4428 Waitable::Host(id) => state.add_child(id, set), 4429 Waitable::Guest(id) => state.add_child(id, set), 4430 Waitable::Transmit(id) => state.add_child(id, set), 4431 }?; 4432 4433 if self.common(state)?.event.is_some() { 4434 self.mark_ready(state)?; 4435 } 4436 } 4437 4438 Ok(()) 4439 } 4440 4441 /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`. 4442 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> { 4443 Ok(match self { 4444 Self::Host(id) => &mut state.get_mut(*id)?.common, 4445 Self::Guest(id) => &mut state.get_mut(*id)?.common, 4446 Self::Transmit(id) => &mut state.get_mut(*id)?.common, 4447 }) 4448 } 4449 4450 /// Set or clear the pending event for this waitable and either deliver it 4451 /// to the first waiter, if any, or mark it as ready to be delivered to the 4452 /// next waiter that arrives. 4453 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> { 4454 log::trace!("set event for {self:?}: {event:?}"); 4455 self.common(state)?.event = event; 4456 self.mark_ready(state) 4457 } 4458 4459 /// Take the pending event from this waitable, leaving `None` in its place. 
4460 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> { 4461 let common = self.common(state)?; 4462 let event = common.event.take(); 4463 if let Some(set) = self.common(state)?.set { 4464 state.get_mut(set)?.ready.remove(self); 4465 } 4466 4467 Ok(event) 4468 } 4469 4470 /// Deliver the current event for this waitable to the first waiter, if any, 4471 /// or else mark it as ready to be delivered to the next waiter that 4472 /// arrives. 4473 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> { 4474 if let Some(set) = self.common(state)?.set { 4475 state.get_mut(set)?.ready.insert(*self); 4476 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() { 4477 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take(); 4478 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set)); 4479 4480 let item = match mode { 4481 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber), 4482 WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall { 4483 thread, 4484 kind: GuestCallKind::DeliverEvent { 4485 instance, 4486 set: Some(set), 4487 }, 4488 }), 4489 }; 4490 state.push_high_priority(item); 4491 } 4492 } 4493 Ok(()) 4494 } 4495 4496 /// Remove this waitable from the instance's rep table. 
fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
    match *self {
        Self::Host(task) => {
            log::trace!("delete host task {task:?}");
            state.delete(task)?;
        }
        Self::Guest(task) => {
            log::trace!("delete guest task {task:?}");
            // Removing a guest task also disposes of it, which re-parents
            // its pending subtasks.
            let removed = state.delete(task)?;
            removed.dispose(state, task)?;
        }
        Self::Transmit(task) => {
            state.delete(task)?;
        }
    }

    Ok(())
}
}

impl fmt::Debug for Waitable {
    /// Formats as the `Debug` output of the wrapped table ID.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            Self::Host(id) => write!(f, "{id:?}"),
            Self::Guest(id) => write!(f, "{id:?}"),
            Self::Transmit(id) => write!(f, "{id:?}"),
        }
    }
}

/// A Component Model Async `waitable-set`.
#[derive(Default)]
struct WaitableSet {
    /// The waitables in this set which have pending events, if any.
    ready: BTreeSet<Waitable>,
    /// The guest threads currently waiting on this set, if any.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}

impl TableDebug for WaitableSet {
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}

/// Type-erased closure which lowers the parameters for a guest task.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;

/// Type-erased closure which lifts the result for a guest task.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;

/// Type-erased result of a guest task. A host caller may downcast it to the
/// expected type; a guest caller simply ignores it (see `DummyResult`).
type LiftedResult = Box<dyn Any + Send + Sync>;

/// Used to return a result from a `LiftFn` when the actual result has already
/// been lowered to a guest task's stack and linear memory.
4557 struct DummyResult; 4558 4559 /// Represents the Component Model Async state of a (sub-)component instance. 4560 #[derive(Default)] 4561 struct InstanceState { 4562 /// Whether backpressure is set for this instance (enabled if >0) 4563 backpressure: u16, 4564 /// Whether this instance can be entered 4565 do_not_enter: bool, 4566 /// Pending calls for this instance which require `Self::backpressure` to be 4567 /// `true` and/or `Self::do_not_enter` to be false before they can proceed. 4568 pending: BTreeMap<QualifiedThreadId, GuestCallKind>, 4569 } 4570 4571 /// Represents the Component Model Async state of a store. 4572 pub struct ConcurrentState { 4573 /// The currently running guest thread, if any. 4574 guest_thread: Option<QualifiedThreadId>, 4575 4576 /// The set of pending host and background tasks, if any. 4577 /// 4578 /// See `ComponentInstance::poll_until` for where we temporarily take this 4579 /// out, poll it, then put it back to avoid any mutable aliasing hazards. 4580 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>, 4581 /// The table of waitables, waitable sets, etc. 4582 table: AlwaysMut<ResourceTable>, 4583 /// Per (sub-)component instance states. 4584 /// 4585 /// See `InstanceState` for details and note that this map is lazily 4586 /// populated as needed. 4587 // TODO: this can and should be a `PrimaryMap` 4588 instance_states: HashMap<RuntimeComponentInstanceIndex, InstanceState>, 4589 /// The "high priority" work queue for this store's event loop. 4590 high_priority: Vec<WorkItem>, 4591 /// The "low priority" work queue for this store's event loop. 4592 low_priority: Vec<WorkItem>, 4593 /// A place to stash the reason a fiber is suspending so that the code which 4594 /// resumed it will know under what conditions the fiber should be resumed 4595 /// again. 4596 suspend_reason: Option<SuspendReason>, 4597 /// A cached fiber which is waiting for work to do. 
4598 /// 4599 /// This helps us avoid creating a new fiber for each `GuestCall` work item. 4600 worker: Option<StoreFiber<'static>>, 4601 /// A place to stash the work item for which we're resuming a worker fiber. 4602 worker_item: Option<WorkerItem>, 4603 4604 /// Reference counts for all component error contexts 4605 /// 4606 /// NOTE: it is possible the global ref count to be *greater* than the sum of 4607 /// (sub)component ref counts as tracked by `error_context_tables`, for 4608 /// example when the host holds one or more references to error contexts. 4609 /// 4610 /// The key of this primary map is often referred to as the "rep" (i.e. host-side 4611 /// component-wide representation) of the index into concurrent state for a given 4612 /// stored `ErrorContext`. 4613 /// 4614 /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same 4615 /// as a `TableId<ErrorContextState>`. 4616 global_error_context_ref_counts: 4617 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>, 4618 } 4619 4620 impl Default for ConcurrentState { 4621 fn default() -> Self { 4622 Self { 4623 guest_thread: None, 4624 table: AlwaysMut::new(ResourceTable::new()), 4625 futures: AlwaysMut::new(Some(FuturesUnordered::new())), 4626 instance_states: HashMap::new(), 4627 high_priority: Vec::new(), 4628 low_priority: Vec::new(), 4629 suspend_reason: None, 4630 worker: None, 4631 worker_item: None, 4632 global_error_context_ref_counts: BTreeMap::new(), 4633 } 4634 } 4635 } 4636 4637 impl ConcurrentState { 4638 /// Take ownership of any fibers and futures owned by this object. 4639 /// 4640 /// This should be used when disposing of the `Store` containing this object 4641 /// in order to gracefully resolve any and all fibers using 4642 /// `StoreFiber::dispose`. This is necessary to avoid possible 4643 /// use-after-free bugs due to fibers which may still have access to the 4644 /// `Store`. 
4645 /// 4646 /// Additionally, the futures collected with this function should be dropped 4647 /// within a `tls::set` call, which will ensure than any futures closing 4648 /// over an `&Accessor` will have access to the store when dropped, allowing 4649 /// e.g. `WithAccessor[AndValue]` instances to be disposed of without 4650 /// panicking. 4651 /// 4652 /// Note that this will leave the object in an inconsistent and unusable 4653 /// state, so it should only be used just prior to dropping it. 4654 pub(crate) fn take_fibers_and_futures( 4655 &mut self, 4656 fibers: &mut Vec<StoreFiber<'static>>, 4657 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>, 4658 ) { 4659 for entry in self.table.get_mut().iter_mut() { 4660 if let Some(set) = entry.downcast_mut::<WaitableSet>() { 4661 for mode in mem::take(&mut set.waiting).into_values() { 4662 if let WaitMode::Fiber(fiber) = mode { 4663 fibers.push(fiber); 4664 } 4665 } 4666 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() { 4667 if let GuestThreadState::Suspended(fiber) = 4668 mem::replace(&mut thread.state, GuestThreadState::Completed) 4669 { 4670 fibers.push(fiber); 4671 } 4672 } 4673 } 4674 4675 if let Some(fiber) = self.worker.take() { 4676 fibers.push(fiber); 4677 } 4678 4679 let mut take_items = |list| { 4680 for item in mem::take(list) { 4681 match item { 4682 WorkItem::ResumeFiber(fiber) => { 4683 fibers.push(fiber); 4684 } 4685 WorkItem::PushFuture(future) => { 4686 self.futures 4687 .get_mut() 4688 .as_mut() 4689 .unwrap() 4690 .push(future.into_inner()); 4691 } 4692 _ => {} 4693 } 4694 } 4695 }; 4696 4697 take_items(&mut self.high_priority); 4698 take_items(&mut self.low_priority); 4699 4700 if let Some(them) = self.futures.get_mut().take() { 4701 futures.push(them); 4702 } 4703 } 4704 4705 fn instance_state(&mut self, instance: RuntimeComponentInstanceIndex) -> &mut InstanceState { 4706 self.instance_states.entry(instance).or_default() 4707 } 4708 4709 fn push<V: Send + Sync + 'static>( 
4710 &mut self, 4711 value: V, 4712 ) -> Result<TableId<V>, ResourceTableError> { 4713 self.table.get_mut().push(value).map(TableId::from) 4714 } 4715 4716 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> { 4717 self.table.get_mut().get_mut(&Resource::from(id)) 4718 } 4719 4720 pub fn add_child<T: 'static, U: 'static>( 4721 &mut self, 4722 child: TableId<T>, 4723 parent: TableId<U>, 4724 ) -> Result<(), ResourceTableError> { 4725 self.table 4726 .get_mut() 4727 .add_child(Resource::from(child), Resource::from(parent)) 4728 } 4729 4730 pub fn remove_child<T: 'static, U: 'static>( 4731 &mut self, 4732 child: TableId<T>, 4733 parent: TableId<U>, 4734 ) -> Result<(), ResourceTableError> { 4735 self.table 4736 .get_mut() 4737 .remove_child(Resource::from(child), Resource::from(parent)) 4738 } 4739 4740 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> { 4741 self.table.get_mut().delete(Resource::from(id)) 4742 } 4743 4744 fn push_future(&mut self, future: HostTaskFuture) { 4745 // Note that we can't directly push to `ConcurrentState::futures` here 4746 // since this may be called from a future that's being polled inside 4747 // `Self::poll_until`, which temporarily removes the `FuturesUnordered` 4748 // so it has exclusive access while polling it. Therefore, we push a 4749 // work item to the "high priority" queue, which will actually push to 4750 // `ConcurrentState::futures` later. 
4751 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future))); 4752 } 4753 4754 fn push_high_priority(&mut self, item: WorkItem) { 4755 log::trace!("push high priority: {item:?}"); 4756 self.high_priority.push(item); 4757 } 4758 4759 fn push_low_priority(&mut self, item: WorkItem) { 4760 log::trace!("push low priority: {item:?}"); 4761 self.low_priority.push(item); 4762 } 4763 4764 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) { 4765 if high_priority { 4766 self.push_high_priority(item); 4767 } else { 4768 self.push_low_priority(item); 4769 } 4770 } 4771 4772 /// Determine whether the instance associated with the specified guest task 4773 /// may be entered (i.e. is not already on the async call stack). 4774 /// 4775 /// This is an additional check on top of the "may_enter" instance flag; 4776 /// it's needed because async-lifted exports with callback functions must 4777 /// not call their own instances directly or indirectly, and due to the 4778 /// "stackless" nature of callback-enabled guest tasks this may happen even 4779 /// if there are no activation records on the stack (i.e. the "may_enter" 4780 /// field is `true`) for that instance. 4781 fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool { 4782 let guest_instance = self.get_mut(guest_task).unwrap().instance; 4783 4784 // Walk the task tree back to the root, looking for potential 4785 // reentrance. 4786 // 4787 // TODO: This could be optimized by maintaining a per-`GuestTask` bitset 4788 // such that each bit represents and instance which has been entered by 4789 // that task or an ancestor of that task, in which case this would be a 4790 // constant time check. 4791 loop { 4792 let next_thread = match &self.get_mut(guest_task).unwrap().caller { 4793 Caller::Host { .. 
} => break true, 4794 Caller::Guest { thread, instance } => { 4795 if *instance == guest_instance { 4796 break false; 4797 } else { 4798 *thread 4799 } 4800 } 4801 }; 4802 guest_task = next_thread.task; 4803 } 4804 } 4805 4806 /// Record that we're about to enter a (sub-)component instance which does 4807 /// not support more than one concurrent, stackful activation, meaning it 4808 /// cannot be entered again until the next call returns. 4809 fn enter_instance(&mut self, instance: RuntimeComponentInstanceIndex) { 4810 self.instance_state(instance).do_not_enter = true; 4811 } 4812 4813 /// Record that we've exited a (sub-)component instance previously entered 4814 /// with `Self::enter_instance` and then calls `Self::partition_pending`. 4815 /// See the documentation for the latter for details. 4816 fn exit_instance(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> { 4817 self.instance_state(instance).do_not_enter = false; 4818 self.partition_pending(instance) 4819 } 4820 4821 /// Iterate over `InstanceState::pending`, moving any ready items into the 4822 /// "high priority" work item queue. 4823 /// 4824 /// See `GuestCall::is_ready` for details. 4825 fn partition_pending(&mut self, instance: RuntimeComponentInstanceIndex) -> Result<()> { 4826 for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() { 4827 let call = GuestCall { thread, kind }; 4828 if call.is_ready(self)? { 4829 self.push_high_priority(WorkItem::GuestCall(call)); 4830 } else { 4831 self.instance_state(instance) 4832 .pending 4833 .insert(call.thread, call.kind); 4834 } 4835 } 4836 4837 Ok(()) 4838 } 4839 4840 /// Implements the `backpressure.{inc,dec}` intrinsics. 
4841 pub(crate) fn backpressure_modify( 4842 &mut self, 4843 caller_instance: RuntimeComponentInstanceIndex, 4844 modify: impl FnOnce(u16) -> Option<u16>, 4845 ) -> Result<()> { 4846 let state = self.instance_state(caller_instance); 4847 let old = state.backpressure; 4848 let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?; 4849 state.backpressure = new; 4850 4851 if old > 0 && new == 0 { 4852 // Backpressure was previously enabled and is now disabled; move any 4853 // newly-eligible guest calls to the "high priority" queue. 4854 self.partition_pending(caller_instance)?; 4855 } 4856 4857 Ok(()) 4858 } 4859 4860 /// Implements the `context.get` intrinsic. 4861 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> { 4862 let thread = self.guest_thread.unwrap(); 4863 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()]; 4864 log::trace!("context_get {thread:?} slot {slot} val {val:#x}"); 4865 Ok(val) 4866 } 4867 4868 /// Implements the `context.set` intrinsic. 4869 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> { 4870 let thread = self.guest_thread.unwrap(); 4871 log::trace!("context_set {thread:?} slot {slot} val {val:#x}"); 4872 self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val; 4873 Ok(()) 4874 } 4875 4876 /// Returns whether there's a pending cancellation on the current guest thread, 4877 /// consuming the event if so. 
fn take_pending_cancellation(&mut self) -> bool {
    let current = self.guest_thread.unwrap();
    let task = self.get_mut(current.task).unwrap();
    match task.event.take() {
        Some(event) => {
            // The only event expected to be parked here is a cancellation.
            assert!(matches!(event, Event::Cancelled));
            true
        }
        None => false,
    }
}

/// Check whether the task owning the current guest thread is allowed to
/// block, returning `Trap::CannotBlockSyncTask` if not.
///
/// Panics if there is no current guest thread.
fn check_blocking(&mut self) -> Result<()> {
    let current = self.guest_thread.unwrap();
    self.check_blocking_for(current.task)
}

/// Like `Self::check_blocking`, but for an explicitly specified task.
fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
    if !self.may_block(task) {
        return Err(Trap::CannotBlockSyncTask.into());
    }
    Ok(())
}

/// Returns whether `task` may block: either it is running an async function
/// or it has already returned or been cancelled.
fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
    let guest_task = self.get_mut(task).unwrap();
    if guest_task.async_function {
        true
    } else {
        guest_task.returned_or_cancelled()
    }
}
// End of the enclosing `impl` block, whose header precedes this section.
}

/// Identity function whose only purpose is to give the compiler a type hint
/// about the shape of a parameter-lowering closure.
fn for_any_lower<F>(fun: F) -> F
where
    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
{
    fun
}

/// Identity function whose only purpose is to give the compiler a type hint
/// about the shape of a result-lifting closure.
fn for_any_lift<F>(fun: F) -> F
where
    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
{
    fun
}

/// Wrap `fut` so that every poll first asserts it is happening from within
/// the event loop of the `Store` identified by `id`, panicking otherwise.
///
/// See `StoreContextMut::run_concurrent` for details.
fn checked<F>(id: StoreId, fut: F) -> impl Future<Output = F::Output> + Send + 'static
where
    F: Future + Send + 'static,
{
    let message = "\
        `Future`s which depend on asynchronous component tasks, streams, or \
        futures to complete may only be polled from the event loop of the \
        store to which they belong. Please use \
        `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\
    ";

    async move {
        let mut pinned = pin!(fut);
        let guarded = future::poll_fn(move |cx| {
            tls::try_get(|store| {
                // Only a store that is present in TLS *and* has the expected
                // id counts as a match.
                let matched = match store {
                    tls::TryGet::Taken | tls::TryGet::None => false,
                    tls::TryGet::Some(current) => current.id() == id,
                };
                assert!(matched, "{message}");
            });
            pinned.as_mut().poll(cx)
        });
        guarded.await
    }
}

/// Assert that `StoreContextMut::run_concurrent` has not been called from
/// within a store's event loop.
fn check_recursive_run() {
    tls::try_get(|store| match store {
        tls::TryGet::None => {}
        tls::TryGet::Some(_) | tls::TryGet::Taken => {
            panic!("Recursive `StoreContextMut::run_concurrent` calls not supported")
        }
    });
}

/// Split a packed callback code into `(low 4 bits, remaining upper bits)`.
fn unpack_callback_code(code: u32) -> (u32, u32) {
    const LOW_BITS: u32 = 4;
    const LOW_MASK: u32 = (1 << LOW_BITS) - 1;

    let low = code & LOW_MASK;
    let high = code >> LOW_BITS;
    (low, high)
}

/// Bundle of parameters handed to `ComponentInstance::waitable_check` when
/// servicing a `waitable-set.wait` or `waitable-set.poll` call.
struct WaitableCheckParams {
    set: TableId<WaitableSet>,
    options: OptionsIndex,
    payload: u32,
}

/// Selects which flavor of check `ComponentInstance::waitable_check` should
/// perform, carrying the parameters for it.
enum WaitableCheck {
    Wait(WaitableCheckParams),
    Poll(WaitableCheckParams),
}

/// A host-to-guest call that has been set up by `prepare_call` but not
/// necessarily queued yet.
pub(crate) struct PreparedCall<R> {
    /// The guest export to be called
    handle: Func,
    /// The guest thread created by `prepare_call`
    thread: QualifiedThreadId,
    /// The number of lowered core Wasm parameters to pass to the call.
    param_count: usize,
    /// The `oneshot::Receiver` to which the result of the call will be
    /// delivered when it is available.
    rx: oneshot::Receiver<LiftedResult>,
    /// The `oneshot::Receiver` which will resolve when the task -- and any
    /// transitive subtasks -- have all exited.
    exit_rx: oneshot::Receiver<()>,
    _phantom: PhantomData<R>,
}

impl<R> PreparedCall<R> {
    /// Get a copy of the `TaskId` for this `PreparedCall`.
    pub(crate) fn task_id(&self) -> TaskId {
        let task = self.thread.task;
        TaskId { task }
    }
}

/// Identifies a guest task created by `prepare_call`.
pub(crate) struct TaskId {
    task: TableId<GuestTask>,
}

impl TaskId {
    /// Notify the runtime that the host future for an async task was dropped.
    ///
    /// If the parameters have not been lowered yet, lowering them is no
    /// longer valid (the lowering closure would see a dangling pointer), so
    /// the task is deleted eagerly. Otherwise there may still be running
    /// threads, or suspended ones that other tasks of this component can
    /// resume, so the future is only marked as dropped and the task is
    /// deleted once it reports that it is ready for deletion.
    pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> {
        let task = store.0.concurrent_state_mut().get_mut(self.task)?;

        // Decide whether the task can be removed right now.
        let delete_now = if task.already_lowered_parameters() {
            task.host_future_state = HostFutureState::Dropped;
            task.ready_to_delete()
        } else {
            true
        };

        if delete_now {
            Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())?;
        }
        Ok(())
    }
}

/// Prepare a call to the specified exported Wasm function, providing functions
/// for lowering the parameters and lifting the result.
///
/// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event
/// loop, use `queue_call`.
5044 pub(crate) fn prepare_call<T, R>( 5045 mut store: StoreContextMut<T>, 5046 handle: Func, 5047 param_count: usize, 5048 host_future_present: bool, 5049 call_post_return_automatically: bool, 5050 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()> 5051 + Send 5052 + Sync 5053 + 'static, 5054 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> 5055 + Send 5056 + Sync 5057 + 'static, 5058 ) -> Result<PreparedCall<R>> { 5059 let (options, _flags, ty, raw_options) = handle.abi_info(store.0); 5060 5061 let instance = handle.instance().id().get(store.0); 5062 let options = &instance.component().env_component().options[options]; 5063 let ty = &instance.component().types()[ty]; 5064 let async_function = ty.async_; 5065 let task_return_type = ty.results; 5066 let component_instance = raw_options.instance; 5067 let callback = options.callback.map(|i| instance.runtime_callback(i)); 5068 let memory = options 5069 .memory() 5070 .map(|i| instance.runtime_memory(i)) 5071 .map(SendSyncPtr::new); 5072 let string_encoding = options.string_encoding; 5073 let token = StoreToken::new(store.as_context_mut()); 5074 let state = store.0.concurrent_state_mut(); 5075 5076 assert!(state.guest_thread.is_none()); 5077 5078 let (tx, rx) = oneshot::channel(); 5079 let (exit_tx, exit_rx) = oneshot::channel(); 5080 5081 let mut task = GuestTask::new( 5082 state, 5083 Box::new(for_any_lower(move |store, params| { 5084 lower_params(handle, token.as_context_mut(store), params) 5085 })), 5086 LiftResult { 5087 lift: Box::new(for_any_lift(move |store, result| { 5088 lift_result(handle, store, result) 5089 })), 5090 ty: task_return_type, 5091 memory, 5092 string_encoding, 5093 }, 5094 Caller::Host { 5095 tx: Some(tx), 5096 exit_tx: Arc::new(exit_tx), 5097 host_future_present, 5098 call_post_return_automatically, 5099 }, 5100 callback.map(|callback| { 5101 let callback = SendSyncPtr::new(callback); 5102 let 
instance = handle.instance(); 5103 Box::new( 5104 move |store: &mut dyn VMStore, runtime_instance, event, handle| { 5105 let store = token.as_context_mut(store); 5106 // SAFETY: Per the contract of `prepare_call`, the callback 5107 // will remain valid at least as long is this task exists. 5108 unsafe { 5109 instance.call_callback( 5110 store, 5111 runtime_instance, 5112 callback, 5113 event, 5114 handle, 5115 call_post_return_automatically, 5116 ) 5117 } 5118 }, 5119 ) as CallbackFn 5120 }), 5121 component_instance, 5122 async_function, 5123 )?; 5124 task.function_index = Some(handle.index()); 5125 5126 let task = state.push(task)?; 5127 let thread = state.push(GuestThread::new_implicit(task))?; 5128 state.get_mut(task)?.threads.insert(thread); 5129 5130 Ok(PreparedCall { 5131 handle, 5132 thread: QualifiedThreadId { task, thread }, 5133 param_count, 5134 rx, 5135 exit_rx, 5136 _phantom: PhantomData, 5137 }) 5138 } 5139 5140 /// Queue a call previously prepared using `prepare_call` to be run as part of 5141 /// the associated `ComponentInstance`'s event loop. 5142 /// 5143 /// The returned future will resolve to the result once it is available, but 5144 /// must only be polled via the instance's event loop. See 5145 /// `StoreContextMut::run_concurrent` for details. 5146 pub(crate) fn queue_call<T: 'static, R: Send + 'static>( 5147 mut store: StoreContextMut<T>, 5148 prepared: PreparedCall<R>, 5149 ) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> { 5150 let PreparedCall { 5151 handle, 5152 thread, 5153 param_count, 5154 rx, 5155 exit_rx, 5156 .. 
5157 } = prepared; 5158 5159 queue_call0(store.as_context_mut(), handle, thread, param_count)?; 5160 5161 Ok(checked( 5162 store.0.id(), 5163 rx.map(move |result| { 5164 result 5165 .map(|v| (*v.downcast().unwrap(), exit_rx)) 5166 .map_err(anyhow::Error::from) 5167 }), 5168 )) 5169 } 5170 5171 /// Queue a call previously prepared using `prepare_call` to be run as part of 5172 /// the associated `ComponentInstance`'s event loop. 5173 fn queue_call0<T: 'static>( 5174 store: StoreContextMut<T>, 5175 handle: Func, 5176 guest_thread: QualifiedThreadId, 5177 param_count: usize, 5178 ) -> Result<()> { 5179 let (_options, flags, _ty, raw_options) = handle.abi_info(store.0); 5180 let is_concurrent = raw_options.async_; 5181 let callback = raw_options.callback; 5182 let instance = handle.instance(); 5183 let callee = handle.lifted_core_func(store.0); 5184 let post_return = handle.post_return_core_func(store.0); 5185 let callback = callback.map(|i| { 5186 let instance = instance.id().get(store.0); 5187 SendSyncPtr::new(instance.runtime_callback(i)) 5188 }); 5189 5190 log::trace!("queueing call {guest_thread:?}"); 5191 5192 let instance_flags = if callback.is_none() { 5193 None 5194 } else { 5195 Some(flags) 5196 }; 5197 5198 // SAFETY: `callee`, `callback`, and `post_return` are valid pointers 5199 // (with signatures appropriate for this call) and will remain valid as 5200 // long as this instance is valid. 5201 unsafe { 5202 instance.queue_call( 5203 store, 5204 guest_thread, 5205 SendSyncPtr::new(callee), 5206 param_count, 5207 1, 5208 instance_flags, 5209 is_concurrent, 5210 callback, 5211 post_return.map(SendSyncPtr::new), 5212 ) 5213 } 5214 } 5215