//! Runtime support for the Component Model Async ABI.
//!
//! This module and its submodules provide host runtime support for Component
//! Model Async features such as async-lifted exports, async-lowered imports,
//! streams, futures, and related intrinsics. See [the Async
//! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md)
//! for a high-level overview.
//!
//! At the core of this support is an event loop which schedules and switches
//! between guest tasks and any host tasks they create. Each
//! `Store` will have at most one event loop running at any given
//! time, and that loop may be suspended and resumed by the host embedder using
//! e.g. `StoreContextMut::run_concurrent`. The `StoreContextMut::poll_until`
//! function contains the loop itself, while the
//! `StoreOpaque::concurrent_state` field holds its state.
//!
//! # Public API Overview
//!
//! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop)
//!
//! - `[Typed]Func::call_concurrent`: Start a host->guest call to an
//!   async-lifted or sync-lifted export, creating a guest task.
//!
//! - `StoreContextMut::run_concurrent`: Run the event loop for the specified
//!   instance, allowing any and all tasks belonging to that instance to make
//!   progress.
//!
//! - `StoreContextMut::spawn`: Run a background task as part of the event loop
//!   for the specified instance.
//!
//! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or
//!   `stream` which may be passed to the guest. This takes a
//!   `{Future,Stream}Producer` implementation which will be polled for items when
//!   the consumer requests them.
//!
//! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by
//!   connecting it to a `{Future,Stream}Consumer` which will consume any items
//!   produced by the write end.
//!
//!
## Host Task API (e.g. implementing concurrent host functions and background tasks) 41 //! 42 //! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host 43 //! function with the linker. That function will take an `Accessor` as its 44 //! first parameter, which provides access to the store between (but not across) 45 //! await points. 46 //! 47 //! - `Accessor::with`: Access the store and its associated data. 48 //! 49 //! - `Accessor::spawn`: Run a background task as part of the event loop for the 50 //! store. This is equivalent to `StoreContextMut::spawn` but more convenient to use 51 //! in host functions. 52 53 use crate::component::func::{self, Func}; 54 use crate::component::store::StoreComponentInstanceId; 55 use crate::component::{ 56 ComponentInstanceId, HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, 57 }; 58 use crate::fiber::{self, StoreFiber, StoreFiberYield}; 59 use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken}; 60 use crate::vm::component::{ 61 CallContext, ComponentInstance, HandleTable, InstanceFlags, ResourceTables, 62 }; 63 use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore}; 64 use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType}; 65 use anyhow::{Context as _, Result, anyhow, bail}; 66 use error_contexts::GlobalErrorContextRefCount; 67 use futures::channel::oneshot; 68 use futures::future::{self, Either, FutureExt}; 69 use futures::stream::{FuturesUnordered, StreamExt}; 70 use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex}; 71 use std::any::Any; 72 use std::borrow::ToOwned; 73 use std::boxed::Box; 74 use std::cell::UnsafeCell; 75 use std::collections::{BTreeMap, BTreeSet, HashSet}; 76 use std::fmt; 77 use std::future::Future; 78 use std::marker::PhantomData; 79 use std::mem::{self, ManuallyDrop, MaybeUninit}; 80 use std::ops::DerefMut; 81 use std::pin::{Pin, pin}; 82 use std::ptr::{self, 
NonNull}; 83 use std::slice; 84 use std::sync::Arc; 85 use std::task::{Context, Poll, Waker}; 86 use std::vec::Vec; 87 use table::{TableDebug, TableId}; 88 use wasmtime_environ::Trap; 89 use wasmtime_environ::component::{ 90 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, ExportIndex, MAX_FLAT_PARAMS, 91 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT, 92 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding, 93 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex, 94 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex, 95 }; 96 97 pub use abort::JoinHandle; 98 pub use future_stream_any::{FutureAny, StreamAny}; 99 pub use futures_and_streams::{ 100 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer, 101 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer, 102 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer, 103 }; 104 pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index}; 105 106 mod abort; 107 mod error_contexts; 108 mod future_stream_any; 109 mod futures_and_streams; 110 pub(crate) mod table; 111 pub(crate) mod tls; 112 113 /// Constant defined in the Component Model spec to indicate that the async 114 /// intrinsic (e.g. `future.write`) has not yet completed. 115 const BLOCKED: u32 = 0xffff_ffff; 116 117 /// Corresponds to `CallState` in the upstream spec. 118 #[derive(Clone, Copy, Eq, PartialEq, Debug)] 119 pub enum Status { 120 Starting = 0, 121 Started = 1, 122 Returned = 2, 123 StartCancelled = 3, 124 ReturnCancelled = 4, 125 } 126 127 impl Status { 128 /// Packs this status and the optional `waitable` provided into a 32-bit 129 /// result that the canonical ABI requires. 130 /// 131 /// The low 4 bits are reserved for the status while the upper 28 bits are 132 /// the waitable, if present. 
133 pub fn pack(self, waitable: Option<u32>) -> u32 { 134 assert!(matches!(self, Status::Returned) == waitable.is_none()); 135 let waitable = waitable.unwrap_or(0); 136 assert!(waitable < (1 << 28)); 137 (waitable << 4) | (self as u32) 138 } 139 } 140 141 /// Corresponds to `EventCode` in the Component Model spec, plus related payload 142 /// data. 143 #[derive(Clone, Copy, Debug)] 144 enum Event { 145 None, 146 Cancelled, 147 Subtask { 148 status: Status, 149 }, 150 StreamRead { 151 code: ReturnCode, 152 pending: Option<(TypeStreamTableIndex, u32)>, 153 }, 154 StreamWrite { 155 code: ReturnCode, 156 pending: Option<(TypeStreamTableIndex, u32)>, 157 }, 158 FutureRead { 159 code: ReturnCode, 160 pending: Option<(TypeFutureTableIndex, u32)>, 161 }, 162 FutureWrite { 163 code: ReturnCode, 164 pending: Option<(TypeFutureTableIndex, u32)>, 165 }, 166 } 167 168 impl Event { 169 /// Lower this event to core Wasm integers for delivery to the guest. 170 /// 171 /// Note that the waitable handle, if any, is assumed to be lowered 172 /// separately. 173 fn parts(self) -> (u32, u32) { 174 const EVENT_NONE: u32 = 0; 175 const EVENT_SUBTASK: u32 = 1; 176 const EVENT_STREAM_READ: u32 = 2; 177 const EVENT_STREAM_WRITE: u32 = 3; 178 const EVENT_FUTURE_READ: u32 = 4; 179 const EVENT_FUTURE_WRITE: u32 = 5; 180 const EVENT_CANCELLED: u32 = 6; 181 match self { 182 Event::None => (EVENT_NONE, 0), 183 Event::Cancelled => (EVENT_CANCELLED, 0), 184 Event::Subtask { status } => (EVENT_SUBTASK, status as u32), 185 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()), 186 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()), 187 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()), 188 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()), 189 } 190 } 191 } 192 193 /// Corresponds to `CallbackCode` in the spec. 
194 mod callback_code { 195 pub const EXIT: u32 = 0; 196 pub const YIELD: u32 = 1; 197 pub const WAIT: u32 = 2; 198 } 199 200 /// A flag indicating that the callee is an async-lowered export. 201 /// 202 /// This may be passed to the `async-start` intrinsic from a fused adapter. 203 const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32; 204 205 /// Provides access to either store data (via the `get` method) or the store 206 /// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component 207 /// instance to which the current host task belongs. 208 /// 209 /// See [`Accessor::with`] for details. 210 pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> { 211 store: StoreContextMut<'a, T>, 212 get_data: fn(&mut T) -> D::Data<'_>, 213 } 214 215 impl<'a, T, D> Access<'a, T, D> 216 where 217 D: HasData + ?Sized, 218 T: 'static, 219 { 220 /// Creates a new [`Access`] from its component parts. 221 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self { 222 Self { store, get_data } 223 } 224 225 /// Get mutable access to the store data. 226 pub fn data_mut(&mut self) -> &mut T { 227 self.store.data_mut() 228 } 229 230 /// Get mutable access to the store data. 231 pub fn get(&mut self) -> D::Data<'_> { 232 (self.get_data)(self.data_mut()) 233 } 234 235 /// Spawn a background task. 236 /// 237 /// See [`Accessor::spawn`] for details. 238 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle 239 where 240 T: 'static, 241 { 242 let accessor = Accessor { 243 get_data: self.get_data, 244 token: StoreToken::new(self.store.as_context_mut()), 245 }; 246 self.store 247 .as_context_mut() 248 .spawn_with_accessor(accessor, task) 249 } 250 251 /// Returns the getter this accessor is using to project from `T` into 252 /// `D::Data`. 
253 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> { 254 self.get_data 255 } 256 } 257 258 impl<'a, T, D> AsContext for Access<'a, T, D> 259 where 260 D: HasData + ?Sized, 261 T: 'static, 262 { 263 type Data = T; 264 265 fn as_context(&self) -> StoreContext<'_, T> { 266 self.store.as_context() 267 } 268 } 269 270 impl<'a, T, D> AsContextMut for Access<'a, T, D> 271 where 272 D: HasData + ?Sized, 273 T: 'static, 274 { 275 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> { 276 self.store.as_context_mut() 277 } 278 } 279 280 /// Provides scoped mutable access to store data in the context of a concurrent 281 /// host task future. 282 /// 283 /// This allows multiple host task futures to execute concurrently and access 284 /// the store between (but not across) `await` points. 285 /// 286 /// # Rationale 287 /// 288 /// This structure is sort of like `&mut T` plus a projection from `&mut T` to 289 /// `D::Data<'_>`. The problem this is solving, however, is that it does not 290 /// literally store these values. The basic problem is that when a concurrent 291 /// host future is being polled it has access to `&mut T` (and the whole 292 /// `Store`) but when it's not being polled it does not have access to these 293 /// values. This reflects how the store is only ever polling one future at a 294 /// time so the store is effectively being passed between futures. 295 /// 296 /// Rust's `Future` trait, however, has no means of passing a `Store` 297 /// temporarily between futures. The [`Context`](std::task::Context) type does 298 /// not have the ability to attach arbitrary information to it at this time. 299 /// This type, [`Accessor`], is used to bridge this expressivity gap. 300 /// 301 /// The [`Accessor`] type here represents the ability to acquire, temporarily in 302 /// a synchronous manner, the current store. The [`Accessor::with`] function 303 /// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut 304 /// T`, or `D::Data<'_>`. 
Note though that [`Accessor::with`] intentionally does 305 /// not take an `async` closure as its argument, instead it's a synchronous 306 /// closure which must complete during on run of `Future::poll`. This reflects 307 /// how the store is temporarily made available while a host future is being 308 /// polled. 309 /// 310 /// # Implementation 311 /// 312 /// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and 313 /// this type additionally doesn't even have a lifetime parameter. This is 314 /// instead a representation of proof of the ability to acquire these while a 315 /// future is being polled. Wasmtime will, when it polls a host future, 316 /// configure ambient state such that the `Accessor` that a future closes over 317 /// will work and be able to access the store. 318 /// 319 /// This has a number of implications for users such as: 320 /// 321 /// * It's intentional that `Accessor` cannot be cloned, it needs to stay within 322 /// the lifetime of a single future. 323 /// * A future is expected to, however, close over an `Accessor` and keep it 324 /// alive probably for the duration of the entire future. 325 /// * Different host futures will be given different `Accessor`s, and that's 326 /// intentional. 327 /// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which 328 /// alleviates some otherwise required bounds to be written down. 329 /// 330 /// # Using `Accessor` in `Drop` 331 /// 332 /// The methods on `Accessor` are only expected to work in the context of 333 /// `Future::poll` and are not guaranteed to work in `Drop`. This is because a 334 /// host future can be dropped at any time throughout the system and Wasmtime 335 /// store context is not necessarily available at that time. It's recommended to 336 /// not use `Accessor` methods in anything connected to a `Drop` implementation 337 /// as they will panic and have unintended results. 
If you run into this though 338 /// feel free to file an issue on the Wasmtime repository. 339 pub struct Accessor<T: 'static, D = HasSelf<T>> 340 where 341 D: HasData + ?Sized, 342 { 343 token: StoreToken<T>, 344 get_data: fn(&mut T) -> D::Data<'_>, 345 } 346 347 /// A helper trait to take any type of accessor-with-data in functions. 348 /// 349 /// This trait is similar to [`AsContextMut`] except that it's used when 350 /// working with an [`Accessor`] instead of a [`StoreContextMut`]. The 351 /// [`Accessor`] is the main type used in concurrent settings and is passed to 352 /// functions such as [`Func::call_concurrent`] or [`FutureWriter::write`]. 353 /// 354 /// This trait is implemented for [`Accessor`] and `&T` where `T` implements 355 /// this trait. This effectively means that regardless of the `D` in 356 /// `Accessor<T, D>` it can still be passed to a function which just needs a 357 /// store accessor. 358 /// 359 /// Acquiring an [`Accessor`] can be done through 360 /// [`StoreContextMut::run_concurrent`] for example or in a host function 361 /// through 362 /// [`Linker::func_wrap_concurrent`](crate::component::Linker::func_wrap_concurrent). 363 pub trait AsAccessor { 364 /// The `T` in `Store<T>` that this accessor refers to. 365 type Data: 'static; 366 367 /// The `D` in `Accessor<T, D>`, or the projection out of 368 /// `Self::Data`. 369 type AccessorData: HasData + ?Sized; 370 371 /// Returns the accessor that this is referring to. 
372 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>; 373 } 374 375 impl<T: AsAccessor + ?Sized> AsAccessor for &T { 376 type Data = T::Data; 377 type AccessorData = T::AccessorData; 378 379 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> { 380 T::as_accessor(self) 381 } 382 } 383 384 impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> { 385 type Data = T; 386 type AccessorData = D; 387 388 fn as_accessor(&self) -> &Accessor<T, D> { 389 self 390 } 391 } 392 393 // Note that it is intentional at this time that `Accessor` does not actually 394 // store `&mut T` or anything similar. This distinctly enables the `Accessor` 395 // structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for 396 // that matter). This is used to ergonomically simplify bindings where the 397 // majority of the time `Accessor` is closed over in a future which then needs 398 // to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as 399 // you already have to write `T: 'static`...) it helps to avoid this. 400 // 401 // Note as well that `Accessor` doesn't actually store its data at all. Instead 402 // it's more of a "proof" of what can be accessed from TLS. API design around 403 // `Accessor` and functions like `Linker::func_wrap_concurrent` are 404 // intentionally made to ensure that `Accessor` is ideally only used in the 405 // context that TLS variables are actually set. For example host functions are 406 // given `&Accessor`, not `Accessor`, and this prevents them from persisting 407 // the value outside of a future. Within the future the TLS variables are all 408 // guaranteed to be set while the future is being polled. 409 // 410 // Finally though this is not an ironclad guarantee, but nor does it need to be. 411 // The TLS APIs are designed to panic or otherwise model usage where they're 412 // called recursively or similar. 
It's hoped that code cannot be constructed to 413 // actually hit this at runtime but this is not a safety requirement at this 414 // time. 415 const _: () = { 416 const fn assert<T: Send + Sync>() {} 417 assert::<Accessor<UnsafeCell<u32>>>(); 418 }; 419 420 impl<T> Accessor<T> { 421 /// Creates a new `Accessor` backed by the specified functions. 422 /// 423 /// - `get`: used to retrieve the store 424 /// 425 /// - `get_data`: used to "project" from the store's associated data to 426 /// another type (e.g. a field of that data or a wrapper around it). 427 /// 428 /// - `spawn`: used to queue spawned background tasks to be run later 429 pub(crate) fn new(token: StoreToken<T>) -> Self { 430 Self { 431 token, 432 get_data: |x| x, 433 } 434 } 435 } 436 437 impl<T, D> Accessor<T, D> 438 where 439 D: HasData + ?Sized, 440 { 441 /// Run the specified closure, passing it mutable access to the store. 442 /// 443 /// This function is one of the main building blocks of the [`Accessor`] 444 /// type. This yields synchronous, blocking, access to store via an 445 /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to 446 /// providing the ability to access `D` via [`Access::get`]. Note that the 447 /// `fun` here is given only temporary access to the store and `T`/`D` 448 /// meaning that the return value `R` here is not allowed to capture borrows 449 /// into the two. If access is needed to data within `T` or `D` outside of 450 /// this closure then it must be `clone`d out, for example. 451 /// 452 /// # Panics 453 /// 454 /// This function will panic if it is call recursively with any other 455 /// accessor already in scope. For example if `with` is called within `fun`, 456 /// then this function will panic. It is up to the embedder to ensure that 457 /// this does not happen. 
458 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R { 459 tls::get(|vmstore| { 460 fun(Access { 461 store: self.token.as_context_mut(vmstore), 462 get_data: self.get_data, 463 }) 464 }) 465 } 466 467 /// Returns the getter this accessor is using to project from `T` into 468 /// `D::Data`. 469 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> { 470 self.get_data 471 } 472 473 /// Changes this accessor to access `D2` instead of the current type 474 /// parameter `D`. 475 /// 476 /// This changes the underlying data access from `T` to `D2::Data<'_>`. 477 /// 478 /// # Panics 479 /// 480 /// When using this API the returned value is disconnected from `&self` and 481 /// the lifetime binding the `self` argument. An `Accessor` only works 482 /// within the context of the closure or async closure that it was 483 /// originally given to, however. This means that due to the fact that the 484 /// returned value has no lifetime connection it's possible to use the 485 /// accessor outside of `&self`, the original accessor, and panic. 486 /// 487 /// The returned value should only be used within the scope of the original 488 /// `Accessor` that `self` refers to. 489 pub fn with_getter<D2: HasData>( 490 &self, 491 get_data: fn(&mut T) -> D2::Data<'_>, 492 ) -> Accessor<T, D2> { 493 Accessor { 494 token: self.token, 495 get_data, 496 } 497 } 498 499 /// Spawn a background task which will receive an `&Accessor<T, D>` and 500 /// run concurrently with any other tasks in progress for the current 501 /// store. 502 /// 503 /// This is particularly useful for host functions which return a `stream` 504 /// or `future` such that the code to write to the write end of that 505 /// `stream` or `future` must run after the function returns. 506 /// 507 /// The returned [`JoinHandle`] may be used to cancel the task. 508 /// 509 /// # Panics 510 /// 511 /// Panics if called within a closure provided to the [`Accessor::with`] 512 /// function. 
This can only be called outside an active invocation of 513 /// [`Accessor::with`]. 514 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle 515 where 516 T: 'static, 517 { 518 let accessor = self.clone_for_spawn(); 519 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task)) 520 } 521 522 fn clone_for_spawn(&self) -> Self { 523 Self { 524 token: self.token, 525 get_data: self.get_data, 526 } 527 } 528 } 529 530 /// Represents a task which may be provided to `Accessor::spawn`, 531 /// `Accessor::forward`, or `StorecContextMut::spawn`. 532 // TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable 533 // option. 534 // 535 // As of this writing, it's not possible to specify e.g. `Send` and `Sync` 536 // bounds on the `Future` type returned by an `AsyncFnOnce`. Also, using `F: 537 // Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F + 538 // Send + Sync + 'static` fails with a type mismatch error when we try to pass 539 // it an async closure (e.g. `async move |_| { ... }`). So this seems to be the 540 // best we can do for the time being. 541 pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static 542 where 543 D: HasData + ?Sized, 544 { 545 /// Run the task. 546 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send; 547 } 548 549 /// Represents parameter and result metadata for the caller side of a 550 /// guest->guest call orchestrated by a fused adapter. 551 enum CallerInfo { 552 /// Metadata for a call to an async-lowered import 553 Async { 554 params: Vec<ValRaw>, 555 has_result: bool, 556 }, 557 /// Metadata for a call to an sync-lowered import 558 Sync { 559 params: Vec<ValRaw>, 560 result_count: u32, 561 }, 562 } 563 564 /// Indicates how a guest task is waiting on a waitable set. 
565 enum WaitMode { 566 /// The guest task is waiting using `task.wait` 567 Fiber(StoreFiber<'static>), 568 /// The guest task is waiting via a callback declared as part of an 569 /// async-lifted export. 570 Callback(Instance), 571 } 572 573 /// Represents the reason a fiber is suspending itself. 574 #[derive(Debug)] 575 enum SuspendReason { 576 /// The fiber is waiting for an event to be delivered to the specified 577 /// waitable set or task. 578 Waiting { 579 set: TableId<WaitableSet>, 580 thread: QualifiedThreadId, 581 skip_may_block_check: bool, 582 }, 583 /// The fiber has finished handling its most recent work item and is waiting 584 /// for another (or to be dropped if it is no longer needed). 585 NeedWork, 586 /// The fiber is yielding and should be resumed once other tasks have had a 587 /// chance to run. 588 Yielding { thread: QualifiedThreadId }, 589 /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`. 590 ExplicitlySuspending { 591 thread: QualifiedThreadId, 592 skip_may_block_check: bool, 593 }, 594 } 595 596 /// Represents a pending call into guest code for a given guest task. 597 enum GuestCallKind { 598 /// Indicates there's an event to deliver to the task, possibly related to a 599 /// waitable set the task has been waiting on or polling. 600 DeliverEvent { 601 /// The instance to which the task belongs. 602 instance: Instance, 603 /// The waitable set the event belongs to, if any. 604 /// 605 /// If this is `None` the event will be waiting in the 606 /// `GuestTask::event` field for the task. 607 set: Option<TableId<WaitableSet>>, 608 }, 609 /// Indicates that a new guest task call is pending and may be executed 610 /// using the specified closure. 611 /// 612 /// If the closure returns `Ok(Some(call))`, the `call` should be run 613 /// immediately using `handle_guest_call`. 
614 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>), 615 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>), 616 } 617 618 impl fmt::Debug for GuestCallKind { 619 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 620 match self { 621 Self::DeliverEvent { instance, set } => f 622 .debug_struct("DeliverEvent") 623 .field("instance", instance) 624 .field("set", set) 625 .finish(), 626 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(), 627 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(), 628 } 629 } 630 } 631 632 /// Represents a pending call into guest code for a given guest thread. 633 #[derive(Debug)] 634 struct GuestCall { 635 thread: QualifiedThreadId, 636 kind: GuestCallKind, 637 } 638 639 impl GuestCall { 640 /// Returns whether or not the call is ready to run. 641 /// 642 /// A call will not be ready to run if either: 643 /// 644 /// - the (sub-)component instance to be called has already been entered and 645 /// cannot be reentered until an in-progress call completes 646 /// 647 /// - the call is for a not-yet started task and the (sub-)component 648 /// instance to be called has backpressure enabled 649 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> { 650 let instance = store 651 .concurrent_state_mut() 652 .get_mut(self.thread.task)? 653 .instance; 654 let state = store.instance_state(instance); 655 656 let ready = match &self.kind { 657 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter, 658 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0), 659 GuestCallKind::StartExplicit(_) => true, 660 }; 661 log::trace!( 662 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})", 663 state.do_not_enter, 664 state.backpressure 665 ); 666 Ok(ready) 667 } 668 } 669 670 /// Job to be run on a worker fiber. 
671 enum WorkerItem { 672 GuestCall(GuestCall), 673 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>), 674 } 675 676 /// Represents a pending work item to be handled by the event loop for a given 677 /// component instance. 678 enum WorkItem { 679 /// A host task to be pushed to `ConcurrentState::futures`. 680 PushFuture(AlwaysMut<HostTaskFuture>), 681 /// A fiber to resume. 682 ResumeFiber(StoreFiber<'static>), 683 /// A pending call into guest code for a given guest task. 684 GuestCall(GuestCall), 685 /// A job to run on a worker fiber. 686 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>), 687 } 688 689 impl fmt::Debug for WorkItem { 690 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 691 match self { 692 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(), 693 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(), 694 Self::GuestCall(call) => f.debug_tuple("GuestCall").field(call).finish(), 695 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(), 696 } 697 } 698 } 699 700 /// Whether a suspension intrinsic was cancelled or completed 701 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] 702 pub(crate) enum WaitResult { 703 Cancelled, 704 Completed, 705 } 706 707 /// Raise a trap if the calling task is synchronous and trying to block prior to 708 /// returning a value. 709 pub(crate) fn check_blocking(store: &mut dyn VMStore) -> Result<()> { 710 store.concurrent_state_mut().check_blocking() 711 } 712 713 /// Poll the specified future until it completes on behalf of a guest->host call 714 /// using a sync-lowered import. 715 /// 716 /// This is similar to `Instance::first_poll` except it's for sync-lowered 717 /// imports, meaning we don't need to handle cancellation and we can block the 718 /// caller until the task completes, at which point the caller can handle 719 /// lowering the result to the guest's stack and linear memory. 
720 pub(crate) fn poll_and_block<R: Send + Sync + 'static>( 721 store: &mut dyn VMStore, 722 future: impl Future<Output = Result<R>> + Send + 'static, 723 caller_instance: RuntimeComponentInstanceIndex, 724 ) -> Result<R> { 725 let state = store.concurrent_state_mut(); 726 727 // If there is no current guest thread set, that means the host function was 728 // registered using e.g. `LinkerInstance::func_wrap`, in which case it 729 // should complete immediately. 730 let Some(caller) = state.guest_thread else { 731 return match pin!(future).poll(&mut Context::from_waker(&Waker::noop())) { 732 Poll::Ready(result) => result, 733 Poll::Pending => { 734 unreachable!() 735 } 736 }; 737 }; 738 739 // Save any existing result stashed in `GuestTask::result` so we can replace 740 // it with the new result. 741 let old_result = state 742 .get_mut(caller.task) 743 .with_context(|| format!("bad handle: {caller:?}"))? 744 .result 745 .take(); 746 747 // Add a temporary host task into the table so we can track its progress. 748 // Note that we'll never allocate a waitable handle for the guest since 749 // we're being called synchronously. 750 let task = state.push(HostTask::new(caller_instance, None))?; 751 752 log::trace!("new host task child of {caller:?}: {task:?}"); 753 754 // Wrap the future in a closure which will take care of stashing the result 755 // in `GuestTask::result` and resuming this fiber when the host task 756 // completes. 757 let mut future = Box::pin(async move { 758 let result = future.await?; 759 tls::get(move |store| { 760 let state = store.concurrent_state_mut(); 761 state.get_mut(caller.task)?.result = Some(Box::new(result) as _); 762 763 Waitable::Host(task).set_event( 764 state, 765 Some(Event::Subtask { 766 status: Status::Returned, 767 }), 768 )?; 769 770 Ok(()) 771 }) 772 }) as HostTaskFuture; 773 774 // Finally, poll the future. 
We can use a dummy `Waker` here because we'll 775 // add the future to `ConcurrentState::futures` and poll it automatically 776 // from the event loop if it doesn't complete immediately here. 777 let poll = tls::set(store, || { 778 future 779 .as_mut() 780 .poll(&mut Context::from_waker(&Waker::noop())) 781 }); 782 783 match poll { 784 Poll::Ready(result) => { 785 // It completed immediately; check the result and delete the task. 786 result?; 787 log::trace!("delete host task {task:?} (already ready)"); 788 store.concurrent_state_mut().delete(task)?; 789 } 790 Poll::Pending => { 791 // It did not complete immediately; add it to 792 // `ConcurrentState::futures` so it will be polled via the event 793 // loop; then use `GuestTask::sync_call_set` to wait for the task to 794 // complete, suspending the current fiber until it does so. 795 let state = store.concurrent_state_mut(); 796 state.push_future(future); 797 798 let set = state.get_mut(caller.task)?.sync_call_set; 799 Waitable::Host(task).join(state, Some(set))?; 800 801 store.suspend(SuspendReason::Waiting { 802 set, 803 thread: caller, 804 skip_may_block_check: false, 805 })?; 806 } 807 } 808 809 // Retrieve and return the result. 810 Ok(*mem::replace( 811 &mut store.concurrent_state_mut().get_mut(caller.task)?.result, 812 old_result, 813 ) 814 .unwrap() 815 .downcast() 816 .unwrap()) 817 } 818 819 /// Execute the specified guest call. 820 fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> { 821 let mut next = Some(call); 822 while let Some(call) = next.take() { 823 match call.kind { 824 GuestCallKind::DeliverEvent { instance, set } => { 825 let (event, waitable) = instance 826 .get_event(store, call.thread.task, set, true)? 
827 .unwrap(); 828 let state = store.concurrent_state_mut(); 829 let task = state.get_mut(call.thread.task)?; 830 let runtime_instance = task.instance; 831 let handle = waitable.map(|(_, v)| v).unwrap_or(0); 832 833 log::trace!( 834 "use callback to deliver event {event:?} to {:?} for {waitable:?}", 835 call.thread, 836 ); 837 838 let old_thread = state.guest_thread.replace(call.thread); 839 log::trace!( 840 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread", 841 call.thread 842 ); 843 844 store.maybe_push_call_context(call.thread.task)?; 845 846 store.enter_instance(runtime_instance); 847 848 let callback = store 849 .concurrent_state_mut() 850 .get_mut(call.thread.task)? 851 .callback 852 .take() 853 .unwrap(); 854 855 let code = callback(store, runtime_instance.index, event, handle)?; 856 857 store 858 .concurrent_state_mut() 859 .get_mut(call.thread.task)? 860 .callback = Some(callback); 861 862 store.exit_instance(runtime_instance)?; 863 864 store.maybe_pop_call_context(call.thread.task)?; 865 866 next = instance.handle_callback_code( 867 store, 868 call.thread, 869 runtime_instance.index, 870 code, 871 )?; 872 873 store.concurrent_state_mut().guest_thread = old_thread; 874 log::trace!( 875 "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread" 876 ); 877 } 878 GuestCallKind::StartImplicit(fun) => { 879 next = fun(store)?; 880 } 881 GuestCallKind::StartExplicit(fun) => { 882 fun(store)?; 883 } 884 } 885 } 886 887 Ok(()) 888 } 889 890 impl<T> Store<T> { 891 /// Convenience wrapper for [`StoreContextMut::run_concurrent`]. 
pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        self.as_context_mut().run_concurrent(fun).await
    }

    // Convenience wrapper for `StoreContextMut::assert_concurrent_state_empty`.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(&mut self) {
        self.as_context_mut().assert_concurrent_state_empty();
    }

    /// Convenience wrapper for [`StoreContextMut::spawn`].
    pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle
    where
        T: 'static,
    {
        self.as_context_mut().spawn(task)
    }
}

impl<T> StoreContextMut<'_, T> {
    /// Assert that all the relevant tables and queues in the concurrent state
    /// for this store are empty.
    ///
    /// This is for sanity checking in integration tests
    /// (e.g. `component-async-tests`) that the relevant state has been cleared
    /// after each test concludes.  This should help us catch leaks, e.g. guest
    /// tasks which haven't been deleted despite having completed and having
    /// been dropped by their supertasks.
    ///
    /// # Panics
    ///
    /// Panics if any of the checked tables or queues is non-empty, or if a
    /// current guest thread is still recorded.
    #[doc(hidden)]
    pub fn assert_concurrent_state_empty(self) {
        let store = self.0;
        store
            .store_data_mut()
            .components
            .assert_instance_states_empty();
        let state = store.concurrent_state_mut();
        assert!(
            state.table.get_mut().is_empty(),
            "non-empty table: {:?}",
            state.table.get_mut()
        );
        assert!(state.high_priority.is_empty());
        assert!(state.low_priority.is_empty());
        assert!(state.guest_thread.is_none());
        assert!(state.futures.get_mut().as_ref().unwrap().is_empty());
        assert!(state.global_error_context_ref_counts.is_empty());
    }

    /// Spawn a background task to run as part of this instance's event loop.
    ///
    /// The task will receive an `&Accessor<T>` and run concurrently with
    /// any other tasks in progress for the instance.
    ///
    /// Note that the task will only make progress if and when the event loop
    /// for this instance is run.
    ///
    /// The returned [`JoinHandle`] may be used to cancel the task.
    pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle
    where
        T: 'static,
    {
        let accessor = Accessor::new(StoreToken::new(self.as_context_mut()));
        self.spawn_with_accessor(accessor, task)
    }

    /// Internal implementation of `spawn` functions where a `store` is
    /// available along with an `Accessor`.
    fn spawn_with_accessor<D>(
        self,
        accessor: Accessor<T, D>,
        task: impl AccessorTask<T, D>,
    ) -> JoinHandle
    where
        T: 'static,
        D: HasData + ?Sized,
    {
        // Create an "abortable future" here where internally the future will
        // hook calls to poll and possibly spawn more background tasks on each
        // iteration.
        let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await });
        // NOTE(review): `unwrap_or(Ok(()))` presumably maps "task was aborted
        // via its handle" to success so the event loop does not treat it as
        // a failure -- confirm against `JoinHandle::run`.
        self.0
            .concurrent_state_mut()
            .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) }));
        handle
    }

    /// Run the specified closure `fun` to completion as part of this store's
    /// event loop.
    ///
    /// This will run `fun` as part of this store's event loop until it
    /// yields a result.  `fun` is provided an [`Accessor`], which provides
    /// controlled access to the store and its data.
    ///
    /// This function can be used to invoke [`Func::call_concurrent`] for
    /// example within the async closure provided here.
    ///
    /// # Store-blocking behavior
    ///
    /// At this time there are certain situations in which the `Future` returned
    /// by the `AsyncFnOnce` passed to this function will not be polled for an
    /// extended period of time, despite one or more `Waker::wake` events having
    /// occurred for the task to which it belongs.
    /// This can manifest as the
    /// `Future` seeming to be "blocked" or "locked up", but is actually due to
    /// the `Store` being held by e.g. a blocking host function, preventing the
    /// `Future` from being polled. A canonical example of this is when the
    /// `fun` provided to this function attempts to set a timeout for an
    /// invocation of a wasm function. In this situation the async closure is
    /// waiting both on (a) the wasm computation to finish, and (b) the timeout
    /// to elapse. At this time this setup will not always work and the timeout
    /// may not reliably fire.
    ///
    /// This function will not block the current thread and as such is always
    /// suitable to run in an `async` context, but the current implementation of
    /// Wasmtime can lead to situations where a certain wasm computation must
    /// make progress before the closure itself can make progress. This is an
    /// artifact of Wasmtime's historical implementation of `async` functions
    /// and is the topic of [#11869] and [#11870]. In the timeout example from
    /// above it means that Wasmtime can get "wedged" for a bit where (a) must
    /// progress for a readiness notification of (b) to get delivered.
    ///
    /// This effectively means that it's not possible to reliably perform a
    /// "select" operation within the `fun` closure, which timeouts for example
    /// are based on. Fixing this requires some relatively major refactoring
    /// work within Wasmtime itself. This is a known pitfall and one that is
    /// intended to be fixed one day. In the meantime it's recommended to apply
    /// timeouts or such to the entire `run_concurrent` call itself rather than
    /// internally.
///
    /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869
    /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870
    ///
    /// # Example
    ///
    /// ```
    /// # use {
    /// #   anyhow::{Result},
    /// #   wasmtime::{
    /// #     component::{ Component, Linker, Resource, ResourceTable},
    /// #     Config, Engine, Store
    /// #   },
    /// # };
    /// #
    /// # struct MyResource(u32);
    /// # struct Ctx { table: ResourceTable }
    /// #
    /// # async fn foo() -> Result<()> {
    /// # let mut config = Config::new();
    /// # let engine = Engine::new(&config)?;
    /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() });
    /// # let mut linker = Linker::new(&engine);
    /// # let component = Component::new(&engine, "")?;
    /// # let instance = linker.instantiate_async(&mut store, &component).await?;
    /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?;
    /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?;
    /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> {
    ///    let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?;
    ///    let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0;
    ///    let value = accessor.with(|mut access| access.get().table.delete(another_resource))?;
    ///    bar.call_concurrent(accessor, (value.0,)).await?;
    ///    Ok(())
    /// }).await??;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, false).await
    }

    /// Like `run_concurrent`, but runs the event loop with `trap_on_idle` set
    /// to `true`, so a `Trap::AsyncDeadlock` error is returned if the loop
    /// runs out of work before the future returned by `fun` completes.
    pub(super) async fn run_concurrent_trap_on_idle<R>(
        self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        self.do_run_concurrent(fun, true).await
    }

    /// Shared implementation of `run_concurrent` and
    /// `run_concurrent_trap_on_idle`.
    ///
    /// Calls `fun` with a fresh `Accessor` and drives the resulting future to
    /// completion using `poll_until`, forwarding `trap_on_idle` unchanged.
    async fn do_run_concurrent<R>(
        mut self,
        fun: impl AsyncFnOnce(&Accessor<T>) -> R,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // NOTE(review): presumably rejects nested `run_concurrent` calls; the
        // helper is defined elsewhere in this file -- confirm.
        check_recursive_run();
        let token = StoreToken::new(self.as_context_mut());

        // Owns the future returned by `fun` and makes sure it is dropped
        // inside `tls::set`, i.e. with the store registered the same way it
        // is while the future is being polled.
        struct Dropper<'a, T: 'static, V> {
            store: StoreContextMut<'a, T>,
            value: ManuallyDrop<V>,
        }

        impl<'a, T, V> Drop for Dropper<'a, T, V> {
            fn drop(&mut self) {
                tls::set(self.store.0, || {
                    // SAFETY: Here we drop the value without moving it for the
                    // first and only time -- per the contract for `Drop::drop`,
                    // this code won't run again, and the `value` field will no
                    // longer be accessible.
                    unsafe { ManuallyDrop::drop(&mut self.value) }
                });
            }
        }

        let accessor = &Accessor::new(token);
        let dropper = &mut Dropper {
            store: self,
            value: ManuallyDrop::new(fun(accessor)),
        };
        // SAFETY: We never move `dropper` nor its `value` field.
        let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) };

        dropper
            .store
            .as_context_mut()
            .poll_until(future, trap_on_idle)
            .await
    }

    /// Run this store's event loop.
    ///
    /// The returned future will resolve when the specified future completes or,
    /// if `trap_on_idle` is true, when the event loop can't make further
    /// progress.
async fn poll_until<R>(
        mut self,
        mut future: Pin<&mut impl Future<Output = R>>,
        trap_on_idle: bool,
    ) -> Result<R>
    where
        T: Send + 'static,
    {
        // Guard which puts `ConcurrentState::futures` back into the store when
        // dropped, unless it has already been taken out of the guard.
        struct Reset<'a, T: 'static> {
            store: StoreContextMut<'a, T>,
            futures: Option<FuturesUnordered<HostTaskFuture>>,
        }

        impl<'a, T> Drop for Reset<'a, T> {
            fn drop(&mut self) {
                if let Some(futures) = self.futures.take() {
                    *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures);
                }
            }
        }

        loop {
            // Take `ConcurrentState::futures` out of the store so we can poll
            // it while also safely giving any of the futures inside access to
            // `self`.
            let futures = self.0.concurrent_state_mut().futures.get_mut().take();
            let mut reset = Reset {
                store: self.as_context_mut(),
                futures,
            };
            let mut next = pin!(reset.futures.as_mut().unwrap().next());

            // Each poll resolves to one of:
            // - `Ok(Either::Left(value))`: the future we were passed is done.
            // - `Ok(Either::Right(items))`: work items to handle (possibly
            //   none) before looping again.
            // - `Err(_)`: a host/background future failed, or we trapped on
            //   idle.
            let result = future::poll_fn(|cx| {
                // First, poll the future we were passed as an argument and
                // return immediately if it's ready.
                if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) {
                    return Poll::Ready(Ok(Either::Left(value)));
                }

                // Next, poll `ConcurrentState::futures` (which includes any
                // pending host tasks and/or background tasks), returning
                // immediately if one of them fails.
                //
                // `Ready(true)` means one future completed, `Ready(false)`
                // means there are no futures left, and `Pending` means at
                // least one future remains but none is ready.
                let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) {
                    Poll::Ready(Some(output)) => {
                        match output {
                            Err(e) => return Poll::Ready(Err(e)),
                            Ok(()) => {}
                        }
                        Poll::Ready(true)
                    }
                    Poll::Ready(None) => Poll::Ready(false),
                    Poll::Pending => Poll::Pending,
                };

                // Next, check the "high priority" work queue and return
                // immediately if it has at least one item.
                let state = reset.store.0.concurrent_state_mut();
                let ready = mem::take(&mut state.high_priority);
                let ready = if ready.is_empty() {
                    // Next, check the "low priority" work queue and return
                    // immediately if it has at least one item.
                    let ready = mem::take(&mut state.low_priority);
                    if ready.is_empty() {
                        return match next {
                            Poll::Ready(true) => {
                                // In this case, one of the futures in
                                // `ConcurrentState::futures` completed
                                // successfully, so we return now and continue
                                // the outer loop in case there is another one
                                // ready to complete.
                                Poll::Ready(Ok(Either::Right(Vec::new())))
                            }
                            Poll::Ready(false) => {
                                // Poll the future we were passed one last time
                                // in case one of `ConcurrentState::futures` had
                                // the side effect of unblocking it.
                                if let Poll::Ready(value) =
                                    tls::set(reset.store.0, || future.as_mut().poll(cx))
                                {
                                    Poll::Ready(Ok(Either::Left(value)))
                                } else {
                                    // In this case, there are no more pending
                                    // futures in `ConcurrentState::futures`,
                                    // there are no remaining work items, _and_
                                    // the future we were passed as an argument
                                    // still hasn't completed.
                                    if trap_on_idle {
                                        // `trap_on_idle` is true, so we exit
                                        // immediately.
                                        Poll::Ready(Err(anyhow!(crate::Trap::AsyncDeadlock)))
                                    } else {
                                        // `trap_on_idle` is false, so we assume
                                        // that future will wake up and give us
                                        // more work to do when it's ready to.
                                        Poll::Pending
                                    }
                                }
                            }
                            // There is at least one pending future in
                            // `ConcurrentState::futures` and we have nothing
                            // else to do but wait for now, so we return
                            // `Pending`.
                            Poll::Pending => Poll::Pending,
                        };
                    } else {
                        ready
                    }
                } else {
                    ready
                };

                Poll::Ready(Ok(Either::Right(ready)))
            })
            .await;

            // Put the `ConcurrentState::futures` back into the store before we
            // return or handle any work items since one or more of those items
            // might append more futures.
            drop(reset);

            match result? {
                // The future we were passed as an argument completed, so we
                // return the result.
                Either::Left(value) => break Ok(value),
                // The future we were passed has not yet completed, so handle
                // any work items and then loop again.
                Either::Right(ready) => {
                    // Guard which cleans up any work items that were taken off
                    // the queues but never handled, e.g. because handling an
                    // earlier item returned an error.  Fibers are disposed and
                    // pushed futures are dropped inside `tls::set`; other
                    // items are simply dropped.
                    struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> {
                        store: StoreContextMut<'a, T>,
                        ready: I,
                    }

                    impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> {
                        fn drop(&mut self) {
                            while let Some(item) = self.ready.next() {
                                match item {
                                    WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0),
                                    WorkItem::PushFuture(future) => {
                                        tls::set(self.store.0, move || drop(future))
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }

                    let mut dispose = Dispose {
                        store: self.as_context_mut(),
                        ready: ready.into_iter(),
                    };

                    while let Some(item) = dispose.ready.next() {
                        dispose
                            .store
                            .as_context_mut()
                            .handle_work_item(item)
                            .await?;
                    }
                }
            }
        }
    }

    /// Handle the specified work item, possibly resuming a fiber if applicable.
async fn handle_work_item(self, item: WorkItem) -> Result<()>
    where
        T: Send,
    {
        log::trace!("handle work item {item:?}");
        match item {
            // A newly created future joins the set polled by the event loop.
            WorkItem::PushFuture(future) => {
                let futures = self.0.concurrent_state_mut().futures.get_mut();
                futures.as_mut().unwrap().push(future.into_inner());
            }
            WorkItem::ResumeFiber(fiber) => self.0.resume_fiber(fiber).await?,
            WorkItem::WorkerFunction(fun) => {
                self.run_on_worker(WorkerItem::Function(fun)).await?
            }
            WorkItem::GuestCall(call) => {
                if call.is_ready(self.0)? {
                    return self.run_on_worker(WorkerItem::GuestCall(call)).await;
                }

                // The call can't run yet.  The first time that happens for a
                // task, mark `Status::Starting` as sent and, for implicit
                // starts only, publish the corresponding subtask event.
                let state = self.0.concurrent_state_mut();
                let task = state.get_mut(call.thread.task)?;
                let already_sent = mem::replace(&mut task.starting_sent, true);
                if !already_sent && matches!(call.kind, GuestCallKind::StartImplicit(_)) {
                    let starting = Event::Subtask {
                        status: Status::Starting,
                    };
                    Waitable::Guest(call.thread.task).set_event(state, Some(starting))?;
                }

                // Park the call on its instance until it becomes ready.
                let instance = state.get_mut(call.thread.task)?.instance;
                let pending = &mut self.0.instance_state(instance).pending;
                pending.insert(call.thread, call.kind);
            }
        }

        Ok(())
    }

    /// Run the specified item (a guest call or a function) on a worker fiber.
    ///
    /// The cached worker fiber is reused when one is available; otherwise a
    /// new one is created.  The item is handed to the fiber through
    /// `ConcurrentState::worker_item`.
    async fn run_on_worker(self, item: WorkerItem) -> Result<()>
    where
        T: Send,
    {
        let worker = match self.0.concurrent_state_mut().worker.take() {
            Some(cached) => cached,
            None => fiber::make_fiber(self.0, move |store| {
                // Worker loop: run whatever item was left for us, then
                // suspend until more work arrives.
                loop {
                    let work = store.concurrent_state_mut().worker_item.take().unwrap();
                    match work {
                        WorkerItem::GuestCall(call) => handle_guest_call(store, call)?,
                        WorkerItem::Function(fun) => fun.into_inner()(store)?,
                    }

                    store.suspend(SuspendReason::NeedWork)?;
                }
            })?,
        };

        let slot = &mut self.0.concurrent_state_mut().worker_item;
        assert!(slot.is_none());
        *slot = Some(item);

        self.0.resume_fiber(worker).await
    }

    /// Build a future which creates an `Accessor` for this store and passes
    /// a reference to it to the specified host function.
    ///
    /// See the `Accessor` documentation for details.
    pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static
    where
        T: 'static,
        F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>>
            + Send
            + Sync
            + 'static,
        R: Send + Sync + 'static,
    {
        let token = StoreToken::new(self);
        async move {
            let accessor = Accessor::new(token);
            closure(&accessor).await
        }
    }
}

/// Identifies a (sub-)component instance: the id of the containing component
/// instance plus the runtime index of the instance within it.
#[derive(Debug, Copy, Clone)]
pub struct RuntimeInstance {
    pub instance: ComponentInstanceId,
    pub index: RuntimeComponentInstanceIndex,
}

impl StoreOpaque {
    /// Look up the `ConcurrentInstanceState` belonging to the specified
    /// instance.
    fn instance_state(&mut self, instance: RuntimeInstance) -> &mut ConcurrentInstanceState {
        let id = StoreComponentInstanceId::new(self.id(), instance.instance);
        id.get_mut(self)
            .instance_state(instance.index)
            .unwrap()
            .concurrent_state()
    }

    /// Look up the `HandleTable` belonging to the specified instance.
    fn handle_table(&mut self, instance: RuntimeInstance) -> &mut HandleTable {
        let id = StoreComponentInstanceId::new(self.id(), instance.instance);
        id.get_mut(self)
            .instance_state(instance.index)
            .unwrap()
            .handle_table()
    }

    /// Record that we're about to enter a (sub-)component instance which does
    /// not support more than one concurrent, stackful activation, meaning it
    /// cannot be entered again until the next call returns.
1418 fn enter_instance(&mut self, instance: RuntimeInstance) { 1419 log::trace!("enter {instance:?}"); 1420 self.instance_state(instance).do_not_enter = true; 1421 } 1422 1423 /// Record that we've exited a (sub-)component instance previously entered 1424 /// with `Self::enter_instance` and then calls `Self::partition_pending`. 1425 /// See the documentation for the latter for details. 1426 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> { 1427 log::trace!("exit {instance:?}"); 1428 self.instance_state(instance).do_not_enter = false; 1429 self.partition_pending(instance) 1430 } 1431 1432 /// Iterate over `InstanceState::pending`, moving any ready items into the 1433 /// "high priority" work item queue. 1434 /// 1435 /// See `GuestCall::is_ready` for details. 1436 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> { 1437 for (thread, kind) in mem::take(&mut self.instance_state(instance).pending).into_iter() { 1438 let call = GuestCall { thread, kind }; 1439 if call.is_ready(self)? { 1440 self.concurrent_state_mut() 1441 .push_high_priority(WorkItem::GuestCall(call)); 1442 } else { 1443 self.instance_state(instance) 1444 .pending 1445 .insert(call.thread, call.kind); 1446 } 1447 } 1448 1449 Ok(()) 1450 } 1451 1452 /// Implements the `backpressure.{inc,dec}` intrinsics. 1453 pub(crate) fn backpressure_modify( 1454 &mut self, 1455 caller_instance: RuntimeInstance, 1456 modify: impl FnOnce(u16) -> Option<u16>, 1457 ) -> Result<()> { 1458 let state = self.instance_state(caller_instance); 1459 let old = state.backpressure; 1460 let new = modify(old).ok_or_else(|| anyhow!("backpressure counter overflow"))?; 1461 state.backpressure = new; 1462 1463 if old > 0 && new == 0 { 1464 // Backpressure was previously enabled and is now disabled; move any 1465 // newly-eligible guest calls to the "high priority" queue. 
1466 self.partition_pending(caller_instance)?; 1467 } 1468 1469 Ok(()) 1470 } 1471 1472 /// Resume the specified fiber, giving it exclusive access to the specified 1473 /// store. 1474 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> { 1475 let old_thread = self.concurrent_state_mut().guest_thread; 1476 log::trace!("resume_fiber: save current thread {old_thread:?}"); 1477 1478 let fiber = fiber::resolve_or_release(self, fiber).await?; 1479 1480 let state = self.concurrent_state_mut(); 1481 1482 state.guest_thread = old_thread; 1483 if let Some(ref ot) = old_thread { 1484 state.get_mut(ot.thread)?.state = GuestThreadState::Running; 1485 } 1486 log::trace!("resume_fiber: restore current thread {old_thread:?}"); 1487 1488 if let Some(mut fiber) = fiber { 1489 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason); 1490 // See the `SuspendReason` documentation for what each case means. 1491 match state.suspend_reason.take().unwrap() { 1492 SuspendReason::NeedWork => { 1493 if state.worker.is_none() { 1494 state.worker = Some(fiber); 1495 } else { 1496 fiber.dispose(self); 1497 } 1498 } 1499 SuspendReason::Yielding { thread, .. } => { 1500 state.get_mut(thread.thread)?.state = GuestThreadState::Pending; 1501 state.push_low_priority(WorkItem::ResumeFiber(fiber)); 1502 } 1503 SuspendReason::ExplicitlySuspending { thread, .. } => { 1504 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber); 1505 } 1506 SuspendReason::Waiting { set, thread, .. } => { 1507 let old = state 1508 .get_mut(set)? 1509 .waiting 1510 .insert(thread, WaitMode::Fiber(fiber)); 1511 assert!(old.is_none()); 1512 } 1513 }; 1514 } else { 1515 log::trace!("resume_fiber: fiber has exited"); 1516 } 1517 1518 Ok(()) 1519 } 1520 1521 /// Suspend the current fiber, storing the reason in 1522 /// `ConcurrentState::suspend_reason` to indicate the conditions under which 1523 /// it should be resumed. 
///
    /// See the `SuspendReason` documentation for details.
    fn suspend(&mut self, reason: SuspendReason) -> Result<()> {
        log::trace!("suspend fiber: {reason:?}");

        // If we're yielding or waiting on behalf of a guest thread, we'll need to
        // pop the call context which manages resource borrows before suspending
        // and then push it again once we've resumed.
        let task = match &reason {
            SuspendReason::Yielding { thread, .. }
            | SuspendReason::Waiting { thread, .. }
            | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task),
            SuspendReason::NeedWork => None,
        };

        // Remember the current guest thread (if suspending on behalf of a
        // task) so it can be reinstated once this fiber is resumed.
        let old_guest_thread = if let Some(task) = task {
            self.maybe_pop_call_context(task)?;
            self.concurrent_state_mut().guest_thread
        } else {
            None
        };

        // We should not have reached here unless either there's no current
        // task, or the current task is permitted to block.  In addition, we
        // special-case `thread.switch-to` and waiting for a subtask to go from
        // `starting` to `started`, both of which we consider non-blocking
        // operations despite requiring a suspend.
        assert!(
            matches!(
                reason,
                SuspendReason::ExplicitlySuspending {
                    skip_may_block_check: true,
                    ..
                } | SuspendReason::Waiting {
                    skip_may_block_check: true,
                    ..
                }
            ) || old_guest_thread
                .map(|thread| self.concurrent_state_mut().may_block(thread.task))
                .unwrap_or(true)
        );

        // Leave the reason where `resume_fiber` will find it after this fiber
        // has yielded.
        let suspend_reason = &mut self.concurrent_state_mut().suspend_reason;
        assert!(suspend_reason.is_none());
        *suspend_reason = Some(reason);

        self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?;

        // We've been resumed; restore the state saved above.
        if let Some(task) = task {
            self.concurrent_state_mut().guest_thread = old_guest_thread;
            self.maybe_push_call_context(task)?;
        }

        Ok(())
    }

    /// Push the call context for managing resource borrows for the specified
    /// guest task if it has not yet either returned a result or cancelled
    /// itself.
    fn maybe_push_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
        let task = self.concurrent_state_mut().get_mut(guest_task)?;

        if !task.returned_or_cancelled() {
            log::trace!("push call context for {guest_task:?}");
            // The context is moved out of the task while it is pushed;
            // `maybe_pop_call_context` stores it back.
            let call_context = task.call_context.take().unwrap();
            self.component_resource_state().0.push(call_context);
        }
        Ok(())
    }

    /// Pop the call context for managing resource borrows for the specified
    /// guest task if it has not yet either returned a result or cancelled
    /// itself.
    fn maybe_pop_call_context(&mut self, guest_task: TableId<GuestTask>) -> Result<()> {
        if !self
            .concurrent_state_mut()
            .get_mut(guest_task)?
            .returned_or_cancelled()
        {
            log::trace!("pop call context for {guest_task:?}");
            let call_context = Some(self.component_resource_state().0.pop().unwrap());
            self.concurrent_state_mut()
                .get_mut(guest_task)?
                .call_context = call_context;
        }
        Ok(())
    }

    /// Suspend the current guest thread until it is woken via the specified
    /// waitable.
    ///
    /// The waitable is temporarily joined to the calling task's
    /// `sync_call_set`, the current fiber is suspended with
    /// `SuspendReason::Waiting` on that set, and afterwards the waitable is
    /// joined back to whichever set (if any) it belonged to before.
    ///
    /// Panics if there is no current guest thread.
    fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> {
        let state = self.concurrent_state_mut();
        let caller = state.guest_thread.unwrap();
        let old_set = waitable.common(state)?.set;
        let set = state.get_mut(caller.task)?.sync_call_set;
        waitable.join(state, Some(set))?;
        self.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
            skip_may_block_check: false,
        })?;
        let state = self.concurrent_state_mut();
        waitable.join(state, old_set)
    }
}

impl Instance {
    /// Get the next pending event for the specified task and (optional)
    /// waitable set, along with the waitable handle if applicable.
    ///
    /// An event stored directly on the task takes precedence over events from
    /// the waitable set.  A pending `Event::Cancelled` is only returned when
    /// `cancellable` is true; otherwise it is left pending on the task.
    /// Returns `Ok(None)` if no event is available.
    fn get_event(
        self,
        store: &mut StoreOpaque,
        guest_task: TableId<GuestTask>,
        set: Option<TableId<WaitableSet>>,
        cancellable: bool,
    ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> {
        let state = store.concurrent_state_mut();

        if let Some(event) = state.get_mut(guest_task)?.event.take() {
            log::trace!("deliver event {event:?} to {guest_task:?}");

            if cancellable || !matches!(event, Event::Cancelled) {
                return Ok(Some((event, None)));
            } else {
                // Not allowed to deliver the cancellation here; put it back.
                state.get_mut(guest_task)?.event = Some(event);
            }
        }

        Ok(
            // Pop the first ready waitable from `set`, if a set was specified
            // and it has one.
            if let Some((set, waitable)) = set
                .and_then(|set| {
                    state
                        .get_mut(set)
                        .map(|v| v.ready.pop_first().map(|v| (set, v)))
                        .transpose()
                })
                .transpose()?
            {
                let common = waitable.common(state)?;
                let handle = common.handle.unwrap();
                let event = common.event.take().unwrap();

                log::trace!(
                    "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
                );

                waitable.on_delivery(store, self, event);

                Some((event, Some((waitable, handle))))
            } else {
                None
            },
        )
    }

    /// Handle the `CallbackCode` returned from an async-lifted export or its
    /// callback.
    ///
    /// If this returns `Ok(Some(call))`, then `call` should be run immediately
    /// using `handle_guest_call`.
    fn handle_callback_code(
        self,
        store: &mut StoreOpaque,
        guest_thread: QualifiedThreadId,
        runtime_instance: RuntimeComponentInstanceIndex,
        code: u32,
    ) -> Result<Option<GuestCall>> {
        let (code, set) = unpack_callback_code(code);

        log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})");

        let state = store.concurrent_state_mut();

        // Translate a guest-provided waitable-set handle into a table id,
        // rejecting the zero handle.
        let get_set = |store: &mut StoreOpaque, handle| {
            if handle == 0 {
                bail!("invalid waitable-set handle");
            }

            let set = store
                .handle_table(RuntimeInstance {
                    instance: self.id().instance(),
                    index: runtime_instance,
                })
                .waitable_set_rep(handle)?;

            Ok(TableId::<WaitableSet>::new(set))
        };

        Ok(match code {
            callback_code::EXIT => {
                log::trace!("implicit thread {guest_thread:?} completed");
                self.cleanup_thread(store, guest_thread, runtime_instance)?;
                let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
                // The last thread exited without the task having returned a
                // result or cancelled itself.
                if task.threads.is_empty() && !task.returned_or_cancelled() {
                    bail!(Trap::NoAsyncResult);
                }
                match &task.caller {
                    Caller::Host { .. } => {
                        if task.ready_to_delete() {
                            Waitable::Guest(guest_thread.task)
                                .delete_from(store.concurrent_state_mut())?;
                        }
                    }
                    Caller::Guest { .. } => {
                        task.exited = true;
                        task.callback = None;
                    }
                }
                None
            }
            callback_code::YIELD => {
                let task = state.get_mut(guest_thread.task)?;
                // If an `Event::Cancelled` is pending, we'll deliver that;
                // otherwise, we'll deliver `Event::None`.  Note that
                // `GuestTask::event` is only ever set to one of those two
                // `Event` variants.
                if let Some(event) = task.event {
                    assert!(matches!(event, Event::None | Event::Cancelled));
                } else {
                    task.event = Some(Event::None);
                }
                let call = GuestCall {
                    thread: guest_thread,
                    kind: GuestCallKind::DeliverEvent {
                        instance: self,
                        set: None,
                    },
                };
                if state.may_block(guest_thread.task) {
                    // Push this thread onto the "low priority" queue so it runs
                    // after any other threads have had a chance to run.
                    state.push_low_priority(WorkItem::GuestCall(call));
                    None
                } else {
                    // Yielding in a non-blocking context is defined as a no-op
                    // according to the spec, so we must run this thread
                    // immediately without allowing any others to run.
                    Some(call)
                }
            }
            callback_code::WAIT => {
                // The task may only return `WAIT` if it was created for a call
                // to an async export.  Otherwise, we'll trap.
                state.check_blocking_for(guest_thread.task)?;

                let set = get_set(store, set)?;
                let state = store.concurrent_state_mut();

                if state.get_mut(guest_thread.task)?.event.is_some()
                    || !state.get_mut(set)?.ready.is_empty()
                {
                    // An event is immediately available; deliver it ASAP.
                    state.push_high_priority(WorkItem::GuestCall(GuestCall {
                        thread: guest_thread,
                        kind: GuestCallKind::DeliverEvent {
                            instance: self,
                            set: Some(set),
                        },
                    }));
                } else {
                    // No event is immediately available.
                    //
                    // We're waiting, so register to be woken up when an event
                    // is published for this waitable set.
                    //
                    // Here we also set `GuestTask::wake_on_cancel` which allows
                    // `subtask.cancel` to interrupt the wait.
                    let old = state
                        .get_mut(guest_thread.thread)?
                        .wake_on_cancel
                        .replace(set);
                    assert!(old.is_none());
                    let old = state
                        .get_mut(set)?
                        .waiting
                        .insert(guest_thread, WaitMode::Callback(self));
                    assert!(old.is_none());
                }
                None
            }
            _ => bail!("unsupported callback code: {code}"),
        })
    }

    /// Tear down the bookkeeping for a guest thread which has finished: remove
    /// it from the instance's handle table, delete it from the concurrent
    /// state table, and remove it from its task's set of threads.
    ///
    /// Panics if the thread has no `instance_rep`.
    fn cleanup_thread(
        self,
        store: &mut StoreOpaque,
        guest_thread: QualifiedThreadId,
        runtime_instance: RuntimeComponentInstanceIndex,
    ) -> Result<()> {
        let guest_id = store
            .concurrent_state_mut()
            .get_mut(guest_thread.thread)?
            .instance_rep;
        store
            .handle_table(RuntimeInstance {
                instance: self.id().instance(),
                index: runtime_instance,
            })
            .guest_thread_remove(guest_id.unwrap())?;

        store.concurrent_state_mut().delete(guest_thread.thread)?;
        let task = store.concurrent_state_mut().get_mut(guest_thread.task)?;
        task.threads.remove(&guest_thread.thread);
        Ok(())
    }

    /// Add the specified guest call to the "high priority" work item queue, to
    /// be started as soon as backpressure and/or reentrance rules allow.
    ///
    /// SAFETY: The raw pointer arguments must be valid references to guest
    /// functions (with the appropriate signatures) when the closures queued by
    /// this function are called.
1835 unsafe fn queue_call<T: 'static>( 1836 self, 1837 mut store: StoreContextMut<T>, 1838 guest_thread: QualifiedThreadId, 1839 callee: SendSyncPtr<VMFuncRef>, 1840 param_count: usize, 1841 result_count: usize, 1842 flags: Option<InstanceFlags>, 1843 async_: bool, 1844 callback: Option<SendSyncPtr<VMFuncRef>>, 1845 post_return: Option<SendSyncPtr<VMFuncRef>>, 1846 ) -> Result<()> { 1847 /// Return a closure which will call the specified function in the scope 1848 /// of the specified task. 1849 /// 1850 /// This will use `GuestTask::lower_params` to lower the parameters, but 1851 /// will not lift the result; instead, it returns a 1852 /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if 1853 /// any, may be lifted. Note that an async-lifted export will have 1854 /// returned its result using the `task.return` intrinsic (or not 1855 /// returned a result at all, in the case of `task.cancel`), in which 1856 /// case the "result" of this call will either be a callback code or 1857 /// nothing. 1858 /// 1859 /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when 1860 /// the returned closure is called. 1861 unsafe fn make_call<T: 'static>( 1862 store: StoreContextMut<T>, 1863 guest_thread: QualifiedThreadId, 1864 callee: SendSyncPtr<VMFuncRef>, 1865 param_count: usize, 1866 result_count: usize, 1867 flags: Option<InstanceFlags>, 1868 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]> 1869 + Send 1870 + Sync 1871 + 'static 1872 + use<T> { 1873 let token = StoreToken::new(store); 1874 move |store: &mut dyn VMStore| { 1875 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS]; 1876 1877 store 1878 .concurrent_state_mut() 1879 .get_mut(guest_thread.thread)? 
1880 .state = GuestThreadState::Running; 1881 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 1882 let may_enter_after_call = task.call_post_return_automatically(); 1883 let lower = task.lower_params.take().unwrap(); 1884 1885 lower(store, &mut storage[..param_count])?; 1886 1887 let mut store = token.as_context_mut(store); 1888 1889 // SAFETY: Per the contract documented in `make_call's` 1890 // documentation, `callee` must be a valid pointer. 1891 unsafe { 1892 if let Some(mut flags) = flags { 1893 flags.set_may_enter(false); 1894 } 1895 crate::Func::call_unchecked_raw( 1896 &mut store, 1897 callee.as_non_null(), 1898 NonNull::new( 1899 &mut storage[..param_count.max(result_count)] 1900 as *mut [MaybeUninit<ValRaw>] as _, 1901 ) 1902 .unwrap(), 1903 )?; 1904 if let Some(mut flags) = flags { 1905 flags.set_may_enter(may_enter_after_call); 1906 } 1907 } 1908 1909 Ok(storage) 1910 } 1911 } 1912 1913 // SAFETY: Per the contract described in this function documentation, 1914 // the `callee` pointer which `call` closes over must be valid when 1915 // called by the closure we queue below. 1916 let call = unsafe { 1917 make_call( 1918 store.as_context_mut(), 1919 guest_thread, 1920 callee, 1921 param_count, 1922 result_count, 1923 flags, 1924 ) 1925 }; 1926 1927 let callee_instance = store 1928 .0 1929 .concurrent_state_mut() 1930 .get_mut(guest_thread.task)? 
1931 .instance; 1932 1933 let fun = if callback.is_some() { 1934 assert!(async_); 1935 1936 Box::new(move |store: &mut dyn VMStore| { 1937 self.add_guest_thread_to_instance_table( 1938 guest_thread.thread, 1939 store, 1940 callee_instance.index, 1941 )?; 1942 let old_thread = store 1943 .concurrent_state_mut() 1944 .guest_thread 1945 .replace(guest_thread); 1946 log::trace!( 1947 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread" 1948 ); 1949 1950 store.maybe_push_call_context(guest_thread.task)?; 1951 1952 store.enter_instance(callee_instance); 1953 1954 // SAFETY: See the documentation for `make_call` to review the 1955 // contract we must uphold for `call` here. 1956 // 1957 // Per the contract described in the `queue_call` 1958 // documentation, the `callee` pointer which `call` closes 1959 // over must be valid. 1960 let storage = call(store)?; 1961 1962 store.exit_instance(callee_instance)?; 1963 1964 store.maybe_pop_call_context(guest_thread.task)?; 1965 1966 let state = store.concurrent_state_mut(); 1967 state.guest_thread = old_thread; 1968 old_thread 1969 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running); 1970 log::trace!("stackless call: restored {old_thread:?} as current thread"); 1971 1972 // SAFETY: `wasmparser` will have validated that the callback 1973 // function returns a `i32` result. 
1974 let code = unsafe { storage[0].assume_init() }.get_i32() as u32; 1975 1976 self.handle_callback_code(store, guest_thread, callee_instance.index, code) 1977 }) 1978 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync> 1979 } else { 1980 let token = StoreToken::new(store.as_context_mut()); 1981 Box::new(move |store: &mut dyn VMStore| { 1982 self.add_guest_thread_to_instance_table( 1983 guest_thread.thread, 1984 store, 1985 callee_instance.index, 1986 )?; 1987 let old_thread = store 1988 .concurrent_state_mut() 1989 .guest_thread 1990 .replace(guest_thread); 1991 log::trace!( 1992 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread", 1993 ); 1994 let mut flags = self.id().get(store).instance_flags(callee_instance.index); 1995 1996 store.maybe_push_call_context(guest_thread.task)?; 1997 1998 // Unless this is a callback-less (i.e. stackful) 1999 // async-lifted export, we need to record that the instance 2000 // cannot be entered until the call returns. 2001 if !async_ { 2002 store.enter_instance(callee_instance); 2003 } 2004 2005 // SAFETY: See the documentation for `make_call` to review the 2006 // contract we must uphold for `call` here. 2007 // 2008 // Per the contract described in the `queue_call` 2009 // documentation, the `callee` pointer which `call` closes 2010 // over must be valid. 
2011 let storage = call(store)?; 2012 2013 // This is a callback-less call, so the implicit thread has now completed 2014 self.cleanup_thread(store, guest_thread, callee_instance.index)?; 2015 2016 if async_ { 2017 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 2018 if task.threads.is_empty() && !task.returned_or_cancelled() { 2019 bail!(Trap::NoAsyncResult); 2020 } 2021 } else { 2022 // This is a sync-lifted export, so now is when we lift the 2023 // result, optionally call the post-return function, if any, 2024 // and finally notify any current or future waiters that the 2025 // subtask has returned. 2026 2027 let lift = { 2028 store.exit_instance(callee_instance)?; 2029 2030 let state = store.concurrent_state_mut(); 2031 assert!(state.get_mut(guest_thread.task)?.result.is_none()); 2032 2033 state 2034 .get_mut(guest_thread.task)? 2035 .lift_result 2036 .take() 2037 .unwrap() 2038 }; 2039 2040 // SAFETY: `result_count` represents the number of core Wasm 2041 // results returned, per `wasmparser`. 2042 let result = (lift.lift)(store, unsafe { 2043 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>( 2044 &storage[..result_count], 2045 ) 2046 })?; 2047 2048 let post_return_arg = match result_count { 2049 0 => ValRaw::i32(0), 2050 // SAFETY: `result_count` represents the number of 2051 // core Wasm results returned, per `wasmparser`. 2052 1 => unsafe { storage[0].assume_init() }, 2053 _ => unreachable!(), 2054 }; 2055 2056 if store 2057 .concurrent_state_mut() 2058 .get_mut(guest_thread.task)? 2059 .call_post_return_automatically() 2060 { 2061 unsafe { 2062 flags.set_may_leave(false); 2063 flags.set_needs_post_return(false); 2064 } 2065 2066 if let Some(func) = post_return { 2067 let mut store = token.as_context_mut(store); 2068 2069 // SAFETY: `func` is a valid `*mut VMFuncRef` from 2070 // either `wasmtime-cranelift`-generated fused adapter 2071 // code or `component::Options`. 
Per `wasmparser` 2072 // post-return signature validation, we know it takes a 2073 // single parameter. 2074 unsafe { 2075 crate::Func::call_unchecked_raw( 2076 &mut store, 2077 func.as_non_null(), 2078 slice::from_ref(&post_return_arg).into(), 2079 )?; 2080 } 2081 } 2082 2083 unsafe { 2084 flags.set_may_leave(true); 2085 flags.set_may_enter(true); 2086 } 2087 } 2088 2089 self.task_complete( 2090 store, 2091 guest_thread.task, 2092 result, 2093 Status::Returned, 2094 post_return_arg, 2095 )?; 2096 } 2097 2098 store.maybe_pop_call_context(guest_thread.task)?; 2099 2100 let state = store.concurrent_state_mut(); 2101 let task = state.get_mut(guest_thread.task)?; 2102 2103 match &task.caller { 2104 Caller::Host { .. } => { 2105 if task.ready_to_delete() { 2106 Waitable::Guest(guest_thread.task).delete_from(state)?; 2107 } 2108 } 2109 Caller::Guest { .. } => { 2110 task.exited = true; 2111 } 2112 } 2113 2114 Ok(None) 2115 }) 2116 }; 2117 2118 store 2119 .0 2120 .concurrent_state_mut() 2121 .push_high_priority(WorkItem::GuestCall(GuestCall { 2122 thread: guest_thread, 2123 kind: GuestCallKind::StartImplicit(fun), 2124 })); 2125 2126 Ok(()) 2127 } 2128 2129 /// Prepare (but do not start) a guest->guest call. 2130 /// 2131 /// This is called from fused adapter code generated in 2132 /// `wasmtime_environ::fact::trampoline::Compiler`. `start` and `return_` 2133 /// are synthesized Wasm functions which move the parameters from the caller 2134 /// to the callee and the result from the callee to the caller, 2135 /// respectively. The adapter will call `Self::start_call` immediately 2136 /// after calling this function. 2137 /// 2138 /// SAFETY: All the pointer arguments must be valid pointers to guest 2139 /// entities (and with the expected signatures for the function references 2140 /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details). 
2141 unsafe fn prepare_call<T: 'static>( 2142 self, 2143 mut store: StoreContextMut<T>, 2144 start: *mut VMFuncRef, 2145 return_: *mut VMFuncRef, 2146 caller_instance: RuntimeComponentInstanceIndex, 2147 callee_instance: RuntimeComponentInstanceIndex, 2148 task_return_type: TypeTupleIndex, 2149 callee_async: bool, 2150 memory: *mut VMMemoryDefinition, 2151 string_encoding: u8, 2152 caller_info: CallerInfo, 2153 ) -> Result<()> { 2154 self.id().get(store.0).check_may_leave(caller_instance)?; 2155 2156 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) { 2157 // A task may only call an async-typed function via a sync lower if 2158 // it was created by a call to an async export. Otherwise, we'll 2159 // trap. 2160 store.0.concurrent_state_mut().check_blocking()?; 2161 } 2162 2163 enum ResultInfo { 2164 Heap { results: u32 }, 2165 Stack { result_count: u32 }, 2166 } 2167 2168 let result_info = match &caller_info { 2169 CallerInfo::Async { 2170 has_result: true, 2171 params, 2172 } => ResultInfo::Heap { 2173 results: params.last().unwrap().get_u32(), 2174 }, 2175 CallerInfo::Async { 2176 has_result: false, .. 2177 } => ResultInfo::Stack { result_count: 0 }, 2178 CallerInfo::Sync { 2179 result_count, 2180 params, 2181 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap { 2182 results: params.last().unwrap().get_u32(), 2183 }, 2184 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack { 2185 result_count: *result_count, 2186 }, 2187 }; 2188 2189 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. }); 2190 2191 // Create a new guest task for the call, closing over the `start` and 2192 // `return_` functions to lift the parameters and lower the result, 2193 // respectively. 
2194 let start = SendSyncPtr::new(NonNull::new(start).unwrap()); 2195 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap()); 2196 let token = StoreToken::new(store.as_context_mut()); 2197 let state = store.0.concurrent_state_mut(); 2198 let old_thread = state.guest_thread.take(); 2199 let new_task = GuestTask::new( 2200 state, 2201 Box::new(move |store, dst| { 2202 let mut store = token.as_context_mut(store); 2203 assert!(dst.len() <= MAX_FLAT_PARAMS); 2204 // The `+ 1` here accounts for the return pointer, if any: 2205 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1]; 2206 let count = match caller_info { 2207 // Async callers, if they have a result, use the last 2208 // parameter as a return pointer so chop that off if 2209 // relevant here. 2210 CallerInfo::Async { params, has_result } => { 2211 let params = ¶ms[..params.len() - usize::from(has_result)]; 2212 for (param, src) in params.iter().zip(&mut src) { 2213 src.write(*param); 2214 } 2215 params.len() 2216 } 2217 2218 // Sync callers forward everything directly. 2219 CallerInfo::Sync { params, .. } => { 2220 for (param, src) in params.iter().zip(&mut src) { 2221 src.write(*param); 2222 } 2223 params.len() 2224 } 2225 }; 2226 // SAFETY: `start` is a valid `*mut VMFuncRef` from 2227 // `wasmtime-cranelift`-generated fused adapter code. Based on 2228 // how it was constructed (see 2229 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter` 2230 // for details) we know it takes count parameters and returns 2231 // `dst.len()` results. 
2232 unsafe { 2233 crate::Func::call_unchecked_raw( 2234 &mut store, 2235 start.as_non_null(), 2236 NonNull::new( 2237 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _, 2238 ) 2239 .unwrap(), 2240 )?; 2241 } 2242 dst.copy_from_slice(&src[..dst.len()]); 2243 let state = store.0.concurrent_state_mut(); 2244 Waitable::Guest(state.guest_thread.unwrap().task).set_event( 2245 state, 2246 Some(Event::Subtask { 2247 status: Status::Started, 2248 }), 2249 )?; 2250 Ok(()) 2251 }), 2252 LiftResult { 2253 lift: Box::new(move |store, src| { 2254 // SAFETY: See comment in closure passed as `lower_params` 2255 // parameter above. 2256 let mut store = token.as_context_mut(store); 2257 let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation? 2258 if let ResultInfo::Heap { results } = &result_info { 2259 my_src.push(ValRaw::u32(*results)); 2260 } 2261 // SAFETY: `return_` is a valid `*mut VMFuncRef` from 2262 // `wasmtime-cranelift`-generated fused adapter code. Based 2263 // on how it was constructed (see 2264 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter` 2265 // for details) we know it takes `src.len()` parameters and 2266 // returns up to 1 result. 
2267 unsafe { 2268 crate::Func::call_unchecked_raw( 2269 &mut store, 2270 return_.as_non_null(), 2271 my_src.as_mut_slice().into(), 2272 )?; 2273 } 2274 let state = store.0.concurrent_state_mut(); 2275 let thread = state.guest_thread.unwrap(); 2276 if sync_caller { 2277 state.get_mut(thread.task)?.sync_result = SyncResult::Produced( 2278 if let ResultInfo::Stack { result_count } = &result_info { 2279 match result_count { 2280 0 => None, 2281 1 => Some(my_src[0]), 2282 _ => unreachable!(), 2283 } 2284 } else { 2285 None 2286 }, 2287 ); 2288 } 2289 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>) 2290 }), 2291 ty: task_return_type, 2292 memory: NonNull::new(memory).map(SendSyncPtr::new), 2293 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(), 2294 }, 2295 Caller::Guest { 2296 thread: old_thread.unwrap(), 2297 instance: caller_instance, 2298 }, 2299 None, 2300 self, 2301 callee_instance, 2302 callee_async, 2303 )?; 2304 2305 let guest_task = state.push(new_task)?; 2306 let new_thread = GuestThread::new_implicit(guest_task); 2307 let guest_thread = state.push(new_thread)?; 2308 state.get_mut(guest_task)?.threads.insert(guest_thread); 2309 2310 let state = store.0.concurrent_state_mut(); 2311 if let Some(old_thread) = old_thread { 2312 if !state.may_enter(guest_task) { 2313 bail!(crate::Trap::CannotEnterComponent); 2314 } 2315 2316 state.get_mut(old_thread.task)?.subtasks.insert(guest_task); 2317 }; 2318 2319 // Make the new thread the current one so that `Self::start_call` knows 2320 // which one to start. 2321 state.guest_thread = Some(QualifiedThreadId { 2322 task: guest_task, 2323 thread: guest_thread, 2324 }); 2325 log::trace!( 2326 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}" 2327 ); 2328 2329 Ok(()) 2330 } 2331 2332 /// Call the specified callback function for an async-lifted export. 
2333 /// 2334 /// SAFETY: `function` must be a valid reference to a guest function of the 2335 /// correct signature for a callback. 2336 unsafe fn call_callback<T>( 2337 self, 2338 mut store: StoreContextMut<T>, 2339 callee_instance: RuntimeComponentInstanceIndex, 2340 function: SendSyncPtr<VMFuncRef>, 2341 event: Event, 2342 handle: u32, 2343 may_enter_after_call: bool, 2344 ) -> Result<u32> { 2345 let mut flags = self.id().get(store.0).instance_flags(callee_instance); 2346 2347 let (ordinal, result) = event.parts(); 2348 let params = &mut [ 2349 ValRaw::u32(ordinal), 2350 ValRaw::u32(handle), 2351 ValRaw::u32(result), 2352 ]; 2353 // SAFETY: `func` is a valid `*mut VMFuncRef` from either 2354 // `wasmtime-cranelift`-generated fused adapter code or 2355 // `component::Options`. Per `wasmparser` callback signature 2356 // validation, we know it takes three parameters and returns one. 2357 unsafe { 2358 flags.set_may_enter(false); 2359 crate::Func::call_unchecked_raw( 2360 &mut store, 2361 function.as_non_null(), 2362 params.as_mut_slice().into(), 2363 )?; 2364 flags.set_may_enter(may_enter_after_call); 2365 } 2366 Ok(params[0].get_u32()) 2367 } 2368 2369 /// Start a guest->guest call previously prepared using 2370 /// `Self::prepare_call`. 2371 /// 2372 /// This is called from fused adapter code generated in 2373 /// `wasmtime_environ::fact::trampoline::Compiler`. The adapter will call 2374 /// this function immediately after calling `Self::prepare_call`. 2375 /// 2376 /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest 2377 /// functions with the appropriate signatures for the current guest task. 2378 /// If this is a call to an async-lowered import, the actual call may be 2379 /// deferred and run after this function returns, in which case the pointer 2380 /// arguments must also be valid when the call happens. 
///
/// `storage` is `None` when the caller is treated as async (see
/// `async_caller` below); otherwise it receives the stashed sync result, if
/// any.  The return value is `status.pack(waitable)`, where `waitable` is
/// the subtask handle allocated for an async caller whose callee has not
/// yet returned, and `None` otherwise.
///
/// # Panics
///
/// Panics if there is no current guest thread, if `callee` is null, or if
/// `param_count`/`result_count` exceed `MAX_FLAT_PARAMS`/`MAX_FLAT_RESULTS`.
unsafe fn start_call<T: 'static>(
    self,
    mut store: StoreContextMut<T>,
    callback: *mut VMFuncRef,
    post_return: *mut VMFuncRef,
    callee: *mut VMFuncRef,
    param_count: u32,
    result_count: u32,
    flags: u32,
    storage: Option<&mut [MaybeUninit<ValRaw>]>,
) -> Result<u32> {
    let token = StoreToken::new(store.as_context_mut());
    // No result storage means the caller is not waiting synchronously for a
    // result.
    let async_caller = storage.is_none();
    let state = store.0.concurrent_state_mut();
    // `prepare_call` made the callee's thread the current one.
    let guest_thread = state.guest_thread.unwrap();
    let callee_async = state.get_mut(guest_thread.task)?.async_function;
    let may_enter_after_call = state
        .get_mut(guest_thread.task)?
        .call_post_return_automatically();
    let callee = SendSyncPtr::new(NonNull::new(callee).unwrap());
    let param_count = usize::try_from(param_count).unwrap();
    assert!(param_count <= MAX_FLAT_PARAMS);
    let result_count = usize::try_from(result_count).unwrap();
    assert!(result_count <= MAX_FLAT_RESULTS);

    let task = state.get_mut(guest_thread.task)?;
    if !callback.is_null() {
        // We're calling an async-lifted export with a callback, so store
        // the callback and related context as part of the task so we can
        // call it later when needed.
        let callback = SendSyncPtr::new(NonNull::new(callback).unwrap());
        task.callback = Some(Box::new(move |store, runtime_instance, event, handle| {
            let store = token.as_context_mut(store);
            // SAFETY: Per this function's contract, `callback` is a valid
            // guest function pointer with the appropriate signature, and
            // remains valid when deferred calls happen.
            unsafe {
                self.call_callback::<T>(
                    store,
                    runtime_instance,
                    callback,
                    event,
                    handle,
                    may_enter_after_call,
                )
            }
        }));
    }

    let Caller::Guest {
        thread: caller,
        instance: runtime_instance,
    } = &task.caller
    else {
        // As of this writing, `start_call` is only used for guest->guest
        // calls.
        unreachable!()
    };
    let caller = *caller;
    let caller_instance = *runtime_instance;

    let callee_instance = task.instance;

    // Instance flags are only handed to `queue_call` for calls that have a
    // callback.
    let instance_flags = if callback.is_null() {
        None
    } else {
        Some(self.id().get(store.0).instance_flags(callee_instance.index))
    };

    // Queue the call as a "high priority" work item.
    //
    // SAFETY: Per this function's contract, the function pointers are valid
    // now and will still be valid when the queued call actually runs, which
    // is what `queue_call` requires.
    unsafe {
        self.queue_call(
            store.as_context_mut(),
            guest_thread,
            callee,
            param_count,
            result_count,
            instance_flags,
            (flags & START_FLAG_ASYNC_CALLEE) != 0,
            NonNull::new(callback).map(SendSyncPtr::new),
            NonNull::new(post_return).map(SendSyncPtr::new),
        )?;
    }

    let state = store.0.concurrent_state_mut();

    // Use the caller's `GuestTask::sync_call_set` to register interest in
    // the subtask...
    let guest_waitable = Waitable::Guest(guest_thread.task);
    // Remember the set the waitable belonged to so it can be restored once
    // we're done waiting.
    let old_set = guest_waitable.common(state)?.set;
    let set = state.get_mut(caller.task)?.sync_call_set;
    guest_waitable.join(state, Some(set))?;

    // ... and suspend this fiber temporarily while we wait for it to start.
    //
    // Note that we _could_ call the callee directly using the current fiber
    // rather than suspend this one, but that would make reasoning about the
    // event loop more complicated and is probably only worth doing if
    // there's a measurable performance benefit.  In addition, it would mean
    // blocking the caller if the callee calls a blocking sync-lowered
    // import, and as of this writing the spec says we must not do that.
    //
    // Alternatively, the fused adapter code could be modified to call the
    // callee directly without calling a host-provided intrinsic at all (in
    // which case it would need to do its own, inline backpressure checks,
    // etc.).  Again, we'd want to see a measurable performance benefit
    // before committing to such an optimization.  And again, we'd need to
    // update the spec to allow that.
    let (status, waitable) = loop {
        store.0.suspend(SuspendReason::Waiting {
            set,
            thread: caller,
            // Normally, `StoreOpaque::suspend` would assert it's being
            // called from a context where blocking is allowed.  However, if
            // `async_caller` is `true`, we'll only "block" long enough for
            // the callee to start, i.e. we won't repeat this loop, so we
            // tell `suspend` it's okay even if we're not allowed to block.
            // Alternatively, if the callee is not an async function, then
            // we know it won't block anyway.
            skip_may_block_check: async_caller || !callee_async,
        })?;

        let state = store.0.concurrent_state_mut();

        log::trace!("taking event for {:?}", guest_thread.task);
        let event = guest_waitable.take_event(state)?;
        let Some(Event::Subtask { status }) = event else {
            unreachable!();
        };

        log::trace!("status {status:?} for {:?}", guest_thread.task);

        if status == Status::Returned {
            // It returned, so we can stop waiting.
            break (status, None);
        } else if async_caller {
            // It hasn't returned yet, but the caller is calling via an
            // async-lowered import, so we generate a handle for the task
            // waitable and return the status.
            let handle = store
                .0
                .handle_table(RuntimeInstance {
                    instance: self.id().instance(),
                    index: caller_instance,
                })
                .subtask_insert_guest(guest_thread.task.rep())?;
            store
                .0
                .concurrent_state_mut()
                .get_mut(guest_thread.task)?
                .common
                .handle = Some(handle);
            break (status, Some(handle));
        } else {
            // The callee hasn't returned yet, and the caller is calling via
            // a sync-lowered import, so we loop and keep waiting until the
            // callee returns.
        }
    };

    let state = store.0.concurrent_state_mut();

    // Put the waitable back into whatever set (if any) it was in before.
    guest_waitable.join(state, old_set)?;

    if let Some(storage) = storage {
        // The caller used a sync-lowered import to call an async-lifted
        // export, in which case the result, if any, has been stashed in
        // `GuestTask::sync_result`.
        let task = state.get_mut(guest_thread.task)?;
        if let Some(result) = task.sync_result.take() {
            if let Some(result) = result {
                storage[0] = MaybeUninit::new(result);
            }

            // Once the result has been taken, an exited task may be
            // deleted if it reports it is ready for that.
            if task.exited {
                if task.ready_to_delete() {
                    Waitable::Guest(guest_thread.task).delete_from(state)?;
                }
            }
        }
    }

    // Reset the current thread to point to the caller as it resumes control.
    state.guest_thread = Some(caller);
    state.get_mut(caller.thread)?.state = GuestThreadState::Running;
    log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}");

    Ok(status.pack(waitable))
}

/// Poll the specified future once on behalf of a guest->host call using an
/// async-lowered import.
///
/// If it returns `Ready`, return `Ok(None)`.  Otherwise, if it returns
/// `Pending`, add it to the set of futures to be polled as part of this
/// instance's event loop until it completes, and then return
/// `Ok(Some(handle))` where `handle` is the waitable handle to return.
///
/// Whether the future returns `Ready` immediately or later, the `lower`
/// function will be used to lower the result, if any, into the guest caller's
/// stack and linear memory unless the task has been cancelled.
///
/// `caller_instance` identifies the instance whose handle table receives
/// the subtask handle when the future is still pending.
///
/// # Panics
///
/// Panics if there is no current guest thread.
pub(crate) fn first_poll<T: 'static, R: Send + 'static>(
    self,
    mut store: StoreContextMut<'_, T>,
    future: impl Future<Output = Result<R>> + Send + 'static,
    caller_instance: RuntimeComponentInstanceIndex,
    lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static,
) -> Result<Option<u32>> {
    let token = StoreToken::new(store.as_context_mut());
    let state = store.0.concurrent_state_mut();
    let caller = state.guest_thread.unwrap();

    // Create an abortable future which hooks calls to poll and manages call
    // context state for the future.
    let (join_handle, future) = JoinHandle::run(async move {
        let mut future = pin!(future);
        // Holds the call context while the future is suspended; `None`
        // before the first poll and while a poll is in progress.
        let mut call_context = None;
        future::poll_fn(move |cx| {
            // Push the call context for managing any resource borrows
            // for the task.
            tls::get(|store| {
                if let Some(call_context) = call_context.take() {
                    token
                        .as_context_mut(store)
                        .0
                        .component_resource_state()
                        .0
                        .push(call_context);
                }
            });

            let result = future.as_mut().poll(cx);

            if result.is_pending() {
                // Pop the call context for managing any resource
                // borrows for the task.
                tls::get(|store| {
                    call_context = Some(
                        token
                            .as_context_mut(store)
                            .0
                            .component_resource_state()
                            .0
                            .pop()
                            .unwrap(),
                    );
                });
            }
            result
        })
        .await
    });

    // We create a new host task even though it might complete immediately
    // (in which case we won't need to pass a waitable back to the guest).
    // If it does complete immediately, we'll remove it before we return.
    let task = state.push(HostTask::new(caller_instance, Some(join_handle)))?;

    log::trace!("new host task child of {caller:?}: {task:?}");

    let mut future = Box::pin(future);

    // Finally, poll the future.  We can use a dummy `Waker` here because
    // we'll add the future to `ConcurrentState::futures` and poll it
    // automatically from the event loop if it doesn't complete immediately
    // here.
    let poll = tls::set(store.0, || {
        future
            .as_mut()
            .poll(&mut Context::from_waker(&Waker::noop()))
    });

    Ok(match poll {
        // NOTE(review): `None` is how a cancelled task is reported (see the
        // `Pending` arm below); presumably cancellation cannot have happened
        // before this first poll -- confirm against `JoinHandle`.
        Poll::Ready(None) => unreachable!(),
        Poll::Ready(Some(result)) => {
            // It finished immediately; lower the result and delete the
            // task.
            lower(store.as_context_mut(), result?)?;
            log::trace!("delete host task {task:?} (already ready)");
            store.0.concurrent_state_mut().delete(task)?;
            None
        }
        Poll::Pending => {
            // It hasn't finished yet; add the future to
            // `ConcurrentState::futures` so it will be polled by the event
            // loop and allocate a waitable handle to return to the guest.

            // Wrap the future in a closure responsible for lowering the result into
            // the guest's stack and memory, as well as notifying any waiters that
            // the task returned.
            let future =
                Box::pin(async move {
                    let result = match future.await {
                        Some(result) => result?,
                        // Task was cancelled; nothing left to do.
                        None => return Ok(()),
                    };
                    tls::get(move |store| {
                        // Here we schedule a task to run on a worker fiber to do
                        // the lowering since it may involve a call to the guest's
                        // realloc function.  This is necessary because calling the
                        // guest while there are host embedder frames on the stack
                        // is unsound.
                        store.concurrent_state_mut().push_high_priority(
                            WorkItem::WorkerFunction(AlwaysMut::new(Box::new(move |store| {
                                lower(token.as_context_mut(store), result)?;
                                let state = store.concurrent_state_mut();
                                // The task has finished, so drop its join
                                // handle before signalling `Returned`.
                                state.get_mut(task)?.join_handle.take();
                                Waitable::Host(task).set_event(
                                    state,
                                    Some(Event::Subtask {
                                        status: Status::Returned,
                                    }),
                                )
                            }))),
                        );
                        Ok(())
                    })
                });

            store.0.concurrent_state_mut().push_future(future);
            let handle = store
                .0
                .handle_table(RuntimeInstance {
                    instance: self.id().instance(),
                    index: caller_instance,
                })
                .subtask_insert_host(task.rep())?;
            store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle);
            log::trace!(
                "assign {task:?} handle {handle} for {caller:?} instance {caller_instance:?}"
            );
            Some(handle)
        }
    })
}

/// Implements the `task.return` intrinsic, lifting the result for the
/// current guest task.
///
/// `ty` and `options` describe the `task.return` call as made by the guest;
/// they are checked against the type, string encoding and memory recorded
/// with the task's `lift_result`, and the call fails if they differ.
/// `storage` holds the raw values to lift.
///
/// Fails if `task.return` or `task.cancel` was already called for the
/// current task.
///
/// # Panics
///
/// Panics if there is no current guest thread.
pub(crate) fn task_return(
    self,
    store: &mut dyn VMStore,
    caller: RuntimeComponentInstanceIndex,
    ty: TypeTupleIndex,
    options: OptionsIndex,
    storage: &[ValRaw],
) -> Result<()> {
    self.id().get(store).check_may_leave(caller)?;
    let state = store.concurrent_state_mut();
    let guest_thread = state.guest_thread.unwrap();
    // Taking `lift_result` is what makes a second `task.return` or
    // `task.cancel` call fail.
    let lift = state
        .get_mut(guest_thread.task)?
        .lift_result
        .take()
        .ok_or_else(|| {
            anyhow!("`task.return` or `task.cancel` called more than once for current task")
        })?;
    assert!(state.get_mut(guest_thread.task)?.result.is_none());

    let CanonicalOptions {
        string_encoding,
        data_model,
        ..
    } = &self.id().get(store).component().env_component().options[options];

    // The call is invalid if the result type, the string encoding, or (when
    // one is specified) the memory disagree with what the task recorded.
    let invalid = ty != lift.ty
        || string_encoding != &lift.string_encoding
        || match data_model {
            CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory {
                Some(memory) => {
                    let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut());
                    let actual = self.id().get(store).runtime_memory(memory);
                    expected != actual.as_ptr()
                }
                // Memory not specified, meaning it didn't need to be
                // specified per validation, so not invalid.
                None => false,
            },
            // Always invalid as this isn't supported.
            CanonicalOptionsDataModel::Gc { .. } => true,
        };

    if invalid {
        bail!("invalid `task.return` signature and/or options for current task");
    }

    log::trace!("task.return for {guest_thread:?}");

    let result = (lift.lift)(store, storage)?;
    self.task_complete(
        store,
        guest_thread.task,
        result,
        Status::Returned,
        ValRaw::i32(0),
    )
}

/// Implements the `task.cancel` intrinsic.
2777 pub(crate) fn task_cancel( 2778 self, 2779 store: &mut StoreOpaque, 2780 caller: RuntimeComponentInstanceIndex, 2781 ) -> Result<()> { 2782 self.id().get(store).check_may_leave(caller)?; 2783 let state = store.concurrent_state_mut(); 2784 let guest_thread = state.guest_thread.unwrap(); 2785 let task = state.get_mut(guest_thread.task)?; 2786 if !task.cancel_sent { 2787 bail!("`task.cancel` called by task which has not been cancelled") 2788 } 2789 _ = task.lift_result.take().ok_or_else(|| { 2790 anyhow!("`task.return` or `task.cancel` called more than once for current task") 2791 })?; 2792 2793 assert!(task.result.is_none()); 2794 2795 log::trace!("task.cancel for {guest_thread:?}"); 2796 2797 self.task_complete( 2798 store, 2799 guest_thread.task, 2800 Box::new(DummyResult), 2801 Status::ReturnCancelled, 2802 ValRaw::i32(0), 2803 ) 2804 } 2805 2806 /// Complete the specified guest task (i.e. indicate that it has either 2807 /// returned a (possibly empty) result or cancelled itself). 2808 /// 2809 /// This will return any resource borrows and notify any current or future 2810 /// waiters that the task has completed. 2811 fn task_complete( 2812 self, 2813 store: &mut StoreOpaque, 2814 guest_task: TableId<GuestTask>, 2815 result: Box<dyn Any + Send + Sync>, 2816 status: Status, 2817 post_return_arg: ValRaw, 2818 ) -> Result<()> { 2819 if store 2820 .concurrent_state_mut() 2821 .get_mut(guest_task)? 2822 .call_post_return_automatically() 2823 { 2824 let (calls, host_table, _, instance) = 2825 store.component_resource_state_with_instance(self); 2826 ResourceTables { 2827 calls, 2828 host_table: Some(host_table), 2829 guest: Some(instance.instance_states()), 2830 } 2831 .exit_call()?; 2832 } else { 2833 // As of this writing, the only scenario where `call_post_return_automatically` 2834 // would be false for a `GuestTask` is for host-to-guest calls using 2835 // `[Typed]Func::call_async`, in which case the `function_index` 2836 // should be a non-`None` value. 
2837 let function_index = store 2838 .concurrent_state_mut() 2839 .get_mut(guest_task)? 2840 .function_index 2841 .unwrap(); 2842 self.id() 2843 .get_mut(store) 2844 .post_return_arg_set(function_index, post_return_arg); 2845 } 2846 2847 let state = store.concurrent_state_mut(); 2848 let task = state.get_mut(guest_task)?; 2849 2850 if let Caller::Host { tx, .. } = &mut task.caller { 2851 if let Some(tx) = tx.take() { 2852 _ = tx.send(result); 2853 } 2854 } else { 2855 task.result = Some(result); 2856 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?; 2857 } 2858 2859 Ok(()) 2860 } 2861 2862 /// Implements the `waitable-set.new` intrinsic. 2863 pub(crate) fn waitable_set_new( 2864 self, 2865 store: &mut StoreOpaque, 2866 caller_instance: RuntimeComponentInstanceIndex, 2867 ) -> Result<u32> { 2868 self.id().get_mut(store).check_may_leave(caller_instance)?; 2869 let set = store.concurrent_state_mut().push(WaitableSet::default())?; 2870 let handle = store 2871 .handle_table(RuntimeInstance { 2872 instance: self.id().instance(), 2873 index: caller_instance, 2874 }) 2875 .waitable_set_insert(set.rep())?; 2876 log::trace!("new waitable set {set:?} (handle {handle})"); 2877 Ok(handle) 2878 } 2879 2880 /// Implements the `waitable-set.drop` intrinsic. 
2881 pub(crate) fn waitable_set_drop( 2882 self, 2883 store: &mut StoreOpaque, 2884 caller_instance: RuntimeComponentInstanceIndex, 2885 set: u32, 2886 ) -> Result<()> { 2887 self.id().get_mut(store).check_may_leave(caller_instance)?; 2888 let rep = store 2889 .handle_table(RuntimeInstance { 2890 instance: self.id().instance(), 2891 index: caller_instance, 2892 }) 2893 .waitable_set_remove(set)?; 2894 2895 log::trace!("drop waitable set {rep} (handle {set})"); 2896 2897 let set = store 2898 .concurrent_state_mut() 2899 .delete(TableId::<WaitableSet>::new(rep))?; 2900 2901 if !set.waiting.is_empty() { 2902 bail!("cannot drop waitable set with waiters"); 2903 } 2904 2905 Ok(()) 2906 } 2907 2908 /// Implements the `waitable.join` intrinsic. 2909 pub(crate) fn waitable_join( 2910 self, 2911 store: &mut StoreOpaque, 2912 caller_instance: RuntimeComponentInstanceIndex, 2913 waitable_handle: u32, 2914 set_handle: u32, 2915 ) -> Result<()> { 2916 let mut instance = self.id().get_mut(store); 2917 instance.check_may_leave(caller_instance)?; 2918 let waitable = 2919 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?; 2920 2921 let set = if set_handle == 0 { 2922 None 2923 } else { 2924 let set = instance.instance_states().0[caller_instance] 2925 .handle_table() 2926 .waitable_set_rep(set_handle)?; 2927 2928 Some(TableId::<WaitableSet>::new(set)) 2929 }; 2930 2931 log::trace!( 2932 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})", 2933 ); 2934 2935 waitable.join(store.concurrent_state_mut(), set) 2936 } 2937 2938 /// Implements the `subtask.drop` intrinsic. 
pub(crate) fn subtask_drop(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    task_id: u32,
) -> Result<()> {
    self.id().get_mut(store).check_may_leave(caller_instance)?;
    // Joining with set handle 0 means "no set" (see `waitable_join`), so the
    // subtask is detached from whatever set it may have been joined to.
    self.waitable_join(store, caller_instance, task_id, 0)?;

    // Remove the guest-visible handle; `is_host` tells us which table `rep`
    // indexes into (`HostTask` vs. `GuestTask`).
    let (rep, is_host) = store
        .handle_table(RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        })
        .subtask_remove(task_id)?;

    let concurrent_state = store.concurrent_state_mut();
    // `delete` records whether the task's table entry should be removed
    // here: always for host tasks, and for guest tasks only once `exited`
    // is set.
    let (waitable, expected_caller_instance, delete) = if is_host {
        let id = TableId::<HostTask>::new(rep);
        let task = concurrent_state.get_mut(id)?;
        // A host task that still holds its join handle has not resolved.
        if task.join_handle.is_some() {
            bail!("cannot drop a subtask which has not yet resolved");
        }
        (Waitable::Host(id), task.caller_instance, true)
    } else {
        let id = TableId::<GuestTask>::new(rep);
        let task = concurrent_state.get_mut(id)?;
        // A guest task that still holds `lift_result` has neither returned
        // nor cancelled.
        if task.lift_result.is_some() {
            bail!("cannot drop a subtask which has not yet resolved");
        }
        if let Caller::Guest { instance, .. } = &task.caller {
            (Waitable::Guest(id), *instance, task.exited)
        } else {
            // NOTE(review): presumably a guest task reachable through a
            // subtask handle always has a guest caller -- confirm.
            unreachable!()
        }
    };

    waitable.common(concurrent_state)?.handle = None;

    // Dropping with an event still pending is an error.
    if waitable.take_event(concurrent_state)?.is_some() {
        bail!("cannot drop a subtask with an undelivered event");
    }

    if delete {
        waitable.delete_from(concurrent_state)?;
    }

    // Since waitables can neither be passed between instances nor forged,
    // this should never fail unless there's a bug in Wasmtime, but we check
    // here to be sure:
    assert_eq!(expected_caller_instance, caller_instance);
    log::trace!("subtask_drop {waitable:?} (handle {task_id})");
    Ok(())
}

/// Implements the `waitable-set.wait` intrinsic.
2995 pub(crate) fn waitable_set_wait( 2996 self, 2997 store: &mut StoreOpaque, 2998 caller: RuntimeComponentInstanceIndex, 2999 options: OptionsIndex, 3000 set: u32, 3001 payload: u32, 3002 ) -> Result<u32> { 3003 self.id().get(store).check_may_leave(caller)?; 3004 3005 if !self.options(store, options).async_ { 3006 // The caller may only call `waitable-set.wait` from an async task 3007 // (i.e. a task created via a call to an async export). 3008 // Otherwise, we'll trap. 3009 store.concurrent_state_mut().check_blocking()?; 3010 } 3011 3012 let &CanonicalOptions { 3013 cancellable, 3014 instance: caller_instance, 3015 .. 3016 } = &self.id().get(store).component().env_component().options[options]; 3017 let rep = store 3018 .handle_table(RuntimeInstance { 3019 instance: self.id().instance(), 3020 index: caller_instance, 3021 }) 3022 .waitable_set_rep(set)?; 3023 3024 self.waitable_check( 3025 store, 3026 cancellable, 3027 WaitableCheck::Wait, 3028 WaitableCheckParams { 3029 set: TableId::new(rep), 3030 options, 3031 payload, 3032 }, 3033 ) 3034 } 3035 3036 /// Implements the `waitable-set.poll` intrinsic. 3037 pub(crate) fn waitable_set_poll( 3038 self, 3039 store: &mut StoreOpaque, 3040 caller: RuntimeComponentInstanceIndex, 3041 options: OptionsIndex, 3042 set: u32, 3043 payload: u32, 3044 ) -> Result<u32> { 3045 self.id().get(store).check_may_leave(caller)?; 3046 3047 let &CanonicalOptions { 3048 cancellable, 3049 instance: caller_instance, 3050 .. 3051 } = &self.id().get(store).component().env_component().options[options]; 3052 let rep = store 3053 .handle_table(RuntimeInstance { 3054 instance: self.id().instance(), 3055 index: caller_instance, 3056 }) 3057 .waitable_set_rep(set)?; 3058 3059 self.waitable_check( 3060 store, 3061 cancellable, 3062 WaitableCheck::Poll, 3063 WaitableCheckParams { 3064 set: TableId::new(rep), 3065 options, 3066 payload, 3067 }, 3068 ) 3069 } 3070 3071 /// Implements the `thread.index` intrinsic. 
3072 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> { 3073 let thread_id = store 3074 .concurrent_state_mut() 3075 .guest_thread 3076 .ok_or_else(|| anyhow!("no current thread"))? 3077 .thread; 3078 // The unwrap is safe because `instance_rep` must be `Some` by this point 3079 Ok(store 3080 .concurrent_state_mut() 3081 .get_mut(thread_id)? 3082 .instance_rep 3083 .unwrap()) 3084 } 3085 3086 /// Implements the `thread.new-indirect` intrinsic. 3087 pub(crate) fn thread_new_indirect<T: 'static>( 3088 self, 3089 mut store: StoreContextMut<T>, 3090 runtime_instance: RuntimeComponentInstanceIndex, 3091 _func_ty_idx: TypeFuncIndex, // currently unused 3092 start_func_table_idx: RuntimeTableIndex, 3093 start_func_idx: u32, 3094 context: i32, 3095 ) -> Result<u32> { 3096 self.id().get(store.0).check_may_leave(runtime_instance)?; 3097 3098 log::trace!("creating new thread"); 3099 3100 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []); 3101 let (instance, registry) = self.id().get_mut_and_registry(store.0); 3102 let callee = instance 3103 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)? 
3104 .ok_or_else(|| { 3105 anyhow!("the start function index points to an uninitialized function") 3106 })?; 3107 if callee.type_index(store.0) != start_func_ty.type_index() { 3108 bail!( 3109 "start function does not match expected type (currently only `(i32) -> ()` is supported)" 3110 ); 3111 } 3112 3113 let token = StoreToken::new(store.as_context_mut()); 3114 let start_func = Box::new( 3115 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> { 3116 let old_thread = store 3117 .concurrent_state_mut() 3118 .guest_thread 3119 .replace(guest_thread); 3120 log::trace!( 3121 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread" 3122 ); 3123 3124 store.maybe_push_call_context(guest_thread.task)?; 3125 3126 let mut store = token.as_context_mut(store); 3127 let mut params = [ValRaw::i32(context)]; 3128 // Use call_unchecked rather than call or call_async, as we don't want to run the function 3129 // on a separate fiber if we're running in an async store. 3130 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? 
}; 3131 3132 store.0.maybe_pop_call_context(guest_thread.task)?; 3133 3134 self.cleanup_thread(store.0, guest_thread, runtime_instance)?; 3135 log::trace!("explicit thread {guest_thread:?} completed"); 3136 let state = store.0.concurrent_state_mut(); 3137 let task = state.get_mut(guest_thread.task)?; 3138 if task.threads.is_empty() && !task.returned_or_cancelled() { 3139 bail!(Trap::NoAsyncResult); 3140 } 3141 state.guest_thread = old_thread; 3142 old_thread 3143 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running); 3144 if state.get_mut(guest_thread.task)?.ready_to_delete() { 3145 Waitable::Guest(guest_thread.task).delete_from(state)?; 3146 } 3147 log::trace!("thread start: restored {old_thread:?} as current thread"); 3148 3149 Ok(()) 3150 }, 3151 ); 3152 3153 let state = store.0.concurrent_state_mut(); 3154 let current_thread = state.guest_thread.unwrap(); 3155 let parent_task = current_thread.task; 3156 3157 let new_thread = GuestThread::new_explicit(parent_task, start_func); 3158 let thread_id = state.push(new_thread)?; 3159 state.get_mut(parent_task)?.threads.insert(thread_id); 3160 3161 log::trace!("new thread with id {thread_id:?} created"); 3162 3163 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance) 3164 } 3165 3166 pub(crate) fn resume_suspended_thread( 3167 self, 3168 store: &mut StoreOpaque, 3169 runtime_instance: RuntimeComponentInstanceIndex, 3170 thread_idx: u32, 3171 high_priority: bool, 3172 ) -> Result<()> { 3173 let thread_id = 3174 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?; 3175 let state = store.concurrent_state_mut(); 3176 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?; 3177 let thread = state.get_mut(guest_thread.thread)?; 3178 3179 match mem::replace(&mut thread.state, GuestThreadState::Running) { 3180 GuestThreadState::NotStartedExplicit(start_func) => { 3181 log::trace!("starting thread {guest_thread:?}"); 3182 let guest_call = 
WorkItem::GuestCall(GuestCall { 3183 thread: guest_thread, 3184 kind: GuestCallKind::StartExplicit(Box::new(move |store| { 3185 start_func(store, guest_thread) 3186 })), 3187 }); 3188 store 3189 .concurrent_state_mut() 3190 .push_work_item(guest_call, high_priority); 3191 } 3192 GuestThreadState::Suspended(fiber) => { 3193 log::trace!("resuming thread {thread_id:?} that was suspended"); 3194 store 3195 .concurrent_state_mut() 3196 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority); 3197 } 3198 _ => { 3199 bail!("cannot resume thread which is not suspended"); 3200 } 3201 } 3202 Ok(()) 3203 } 3204 3205 fn add_guest_thread_to_instance_table( 3206 self, 3207 thread_id: TableId<GuestThread>, 3208 store: &mut StoreOpaque, 3209 runtime_instance: RuntimeComponentInstanceIndex, 3210 ) -> Result<u32> { 3211 let guest_id = store 3212 .handle_table(RuntimeInstance { 3213 instance: self.id().instance(), 3214 index: runtime_instance, 3215 }) 3216 .guest_thread_insert(thread_id.rep())?; 3217 store 3218 .concurrent_state_mut() 3219 .get_mut(thread_id)? 3220 .instance_rep = Some(guest_id); 3221 Ok(guest_id) 3222 } 3223 3224 /// Helper function for the `thread.yield`, `thread.yield-to`, `thread.suspend`, 3225 /// and `thread.switch-to` intrinsics. 3226 pub(crate) fn suspension_intrinsic( 3227 self, 3228 store: &mut StoreOpaque, 3229 caller: RuntimeComponentInstanceIndex, 3230 cancellable: bool, 3231 yielding: bool, 3232 to_thread: Option<u32>, 3233 ) -> Result<WaitResult> { 3234 self.id().get(store).check_may_leave(caller)?; 3235 3236 if to_thread.is_none() { 3237 let state = store.concurrent_state_mut(); 3238 if yielding { 3239 // This is a `thread.yield` call 3240 if !state.may_block(state.guest_thread.unwrap().task) { 3241 // The spec defines `thread.yield` to be a no-op in a 3242 // non-blocking context, so we return immediately without giving 3243 // any other thread a chance to run. 
3244 return Ok(WaitResult::Completed); 3245 } 3246 } else { 3247 // The caller may only call `thread.suspend` from an async task 3248 // (i.e. a task created via a call to an async export). 3249 // Otherwise, we'll trap. 3250 state.check_blocking()?; 3251 } 3252 } 3253 3254 // There could be a pending cancellation from a previous uncancellable wait 3255 if cancellable && store.concurrent_state_mut().take_pending_cancellation() { 3256 return Ok(WaitResult::Cancelled); 3257 } 3258 3259 if let Some(thread) = to_thread { 3260 self.resume_suspended_thread(store, caller, thread, true)?; 3261 } 3262 3263 let state = store.concurrent_state_mut(); 3264 let guest_thread = state.guest_thread.unwrap(); 3265 let reason = if yielding { 3266 SuspendReason::Yielding { 3267 thread: guest_thread, 3268 } 3269 } else { 3270 SuspendReason::ExplicitlySuspending { 3271 thread: guest_thread, 3272 // Tell `StoreOpaque::suspend` it's okay to suspend here since 3273 // we're handling a `thread.switch-to` call; otherwise it would 3274 // panic if we called it in a non-blocking context. 3275 skip_may_block_check: to_thread.is_some(), 3276 } 3277 }; 3278 3279 store.suspend(reason)?; 3280 3281 if cancellable && store.concurrent_state_mut().take_pending_cancellation() { 3282 Ok(WaitResult::Cancelled) 3283 } else { 3284 Ok(WaitResult::Completed) 3285 } 3286 } 3287 3288 /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics. 
fn waitable_check(
    self,
    store: &mut StoreOpaque,
    cancellable: bool,
    check: WaitableCheck,
    params: WaitableCheckParams,
) -> Result<u32> {
    // Returns the event's ordinal; the associated handle and result are
    // written to guest memory at `params.payload` (see the end of this
    // function).
    let guest_thread = store.concurrent_state_mut().guest_thread.unwrap();

    log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set);

    let state = store.concurrent_state_mut();
    let task = state.get_mut(guest_thread.task)?;

    // If we're waiting, and there are no events immediately available,
    // suspend the fiber until that changes.
    match &check {
        WaitableCheck::Wait => {
            let set = params.set;

            // A pending `Event::Cancelled` only counts as "available" when
            // this wait is cancellable.
            if (task.event.is_none()
                || (matches!(task.event, Some(Event::Cancelled)) && !cancellable))
                && state.get_mut(set)?.ready.is_empty()
            {
                if cancellable {
                    // Remember which set we're blocked on so that
                    // `subtask_cancel` can find and wake this thread.
                    let old = state
                        .get_mut(guest_thread.thread)?
                        .wake_on_cancel
                        .replace(set);
                    assert!(old.is_none());
                }

                store.suspend(SuspendReason::Waiting {
                    set,
                    thread: guest_thread,
                    skip_may_block_check: false,
                })?;
            }
        }
        WaitableCheck::Poll => {}
    }

    log::trace!(
        "waitable check for {guest_thread:?}; set {:?}, part two",
        params.set
    );

    // Deliver any pending events to the guest and return.
    let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?;

    let (ordinal, handle, result) = match &check {
        WaitableCheck::Wait => {
            // After waiting, an event is expected to exist (hence `unwrap`).
            let (event, waitable) = event.unwrap();
            // Events not tied to a waitable are reported with handle 0.
            let handle = waitable.map(|(_, v)| v).unwrap_or(0);
            let (ordinal, result) = event.parts();
            (ordinal, handle, result)
        }
        WaitableCheck::Poll => {
            if let Some((event, waitable)) = event {
                let handle = waitable.map(|(_, v)| v).unwrap_or(0);
                let (ordinal, result) = event.parts();
                (ordinal, handle, result)
            } else {
                log::trace!(
                    "no events ready to deliver via waitable-set.poll to {:?}; set {:?}",
                    guest_thread.task,
                    params.set
                );
                // Nothing ready: report `Event::None` with handle 0.
                let (ordinal, result) = Event::None.parts();
                (ordinal, 0, result)
            }
        }
    };
    let memory = self.options_memory_mut(store, params.options);
    let ptr = func::validate_inbounds_dynamic(
        &CanonicalAbiInfo::POINTER_PAIR,
        memory,
        &ValRaw::u32(params.payload),
    )?;
    // Payload layout: two little-endian 4-byte values, handle then result.
    memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes());
    memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes());
    Ok(ordinal)
}

/// Implements the `subtask.cancel` intrinsic.
///
/// Returns a `Status` value cast to `u32`, or `BLOCKED` when `async_` is
/// set and the subtask has not yet returned or cancelled.
pub(crate) fn subtask_cancel(
    self,
    store: &mut StoreOpaque,
    caller_instance: RuntimeComponentInstanceIndex,
    async_: bool,
    task_id: u32,
) -> Result<u32> {
    self.id().get(store).check_may_leave(caller_instance)?;

    if !async_ {
        // The caller may only sync call `subtask.cancel` from an async task
        // (i.e. a task created via a call to an async export). Otherwise,
        // we'll trap.
        store.concurrent_state_mut().check_blocking()?;
    }

    // Unlike `subtask_drop`, the handle is only looked up here, not removed.
    let (rep, is_host) = store
        .handle_table(RuntimeInstance {
            instance: self.id().instance(),
            index: caller_instance,
        })
        .subtask_rep(task_id)?;
    let (waitable, expected_caller_instance) = if is_host {
        let id = TableId::<HostTask>::new(rep);
        (
            Waitable::Host(id),
            store.concurrent_state_mut().get_mut(id)?.caller_instance,
        )
    } else {
        let id = TableId::<GuestTask>::new(rep);
        if let Caller::Guest { instance, .. } =
            &store.concurrent_state_mut().get_mut(id)?.caller
        {
            (Waitable::Guest(id), *instance)
        } else {
            unreachable!()
        }
    };
    // Since waitables can neither be passed between instances nor forged,
    // this should never fail unless there's a bug in Wasmtime, but we check
    // here to be sure:
    assert_eq!(expected_caller_instance, caller_instance);

    log::trace!("subtask_cancel {waitable:?} (handle {task_id})");

    let concurrent_state = store.concurrent_state_mut();
    if let Waitable::Host(host_task) = waitable {
        // Host task: if it still has a join handle, abort it and report the
        // cancellation immediately. Otherwise fall through to the event
        // check at the bottom.
        if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() {
            handle.abort();
            return Ok(Status::ReturnCancelled as u32);
        }
    } else {
        let caller = concurrent_state.guest_thread.unwrap();
        let guest_task = TableId::<GuestTask>::new(rep);
        let task = concurrent_state.get_mut(guest_task)?;
        if !task.already_lowered_parameters() {
            task.lower_params = None;
            task.lift_result = None;

            // Not yet started; cancel and remove from pending
            let instance = task.instance;
            let pending = &mut store.instance_state(instance).pending;
            let pending_count = pending.len();
            pending.retain(|thread, _| thread.task != guest_task);
            // If there were no pending threads for this task, we're in an error state
            if pending.len() == pending_count {
                bail!("`subtask.cancel` called after terminal status delivered");
            }
            return Ok(Status::StartCancelled as u32);
        } else if !task.returned_or_cancelled() {
            // Started, but not yet returned or cancelled; send the
            // `CANCELLED` event
            task.cancel_sent = true;
            // Note that this might overwrite an event that was set earlier
            // (e.g. `Event::None` if the task is yielding, or
            // `Event::Cancelled` if it was already cancelled), but that's
            // okay -- this should supersede the previous state.
            task.event = Some(Event::Cancelled);
            // Wake at most one thread of the task that is blocked in a
            // cancellable wait (`wake_on_cancel` is set), then yield so it
            // gets a chance to run.
            for thread in task.threads.clone() {
                let thread = QualifiedThreadId {
                    task: guest_task,
                    thread,
                };
                if let Some(set) = concurrent_state
                    .get_mut(thread.thread)
                    .unwrap()
                    .wake_on_cancel
                    .take()
                {
                    let item = match concurrent_state
                        .get_mut(set)?
                        .waiting
                        .remove(&thread)
                        .unwrap()
                    {
                        WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                        WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                            thread,
                            kind: GuestCallKind::DeliverEvent {
                                instance,
                                set: None,
                            },
                        }),
                    };
                    concurrent_state.push_high_priority(item);

                    store.suspend(SuspendReason::Yielding { thread: caller })?;
                    break;
                }
            }

            // Re-fetch the state: the task may have resolved while we
            // yielded above.
            let concurrent_state = store.concurrent_state_mut();
            let task = concurrent_state.get_mut(guest_task)?;
            if !task.returned_or_cancelled() {
                if async_ {
                    return Ok(BLOCKED);
                } else {
                    store.wait_for_event(Waitable::Guest(guest_task))?;
                }
            }
        }
    }

    // At this point the subtask is expected to have a terminal event
    // pending; consume it and report its status.
    let event = waitable.take_event(store.concurrent_state_mut())?;
    if let Some(Event::Subtask {
        status: status @ (Status::Returned | Status::ReturnCancelled),
    }) = event
    {
        Ok(status as u32)
    } else {
        bail!("`subtask.cancel` called after terminal status delivered");
    }
}

/// Reads context slot `slot` from the store's concurrent state.
///
/// NOTE(review): presumably this backs the `context.get` intrinsic, by
/// analogy with the other methods here -- confirm.
pub(crate) fn context_get(
    self,
    store: &mut StoreOpaque,
    caller: RuntimeComponentInstanceIndex,
    slot: u32,
) -> Result<u32> {
    self.id().get(store).check_may_leave(caller)?;
    store.concurrent_state_mut().context_get(slot)
}

/// Writes `value` to context slot `slot` in the store's concurrent state.
///
/// NOTE(review): presumably this backs the `context.set` intrinsic, by
/// analogy with the other methods here -- confirm.
pub(crate) fn context_set(
    self,
    store: &mut StoreOpaque,
    caller: RuntimeComponentInstanceIndex,
    slot: u32,
    value: u32,
) -> Result<()> {
    self.id().get(store).check_may_leave(caller)?;
    store.concurrent_state_mut().context_set(slot, value)
}
// Closes the enclosing `impl` block, whose header precedes this section.
}

/// Trait representing component model ABI async intrinsics and fused adapter
/// helper functions.
///
/// SAFETY (callers): Most of the methods in this trait accept raw pointers,
/// which must be valid for at least the duration of the call (and possibly for
/// as long as the relevant guest task exists, in the case of `*mut VMFuncRef`
/// pointers used for async calls).
pub trait VMComponentAsyncStore {
    /// A helper function for fused adapter modules involving calls where at
    /// least one of the caller or callee is async.
    ///
    /// This helper is used only when at least one side is async; it is not
    /// used when the caller and callee both use the sync ABI.
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: u8,
        result_count: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is sync-lowered but the callee is async-lifted.
    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()>;

    /// A helper function for fused adapter modules involving calls where the
    /// caller is async-lowered.
    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32>;

    /// The `future.write` intrinsic.
    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.read` intrinsic.
    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32>;

    /// The `future.drop-writable` intrinsic.
    fn future_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `stream.write` intrinsic.
    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.read` intrinsic.
    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.write` intrinsic for
    /// "flat" (i.e. memcpy-able) payloads.
    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The "fast-path" implementation of the `stream.read` intrinsic for "flat"
    /// (i.e. memcpy-able) payloads.
    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32>;

    /// The `stream.drop-writable` intrinsic.
    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()>;

    /// The `error-context.debug-message` intrinsic.
    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()>;

    /// The `thread.new-indirect` intrinsic.
    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32>;
}

/// SAFETY: See trait docs.
///
/// Each method forwards to the corresponding `Instance` method, wrapping
/// `self` in a `StoreContextMut` where the callee expects one.
impl<T: 'static> VMComponentAsyncStore for StoreInner<T> {
    unsafe fn prepare_call(
        &mut self,
        instance: Instance,
        memory: *mut VMMemoryDefinition,
        start: *mut VMFuncRef,
        return_: *mut VMFuncRef,
        caller_instance: RuntimeComponentInstanceIndex,
        callee_instance: RuntimeComponentInstanceIndex,
        task_return_type: TypeTupleIndex,
        callee_async: bool,
        string_encoding: u8,
        result_count_or_max_if_async: u32,
        storage: *mut ValRaw,
        storage_len: usize,
    ) -> Result<()> {
        // SAFETY: The `wasmtime_cranelift`-generated code that calls
        // this method will have ensured that `storage` is a valid
        // pointer containing at least `storage_len` items.
        let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec();

        unsafe {
            instance.prepare_call(
                StoreContextMut(self),
                start,
                return_,
                caller_instance,
                callee_instance,
                task_return_type,
                callee_async,
                memory,
                string_encoding,
                // `result_count_or_max_if_async` doubles as a tag: the two
                // `PREPARE_ASYNC_*` sentinel values select an async caller,
                // and any other value is a sync caller's result count.
                match result_count_or_max_if_async {
                    PREPARE_ASYNC_NO_RESULT => CallerInfo::Async {
                        params,
                        has_result: false,
                    },
                    PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async {
                        params,
                        has_result: true,
                    },
                    result_count => CallerInfo::Sync {
                        params,
                        result_count,
                    },
                },
            )
        }
    }

    unsafe fn sync_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        storage: *mut MaybeUninit<ValRaw>,
        storage_len: usize,
    ) -> Result<()> {
        // Same entry point as `async_start`, but with a null `post_return`,
        // a fixed result count of 1, the `START_FLAG_ASYNC_CALLEE` flag, and
        // caller-provided storage; the returned `u32` is discarded.
        unsafe {
            instance
                .start_call(
                    StoreContextMut(self),
                    callback,
                    ptr::null_mut(),
                    callee,
                    param_count,
                    1,
                    START_FLAG_ASYNC_CALLEE,
                    // SAFETY: The `wasmtime_cranelift`-generated code that calls
                    // this method will have ensured that `storage` is a valid
                    // pointer containing at least `storage_len` items.
                    Some(std::slice::from_raw_parts_mut(storage, storage_len)),
                )
                .map(drop)
        }
    }

    unsafe fn async_start(
        &mut self,
        instance: Instance,
        callback: *mut VMFuncRef,
        post_return: *mut VMFuncRef,
        callee: *mut VMFuncRef,
        param_count: u32,
        result_count: u32,
        flags: u32,
    ) -> Result<u32> {
        // No storage slice is passed for async-lowered callers.
        unsafe {
            instance.start_call(
                StoreContextMut(self),
                callback,
                post_return,
                callee,
                param_count,
                result_count,
                flags,
                None,
            )
        }
    }

    fn future_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        // A future is written as a single item (count 1) with no flat ABI.
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    fn future_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        options: OptionsIndex,
        future: u32,
        address: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        // A future is read as a single item (count 1) with no flat ABI.
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Future(ty),
                options,
                None,
                future,
                address,
                1,
            )
            .map(|result| result.encode())
    }

    fn stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                None,
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn future_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeFutureTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.id().get(self).check_may_leave(caller)?;
        instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer)
    }

    fn flat_stream_write(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        // Same as `stream_write`, but passes the payload's size and
        // alignment along as a `FlatAbi`.
        instance
            .guest_write(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn flat_stream_read(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        options: OptionsIndex,
        payload_size: u32,
        payload_align: u32,
        stream: u32,
        address: u32,
        count: u32,
    ) -> Result<u32> {
        instance.id().get(self).check_may_leave(caller)?;
        // Same as `stream_read`, but passes the payload's size and
        // alignment along as a `FlatAbi`.
        instance
            .guest_read(
                StoreContextMut(self),
                caller,
                TransmitIndex::Stream(ty),
                options,
                Some(FlatAbi {
                    size: payload_size,
                    align: payload_align,
                }),
                stream,
                address,
                count,
            )
            .map(|result| result.encode())
    }

    fn stream_drop_writable(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeStreamTableIndex,
        writer: u32,
    ) -> Result<()> {
        instance.id().get(self).check_may_leave(caller)?;
        instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer)
    }

    fn error_context_debug_message(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        ty: TypeComponentLocalErrorContextTableIndex,
        options: OptionsIndex,
        err_ctx_handle: u32,
        debug_msg_address: u32,
    ) -> Result<()> {
        instance.id().get(self).check_may_leave(caller)?;
        instance.error_context_debug_message(
            StoreContextMut(self),
            ty,
            options,
            err_ctx_handle,
            debug_msg_address,
        )
    }

    fn thread_new_indirect(
        &mut self,
        instance: Instance,
        caller: RuntimeComponentInstanceIndex,
        func_ty_idx: TypeFuncIndex,
        start_func_table_idx: RuntimeTableIndex,
        start_func_idx: u32,
        context: i32,
    ) -> Result<u32> {
        // No `check_may_leave` here: `Instance::thread_new_indirect`
        // performs that check itself.
        instance.thread_new_indirect(
            StoreContextMut(self),
            caller,
            func_ty_idx,
            start_func_table_idx,
            start_func_idx,
            context,
        )
    }
}

/// A boxed, pinned, `Send + 'static` future resolving to `Result<()>`.
///
/// NOTE(review): presumably the future type used to drive host tasks --
/// confirm against its users.
type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>;

/// Represents the state of a pending host task.
struct HostTask {
    /// See `WaitableCommon`
    common: WaitableCommon,
    /// The component instance index supplied to `HostTask::new`.
    // NOTE(review): presumably the instance that made the host call -- confirm
    // against the call sites, which are outside this part of the file.
    caller_instance: RuntimeComponentInstanceIndex,
    /// Optional join handle supplied to `HostTask::new`.
    // NOTE(review): `JoinHandle` is declared elsewhere; its semantics are not
    // visible here.
    join_handle: Option<JoinHandle>,
}

impl HostTask {
    /// Create a host task with default (empty) `WaitableCommon` state, i.e. no
    /// pending event, no waitable set and no guest handle.
    fn new(
        caller_instance: RuntimeComponentInstanceIndex,
        join_handle: Option<JoinHandle>,
    ) -> Self {
        Self {
            common: WaitableCommon::default(),
            caller_instance,
            join_handle,
        }
    }
}

impl TableDebug for HostTask {
    fn type_name() -> &'static str {
        "HostTask"
    }
}

/// Type-erased closure stored in `GuestTask::callback`, used to call the
/// callback function of an async-lifted export.
// NOTE(review): the meaning of the trailing `u32` argument and of the returned
// `u32` is defined by the callers, which are outside this part of the file.
type CallbackFn = Box<
    dyn Fn(&mut dyn VMStore, RuntimeComponentInstanceIndex, Event, u32) -> Result<u32>
        + Send
        + Sync
        + 'static,
>;

/// Represents the caller of a given guest task.
enum Caller {
    /// The host called the guest task.
    Host {
        /// If present, may be used to deliver the result.
        tx: Option<oneshot::Sender<LiftedResult>>,
        /// Channel to notify once all subtasks spawned by this caller have
        /// completed.
        ///
        /// Note that we'll never actually send anything to this channel;
        /// dropping it when the refcount goes to zero is sufficient to notify
        /// the receiver.
        exit_tx: Arc<oneshot::Sender<()>>,
        /// If true, there's a host future that must be dropped before the task
        /// can be deleted.
        host_future_present: bool,
        /// If true, call `post-return` function (if any) automatically.
        call_post_return_automatically: bool,
    },
    /// Another guest thread called the guest task
    Guest {
        /// The id of the caller
        thread: QualifiedThreadId,
        /// The instance to use to enforce reentrance rules.
        ///
        /// Note that this might not be the same as the instance the caller task
        /// started executing in given that one or more synchronous guest->guest
        /// calls may have occurred involving multiple instances.
        instance: RuntimeComponentInstanceIndex,
    },
}

/// Represents a closure and related canonical ABI parameters required to
/// validate a `task.return` call at runtime and lift the result.
struct LiftResult {
    /// Type-erased closure which lifts the raw result values (see `RawLift`).
    lift: RawLift,
    /// Index of a tuple type.
    // NOTE(review): presumably the expected result type checked against
    // `task.return` -- confirm at the use site.
    ty: TypeTupleIndex,
    /// Linear memory definition associated with the lift, if any.
    memory: Option<SendSyncPtr<VMMemoryDefinition>>,
    /// String encoding associated with the lift.
    string_encoding: StringEncoding,
}

/// The table ID for a guest thread, qualified by the task to which it belongs.
///
/// This exists to minimize table lookups and the necessity to pass stores around mutably
/// for the common case of identifying the task to which a thread belongs.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
struct QualifiedThreadId {
    /// The task which owns `thread`.
    task: TableId<GuestTask>,
    /// The thread itself.
    thread: TableId<GuestThread>,
}

impl QualifiedThreadId {
    /// Build a `QualifiedThreadId` for `thread` by looking up its
    /// `parent_task` in `state`.
    ///
    /// Returns an error if `thread` cannot be found in the table.
    fn qualify(
        state: &mut ConcurrentState,
        thread: TableId<GuestThread>,
    ) -> Result<QualifiedThreadId> {
        Ok(QualifiedThreadId {
            task: state.get_mut(thread)?.parent_task,
            thread,
        })
    }
}

impl fmt::Debug for QualifiedThreadId {
    /// Formats as a tuple of the two raw table reps: task first, then thread.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("QualifiedThreadId")
            .field(&self.task.rep())
            .field(&self.thread.rep())
            .finish()
    }
}

/// The execution state of a `GuestThread`.
// NOTE(review): only the initial states and the teardown path are visible in
// this part of the file; the transitions into `Running` and `Pending` happen
// elsewhere.
enum GuestThreadState {
    /// Initial state of a thread created with `GuestThread::new_implicit`.
    NotStartedImplicit,
    /// Initial state of a thread created with `GuestThread::new_explicit`,
    /// carrying the start function passed to that constructor.
    NotStartedExplicit(
        Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>,
    ),
    Running,
    /// Holds the thread's fiber; `ConcurrentState::take_fibers_and_futures`
    /// collects fibers found in this state.
    Suspended(StoreFiber<'static>),
    Pending,
    /// Also the state left behind by
    /// `ConcurrentState::take_fibers_and_futures`.
    Completed,
}

/// Represents a guest thread belonging to a `GuestTask`.
pub struct GuestThread {
    /// Context-local state used to implement the `context.{get,set}`
    /// intrinsics.
    context: [u32; 2],
    /// The owning guest task.
    parent_task: TableId<GuestTask>,
    /// If present, indicates that the thread is currently waiting on the
    /// specified set but may be cancelled and woken immediately.
    wake_on_cancel: Option<TableId<WaitableSet>>,
    /// The execution state of this guest thread
    state: GuestThreadState,
    /// The index of this thread in the component instance's handle table.
    /// This must always be `Some` after initialization.
    instance_rep: Option<u32>,
}

impl GuestThread {
    /// Retrieve the `GuestThread` corresponding to the specified guest-visible
    /// handle.
    ///
    /// The handle is resolved through the handle table of `caller_instance`;
    /// an error from that lookup is propagated.
    fn from_instance(
        state: Pin<&mut ComponentInstance>,
        caller_instance: RuntimeComponentInstanceIndex,
        guest_thread: u32,
    ) -> Result<TableId<Self>> {
        let rep = state.instance_states().0[caller_instance]
            .handle_table()
            .guest_thread_rep(guest_thread)?;
        Ok(TableId::new(rep))
    }

    /// Create a not-yet-started thread in the `NotStartedImplicit` state with
    /// zeroed context slots and no handle-table index.
    fn new_implicit(parent_task: TableId<GuestTask>) -> Self {
        Self {
            context: [0; 2],
            parent_task,
            wake_on_cancel: None,
            state: GuestThreadState::NotStartedImplicit,
            instance_rep: None,
        }
    }

    /// Create a not-yet-started thread in the `NotStartedExplicit` state,
    /// storing `start_func` in that state. Context slots are zeroed and no
    /// handle-table index is assigned.
    fn new_explicit(
        parent_task: TableId<GuestTask>,
        start_func: Box<
            dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync,
        >,
    ) -> Self {
        Self {
            context: [0; 2],
            parent_task,
            wake_on_cancel: None,
            state: GuestThreadState::NotStartedExplicit(start_func),
            instance_rep: None,
        }
    }
}

impl TableDebug for GuestThread {
    fn type_name() -> &'static str {
        "GuestThread"
    }
}

/// Tracks the value stored in `GuestTask::sync_result`.
enum SyncResult {
    /// No result has been produced; the initial state set by `GuestTask::new`.
    NotProduced,
    /// A result has been produced and is waiting to be taken.
    Produced(Option<ValRaw>),
    /// `SyncResult::take` has been called.
    Taken,
}

impl SyncResult {
    /// Take the produced result, if any, leaving `Taken` behind.
    ///
    /// Returns `None` if nothing had been produced and `Some(val)` if a result
    /// was waiting.
    ///
    /// Note that the state becomes `Taken` even when nothing had been
    /// produced, so calling this again before a new value is stored panics.
    ///
    /// # Panics
    ///
    /// Panics if the state is already `Taken`.
    fn take(&mut self) -> Option<Option<ValRaw>> {
        match mem::replace(self, SyncResult::Taken) {
            SyncResult::NotProduced => None,
            SyncResult::Produced(val) => Some(val),
            SyncResult::Taken => {
                panic!("attempted to take a synchronous result that was already taken")
            }
        }
    }
}

/// Tracks the host future associated with a `GuestTask` (see
/// `GuestTask::host_future_state`).
#[derive(Debug)]
enum HostFutureState {
    /// Chosen by `GuestTask::new` for guest callers and for host callers with
    /// `host_future_present == false`.
    NotApplicable,
    /// Chosen by `GuestTask::new` for host callers with
    /// `host_future_present == true`; while in this state
    /// `GuestTask::ready_to_delete` returns false.
    Live,
    /// Set by `TaskId::host_future_dropped`.
    Dropped,
}

/// Represents a pending guest task.
pub(crate) struct GuestTask {
    /// See `WaitableCommon`
    common: WaitableCommon,
    /// Closure to lower the parameters passed to this task.
    lower_params: Option<RawLower>,
    /// See `LiftResult`
    lift_result: Option<LiftResult>,
    /// A place to stash the type-erased lifted result if it can't be delivered
    /// immediately.
    result: Option<LiftedResult>,
    /// Closure to call the callback function for an async-lifted export, if
    /// provided.
    callback: Option<CallbackFn>,
    /// See `Caller`
    caller: Caller,
    /// A place to stash the call context for managing resource borrows while
    /// switching between guest tasks.
    call_context: Option<CallContext>,
    /// A place to stash the lowered result for a sync-to-async call until it
    /// can be returned to the caller.
    sync_result: SyncResult,
    /// Whether or not the task has been cancelled (i.e. whether the task is
    /// permitted to call `task.cancel`).
    cancel_sent: bool,
    /// Whether or not we've sent a `Status::Starting` event to any current or
    /// future waiters for this waitable.
    starting_sent: bool,
    /// Pending guest subtasks created by this task (directly or indirectly).
    ///
    /// This is used to re-parent subtasks which are still running when their
    /// parent task is disposed.
    subtasks: HashSet<TableId<GuestTask>>,
    /// Scratch waitable set used to watch subtasks during synchronous calls.
    sync_call_set: TableId<WaitableSet>,
    /// The runtime instance to which the exported function for this guest task
    /// belongs.
    ///
    /// Note that the task may do a sync->sync call via a fused adapter which
    /// results in that task executing code in a different instance, and it may
    /// call host functions and intrinsics from that other instance.
    instance: RuntimeInstance,
    /// If present, a pending `Event::None` or `Event::Cancelled` to be
    /// delivered to this task.
    event: Option<Event>,
    /// The `ExportIndex` of the guest function being called, if known.
    function_index: Option<ExportIndex>,
    /// Whether or not the task has exited.
    exited: bool,
    /// Threads belonging to this task
    threads: HashSet<TableId<GuestThread>>,
    /// The state of the host future that represents an async task, which must
    /// be dropped before we can delete the task.
    host_future_state: HostFutureState,
    /// Indicates whether this task was created for a call to an async-lifted
    /// export.
    async_function: bool,
}

impl GuestTask {
    /// Returns true once the parameter-lowering closure has been consumed.
    fn already_lowered_parameters(&self) -> bool {
        // We reset `lower_params` after we lower the parameters
        self.lower_params.is_none()
    }

    /// Returns true once the result-lifting state has been consumed.
    fn returned_or_cancelled(&self) -> bool {
        // We reset `lift_result` after we return or exit
        self.lift_result.is_none()
    }

    /// Returns true if nothing still needs this task's table entry.
    ///
    /// All of the following must hold: the task has no threads left, there is
    /// no produced-but-untaken synchronous result, there is no pending
    /// `Returned`/`ReturnCancelled` subtask event on this waitable, and the
    /// host future (if any) is not `Live`. The individual conditions are
    /// logged at trace level.
    fn ready_to_delete(&self) -> bool {
        let threads_completed = self.threads.is_empty();
        let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_));
        let pending_completion_event = matches!(
            self.common.event,
            Some(Event::Subtask {
                status: Status::Returned | Status::ReturnCancelled
            })
        );
        let ready = threads_completed
            && !has_sync_result
            && !pending_completion_event
            && !matches!(self.host_future_state, HostFutureState::Live);
        log::trace!(
            "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})",
            threads_completed,
            has_sync_result,
            pending_completion_event,
            self.host_future_state
        );
        ready
    }

    /// Create a new guest task.
    ///
    /// This pushes a fresh, empty `WaitableSet` into `state` to serve as the
    /// task's `sync_call_set`, and returns an error if that push fails. The
    /// task starts with no threads, no subtasks, no pending event and a
    /// default `CallContext`.
    ///
    /// `component_instance` and `instance` are combined into the task's
    /// `RuntimeInstance`.
    fn new(
        state: &mut ConcurrentState,
        lower_params: RawLower,
        lift_result: LiftResult,
        caller: Caller,
        callback: Option<CallbackFn>,
        component_instance: Instance,
        instance: RuntimeComponentInstanceIndex,
        async_function: bool,
    ) -> Result<Self> {
        let sync_call_set = state.push(WaitableSet::default())?;
        // Only a host caller which holds a host future makes the future state
        // relevant to task deletion.
        let host_future_state = match &caller {
            Caller::Guest { .. } => HostFutureState::NotApplicable,
            Caller::Host {
                host_future_present,
                ..
            } => {
                if *host_future_present {
                    HostFutureState::Live
                } else {
                    HostFutureState::NotApplicable
                }
            }
        };
        Ok(Self {
            common: WaitableCommon::default(),
            lower_params: Some(lower_params),
            lift_result: Some(lift_result),
            result: None,
            callback,
            caller,
            call_context: Some(CallContext::default()),
            sync_result: SyncResult::NotProduced,
            cancel_sent: false,
            starting_sent: false,
            subtasks: HashSet::new(),
            sync_call_set,
            instance: RuntimeInstance {
                instance: component_instance.id().instance(),
                index: instance,
            },
            event: None,
            function_index: None,
            exited: false,
            threads: HashSet::new(),
            host_future_state,
            async_function,
        })
    }

    /// Dispose of this guest task, reparenting any pending subtasks to the
    /// caller.
    ///
    /// `me` is the table id this task was stored under; it is used to remove
    /// the task from a guest caller's `subtasks` set.
    ///
    /// # Panics
    ///
    /// Panics if the task still has threads, or if the caller is a guest task
    /// which does not list `me` among its subtasks.
    fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> {
        // If there are not-yet-delivered completion events for subtasks in
        // `self.sync_call_set`, recursively dispose of those subtasks as well.
        for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) {
            if let Some(Event::Subtask {
                status: Status::Returned | Status::ReturnCancelled,
            }) = waitable.common(state)?.event
            {
                waitable.delete_from(state)?;
            }
        }

        assert!(self.threads.is_empty());

        state.delete(self.sync_call_set)?;

        // Reparent any pending subtasks to the caller.
        match &self.caller {
            Caller::Guest {
                thread,
                instance: runtime_instance,
            } => {
                let task_mut = state.get_mut(thread.task)?;
                let present = task_mut.subtasks.remove(&me);
                assert!(present);

                // First record the subtasks on the caller...
                for subtask in &self.subtasks {
                    task_mut.subtasks.insert(*subtask);
                }

                // ...then point each subtask at its new caller.
                for subtask in &self.subtasks {
                    state.get_mut(*subtask)?.caller = Caller::Guest {
                        thread: *thread,
                        instance: *runtime_instance,
                    };
                }
            }
            Caller::Host { exit_tx, .. } => {
                for subtask in &self.subtasks {
                    state.get_mut(*subtask)?.caller = Caller::Host {
                        tx: None,
                        // Clone `exit_tx` to ensure that it is only dropped
                        // once all transitive subtasks of the host call have
                        // exited:
                        exit_tx: exit_tx.clone(),
                        host_future_present: false,
                        call_post_return_automatically: true,
                    };
                }
            }
        }

        // Subtasks which have already exited are deleted now that they have
        // been reparented.
        for subtask in self.subtasks {
            if state.get_mut(subtask)?.exited {
                Waitable::Guest(subtask).delete_from(state)?;
            }
        }

        Ok(())
    }

    /// Returns true if the caller is a guest, or a host caller which set
    /// `call_post_return_automatically`.
    fn call_post_return_automatically(&self) -> bool {
        matches!(
            self.caller,
            Caller::Guest { .. }
                | Caller::Host {
                    call_post_return_automatically: true,
                    ..
                }
        )
    }
}

impl TableDebug for GuestTask {
    fn type_name() -> &'static str {
        "GuestTask"
    }
}

/// Represents state common to all kinds of waitables.
#[derive(Default)]
struct WaitableCommon {
    /// The currently pending event for this waitable, if any.
    event: Option<Event>,
    /// The set to which this waitable belongs, if any.
    set: Option<TableId<WaitableSet>>,
    /// The handle with which the guest refers to this waitable, if any.
    handle: Option<u32>,
}

/// Represents a Component Model Async `waitable`.
#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
enum Waitable {
    /// A host task
    Host(TableId<HostTask>),
    /// A guest task
    Guest(TableId<GuestTask>),
    /// The read or write end of a stream or future
    Transmit(TableId<TransmitHandle>),
}

impl Waitable {
    /// Retrieve the `Waitable` corresponding to the specified guest-visible
    /// handle.
    ///
    /// The handle is resolved through the handle table of `caller_instance`;
    /// the kind reported by that table selects the variant.
    fn from_instance(
        state: Pin<&mut ComponentInstance>,
        caller_instance: RuntimeComponentInstanceIndex,
        waitable: u32,
    ) -> Result<Self> {
        // Within this function `Waitable` names the handle-table kind, so the
        // enum defined in this module is referred to as `Self`.
        use crate::runtime::vm::component::Waitable;

        let (waitable, kind) = state.instance_states().0[caller_instance]
            .handle_table()
            .waitable_rep(waitable)?;

        Ok(match kind {
            Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)),
            Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)),
            Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)),
        })
    }

    /// Retrieve the host-visible identifier for this `Waitable`.
    fn rep(&self) -> u32 {
        match self {
            Self::Host(id) => id.rep(),
            Self::Guest(id) => id.rep(),
            Self::Transmit(id) => id.rep(),
        }
    }

    /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or
    /// remove it from any set it may currently belong to (when `set` is
    /// `None`).
    ///
    /// If the waitable already has a pending event when it joins a set, it is
    /// immediately marked ready in that set.
    fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> {
        log::trace!("waitable {self:?} join set {set:?}",);

        // Record the new membership first, remembering the previous one.
        let old = mem::replace(&mut self.common(state)?.set, set);

        if let Some(old) = old {
            // Detach from the old set: drop the table's parent/child link and
            // make sure the old set no longer lists this waitable as ready.
            match *self {
                Waitable::Host(id) => state.remove_child(id, old),
                Waitable::Guest(id) => state.remove_child(id, old),
                Waitable::Transmit(id) => state.remove_child(id, old),
            }?;

            state.get_mut(old)?.ready.remove(self);
        }

        if let Some(set) = set {
            match *self {
                Waitable::Host(id) => state.add_child(id, set),
                Waitable::Guest(id) => state.add_child(id, set),
                Waitable::Transmit(id) => state.add_child(id, set),
            }?;

            if self.common(state)?.event.is_some() {
                self.mark_ready(state)?;
            }
        }

        Ok(())
    }

    /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`.
    ///
    /// Returns an error if the waitable cannot be found in the table.
    fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> {
        Ok(match self {
            Self::Host(id) => &mut state.get_mut(*id)?.common,
            Self::Guest(id) => &mut state.get_mut(*id)?.common,
            Self::Transmit(id) => &mut state.get_mut(*id)?.common,
        })
    }

    /// Set or clear the pending event for this waitable and either deliver it
    /// to the first waiter, if any, or mark it as ready to be delivered to the
    /// next waiter that arrives.
    ///
    /// Note that `mark_ready` is called unconditionally, i.e. also when
    /// `event` is `None`.
    fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> {
        log::trace!("set event for {self:?}: {event:?}");
        self.common(state)?.event = event;
        self.mark_ready(state)
    }

    /// Take the pending event from this waitable, leaving `None` in its place.
    ///
    /// If the waitable belongs to a set, it is also removed from that set's
    /// `ready` collection.
    fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> {
        let common = self.common(state)?;
        let event = common.event.take();
        if let Some(set) = self.common(state)?.set {
            state.get_mut(set)?.ready.remove(self);
        }

        Ok(event)
    }

    /// Deliver the current event for this waitable to the first waiter, if any,
    /// or else mark it as ready to be delivered to the next waiter that
    /// arrives.
    ///
    /// Does nothing if the waitable does not belong to a set. Otherwise the
    /// waitable is added to the set's `ready` collection and, if a thread is
    /// waiting on the set, the first entry of `waiting` is removed and a work
    /// item for it is pushed onto the high-priority queue.
    ///
    /// # Panics
    ///
    /// Panics if the woken thread's `wake_on_cancel` names a different set.
    fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> {
        if let Some(set) = self.common(state)?.set {
            state.get_mut(set)?.ready.insert(*self);
            if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() {
                // The thread is being woken for an event, so it is no longer
                // eligible to be woken by cancellation.
                let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take();
                assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set));

                let item = match mode {
                    WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber),
                    WaitMode::Callback(instance) => WorkItem::GuestCall(GuestCall {
                        thread,
                        kind: GuestCallKind::DeliverEvent {
                            instance,
                            set: Some(set),
                        },
                    }),
                };
                state.push_high_priority(item);
            }
        }
        Ok(())
    }

    /// Delete this waitable's entry from the `ConcurrentState` table.
    ///
    /// For a guest task the removed `GuestTask` is additionally disposed of
    /// via `GuestTask::dispose`, which reparents its subtasks.
    fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
        match self {
            Self::Host(task) => {
                log::trace!("delete host task {task:?}");
                state.delete(*task)?;
            }
            Self::Guest(task) => {
                log::trace!("delete guest task {task:?}");
                state.delete(*task)?.dispose(state, *task)?;
            }
            Self::Transmit(task) => {
                state.delete(*task)?;
            }
        }

        Ok(())
    }
}

impl fmt::Debug for Waitable {
    /// Formats as the wrapped table id; the variant name itself is not
    /// printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::Host(id) => write!(f, "{id:?}"),
            Self::Guest(id) => write!(f, "{id:?}"),
            Self::Transmit(id) => write!(f, "{id:?}"),
        }
    }
}

/// Represents a Component Model Async `waitable-set`.
#[derive(Default)]
struct WaitableSet {
    /// Which waitables in this set have pending events, if any.
    ready: BTreeSet<Waitable>,
    /// Which guest threads are currently waiting on this set, if any.
    waiting: BTreeMap<QualifiedThreadId, WaitMode>,
}

impl TableDebug for WaitableSet {
    fn type_name() -> &'static str {
        "WaitableSet"
    }
}

/// Type-erased closure to lower the parameters for a guest task.
type RawLower =
    Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>;

/// Type-erased closure to lift the result for a guest task.
type RawLift = Box<
    dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>;

/// Type erased result of a guest task which may be downcast to the expected
/// type by a host caller (or simply ignored in the case of a guest caller; see
/// `DummyResult`).
type LiftedResult = Box<dyn Any + Send + Sync>;

/// Used to return a result from a `LiftFn` when the actual result has already
/// been lowered to a guest task's stack and linear memory.
struct DummyResult;

/// Represents the Component Model Async state of a (sub-)component instance.
#[derive(Default)]
pub struct ConcurrentInstanceState {
    /// Whether backpressure is set for this instance (enabled if >0)
    backpressure: u16,
    /// If true, this instance may not currently be entered.
    do_not_enter: bool,
    /// Pending calls for this instance which require `Self::backpressure` to be
    /// `true` and/or `Self::do_not_enter` to be false before they can proceed.
    // NOTE(review): `backpressure` is a counter ("enabled if >0"), not a
    // boolean, and a pending call presumably has to wait for backpressure to
    // be *disabled* (zero) rather than `true` -- confirm and reword.
    pending: BTreeMap<QualifiedThreadId, GuestCallKind>,
}

impl ConcurrentInstanceState {
    /// Returns true if no calls are queued in `pending` for this instance.
    pub fn pending_is_empty(&self) -> bool {
        self.pending.is_empty()
    }
}

/// Represents the Component Model Async state of a store.
pub struct ConcurrentState {
    /// The currently running guest thread, if any.
    guest_thread: Option<QualifiedThreadId>,

    /// The set of pending host and background tasks, if any.
    ///
    /// See `StoreContextMut::poll_until` (named in the module-level docs as
    /// the home of the event loop) for where we temporarily take this out,
    /// poll it, then put it back to avoid any mutable aliasing hazards.
    futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>,
    /// The table of waitables, waitable sets, etc.
    table: AlwaysMut<ResourceTable>,
    /// The "high priority" work queue for this store's event loop.
    high_priority: Vec<WorkItem>,
    /// The "low priority" work queue for this store's event loop.
    low_priority: Vec<WorkItem>,
    /// A place to stash the reason a fiber is suspending so that the code which
    /// resumed it will know under what conditions the fiber should be resumed
    /// again.
    suspend_reason: Option<SuspendReason>,
    /// A cached fiber which is waiting for work to do.
    ///
    /// This helps us avoid creating a new fiber for each `GuestCall` work item.
    worker: Option<StoreFiber<'static>>,
    /// A place to stash the work item for which we're resuming a worker fiber.
    worker_item: Option<WorkerItem>,

    /// Reference counts for all component error contexts
    ///
    /// NOTE: it is possible for the global ref count to be *greater* than the
    /// sum of (sub)component ref counts as tracked by `error_context_tables`,
    /// for example when the host holds one or more references to error
    /// contexts.
    ///
    /// The key of this primary map is often referred to as the "rep" (i.e. host-side
    /// component-wide representation) of the index into concurrent state for a given
    /// stored `ErrorContext`.
    ///
    /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same
    /// as a `TableId<ErrorContextState>`.
    global_error_context_ref_counts:
        BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>,
}

impl Default for ConcurrentState {
    /// Create an empty state: no current guest thread, an empty table, an
    /// empty (but present) `FuturesUnordered`, empty work queues and no cached
    /// worker fiber.
    fn default() -> Self {
        Self {
            guest_thread: None,
            table: AlwaysMut::new(ResourceTable::new()),
            futures: AlwaysMut::new(Some(FuturesUnordered::new())),
            high_priority: Vec::new(),
            low_priority: Vec::new(),
            suspend_reason: None,
            worker: None,
            worker_item: None,
            global_error_context_ref_counts: BTreeMap::new(),
        }
    }
}

impl ConcurrentState {
    /// Take ownership of any fibers and futures owned by this object.
    ///
    /// This should be used when disposing of the `Store` containing this object
    /// in order to gracefully resolve any and all fibers using
    /// `StoreFiber::dispose`. This is necessary to avoid possible
    /// use-after-free bugs due to fibers which may still have access to the
    /// `Store`.
    ///
    /// Additionally, the futures collected with this function should be dropped
    /// within a `tls::set` call, which will ensure that any futures closing
    /// over an `&Accessor` will have access to the store when dropped, allowing
    /// e.g. `WithAccessor[AndValue]` instances to be disposed of without
    /// panicking.
    ///
    /// Note that this will leave the object in an inconsistent and unusable
    /// state, so it should only be used just prior to dropping it.
    ///
    /// Fibers are appended to `fibers`; the store's `FuturesUnordered` (if
    /// still present) is appended to `futures`.
    pub(crate) fn take_fibers_and_futures(
        &mut self,
        fibers: &mut Vec<StoreFiber<'static>>,
        futures: &mut Vec<FuturesUnordered<HostTaskFuture>>,
    ) {
        // Collect fibers parked in the table: those waiting on a waitable set
        // and those held by suspended guest threads.
        for entry in self.table.get_mut().iter_mut() {
            if let Some(set) = entry.downcast_mut::<WaitableSet>() {
                for mode in mem::take(&mut set.waiting).into_values() {
                    if let WaitMode::Fiber(fiber) = mode {
                        fibers.push(fiber);
                    }
                }
            } else if let Some(thread) = entry.downcast_mut::<GuestThread>() {
                // Every guest thread is reset to `Completed`, whatever state
                // it was in; only a `Suspended` one yields a fiber.
                if let GuestThreadState::Suspended(fiber) =
                    mem::replace(&mut thread.state, GuestThreadState::Completed)
                {
                    fibers.push(fiber);
                }
            }
        }

        if let Some(fiber) = self.worker.take() {
            fibers.push(fiber);
        }

        // Drain a work queue, keeping fibers and not-yet-pushed futures and
        // discarding every other kind of work item.
        let mut take_items = |list| {
            for item in mem::take(list) {
                match item {
                    WorkItem::ResumeFiber(fiber) => {
                        fibers.push(fiber);
                    }
                    WorkItem::PushFuture(future) => {
                        self.futures
                            .get_mut()
                            .as_mut()
                            .unwrap()
                            .push(future.into_inner());
                    }
                    _ => {}
                }
            }
        };

        take_items(&mut self.high_priority);
        take_items(&mut self.low_priority);

        // Done last so that futures moved over from the queues above are
        // included.
        if let Some(them) = self.futures.get_mut().take() {
            futures.push(them);
        }
    }

    /// Push `value` into the table, returning its typed id.
    fn push<V: Send + Sync + 'static>(
        &mut self,
        value: V,
    ) -> Result<TableId<V>, ResourceTableError> {
        self.table.get_mut().push(value).map(TableId::from)
    }

    /// Retrieve mutable access to the table entry identified by `id`.
    fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> {
        self.table.get_mut().get_mut(&Resource::from(id))
    }

    /// Register `child` as a child of `parent` in the table.
    pub fn add_child<T: 'static, U: 'static>(
        &mut self,
        child: TableId<T>,
        parent: TableId<U>,
    ) -> Result<(), ResourceTableError> {
        self.table
            .get_mut()
            .add_child(Resource::from(child), Resource::from(parent))
    }

    /// Remove the child relationship between `child` and `parent` in the
    /// table.
    pub fn remove_child<T: 'static, U: 'static>(
        &mut self,
        child: TableId<T>,
        parent: TableId<U>,
    ) -> Result<(), ResourceTableError> {
        self.table
            .get_mut()
            .remove_child(Resource::from(child), Resource::from(parent))
    }

    /// Remove the entry identified by `id` from the table and return it.
    fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> {
        self.table.get_mut().delete(Resource::from(id))
    }

    /// Schedule `future` to be added to the set of pending host and background
    /// tasks.
    fn push_future(&mut self, future: HostTaskFuture) {
        // Note that we can't directly push to `ConcurrentState::futures` here
        // since this may be called from a future that's being polled inside
        // `StoreContextMut::poll_until`, which temporarily removes the
        // `FuturesUnordered` so it has exclusive access while polling it.
        // Therefore, we push a work item to the "high priority" queue, which
        // will actually push to `ConcurrentState::futures` later.
        self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future)));
    }

    /// Append `item` to the "high priority" work queue.
    fn push_high_priority(&mut self, item: WorkItem) {
        log::trace!("push high priority: {item:?}");
        self.high_priority.push(item);
    }

    /// Append `item` to the "low priority" work queue.
    fn push_low_priority(&mut self, item: WorkItem) {
        log::trace!("push low priority: {item:?}");
        self.low_priority.push(item);
    }

    /// Append `item` to the queue selected by `high_priority`.
    fn push_work_item(&mut self, item: WorkItem, high_priority: bool) {
        if high_priority {
            self.push_high_priority(item);
        } else {
            self.push_low_priority(item);
        }
    }

    /// Determine whether the instance associated with the specified guest task
    /// may be entered (i.e. is not already on the async call stack).
    ///
    /// This is an additional check on top of the "may_enter" instance flag;
    /// it's needed because async-lifted exports with callback functions must
    /// not call their own instances directly or indirectly, and due to the
    /// "stackless" nature of callback-enabled guest tasks this may happen even
    /// if there are no activation records on the stack (i.e. the "may_enter"
    /// field is `true`) for that instance.
    ///
    /// # Panics
    ///
    /// Panics if `guest_task` or any task on its caller chain is missing from
    /// the table.
    fn may_enter(&mut self, mut guest_task: TableId<GuestTask>) -> bool {
        let guest_instance = self.get_mut(guest_task).unwrap().instance;

        // Walk the task tree back to the root, looking for potential
        // reentrance.
        //
        // TODO: This could be optimized by maintaining a per-`GuestTask` bitset
        // such that each bit represents an instance which has been entered by
        // that task or an ancestor of that task, in which case this would be a
        // constant time check.
        loop {
            let next_thread = match &self.get_mut(guest_task).unwrap().caller {
                Caller::Host { .. } => break true,
                Caller::Guest { thread, instance } => {
                    if *instance == guest_instance.index {
                        break false;
                    } else {
                        *thread
                    }
                }
            };
            guest_task = next_thread.task;
        }
    }

    /// Implements the `context.get` intrinsic.
    ///
    /// # Panics
    ///
    /// Panics if there is no current guest thread or if `slot` is out of range
    /// for the thread's two context slots.
    pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> {
        let thread = self.guest_thread.unwrap();
        let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()];
        log::trace!("context_get {thread:?} slot {slot} val {val:#x}");
        Ok(val)
    }

    /// Implements the `context.set` intrinsic.
    ///
    /// # Panics
    ///
    /// Panics if there is no current guest thread or if `slot` is out of range
    /// for the thread's two context slots.
    pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> {
        let thread = self.guest_thread.unwrap();
        log::trace!("context_set {thread:?} slot {slot} val {val:#x}");
        self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val;
        Ok(())
    }

    /// Returns whether there's a pending cancellation on the current guest thread,
    /// consuming the event if so.
    ///
    /// # Panics
    ///
    /// Panics if there is no current guest thread, if its task is missing from
    /// the table, or if the task's pending event is anything other than
    /// `Event::Cancelled`.
    fn take_pending_cancellation(&mut self) -> bool {
        let thread = self.guest_thread.unwrap();
        if let Some(event) = self.get_mut(thread.task).unwrap().event.take() {
            assert!(matches!(event, Event::Cancelled));
            true
        } else {
            false
        }
    }

    /// Check that the task of the current guest thread is allowed to block.
    ///
    /// See `Self::check_blocking_for`.
    ///
    /// # Panics
    ///
    /// Panics if there is no current guest thread.
    fn check_blocking(&mut self) -> Result<()> {
        let task = self.guest_thread.unwrap().task;
        self.check_blocking_for(task)
    }

    /// Return `Trap::CannotBlockSyncTask` as an error unless `task` is allowed
    /// to block (see `Self::may_block`).
    fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> {
        if self.may_block(task) {
            Ok(())
        } else {
            Err(Trap::CannotBlockSyncTask.into())
        }
    }

    /// Returns true if `task` was created for a call to an async-lifted export
    /// or has already returned or been cancelled.
    ///
    /// # Panics
    ///
    /// Panics if `task` is missing from the table.
    fn may_block(&mut self, task: TableId<GuestTask>) -> bool {
        let task = self.get_mut(task).unwrap();
        task.async_function || task.returned_or_cancelled()
    }
}

/// Provide a type hint to compiler about the shape of a parameter lower
/// closure.
///
/// Returns `fun` unchanged.
fn for_any_lower<
    F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync,
>(
    fun: F,
) -> F {
    fun
}

/// Provide a type hint to compiler about the shape of a result lift closure.
///
/// Returns `fun` unchanged.
fn for_any_lift<
    F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync,
>(
    fun: F,
) -> F {
    fun
}

/// Wrap the specified future in a `poll_fn` which asserts that the future is
/// only polled from the event loop of the specified `Store`.
///
/// See `StoreContextMut::run_concurrent` for details.
4966 fn checked<F: Future + Send + 'static>( 4967 id: StoreId, 4968 fut: F, 4969 ) -> impl Future<Output = F::Output> + Send + 'static { 4970 async move { 4971 let mut fut = pin!(fut); 4972 future::poll_fn(move |cx| { 4973 let message = "\ 4974 `Future`s which depend on asynchronous component tasks, streams, or \ 4975 futures to complete may only be polled from the event loop of the \ 4976 store to which they belong. Please use \ 4977 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\ 4978 "; 4979 tls::try_get(|store| { 4980 let matched = match store { 4981 tls::TryGet::Some(store) => store.id() == id, 4982 tls::TryGet::Taken | tls::TryGet::None => false, 4983 }; 4984 4985 if !matched { 4986 panic!("{message}") 4987 } 4988 }); 4989 fut.as_mut().poll(cx) 4990 }) 4991 .await 4992 } 4993 } 4994 4995 /// Assert that `StoreContextMut::run_concurrent` has not been called from 4996 /// within an store's event loop. 4997 fn check_recursive_run() { 4998 tls::try_get(|store| { 4999 if !matches!(store, tls::TryGet::None) { 5000 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported") 5001 } 5002 }); 5003 } 5004 5005 fn unpack_callback_code(code: u32) -> (u32, u32) { 5006 (code & 0xF, code >> 4) 5007 } 5008 5009 /// Helper struct for packaging parameters to be passed to 5010 /// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or 5011 /// `waitable-set.poll`. 5012 struct WaitableCheckParams { 5013 set: TableId<WaitableSet>, 5014 options: OptionsIndex, 5015 payload: u32, 5016 } 5017 5018 /// Indicates whether `ComponentInstance::waitable_check` is being called for 5019 /// `waitable-set.wait` or `waitable-set.poll`. 5020 enum WaitableCheck { 5021 Wait, 5022 Poll, 5023 } 5024 5025 /// Represents a guest task called from the host, prepared using `prepare_call`. 
5026 pub(crate) struct PreparedCall<R> { 5027 /// The guest export to be called 5028 handle: Func, 5029 /// The guest thread created by `prepare_call` 5030 thread: QualifiedThreadId, 5031 /// The number of lowered core Wasm parameters to pass to the call. 5032 param_count: usize, 5033 /// The `oneshot::Receiver` to which the result of the call will be 5034 /// delivered when it is available. 5035 rx: oneshot::Receiver<LiftedResult>, 5036 /// The `oneshot::Receiver` which will resolve when the task -- and any 5037 /// transitive subtasks -- have all exited. 5038 exit_rx: oneshot::Receiver<()>, 5039 _phantom: PhantomData<R>, 5040 } 5041 5042 impl<R> PreparedCall<R> { 5043 /// Get a copy of the `TaskId` for this `PreparedCall`. 5044 pub(crate) fn task_id(&self) -> TaskId { 5045 TaskId { 5046 task: self.thread.task, 5047 } 5048 } 5049 } 5050 5051 /// Represents a task created by `prepare_call`. 5052 pub(crate) struct TaskId { 5053 task: TableId<GuestTask>, 5054 } 5055 5056 impl TaskId { 5057 /// The host future for an async task was dropped. If the parameters have not been lowered yet, 5058 /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case, 5059 /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended 5060 /// and can be resumed by other tasks for this component, so we mark the future as dropped 5061 /// and delete the task when all threads are done. 5062 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> { 5063 let task = store.0.concurrent_state_mut().get_mut(self.task)?; 5064 if !task.already_lowered_parameters() { 5065 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())? 5066 } else { 5067 task.host_future_state = HostFutureState::Dropped; 5068 if task.ready_to_delete() { 5069 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())? 
5070 } 5071 } 5072 Ok(()) 5073 } 5074 } 5075 5076 /// Prepare a call to the specified exported Wasm function, providing functions 5077 /// for lowering the parameters and lifting the result. 5078 /// 5079 /// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event 5080 /// loop, use `queue_call`. 5081 pub(crate) fn prepare_call<T, R>( 5082 mut store: StoreContextMut<T>, 5083 handle: Func, 5084 param_count: usize, 5085 host_future_present: bool, 5086 call_post_return_automatically: bool, 5087 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()> 5088 + Send 5089 + Sync 5090 + 'static, 5091 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> 5092 + Send 5093 + Sync 5094 + 'static, 5095 ) -> Result<PreparedCall<R>> { 5096 let (options, _flags, ty, raw_options) = handle.abi_info(store.0); 5097 5098 let instance = handle.instance().id().get(store.0); 5099 let options = &instance.component().env_component().options[options]; 5100 let ty = &instance.component().types()[ty]; 5101 let async_function = ty.async_; 5102 let task_return_type = ty.results; 5103 let component_instance = raw_options.instance; 5104 let callback = options.callback.map(|i| instance.runtime_callback(i)); 5105 let memory = options 5106 .memory() 5107 .map(|i| instance.runtime_memory(i)) 5108 .map(SendSyncPtr::new); 5109 let string_encoding = options.string_encoding; 5110 let token = StoreToken::new(store.as_context_mut()); 5111 let state = store.0.concurrent_state_mut(); 5112 5113 let (tx, rx) = oneshot::channel(); 5114 let (exit_tx, exit_rx) = oneshot::channel(); 5115 5116 let mut task = GuestTask::new( 5117 state, 5118 Box::new(for_any_lower(move |store, params| { 5119 lower_params(handle, token.as_context_mut(store), params) 5120 })), 5121 LiftResult { 5122 lift: Box::new(for_any_lift(move |store, result| { 5123 lift_result(handle, store, result) 5124 })), 5125 ty: task_return_type, 5126 memory, 
5127 string_encoding, 5128 }, 5129 Caller::Host { 5130 tx: Some(tx), 5131 exit_tx: Arc::new(exit_tx), 5132 host_future_present, 5133 call_post_return_automatically, 5134 }, 5135 callback.map(|callback| { 5136 let callback = SendSyncPtr::new(callback); 5137 let instance = handle.instance(); 5138 Box::new( 5139 move |store: &mut dyn VMStore, runtime_instance, event, handle| { 5140 let store = token.as_context_mut(store); 5141 // SAFETY: Per the contract of `prepare_call`, the callback 5142 // will remain valid at least as long is this task exists. 5143 unsafe { 5144 instance.call_callback( 5145 store, 5146 runtime_instance, 5147 callback, 5148 event, 5149 handle, 5150 call_post_return_automatically, 5151 ) 5152 } 5153 }, 5154 ) as CallbackFn 5155 }), 5156 handle.instance(), 5157 component_instance, 5158 async_function, 5159 )?; 5160 task.function_index = Some(handle.index()); 5161 5162 let task = state.push(task)?; 5163 let thread = state.push(GuestThread::new_implicit(task))?; 5164 state.get_mut(task)?.threads.insert(thread); 5165 5166 Ok(PreparedCall { 5167 handle, 5168 thread: QualifiedThreadId { task, thread }, 5169 param_count, 5170 rx, 5171 exit_rx, 5172 _phantom: PhantomData, 5173 }) 5174 } 5175 5176 /// Queue a call previously prepared using `prepare_call` to be run as part of 5177 /// the associated `ComponentInstance`'s event loop. 5178 /// 5179 /// The returned future will resolve to the result once it is available, but 5180 /// must only be polled via the instance's event loop. See 5181 /// `StoreContextMut::run_concurrent` for details. 5182 pub(crate) fn queue_call<T: 'static, R: Send + 'static>( 5183 mut store: StoreContextMut<T>, 5184 prepared: PreparedCall<R>, 5185 ) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> { 5186 let PreparedCall { 5187 handle, 5188 thread, 5189 param_count, 5190 rx, 5191 exit_rx, 5192 .. 
5193 } = prepared; 5194 5195 queue_call0(store.as_context_mut(), handle, thread, param_count)?; 5196 5197 Ok(checked( 5198 store.0.id(), 5199 rx.map(move |result| { 5200 result 5201 .map(|v| (*v.downcast().unwrap(), exit_rx)) 5202 .map_err(anyhow::Error::from) 5203 }), 5204 )) 5205 } 5206 5207 /// Queue a call previously prepared using `prepare_call` to be run as part of 5208 /// the associated `ComponentInstance`'s event loop. 5209 fn queue_call0<T: 'static>( 5210 store: StoreContextMut<T>, 5211 handle: Func, 5212 guest_thread: QualifiedThreadId, 5213 param_count: usize, 5214 ) -> Result<()> { 5215 let (_options, flags, _ty, raw_options) = handle.abi_info(store.0); 5216 let is_concurrent = raw_options.async_; 5217 let callback = raw_options.callback; 5218 let instance = handle.instance(); 5219 let callee = handle.lifted_core_func(store.0); 5220 let post_return = handle.post_return_core_func(store.0); 5221 let callback = callback.map(|i| { 5222 let instance = instance.id().get(store.0); 5223 SendSyncPtr::new(instance.runtime_callback(i)) 5224 }); 5225 5226 log::trace!("queueing call {guest_thread:?}"); 5227 5228 let instance_flags = if callback.is_none() { 5229 None 5230 } else { 5231 Some(flags) 5232 }; 5233 5234 // SAFETY: `callee`, `callback`, and `post_return` are valid pointers 5235 // (with signatures appropriate for this call) and will remain valid as 5236 // long as this instance is valid. 5237 unsafe { 5238 instance.queue_call( 5239 store, 5240 guest_thread, 5241 SendSyncPtr::new(callee), 5242 param_count, 5243 1, 5244 instance_flags, 5245 is_concurrent, 5246 callback, 5247 post_return.map(SendSyncPtr::new), 5248 ) 5249 } 5250 } 5251