1 //! Runtime support for the Component Model Async ABI. 2 //! 3 //! This module and its submodules provide host runtime support for Component 4 //! Model Async features such as async-lifted exports, async-lowered imports, 5 //! streams, futures, and related intrinsics. See [the Async 6 //! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md) 7 //! for a high-level overview. 8 //! 9 //! At the core of this support is an event loop which schedules and switches 10 //! between guest tasks and any host tasks they create. Each 11 //! `Store` will have at most one event loop running at any given 12 //! time, and that loop may be suspended and resumed by the host embedder using 13 //! e.g. `StoreContextMut::run_concurrent`. The `StoreContextMut::poll_until` 14 //! function contains the loop itself, while the 15 //! `StoreOpaque::concurrent_state` field holds its state. 16 //! 17 //! # Public API Overview 18 //! 19 //! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop) 20 //! 21 //! - `[Typed]Func::call_concurrent`: Start a host->guest call to an 22 //! async-lifted or sync-lifted import, creating a guest task. 23 //! 24 //! - `StoreContextMut::run_concurrent`: Run the event loop for the specified 25 //! instance, allowing any and all tasks belonging to that instance to make 26 //! progress. 27 //! 28 //! - `StoreContextMut::spawn`: Run a background task as part of the event loop 29 //! for the specified instance. 30 //! 31 //! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or 32 //! `stream` which may be passed to the guest. This takes a 33 //! `{Future,Stream}Producer` implementation which will be polled for items when 34 //! the consumer requests them. 35 //! 36 //! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by 37 //! connecting it to a `{Future,Stream}Consumer` which will consume any items 38 //! produced by the write end. 39 //! 40 //! ## Host Task API (e.g. implementing concurrent host functions and background tasks) 41 //! 42 //! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host 43 //! function with the linker. That function will take an `Accessor` as its 44 //! first parameter, which provides access to the store between (but not across) 45 //! await points. 46 //! 47 //! - `Accessor::with`: Access the store and its associated data. 48 //! 49 //! - `Accessor::spawn`: Run a background task as part of the event loop for the 50 //! store. This is equivalent to `StoreContextMut::spawn` but more convenient to use 51 //! in host functions. 52 53 use crate::bail_bug; 54 use crate::component::func::{self, Func, call_post_return}; 55 use crate::component::{ 56 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance, 57 }; 58 use crate::fiber::{self, StoreFiber, StoreFiberYield}; 59 use crate::prelude::*; 60 use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken}; 61 use crate::vm::component::{CallContext, ComponentInstance, InstanceState}; 62 use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore}; 63 use crate::{ 64 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, bail, 65 }; 66 use error_contexts::GlobalErrorContextRefCount; 67 use futures::channel::oneshot; 68 use futures::future::{self, FutureExt}; 69 use futures::stream::{FuturesUnordered, StreamExt}; 70 use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex}; 71 use std::any::Any; 72 use std::borrow::ToOwned; 73 use std::boxed::Box; 74 use std::cell::UnsafeCell; 75 use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque}; 76 use std::fmt; 77 use std::future::Future; 78 use std::marker::PhantomData; 79 use std::mem::{self, ManuallyDrop, MaybeUninit}; 80 use std::ops::DerefMut; 81 use std::pin::{Pin, pin}; 82 use std::ptr::{self, NonNull}; 83 use std::task::{Context, Poll, Waker}; 84 use std::vec::Vec; 85 use table::{TableDebug, TableId}; 86 use wasmtime_environ::Trap; 87 use wasmtime_environ::component::{ 88 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS, 89 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT, 90 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding, 91 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex, 92 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex, 93 }; 94 use wasmtime_environ::packed_option::ReservedValue; 95 96 pub use abort::JoinHandle; 97 pub use future_stream_any::{FutureAny, StreamAny}; 98 pub use futures_and_streams::{ 99 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer, 100 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer, 101 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer, 102 }; 103 pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index}; 104 105 mod abort; 106 mod error_contexts; 107 mod future_stream_any; 108 mod futures_and_streams; 109 pub(crate) mod table; 110 pub(crate) mod tls; 111 112 /// Constant defined in the Component Model spec to indicate that the async 113 /// intrinsic (e.g. `future.write`) has not yet completed. 114 const BLOCKED: u32 = 0xffff_ffff; 115 116 /// Corresponds to `CallState` in the upstream spec. 117 #[derive(Clone, Copy, Eq, PartialEq, Debug)] 118 pub enum Status { 119 Starting = 0, 120 Started = 1, 121 Returned = 2, 122 StartCancelled = 3, 123 ReturnCancelled = 4, 124 } 125 126 impl Status { 127 /// Packs this status and the optional `waitable` provided into a 32-bit 128 /// result that the canonical ABI requires. 129 /// 130 /// The low 4 bits are reserved for the status while the upper 28 bits are 131 /// the waitable, if present. 132 pub fn pack(self, waitable: Option<u32>) -> u32 { 133 assert!(matches!(self, Status::Returned) == waitable.is_none()); 134 let waitable = waitable.unwrap_or(0); 135 assert!(waitable < (1 << 28)); 136 (waitable << 4) | (self as u32) 137 } 138 } 139 140 /// Corresponds to `EventCode` in the Component Model spec, plus related payload 141 /// data. 142 #[derive(Clone, Copy, Debug)] 143 enum Event { 144 None, 145 Cancelled, 146 Subtask { 147 status: Status, 148 }, 149 StreamRead { 150 code: ReturnCode, 151 pending: Option<(TypeStreamTableIndex, u32)>, 152 }, 153 StreamWrite { 154 code: ReturnCode, 155 pending: Option<(TypeStreamTableIndex, u32)>, 156 }, 157 FutureRead { 158 code: ReturnCode, 159 pending: Option<(TypeFutureTableIndex, u32)>, 160 }, 161 FutureWrite { 162 code: ReturnCode, 163 pending: Option<(TypeFutureTableIndex, u32)>, 164 }, 165 } 166 167 impl Event { 168 /// Lower this event to core Wasm integers for delivery to the guest. 169 /// 170 /// Note that the waitable handle, if any, is assumed to be lowered 171 /// separately. 172 fn parts(self) -> (u32, u32) { 173 const EVENT_NONE: u32 = 0; 174 const EVENT_SUBTASK: u32 = 1; 175 const EVENT_STREAM_READ: u32 = 2; 176 const EVENT_STREAM_WRITE: u32 = 3; 177 const EVENT_FUTURE_READ: u32 = 4; 178 const EVENT_FUTURE_WRITE: u32 = 5; 179 const EVENT_CANCELLED: u32 = 6; 180 match self { 181 Event::None => (EVENT_NONE, 0), 182 Event::Cancelled => (EVENT_CANCELLED, 0), 183 Event::Subtask { status } => (EVENT_SUBTASK, status as u32), 184 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()), 185 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()), 186 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()), 187 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()), 188 } 189 } 190 } 191 192 /// Corresponds to `CallbackCode` in the spec. 193 mod callback_code { 194 pub const EXIT: u32 = 0; 195 pub const YIELD: u32 = 1; 196 pub const WAIT: u32 = 2; 197 } 198 199 /// A flag indicating that the callee is an async-lowered export. 200 /// 201 /// This may be passed to the `async-start` intrinsic from a fused adapter. 202 const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32; 203 204 /// Provides access to either store data (via the `get` method) or the store 205 /// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component 206 /// instance to which the current host task belongs. 207 /// 208 /// See [`Accessor::with`] for details. 209 pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> { 210 store: StoreContextMut<'a, T>, 211 get_data: fn(&mut T) -> D::Data<'_>, 212 } 213 214 impl<'a, T, D> Access<'a, T, D> 215 where 216 D: HasData + ?Sized, 217 T: 'static, 218 { 219 /// Creates a new [`Access`] from its component parts. 220 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self { 221 Self { store, get_data } 222 } 223 224 /// Get mutable access to the store data. 225 pub fn data_mut(&mut self) -> &mut T { 226 self.store.data_mut() 227 } 228 229 /// Get mutable access to the store data. 230 pub fn get(&mut self) -> D::Data<'_> { 231 (self.get_data)(self.data_mut()) 232 } 233 234 /// Spawn a background task. 235 /// 236 /// See [`Accessor::spawn`] for details. 237 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle 238 where 239 T: 'static, 240 { 241 let accessor = Accessor { 242 get_data: self.get_data, 243 token: StoreToken::new(self.store.as_context_mut()), 244 }; 245 self.store 246 .as_context_mut() 247 .spawn_with_accessor(accessor, task) 248 } 249 250 /// Returns the getter this accessor is using to project from `T` into 251 /// `D::Data`. 252 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> { 253 self.get_data 254 } 255 } 256 257 impl<'a, T, D> AsContext for Access<'a, T, D> 258 where 259 D: HasData + ?Sized, 260 T: 'static, 261 { 262 type Data = T; 263 264 fn as_context(&self) -> StoreContext<'_, T> { 265 self.store.as_context() 266 } 267 } 268 269 impl<'a, T, D> AsContextMut for Access<'a, T, D> 270 where 271 D: HasData + ?Sized, 272 T: 'static, 273 { 274 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> { 275 self.store.as_context_mut() 276 } 277 } 278 279 /// Provides scoped mutable access to store data in the context of a concurrent 280 /// host task future. 281 /// 282 /// This allows multiple host task futures to execute concurrently and access 283 /// the store between (but not across) `await` points. 284 /// 285 /// # Rationale 286 /// 287 /// This structure is sort of like `&mut T` plus a projection from `&mut T` to 288 /// `D::Data<'_>`. The problem this is solving, however, is that it does not 289 /// literally store these values. The basic problem is that when a concurrent 290 /// host future is being polled it has access to `&mut T` (and the whole 291 /// `Store`) but when it's not being polled it does not have access to these 292 /// values. This reflects how the store is only ever polling one future at a 293 /// time so the store is effectively being passed between futures. 294 /// 295 /// Rust's `Future` trait, however, has no means of passing a `Store` 296 /// temporarily between futures. The [`Context`](std::task::Context) type does 297 /// not have the ability to attach arbitrary information to it at this time. 298 /// This type, [`Accessor`], is used to bridge this expressivity gap. 299 /// 300 /// The [`Accessor`] type here represents the ability to acquire, temporarily in 301 /// a synchronous manner, the current store. The [`Accessor::with`] function 302 /// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut 303 /// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does 304 /// not take an `async` closure as its argument, instead it's a synchronous 305 /// closure which must complete during on run of `Future::poll`. This reflects 306 /// how the store is temporarily made available while a host future is being 307 /// polled. 308 /// 309 /// # Implementation 310 /// 311 /// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and 312 /// this type additionally doesn't even have a lifetime parameter. This is 313 /// instead a representation of proof of the ability to acquire these while a 314 /// future is being polled. Wasmtime will, when it polls a host future, 315 /// configure ambient state such that the `Accessor` that a future closes over 316 /// will work and be able to access the store. 317 /// 318 /// This has a number of implications for users such as: 319 /// 320 /// * It's intentional that `Accessor` cannot be cloned, it needs to stay within 321 /// the lifetime of a single future. 322 /// * A future is expected to, however, close over an `Accessor` and keep it 323 /// alive probably for the duration of the entire future. 324 /// * Different host futures will be given different `Accessor`s, and that's 325 /// intentional. 326 /// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which 327 /// alleviates some otherwise required bounds to be written down. 328 /// 329 /// # Using `Accessor` in `Drop` 330 /// 331 /// The methods on `Accessor` are only expected to work in the context of 332 /// `Future::poll` and are not guaranteed to work in `Drop`. This is because a 333 /// host future can be dropped at any time throughout the system and Wasmtime 334 /// store context is not necessarily available at that time. It's recommended to 335 /// not use `Accessor` methods in anything connected to a `Drop` implementation 336 /// as they will panic and have unintended results. If you run into this though 337 /// feel free to file an issue on the Wasmtime repository. 338 pub struct Accessor<T: 'static, D = HasSelf<T>> 339 where 340 D: HasData + ?Sized, 341 { 342 token: StoreToken<T>, 343 get_data: fn(&mut T) -> D::Data<'_>, 344 } 345 346 /// A helper trait to take any type of accessor-with-data in functions. 347 /// 348 /// This trait is similar to [`AsContextMut`] except that it's used when 349 /// working with an [`Accessor`] instead of a [`StoreContextMut`]. The 350 /// [`Accessor`] is the main type used in concurrent settings and is passed to 351 /// functions such as [`Func::call_concurrent`]. 352 /// 353 /// This trait is implemented for [`Accessor`] and `&T` where `T` implements 354 /// this trait. This effectively means that regardless of the `D` in 355 /// `Accessor<T, D>` it can still be passed to a function which just needs a 356 /// store accessor. 357 /// 358 /// Acquiring an [`Accessor`] can be done through 359 /// [`StoreContextMut::run_concurrent`] for example or in a host function 360 /// through 361 /// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent). 362 pub trait AsAccessor { 363 /// The `T` in `Store<T>` that this accessor refers to. 364 type Data: 'static; 365 366 /// The `D` in `Accessor<T, D>`, or the projection out of 367 /// `Self::Data`. 368 type AccessorData: HasData + ?Sized; 369 370 /// Returns the accessor that this is referring to. 371 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>; 372 } 373 374 impl<T: AsAccessor + ?Sized> AsAccessor for &T { 375 type Data = T::Data; 376 type AccessorData = T::AccessorData; 377 378 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> { 379 T::as_accessor(self) 380 } 381 } 382 383 impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> { 384 type Data = T; 385 type AccessorData = D; 386 387 fn as_accessor(&self) -> &Accessor<T, D> { 388 self 389 } 390 } 391 392 // Note that it is intentional at this time that `Accessor` does not actually 393 // store `&mut T` or anything similar. This distinctly enables the `Accessor` 394 // structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for 395 // that matter). This is used to ergonomically simplify bindings where the 396 // majority of the time `Accessor` is closed over in a future which then needs 397 // to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as 398 // you already have to write `T: 'static`...) it helps to avoid this. 399 // 400 // Note as well that `Accessor` doesn't actually store its data at all. Instead 401 // it's more of a "proof" of what can be accessed from TLS. API design around 402 // `Accessor` and functions like `Linker::func_wrap_concurrent` are 403 // intentionally made to ensure that `Accessor` is ideally only used in the 404 // context that TLS variables are actually set. For example host functions are 405 // given `&Accessor`, not `Accessor`, and this prevents them from persisting 406 // the value outside of a future. Within the future the TLS variables are all 407 // guaranteed to be set while the future is being polled. 408 // 409 // Finally though this is not an ironclad guarantee, but nor does it need to be. 410 // The TLS APIs are designed to panic or otherwise model usage where they're 411 // called recursively or similar. It's hoped that code cannot be constructed to 412 // actually hit this at runtime but this is not a safety requirement at this 413 // time. 414 const _: () = { 415 const fn assert<T: Send + Sync>() {} 416 assert::<Accessor<UnsafeCell<u32>>>(); 417 }; 418 419 impl<T> Accessor<T> { 420 /// Creates a new `Accessor` backed by the specified functions. 421 /// 422 /// - `get`: used to retrieve the store 423 /// 424 /// - `get_data`: used to "project" from the store's associated data to 425 /// another type (e.g. a field of that data or a wrapper around it). 426 /// 427 /// - `spawn`: used to queue spawned background tasks to be run later 428 pub(crate) fn new(token: StoreToken<T>) -> Self { 429 Self { 430 token, 431 get_data: |x| x, 432 } 433 } 434 } 435 436 impl<T, D> Accessor<T, D> 437 where 438 D: HasData + ?Sized, 439 { 440 /// Run the specified closure, passing it mutable access to the store. 441 /// 442 /// This function is one of the main building blocks of the [`Accessor`] 443 /// type. This yields synchronous, blocking, access to the store via an 444 /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to 445 /// providing the ability to access `D` via [`Access::get`]. Note that the 446 /// `fun` here is given only temporary access to the store and `T`/`D` 447 /// meaning that the return value `R` here is not allowed to capture borrows 448 /// into the two. If access is needed to data within `T` or `D` outside of 449 /// this closure then it must be `clone`d out, for example. 450 /// 451 /// # Panics 452 /// 453 /// This function will panic if it is call recursively with any other 454 /// accessor already in scope. For example if `with` is called within `fun`, 455 /// then this function will panic. It is up to the embedder to ensure that 456 /// this does not happen. 457 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R { 458 tls::get(|vmstore| { 459 fun(Access { 460 store: self.token.as_context_mut(vmstore), 461 get_data: self.get_data, 462 }) 463 }) 464 } 465 466 /// Returns the getter this accessor is using to project from `T` into 467 /// `D::Data`. 468 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> { 469 self.get_data 470 } 471 472 /// Changes this accessor to access `D2` instead of the current type 473 /// parameter `D`. 474 /// 475 /// This changes the underlying data access from `T` to `D2::Data<'_>`. 476 /// 477 /// # Panics 478 /// 479 /// When using this API the returned value is disconnected from `&self` and 480 /// the lifetime binding the `self` argument. An `Accessor` only works 481 /// within the context of the closure or async closure that it was 482 /// originally given to, however. This means that due to the fact that the 483 /// returned value has no lifetime connection it's possible to use the 484 /// accessor outside of `&self`, the original accessor, and panic. 485 /// 486 /// The returned value should only be used within the scope of the original 487 /// `Accessor` that `self` refers to. 488 pub fn with_getter<D2: HasData>( 489 &self, 490 get_data: fn(&mut T) -> D2::Data<'_>, 491 ) -> Accessor<T, D2> { 492 Accessor { 493 token: self.token, 494 get_data, 495 } 496 } 497 498 /// Spawn a background task which will receive an `&Accessor<T, D>` and 499 /// run concurrently with any other tasks in progress for the current 500 /// store. 501 /// 502 /// This is particularly useful for host functions which return a `stream` 503 /// or `future` such that the code to write to the write end of that 504 /// `stream` or `future` must run after the function returns. 505 /// 506 /// The returned [`JoinHandle`] may be used to cancel the task. 507 /// 508 /// # Panics 509 /// 510 /// Panics if called within a closure provided to the [`Accessor::with`] 511 /// function. This can only be called outside an active invocation of 512 /// [`Accessor::with`]. 513 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle 514 where 515 T: 'static, 516 { 517 let accessor = self.clone_for_spawn(); 518 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task)) 519 } 520 521 fn clone_for_spawn(&self) -> Self { 522 Self { 523 token: self.token, 524 get_data: self.get_data, 525 } 526 } 527 } 528 529 /// Represents a task which may be provided to `Accessor::spawn`, 530 /// `Accessor::forward`, or `StorecContextMut::spawn`. 531 // TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable 532 // option. 533 // 534 // As of this writing, it's not possible to specify e.g. `Send` and `Sync` 535 // bounds on the `Future` type returned by an `AsyncFnOnce`. Also, using `F: 536 // Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F + 537 // Send + Sync + 'static` fails with a type mismatch error when we try to pass 538 // it an async closure (e.g. `async move |_| { ... }`). So this seems to be the 539 // best we can do for the time being. 540 pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static 541 where 542 D: HasData + ?Sized, 543 { 544 /// Run the task. 545 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send; 546 } 547 548 /// Represents parameter and result metadata for the caller side of a 549 /// guest->guest call orchestrated by a fused adapter. 550 enum CallerInfo { 551 /// Metadata for a call to an async-lowered import 552 Async { 553 params: Vec<ValRaw>, 554 has_result: bool, 555 }, 556 /// Metadata for a call to an sync-lowered import 557 Sync { 558 params: Vec<ValRaw>, 559 result_count: u32, 560 }, 561 } 562 563 /// Indicates how a guest task is waiting on a waitable set. 564 enum WaitMode { 565 /// The guest task is waiting using `task.wait` 566 Fiber(StoreFiber<'static>), 567 /// The guest task is waiting via a callback declared as part of an 568 /// async-lifted export. 569 Callback(Instance), 570 } 571 572 /// Represents the reason a fiber is suspending itself. 573 #[derive(Debug)] 574 enum SuspendReason { 575 /// The fiber is waiting for an event to be delivered to the specified 576 /// waitable set or task. 577 Waiting { 578 set: TableId<WaitableSet>, 579 thread: QualifiedThreadId, 580 skip_may_block_check: bool, 581 }, 582 /// The fiber has finished handling its most recent work item and is waiting 583 /// for another (or to be dropped if it is no longer needed). 584 NeedWork, 585 /// The fiber is yielding and should be resumed once other tasks have had a 586 /// chance to run. 587 Yielding { 588 thread: QualifiedThreadId, 589 skip_may_block_check: bool, 590 }, 591 /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`. 592 ExplicitlySuspending { 593 thread: QualifiedThreadId, 594 skip_may_block_check: bool, 595 }, 596 } 597 598 /// Represents a pending call into guest code for a given guest task. 599 enum GuestCallKind { 600 /// Indicates there's an event to deliver to the task, possibly related to a 601 /// waitable set the task has been waiting on or polling. 602 DeliverEvent { 603 /// The instance to which the task belongs. 604 instance: Instance, 605 /// The waitable set the event belongs to, if any. 606 /// 607 /// If this is `None` the event will be waiting in the 608 /// `GuestTask::event` field for the task. 609 set: Option<TableId<WaitableSet>>, 610 }, 611 /// Indicates that a new guest task call is pending and may be executed 612 /// using the specified closure. 613 /// 614 /// If the closure returns `Ok(Some(call))`, the `call` should be run 615 /// immediately using `handle_guest_call`. 616 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>), 617 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>), 618 } 619 620 impl fmt::Debug for GuestCallKind { 621 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 622 match self { 623 Self::DeliverEvent { instance, set } => f 624 .debug_struct("DeliverEvent") 625 .field("instance", instance) 626 .field("set", set) 627 .finish(), 628 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(), 629 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(), 630 } 631 } 632 } 633 634 /// The target of a suspension intrinsic. 635 #[derive(Copy, Clone, Debug)] 636 pub enum SuspensionTarget { 637 SomeSuspended(u32), 638 Some(u32), 639 None, 640 } 641 642 impl SuspensionTarget { 643 fn is_none(&self) -> bool { 644 matches!(self, SuspensionTarget::None) 645 } 646 fn is_some(&self) -> bool { 647 !self.is_none() 648 } 649 } 650 651 /// Represents a pending call into guest code for a given guest thread. 652 #[derive(Debug)] 653 struct GuestCall { 654 thread: QualifiedThreadId, 655 kind: GuestCallKind, 656 } 657 658 impl GuestCall { 659 /// Returns whether or not the call is ready to run. 660 /// 661 /// A call will not be ready to run if either: 662 /// 663 /// - the (sub-)component instance to be called has already been entered and 664 /// cannot be reentered until an in-progress call completes 665 /// 666 /// - the call is for a not-yet started task and the (sub-)component 667 /// instance to be called has backpressure enabled 668 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> { 669 let instance = store 670 .concurrent_state_mut() 671 .get_mut(self.thread.task)? 672 .instance; 673 let state = store.instance_state(instance).concurrent_state(); 674 675 let ready = match &self.kind { 676 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter, 677 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0), 678 GuestCallKind::StartExplicit(_) => true, 679 }; 680 log::trace!( 681 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})", 682 state.do_not_enter, 683 state.backpressure 684 ); 685 Ok(ready) 686 } 687 } 688 689 /// Job to be run on a worker fiber. 690 enum WorkerItem { 691 GuestCall(GuestCall), 692 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>), 693 } 694 695 /// Represents a pending work item to be handled by the event loop for a given 696 /// component instance. 697 enum WorkItem { 698 /// A host task to be pushed to `ConcurrentState::futures`. 699 PushFuture(AlwaysMut<HostTaskFuture>), 700 /// A fiber to resume. 701 ResumeFiber(StoreFiber<'static>), 702 /// A thread to resume. 703 ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId), 704 /// A pending call into guest code for a given guest task. 705 GuestCall(RuntimeComponentInstanceIndex, GuestCall), 706 /// A job to run on a worker fiber. 707 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>), 708 } 709 710 impl fmt::Debug for WorkItem { 711 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 712 match self { 713 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(), 714 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(), 715 Self::ResumeThread(instance, thread) => f 716 .debug_tuple("ResumeThread") 717 .field(instance) 718 .field(thread) 719 .finish(), 720 Self::GuestCall(instance, call) => f 721 .debug_tuple("GuestCall") 722 .field(instance) 723 .field(call) 724 .finish(), 725 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(), 726 } 727 } 728 } 729 730 /// Whether a suspension intrinsic was cancelled or completed 731 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] 732 pub(crate) enum WaitResult { 733 Cancelled, 734 Completed, 735 } 736 737 /// Poll the specified future until it completes on behalf of a guest->host call 738 /// using a sync-lowered import. 739 /// 740 /// This is similar to `Instance::first_poll` except it's for sync-lowered 741 /// imports, meaning we don't need to handle cancellation and we can block the 742 /// caller until the task completes, at which point the caller can handle 743 /// lowering the result to the guest's stack and linear memory. 744 pub(crate) fn poll_and_block<R: Send + Sync + 'static>( 745 store: &mut dyn VMStore, 746 future: impl Future<Output = Result<R>> + Send + 'static, 747 ) -> Result<R> { 748 let state = store.concurrent_state_mut(); 749 let task = state.current_host_thread()?; 750 751 // Wrap the future in a closure which will take care of stashing the result 752 // in `GuestTask::result` and resuming this fiber when the host task 753 // completes. 754 let mut future = Box::pin(async move { 755 let result = future.await?; 756 tls::get(move |store| { 757 let state = store.concurrent_state_mut(); 758 let host_state = &mut state.get_mut(task)?.state; 759 assert!(matches!(host_state, HostTaskState::CalleeStarted)); 760 *host_state = HostTaskState::CalleeFinished(Box::new(result)); 761 762 Waitable::Host(task).set_event( 763 state, 764 Some(Event::Subtask { 765 status: Status::Returned, 766 }), 767 )?; 768 769 Ok(()) 770 }) 771 }) as HostTaskFuture; 772 773 // Finally, poll the future. We can use a dummy `Waker` here because we'll 774 // add the future to `ConcurrentState::futures` and poll it automatically 775 // from the event loop if it doesn't complete immediately here. 776 let poll = tls::set(store, || { 777 future 778 .as_mut() 779 .poll(&mut Context::from_waker(&Waker::noop())) 780 }); 781 782 match poll { 783 // It completed immediately; check the result and delete the task. 784 Poll::Ready(result) => result?, 785 786 // It did not complete immediately; add it to 787 // `ConcurrentState::futures` so it will be polled via the event loop; 788 // then use `GuestThread::sync_call_set` to wait for the task to 789 // complete, suspending the current fiber until it does so. 790 Poll::Pending => { 791 let state = store.concurrent_state_mut(); 792 state.push_future(future); 793 794 let caller = state.get_mut(task)?.caller; 795 let set = state.get_mut(caller.thread)?.sync_call_set; 796 Waitable::Host(task).join(state, Some(set))?; 797 798 store.suspend(SuspendReason::Waiting { 799 set, 800 thread: caller, 801 skip_may_block_check: false, 802 })?; 803 804 // Remove the `task` from the `sync_call_set` to ensure that when 805 // this function returns and the task is deleted that there are no 806 // more lingering references to this host task. 807 Waitable::Host(task).join(store.concurrent_state_mut(), None)?; 808 } 809 } 810 811 // Retrieve and return the result. 812 let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state; 813 match mem::replace(host_state, HostTaskState::CalleeDone { cancelled: false }) { 814 HostTaskState::CalleeFinished(result) => Ok(match result.downcast() { 815 Ok(result) => *result, 816 Err(_) => bail_bug!("host task finished with wrong type of result"), 817 }), 818 _ => bail_bug!("unexpected host task state after completion"), 819 } 820 } 821 822 /// Execute the specified guest call. 823 fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> { 824 let mut next = Some(call); 825 while let Some(call) = next.take() { 826 match call.kind { 827 GuestCallKind::DeliverEvent { instance, set } => { 828 let (event, waitable) = 829 match instance.get_event(store, call.thread.task, set, true)? { 830 Some(pair) => pair, 831 None => bail_bug!("delivering non-present event"), 832 }; 833 let state = store.concurrent_state_mut(); 834 let task = state.get_mut(call.thread.task)?; 835 let runtime_instance = task.instance; 836 let handle = waitable.map(|(_, v)| v).unwrap_or(0); 837 838 log::trace!( 839 "use callback to deliver event {event:?} to {:?} for {waitable:?}", 840 call.thread, 841 ); 842 843 let old_thread = store.set_thread(call.thread)?; 844 log::trace!( 845 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread", 846 call.thread 847 ); 848 849 store.enter_instance(runtime_instance); 850 851 let Some(callback) = store 852 .concurrent_state_mut() 853 .get_mut(call.thread.task)? 854 .callback 855 .take() 856 else { 857 bail_bug!("guest task callback field not present") 858 }; 859 860 let code = callback(store, event, handle)?; 861 862 store 863 .concurrent_state_mut() 864 .get_mut(call.thread.task)? 865 .callback = Some(callback); 866 867 store.exit_instance(runtime_instance)?; 868 869 store.set_thread(old_thread)?; 870 871 next = instance.handle_callback_code( 872 store, 873 call.thread, 874 runtime_instance.index, 875 code, 876 )?; 877 878 log::trace!( 879 "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread" 880 ); 881 } 882 GuestCallKind::StartImplicit(fun) => { 883 next = fun(store)?; 884 } 885 GuestCallKind::StartExplicit(fun) => { 886 fun(store)?; 887 } 888 } 889 } 890 891 Ok(()) 892 } 893 894 impl<T> Store<T> { 895 /// Convenience wrapper for [`StoreContextMut::run_concurrent`]. 896 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R> 897 where 898 T: Send + 'static, 899 { 900 ensure!( 901 self.as_context().0.concurrency_support(), 902 "cannot use `run_concurrent` when Config::concurrency_support disabled", 903 ); 904 self.as_context_mut().run_concurrent(fun).await 905 } 906 907 #[doc(hidden)] 908 pub fn assert_concurrent_state_empty(&mut self) { 909 self.as_context_mut().assert_concurrent_state_empty(); 910 } 911 912 #[doc(hidden)] 913 pub fn concurrent_state_table_size(&mut self) -> usize { 914 self.as_context_mut().concurrent_state_table_size() 915 } 916 917 /// Convenience wrapper for [`StoreContextMut::spawn`]. 918 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle 919 where 920 T: 'static, 921 { 922 self.as_context_mut().spawn(task) 923 } 924 } 925 926 impl<T> StoreContextMut<'_, T> { 927 /// Assert that all the relevant tables and queues in the concurrent state 928 /// for this store are empty. 929 /// 930 /// This is for sanity checking in integration tests 931 /// (e.g. `component-async-tests`) that the relevant state has been cleared 932 /// after each test concludes. This should help us catch leaks, e.g. guest 933 /// tasks which haven't been deleted despite having completed and having 934 /// been dropped by their supertasks. 935 /// 936 /// Only intended for use in Wasmtime's own testing. 937 #[doc(hidden)] 938 pub fn assert_concurrent_state_empty(self) { 939 let store = self.0; 940 store 941 .store_data_mut() 942 .components 943 .assert_instance_states_empty(); 944 let state = store.concurrent_state_mut(); 945 assert!( 946 state.table.get_mut().is_empty(), 947 "non-empty table: {:?}", 948 state.table.get_mut() 949 ); 950 assert!(state.high_priority.is_empty()); 951 assert!(state.low_priority.is_empty()); 952 assert!(state.current_thread.is_none()); 953 assert!(state.futures_mut().unwrap().is_empty()); 954 assert!(state.global_error_context_ref_counts.is_empty()); 955 } 956 957 /// Helper function to perform tests over the size of the concurrent state 958 /// table which can be useful for detecting leaks. 959 /// 960 /// Only intended for use in Wasmtime's own testing. 961 #[doc(hidden)] 962 pub fn concurrent_state_table_size(&mut self) -> usize { 963 self.0 964 .concurrent_state_mut() 965 .table 966 .get_mut() 967 .iter_mut() 968 .count() 969 } 970 971 /// Spawn a background task to run as part of this instance's event loop. 972 /// 973 /// The task will receive an `&Accessor<U>` and run concurrently with 974 /// any other tasks in progress for the instance. 975 /// 976 /// Note that the task will only make progress if and when the event loop 977 /// for this instance is run. 978 /// 979 /// The returned [`JoinHandle`] may be used to cancel the task. 980 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle 981 where 982 T: 'static, 983 { 984 let accessor = Accessor::new(StoreToken::new(self.as_context_mut())); 985 self.spawn_with_accessor(accessor, task) 986 } 987 988 /// Internal implementation of `spawn` functions where a `store` is 989 /// available along with an `Accessor`. 990 fn spawn_with_accessor<D>( 991 self, 992 accessor: Accessor<T, D>, 993 task: impl AccessorTask<T, D>, 994 ) -> JoinHandle 995 where 996 T: 'static, 997 D: HasData + ?Sized, 998 { 999 // Create an "abortable future" here where internally the future will 1000 // hook calls to poll and possibly spawn more background tasks on each 1001 // iteration. 1002 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await }); 1003 self.0 1004 .concurrent_state_mut() 1005 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) })); 1006 handle 1007 } 1008 1009 /// Run the specified closure `fun` to completion as part of this store's 1010 /// event loop. 1011 /// 1012 /// This will run `fun` as part of this store's event loop until it 1013 /// yields a result. `fun` is provided an [`Accessor`], which provides 1014 /// controlled access to the store and its data. 1015 /// 1016 /// This function can be used to invoke [`Func::call_concurrent`] for 1017 /// example within the async closure provided here. 1018 /// 1019 /// This function will unconditionally return an error if 1020 /// [`Config::concurrency_support`] is disabled. 1021 /// 1022 /// [`Config::concurrency_support`]: crate::Config::concurrency_support 1023 /// 1024 /// # Store-blocking behavior 1025 /// 1026 /// At this time there are certain situations in which the `Future` returned 1027 /// by the `AsyncFnOnce` passed to this function will not be polled for an 1028 /// extended period of time, despite one or more `Waker::wake` events having 1029 /// occurred for the task to which it belongs. This can manifest as the 1030 /// `Future` seeming to be "blocked" or "locked up", but is actually due to 1031 /// the `Store` being held by e.g. a blocking host function, preventing the 1032 /// `Future` from being polled. A canonical example of this is when the 1033 /// `fun` provided to this function attempts to set a timeout for an 1034 /// invocation of a wasm function. In this situation the async closure is 1035 /// waiting both on (a) the wasm computation to finish, and (b) the timeout 1036 /// to elapse. At this time this setup will not always work and the timeout 1037 /// may not reliably fire. 1038 /// 1039 /// This function will not block the current thread and as such is always 1040 /// suitable to run in an `async` context, but the current implementation of 1041 /// Wasmtime can lead to situations where a certain wasm computation is 1042 /// required to make progress the closure to make progress. This is an 1043 /// artifact of Wasmtime's historical implementation of `async` functions 1044 /// and is the topic of [#11869] and [#11870]. In the timeout example from 1045 /// above it means that Wasmtime can get "wedged" for a bit where (a) must 1046 /// progress for a readiness notification of (b) to get delivered. 1047 /// 1048 /// This effectively means that it's not possible to reliably perform a 1049 /// "select" operation within the `fun` closure, which timeouts for example 1050 /// are based on. Fixing this requires some relatively major refactoring 1051 /// work within Wasmtime itself. This is a known pitfall otherwise and one 1052 /// that is intended to be fixed one day. In the meantime it's recommended 1053 /// to apply timeouts or such to the entire `run_concurrent` call itself 1054 /// rather than internally. 1055 /// 1056 /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869 1057 /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870 1058 /// 1059 /// # Example 1060 /// 1061 /// ``` 1062 /// # use { 1063 /// # wasmtime::{ 1064 /// # error::{Result}, 1065 /// # component::{ Component, Linker, Resource, ResourceTable}, 1066 /// # Config, Engine, Store 1067 /// # }, 1068 /// # }; 1069 /// # 1070 /// # struct MyResource(u32); 1071 /// # struct Ctx { table: ResourceTable } 1072 /// # 1073 /// # async fn foo() -> Result<()> { 1074 /// # let mut config = Config::new(); 1075 /// # let engine = Engine::new(&config)?; 1076 /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() }); 1077 /// # let mut linker = Linker::new(&engine); 1078 /// # let component = Component::new(&engine, "")?; 1079 /// # let instance = linker.instantiate_async(&mut store, &component).await?; 1080 /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?; 1081 /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?; 1082 /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> { 1083 /// let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?; 1084 /// let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?; 1085 /// let value = accessor.with(|mut access| access.get().table.delete(another_resource))?; 1086 /// bar.call_concurrent(accessor, (value.0,)).await?; 1087 /// Ok(()) 1088 /// }).await??; 1089 /// # Ok(()) 1090 /// # } 1091 /// ``` 1092 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R> 1093 where 1094 T: Send + 'static, 1095 { 1096 ensure!( 1097 self.0.concurrency_support(), 1098 "cannot use `run_concurrent` when Config::concurrency_support disabled", 1099 ); 1100 self.do_run_concurrent(fun, false).await 1101 } 1102 1103 pub(super) async fn run_concurrent_trap_on_idle<R>( 1104 self, 1105 fun: impl AsyncFnOnce(&Accessor<T>) -> R, 1106 ) -> Result<R> 1107 where 1108 T: Send + 'static, 1109 { 1110 self.do_run_concurrent(fun, true).await 1111 } 1112 1113 async fn do_run_concurrent<R>( 1114 mut self, 1115 fun: impl AsyncFnOnce(&Accessor<T>) -> R, 1116 trap_on_idle: bool, 1117 ) -> Result<R> 1118 where 1119 T: Send + 'static, 1120 { 1121 debug_assert!(self.0.concurrency_support()); 1122 check_recursive_run(); 1123 let token = StoreToken::new(self.as_context_mut()); 1124 1125 struct Dropper<'a, T: 'static, V> { 1126 store: StoreContextMut<'a, T>, 1127 value: ManuallyDrop<V>, 1128 } 1129 1130 impl<'a, T, V> Drop for Dropper<'a, T, V> { 1131 fn drop(&mut self) { 1132 tls::set(self.store.0, || { 1133 // SAFETY: Here we drop the value without moving it for the 1134 // first and only time -- per the contract for `Drop::drop`, 1135 // this code won't run again, and the `value` field will no 1136 // longer be accessible. 1137 unsafe { ManuallyDrop::drop(&mut self.value) } 1138 }); 1139 } 1140 } 1141 1142 let accessor = &Accessor::new(token); 1143 let dropper = &mut Dropper { 1144 store: self, 1145 value: ManuallyDrop::new(fun(accessor)), 1146 }; 1147 // SAFETY: We never move `dropper` nor its `value` field. 1148 let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) }; 1149 1150 dropper 1151 .store 1152 .as_context_mut() 1153 .poll_until(future, trap_on_idle) 1154 .await 1155 } 1156 1157 /// Run this store's event loop. 1158 /// 1159 /// The returned future will resolve when the specified future completes or, 1160 /// if `trap_on_idle` is true, when the event loop can't make further 1161 /// progress. 1162 async fn poll_until<R>( 1163 mut self, 1164 mut future: Pin<&mut impl Future<Output = R>>, 1165 trap_on_idle: bool, 1166 ) -> Result<R> 1167 where 1168 T: Send + 'static, 1169 { 1170 struct Reset<'a, T: 'static> { 1171 store: StoreContextMut<'a, T>, 1172 futures: Option<FuturesUnordered<HostTaskFuture>>, 1173 } 1174 1175 impl<'a, T> Drop for Reset<'a, T> { 1176 fn drop(&mut self) { 1177 if let Some(futures) = self.futures.take() { 1178 *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures); 1179 } 1180 } 1181 } 1182 1183 loop { 1184 // Take `ConcurrentState::futures` out of the store so we can poll 1185 // it while also safely giving any of the futures inside access to 1186 // `self`. 1187 let futures = self.0.concurrent_state_mut().futures.get_mut().take(); 1188 let mut reset = Reset { 1189 store: self.as_context_mut(), 1190 futures, 1191 }; 1192 let mut next = match reset.futures.as_mut() { 1193 Some(f) => pin!(f.next()), 1194 None => bail_bug!("concurrent state missing futures field"), 1195 }; 1196 1197 enum PollResult<R> { 1198 Complete(R), 1199 ProcessWork(Vec<WorkItem>), 1200 } 1201 let result = future::poll_fn(|cx| { 1202 // First, poll the future we were passed as an argument and 1203 // return immediately if it's ready. 1204 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) { 1205 return Poll::Ready(Ok(PollResult::Complete(value))); 1206 } 1207 1208 // Next, poll `ConcurrentState::futures` (which includes any 1209 // pending host tasks and/or background tasks), returning 1210 // immediately if one of them fails. 1211 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) { 1212 Poll::Ready(Some(output)) => { 1213 match output { 1214 Err(e) => return Poll::Ready(Err(e)), 1215 Ok(()) => {} 1216 } 1217 Poll::Ready(true) 1218 } 1219 Poll::Ready(None) => Poll::Ready(false), 1220 Poll::Pending => Poll::Pending, 1221 }; 1222 1223 // Next, collect the next batch of work items to process, if any. 1224 // This will be either all of the high-priority work items, or if 1225 // there are none, a single low-priority work item. 1226 let state = reset.store.0.concurrent_state_mut(); 1227 let ready = state.collect_work_items_to_run(); 1228 if !ready.is_empty() { 1229 return Poll::Ready(Ok(PollResult::ProcessWork(ready))); 1230 } 1231 1232 // Finally, if we have nothing else to do right now, determine what to do 1233 // based on whether there are any pending futures in 1234 // `ConcurrentState::futures`. 1235 return match next { 1236 Poll::Ready(true) => { 1237 // In this case, one of the futures in 1238 // `ConcurrentState::futures` completed 1239 // successfully, so we return now and continue 1240 // the outer loop in case there is another one 1241 // ready to complete. 1242 Poll::Ready(Ok(PollResult::ProcessWork(Vec::new()))) 1243 } 1244 Poll::Ready(false) => { 1245 // Poll the future we were passed one last time 1246 // in case one of `ConcurrentState::futures` had 1247 // the side effect of unblocking it. 1248 if let Poll::Ready(value) = 1249 tls::set(reset.store.0, || future.as_mut().poll(cx)) 1250 { 1251 Poll::Ready(Ok(PollResult::Complete(value))) 1252 } else { 1253 // In this case, there are no more pending 1254 // futures in `ConcurrentState::futures`, 1255 // there are no remaining work items, _and_ 1256 // the future we were passed as an argument 1257 // still hasn't completed. 1258 if trap_on_idle { 1259 // `trap_on_idle` is true, so we exit 1260 // immediately. 1261 Poll::Ready(Err(Trap::AsyncDeadlock.into())) 1262 } else { 1263 // `trap_on_idle` is false, so we assume 1264 // that future will wake up and give us 1265 // more work to do when it's ready to. 1266 Poll::Pending 1267 } 1268 } 1269 } 1270 // There is at least one pending future in 1271 // `ConcurrentState::futures` and we have nothing 1272 // else to do but wait for now, so we return 1273 // `Pending`. 1274 Poll::Pending => Poll::Pending, 1275 }; 1276 }) 1277 .await; 1278 1279 // Put the `ConcurrentState::futures` back into the store before we 1280 // return or handle any work items since one or more of those items 1281 // might append more futures. 1282 drop(reset); 1283 1284 match result? { 1285 // The future we were passed as an argument completed, so we 1286 // return the result. 1287 PollResult::Complete(value) => break Ok(value), 1288 // The future we were passed has not yet completed, so handle 1289 // any work items and then loop again. 1290 PollResult::ProcessWork(ready) => { 1291 struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> { 1292 store: StoreContextMut<'a, T>, 1293 ready: I, 1294 } 1295 1296 impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> { 1297 fn drop(&mut self) { 1298 while let Some(item) = self.ready.next() { 1299 match item { 1300 WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0), 1301 WorkItem::PushFuture(future) => { 1302 tls::set(self.store.0, move || drop(future)) 1303 } 1304 _ => {} 1305 } 1306 } 1307 } 1308 } 1309 1310 let mut dispose = Dispose { 1311 store: self.as_context_mut(), 1312 ready: ready.into_iter(), 1313 }; 1314 1315 while let Some(item) = dispose.ready.next() { 1316 dispose 1317 .store 1318 .as_context_mut() 1319 .handle_work_item(item) 1320 .await?; 1321 } 1322 } 1323 } 1324 } 1325 } 1326 1327 /// Handle the specified work item, possibly resuming a fiber if applicable. 1328 async fn handle_work_item(self, item: WorkItem) -> Result<()> 1329 where 1330 T: Send, 1331 { 1332 log::trace!("handle work item {item:?}"); 1333 match item { 1334 WorkItem::PushFuture(future) => { 1335 self.0 1336 .concurrent_state_mut() 1337 .futures_mut()? 1338 .push(future.into_inner()); 1339 } 1340 WorkItem::ResumeFiber(fiber) => { 1341 self.0.resume_fiber(fiber).await?; 1342 } 1343 WorkItem::ResumeThread(_, thread) => { 1344 if let GuestThreadState::Ready(fiber) = mem::replace( 1345 &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state, 1346 GuestThreadState::Running, 1347 ) { 1348 self.0.resume_fiber(fiber).await?; 1349 } else { 1350 bail_bug!("cannot resume non-pending thread {thread:?}"); 1351 } 1352 } 1353 WorkItem::GuestCall(_, call) => { 1354 if call.is_ready(self.0)? { 1355 self.run_on_worker(WorkerItem::GuestCall(call)).await?; 1356 } else { 1357 let state = self.0.concurrent_state_mut(); 1358 let task = state.get_mut(call.thread.task)?; 1359 if !task.starting_sent { 1360 task.starting_sent = true; 1361 if let GuestCallKind::StartImplicit(_) = &call.kind { 1362 Waitable::Guest(call.thread.task).set_event( 1363 state, 1364 Some(Event::Subtask { 1365 status: Status::Starting, 1366 }), 1367 )?; 1368 } 1369 } 1370 1371 let instance = state.get_mut(call.thread.task)?.instance; 1372 self.0 1373 .instance_state(instance) 1374 .concurrent_state() 1375 .pending 1376 .insert(call.thread, call.kind); 1377 } 1378 } 1379 WorkItem::WorkerFunction(fun) => { 1380 self.run_on_worker(WorkerItem::Function(fun)).await?; 1381 } 1382 } 1383 1384 Ok(()) 1385 } 1386 1387 /// Execute the specified guest call on a worker fiber. 1388 async fn run_on_worker(self, item: WorkerItem) -> Result<()> 1389 where 1390 T: Send, 1391 { 1392 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() { 1393 fiber 1394 } else { 1395 fiber::make_fiber(self.0, move |store| { 1396 loop { 1397 let Some(item) = store.concurrent_state_mut().worker_item.take() else { 1398 bail_bug!("worker_item not present when resuming fiber") 1399 }; 1400 match item { 1401 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?, 1402 WorkerItem::Function(fun) => fun.into_inner()(store)?, 1403 } 1404 1405 store.suspend(SuspendReason::NeedWork)?; 1406 } 1407 })? 1408 }; 1409 1410 let worker_item = &mut self.0.concurrent_state_mut().worker_item; 1411 assert!(worker_item.is_none()); 1412 *worker_item = Some(item); 1413 1414 self.0.resume_fiber(worker).await 1415 } 1416 1417 /// Wrap the specified host function in a future which will call it, passing 1418 /// it an `&Accessor<T>`. 1419 /// 1420 /// See the `Accessor` documentation for details. 1421 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static 1422 where 1423 T: 'static, 1424 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>> 1425 + Send 1426 + Sync 1427 + 'static, 1428 R: Send + Sync + 'static, 1429 { 1430 let token = StoreToken::new(self); 1431 async move { 1432 let mut accessor = Accessor::new(token); 1433 closure(&mut accessor).await 1434 } 1435 } 1436 } 1437 1438 impl StoreOpaque { 1439 /// Push a `GuestTask` onto the task stack for either a sync-to-sync, 1440 /// guest-to-guest call or a sync host-to-guest call. 1441 /// 1442 /// This task will only be used for the purpose of handling calls to 1443 /// intrinsic functions; both parameter lowering and result lifting are 1444 /// assumed to be taken care of elsewhere. 1445 pub(crate) fn enter_guest_sync_call( 1446 &mut self, 1447 guest_caller: Option<RuntimeInstance>, 1448 callee_async: bool, 1449 callee: RuntimeInstance, 1450 ) -> Result<()> { 1451 log::trace!("enter sync call {callee:?}"); 1452 if !self.concurrency_support() { 1453 return Ok(self.enter_call_not_concurrent()); 1454 } 1455 1456 let state = self.concurrent_state_mut(); 1457 let thread = state.current_thread; 1458 let instance = if let Some(thread) = thread.guest() { 1459 Some(state.get_mut(thread.task)?.instance) 1460 } else { 1461 None 1462 }; 1463 if guest_caller.is_some() { 1464 debug_assert_eq!(instance, guest_caller); 1465 } 1466 let task = GuestTask::new( 1467 Box::new(move |_, _| bail_bug!("cannot lower params in sync call")), 1468 LiftResult { 1469 lift: Box::new(move |_, _| bail_bug!("cannot lift result in sync call")), 1470 ty: TypeTupleIndex::reserved_value(), 1471 memory: None, 1472 string_encoding: StringEncoding::Utf8, 1473 }, 1474 if let Some(thread) = thread.guest() { 1475 Caller::Guest { thread: *thread } 1476 } else { 1477 Caller::Host { 1478 tx: None, 1479 host_future_present: false, 1480 caller: thread, 1481 } 1482 }, 1483 None, 1484 callee, 1485 callee_async, 1486 )?; 1487 1488 let guest_task = state.push(task)?; 1489 let new_thread = GuestThread::new_implicit(state, guest_task)?; 1490 let guest_thread = state.push(new_thread)?; 1491 Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table( 1492 guest_thread, 1493 self, 1494 callee.index, 1495 )?; 1496 1497 let state = self.concurrent_state_mut(); 1498 state.get_mut(guest_task)?.threads.insert(guest_thread); 1499 1500 self.set_thread(QualifiedThreadId { 1501 task: guest_task, 1502 thread: guest_thread, 1503 })?; 1504 1505 Ok(()) 1506 } 1507 1508 /// Pop a `GuestTask` previously pushed using `enter_sync_call`. 1509 pub(crate) fn exit_guest_sync_call(&mut self) -> Result<()> { 1510 if !self.concurrency_support() { 1511 return Ok(self.exit_call_not_concurrent()); 1512 } 1513 let thread = match self.set_thread(CurrentThread::None)?.guest() { 1514 Some(t) => *t, 1515 None => bail_bug!("expected task when exiting"), 1516 }; 1517 let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance; 1518 log::trace!("exit sync call {instance:?}"); 1519 Instance::from_wasmtime(self, instance.instance).cleanup_thread( 1520 self, 1521 thread, 1522 instance.index, 1523 )?; 1524 1525 let state = self.concurrent_state_mut(); 1526 let task = state.get_mut(thread.task)?; 1527 let caller = match &task.caller { 1528 &Caller::Guest { thread } => thread.into(), 1529 &Caller::Host { caller, .. } => caller, 1530 }; 1531 self.set_thread(caller)?; 1532 1533 let state = self.concurrent_state_mut(); 1534 let task = state.get_mut(thread.task)?; 1535 if task.ready_to_delete() { 1536 state.delete(thread.task)?.dispose(state)?; 1537 } 1538 1539 Ok(()) 1540 } 1541 1542 /// Similar to `enter_guest_sync_call` except for when the guest makes a 1543 /// transition to the host. 1544 /// 1545 /// FIXME: this is called for all guest->host transitions and performs some 1546 /// relatively expensive table manipulations. This would ideally be 1547 /// optimized to avoid the full allocation of a `HostTask` in at least some 1548 /// situations. 1549 pub(crate) fn host_task_create(&mut self) -> Result<Option<TableId<HostTask>>> { 1550 if !self.concurrency_support() { 1551 self.enter_call_not_concurrent(); 1552 return Ok(None); 1553 } 1554 let state = self.concurrent_state_mut(); 1555 let caller = state.current_guest_thread()?; 1556 let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?; 1557 log::trace!("new host task {task:?}"); 1558 self.set_thread(task)?; 1559 Ok(Some(task)) 1560 } 1561 1562 /// Invoked before lowering the results of a host task to the guest. 1563 /// 1564 /// This is used to update the current thread annotations within the store 1565 /// to ensure that it reflects the guest task, not the host task, since 1566 /// lowering may execute guest code. 1567 pub fn host_task_reenter_caller(&mut self) -> Result<()> { 1568 if !self.concurrency_support() { 1569 return Ok(()); 1570 } 1571 let task = self.concurrent_state_mut().current_host_thread()?; 1572 let caller = self.concurrent_state_mut().get_mut(task)?.caller; 1573 self.set_thread(caller)?; 1574 Ok(()) 1575 } 1576 1577 /// Dual of `host_task_create` and signifies that the host has finished and 1578 /// will be cleaned up. 1579 /// 1580 /// Note that this isn't invoked when the host is invoked asynchronously and 1581 /// the host isn't complete yet. In that situation the host task persists 1582 /// and will be cleaned up separately in `subtask_drop` 1583 pub(crate) fn host_task_delete(&mut self, task: Option<TableId<HostTask>>) -> Result<()> { 1584 match task { 1585 Some(task) => { 1586 log::trace!("delete host task {task:?}"); 1587 self.concurrent_state_mut().delete(task)?; 1588 } 1589 None => { 1590 self.exit_call_not_concurrent(); 1591 } 1592 } 1593 Ok(()) 1594 } 1595 1596 /// Determine whether the specified instance may be entered from the host. 1597 /// 1598 /// We return `true` here only if all of the following hold: 1599 /// 1600 /// - The top-level instance is not already on the current task's call stack. 1601 /// - The instance is not in need of a post-return function call. 1602 /// - `self` has not been poisoned due to a trap. 1603 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> Result<bool> { 1604 if self.trapped() { 1605 return Ok(false); 1606 } 1607 if !self.concurrency_support() { 1608 return Ok(true); 1609 } 1610 let state = self.concurrent_state_mut(); 1611 let mut cur = state.current_thread; 1612 loop { 1613 match cur { 1614 CurrentThread::None => break Ok(true), 1615 CurrentThread::Guest(thread) => { 1616 let task = state.get_mut(thread.task)?; 1617 1618 // Note that we only compare top-level instance IDs here. 1619 // The idea is that the host is not allowed to recursively 1620 // enter a top-level instance even if the specific leaf 1621 // instance is not on the stack. This the behavior defined 1622 // in the spec, and it allows us to elide runtime checks in 1623 // guest-to-guest adapters. 1624 if task.instance.instance == instance.instance { 1625 break Ok(false); 1626 } 1627 cur = match task.caller { 1628 Caller::Host { caller, .. } => caller, 1629 Caller::Guest { thread } => thread.into(), 1630 }; 1631 } 1632 CurrentThread::Host(id) => { 1633 cur = state.get_mut(id)?.caller.into(); 1634 } 1635 } 1636 } 1637 } 1638 1639 /// Helper function to retrieve the `InstanceState` for the 1640 /// specified instance. 1641 fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState { 1642 self.component_instance_mut(instance.instance) 1643 .instance_state(instance.index) 1644 } 1645 1646 fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> Result<CurrentThread> { 1647 // Each time we switch threads, we conservatively set `task_may_block` 1648 // to `false` for the component instance we're switching away from (if 1649 // any), meaning it will be `false` for any new thread created for that 1650 // instance unless explicitly set otherwise. 1651 let state = self.concurrent_state_mut(); 1652 let old_thread = mem::replace(&mut state.current_thread, thread.into()); 1653 if let Some(old_thread) = old_thread.guest() { 1654 let instance = state.get_mut(old_thread.task)?.instance.instance; 1655 self.component_instance_mut(instance) 1656 .set_task_may_block(false) 1657 } 1658 1659 // If we're switching to a new thread, set its component instance's 1660 // `task_may_block` according to where it left off. 1661 if self.concurrent_state_mut().current_thread.guest().is_some() { 1662 self.set_task_may_block()?; 1663 } 1664 1665 Ok(old_thread) 1666 } 1667 1668 /// Set the global variable representing whether the current task may block 1669 /// prior to entering Wasm code. 1670 fn set_task_may_block(&mut self) -> Result<()> { 1671 let state = self.concurrent_state_mut(); 1672 let guest_thread = state.current_guest_thread()?; 1673 let instance = state.get_mut(guest_thread.task)?.instance.instance; 1674 let may_block = self.concurrent_state_mut().may_block(guest_thread.task)?; 1675 self.component_instance_mut(instance) 1676 .set_task_may_block(may_block); 1677 Ok(()) 1678 } 1679 1680 pub(crate) fn check_blocking(&mut self) -> Result<()> { 1681 if !self.concurrency_support() { 1682 return Ok(()); 1683 } 1684 let state = self.concurrent_state_mut(); 1685 let task = state.current_guest_thread()?.task; 1686 let instance = state.get_mut(task)?.instance.instance; 1687 let task_may_block = self.component_instance(instance).get_task_may_block(); 1688 1689 if task_may_block { 1690 Ok(()) 1691 } else { 1692 Err(Trap::CannotBlockSyncTask.into()) 1693 } 1694 } 1695 1696 /// Record that we're about to enter a (sub-)component instance which does 1697 /// not support more than one concurrent, stackful activation, meaning it 1698 /// cannot be entered again until the next call returns. 1699 fn enter_instance(&mut self, instance: RuntimeInstance) { 1700 log::trace!("enter {instance:?}"); 1701 self.instance_state(instance) 1702 .concurrent_state() 1703 .do_not_enter = true; 1704 } 1705 1706 /// Record that we've exited a (sub-)component instance previously entered 1707 /// with `Self::enter_instance` and then calls `Self::partition_pending`. 1708 /// See the documentation for the latter for details. 1709 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> { 1710 log::trace!("exit {instance:?}"); 1711 self.instance_state(instance) 1712 .concurrent_state() 1713 .do_not_enter = false; 1714 self.partition_pending(instance) 1715 } 1716 1717 /// Iterate over `InstanceState::pending`, moving any ready items into the 1718 /// "high priority" work item queue. 1719 /// 1720 /// See `GuestCall::is_ready` for details. 1721 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> { 1722 for (thread, kind) in 1723 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter() 1724 { 1725 let call = GuestCall { thread, kind }; 1726 if call.is_ready(self)? { 1727 self.concurrent_state_mut() 1728 .push_high_priority(WorkItem::GuestCall(instance.index, call)); 1729 } else { 1730 self.instance_state(instance) 1731 .concurrent_state() 1732 .pending 1733 .insert(call.thread, call.kind); 1734 } 1735 } 1736 1737 Ok(()) 1738 } 1739 1740 /// Implements the `backpressure.{inc,dec}` intrinsics. 1741 pub(crate) fn backpressure_modify( 1742 &mut self, 1743 caller_instance: RuntimeInstance, 1744 modify: impl FnOnce(u16) -> Option<u16>, 1745 ) -> Result<()> { 1746 let state = self.instance_state(caller_instance).concurrent_state(); 1747 let old = state.backpressure; 1748 let new = modify(old).ok_or_else(|| Trap::BackpressureOverflow)?; 1749 state.backpressure = new; 1750 1751 if old > 0 && new == 0 { 1752 // Backpressure was previously enabled and is now disabled; move any 1753 // newly-eligible guest calls to the "high priority" queue. 1754 self.partition_pending(caller_instance)?; 1755 } 1756 1757 Ok(()) 1758 } 1759 1760 /// Resume the specified fiber, giving it exclusive access to the specified 1761 /// store. 1762 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> { 1763 let old_thread = self.concurrent_state_mut().current_thread; 1764 log::trace!("resume_fiber: save current thread {old_thread:?}"); 1765 1766 let fiber = fiber::resolve_or_release(self, fiber).await?; 1767 1768 self.set_thread(old_thread)?; 1769 1770 let state = self.concurrent_state_mut(); 1771 1772 if let Some(ot) = old_thread.guest() { 1773 state.get_mut(ot.thread)?.state = GuestThreadState::Running; 1774 } 1775 log::trace!("resume_fiber: restore current thread {old_thread:?}"); 1776 1777 if let Some(mut fiber) = fiber { 1778 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason); 1779 // See the `SuspendReason` documentation for what each case means. 1780 let reason = match state.suspend_reason.take() { 1781 Some(r) => r, 1782 None => bail_bug!("suspend reason missing when resuming fiber"), 1783 }; 1784 match reason { 1785 SuspendReason::NeedWork => { 1786 if state.worker.is_none() { 1787 state.worker = Some(fiber); 1788 } else { 1789 fiber.dispose(self); 1790 } 1791 } 1792 SuspendReason::Yielding { thread, .. } => { 1793 state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber); 1794 let instance = state.get_mut(thread.task)?.instance.index; 1795 state.push_low_priority(WorkItem::ResumeThread(instance, thread)); 1796 } 1797 SuspendReason::ExplicitlySuspending { thread, .. } => { 1798 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber); 1799 } 1800 SuspendReason::Waiting { set, thread, .. } => { 1801 let old = state 1802 .get_mut(set)? 1803 .waiting 1804 .insert(thread, WaitMode::Fiber(fiber)); 1805 assert!(old.is_none()); 1806 } 1807 }; 1808 } else { 1809 log::trace!("resume_fiber: fiber has exited"); 1810 } 1811 1812 Ok(()) 1813 } 1814 1815 /// Suspend the current fiber, storing the reason in 1816 /// `ConcurrentState::suspend_reason` to indicate the conditions under which 1817 /// it should be resumed. 1818 /// 1819 /// See the `SuspendReason` documentation for details. 1820 fn suspend(&mut self, reason: SuspendReason) -> Result<()> { 1821 log::trace!("suspend fiber: {reason:?}"); 1822 1823 // If we're yielding or waiting on behalf of a guest thread, we'll need to 1824 // pop the call context which manages resource borrows before suspending 1825 // and then push it again once we've resumed. 1826 let task = match &reason { 1827 SuspendReason::Yielding { thread, .. } 1828 | SuspendReason::Waiting { thread, .. } 1829 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task), 1830 SuspendReason::NeedWork => None, 1831 }; 1832 1833 let old_guest_thread = if task.is_some() { 1834 self.concurrent_state_mut().current_thread 1835 } else { 1836 CurrentThread::None 1837 }; 1838 1839 // We should not have reached here unless either there's no current 1840 // task, or the current task is permitted to block. In addition, we 1841 // special-case `thread.switch-to` and waiting for a subtask to go from 1842 // `starting` to `started`, both of which we consider non-blocking 1843 // operations despite requiring a suspend. 1844 debug_assert!( 1845 matches!( 1846 reason, 1847 SuspendReason::ExplicitlySuspending { 1848 skip_may_block_check: true, 1849 .. 1850 } | SuspendReason::Waiting { 1851 skip_may_block_check: true, 1852 .. 1853 } | SuspendReason::Yielding { 1854 skip_may_block_check: true, 1855 .. 1856 } 1857 ) || old_guest_thread 1858 .guest() 1859 .map(|thread| self.concurrent_state_mut().may_block(thread.task)) 1860 .transpose()? 1861 .unwrap_or(true) 1862 ); 1863 1864 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason; 1865 assert!(suspend_reason.is_none()); 1866 *suspend_reason = Some(reason); 1867 1868 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?; 1869 1870 if task.is_some() { 1871 self.set_thread(old_guest_thread)?; 1872 } 1873 1874 Ok(()) 1875 } 1876 1877 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> { 1878 let state = self.concurrent_state_mut(); 1879 let caller = state.current_guest_thread()?; 1880 let old_set = waitable.common(state)?.set; 1881 let set = state.get_mut(caller.thread)?.sync_call_set; 1882 waitable.join(state, Some(set))?; 1883 self.suspend(SuspendReason::Waiting { 1884 set, 1885 thread: caller, 1886 skip_may_block_check: false, 1887 })?; 1888 let state = self.concurrent_state_mut(); 1889 waitable.join(state, old_set) 1890 } 1891 } 1892 1893 impl Instance { 1894 /// Get the next pending event for the specified task and (optional) 1895 /// waitable set, along with the waitable handle if applicable. 1896 fn get_event( 1897 self, 1898 store: &mut StoreOpaque, 1899 guest_task: TableId<GuestTask>, 1900 set: Option<TableId<WaitableSet>>, 1901 cancellable: bool, 1902 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> { 1903 let state = store.concurrent_state_mut(); 1904 1905 let event = &mut state.get_mut(guest_task)?.event; 1906 if let Some(ev) = event 1907 && (cancellable || !matches!(ev, Event::Cancelled)) 1908 { 1909 log::trace!("deliver event {ev:?} to {guest_task:?}"); 1910 let ev = *ev; 1911 *event = None; 1912 return Ok(Some((ev, None))); 1913 } 1914 1915 let set = match set { 1916 Some(set) => set, 1917 None => return Ok(None), 1918 }; 1919 let waitable = match state.get_mut(set)?.ready.pop_first() { 1920 Some(v) => v, 1921 None => return Ok(None), 1922 }; 1923 1924 let common = waitable.common(state)?; 1925 let handle = match common.handle { 1926 Some(h) => h, 1927 None => bail_bug!("handle not set when delivering event"), 1928 }; 1929 let event = match common.event.take() { 1930 Some(e) => e, 1931 None => bail_bug!("event not set when delivering event"), 1932 }; 1933 1934 log::trace!( 1935 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}" 1936 ); 1937 1938 waitable.on_delivery(store, self, event)?; 1939 1940 Ok(Some((event, Some((waitable, handle))))) 1941 } 1942 1943 /// Handle the `CallbackCode` returned from an async-lifted export or its 1944 /// callback. 1945 /// 1946 /// If this returns `Ok(Some(call))`, then `call` should be run immediately 1947 /// using `handle_guest_call`. 1948 fn handle_callback_code( 1949 self, 1950 store: &mut StoreOpaque, 1951 guest_thread: QualifiedThreadId, 1952 runtime_instance: RuntimeComponentInstanceIndex, 1953 code: u32, 1954 ) -> Result<Option<GuestCall>> { 1955 let (code, set) = unpack_callback_code(code); 1956 1957 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})"); 1958 1959 let state = store.concurrent_state_mut(); 1960 1961 let get_set = |store: &mut StoreOpaque, handle| -> Result<_> { 1962 let set = store 1963 .instance_state(RuntimeInstance { 1964 instance: self.id().instance(), 1965 index: runtime_instance, 1966 }) 1967 .handle_table() 1968 .waitable_set_rep(handle)?; 1969 1970 Ok(TableId::<WaitableSet>::new(set)) 1971 }; 1972 1973 Ok(match code { 1974 callback_code::EXIT => { 1975 log::trace!("implicit thread {guest_thread:?} completed"); 1976 self.cleanup_thread(store, guest_thread, runtime_instance)?; 1977 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 1978 if task.threads.is_empty() && !task.returned_or_cancelled() { 1979 bail!(Trap::NoAsyncResult); 1980 } 1981 if let Caller::Guest { .. } = task.caller { 1982 task.exited = true; 1983 task.callback = None; 1984 } 1985 if task.ready_to_delete() { 1986 Waitable::Guest(guest_thread.task).delete_from(store.concurrent_state_mut())?; 1987 } 1988 None 1989 } 1990 callback_code::YIELD => { 1991 let task = state.get_mut(guest_thread.task)?; 1992 // If an `Event::Cancelled` is pending, we'll deliver that; 1993 // otherwise, we'll deliver `Event::None`. Note that 1994 // `GuestTask::event` is only ever set to one of those two 1995 // `Event` variants. 1996 if let Some(event) = task.event { 1997 assert!(matches!(event, Event::None | Event::Cancelled)); 1998 } else { 1999 task.event = Some(Event::None); 2000 } 2001 let call = GuestCall { 2002 thread: guest_thread, 2003 kind: GuestCallKind::DeliverEvent { 2004 instance: self, 2005 set: None, 2006 }, 2007 }; 2008 if state.may_block(guest_thread.task)? { 2009 // Push this thread onto the "low priority" queue so it runs 2010 // after any other threads have had a chance to run. 2011 state.push_low_priority(WorkItem::GuestCall(runtime_instance, call)); 2012 None 2013 } else { 2014 // Yielding in a non-blocking context is defined as a no-op 2015 // according to the spec, so we must run this thread 2016 // immediately without allowing any others to run. 2017 Some(call) 2018 } 2019 } 2020 callback_code::WAIT => { 2021 // The task may only return `WAIT` if it was created for a call 2022 // to an async export). Otherwise, we'll trap. 2023 state.check_blocking_for(guest_thread.task)?; 2024 2025 let set = get_set(store, set)?; 2026 let state = store.concurrent_state_mut(); 2027 2028 if state.get_mut(guest_thread.task)?.event.is_some() 2029 || !state.get_mut(set)?.ready.is_empty() 2030 { 2031 // An event is immediately available; deliver it ASAP. 2032 state.push_high_priority(WorkItem::GuestCall( 2033 runtime_instance, 2034 GuestCall { 2035 thread: guest_thread, 2036 kind: GuestCallKind::DeliverEvent { 2037 instance: self, 2038 set: Some(set), 2039 }, 2040 }, 2041 )); 2042 } else { 2043 // No event is immediately available. 2044 // 2045 // We're waiting, so register to be woken up when an event 2046 // is published for this waitable set. 2047 // 2048 // Here we also set `GuestTask::wake_on_cancel` which allows 2049 // `subtask.cancel` to interrupt the wait. 2050 let old = state 2051 .get_mut(guest_thread.thread)? 2052 .wake_on_cancel 2053 .replace(set); 2054 if !old.is_none() { 2055 bail_bug!("thread unexpectedly had wake_on_cancel set"); 2056 } 2057 let old = state 2058 .get_mut(set)? 2059 .waiting 2060 .insert(guest_thread, WaitMode::Callback(self)); 2061 if !old.is_none() { 2062 bail_bug!("set's waiting set already had this thread registered"); 2063 } 2064 } 2065 None 2066 } 2067 _ => bail!(Trap::UnsupportedCallbackCode), 2068 }) 2069 } 2070 2071 fn cleanup_thread( 2072 self, 2073 store: &mut StoreOpaque, 2074 guest_thread: QualifiedThreadId, 2075 runtime_instance: RuntimeComponentInstanceIndex, 2076 ) -> Result<()> { 2077 let state = store.concurrent_state_mut(); 2078 let thread_data = state.get_mut(guest_thread.thread)?; 2079 let guest_id = match thread_data.instance_rep { 2080 Some(id) => id, 2081 None => bail_bug!("thread must have instance_rep set by now"), 2082 }; 2083 let sync_call_set = thread_data.sync_call_set; 2084 2085 // Clean up any pending subtasks in the sync_call_set 2086 for waitable in mem::take(&mut state.get_mut(sync_call_set)?.ready) { 2087 if let Some(Event::Subtask { 2088 status: Status::Returned | Status::ReturnCancelled, 2089 }) = waitable.common(state)?.event 2090 { 2091 waitable.delete_from(state)?; 2092 } 2093 } 2094 2095 store 2096 .instance_state(RuntimeInstance { 2097 instance: self.id().instance(), 2098 index: runtime_instance, 2099 }) 2100 .thread_handle_table() 2101 .guest_thread_remove(guest_id)?; 2102 2103 store.concurrent_state_mut().delete(guest_thread.thread)?; 2104 store.concurrent_state_mut().delete(sync_call_set)?; 2105 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 2106 task.threads.remove(&guest_thread.thread); 2107 Ok(()) 2108 } 2109 2110 /// Add the specified guest call to the "high priority" work item queue, to 2111 /// be started as soon as backpressure and/or reentrance rules allow. 2112 /// 2113 /// SAFETY: The raw pointer arguments must be valid references to guest 2114 /// functions (with the appropriate signatures) when the closures queued by 2115 /// this function are called. 2116 unsafe fn queue_call<T: 'static>( 2117 self, 2118 mut store: StoreContextMut<T>, 2119 guest_thread: QualifiedThreadId, 2120 callee: SendSyncPtr<VMFuncRef>, 2121 param_count: usize, 2122 result_count: usize, 2123 async_: bool, 2124 callback: Option<SendSyncPtr<VMFuncRef>>, 2125 post_return: Option<SendSyncPtr<VMFuncRef>>, 2126 ) -> Result<()> { 2127 /// Return a closure which will call the specified function in the scope 2128 /// of the specified task. 2129 /// 2130 /// This will use `GuestTask::lower_params` to lower the parameters, but 2131 /// will not lift the result; instead, it returns a 2132 /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if 2133 /// any, may be lifted. Note that an async-lifted export will have 2134 /// returned its result using the `task.return` intrinsic (or not 2135 /// returned a result at all, in the case of `task.cancel`), in which 2136 /// case the "result" of this call will either be a callback code or 2137 /// nothing. 2138 /// 2139 /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when 2140 /// the returned closure is called. 2141 unsafe fn make_call<T: 'static>( 2142 store: StoreContextMut<T>, 2143 guest_thread: QualifiedThreadId, 2144 callee: SendSyncPtr<VMFuncRef>, 2145 param_count: usize, 2146 result_count: usize, 2147 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]> 2148 + Send 2149 + Sync 2150 + 'static 2151 + use<T> { 2152 let token = StoreToken::new(store); 2153 move |store: &mut dyn VMStore| { 2154 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS]; 2155 2156 store 2157 .concurrent_state_mut() 2158 .get_mut(guest_thread.thread)? 2159 .state = GuestThreadState::Running; 2160 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 2161 let lower = match task.lower_params.take() { 2162 Some(l) => l, 2163 None => bail_bug!("lower_params missing"), 2164 }; 2165 2166 lower(store, &mut storage[..param_count])?; 2167 2168 let mut store = token.as_context_mut(store); 2169 2170 // SAFETY: Per the contract documented in `make_call's` 2171 // documentation, `callee` must be a valid pointer. 2172 unsafe { 2173 crate::Func::call_unchecked_raw( 2174 &mut store, 2175 callee.as_non_null(), 2176 NonNull::new( 2177 &mut storage[..param_count.max(result_count)] 2178 as *mut [MaybeUninit<ValRaw>] as _, 2179 ) 2180 .unwrap(), 2181 )?; 2182 } 2183 2184 Ok(storage) 2185 } 2186 } 2187 2188 // SAFETY: Per the contract described in this function documentation, 2189 // the `callee` pointer which `call` closes over must be valid when 2190 // called by the closure we queue below. 2191 let call = unsafe { 2192 make_call( 2193 store.as_context_mut(), 2194 guest_thread, 2195 callee, 2196 param_count, 2197 result_count, 2198 ) 2199 }; 2200 2201 let callee_instance = store 2202 .0 2203 .concurrent_state_mut() 2204 .get_mut(guest_thread.task)? 2205 .instance; 2206 2207 let fun = if callback.is_some() { 2208 assert!(async_); 2209 2210 Box::new(move |store: &mut dyn VMStore| { 2211 self.add_guest_thread_to_instance_table( 2212 guest_thread.thread, 2213 store, 2214 callee_instance.index, 2215 )?; 2216 let old_thread = store.set_thread(guest_thread)?; 2217 log::trace!( 2218 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread" 2219 ); 2220 2221 store.enter_instance(callee_instance); 2222 2223 // SAFETY: See the documentation for `make_call` to review the 2224 // contract we must uphold for `call` here. 2225 // 2226 // Per the contract described in the `queue_call` 2227 // documentation, the `callee` pointer which `call` closes 2228 // over must be valid. 2229 let storage = call(store)?; 2230 2231 store.exit_instance(callee_instance)?; 2232 2233 store.set_thread(old_thread)?; 2234 let state = store.concurrent_state_mut(); 2235 if let Some(t) = old_thread.guest() { 2236 state.get_mut(t.thread)?.state = GuestThreadState::Running; 2237 } 2238 log::trace!("stackless call: restored {old_thread:?} as current thread"); 2239 2240 // SAFETY: `wasmparser` will have validated that the callback 2241 // function returns a `i32` result. 2242 let code = unsafe { storage[0].assume_init() }.get_i32() as u32; 2243 2244 self.handle_callback_code(store, guest_thread, callee_instance.index, code) 2245 }) 2246 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync> 2247 } else { 2248 let token = StoreToken::new(store.as_context_mut()); 2249 Box::new(move |store: &mut dyn VMStore| { 2250 self.add_guest_thread_to_instance_table( 2251 guest_thread.thread, 2252 store, 2253 callee_instance.index, 2254 )?; 2255 let old_thread = store.set_thread(guest_thread)?; 2256 log::trace!( 2257 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread", 2258 ); 2259 let flags = self.id().get(store).instance_flags(callee_instance.index); 2260 2261 // Unless this is a callback-less (i.e. stackful) 2262 // async-lifted export, we need to record that the instance 2263 // cannot be entered until the call returns. 2264 if !async_ { 2265 store.enter_instance(callee_instance); 2266 } 2267 2268 // SAFETY: See the documentation for `make_call` to review the 2269 // contract we must uphold for `call` here. 2270 // 2271 // Per the contract described in the `queue_call` 2272 // documentation, the `callee` pointer which `call` closes 2273 // over must be valid. 2274 let storage = call(store)?; 2275 2276 if async_ { 2277 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 2278 if task.threads.len() == 1 && !task.returned_or_cancelled() { 2279 bail!(Trap::NoAsyncResult); 2280 } 2281 } else { 2282 // This is a sync-lifted export, so now is when we lift the 2283 // result, optionally call the post-return function, if any, 2284 // and finally notify any current or future waiters that the 2285 // subtask has returned. 2286 2287 let lift = { 2288 store.exit_instance(callee_instance)?; 2289 2290 let state = store.concurrent_state_mut(); 2291 if !state.get_mut(guest_thread.task)?.result.is_none() { 2292 bail_bug!("task has not yet produced a result"); 2293 } 2294 2295 match state.get_mut(guest_thread.task)?.lift_result.take() { 2296 Some(lift) => lift, 2297 None => bail_bug!("lift_result field is missing"), 2298 } 2299 }; 2300 2301 // SAFETY: `result_count` represents the number of core Wasm 2302 // results returned, per `wasmparser`. 2303 let result = (lift.lift)(store, unsafe { 2304 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>( 2305 &storage[..result_count], 2306 ) 2307 })?; 2308 2309 let post_return_arg = match result_count { 2310 0 => ValRaw::i32(0), 2311 // SAFETY: `result_count` represents the number of 2312 // core Wasm results returned, per `wasmparser`. 2313 1 => unsafe { storage[0].assume_init() }, 2314 _ => unreachable!(), 2315 }; 2316 2317 unsafe { 2318 call_post_return( 2319 token.as_context_mut(store), 2320 post_return.map(|v| v.as_non_null()), 2321 post_return_arg, 2322 flags, 2323 )?; 2324 } 2325 2326 self.task_complete(store, guest_thread.task, result, Status::Returned)?; 2327 } 2328 2329 // This is a callback-less call, so the implicit thread has now completed 2330 self.cleanup_thread(store, guest_thread, callee_instance.index)?; 2331 2332 store.set_thread(old_thread)?; 2333 2334 let state = store.concurrent_state_mut(); 2335 let task = state.get_mut(guest_thread.task)?; 2336 2337 match &task.caller { 2338 Caller::Host { .. } => { 2339 if task.ready_to_delete() { 2340 Waitable::Guest(guest_thread.task).delete_from(state)?; 2341 } 2342 } 2343 Caller::Guest { .. } => { 2344 task.exited = true; 2345 } 2346 } 2347 2348 Ok(None) 2349 }) 2350 }; 2351 2352 store 2353 .0 2354 .concurrent_state_mut() 2355 .push_high_priority(WorkItem::GuestCall( 2356 callee_instance.index, 2357 GuestCall { 2358 thread: guest_thread, 2359 kind: GuestCallKind::StartImplicit(fun), 2360 }, 2361 )); 2362 2363 Ok(()) 2364 } 2365 2366 /// Prepare (but do not start) a guest->guest call. 2367 /// 2368 /// This is called from fused adapter code generated in 2369 /// `wasmtime_environ::fact::trampoline::Compiler`. `start` and `return_` 2370 /// are synthesized Wasm functions which move the parameters from the caller 2371 /// to the callee and the result from the callee to the caller, 2372 /// respectively. The adapter will call `Self::start_call` immediately 2373 /// after calling this function. 2374 /// 2375 /// SAFETY: All the pointer arguments must be valid pointers to guest 2376 /// entities (and with the expected signatures for the function references 2377 /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details). 2378 unsafe fn prepare_call<T: 'static>( 2379 self, 2380 mut store: StoreContextMut<T>, 2381 start: NonNull<VMFuncRef>, 2382 return_: NonNull<VMFuncRef>, 2383 caller_instance: RuntimeComponentInstanceIndex, 2384 callee_instance: RuntimeComponentInstanceIndex, 2385 task_return_type: TypeTupleIndex, 2386 callee_async: bool, 2387 memory: *mut VMMemoryDefinition, 2388 string_encoding: StringEncoding, 2389 caller_info: CallerInfo, 2390 ) -> Result<()> { 2391 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) { 2392 // A task may only call an async-typed function via a sync lower if 2393 // it was created by a call to an async export. Otherwise, we'll 2394 // trap. 2395 store.0.check_blocking()?; 2396 } 2397 2398 enum ResultInfo { 2399 Heap { results: u32 }, 2400 Stack { result_count: u32 }, 2401 } 2402 2403 let result_info = match &caller_info { 2404 CallerInfo::Async { 2405 has_result: true, 2406 params, 2407 } => ResultInfo::Heap { 2408 results: match params.last() { 2409 Some(r) => r.get_u32(), 2410 None => bail_bug!("retptr missing"), 2411 }, 2412 }, 2413 CallerInfo::Async { 2414 has_result: false, .. 2415 } => ResultInfo::Stack { result_count: 0 }, 2416 CallerInfo::Sync { 2417 result_count, 2418 params, 2419 } if *result_count > u32::try_from(MAX_FLAT_RESULTS)? => ResultInfo::Heap { 2420 results: match params.last() { 2421 Some(r) => r.get_u32(), 2422 None => bail_bug!("arg ptr missing"), 2423 }, 2424 }, 2425 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack { 2426 result_count: *result_count, 2427 }, 2428 }; 2429 2430 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. }); 2431 2432 // Create a new guest task for the call, closing over the `start` and 2433 // `return_` functions to lift the parameters and lower the result, 2434 // respectively. 2435 let start = SendSyncPtr::new(start); 2436 let return_ = SendSyncPtr::new(return_); 2437 let token = StoreToken::new(store.as_context_mut()); 2438 let state = store.0.concurrent_state_mut(); 2439 let old_thread = state.current_guest_thread()?; 2440 2441 debug_assert_eq!( 2442 state.get_mut(old_thread.task)?.instance, 2443 RuntimeInstance { 2444 instance: self.id().instance(), 2445 index: caller_instance, 2446 } 2447 ); 2448 2449 let new_task = GuestTask::new( 2450 Box::new(move |store, dst| { 2451 let mut store = token.as_context_mut(store); 2452 assert!(dst.len() <= MAX_FLAT_PARAMS); 2453 // The `+ 1` here accounts for the return pointer, if any: 2454 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1]; 2455 let count = match caller_info { 2456 // Async callers, if they have a result, use the last 2457 // parameter as a return pointer so chop that off if 2458 // relevant here. 2459 CallerInfo::Async { params, has_result } => { 2460 let params = ¶ms[..params.len() - usize::from(has_result)]; 2461 for (param, src) in params.iter().zip(&mut src) { 2462 src.write(*param); 2463 } 2464 params.len() 2465 } 2466 2467 // Sync callers forward everything directly. 2468 CallerInfo::Sync { params, .. } => { 2469 for (param, src) in params.iter().zip(&mut src) { 2470 src.write(*param); 2471 } 2472 params.len() 2473 } 2474 }; 2475 // SAFETY: `start` is a valid `*mut VMFuncRef` from 2476 // `wasmtime-cranelift`-generated fused adapter code. Based on 2477 // how it was constructed (see 2478 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter` 2479 // for details) we know it takes count parameters and returns 2480 // `dst.len()` results. 2481 unsafe { 2482 crate::Func::call_unchecked_raw( 2483 &mut store, 2484 start.as_non_null(), 2485 NonNull::new( 2486 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _, 2487 ) 2488 .unwrap(), 2489 )?; 2490 } 2491 dst.copy_from_slice(&src[..dst.len()]); 2492 let state = store.0.concurrent_state_mut(); 2493 Waitable::Guest(state.current_guest_thread()?.task).set_event( 2494 state, 2495 Some(Event::Subtask { 2496 status: Status::Started, 2497 }), 2498 )?; 2499 Ok(()) 2500 }), 2501 LiftResult { 2502 lift: Box::new(move |store, src| { 2503 // SAFETY: See comment in closure passed as `lower_params` 2504 // parameter above. 2505 let mut store = token.as_context_mut(store); 2506 let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation? 2507 if let ResultInfo::Heap { results } = &result_info { 2508 my_src.push(ValRaw::u32(*results)); 2509 } 2510 2511 // Execute the `return_` hook, generated by Wasmtime's FACT 2512 // compiler, in the context of the old thread. The old 2513 // thread, this thread's caller, may have `realloc` 2514 // callbacks invoked for example and those need the correct 2515 // context set for the current thread. 2516 let prev = store.0.set_thread(old_thread)?; 2517 2518 // SAFETY: `return_` is a valid `*mut VMFuncRef` from 2519 // `wasmtime-cranelift`-generated fused adapter code. Based 2520 // on how it was constructed (see 2521 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter` 2522 // for details) we know it takes `src.len()` parameters and 2523 // returns up to 1 result. 2524 unsafe { 2525 crate::Func::call_unchecked_raw( 2526 &mut store, 2527 return_.as_non_null(), 2528 my_src.as_mut_slice().into(), 2529 )?; 2530 } 2531 2532 // Restore the previous current thread after the 2533 // lifting/lowering has returned. 2534 store.0.set_thread(prev)?; 2535 2536 let state = store.0.concurrent_state_mut(); 2537 let thread = state.current_guest_thread()?; 2538 if sync_caller { 2539 state.get_mut(thread.task)?.sync_result = SyncResult::Produced( 2540 if let ResultInfo::Stack { result_count } = &result_info { 2541 match result_count { 2542 0 => None, 2543 1 => Some(my_src[0]), 2544 _ => unreachable!(), 2545 } 2546 } else { 2547 None 2548 }, 2549 ); 2550 } 2551 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>) 2552 }), 2553 ty: task_return_type, 2554 memory: NonNull::new(memory).map(SendSyncPtr::new), 2555 string_encoding, 2556 }, 2557 Caller::Guest { thread: old_thread }, 2558 None, 2559 RuntimeInstance { 2560 instance: self.id().instance(), 2561 index: callee_instance, 2562 }, 2563 callee_async, 2564 )?; 2565 2566 let guest_task = state.push(new_task)?; 2567 let new_thread = GuestThread::new_implicit(state, guest_task)?; 2568 let guest_thread = state.push(new_thread)?; 2569 state.get_mut(guest_task)?.threads.insert(guest_thread); 2570 2571 // Make the new thread the current one so that `Self::start_call` knows 2572 // which one to start. 2573 store.0.set_thread(QualifiedThreadId { 2574 task: guest_task, 2575 thread: guest_thread, 2576 })?; 2577 log::trace!( 2578 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}" 2579 ); 2580 2581 Ok(()) 2582 } 2583 2584 /// Call the specified callback function for an async-lifted export. 2585 /// 2586 /// SAFETY: `function` must be a valid reference to a guest function of the 2587 /// correct signature for a callback. 2588 unsafe fn call_callback<T>( 2589 self, 2590 mut store: StoreContextMut<T>, 2591 function: SendSyncPtr<VMFuncRef>, 2592 event: Event, 2593 handle: u32, 2594 ) -> Result<u32> { 2595 let (ordinal, result) = event.parts(); 2596 let params = &mut [ 2597 ValRaw::u32(ordinal), 2598 ValRaw::u32(handle), 2599 ValRaw::u32(result), 2600 ]; 2601 // SAFETY: `func` is a valid `*mut VMFuncRef` from either 2602 // `wasmtime-cranelift`-generated fused adapter code or 2603 // `component::Options`. Per `wasmparser` callback signature 2604 // validation, we know it takes three parameters and returns one. 2605 unsafe { 2606 crate::Func::call_unchecked_raw( 2607 &mut store, 2608 function.as_non_null(), 2609 params.as_mut_slice().into(), 2610 )?; 2611 } 2612 Ok(params[0].get_u32()) 2613 } 2614 2615 /// Start a guest->guest call previously prepared using 2616 /// `Self::prepare_call`. 2617 /// 2618 /// This is called from fused adapter code generated in 2619 /// `wasmtime_environ::fact::trampoline::Compiler`. The adapter will call 2620 /// this function immediately after calling `Self::prepare_call`. 2621 /// 2622 /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest 2623 /// functions with the appropriate signatures for the current guest task. 2624 /// If this is a call to an async-lowered import, the actual call may be 2625 /// deferred and run after this function returns, in which case the pointer 2626 /// arguments must also be valid when the call happens. 2627 unsafe fn start_call<T: 'static>( 2628 self, 2629 mut store: StoreContextMut<T>, 2630 callback: *mut VMFuncRef, 2631 post_return: *mut VMFuncRef, 2632 callee: NonNull<VMFuncRef>, 2633 param_count: u32, 2634 result_count: u32, 2635 flags: u32, 2636 storage: Option<&mut [MaybeUninit<ValRaw>]>, 2637 ) -> Result<u32> { 2638 let token = StoreToken::new(store.as_context_mut()); 2639 let async_caller = storage.is_none(); 2640 let state = store.0.concurrent_state_mut(); 2641 let guest_thread = state.current_guest_thread()?; 2642 let callee_async = state.get_mut(guest_thread.task)?.async_function; 2643 let callee = SendSyncPtr::new(callee); 2644 let param_count = usize::try_from(param_count)?; 2645 assert!(param_count <= MAX_FLAT_PARAMS); 2646 let result_count = usize::try_from(result_count)?; 2647 assert!(result_count <= MAX_FLAT_RESULTS); 2648 2649 let task = state.get_mut(guest_thread.task)?; 2650 if let Some(callback) = NonNull::new(callback) { 2651 // We're calling an async-lifted export with a callback, so store 2652 // the callback and related context as part of the task so we can 2653 // call it later when needed. 2654 let callback = SendSyncPtr::new(callback); 2655 task.callback = Some(Box::new(move |store, event, handle| { 2656 let store = token.as_context_mut(store); 2657 unsafe { self.call_callback::<T>(store, callback, event, handle) } 2658 })); 2659 } 2660 2661 let Caller::Guest { thread: caller } = &task.caller else { 2662 // As of this writing, `start_call` is only used for guest->guest 2663 // calls. 2664 bail_bug!("start_call unexpectedly invoked for host->guest call"); 2665 }; 2666 let caller = *caller; 2667 let caller_instance = state.get_mut(caller.task)?.instance; 2668 2669 // Queue the call as a "high priority" work item. 2670 unsafe { 2671 self.queue_call( 2672 store.as_context_mut(), 2673 guest_thread, 2674 callee, 2675 param_count, 2676 result_count, 2677 (flags & START_FLAG_ASYNC_CALLEE) != 0, 2678 NonNull::new(callback).map(SendSyncPtr::new), 2679 NonNull::new(post_return).map(SendSyncPtr::new), 2680 )?; 2681 } 2682 2683 let state = store.0.concurrent_state_mut(); 2684 2685 // Use the caller's `GuestThread::sync_call_set` to register interest in 2686 // the subtask... 2687 let guest_waitable = Waitable::Guest(guest_thread.task); 2688 let old_set = guest_waitable.common(state)?.set; 2689 let set = state.get_mut(caller.thread)?.sync_call_set; 2690 guest_waitable.join(state, Some(set))?; 2691 2692 // ... and suspend this fiber temporarily while we wait for it to start. 2693 // 2694 // Note that we _could_ call the callee directly using the current fiber 2695 // rather than suspend this one, but that would make reasoning about the 2696 // event loop more complicated and is probably only worth doing if 2697 // there's a measurable performance benefit. In addition, it would mean 2698 // blocking the caller if the callee calls a blocking sync-lowered 2699 // import, and as of this writing the spec says we must not do that. 2700 // 2701 // Alternatively, the fused adapter code could be modified to call the 2702 // callee directly without calling a host-provided intrinsic at all (in 2703 // which case it would need to do its own, inline backpressure checks, 2704 // etc.). Again, we'd want to see a measurable performance benefit 2705 // before committing to such an optimization. And again, we'd need to 2706 // update the spec to allow that. 2707 let (status, waitable) = loop { 2708 store.0.suspend(SuspendReason::Waiting { 2709 set, 2710 thread: caller, 2711 // Normally, `StoreOpaque::suspend` would assert it's being 2712 // called from a context where blocking is allowed. However, if 2713 // `async_caller` is `true`, we'll only "block" long enough for 2714 // the callee to start, i.e. we won't repeat this loop, so we 2715 // tell `suspend` it's okay even if we're not allowed to block. 2716 // Alternatively, if the callee is not an async function, then 2717 // we know it won't block anyway. 2718 skip_may_block_check: async_caller || !callee_async, 2719 })?; 2720 2721 let state = store.0.concurrent_state_mut(); 2722 2723 log::trace!("taking event for {:?}", guest_thread.task); 2724 let event = guest_waitable.take_event(state)?; 2725 let Some(Event::Subtask { status }) = event else { 2726 bail_bug!("subtasks should only get subtask events, got {event:?}") 2727 }; 2728 2729 log::trace!("status {status:?} for {:?}", guest_thread.task); 2730 2731 if status == Status::Returned { 2732 // It returned, so we can stop waiting. 2733 break (status, None); 2734 } else if async_caller { 2735 // It hasn't returned yet, but the caller is calling via an 2736 // async-lowered import, so we generate a handle for the task 2737 // waitable and return the status. 2738 let handle = store 2739 .0 2740 .instance_state(caller_instance) 2741 .handle_table() 2742 .subtask_insert_guest(guest_thread.task.rep())?; 2743 store 2744 .0 2745 .concurrent_state_mut() 2746 .get_mut(guest_thread.task)? 2747 .common 2748 .handle = Some(handle); 2749 break (status, Some(handle)); 2750 } else { 2751 // The callee hasn't returned yet, and the caller is calling via 2752 // a sync-lowered import, so we loop and keep waiting until the 2753 // callee returns. 2754 } 2755 }; 2756 2757 guest_waitable.join(store.0.concurrent_state_mut(), old_set)?; 2758 2759 // Reset the current thread to point to the caller as it resumes control. 2760 store.0.set_thread(caller)?; 2761 store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running; 2762 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}"); 2763 2764 if let Some(storage) = storage { 2765 // The caller used a sync-lowered import to call an async-lifted 2766 // export, in which case the result, if any, has been stashed in 2767 // `GuestTask::sync_result`. 2768 let state = store.0.concurrent_state_mut(); 2769 let task = state.get_mut(guest_thread.task)?; 2770 if let Some(result) = task.sync_result.take()? { 2771 if let Some(result) = result { 2772 storage[0] = MaybeUninit::new(result); 2773 } 2774 2775 if task.exited && task.ready_to_delete() { 2776 Waitable::Guest(guest_thread.task).delete_from(state)?; 2777 } 2778 } 2779 } 2780 2781 Ok(status.pack(waitable)) 2782 } 2783 2784 /// Poll the specified future once on behalf of a guest->host call using an 2785 /// async-lowered import. 2786 /// 2787 /// If it returns `Ready`, return `Ok(None)`. Otherwise, if it returns 2788 /// `Pending`, add it to the set of futures to be polled as part of this 2789 /// instance's event loop until it completes, and then return 2790 /// `Ok(Some(handle))` where `handle` is the waitable handle to return. 2791 /// 2792 /// Whether the future returns `Ready` immediately or later, the `lower` 2793 /// function will be used to lower the result, if any, into the guest caller's 2794 /// stack and linear memory. The `lower` function is invoked with `None` if 2795 /// the future is cancelled. 2796 pub(crate) fn first_poll<T: 'static, R: Send + 'static>( 2797 self, 2798 mut store: StoreContextMut<'_, T>, 2799 future: impl Future<Output = Result<R>> + Send + 'static, 2800 lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static, 2801 ) -> Result<Option<u32>> { 2802 let token = StoreToken::new(store.as_context_mut()); 2803 let state = store.0.concurrent_state_mut(); 2804 let task = state.current_host_thread()?; 2805 2806 // Create an abortable future which hooks calls to poll and manages call 2807 // context state for the future. 2808 let (join_handle, future) = JoinHandle::run(future); 2809 { 2810 let state = &mut state.get_mut(task)?.state; 2811 assert!(matches!(state, HostTaskState::CalleeStarted)); 2812 *state = HostTaskState::CalleeRunning(join_handle); 2813 } 2814 2815 let mut future = Box::pin(future); 2816 2817 // Finally, poll the future. We can use a dummy `Waker` here because 2818 // we'll add the future to `ConcurrentState::futures` and poll it 2819 // automatically from the event loop if it doesn't complete immediately 2820 // here. 2821 let poll = tls::set(store.0, || { 2822 future 2823 .as_mut() 2824 .poll(&mut Context::from_waker(&Waker::noop())) 2825 }); 2826 2827 match poll { 2828 // It finished immediately; lower the result and delete the task. 2829 Poll::Ready(result) => { 2830 let result = result.transpose()?; 2831 lower(store.as_context_mut(), result)?; 2832 return Ok(None); 2833 } 2834 2835 // Future isn't ready yet, so fall through. 2836 Poll::Pending => {} 2837 } 2838 2839 // It hasn't finished yet; add the future to 2840 // `ConcurrentState::futures` so it will be polled by the event 2841 // loop and allocate a waitable handle to return to the guest. 2842 2843 // Wrap the future in a closure responsible for lowering the result into 2844 // the guest's stack and memory, as well as notifying any waiters that 2845 // the task returned. 2846 let future = Box::pin(async move { 2847 let result = match future.await { 2848 Some(result) => Some(result?), 2849 None => None, 2850 }; 2851 let on_complete = move |store: &mut dyn VMStore| { 2852 // Restore the `current_thread` to be the host so `lower` knows 2853 // how to manipulate borrows and knows which scope of borrows 2854 // to check. 2855 let mut store = token.as_context_mut(store); 2856 let old = store.0.set_thread(task)?; 2857 2858 let status = if result.is_some() { 2859 Status::Returned 2860 } else { 2861 Status::ReturnCancelled 2862 }; 2863 2864 lower(store.as_context_mut(), result)?; 2865 let state = store.0.concurrent_state_mut(); 2866 match &mut state.get_mut(task)?.state { 2867 // The task is already flagged as finished because it was 2868 // cancelled. No need to transition further. 2869 HostTaskState::CalleeDone { .. } => {} 2870 2871 // Otherwise transition this task to the done state. 2872 other => *other = HostTaskState::CalleeDone { cancelled: false }, 2873 } 2874 Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?; 2875 2876 store.0.set_thread(old)?; 2877 Ok(()) 2878 }; 2879 2880 // Here we schedule a task to run on a worker fiber to do the 2881 // lowering since it may involve a call to the guest's realloc 2882 // function. This is necessary because calling the guest while 2883 // there are host embedder frames on the stack is unsound. 2884 tls::get(move |store| { 2885 store 2886 .concurrent_state_mut() 2887 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new( 2888 on_complete, 2889 )))); 2890 Ok(()) 2891 }) 2892 }); 2893 2894 // Make this task visible to the guest and then record what it 2895 // was made visible as. 2896 let state = store.0.concurrent_state_mut(); 2897 state.push_future(future); 2898 let caller = state.get_mut(task)?.caller; 2899 let instance = state.get_mut(caller.task)?.instance; 2900 let handle = store 2901 .0 2902 .instance_state(instance) 2903 .handle_table() 2904 .subtask_insert_host(task.rep())?; 2905 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle); 2906 log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}"); 2907 2908 // Restore the currently running thread to this host task's 2909 // caller. Note that the host task isn't deallocated as it's 2910 // within the store and will get deallocated later. 2911 store.0.set_thread(caller)?; 2912 Ok(Some(handle)) 2913 } 2914 2915 /// Implements the `task.return` intrinsic, lifting the result for the 2916 /// current guest task. 2917 pub(crate) fn task_return( 2918 self, 2919 store: &mut dyn VMStore, 2920 ty: TypeTupleIndex, 2921 options: OptionsIndex, 2922 storage: &[ValRaw], 2923 ) -> Result<()> { 2924 let state = store.concurrent_state_mut(); 2925 let guest_thread = state.current_guest_thread()?; 2926 let lift = state 2927 .get_mut(guest_thread.task)? 2928 .lift_result 2929 .take() 2930 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?; 2931 if !state.get_mut(guest_thread.task)?.result.is_none() { 2932 bail_bug!("task result unexpectedly already set"); 2933 } 2934 2935 let CanonicalOptions { 2936 string_encoding, 2937 data_model, 2938 .. 2939 } = &self.id().get(store).component().env_component().options[options]; 2940 2941 let invalid = ty != lift.ty 2942 || string_encoding != &lift.string_encoding 2943 || match data_model { 2944 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory { 2945 Some(memory) => { 2946 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut()); 2947 let actual = self.id().get(store).runtime_memory(memory); 2948 expected != actual.as_ptr() 2949 } 2950 // Memory not specified, meaning it didn't need to be 2951 // specified per validation, so not invalid. 2952 None => false, 2953 }, 2954 // Always invalid as this isn't supported. 2955 CanonicalOptionsDataModel::Gc { .. } => true, 2956 }; 2957 2958 if invalid { 2959 bail!(Trap::TaskReturnInvalid); 2960 } 2961 2962 log::trace!("task.return for {guest_thread:?}"); 2963 2964 let result = (lift.lift)(store, storage)?; 2965 self.task_complete(store, guest_thread.task, result, Status::Returned) 2966 } 2967 2968 /// Implements the `task.cancel` intrinsic. 2969 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> { 2970 let state = store.concurrent_state_mut(); 2971 let guest_thread = state.current_guest_thread()?; 2972 let task = state.get_mut(guest_thread.task)?; 2973 if !task.cancel_sent { 2974 bail!(Trap::TaskCancelNotCancelled); 2975 } 2976 _ = task 2977 .lift_result 2978 .take() 2979 .ok_or_else(|| Trap::TaskCancelOrReturnTwice)?; 2980 2981 if !task.result.is_none() { 2982 bail_bug!("task result should not bet set yet"); 2983 } 2984 2985 log::trace!("task.cancel for {guest_thread:?}"); 2986 2987 self.task_complete( 2988 store, 2989 guest_thread.task, 2990 Box::new(DummyResult), 2991 Status::ReturnCancelled, 2992 ) 2993 } 2994 2995 /// Complete the specified guest task (i.e. indicate that it has either 2996 /// returned a (possibly empty) result or cancelled itself). 2997 /// 2998 /// This will return any resource borrows and notify any current or future 2999 /// waiters that the task has completed. 3000 fn task_complete( 3001 self, 3002 store: &mut StoreOpaque, 3003 guest_task: TableId<GuestTask>, 3004 result: Box<dyn Any + Send + Sync>, 3005 status: Status, 3006 ) -> Result<()> { 3007 store 3008 .component_resource_tables(Some(self)) 3009 .validate_scope_exit()?; 3010 3011 let state = store.concurrent_state_mut(); 3012 let task = state.get_mut(guest_task)?; 3013 3014 if let Caller::Host { tx, .. } = &mut task.caller { 3015 if let Some(tx) = tx.take() { 3016 _ = tx.send(result); 3017 } 3018 } else { 3019 task.result = Some(result); 3020 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?; 3021 } 3022 3023 Ok(()) 3024 } 3025 3026 /// Implements the `waitable-set.new` intrinsic. 3027 pub(crate) fn waitable_set_new( 3028 self, 3029 store: &mut StoreOpaque, 3030 caller_instance: RuntimeComponentInstanceIndex, 3031 ) -> Result<u32> { 3032 let set = store.concurrent_state_mut().push(WaitableSet::default())?; 3033 let handle = store 3034 .instance_state(RuntimeInstance { 3035 instance: self.id().instance(), 3036 index: caller_instance, 3037 }) 3038 .handle_table() 3039 .waitable_set_insert(set.rep())?; 3040 log::trace!("new waitable set {set:?} (handle {handle})"); 3041 Ok(handle) 3042 } 3043 3044 /// Implements the `waitable-set.drop` intrinsic. 3045 pub(crate) fn waitable_set_drop( 3046 self, 3047 store: &mut StoreOpaque, 3048 caller_instance: RuntimeComponentInstanceIndex, 3049 set: u32, 3050 ) -> Result<()> { 3051 let rep = store 3052 .instance_state(RuntimeInstance { 3053 instance: self.id().instance(), 3054 index: caller_instance, 3055 }) 3056 .handle_table() 3057 .waitable_set_remove(set)?; 3058 3059 log::trace!("drop waitable set {rep} (handle {set})"); 3060 3061 let set = store 3062 .concurrent_state_mut() 3063 .delete(TableId::<WaitableSet>::new(rep))?; 3064 3065 if !set.waiting.is_empty() { 3066 bail!(Trap::WaitableSetDropHasWaiters); 3067 } 3068 3069 Ok(()) 3070 } 3071 3072 /// Implements the `waitable.join` intrinsic. 3073 pub(crate) fn waitable_join( 3074 self, 3075 store: &mut StoreOpaque, 3076 caller_instance: RuntimeComponentInstanceIndex, 3077 waitable_handle: u32, 3078 set_handle: u32, 3079 ) -> Result<()> { 3080 let mut instance = self.id().get_mut(store); 3081 let waitable = 3082 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?; 3083 3084 let set = if set_handle == 0 { 3085 None 3086 } else { 3087 let set = instance.instance_states().0[caller_instance] 3088 .handle_table() 3089 .waitable_set_rep(set_handle)?; 3090 3091 Some(TableId::<WaitableSet>::new(set)) 3092 }; 3093 3094 log::trace!( 3095 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})", 3096 ); 3097 3098 waitable.join(store.concurrent_state_mut(), set) 3099 } 3100 3101 /// Implements the `subtask.drop` intrinsic. 3102 pub(crate) fn subtask_drop( 3103 self, 3104 store: &mut StoreOpaque, 3105 caller_instance: RuntimeComponentInstanceIndex, 3106 task_id: u32, 3107 ) -> Result<()> { 3108 self.waitable_join(store, caller_instance, task_id, 0)?; 3109 3110 let (rep, is_host) = store 3111 .instance_state(RuntimeInstance { 3112 instance: self.id().instance(), 3113 index: caller_instance, 3114 }) 3115 .handle_table() 3116 .subtask_remove(task_id)?; 3117 3118 let concurrent_state = store.concurrent_state_mut(); 3119 let (waitable, expected_caller, delete) = if is_host { 3120 let id = TableId::<HostTask>::new(rep); 3121 let task = concurrent_state.get_mut(id)?; 3122 match &task.state { 3123 HostTaskState::CalleeRunning(_) => bail!(Trap::SubtaskDropNotResolved), 3124 HostTaskState::CalleeDone { .. } => {} 3125 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => { 3126 bail_bug!("invalid state for callee in `subtask.drop`") 3127 } 3128 } 3129 (Waitable::Host(id), task.caller, true) 3130 } else { 3131 let id = TableId::<GuestTask>::new(rep); 3132 let task = concurrent_state.get_mut(id)?; 3133 if task.lift_result.is_some() { 3134 bail!(Trap::SubtaskDropNotResolved); 3135 } 3136 if let Caller::Guest { thread } = task.caller { 3137 ( 3138 Waitable::Guest(id), 3139 thread, 3140 concurrent_state.get_mut(id)?.exited, 3141 ) 3142 } else { 3143 bail_bug!("expected guest caller for `subtask.drop`") 3144 } 3145 }; 3146 3147 waitable.common(concurrent_state)?.handle = None; 3148 3149 // If this subtask has an event that means that the terminal status of 3150 // this subtask wasn't yet received so it can't be dropped yet. 3151 if waitable.take_event(concurrent_state)?.is_some() { 3152 bail!(Trap::SubtaskDropNotResolved); 3153 } 3154 3155 if delete { 3156 waitable.delete_from(concurrent_state)?; 3157 } 3158 3159 // Since waitables can neither be passed between instances nor forged, 3160 // this should never fail unless there's a bug in Wasmtime, but we check 3161 // here to be sure: 3162 debug_assert_eq!(expected_caller, concurrent_state.current_guest_thread()?); 3163 log::trace!("subtask_drop {waitable:?} (handle {task_id})"); 3164 Ok(()) 3165 } 3166 3167 /// Implements the `waitable-set.wait` intrinsic. 3168 pub(crate) fn waitable_set_wait( 3169 self, 3170 store: &mut StoreOpaque, 3171 options: OptionsIndex, 3172 set: u32, 3173 payload: u32, 3174 ) -> Result<u32> { 3175 if !self.options(store, options).async_ { 3176 // The caller may only call `waitable-set.wait` from an async task 3177 // (i.e. a task created via a call to an async export). 3178 // Otherwise, we'll trap. 3179 store.check_blocking()?; 3180 } 3181 3182 let &CanonicalOptions { 3183 cancellable, 3184 instance: caller_instance, 3185 .. 3186 } = &self.id().get(store).component().env_component().options[options]; 3187 let rep = store 3188 .instance_state(RuntimeInstance { 3189 instance: self.id().instance(), 3190 index: caller_instance, 3191 }) 3192 .handle_table() 3193 .waitable_set_rep(set)?; 3194 3195 self.waitable_check( 3196 store, 3197 cancellable, 3198 WaitableCheck::Wait, 3199 WaitableCheckParams { 3200 set: TableId::new(rep), 3201 options, 3202 payload, 3203 }, 3204 ) 3205 } 3206 3207 /// Implements the `waitable-set.poll` intrinsic. 3208 pub(crate) fn waitable_set_poll( 3209 self, 3210 store: &mut StoreOpaque, 3211 options: OptionsIndex, 3212 set: u32, 3213 payload: u32, 3214 ) -> Result<u32> { 3215 let &CanonicalOptions { 3216 cancellable, 3217 instance: caller_instance, 3218 .. 3219 } = &self.id().get(store).component().env_component().options[options]; 3220 let rep = store 3221 .instance_state(RuntimeInstance { 3222 instance: self.id().instance(), 3223 index: caller_instance, 3224 }) 3225 .handle_table() 3226 .waitable_set_rep(set)?; 3227 3228 self.waitable_check( 3229 store, 3230 cancellable, 3231 WaitableCheck::Poll, 3232 WaitableCheckParams { 3233 set: TableId::new(rep), 3234 options, 3235 payload, 3236 }, 3237 ) 3238 } 3239 3240 /// Implements the `thread.index` intrinsic. 3241 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> { 3242 let thread_id = store.concurrent_state_mut().current_guest_thread()?.thread; 3243 match store 3244 .concurrent_state_mut() 3245 .get_mut(thread_id)? 3246 .instance_rep 3247 { 3248 Some(r) => Ok(r), 3249 None => bail_bug!("thread should have instance_rep by now"), 3250 } 3251 } 3252 3253 /// Implements the `thread.new-indirect` intrinsic. 3254 pub(crate) fn thread_new_indirect<T: 'static>( 3255 self, 3256 mut store: StoreContextMut<T>, 3257 runtime_instance: RuntimeComponentInstanceIndex, 3258 _func_ty_idx: TypeFuncIndex, // currently unused 3259 start_func_table_idx: RuntimeTableIndex, 3260 start_func_idx: u32, 3261 context: i32, 3262 ) -> Result<u32> { 3263 log::trace!("creating new thread"); 3264 3265 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []); 3266 let (instance, registry) = self.id().get_mut_and_registry(store.0); 3267 let callee = instance 3268 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)? 3269 .ok_or_else(|| Trap::ThreadNewIndirectUninitialized)?; 3270 if callee.type_index(store.0) != start_func_ty.type_index() { 3271 bail!(Trap::ThreadNewIndirectInvalidType); 3272 } 3273 3274 let token = StoreToken::new(store.as_context_mut()); 3275 let start_func = Box::new( 3276 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> { 3277 let old_thread = store.set_thread(guest_thread)?; 3278 log::trace!( 3279 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread" 3280 ); 3281 3282 let mut store = token.as_context_mut(store); 3283 let mut params = [ValRaw::i32(context)]; 3284 // Use call_unchecked rather than call or call_async, as we don't want to run the function 3285 // on a separate fiber if we're running in an async store. 3286 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? }; 3287 3288 self.cleanup_thread(store.0, guest_thread, runtime_instance)?; 3289 log::trace!("explicit thread {guest_thread:?} completed"); 3290 let state = store.0.concurrent_state_mut(); 3291 let task = state.get_mut(guest_thread.task)?; 3292 if task.threads.is_empty() && !task.returned_or_cancelled() { 3293 bail!(Trap::NoAsyncResult); 3294 } 3295 store.0.set_thread(old_thread)?; 3296 let state = store.0.concurrent_state_mut(); 3297 if let Some(t) = old_thread.guest() { 3298 state.get_mut(t.thread)?.state = GuestThreadState::Running; 3299 } 3300 if state.get_mut(guest_thread.task)?.ready_to_delete() { 3301 Waitable::Guest(guest_thread.task).delete_from(state)?; 3302 } 3303 log::trace!("thread start: restored {old_thread:?} as current thread"); 3304 3305 Ok(()) 3306 }, 3307 ); 3308 3309 let state = store.0.concurrent_state_mut(); 3310 let current_thread = state.current_guest_thread()?; 3311 let parent_task = current_thread.task; 3312 3313 let new_thread = GuestThread::new_explicit(state, parent_task, start_func)?; 3314 let thread_id = state.push(new_thread)?; 3315 state.get_mut(parent_task)?.threads.insert(thread_id); 3316 3317 log::trace!("new thread with id {thread_id:?} created"); 3318 3319 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance) 3320 } 3321 3322 pub(crate) fn resume_thread( 3323 self, 3324 store: &mut StoreOpaque, 3325 runtime_instance: RuntimeComponentInstanceIndex, 3326 thread_idx: u32, 3327 high_priority: bool, 3328 allow_ready: bool, 3329 ) -> Result<()> { 3330 let thread_id = 3331 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?; 3332 let state = store.concurrent_state_mut(); 3333 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?; 3334 let thread = state.get_mut(guest_thread.thread)?; 3335 3336 match mem::replace(&mut thread.state, GuestThreadState::Running) { 3337 GuestThreadState::NotStartedExplicit(start_func) => { 3338 log::trace!("starting thread {guest_thread:?}"); 3339 let guest_call = WorkItem::GuestCall( 3340 runtime_instance, 3341 GuestCall { 3342 thread: guest_thread, 3343 kind: GuestCallKind::StartExplicit(Box::new(move |store| { 3344 start_func(store, guest_thread) 3345 })), 3346 }, 3347 ); 3348 store 3349 .concurrent_state_mut() 3350 .push_work_item(guest_call, high_priority); 3351 } 3352 GuestThreadState::Suspended(fiber) => { 3353 log::trace!("resuming thread {thread_id:?} that was suspended"); 3354 store 3355 .concurrent_state_mut() 3356 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority); 3357 } 3358 GuestThreadState::Ready(fiber) if allow_ready => { 3359 log::trace!("resuming thread {thread_id:?} that was ready"); 3360 thread.state = GuestThreadState::Ready(fiber); 3361 store 3362 .concurrent_state_mut() 3363 .promote_thread_work_item(guest_thread); 3364 } 3365 other => { 3366 thread.state = other; 3367 bail!(Trap::CannotResumeThread); 3368 } 3369 } 3370 Ok(()) 3371 } 3372 3373 fn add_guest_thread_to_instance_table( 3374 self, 3375 thread_id: TableId<GuestThread>, 3376 store: &mut StoreOpaque, 3377 runtime_instance: RuntimeComponentInstanceIndex, 3378 ) -> Result<u32> { 3379 let guest_id = store 3380 .instance_state(RuntimeInstance { 3381 instance: self.id().instance(), 3382 index: runtime_instance, 3383 }) 3384 .thread_handle_table() 3385 .guest_thread_insert(thread_id.rep())?; 3386 store 3387 .concurrent_state_mut() 3388 .get_mut(thread_id)? 3389 .instance_rep = Some(guest_id); 3390 Ok(guest_id) 3391 } 3392 3393 /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`, 3394 /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics. 3395 pub(crate) fn suspension_intrinsic( 3396 self, 3397 store: &mut StoreOpaque, 3398 caller: RuntimeComponentInstanceIndex, 3399 cancellable: bool, 3400 yielding: bool, 3401 to_thread: SuspensionTarget, 3402 ) -> Result<WaitResult> { 3403 let guest_thread = store.concurrent_state_mut().current_guest_thread()?; 3404 if to_thread.is_none() { 3405 let state = store.concurrent_state_mut(); 3406 if yielding { 3407 // This is a `thread.yield` call 3408 if !state.may_block(guest_thread.task)? { 3409 // In a non-blocking context, a `thread.yield` may trigger 3410 // other threads in the same component instance to run. 3411 if !state.promote_instance_local_thread_work_item(caller) { 3412 // No other threads are runnable, so just return 3413 return Ok(WaitResult::Completed); 3414 } 3415 } 3416 } else { 3417 // The caller may only call `thread.suspend` from an async task 3418 // (i.e. a task created via a call to an async export). 3419 // Otherwise, we'll trap. 3420 store.check_blocking()?; 3421 } 3422 } 3423 3424 // There could be a pending cancellation from a previous uncancellable wait 3425 if cancellable && store.concurrent_state_mut().take_pending_cancellation()? { 3426 return Ok(WaitResult::Cancelled); 3427 } 3428 3429 match to_thread { 3430 SuspensionTarget::SomeSuspended(thread) => { 3431 self.resume_thread(store, caller, thread, true, false)? 3432 } 3433 SuspensionTarget::Some(thread) => { 3434 self.resume_thread(store, caller, thread, true, true)? 3435 } 3436 SuspensionTarget::None => { /* nothing to do */ } 3437 } 3438 3439 let reason = if yielding { 3440 SuspendReason::Yielding { 3441 thread: guest_thread, 3442 // Tell `StoreOpaque::suspend` it's okay to suspend here since 3443 // we're handling a `thread.yield-to-suspended` call; otherwise it would 3444 // panic if we called it in a non-blocking context. 3445 skip_may_block_check: to_thread.is_some(), 3446 } 3447 } else { 3448 SuspendReason::ExplicitlySuspending { 3449 thread: guest_thread, 3450 // Tell `StoreOpaque::suspend` it's okay to suspend here since 3451 // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would 3452 // panic if we called it in a non-blocking context. 3453 skip_may_block_check: to_thread.is_some(), 3454 } 3455 }; 3456 3457 store.suspend(reason)?; 3458 3459 if cancellable && store.concurrent_state_mut().take_pending_cancellation()? { 3460 Ok(WaitResult::Cancelled) 3461 } else { 3462 Ok(WaitResult::Completed) 3463 } 3464 } 3465 3466 /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics. 3467 fn waitable_check( 3468 self, 3469 store: &mut StoreOpaque, 3470 cancellable: bool, 3471 check: WaitableCheck, 3472 params: WaitableCheckParams, 3473 ) -> Result<u32> { 3474 let guest_thread = store.concurrent_state_mut().current_guest_thread()?; 3475 3476 log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set); 3477 3478 let state = store.concurrent_state_mut(); 3479 let task = state.get_mut(guest_thread.task)?; 3480 3481 // If we're waiting, and there are no events immediately available, 3482 // suspend the fiber until that changes. 3483 match &check { 3484 WaitableCheck::Wait => { 3485 let set = params.set; 3486 3487 if (task.event.is_none() 3488 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable)) 3489 && state.get_mut(set)?.ready.is_empty() 3490 { 3491 if cancellable { 3492 let old = state 3493 .get_mut(guest_thread.thread)? 3494 .wake_on_cancel 3495 .replace(set); 3496 if !old.is_none() { 3497 bail_bug!("thread unexpectedly in a prior wake_on_cancel set"); 3498 } 3499 } 3500 3501 store.suspend(SuspendReason::Waiting { 3502 set, 3503 thread: guest_thread, 3504 skip_may_block_check: false, 3505 })?; 3506 } 3507 } 3508 WaitableCheck::Poll => {} 3509 } 3510 3511 log::trace!( 3512 "waitable check for {guest_thread:?}; set {:?}, part two", 3513 params.set 3514 ); 3515 3516 // Deliver any pending events to the guest and return. 3517 let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?; 3518 3519 let (ordinal, handle, result) = match &check { 3520 WaitableCheck::Wait => { 3521 let (event, waitable) = match event { 3522 Some(p) => p, 3523 None => bail_bug!("event expected to be present"), 3524 }; 3525 let handle = waitable.map(|(_, v)| v).unwrap_or(0); 3526 let (ordinal, result) = event.parts(); 3527 (ordinal, handle, result) 3528 } 3529 WaitableCheck::Poll => { 3530 if let Some((event, waitable)) = event { 3531 let handle = waitable.map(|(_, v)| v).unwrap_or(0); 3532 let (ordinal, result) = event.parts(); 3533 (ordinal, handle, result) 3534 } else { 3535 log::trace!( 3536 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}", 3537 guest_thread.task, 3538 params.set 3539 ); 3540 let (ordinal, result) = Event::None.parts(); 3541 (ordinal, 0, result) 3542 } 3543 } 3544 }; 3545 let memory = self.options_memory_mut(store, params.options); 3546 let ptr = func::validate_inbounds_dynamic( 3547 &CanonicalAbiInfo::POINTER_PAIR, 3548 memory, 3549 &ValRaw::u32(params.payload), 3550 )?; 3551 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes()); 3552 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes()); 3553 Ok(ordinal) 3554 } 3555 3556 /// Implements the `subtask.cancel` intrinsic. 3557 pub(crate) fn subtask_cancel( 3558 self, 3559 store: &mut StoreOpaque, 3560 caller_instance: RuntimeComponentInstanceIndex, 3561 async_: bool, 3562 task_id: u32, 3563 ) -> Result<u32> { 3564 if !async_ { 3565 // The caller may only sync call `subtask.cancel` from an async task 3566 // (i.e. a task created via a call to an async export). Otherwise, 3567 // we'll trap. 3568 store.check_blocking()?; 3569 } 3570 3571 let (rep, is_host) = store 3572 .instance_state(RuntimeInstance { 3573 instance: self.id().instance(), 3574 index: caller_instance, 3575 }) 3576 .handle_table() 3577 .subtask_rep(task_id)?; 3578 let (waitable, expected_caller) = if is_host { 3579 let id = TableId::<HostTask>::new(rep); 3580 ( 3581 Waitable::Host(id), 3582 store.concurrent_state_mut().get_mut(id)?.caller, 3583 ) 3584 } else { 3585 let id = TableId::<GuestTask>::new(rep); 3586 if let Caller::Guest { thread } = store.concurrent_state_mut().get_mut(id)?.caller { 3587 (Waitable::Guest(id), thread) 3588 } else { 3589 bail_bug!("expected guest caller for `subtask.cancel`") 3590 } 3591 }; 3592 // Since waitables can neither be passed between instances nor forged, 3593 // this should never fail unless there's a bug in Wasmtime, but we check 3594 // here to be sure: 3595 let concurrent_state = store.concurrent_state_mut(); 3596 debug_assert_eq!(expected_caller, concurrent_state.current_guest_thread()?); 3597 3598 log::trace!("subtask_cancel {waitable:?} (handle {task_id})"); 3599 3600 let needs_block; 3601 if let Waitable::Host(host_task) = waitable { 3602 let state = &mut concurrent_state.get_mut(host_task)?.state; 3603 match mem::replace(state, HostTaskState::CalleeDone { cancelled: true }) { 3604 // If the callee is still running, signal an abort is requested. 3605 // 3606 // After cancelling this falls through to block waiting for the 3607 // host task to actually finish assuming that `async_` is false. 3608 // This blocking behavior resolves the race of `handle.abort()` 3609 // with the task actually getting cancelled or finishing. 3610 HostTaskState::CalleeRunning(handle) => { 3611 handle.abort(); 3612 needs_block = true; 3613 } 3614 3615 // Cancellation was already requested, so fail as the task can't 3616 // be cancelled twice. 3617 HostTaskState::CalleeDone { cancelled } => { 3618 if cancelled { 3619 bail!(Trap::SubtaskCancelAfterTerminal); 3620 } else { 3621 // The callee is already done so there's no need to 3622 // block further for an event. 3623 needs_block = false; 3624 } 3625 } 3626 3627 // These states should not be possible for a subtask that's 3628 // visible from the guest, so trap here. 3629 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => { 3630 bail_bug!("invalid states for host callee") 3631 } 3632 } 3633 } else { 3634 let caller = concurrent_state.current_guest_thread()?; 3635 let guest_task = TableId::<GuestTask>::new(rep); 3636 let task = concurrent_state.get_mut(guest_task)?; 3637 if !task.already_lowered_parameters() { 3638 // The task is in a `starting` state, meaning it hasn't run at 3639 // all yet. Here we update its fields to indicate that it is 3640 // ready to delete immediately once `subtask.drop` is called. 3641 task.lower_params = None; 3642 task.lift_result = None; 3643 task.exited = true; 3644 3645 let instance = task.instance; 3646 3647 assert_eq!(1, task.threads.len()); 3648 let thread = mem::take(&mut task.threads).into_iter().next().unwrap(); 3649 let concurrent_state = store.concurrent_state_mut(); 3650 concurrent_state.delete(thread)?; 3651 assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete()); 3652 3653 // Not yet started; cancel and remove from pending 3654 let pending = &mut store.instance_state(instance).concurrent_state().pending; 3655 let pending_count = pending.len(); 3656 pending.retain(|thread, _| thread.task != guest_task); 3657 // If there were no pending threads for this task, we're in an error state 3658 if pending.len() == pending_count { 3659 bail!(Trap::SubtaskCancelAfterTerminal); 3660 } 3661 return Ok(Status::StartCancelled as u32); 3662 } else if !task.returned_or_cancelled() { 3663 // Started, but not yet returned or cancelled; send the 3664 // `CANCELLED` event 3665 task.cancel_sent = true; 3666 // Note that this might overwrite an event that was set earlier 3667 // (e.g. `Event::None` if the task is yielding, or 3668 // `Event::Cancelled` if it was already cancelled), but that's 3669 // okay -- this should supersede the previous state. 3670 task.event = Some(Event::Cancelled); 3671 let runtime_instance = task.instance.index; 3672 for thread in task.threads.clone() { 3673 let thread = QualifiedThreadId { 3674 task: guest_task, 3675 thread, 3676 }; 3677 if let Some(set) = concurrent_state 3678 .get_mut(thread.thread)? 3679 .wake_on_cancel 3680 .take() 3681 { 3682 let item = match concurrent_state.get_mut(set)?.waiting.remove(&thread) { 3683 Some(WaitMode::Fiber(fiber)) => WorkItem::ResumeFiber(fiber), 3684 Some(WaitMode::Callback(instance)) => WorkItem::GuestCall( 3685 runtime_instance, 3686 GuestCall { 3687 thread, 3688 kind: GuestCallKind::DeliverEvent { 3689 instance, 3690 set: None, 3691 }, 3692 }, 3693 ), 3694 None => bail_bug!("thread not present in wake_on_cancel set"), 3695 }; 3696 concurrent_state.push_high_priority(item); 3697 3698 store.suspend(SuspendReason::Yielding { 3699 thread: caller, 3700 // `subtask.cancel` is not allowed to be called in a 3701 // sync context, so we cannot skip the may-block check. 3702 skip_may_block_check: false, 3703 })?; 3704 break; 3705 } 3706 } 3707 3708 // Guest tasks need to block if they have not yet returned or 3709 // cancelled, even as a result of the event delivery above. 3710 needs_block = !store 3711 .concurrent_state_mut() 3712 .get_mut(guest_task)? 3713 .returned_or_cancelled() 3714 } else { 3715 needs_block = false; 3716 } 3717 }; 3718 3719 // If we need to block waiting on the terminal status of this subtask 3720 // then return immediately in `async` mode, or otherwise wait for the 3721 // event to get signaled through the store. 3722 if needs_block { 3723 if async_ { 3724 return Ok(BLOCKED); 3725 } 3726 3727 // Wait for this waitable to get signaled with its terminal status 3728 // from the completion callback enqueued by `first_poll`. Once 3729 // that's done fall through to the sahred 3730 store.wait_for_event(waitable)?; 3731 3732 // .. fall through to determine what event's in store for us. 3733 } 3734 3735 let event = waitable.take_event(store.concurrent_state_mut())?; 3736 if let Some(Event::Subtask { 3737 status: status @ (Status::Returned | Status::ReturnCancelled), 3738 }) = event 3739 { 3740 Ok(status as u32) 3741 } else { 3742 bail!(Trap::SubtaskCancelAfterTerminal); 3743 } 3744 } 3745 3746 pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> { 3747 store.concurrent_state_mut().context_get(slot) 3748 } 3749 3750 pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> { 3751 store.concurrent_state_mut().context_set(slot, value) 3752 } 3753 } 3754 3755 /// Trait representing component model ABI async intrinsics and fused adapter 3756 /// helper functions. 3757 /// 3758 /// SAFETY (callers): Most of the methods in this trait accept raw pointers, 3759 /// which must be valid for at least the duration of the call (and possibly for 3760 /// as long as the relevant guest task exists, in the case of `*mut VMFuncRef` 3761 /// pointers used for async calls). 3762 pub trait VMComponentAsyncStore { 3763 /// A helper function for fused adapter modules involving calls where the 3764 /// one of the caller or callee is async. 3765 /// 3766 /// This helper is not used when the caller and callee both use the sync 3767 /// ABI, only when at least one is async is this used. 3768 unsafe fn prepare_call( 3769 &mut self, 3770 instance: Instance, 3771 memory: *mut VMMemoryDefinition, 3772 start: NonNull<VMFuncRef>, 3773 return_: NonNull<VMFuncRef>, 3774 caller_instance: RuntimeComponentInstanceIndex, 3775 callee_instance: RuntimeComponentInstanceIndex, 3776 task_return_type: TypeTupleIndex, 3777 callee_async: bool, 3778 string_encoding: StringEncoding, 3779 result_count: u32, 3780 storage: *mut ValRaw, 3781 storage_len: usize, 3782 ) -> Result<()>; 3783 3784 /// A helper function for fused adapter modules involving calls where the 3785 /// caller is sync-lowered but the callee is async-lifted. 3786 unsafe fn sync_start( 3787 &mut self, 3788 instance: Instance, 3789 callback: *mut VMFuncRef, 3790 callee: NonNull<VMFuncRef>, 3791 param_count: u32, 3792 storage: *mut MaybeUninit<ValRaw>, 3793 storage_len: usize, 3794 ) -> Result<()>; 3795 3796 /// A helper function for fused adapter modules involving calls where the 3797 /// caller is async-lowered. 3798 unsafe fn async_start( 3799 &mut self, 3800 instance: Instance, 3801 callback: *mut VMFuncRef, 3802 post_return: *mut VMFuncRef, 3803 callee: NonNull<VMFuncRef>, 3804 param_count: u32, 3805 result_count: u32, 3806 flags: u32, 3807 ) -> Result<u32>; 3808 3809 /// The `future.write` intrinsic. 3810 fn future_write( 3811 &mut self, 3812 instance: Instance, 3813 caller: RuntimeComponentInstanceIndex, 3814 ty: TypeFutureTableIndex, 3815 options: OptionsIndex, 3816 future: u32, 3817 address: u32, 3818 ) -> Result<u32>; 3819 3820 /// The `future.read` intrinsic. 3821 fn future_read( 3822 &mut self, 3823 instance: Instance, 3824 caller: RuntimeComponentInstanceIndex, 3825 ty: TypeFutureTableIndex, 3826 options: OptionsIndex, 3827 future: u32, 3828 address: u32, 3829 ) -> Result<u32>; 3830 3831 /// The `future.drop-writable` intrinsic. 3832 fn future_drop_writable( 3833 &mut self, 3834 instance: Instance, 3835 ty: TypeFutureTableIndex, 3836 writer: u32, 3837 ) -> Result<()>; 3838 3839 /// The `stream.write` intrinsic. 3840 fn stream_write( 3841 &mut self, 3842 instance: Instance, 3843 caller: RuntimeComponentInstanceIndex, 3844 ty: TypeStreamTableIndex, 3845 options: OptionsIndex, 3846 stream: u32, 3847 address: u32, 3848 count: u32, 3849 ) -> Result<u32>; 3850 3851 /// The `stream.read` intrinsic. 3852 fn stream_read( 3853 &mut self, 3854 instance: Instance, 3855 caller: RuntimeComponentInstanceIndex, 3856 ty: TypeStreamTableIndex, 3857 options: OptionsIndex, 3858 stream: u32, 3859 address: u32, 3860 count: u32, 3861 ) -> Result<u32>; 3862 3863 /// The "fast-path" implementation of the `stream.write` intrinsic for 3864 /// "flat" (i.e. memcpy-able) payloads. 3865 fn flat_stream_write( 3866 &mut self, 3867 instance: Instance, 3868 caller: RuntimeComponentInstanceIndex, 3869 ty: TypeStreamTableIndex, 3870 options: OptionsIndex, 3871 payload_size: u32, 3872 payload_align: u32, 3873 stream: u32, 3874 address: u32, 3875 count: u32, 3876 ) -> Result<u32>; 3877 3878 /// The "fast-path" implementation of the `stream.read` intrinsic for "flat" 3879 /// (i.e. memcpy-able) payloads. 3880 fn flat_stream_read( 3881 &mut self, 3882 instance: Instance, 3883 caller: RuntimeComponentInstanceIndex, 3884 ty: TypeStreamTableIndex, 3885 options: OptionsIndex, 3886 payload_size: u32, 3887 payload_align: u32, 3888 stream: u32, 3889 address: u32, 3890 count: u32, 3891 ) -> Result<u32>; 3892 3893 /// The `stream.drop-writable` intrinsic. 3894 fn stream_drop_writable( 3895 &mut self, 3896 instance: Instance, 3897 ty: TypeStreamTableIndex, 3898 writer: u32, 3899 ) -> Result<()>; 3900 3901 /// The `error-context.debug-message` intrinsic. 3902 fn error_context_debug_message( 3903 &mut self, 3904 instance: Instance, 3905 ty: TypeComponentLocalErrorContextTableIndex, 3906 options: OptionsIndex, 3907 err_ctx_handle: u32, 3908 debug_msg_address: u32, 3909 ) -> Result<()>; 3910 3911 /// The `thread.new-indirect` intrinsic 3912 fn thread_new_indirect( 3913 &mut self, 3914 instance: Instance, 3915 caller: RuntimeComponentInstanceIndex, 3916 func_ty_idx: TypeFuncIndex, 3917 start_func_table_idx: RuntimeTableIndex, 3918 start_func_idx: u32, 3919 context: i32, 3920 ) -> Result<u32>; 3921 } 3922 3923 /// SAFETY: See trait docs. 3924 impl<T: 'static> VMComponentAsyncStore for StoreInner<T> { 3925 unsafe fn prepare_call( 3926 &mut self, 3927 instance: Instance, 3928 memory: *mut VMMemoryDefinition, 3929 start: NonNull<VMFuncRef>, 3930 return_: NonNull<VMFuncRef>, 3931 caller_instance: RuntimeComponentInstanceIndex, 3932 callee_instance: RuntimeComponentInstanceIndex, 3933 task_return_type: TypeTupleIndex, 3934 callee_async: bool, 3935 string_encoding: StringEncoding, 3936 result_count_or_max_if_async: u32, 3937 storage: *mut ValRaw, 3938 storage_len: usize, 3939 ) -> Result<()> { 3940 // SAFETY: The `wasmtime_cranelift`-generated code that calls 3941 // this method will have ensured that `storage` is a valid 3942 // pointer containing at least `storage_len` items. 3943 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec(); 3944 3945 unsafe { 3946 instance.prepare_call( 3947 StoreContextMut(self), 3948 start, 3949 return_, 3950 caller_instance, 3951 callee_instance, 3952 task_return_type, 3953 callee_async, 3954 memory, 3955 string_encoding, 3956 match result_count_or_max_if_async { 3957 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async { 3958 params, 3959 has_result: false, 3960 }, 3961 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async { 3962 params, 3963 has_result: true, 3964 }, 3965 result_count => CallerInfo::Sync { 3966 params, 3967 result_count, 3968 }, 3969 }, 3970 ) 3971 } 3972 } 3973 3974 unsafe fn sync_start( 3975 &mut self, 3976 instance: Instance, 3977 callback: *mut VMFuncRef, 3978 callee: NonNull<VMFuncRef>, 3979 param_count: u32, 3980 storage: *mut MaybeUninit<ValRaw>, 3981 storage_len: usize, 3982 ) -> Result<()> { 3983 unsafe { 3984 instance 3985 .start_call( 3986 StoreContextMut(self), 3987 callback, 3988 ptr::null_mut(), 3989 callee, 3990 param_count, 3991 1, 3992 START_FLAG_ASYNC_CALLEE, 3993 // SAFETY: The `wasmtime_cranelift`-generated code that calls 3994 // this method will have ensured that `storage` is a valid 3995 // pointer containing at least `storage_len` items. 3996 Some(std::slice::from_raw_parts_mut(storage, storage_len)), 3997 ) 3998 .map(drop) 3999 } 4000 } 4001 4002 unsafe fn async_start( 4003 &mut self, 4004 instance: Instance, 4005 callback: *mut VMFuncRef, 4006 post_return: *mut VMFuncRef, 4007 callee: NonNull<VMFuncRef>, 4008 param_count: u32, 4009 result_count: u32, 4010 flags: u32, 4011 ) -> Result<u32> { 4012 unsafe { 4013 instance.start_call( 4014 StoreContextMut(self), 4015 callback, 4016 post_return, 4017 callee, 4018 param_count, 4019 result_count, 4020 flags, 4021 None, 4022 ) 4023 } 4024 } 4025 4026 fn future_write( 4027 &mut self, 4028 instance: Instance, 4029 caller: RuntimeComponentInstanceIndex, 4030 ty: TypeFutureTableIndex, 4031 options: OptionsIndex, 4032 future: u32, 4033 address: u32, 4034 ) -> Result<u32> { 4035 instance 4036 .guest_write( 4037 StoreContextMut(self), 4038 caller, 4039 TransmitIndex::Future(ty), 4040 options, 4041 None, 4042 future, 4043 address, 4044 1, 4045 ) 4046 .map(|result| result.encode()) 4047 } 4048 4049 fn future_read( 4050 &mut self, 4051 instance: Instance, 4052 caller: RuntimeComponentInstanceIndex, 4053 ty: TypeFutureTableIndex, 4054 options: OptionsIndex, 4055 future: u32, 4056 address: u32, 4057 ) -> Result<u32> { 4058 instance 4059 .guest_read( 4060 StoreContextMut(self), 4061 caller, 4062 TransmitIndex::Future(ty), 4063 options, 4064 None, 4065 future, 4066 address, 4067 1, 4068 ) 4069 .map(|result| result.encode()) 4070 } 4071 4072 fn stream_write( 4073 &mut self, 4074 instance: Instance, 4075 caller: RuntimeComponentInstanceIndex, 4076 ty: TypeStreamTableIndex, 4077 options: OptionsIndex, 4078 stream: u32, 4079 address: u32, 4080 count: u32, 4081 ) -> Result<u32> { 4082 instance 4083 .guest_write( 4084 StoreContextMut(self), 4085 caller, 4086 TransmitIndex::Stream(ty), 4087 options, 4088 None, 4089 stream, 4090 address, 4091 count, 4092 ) 4093 .map(|result| result.encode()) 4094 } 4095 4096 fn stream_read( 4097 &mut self, 4098 instance: Instance, 4099 caller: RuntimeComponentInstanceIndex, 4100 ty: TypeStreamTableIndex, 4101 options: OptionsIndex, 4102 stream: u32, 4103 address: u32, 4104 count: u32, 4105 ) -> Result<u32> { 4106 instance 4107 .guest_read( 4108 StoreContextMut(self), 4109 caller, 4110 TransmitIndex::Stream(ty), 4111 options, 4112 None, 4113 stream, 4114 address, 4115 count, 4116 ) 4117 .map(|result| result.encode()) 4118 } 4119 4120 fn future_drop_writable( 4121 &mut self, 4122 instance: Instance, 4123 ty: TypeFutureTableIndex, 4124 writer: u32, 4125 ) -> Result<()> { 4126 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer) 4127 } 4128 4129 fn flat_stream_write( 4130 &mut self, 4131 instance: Instance, 4132 caller: RuntimeComponentInstanceIndex, 4133 ty: TypeStreamTableIndex, 4134 options: OptionsIndex, 4135 payload_size: u32, 4136 payload_align: u32, 4137 stream: u32, 4138 address: u32, 4139 count: u32, 4140 ) -> Result<u32> { 4141 instance 4142 .guest_write( 4143 StoreContextMut(self), 4144 caller, 4145 TransmitIndex::Stream(ty), 4146 options, 4147 Some(FlatAbi { 4148 size: payload_size, 4149 align: payload_align, 4150 }), 4151 stream, 4152 address, 4153 count, 4154 ) 4155 .map(|result| result.encode()) 4156 } 4157 4158 fn flat_stream_read( 4159 &mut self, 4160 instance: Instance, 4161 caller: RuntimeComponentInstanceIndex, 4162 ty: TypeStreamTableIndex, 4163 options: OptionsIndex, 4164 payload_size: u32, 4165 payload_align: u32, 4166 stream: u32, 4167 address: u32, 4168 count: u32, 4169 ) -> Result<u32> { 4170 instance 4171 .guest_read( 4172 StoreContextMut(self), 4173 caller, 4174 TransmitIndex::Stream(ty), 4175 options, 4176 Some(FlatAbi { 4177 size: payload_size, 4178 align: payload_align, 4179 }), 4180 stream, 4181 address, 4182 count, 4183 ) 4184 .map(|result| result.encode()) 4185 } 4186 4187 fn stream_drop_writable( 4188 &mut self, 4189 instance: Instance, 4190 ty: TypeStreamTableIndex, 4191 writer: u32, 4192 ) -> Result<()> { 4193 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer) 4194 } 4195 4196 fn error_context_debug_message( 4197 &mut self, 4198 instance: Instance, 4199 ty: TypeComponentLocalErrorContextTableIndex, 4200 options: OptionsIndex, 4201 err_ctx_handle: u32, 4202 debug_msg_address: u32, 4203 ) -> Result<()> { 4204 instance.error_context_debug_message( 4205 StoreContextMut(self), 4206 ty, 4207 options, 4208 err_ctx_handle, 4209 debug_msg_address, 4210 ) 4211 } 4212 4213 fn thread_new_indirect( 4214 &mut self, 4215 instance: Instance, 4216 caller: RuntimeComponentInstanceIndex, 4217 func_ty_idx: TypeFuncIndex, 4218 start_func_table_idx: RuntimeTableIndex, 4219 start_func_idx: u32, 4220 context: i32, 4221 ) -> Result<u32> { 4222 instance.thread_new_indirect( 4223 StoreContextMut(self), 4224 caller, 4225 func_ty_idx, 4226 start_func_table_idx, 4227 start_func_idx, 4228 context, 4229 ) 4230 } 4231 } 4232 4233 type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>; 4234 4235 /// Represents the state of a pending host task. 4236 /// 4237 /// This is used to represent tasks when the guest calls into the host. 4238 pub(crate) struct HostTask { 4239 common: WaitableCommon, 4240 4241 /// Guest thread which called the host. 4242 caller: QualifiedThreadId, 4243 4244 /// State of borrows/etc the host needs to track. Used when the guest passes 4245 /// borrows to the host, for example. 4246 call_context: CallContext, 4247 4248 state: HostTaskState, 4249 } 4250 4251 enum HostTaskState { 4252 /// A host task has been created and it's considered "started". 4253 /// 4254 /// The host task has yet to enter `first_poll` or `poll_and_block` which 4255 /// is where this will get updated further. 4256 CalleeStarted, 4257 4258 /// State used for tasks in `first_poll` meaning that the guest did an async 4259 /// lower of a host async function which is blocked. The specified handle is 4260 /// linked to the future in the main `FuturesUnordered` of a store which is 4261 /// used to cancel it if the guest requests cancellation. 4262 CalleeRunning(JoinHandle), 4263 4264 /// Terminal state used for tasks in `poll_and_block` to store the result of 4265 /// their computation. Note that this state is not used for tasks in 4266 /// `first_poll`. 4267 CalleeFinished(LiftedResult), 4268 4269 /// Terminal state for host tasks meaning that the task was cancelled or the 4270 /// result was taken. 4271 CalleeDone { cancelled: bool }, 4272 } 4273 4274 impl HostTask { 4275 fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self { 4276 Self { 4277 common: WaitableCommon::default(), 4278 call_context: CallContext::default(), 4279 caller, 4280 state, 4281 } 4282 } 4283 } 4284 4285 impl TableDebug for HostTask { 4286 fn type_name() -> &'static str { 4287 "HostTask" 4288 } 4289 } 4290 4291 type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>; 4292 4293 /// Represents the caller of a given guest task. 4294 enum Caller { 4295 /// The host called the guest task. 4296 Host { 4297 /// If present, may be used to deliver the result. 4298 tx: Option<oneshot::Sender<LiftedResult>>, 4299 /// If true, there's a host future that must be dropped before the task 4300 /// can be deleted. 4301 host_future_present: bool, 4302 /// Represents the caller of the host function which called back into a 4303 /// guest. Note that this thread could belong to an entirely unrelated 4304 /// top-level component instance than the one the host called into. 4305 caller: CurrentThread, 4306 }, 4307 /// Another guest thread called the guest task 4308 Guest { 4309 /// The id of the caller 4310 thread: QualifiedThreadId, 4311 }, 4312 } 4313 4314 /// Represents a closure and related canonical ABI parameters required to 4315 /// validate a `task.return` call at runtime and lift the result. 4316 struct LiftResult { 4317 lift: RawLift, 4318 ty: TypeTupleIndex, 4319 memory: Option<SendSyncPtr<VMMemoryDefinition>>, 4320 string_encoding: StringEncoding, 4321 } 4322 4323 /// The table ID for a guest thread, qualified by the task to which it belongs. 4324 /// 4325 /// This exists to minimize table lookups and the necessity to pass stores around mutably 4326 /// for the common case of identifying the task to which a thread belongs. 4327 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] 4328 pub(crate) struct QualifiedThreadId { 4329 task: TableId<GuestTask>, 4330 thread: TableId<GuestThread>, 4331 } 4332 4333 impl QualifiedThreadId { 4334 fn qualify( 4335 state: &mut ConcurrentState, 4336 thread: TableId<GuestThread>, 4337 ) -> Result<QualifiedThreadId> { 4338 Ok(QualifiedThreadId { 4339 task: state.get_mut(thread)?.parent_task, 4340 thread, 4341 }) 4342 } 4343 } 4344 4345 impl fmt::Debug for QualifiedThreadId { 4346 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 4347 f.debug_tuple("QualifiedThreadId") 4348 .field(&self.task.rep()) 4349 .field(&self.thread.rep()) 4350 .finish() 4351 } 4352 } 4353 4354 enum GuestThreadState { 4355 NotStartedImplicit, 4356 NotStartedExplicit( 4357 Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>, 4358 ), 4359 Running, 4360 Suspended(StoreFiber<'static>), 4361 Ready(StoreFiber<'static>), 4362 Completed, 4363 } 4364 pub struct GuestThread { 4365 /// Context-local state used to implement the `context.{get,set}` 4366 /// intrinsics. 4367 context: [u32; 2], 4368 /// The owning guest task. 4369 parent_task: TableId<GuestTask>, 4370 /// If present, indicates that the thread is currently waiting on the 4371 /// specified set but may be cancelled and woken immediately. 4372 wake_on_cancel: Option<TableId<WaitableSet>>, 4373 /// The execution state of this guest thread 4374 state: GuestThreadState, 4375 /// The index of this thread in the component instance's handle table. 4376 /// This must always be `Some` after initialization. 4377 instance_rep: Option<u32>, 4378 /// Scratch waitable set used to watch subtasks during synchronous calls. 4379 sync_call_set: TableId<WaitableSet>, 4380 } 4381 4382 impl GuestThread { 4383 /// Retrieve the `GuestThread` corresponding to the specified guest-visible 4384 /// handle. 4385 fn from_instance( 4386 state: Pin<&mut ComponentInstance>, 4387 caller_instance: RuntimeComponentInstanceIndex, 4388 guest_thread: u32, 4389 ) -> Result<TableId<Self>> { 4390 let rep = state.instance_states().0[caller_instance] 4391 .thread_handle_table() 4392 .guest_thread_rep(guest_thread)?; 4393 Ok(TableId::new(rep)) 4394 } 4395 4396 fn new_implicit(state: &mut ConcurrentState, parent_task: TableId<GuestTask>) -> Result<Self> { 4397 let sync_call_set = state.push(WaitableSet::default())?; 4398 Ok(Self { 4399 context: [0; 2], 4400 parent_task, 4401 wake_on_cancel: None, 4402 state: GuestThreadState::NotStartedImplicit, 4403 instance_rep: None, 4404 sync_call_set, 4405 }) 4406 } 4407 4408 fn new_explicit( 4409 state: &mut ConcurrentState, 4410 parent_task: TableId<GuestTask>, 4411 start_func: Box< 4412 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync, 4413 >, 4414 ) -> Result<Self> { 4415 let sync_call_set = state.push(WaitableSet::default())?; 4416 Ok(Self { 4417 context: [0; 2], 4418 parent_task, 4419 wake_on_cancel: None, 4420 state: GuestThreadState::NotStartedExplicit(start_func), 4421 instance_rep: None, 4422 sync_call_set, 4423 }) 4424 } 4425 } 4426 4427 impl TableDebug for GuestThread { 4428 fn type_name() -> &'static str { 4429 "GuestThread" 4430 } 4431 } 4432 4433 enum SyncResult { 4434 NotProduced, 4435 Produced(Option<ValRaw>), 4436 Taken, 4437 } 4438 4439 impl SyncResult { 4440 fn take(&mut self) -> Result<Option<Option<ValRaw>>> { 4441 Ok(match mem::replace(self, SyncResult::Taken) { 4442 SyncResult::NotProduced => None, 4443 SyncResult::Produced(val) => Some(val), 4444 SyncResult::Taken => { 4445 bail_bug!("attempted to take a synchronous result that was already taken") 4446 } 4447 }) 4448 } 4449 } 4450 4451 #[derive(Debug)] 4452 enum HostFutureState { 4453 NotApplicable, 4454 Live, 4455 Dropped, 4456 } 4457 4458 /// Represents a pending guest task. 4459 pub(crate) struct GuestTask { 4460 /// See `WaitableCommon` 4461 common: WaitableCommon, 4462 /// Closure to lower the parameters passed to this task. 4463 lower_params: Option<RawLower>, 4464 /// See `LiftResult` 4465 lift_result: Option<LiftResult>, 4466 /// A place to stash the type-erased lifted result if it can't be delivered 4467 /// immediately. 4468 result: Option<LiftedResult>, 4469 /// Closure to call the callback function for an async-lifted export, if 4470 /// provided. 4471 callback: Option<CallbackFn>, 4472 /// See `Caller` 4473 caller: Caller, 4474 /// Borrow state for this task. 4475 /// 4476 /// Keeps track of `borrow<T>` received to this task to ensure that 4477 /// everything is dropped by the time it exits. 4478 call_context: CallContext, 4479 /// A place to stash the lowered result for a sync-to-async call until it 4480 /// can be returned to the caller. 4481 sync_result: SyncResult, 4482 /// Whether or not the task has been cancelled (i.e. whether the task is 4483 /// permitted to call `task.cancel`). 4484 cancel_sent: bool, 4485 /// Whether or not we've sent a `Status::Starting` event to any current or 4486 /// future waiters for this waitable. 4487 starting_sent: bool, 4488 /// The runtime instance to which the exported function for this guest task 4489 /// belongs. 4490 /// 4491 /// Note that the task may do a sync->sync call via a fused adapter which 4492 /// results in that task executing code in a different instance, and it may 4493 /// call host functions and intrinsics from that other instance. 4494 instance: RuntimeInstance, 4495 /// If present, a pending `Event::None` or `Event::Cancelled` to be 4496 /// delivered to this task. 4497 event: Option<Event>, 4498 /// Whether or not the task has exited. 4499 exited: bool, 4500 /// Threads belonging to this task 4501 threads: HashSet<TableId<GuestThread>>, 4502 /// The state of the host future that represents an async task, which must 4503 /// be dropped before we can delete the task. 4504 host_future_state: HostFutureState, 4505 /// Indicates whether this task was created for a call to an async-lifted 4506 /// export. 4507 async_function: bool, 4508 } 4509 4510 impl GuestTask { 4511 fn already_lowered_parameters(&self) -> bool { 4512 // We reset `lower_params` after we lower the parameters 4513 self.lower_params.is_none() 4514 } 4515 4516 fn returned_or_cancelled(&self) -> bool { 4517 // We reset `lift_result` after we return or exit 4518 self.lift_result.is_none() 4519 } 4520 4521 fn ready_to_delete(&self) -> bool { 4522 let threads_completed = self.threads.is_empty(); 4523 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_)); 4524 let pending_completion_event = matches!( 4525 self.common.event, 4526 Some(Event::Subtask { 4527 status: Status::Returned | Status::ReturnCancelled 4528 }) 4529 ); 4530 let ready = threads_completed 4531 && !has_sync_result 4532 && !pending_completion_event 4533 && !matches!(self.host_future_state, HostFutureState::Live); 4534 log::trace!( 4535 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})", 4536 threads_completed, 4537 has_sync_result, 4538 pending_completion_event, 4539 self.host_future_state 4540 ); 4541 ready 4542 } 4543 4544 fn new( 4545 lower_params: RawLower, 4546 lift_result: LiftResult, 4547 caller: Caller, 4548 callback: Option<CallbackFn>, 4549 instance: RuntimeInstance, 4550 async_function: bool, 4551 ) -> Result<Self> { 4552 let host_future_state = match &caller { 4553 Caller::Guest { .. } => HostFutureState::NotApplicable, 4554 Caller::Host { 4555 host_future_present, 4556 .. 4557 } => { 4558 if *host_future_present { 4559 HostFutureState::Live 4560 } else { 4561 HostFutureState::NotApplicable 4562 } 4563 } 4564 }; 4565 Ok(Self { 4566 common: WaitableCommon::default(), 4567 lower_params: Some(lower_params), 4568 lift_result: Some(lift_result), 4569 result: None, 4570 callback, 4571 caller, 4572 call_context: CallContext::default(), 4573 sync_result: SyncResult::NotProduced, 4574 cancel_sent: false, 4575 starting_sent: false, 4576 instance, 4577 event: None, 4578 exited: false, 4579 threads: HashSet::new(), 4580 host_future_state, 4581 async_function, 4582 }) 4583 } 4584 4585 /// Dispose of this guest task. 4586 fn dispose(self, _state: &mut ConcurrentState) -> Result<()> { 4587 assert!(self.threads.is_empty()); 4588 Ok(()) 4589 } 4590 } 4591 4592 impl TableDebug for GuestTask { 4593 fn type_name() -> &'static str { 4594 "GuestTask" 4595 } 4596 } 4597 4598 /// Represents state common to all kinds of waitables. 4599 #[derive(Default)] 4600 struct WaitableCommon { 4601 /// The currently pending event for this waitable, if any. 4602 event: Option<Event>, 4603 /// The set to which this waitable belongs, if any. 4604 set: Option<TableId<WaitableSet>>, 4605 /// The handle with which the guest refers to this waitable, if any. 4606 handle: Option<u32>, 4607 } 4608 4609 /// Represents a Component Model Async `waitable`. 4610 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] 4611 enum Waitable { 4612 /// A host task 4613 Host(TableId<HostTask>), 4614 /// A guest task 4615 Guest(TableId<GuestTask>), 4616 /// The read or write end of a stream or future 4617 Transmit(TableId<TransmitHandle>), 4618 } 4619 4620 impl Waitable { 4621 /// Retrieve the `Waitable` corresponding to the specified guest-visible 4622 /// handle. 4623 fn from_instance( 4624 state: Pin<&mut ComponentInstance>, 4625 caller_instance: RuntimeComponentInstanceIndex, 4626 waitable: u32, 4627 ) -> Result<Self> { 4628 use crate::runtime::vm::component::Waitable; 4629 4630 let (waitable, kind) = state.instance_states().0[caller_instance] 4631 .handle_table() 4632 .waitable_rep(waitable)?; 4633 4634 Ok(match kind { 4635 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)), 4636 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)), 4637 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)), 4638 }) 4639 } 4640 4641 /// Retrieve the host-visible identifier for this `Waitable`. 4642 fn rep(&self) -> u32 { 4643 match self { 4644 Self::Host(id) => id.rep(), 4645 Self::Guest(id) => id.rep(), 4646 Self::Transmit(id) => id.rep(), 4647 } 4648 } 4649 4650 /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or 4651 /// remove it from any set it may currently belong to (when `set` is 4652 /// `None`). 4653 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> { 4654 log::trace!("waitable {self:?} join set {set:?}",); 4655 4656 let old = mem::replace(&mut self.common(state)?.set, set); 4657 4658 if let Some(old) = old { 4659 match *self { 4660 Waitable::Host(id) => state.remove_child(id, old), 4661 Waitable::Guest(id) => state.remove_child(id, old), 4662 Waitable::Transmit(id) => state.remove_child(id, old), 4663 }?; 4664 4665 state.get_mut(old)?.ready.remove(self); 4666 } 4667 4668 if let Some(set) = set { 4669 match *self { 4670 Waitable::Host(id) => state.add_child(id, set), 4671 Waitable::Guest(id) => state.add_child(id, set), 4672 Waitable::Transmit(id) => state.add_child(id, set), 4673 }?; 4674 4675 if self.common(state)?.event.is_some() { 4676 self.mark_ready(state)?; 4677 } 4678 } 4679 4680 Ok(()) 4681 } 4682 4683 /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`. 4684 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> { 4685 Ok(match self { 4686 Self::Host(id) => &mut state.get_mut(*id)?.common, 4687 Self::Guest(id) => &mut state.get_mut(*id)?.common, 4688 Self::Transmit(id) => &mut state.get_mut(*id)?.common, 4689 }) 4690 } 4691 4692 /// Set or clear the pending event for this waitable and either deliver it 4693 /// to the first waiter, if any, or mark it as ready to be delivered to the 4694 /// next waiter that arrives. 4695 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> { 4696 log::trace!("set event for {self:?}: {event:?}"); 4697 self.common(state)?.event = event; 4698 self.mark_ready(state) 4699 } 4700 4701 /// Take the pending event from this waitable, leaving `None` in its place. 4702 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> { 4703 let common = self.common(state)?; 4704 let event = common.event.take(); 4705 if let Some(set) = self.common(state)?.set { 4706 state.get_mut(set)?.ready.remove(self); 4707 } 4708 4709 Ok(event) 4710 } 4711 4712 /// Deliver the current event for this waitable to the first waiter, if any, 4713 /// or else mark it as ready to be delivered to the next waiter that 4714 /// arrives. 4715 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> { 4716 if let Some(set) = self.common(state)?.set { 4717 state.get_mut(set)?.ready.insert(*self); 4718 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() { 4719 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take(); 4720 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set)); 4721 4722 let item = match mode { 4723 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber), 4724 WaitMode::Callback(instance) => WorkItem::GuestCall( 4725 state.get_mut(thread.task)?.instance.index, 4726 GuestCall { 4727 thread, 4728 kind: GuestCallKind::DeliverEvent { 4729 instance, 4730 set: Some(set), 4731 }, 4732 }, 4733 ), 4734 }; 4735 state.push_high_priority(item); 4736 } 4737 } 4738 Ok(()) 4739 } 4740 4741 /// Remove this waitable from the instance's rep table. 4742 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> { 4743 match self { 4744 Self::Host(task) => { 4745 log::trace!("delete host task {task:?}"); 4746 state.delete(*task)?; 4747 } 4748 Self::Guest(task) => { 4749 log::trace!("delete guest task {task:?}"); 4750 state.delete(*task)?.dispose(state)?; 4751 } 4752 Self::Transmit(task) => { 4753 state.delete(*task)?; 4754 } 4755 } 4756 4757 Ok(()) 4758 } 4759 } 4760 4761 impl fmt::Debug for Waitable { 4762 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 4763 match self { 4764 Self::Host(id) => write!(f, "{id:?}"), 4765 Self::Guest(id) => write!(f, "{id:?}"), 4766 Self::Transmit(id) => write!(f, "{id:?}"), 4767 } 4768 } 4769 } 4770 4771 /// Represents a Component Model Async `waitable-set`. 4772 #[derive(Default)] 4773 struct WaitableSet { 4774 /// Which waitables in this set have pending events, if any. 4775 ready: BTreeSet<Waitable>, 4776 /// Which guest threads are currently waiting on this set, if any. 4777 waiting: BTreeMap<QualifiedThreadId, WaitMode>, 4778 } 4779 4780 impl TableDebug for WaitableSet { 4781 fn type_name() -> &'static str { 4782 "WaitableSet" 4783 } 4784 } 4785 4786 /// Type-erased closure to lower the parameters for a guest task. 4787 type RawLower = 4788 Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>; 4789 4790 /// Type-erased closure to lift the result for a guest task. 4791 type RawLift = Box< 4792 dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync, 4793 >; 4794 4795 /// Type erased result of a guest task which may be downcast to the expected 4796 /// type by a host caller (or simply ignored in the case of a guest caller; see 4797 /// `DummyResult`). 4798 type LiftedResult = Box<dyn Any + Send + Sync>; 4799 4800 /// Used to return a result from a `LiftFn` when the actual result has already 4801 /// been lowered to a guest task's stack and linear memory. 4802 struct DummyResult; 4803 4804 /// Represents the Component Model Async state of a (sub-)component instance. 4805 #[derive(Default)] 4806 pub struct ConcurrentInstanceState { 4807 /// Whether backpressure is set for this instance (enabled if >0) 4808 backpressure: u16, 4809 /// Whether this instance can be entered 4810 do_not_enter: bool, 4811 /// Pending calls for this instance which require `Self::backpressure` to be 4812 /// `true` and/or `Self::do_not_enter` to be false before they can proceed. 4813 pending: BTreeMap<QualifiedThreadId, GuestCallKind>, 4814 } 4815 4816 impl ConcurrentInstanceState { 4817 pub fn pending_is_empty(&self) -> bool { 4818 self.pending.is_empty() 4819 } 4820 } 4821 4822 #[derive(Debug, Copy, Clone)] 4823 pub(crate) enum CurrentThread { 4824 Guest(QualifiedThreadId), 4825 Host(TableId<HostTask>), 4826 None, 4827 } 4828 4829 impl CurrentThread { 4830 fn guest(&self) -> Option<&QualifiedThreadId> { 4831 match self { 4832 Self::Guest(id) => Some(id), 4833 _ => None, 4834 } 4835 } 4836 4837 fn host(&self) -> Option<TableId<HostTask>> { 4838 match self { 4839 Self::Host(id) => Some(*id), 4840 _ => None, 4841 } 4842 } 4843 4844 fn is_none(&self) -> bool { 4845 matches!(self, Self::None) 4846 } 4847 } 4848 4849 impl From<QualifiedThreadId> for CurrentThread { 4850 fn from(id: QualifiedThreadId) -> Self { 4851 Self::Guest(id) 4852 } 4853 } 4854 4855 impl From<TableId<HostTask>> for CurrentThread { 4856 fn from(id: TableId<HostTask>) -> Self { 4857 Self::Host(id) 4858 } 4859 } 4860 4861 /// Represents the Component Model Async state of a store. 4862 pub struct ConcurrentState { 4863 /// The currently running thread, if any. 4864 current_thread: CurrentThread, 4865 4866 /// The set of pending host and background tasks, if any. 4867 /// 4868 /// See `ComponentInstance::poll_until` for where we temporarily take this 4869 /// out, poll it, then put it back to avoid any mutable aliasing hazards. 4870 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>, 4871 /// The table of waitables, waitable sets, etc. 4872 table: AlwaysMut<ResourceTable>, 4873 /// The "high priority" work queue for this store's event loop. 4874 high_priority: Vec<WorkItem>, 4875 /// The "low priority" work queue for this store's event loop. 4876 low_priority: VecDeque<WorkItem>, 4877 /// A place to stash the reason a fiber is suspending so that the code which 4878 /// resumed it will know under what conditions the fiber should be resumed 4879 /// again. 4880 suspend_reason: Option<SuspendReason>, 4881 /// A cached fiber which is waiting for work to do. 4882 /// 4883 /// This helps us avoid creating a new fiber for each `GuestCall` work item. 4884 worker: Option<StoreFiber<'static>>, 4885 /// A place to stash the work item for which we're resuming a worker fiber. 4886 worker_item: Option<WorkerItem>, 4887 4888 /// Reference counts for all component error contexts 4889 /// 4890 /// NOTE: it is possible the global ref count to be *greater* than the sum of 4891 /// (sub)component ref counts as tracked by `error_context_tables`, for 4892 /// example when the host holds one or more references to error contexts. 4893 /// 4894 /// The key of this primary map is often referred to as the "rep" (i.e. host-side 4895 /// component-wide representation) of the index into concurrent state for a given 4896 /// stored `ErrorContext`. 4897 /// 4898 /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same 4899 /// as a `TableId<ErrorContextState>`. 4900 global_error_context_ref_counts: 4901 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>, 4902 } 4903 4904 impl Default for ConcurrentState { 4905 fn default() -> Self { 4906 Self { 4907 current_thread: CurrentThread::None, 4908 table: AlwaysMut::new(ResourceTable::new()), 4909 futures: AlwaysMut::new(Some(FuturesUnordered::new())), 4910 high_priority: Vec::new(), 4911 low_priority: VecDeque::new(), 4912 suspend_reason: None, 4913 worker: None, 4914 worker_item: None, 4915 global_error_context_ref_counts: BTreeMap::new(), 4916 } 4917 } 4918 } 4919 4920 impl ConcurrentState { 4921 /// Take ownership of any fibers and futures owned by this object. 4922 /// 4923 /// This should be used when disposing of the `Store` containing this object 4924 /// in order to gracefully resolve any and all fibers using 4925 /// `StoreFiber::dispose`. This is necessary to avoid possible 4926 /// use-after-free bugs due to fibers which may still have access to the 4927 /// `Store`. 4928 /// 4929 /// Additionally, the futures collected with this function should be dropped 4930 /// within a `tls::set` call, which will ensure than any futures closing 4931 /// over an `&Accessor` will have access to the store when dropped, allowing 4932 /// e.g. `WithAccessor[AndValue]` instances to be disposed of without 4933 /// panicking. 4934 /// 4935 /// Note that this will leave the object in an inconsistent and unusable 4936 /// state, so it should only be used just prior to dropping it. 4937 pub(crate) fn take_fibers_and_futures( 4938 &mut self, 4939 fibers: &mut Vec<StoreFiber<'static>>, 4940 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>, 4941 ) { 4942 for entry in self.table.get_mut().iter_mut() { 4943 if let Some(set) = entry.downcast_mut::<WaitableSet>() { 4944 for mode in mem::take(&mut set.waiting).into_values() { 4945 if let WaitMode::Fiber(fiber) = mode { 4946 fibers.push(fiber); 4947 } 4948 } 4949 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() { 4950 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) = 4951 mem::replace(&mut thread.state, GuestThreadState::Completed) 4952 { 4953 fibers.push(fiber); 4954 } 4955 } 4956 } 4957 4958 if let Some(fiber) = self.worker.take() { 4959 fibers.push(fiber); 4960 } 4961 4962 let mut handle_item = |item| match item { 4963 WorkItem::ResumeFiber(fiber) => { 4964 fibers.push(fiber); 4965 } 4966 WorkItem::PushFuture(future) => { 4967 self.futures 4968 .get_mut() 4969 .as_mut() 4970 .unwrap() 4971 .push(future.into_inner()); 4972 } 4973 WorkItem::ResumeThread(..) | WorkItem::GuestCall(..) | WorkItem::WorkerFunction(..) => { 4974 } 4975 }; 4976 4977 for item in mem::take(&mut self.high_priority) { 4978 handle_item(item); 4979 } 4980 for item in mem::take(&mut self.low_priority) { 4981 handle_item(item); 4982 } 4983 4984 if let Some(them) = self.futures.get_mut().take() { 4985 futures.push(them); 4986 } 4987 } 4988 4989 /// Collect the next set of work items to run. This will be either all 4990 /// high-priority items, or a single low-priority item if there are no 4991 /// high-priority items. 4992 fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> { 4993 let mut ready = mem::take(&mut self.high_priority); 4994 if ready.is_empty() { 4995 if let Some(item) = self.low_priority.pop_back() { 4996 ready.push(item); 4997 } 4998 } 4999 ready 5000 } 5001 5002 fn push<V: Send + Sync + 'static>( 5003 &mut self, 5004 value: V, 5005 ) -> Result<TableId<V>, ResourceTableError> { 5006 self.table.get_mut().push(value).map(TableId::from) 5007 } 5008 5009 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> { 5010 self.table.get_mut().get_mut(&Resource::from(id)) 5011 } 5012 5013 pub fn add_child<T: 'static, U: 'static>( 5014 &mut self, 5015 child: TableId<T>, 5016 parent: TableId<U>, 5017 ) -> Result<(), ResourceTableError> { 5018 self.table 5019 .get_mut() 5020 .add_child(Resource::from(child), Resource::from(parent)) 5021 } 5022 5023 pub fn remove_child<T: 'static, U: 'static>( 5024 &mut self, 5025 child: TableId<T>, 5026 parent: TableId<U>, 5027 ) -> Result<(), ResourceTableError> { 5028 self.table 5029 .get_mut() 5030 .remove_child(Resource::from(child), Resource::from(parent)) 5031 } 5032 5033 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> { 5034 self.table.get_mut().delete(Resource::from(id)) 5035 } 5036 5037 fn push_future(&mut self, future: HostTaskFuture) { 5038 // Note that we can't directly push to `ConcurrentState::futures` here 5039 // since this may be called from a future that's being polled inside 5040 // `Self::poll_until`, which temporarily removes the `FuturesUnordered` 5041 // so it has exclusive access while polling it. Therefore, we push a 5042 // work item to the "high priority" queue, which will actually push to 5043 // `ConcurrentState::futures` later. 5044 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future))); 5045 } 5046 5047 fn push_high_priority(&mut self, item: WorkItem) { 5048 log::trace!("push high priority: {item:?}"); 5049 self.high_priority.push(item); 5050 } 5051 5052 fn push_low_priority(&mut self, item: WorkItem) { 5053 log::trace!("push low priority: {item:?}"); 5054 self.low_priority.push_front(item); 5055 } 5056 5057 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) { 5058 if high_priority { 5059 self.push_high_priority(item); 5060 } else { 5061 self.push_low_priority(item); 5062 } 5063 } 5064 5065 fn promote_instance_local_thread_work_item( 5066 &mut self, 5067 current_instance: RuntimeComponentInstanceIndex, 5068 ) -> bool { 5069 self.promote_work_items_matching(|item: &WorkItem| match item { 5070 WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => { 5071 *instance == current_instance 5072 } 5073 _ => false, 5074 }) 5075 } 5076 5077 fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool { 5078 self.promote_work_items_matching(|item: &WorkItem| match item { 5079 WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => { 5080 *t == thread 5081 } 5082 _ => false, 5083 }) 5084 } 5085 5086 fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool 5087 where 5088 F: FnMut(&WorkItem) -> bool, 5089 { 5090 // If there's a high-priority work item to resume the current guest thread, 5091 // we don't need to promote anything, but we return true to indicate that 5092 // work is pending for the current instance. 5093 if self.high_priority.iter().any(&mut predicate) { 5094 true 5095 } 5096 // Otherwise, look for a low-priority work item that matches the current 5097 // instance and promote it to high-priority. 5098 else if let Some(idx) = self.low_priority.iter().position(&mut predicate) { 5099 let item = self.low_priority.remove(idx).unwrap(); 5100 self.push_high_priority(item); 5101 true 5102 } else { 5103 false 5104 } 5105 } 5106 5107 /// Implements the `context.get` intrinsic. 5108 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> { 5109 let thread = self.current_guest_thread()?; 5110 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot)?]; 5111 log::trace!("context_get {thread:?} slot {slot} val {val:#x}"); 5112 Ok(val) 5113 } 5114 5115 /// Implements the `context.set` intrinsic. 5116 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> { 5117 let thread = self.current_guest_thread()?; 5118 log::trace!("context_set {thread:?} slot {slot} val {val:#x}"); 5119 self.get_mut(thread.thread)?.context[usize::try_from(slot)?] = val; 5120 Ok(()) 5121 } 5122 5123 /// Returns whether there's a pending cancellation on the current guest thread, 5124 /// consuming the event if so. 5125 fn take_pending_cancellation(&mut self) -> Result<bool> { 5126 let thread = self.current_guest_thread()?; 5127 if let Some(event) = self.get_mut(thread.task)?.event.take() { 5128 assert!(matches!(event, Event::Cancelled)); 5129 Ok(true) 5130 } else { 5131 Ok(false) 5132 } 5133 } 5134 5135 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> { 5136 if self.may_block(task)? { 5137 Ok(()) 5138 } else { 5139 Err(Trap::CannotBlockSyncTask.into()) 5140 } 5141 } 5142 5143 fn may_block(&mut self, task: TableId<GuestTask>) -> Result<bool> { 5144 let task = self.get_mut(task)?; 5145 Ok(task.async_function || task.returned_or_cancelled()) 5146 } 5147 5148 /// Used by `ResourceTables` to acquire the current `CallContext` for the 5149 /// specified task. 5150 /// 5151 /// The `task` is bit-packed as returned by `current_call_context_scope_id` 5152 /// below. 5153 pub fn call_context(&mut self, task: u32) -> &mut CallContext { 5154 let (task, is_host) = (task >> 1, task & 1 == 1); 5155 if is_host { 5156 let task: TableId<HostTask> = TableId::new(task); 5157 &mut self.get_mut(task).unwrap().call_context 5158 } else { 5159 let task: TableId<GuestTask> = TableId::new(task); 5160 &mut self.get_mut(task).unwrap().call_context 5161 } 5162 } 5163 5164 /// Used by `ResourceTables` to record the scope of a borrow to get undone 5165 /// in the future. 5166 pub fn current_call_context_scope_id(&self) -> u32 { 5167 let (bits, is_host) = match self.current_thread { 5168 CurrentThread::Guest(id) => (id.task.rep(), false), 5169 CurrentThread::Host(id) => (id.rep(), true), 5170 CurrentThread::None => unreachable!(), 5171 }; 5172 assert_eq!((bits << 1) >> 1, bits); 5173 (bits << 1) | u32::from(is_host) 5174 } 5175 5176 fn current_guest_thread(&self) -> Result<QualifiedThreadId> { 5177 match self.current_thread.guest() { 5178 Some(id) => Ok(*id), 5179 None => bail_bug!("current thread is not a guest thread"), 5180 } 5181 } 5182 5183 fn current_host_thread(&self) -> Result<TableId<HostTask>> { 5184 match self.current_thread.host() { 5185 Some(id) => Ok(id), 5186 None => bail_bug!("current thread is not a host thread"), 5187 } 5188 } 5189 5190 fn futures_mut(&mut self) -> Result<&mut FuturesUnordered<HostTaskFuture>> { 5191 match self.futures.get_mut().as_mut() { 5192 Some(f) => Ok(f), 5193 None => bail_bug!("futures field of concurrent state is currently taken"), 5194 } 5195 } 5196 5197 pub(crate) fn table(&mut self) -> &mut ResourceTable { 5198 self.table.get_mut() 5199 } 5200 } 5201 5202 /// Provide a type hint to compiler about the shape of a parameter lower 5203 /// closure. 5204 fn for_any_lower< 5205 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync, 5206 >( 5207 fun: F, 5208 ) -> F { 5209 fun 5210 } 5211 5212 /// Provide a type hint to compiler about the shape of a result lift closure. 5213 fn for_any_lift< 5214 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync, 5215 >( 5216 fun: F, 5217 ) -> F { 5218 fun 5219 } 5220 5221 /// Wrap the specified future in a `poll_fn` which asserts that the future is 5222 /// only polled from the event loop of the specified `Store`. 5223 /// 5224 /// See `StoreContextMut::run_concurrent` for details. 5225 fn checked<F: Future + Send + 'static>( 5226 id: StoreId, 5227 fut: F, 5228 ) -> impl Future<Output = F::Output> + Send + 'static { 5229 async move { 5230 let mut fut = pin!(fut); 5231 future::poll_fn(move |cx| { 5232 let message = "\ 5233 `Future`s which depend on asynchronous component tasks, streams, or \ 5234 futures to complete may only be polled from the event loop of the \ 5235 store to which they belong. Please use \ 5236 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\ 5237 "; 5238 tls::try_get(|store| { 5239 let matched = match store { 5240 tls::TryGet::Some(store) => store.id() == id, 5241 tls::TryGet::Taken | tls::TryGet::None => false, 5242 }; 5243 5244 if !matched { 5245 panic!("{message}") 5246 } 5247 }); 5248 fut.as_mut().poll(cx) 5249 }) 5250 .await 5251 } 5252 } 5253 5254 /// Assert that `StoreContextMut::run_concurrent` has not been called from 5255 /// within an store's event loop. 5256 fn check_recursive_run() { 5257 tls::try_get(|store| { 5258 if !matches!(store, tls::TryGet::None) { 5259 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported") 5260 } 5261 }); 5262 } 5263 5264 fn unpack_callback_code(code: u32) -> (u32, u32) { 5265 (code & 0xF, code >> 4) 5266 } 5267 5268 /// Helper struct for packaging parameters to be passed to 5269 /// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or 5270 /// `waitable-set.poll`. 5271 struct WaitableCheckParams { 5272 set: TableId<WaitableSet>, 5273 options: OptionsIndex, 5274 payload: u32, 5275 } 5276 5277 /// Indicates whether `ComponentInstance::waitable_check` is being called for 5278 /// `waitable-set.wait` or `waitable-set.poll`. 5279 enum WaitableCheck { 5280 Wait, 5281 Poll, 5282 } 5283 5284 /// Represents a guest task called from the host, prepared using `prepare_call`. 5285 pub(crate) struct PreparedCall<R> { 5286 /// The guest export to be called 5287 handle: Func, 5288 /// The guest thread created by `prepare_call` 5289 thread: QualifiedThreadId, 5290 /// The number of lowered core Wasm parameters to pass to the call. 5291 param_count: usize, 5292 /// The `oneshot::Receiver` to which the result of the call will be 5293 /// delivered when it is available. 5294 rx: oneshot::Receiver<LiftedResult>, 5295 _phantom: PhantomData<R>, 5296 } 5297 5298 impl<R> PreparedCall<R> { 5299 /// Get a copy of the `TaskId` for this `PreparedCall`. 5300 pub(crate) fn task_id(&self) -> TaskId { 5301 TaskId { 5302 task: self.thread.task, 5303 } 5304 } 5305 } 5306 5307 /// Represents a task created by `prepare_call`. 5308 pub(crate) struct TaskId { 5309 task: TableId<GuestTask>, 5310 } 5311 5312 impl TaskId { 5313 /// The host future for an async task was dropped. If the parameters have not been lowered yet, 5314 /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case, 5315 /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended 5316 /// and can be resumed by other tasks for this component, so we mark the future as dropped 5317 /// and delete the task when all threads are done. 5318 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> { 5319 let task = store.0.concurrent_state_mut().get_mut(self.task)?; 5320 if !task.already_lowered_parameters() { 5321 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())? 5322 } else { 5323 task.host_future_state = HostFutureState::Dropped; 5324 if task.ready_to_delete() { 5325 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())? 5326 } 5327 } 5328 Ok(()) 5329 } 5330 } 5331 5332 /// Prepare a call to the specified exported Wasm function, providing functions 5333 /// for lowering the parameters and lifting the result. 5334 /// 5335 /// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event 5336 /// loop, use `queue_call`. 5337 pub(crate) fn prepare_call<T, R>( 5338 mut store: StoreContextMut<T>, 5339 handle: Func, 5340 param_count: usize, 5341 host_future_present: bool, 5342 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()> 5343 + Send 5344 + Sync 5345 + 'static, 5346 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> 5347 + Send 5348 + Sync 5349 + 'static, 5350 ) -> Result<PreparedCall<R>> { 5351 let (options, _flags, ty, raw_options) = handle.abi_info(store.0); 5352 5353 let instance = handle.instance().id().get(store.0); 5354 let options = &instance.component().env_component().options[options]; 5355 let ty = &instance.component().types()[ty]; 5356 let async_function = ty.async_; 5357 let task_return_type = ty.results; 5358 let component_instance = raw_options.instance; 5359 let callback = options.callback.map(|i| instance.runtime_callback(i)); 5360 let memory = options 5361 .memory() 5362 .map(|i| instance.runtime_memory(i)) 5363 .map(SendSyncPtr::new); 5364 let string_encoding = options.string_encoding; 5365 let token = StoreToken::new(store.as_context_mut()); 5366 let state = store.0.concurrent_state_mut(); 5367 5368 let (tx, rx) = oneshot::channel(); 5369 5370 let instance = RuntimeInstance { 5371 instance: handle.instance().id().instance(), 5372 index: component_instance, 5373 }; 5374 let caller = state.current_thread; 5375 let task = GuestTask::new( 5376 Box::new(for_any_lower(move |store, params| { 5377 lower_params(handle, token.as_context_mut(store), params) 5378 })), 5379 LiftResult { 5380 lift: Box::new(for_any_lift(move |store, result| { 5381 lift_result(handle, store, result) 5382 })), 5383 ty: task_return_type, 5384 memory, 5385 string_encoding, 5386 }, 5387 Caller::Host { 5388 tx: Some(tx), 5389 host_future_present, 5390 caller, 5391 }, 5392 callback.map(|callback| { 5393 let callback = SendSyncPtr::new(callback); 5394 let instance = handle.instance(); 5395 Box::new(move |store: &mut dyn VMStore, event, handle| { 5396 let store = token.as_context_mut(store); 5397 // SAFETY: Per the contract of `prepare_call`, the callback 5398 // will remain valid at least as long is this task exists. 5399 unsafe { instance.call_callback(store, callback, event, handle) } 5400 }) as CallbackFn 5401 }), 5402 instance, 5403 async_function, 5404 )?; 5405 5406 let task = state.push(task)?; 5407 let new_thread = GuestThread::new_implicit(state, task)?; 5408 let thread = state.push(new_thread)?; 5409 state.get_mut(task)?.threads.insert(thread); 5410 5411 if !store.0.may_enter(instance)? { 5412 bail!(Trap::CannotEnterComponent); 5413 } 5414 5415 Ok(PreparedCall { 5416 handle, 5417 thread: QualifiedThreadId { task, thread }, 5418 param_count, 5419 rx, 5420 _phantom: PhantomData, 5421 }) 5422 } 5423 5424 /// Queue a call previously prepared using `prepare_call` to be run as part of 5425 /// the associated `ComponentInstance`'s event loop. 5426 /// 5427 /// The returned future will resolve to the result once it is available, but 5428 /// must only be polled via the instance's event loop. See 5429 /// `StoreContextMut::run_concurrent` for details. 5430 pub(crate) fn queue_call<T: 'static, R: Send + 'static>( 5431 mut store: StoreContextMut<T>, 5432 prepared: PreparedCall<R>, 5433 ) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> { 5434 let PreparedCall { 5435 handle, 5436 thread, 5437 param_count, 5438 rx, 5439 .. 5440 } = prepared; 5441 5442 queue_call0(store.as_context_mut(), handle, thread, param_count)?; 5443 5444 Ok(checked( 5445 store.0.id(), 5446 rx.map(move |result| match result { 5447 Ok(r) => match r.downcast() { 5448 Ok(r) => Ok(*r), 5449 Err(_) => bail_bug!("wrong type of value produced"), 5450 }, 5451 Err(e) => Err(e.into()), 5452 }), 5453 )) 5454 } 5455 5456 /// Queue a call previously prepared using `prepare_call` to be run as part of 5457 /// the associated `ComponentInstance`'s event loop. 5458 fn queue_call0<T: 'static>( 5459 store: StoreContextMut<T>, 5460 handle: Func, 5461 guest_thread: QualifiedThreadId, 5462 param_count: usize, 5463 ) -> Result<()> { 5464 let (_options, _, _ty, raw_options) = handle.abi_info(store.0); 5465 let is_concurrent = raw_options.async_; 5466 let callback = raw_options.callback; 5467 let instance = handle.instance(); 5468 let callee = handle.lifted_core_func(store.0); 5469 let post_return = handle.post_return_core_func(store.0); 5470 let callback = callback.map(|i| { 5471 let instance = instance.id().get(store.0); 5472 SendSyncPtr::new(instance.runtime_callback(i)) 5473 }); 5474 5475 log::trace!("queueing call {guest_thread:?}"); 5476 5477 // SAFETY: `callee`, `callback`, and `post_return` are valid pointers 5478 // (with signatures appropriate for this call) and will remain valid as 5479 // long as this instance is valid. 5480 unsafe { 5481 instance.queue_call( 5482 store, 5483 guest_thread, 5484 SendSyncPtr::new(callee), 5485 param_count, 5486 1, 5487 is_concurrent, 5488 callback, 5489 post_return.map(SendSyncPtr::new), 5490 ) 5491 } 5492 } 5493