1 //! Runtime support for the Component Model Async ABI. 2 //! 3 //! This module and its submodules provide host runtime support for Component 4 //! Model Async features such as async-lifted exports, async-lowered imports, 5 //! streams, futures, and related intrinsics. See [the Async 6 //! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md) 7 //! for a high-level overview. 8 //! 9 //! At the core of this support is an event loop which schedules and switches 10 //! between guest tasks and any host tasks they create. Each 11 //! `Store` will have at most one event loop running at any given 12 //! time, and that loop may be suspended and resumed by the host embedder using 13 //! e.g. `StoreContextMut::run_concurrent`. The `StoreContextMut::poll_until` 14 //! function contains the loop itself, while the 15 //! `StoreOpaque::concurrent_state` field holds its state. 16 //! 17 //! # Public API Overview 18 //! 19 //! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop) 20 //! 21 //! - `[Typed]Func::call_concurrent`: Start a host->guest call to an 22 //! async-lifted or sync-lifted import, creating a guest task. 23 //! 24 //! - `StoreContextMut::run_concurrent`: Run the event loop for the specified 25 //! instance, allowing any and all tasks belonging to that instance to make 26 //! progress. 27 //! 28 //! - `StoreContextMut::spawn`: Run a background task as part of the event loop 29 //! for the specified instance. 30 //! 31 //! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or 32 //! `stream` which may be passed to the guest. This takes a 33 //! `{Future,Stream}Producer` implementation which will be polled for items when 34 //! the consumer requests them. 35 //! 36 //! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by 37 //! connecting it to a `{Future,Stream}Consumer` which will consume any items 38 //! produced by the write end. 39 //! 40 //! ## Host Task API (e.g. implementing concurrent host functions and background tasks) 41 //! 42 //! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host 43 //! function with the linker. That function will take an `Accessor` as its 44 //! first parameter, which provides access to the store between (but not across) 45 //! await points. 46 //! 47 //! - `Accessor::with`: Access the store and its associated data. 48 //! 49 //! - `Accessor::spawn`: Run a background task as part of the event loop for the 50 //! store. This is equivalent to `StoreContextMut::spawn` but more convenient to use 51 //! in host functions. 52 53 use crate::component::func::{self, Func, call_post_return}; 54 use crate::component::{ 55 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance, 56 }; 57 use crate::fiber::{self, StoreFiber, StoreFiberYield}; 58 use crate::prelude::*; 59 use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken}; 60 use crate::vm::component::{CallContext, ComponentInstance, InstanceState}; 61 use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore}; 62 use crate::{ 63 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, 64 bail, error::format_err, 65 }; 66 use error_contexts::GlobalErrorContextRefCount; 67 use futures::channel::oneshot; 68 use futures::future::{self, FutureExt}; 69 use futures::stream::{FuturesUnordered, StreamExt}; 70 use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex}; 71 use std::any::Any; 72 use std::borrow::ToOwned; 73 use std::boxed::Box; 74 use std::cell::UnsafeCell; 75 use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque}; 76 use std::fmt; 77 use std::future::Future; 78 use std::marker::PhantomData; 79 use std::mem::{self, ManuallyDrop, MaybeUninit}; 80 use std::ops::DerefMut; 81 use std::pin::{Pin, pin}; 82 use std::ptr::{self, NonNull}; 83 use std::task::{Context, Poll, Waker}; 84 use std::vec::Vec; 85 use table::{TableDebug, TableId}; 86 use wasmtime_environ::Trap; 87 use wasmtime_environ::component::{ 88 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS, 89 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT, 90 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding, 91 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex, 92 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex, 93 }; 94 use wasmtime_environ::packed_option::ReservedValue; 95 96 pub use abort::JoinHandle; 97 pub use future_stream_any::{FutureAny, StreamAny}; 98 pub use futures_and_streams::{ 99 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer, 100 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer, 101 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer, 102 }; 103 pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index}; 104 105 mod abort; 106 mod error_contexts; 107 mod future_stream_any; 108 mod futures_and_streams; 109 pub(crate) mod table; 110 pub(crate) mod tls; 111 112 /// Constant defined in the Component Model spec to indicate that the async 113 /// intrinsic (e.g. `future.write`) has not yet completed. 114 const BLOCKED: u32 = 0xffff_ffff; 115 116 /// Corresponds to `CallState` in the upstream spec. 117 #[derive(Clone, Copy, Eq, PartialEq, Debug)] 118 pub enum Status { 119 Starting = 0, 120 Started = 1, 121 Returned = 2, 122 StartCancelled = 3, 123 ReturnCancelled = 4, 124 } 125 126 impl Status { 127 /// Packs this status and the optional `waitable` provided into a 32-bit 128 /// result that the canonical ABI requires. 129 /// 130 /// The low 4 bits are reserved for the status while the upper 28 bits are 131 /// the waitable, if present. 132 pub fn pack(self, waitable: Option<u32>) -> u32 { 133 assert!(matches!(self, Status::Returned) == waitable.is_none()); 134 let waitable = waitable.unwrap_or(0); 135 assert!(waitable < (1 << 28)); 136 (waitable << 4) | (self as u32) 137 } 138 } 139 140 /// Corresponds to `EventCode` in the Component Model spec, plus related payload 141 /// data. 142 #[derive(Clone, Copy, Debug)] 143 enum Event { 144 None, 145 Cancelled, 146 Subtask { 147 status: Status, 148 }, 149 StreamRead { 150 code: ReturnCode, 151 pending: Option<(TypeStreamTableIndex, u32)>, 152 }, 153 StreamWrite { 154 code: ReturnCode, 155 pending: Option<(TypeStreamTableIndex, u32)>, 156 }, 157 FutureRead { 158 code: ReturnCode, 159 pending: Option<(TypeFutureTableIndex, u32)>, 160 }, 161 FutureWrite { 162 code: ReturnCode, 163 pending: Option<(TypeFutureTableIndex, u32)>, 164 }, 165 } 166 167 impl Event { 168 /// Lower this event to core Wasm integers for delivery to the guest. 169 /// 170 /// Note that the waitable handle, if any, is assumed to be lowered 171 /// separately. 172 fn parts(self) -> (u32, u32) { 173 const EVENT_NONE: u32 = 0; 174 const EVENT_SUBTASK: u32 = 1; 175 const EVENT_STREAM_READ: u32 = 2; 176 const EVENT_STREAM_WRITE: u32 = 3; 177 const EVENT_FUTURE_READ: u32 = 4; 178 const EVENT_FUTURE_WRITE: u32 = 5; 179 const EVENT_CANCELLED: u32 = 6; 180 match self { 181 Event::None => (EVENT_NONE, 0), 182 Event::Cancelled => (EVENT_CANCELLED, 0), 183 Event::Subtask { status } => (EVENT_SUBTASK, status as u32), 184 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()), 185 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()), 186 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()), 187 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()), 188 } 189 } 190 } 191 192 /// Corresponds to `CallbackCode` in the spec. 193 mod callback_code { 194 pub const EXIT: u32 = 0; 195 pub const YIELD: u32 = 1; 196 pub const WAIT: u32 = 2; 197 } 198 199 /// A flag indicating that the callee is an async-lowered export. 200 /// 201 /// This may be passed to the `async-start` intrinsic from a fused adapter. 202 const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32; 203 204 /// Provides access to either store data (via the `get` method) or the store 205 /// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component 206 /// instance to which the current host task belongs. 207 /// 208 /// See [`Accessor::with`] for details. 209 pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> { 210 store: StoreContextMut<'a, T>, 211 get_data: fn(&mut T) -> D::Data<'_>, 212 } 213 214 impl<'a, T, D> Access<'a, T, D> 215 where 216 D: HasData + ?Sized, 217 T: 'static, 218 { 219 /// Creates a new [`Access`] from its component parts. 220 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self { 221 Self { store, get_data } 222 } 223 224 /// Get mutable access to the store data. 225 pub fn data_mut(&mut self) -> &mut T { 226 self.store.data_mut() 227 } 228 229 /// Get mutable access to the store data. 230 pub fn get(&mut self) -> D::Data<'_> { 231 (self.get_data)(self.data_mut()) 232 } 233 234 /// Spawn a background task. 235 /// 236 /// See [`Accessor::spawn`] for details. 237 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle 238 where 239 T: 'static, 240 { 241 let accessor = Accessor { 242 get_data: self.get_data, 243 token: StoreToken::new(self.store.as_context_mut()), 244 }; 245 self.store 246 .as_context_mut() 247 .spawn_with_accessor(accessor, task) 248 } 249 250 /// Returns the getter this accessor is using to project from `T` into 251 /// `D::Data`. 252 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> { 253 self.get_data 254 } 255 } 256 257 impl<'a, T, D> AsContext for Access<'a, T, D> 258 where 259 D: HasData + ?Sized, 260 T: 'static, 261 { 262 type Data = T; 263 264 fn as_context(&self) -> StoreContext<'_, T> { 265 self.store.as_context() 266 } 267 } 268 269 impl<'a, T, D> AsContextMut for Access<'a, T, D> 270 where 271 D: HasData + ?Sized, 272 T: 'static, 273 { 274 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> { 275 self.store.as_context_mut() 276 } 277 } 278 279 /// Provides scoped mutable access to store data in the context of a concurrent 280 /// host task future. 281 /// 282 /// This allows multiple host task futures to execute concurrently and access 283 /// the store between (but not across) `await` points. 284 /// 285 /// # Rationale 286 /// 287 /// This structure is sort of like `&mut T` plus a projection from `&mut T` to 288 /// `D::Data<'_>`. The problem this is solving, however, is that it does not 289 /// literally store these values. The basic problem is that when a concurrent 290 /// host future is being polled it has access to `&mut T` (and the whole 291 /// `Store`) but when it's not being polled it does not have access to these 292 /// values. This reflects how the store is only ever polling one future at a 293 /// time so the store is effectively being passed between futures. 294 /// 295 /// Rust's `Future` trait, however, has no means of passing a `Store` 296 /// temporarily between futures. The [`Context`](std::task::Context) type does 297 /// not have the ability to attach arbitrary information to it at this time. 298 /// This type, [`Accessor`], is used to bridge this expressivity gap. 299 /// 300 /// The [`Accessor`] type here represents the ability to acquire, temporarily in 301 /// a synchronous manner, the current store. The [`Accessor::with`] function 302 /// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut 303 /// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does 304 /// not take an `async` closure as its argument, instead it's a synchronous 305 /// closure which must complete during on run of `Future::poll`. This reflects 306 /// how the store is temporarily made available while a host future is being 307 /// polled. 308 /// 309 /// # Implementation 310 /// 311 /// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and 312 /// this type additionally doesn't even have a lifetime parameter. This is 313 /// instead a representation of proof of the ability to acquire these while a 314 /// future is being polled. Wasmtime will, when it polls a host future, 315 /// configure ambient state such that the `Accessor` that a future closes over 316 /// will work and be able to access the store. 317 /// 318 /// This has a number of implications for users such as: 319 /// 320 /// * It's intentional that `Accessor` cannot be cloned, it needs to stay within 321 /// the lifetime of a single future. 322 /// * A future is expected to, however, close over an `Accessor` and keep it 323 /// alive probably for the duration of the entire future. 324 /// * Different host futures will be given different `Accessor`s, and that's 325 /// intentional. 326 /// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which 327 /// alleviates some otherwise required bounds to be written down. 328 /// 329 /// # Using `Accessor` in `Drop` 330 /// 331 /// The methods on `Accessor` are only expected to work in the context of 332 /// `Future::poll` and are not guaranteed to work in `Drop`. This is because a 333 /// host future can be dropped at any time throughout the system and Wasmtime 334 /// store context is not necessarily available at that time. It's recommended to 335 /// not use `Accessor` methods in anything connected to a `Drop` implementation 336 /// as they will panic and have unintended results. If you run into this though 337 /// feel free to file an issue on the Wasmtime repository. 338 pub struct Accessor<T: 'static, D = HasSelf<T>> 339 where 340 D: HasData + ?Sized, 341 { 342 token: StoreToken<T>, 343 get_data: fn(&mut T) -> D::Data<'_>, 344 } 345 346 /// A helper trait to take any type of accessor-with-data in functions. 347 /// 348 /// This trait is similar to [`AsContextMut`] except that it's used when 349 /// working with an [`Accessor`] instead of a [`StoreContextMut`]. The 350 /// [`Accessor`] is the main type used in concurrent settings and is passed to 351 /// functions such as [`Func::call_concurrent`]. 352 /// 353 /// This trait is implemented for [`Accessor`] and `&T` where `T` implements 354 /// this trait. This effectively means that regardless of the `D` in 355 /// `Accessor<T, D>` it can still be passed to a function which just needs a 356 /// store accessor. 357 /// 358 /// Acquiring an [`Accessor`] can be done through 359 /// [`StoreContextMut::run_concurrent`] for example or in a host function 360 /// through 361 /// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent). 362 pub trait AsAccessor { 363 /// The `T` in `Store<T>` that this accessor refers to. 364 type Data: 'static; 365 366 /// The `D` in `Accessor<T, D>`, or the projection out of 367 /// `Self::Data`. 368 type AccessorData: HasData + ?Sized; 369 370 /// Returns the accessor that this is referring to. 371 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>; 372 } 373 374 impl<T: AsAccessor + ?Sized> AsAccessor for &T { 375 type Data = T::Data; 376 type AccessorData = T::AccessorData; 377 378 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> { 379 T::as_accessor(self) 380 } 381 } 382 383 impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> { 384 type Data = T; 385 type AccessorData = D; 386 387 fn as_accessor(&self) -> &Accessor<T, D> { 388 self 389 } 390 } 391 392 // Note that it is intentional at this time that `Accessor` does not actually 393 // store `&mut T` or anything similar. This distinctly enables the `Accessor` 394 // structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for 395 // that matter). This is used to ergonomically simplify bindings where the 396 // majority of the time `Accessor` is closed over in a future which then needs 397 // to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as 398 // you already have to write `T: 'static`...) it helps to avoid this. 399 // 400 // Note as well that `Accessor` doesn't actually store its data at all. Instead 401 // it's more of a "proof" of what can be accessed from TLS. API design around 402 // `Accessor` and functions like `Linker::func_wrap_concurrent` are 403 // intentionally made to ensure that `Accessor` is ideally only used in the 404 // context that TLS variables are actually set. For example host functions are 405 // given `&Accessor`, not `Accessor`, and this prevents them from persisting 406 // the value outside of a future. Within the future the TLS variables are all 407 // guaranteed to be set while the future is being polled. 408 // 409 // Finally though this is not an ironclad guarantee, but nor does it need to be. 410 // The TLS APIs are designed to panic or otherwise model usage where they're 411 // called recursively or similar. It's hoped that code cannot be constructed to 412 // actually hit this at runtime but this is not a safety requirement at this 413 // time. 414 const _: () = { 415 const fn assert<T: Send + Sync>() {} 416 assert::<Accessor<UnsafeCell<u32>>>(); 417 }; 418 419 impl<T> Accessor<T> { 420 /// Creates a new `Accessor` backed by the specified functions. 421 /// 422 /// - `get`: used to retrieve the store 423 /// 424 /// - `get_data`: used to "project" from the store's associated data to 425 /// another type (e.g. a field of that data or a wrapper around it). 426 /// 427 /// - `spawn`: used to queue spawned background tasks to be run later 428 pub(crate) fn new(token: StoreToken<T>) -> Self { 429 Self { 430 token, 431 get_data: |x| x, 432 } 433 } 434 } 435 436 impl<T, D> Accessor<T, D> 437 where 438 D: HasData + ?Sized, 439 { 440 /// Run the specified closure, passing it mutable access to the store. 441 /// 442 /// This function is one of the main building blocks of the [`Accessor`] 443 /// type. This yields synchronous, blocking, access to the store via an 444 /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to 445 /// providing the ability to access `D` via [`Access::get`]. Note that the 446 /// `fun` here is given only temporary access to the store and `T`/`D` 447 /// meaning that the return value `R` here is not allowed to capture borrows 448 /// into the two. If access is needed to data within `T` or `D` outside of 449 /// this closure then it must be `clone`d out, for example. 450 /// 451 /// # Panics 452 /// 453 /// This function will panic if it is call recursively with any other 454 /// accessor already in scope. For example if `with` is called within `fun`, 455 /// then this function will panic. It is up to the embedder to ensure that 456 /// this does not happen. 457 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R { 458 tls::get(|vmstore| { 459 fun(Access { 460 store: self.token.as_context_mut(vmstore), 461 get_data: self.get_data, 462 }) 463 }) 464 } 465 466 /// Returns the getter this accessor is using to project from `T` into 467 /// `D::Data`. 468 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> { 469 self.get_data 470 } 471 472 /// Changes this accessor to access `D2` instead of the current type 473 /// parameter `D`. 474 /// 475 /// This changes the underlying data access from `T` to `D2::Data<'_>`. 476 /// 477 /// # Panics 478 /// 479 /// When using this API the returned value is disconnected from `&self` and 480 /// the lifetime binding the `self` argument. An `Accessor` only works 481 /// within the context of the closure or async closure that it was 482 /// originally given to, however. This means that due to the fact that the 483 /// returned value has no lifetime connection it's possible to use the 484 /// accessor outside of `&self`, the original accessor, and panic. 485 /// 486 /// The returned value should only be used within the scope of the original 487 /// `Accessor` that `self` refers to. 488 pub fn with_getter<D2: HasData>( 489 &self, 490 get_data: fn(&mut T) -> D2::Data<'_>, 491 ) -> Accessor<T, D2> { 492 Accessor { 493 token: self.token, 494 get_data, 495 } 496 } 497 498 /// Spawn a background task which will receive an `&Accessor<T, D>` and 499 /// run concurrently with any other tasks in progress for the current 500 /// store. 501 /// 502 /// This is particularly useful for host functions which return a `stream` 503 /// or `future` such that the code to write to the write end of that 504 /// `stream` or `future` must run after the function returns. 505 /// 506 /// The returned [`JoinHandle`] may be used to cancel the task. 507 /// 508 /// # Panics 509 /// 510 /// Panics if called within a closure provided to the [`Accessor::with`] 511 /// function. This can only be called outside an active invocation of 512 /// [`Accessor::with`]. 513 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle 514 where 515 T: 'static, 516 { 517 let accessor = self.clone_for_spawn(); 518 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task)) 519 } 520 521 fn clone_for_spawn(&self) -> Self { 522 Self { 523 token: self.token, 524 get_data: self.get_data, 525 } 526 } 527 } 528 529 /// Represents a task which may be provided to `Accessor::spawn`, 530 /// `Accessor::forward`, or `StorecContextMut::spawn`. 531 // TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable 532 // option. 533 // 534 // As of this writing, it's not possible to specify e.g. `Send` and `Sync` 535 // bounds on the `Future` type returned by an `AsyncFnOnce`. Also, using `F: 536 // Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F + 537 // Send + Sync + 'static` fails with a type mismatch error when we try to pass 538 // it an async closure (e.g. `async move |_| { ... }`). So this seems to be the 539 // best we can do for the time being. 540 pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static 541 where 542 D: HasData + ?Sized, 543 { 544 /// Run the task. 545 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send; 546 } 547 548 /// Represents parameter and result metadata for the caller side of a 549 /// guest->guest call orchestrated by a fused adapter. 550 enum CallerInfo { 551 /// Metadata for a call to an async-lowered import 552 Async { 553 params: Vec<ValRaw>, 554 has_result: bool, 555 }, 556 /// Metadata for a call to an sync-lowered import 557 Sync { 558 params: Vec<ValRaw>, 559 result_count: u32, 560 }, 561 } 562 563 /// Indicates how a guest task is waiting on a waitable set. 564 enum WaitMode { 565 /// The guest task is waiting using `task.wait` 566 Fiber(StoreFiber<'static>), 567 /// The guest task is waiting via a callback declared as part of an 568 /// async-lifted export. 569 Callback(Instance), 570 } 571 572 /// Represents the reason a fiber is suspending itself. 573 #[derive(Debug)] 574 enum SuspendReason { 575 /// The fiber is waiting for an event to be delivered to the specified 576 /// waitable set or task. 577 Waiting { 578 set: TableId<WaitableSet>, 579 thread: QualifiedThreadId, 580 skip_may_block_check: bool, 581 }, 582 /// The fiber has finished handling its most recent work item and is waiting 583 /// for another (or to be dropped if it is no longer needed). 584 NeedWork, 585 /// The fiber is yielding and should be resumed once other tasks have had a 586 /// chance to run. 587 Yielding { 588 thread: QualifiedThreadId, 589 skip_may_block_check: bool, 590 }, 591 /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`. 592 ExplicitlySuspending { 593 thread: QualifiedThreadId, 594 skip_may_block_check: bool, 595 }, 596 } 597 598 /// Represents a pending call into guest code for a given guest task. 599 enum GuestCallKind { 600 /// Indicates there's an event to deliver to the task, possibly related to a 601 /// waitable set the task has been waiting on or polling. 602 DeliverEvent { 603 /// The instance to which the task belongs. 604 instance: Instance, 605 /// The waitable set the event belongs to, if any. 606 /// 607 /// If this is `None` the event will be waiting in the 608 /// `GuestTask::event` field for the task. 609 set: Option<TableId<WaitableSet>>, 610 }, 611 /// Indicates that a new guest task call is pending and may be executed 612 /// using the specified closure. 613 /// 614 /// If the closure returns `Ok(Some(call))`, the `call` should be run 615 /// immediately using `handle_guest_call`. 616 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>), 617 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>), 618 } 619 620 impl fmt::Debug for GuestCallKind { 621 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 622 match self { 623 Self::DeliverEvent { instance, set } => f 624 .debug_struct("DeliverEvent") 625 .field("instance", instance) 626 .field("set", set) 627 .finish(), 628 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(), 629 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(), 630 } 631 } 632 } 633 634 /// The target of a suspension intrinsic. 635 #[derive(Copy, Clone, Debug)] 636 pub enum SuspensionTarget { 637 SomeSuspended(u32), 638 Some(u32), 639 None, 640 } 641 642 impl SuspensionTarget { 643 fn is_none(&self) -> bool { 644 matches!(self, SuspensionTarget::None) 645 } 646 fn is_some(&self) -> bool { 647 !self.is_none() 648 } 649 } 650 651 /// Represents a pending call into guest code for a given guest thread. 652 #[derive(Debug)] 653 struct GuestCall { 654 thread: QualifiedThreadId, 655 kind: GuestCallKind, 656 } 657 658 impl GuestCall { 659 /// Returns whether or not the call is ready to run. 660 /// 661 /// A call will not be ready to run if either: 662 /// 663 /// - the (sub-)component instance to be called has already been entered and 664 /// cannot be reentered until an in-progress call completes 665 /// 666 /// - the call is for a not-yet started task and the (sub-)component 667 /// instance to be called has backpressure enabled 668 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> { 669 let instance = store 670 .concurrent_state_mut() 671 .get_mut(self.thread.task)? 672 .instance; 673 let state = store.instance_state(instance).concurrent_state(); 674 675 let ready = match &self.kind { 676 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter, 677 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0), 678 GuestCallKind::StartExplicit(_) => true, 679 }; 680 log::trace!( 681 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})", 682 state.do_not_enter, 683 state.backpressure 684 ); 685 Ok(ready) 686 } 687 } 688 689 /// Job to be run on a worker fiber. 690 enum WorkerItem { 691 GuestCall(GuestCall), 692 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>), 693 } 694 695 /// Represents a pending work item to be handled by the event loop for a given 696 /// component instance. 697 enum WorkItem { 698 /// A host task to be pushed to `ConcurrentState::futures`. 699 PushFuture(AlwaysMut<HostTaskFuture>), 700 /// A fiber to resume. 701 ResumeFiber(StoreFiber<'static>), 702 /// A thread to resume. 703 ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId), 704 /// A pending call into guest code for a given guest task. 705 GuestCall(RuntimeComponentInstanceIndex, GuestCall), 706 /// A job to run on a worker fiber. 707 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>), 708 } 709 710 impl fmt::Debug for WorkItem { 711 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 712 match self { 713 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(), 714 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(), 715 Self::ResumeThread(instance, thread) => f 716 .debug_tuple("ResumeThread") 717 .field(instance) 718 .field(thread) 719 .finish(), 720 Self::GuestCall(instance, call) => f 721 .debug_tuple("GuestCall") 722 .field(instance) 723 .field(call) 724 .finish(), 725 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(), 726 } 727 } 728 } 729 730 /// Whether a suspension intrinsic was cancelled or completed 731 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] 732 pub(crate) enum WaitResult { 733 Cancelled, 734 Completed, 735 } 736 737 /// Poll the specified future until it completes on behalf of a guest->host call 738 /// using a sync-lowered import. 739 /// 740 /// This is similar to `Instance::first_poll` except it's for sync-lowered 741 /// imports, meaning we don't need to handle cancellation and we can block the 742 /// caller until the task completes, at which point the caller can handle 743 /// lowering the result to the guest's stack and linear memory. 744 pub(crate) fn poll_and_block<R: Send + Sync + 'static>( 745 store: &mut dyn VMStore, 746 future: impl Future<Output = Result<R>> + Send + 'static, 747 ) -> Result<R> { 748 let state = store.concurrent_state_mut(); 749 let task = state.unwrap_current_host_thread(); 750 751 // Wrap the future in a closure which will take care of stashing the result 752 // in `GuestTask::result` and resuming this fiber when the host task 753 // completes. 754 let mut future = Box::pin(async move { 755 let result = future.await?; 756 tls::get(move |store| { 757 let state = store.concurrent_state_mut(); 758 let host_state = &mut state.get_mut(task)?.state; 759 assert!(matches!(host_state, HostTaskState::CalleeStarted)); 760 *host_state = HostTaskState::CalleeFinished(Box::new(result)); 761 762 Waitable::Host(task).set_event( 763 state, 764 Some(Event::Subtask { 765 status: Status::Returned, 766 }), 767 )?; 768 769 Ok(()) 770 }) 771 }) as HostTaskFuture; 772 773 // Finally, poll the future. We can use a dummy `Waker` here because we'll 774 // add the future to `ConcurrentState::futures` and poll it automatically 775 // from the event loop if it doesn't complete immediately here. 776 let poll = tls::set(store, || { 777 future 778 .as_mut() 779 .poll(&mut Context::from_waker(&Waker::noop())) 780 }); 781 782 match poll { 783 // It completed immediately; check the result and delete the task. 784 Poll::Ready(result) => result?, 785 786 // It did not complete immediately; add it to 787 // `ConcurrentState::futures` so it will be polled via the event loop; 788 // then use `GuestTask::sync_call_set` to wait for the task to 789 // complete, suspending the current fiber until it does so. 790 Poll::Pending => { 791 let state = store.concurrent_state_mut(); 792 state.push_future(future); 793 794 let caller = state.get_mut(task)?.caller; 795 let set = state.get_mut(caller.task)?.sync_call_set; 796 Waitable::Host(task).join(state, Some(set))?; 797 798 store.suspend(SuspendReason::Waiting { 799 set, 800 thread: caller, 801 skip_may_block_check: false, 802 })?; 803 804 // Remove the `task` from the `sync_call_set` to ensure that when 805 // this function returns and the task is deleted that there are no 806 // more lingering references to this host task. 807 Waitable::Host(task).join(store.concurrent_state_mut(), None)?; 808 } 809 } 810 811 // Retrieve and return the result. 812 let host_state = &mut store.concurrent_state_mut().get_mut(task)?.state; 813 match mem::replace(host_state, HostTaskState::CalleeDone) { 814 HostTaskState::CalleeFinished(result) => Ok(*result.downcast().unwrap()), 815 _ => panic!("unexpected host task state after completion"), 816 } 817 } 818 819 /// Execute the specified guest call. 820 fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> { 821 let mut next = Some(call); 822 while let Some(call) = next.take() { 823 match call.kind { 824 GuestCallKind::DeliverEvent { instance, set } => { 825 let (event, waitable) = instance 826 .get_event(store, call.thread.task, set, true)? 827 .unwrap(); 828 let state = store.concurrent_state_mut(); 829 let task = state.get_mut(call.thread.task)?; 830 let runtime_instance = task.instance; 831 let handle = waitable.map(|(_, v)| v).unwrap_or(0); 832 833 log::trace!( 834 "use callback to deliver event {event:?} to {:?} for {waitable:?}", 835 call.thread, 836 ); 837 838 let old_thread = store.set_thread(call.thread); 839 log::trace!( 840 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread", 841 call.thread 842 ); 843 844 store.enter_instance(runtime_instance); 845 846 let callback = store 847 .concurrent_state_mut() 848 .get_mut(call.thread.task)? 849 .callback 850 .take() 851 .unwrap(); 852 853 let code = callback(store, event, handle)?; 854 855 store 856 .concurrent_state_mut() 857 .get_mut(call.thread.task)? 858 .callback = Some(callback); 859 860 store.exit_instance(runtime_instance)?; 861 862 store.set_thread(old_thread); 863 864 next = instance.handle_callback_code( 865 store, 866 call.thread, 867 runtime_instance.index, 868 code, 869 )?; 870 871 log::trace!( 872 "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread" 873 ); 874 } 875 GuestCallKind::StartImplicit(fun) => { 876 next = fun(store)?; 877 } 878 GuestCallKind::StartExplicit(fun) => { 879 fun(store)?; 880 } 881 } 882 } 883 884 Ok(()) 885 } 886 887 impl<T> Store<T> { 888 /// Convenience wrapper for [`StoreContextMut::run_concurrent`]. 889 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R> 890 where 891 T: Send + 'static, 892 { 893 ensure!( 894 self.as_context().0.concurrency_support(), 895 "cannot use `run_concurrent` when Config::concurrency_support disabled", 896 ); 897 self.as_context_mut().run_concurrent(fun).await 898 } 899 900 #[doc(hidden)] 901 pub fn assert_concurrent_state_empty(&mut self) { 902 self.as_context_mut().assert_concurrent_state_empty(); 903 } 904 905 #[doc(hidden)] 906 pub fn concurrent_state_table_size(&mut self) -> usize { 907 self.as_context_mut().concurrent_state_table_size() 908 } 909 910 /// Convenience wrapper for [`StoreContextMut::spawn`]. 911 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle 912 where 913 T: 'static, 914 { 915 self.as_context_mut().spawn(task) 916 } 917 } 918 919 impl<T> StoreContextMut<'_, T> { 920 /// Assert that all the relevant tables and queues in the concurrent state 921 /// for this store are empty. 922 /// 923 /// This is for sanity checking in integration tests 924 /// (e.g. `component-async-tests`) that the relevant state has been cleared 925 /// after each test concludes. This should help us catch leaks, e.g. guest 926 /// tasks which haven't been deleted despite having completed and having 927 /// been dropped by their supertasks. 928 /// 929 /// Only intended for use in Wasmtime's own testing. 930 #[doc(hidden)] 931 pub fn assert_concurrent_state_empty(self) { 932 let store = self.0; 933 store 934 .store_data_mut() 935 .components 936 .assert_instance_states_empty(); 937 let state = store.concurrent_state_mut(); 938 assert!( 939 state.table.get_mut().is_empty(), 940 "non-empty table: {:?}", 941 state.table.get_mut() 942 ); 943 assert!(state.high_priority.is_empty()); 944 assert!(state.low_priority.is_empty()); 945 assert!(state.current_thread.is_none()); 946 assert!(state.futures.get_mut().as_ref().unwrap().is_empty()); 947 assert!(state.global_error_context_ref_counts.is_empty()); 948 } 949 950 /// Helper function to perform tests over the size of the concurrent state 951 /// table which can be useful for detecting leaks. 952 /// 953 /// Only intended for use in Wasmtime's own testing. 954 #[doc(hidden)] 955 pub fn concurrent_state_table_size(&mut self) -> usize { 956 self.0 957 .concurrent_state_mut() 958 .table 959 .get_mut() 960 .iter_mut() 961 .count() 962 } 963 964 /// Spawn a background task to run as part of this instance's event loop. 965 /// 966 /// The task will receive an `&Accessor<U>` and run concurrently with 967 /// any other tasks in progress for the instance. 968 /// 969 /// Note that the task will only make progress if and when the event loop 970 /// for this instance is run. 971 /// 972 /// The returned [`JoinHandle`] may be used to cancel the task. 973 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle 974 where 975 T: 'static, 976 { 977 let accessor = Accessor::new(StoreToken::new(self.as_context_mut())); 978 self.spawn_with_accessor(accessor, task) 979 } 980 981 /// Internal implementation of `spawn` functions where a `store` is 982 /// available along with an `Accessor`. 983 fn spawn_with_accessor<D>( 984 self, 985 accessor: Accessor<T, D>, 986 task: impl AccessorTask<T, D>, 987 ) -> JoinHandle 988 where 989 T: 'static, 990 D: HasData + ?Sized, 991 { 992 // Create an "abortable future" here where internally the future will 993 // hook calls to poll and possibly spawn more background tasks on each 994 // iteration. 995 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await }); 996 self.0 997 .concurrent_state_mut() 998 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) })); 999 handle 1000 } 1001 1002 /// Run the specified closure `fun` to completion as part of this store's 1003 /// event loop. 1004 /// 1005 /// This will run `fun` as part of this store's event loop until it 1006 /// yields a result. `fun` is provided an [`Accessor`], which provides 1007 /// controlled access to the store and its data. 1008 /// 1009 /// This function can be used to invoke [`Func::call_concurrent`] for 1010 /// example within the async closure provided here. 1011 /// 1012 /// This function will unconditionally return an error if 1013 /// [`Config::concurrency_support`] is disabled. 1014 /// 1015 /// [`Config::concurrency_support`]: crate::Config::concurrency_support 1016 /// 1017 /// # Store-blocking behavior 1018 /// 1019 /// At this time there are certain situations in which the `Future` returned 1020 /// by the `AsyncFnOnce` passed to this function will not be polled for an 1021 /// extended period of time, despite one or more `Waker::wake` events having 1022 /// occurred for the task to which it belongs. This can manifest as the 1023 /// `Future` seeming to be "blocked" or "locked up", but is actually due to 1024 /// the `Store` being held by e.g. a blocking host function, preventing the 1025 /// `Future` from being polled. A canonical example of this is when the 1026 /// `fun` provided to this function attempts to set a timeout for an 1027 /// invocation of a wasm function. In this situation the async closure is 1028 /// waiting both on (a) the wasm computation to finish, and (b) the timeout 1029 /// to elapse. At this time this setup will not always work and the timeout 1030 /// may not reliably fire. 1031 /// 1032 /// This function will not block the current thread and as such is always 1033 /// suitable to run in an `async` context, but the current implementation of 1034 /// Wasmtime can lead to situations where a certain wasm computation is 1035 /// required to make progress the closure to make progress. This is an 1036 /// artifact of Wasmtime's historical implementation of `async` functions 1037 /// and is the topic of [#11869] and [#11870]. In the timeout example from 1038 /// above it means that Wasmtime can get "wedged" for a bit where (a) must 1039 /// progress for a readiness notification of (b) to get delivered. 1040 /// 1041 /// This effectively means that it's not possible to reliably perform a 1042 /// "select" operation within the `fun` closure, which timeouts for example 1043 /// are based on. Fixing this requires some relatively major refactoring 1044 /// work within Wasmtime itself. This is a known pitfall otherwise and one 1045 /// that is intended to be fixed one day. In the meantime it's recommended 1046 /// to apply timeouts or such to the entire `run_concurrent` call itself 1047 /// rather than internally. 1048 /// 1049 /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869 1050 /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870 1051 /// 1052 /// # Example 1053 /// 1054 /// ``` 1055 /// # use { 1056 /// # wasmtime::{ 1057 /// # error::{Result}, 1058 /// # component::{ Component, Linker, Resource, ResourceTable}, 1059 /// # Config, Engine, Store 1060 /// # }, 1061 /// # }; 1062 /// # 1063 /// # struct MyResource(u32); 1064 /// # struct Ctx { table: ResourceTable } 1065 /// # 1066 /// # async fn foo() -> Result<()> { 1067 /// # let mut config = Config::new(); 1068 /// # let engine = Engine::new(&config)?; 1069 /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() }); 1070 /// # let mut linker = Linker::new(&engine); 1071 /// # let component = Component::new(&engine, "")?; 1072 /// # let instance = linker.instantiate_async(&mut store, &component).await?; 1073 /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?; 1074 /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?; 1075 /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> { 1076 /// let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?; 1077 /// let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?; 1078 /// let value = accessor.with(|mut access| access.get().table.delete(another_resource))?; 1079 /// bar.call_concurrent(accessor, (value.0,)).await?; 1080 /// Ok(()) 1081 /// }).await??; 1082 /// # Ok(()) 1083 /// # } 1084 /// ``` 1085 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R> 1086 where 1087 T: Send + 'static, 1088 { 1089 ensure!( 1090 self.0.concurrency_support(), 1091 "cannot use `run_concurrent` when Config::concurrency_support disabled", 1092 ); 1093 self.do_run_concurrent(fun, false).await 1094 } 1095 1096 pub(super) async fn run_concurrent_trap_on_idle<R>( 1097 self, 1098 fun: impl AsyncFnOnce(&Accessor<T>) -> R, 1099 ) -> Result<R> 1100 where 1101 T: Send + 'static, 1102 { 1103 self.do_run_concurrent(fun, true).await 1104 } 1105 1106 async fn do_run_concurrent<R>( 1107 mut self, 1108 fun: impl AsyncFnOnce(&Accessor<T>) -> R, 1109 trap_on_idle: bool, 1110 ) -> Result<R> 1111 where 1112 T: Send + 'static, 1113 { 1114 debug_assert!(self.0.concurrency_support()); 1115 check_recursive_run(); 1116 let token = StoreToken::new(self.as_context_mut()); 1117 1118 struct Dropper<'a, T: 'static, V> { 1119 store: StoreContextMut<'a, T>, 1120 value: ManuallyDrop<V>, 1121 } 1122 1123 impl<'a, T, V> Drop for Dropper<'a, T, V> { 1124 fn drop(&mut self) { 1125 tls::set(self.store.0, || { 1126 // SAFETY: Here we drop the value without moving it for the 1127 // first and only time -- per the contract for `Drop::drop`, 1128 // this code won't run again, and the `value` field will no 1129 // longer be accessible. 1130 unsafe { ManuallyDrop::drop(&mut self.value) } 1131 }); 1132 } 1133 } 1134 1135 let accessor = &Accessor::new(token); 1136 let dropper = &mut Dropper { 1137 store: self, 1138 value: ManuallyDrop::new(fun(accessor)), 1139 }; 1140 // SAFETY: We never move `dropper` nor its `value` field. 1141 let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) }; 1142 1143 dropper 1144 .store 1145 .as_context_mut() 1146 .poll_until(future, trap_on_idle) 1147 .await 1148 } 1149 1150 /// Run this store's event loop. 1151 /// 1152 /// The returned future will resolve when the specified future completes or, 1153 /// if `trap_on_idle` is true, when the event loop can't make further 1154 /// progress. 1155 async fn poll_until<R>( 1156 mut self, 1157 mut future: Pin<&mut impl Future<Output = R>>, 1158 trap_on_idle: bool, 1159 ) -> Result<R> 1160 where 1161 T: Send + 'static, 1162 { 1163 struct Reset<'a, T: 'static> { 1164 store: StoreContextMut<'a, T>, 1165 futures: Option<FuturesUnordered<HostTaskFuture>>, 1166 } 1167 1168 impl<'a, T> Drop for Reset<'a, T> { 1169 fn drop(&mut self) { 1170 if let Some(futures) = self.futures.take() { 1171 *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures); 1172 } 1173 } 1174 } 1175 1176 loop { 1177 // Take `ConcurrentState::futures` out of the store so we can poll 1178 // it while also safely giving any of the futures inside access to 1179 // `self`. 1180 let futures = self.0.concurrent_state_mut().futures.get_mut().take(); 1181 let mut reset = Reset { 1182 store: self.as_context_mut(), 1183 futures, 1184 }; 1185 let mut next = pin!(reset.futures.as_mut().unwrap().next()); 1186 1187 enum PollResult<R> { 1188 Complete(R), 1189 ProcessWork(Vec<WorkItem>), 1190 } 1191 let result = future::poll_fn(|cx| { 1192 // First, poll the future we were passed as an argument and 1193 // return immediately if it's ready. 1194 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) { 1195 return Poll::Ready(Ok(PollResult::Complete(value))); 1196 } 1197 1198 // Next, poll `ConcurrentState::futures` (which includes any 1199 // pending host tasks and/or background tasks), returning 1200 // immediately if one of them fails. 1201 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) { 1202 Poll::Ready(Some(output)) => { 1203 match output { 1204 Err(e) => return Poll::Ready(Err(e)), 1205 Ok(()) => {} 1206 } 1207 Poll::Ready(true) 1208 } 1209 Poll::Ready(None) => Poll::Ready(false), 1210 Poll::Pending => Poll::Pending, 1211 }; 1212 1213 // Next, collect the next batch of work items to process, if any. 1214 // This will be either all of the high-priority work items, or if 1215 // there are none, a single low-priority work item. 1216 let state = reset.store.0.concurrent_state_mut(); 1217 let ready = state.collect_work_items_to_run(); 1218 if !ready.is_empty() { 1219 return Poll::Ready(Ok(PollResult::ProcessWork(ready))); 1220 } 1221 1222 // Finally, if we have nothing else to do right now, determine what to do 1223 // based on whether there are any pending futures in 1224 // `ConcurrentState::futures`. 1225 return match next { 1226 Poll::Ready(true) => { 1227 // In this case, one of the futures in 1228 // `ConcurrentState::futures` completed 1229 // successfully, so we return now and continue 1230 // the outer loop in case there is another one 1231 // ready to complete. 1232 Poll::Ready(Ok(PollResult::ProcessWork(Vec::new()))) 1233 } 1234 Poll::Ready(false) => { 1235 // Poll the future we were passed one last time 1236 // in case one of `ConcurrentState::futures` had 1237 // the side effect of unblocking it. 1238 if let Poll::Ready(value) = 1239 tls::set(reset.store.0, || future.as_mut().poll(cx)) 1240 { 1241 Poll::Ready(Ok(PollResult::Complete(value))) 1242 } else { 1243 // In this case, there are no more pending 1244 // futures in `ConcurrentState::futures`, 1245 // there are no remaining work items, _and_ 1246 // the future we were passed as an argument 1247 // still hasn't completed. 1248 if trap_on_idle { 1249 // `trap_on_idle` is true, so we exit 1250 // immediately. 1251 Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock))) 1252 } else { 1253 // `trap_on_idle` is false, so we assume 1254 // that future will wake up and give us 1255 // more work to do when it's ready to. 1256 Poll::Pending 1257 } 1258 } 1259 } 1260 // There is at least one pending future in 1261 // `ConcurrentState::futures` and we have nothing 1262 // else to do but wait for now, so we return 1263 // `Pending`. 1264 Poll::Pending => Poll::Pending, 1265 }; 1266 }) 1267 .await; 1268 1269 // Put the `ConcurrentState::futures` back into the store before we 1270 // return or handle any work items since one or more of those items 1271 // might append more futures. 1272 drop(reset); 1273 1274 match result? { 1275 // The future we were passed as an argument completed, so we 1276 // return the result. 1277 PollResult::Complete(value) => break Ok(value), 1278 // The future we were passed has not yet completed, so handle 1279 // any work items and then loop again. 1280 PollResult::ProcessWork(ready) => { 1281 struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> { 1282 store: StoreContextMut<'a, T>, 1283 ready: I, 1284 } 1285 1286 impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> { 1287 fn drop(&mut self) { 1288 while let Some(item) = self.ready.next() { 1289 match item { 1290 WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0), 1291 WorkItem::PushFuture(future) => { 1292 tls::set(self.store.0, move || drop(future)) 1293 } 1294 _ => {} 1295 } 1296 } 1297 } 1298 } 1299 1300 let mut dispose = Dispose { 1301 store: self.as_context_mut(), 1302 ready: ready.into_iter(), 1303 }; 1304 1305 while let Some(item) = dispose.ready.next() { 1306 dispose 1307 .store 1308 .as_context_mut() 1309 .handle_work_item(item) 1310 .await?; 1311 } 1312 } 1313 } 1314 } 1315 } 1316 1317 /// Handle the specified work item, possibly resuming a fiber if applicable. 1318 async fn handle_work_item(self, item: WorkItem) -> Result<()> 1319 where 1320 T: Send, 1321 { 1322 log::trace!("handle work item {item:?}"); 1323 match item { 1324 WorkItem::PushFuture(future) => { 1325 self.0 1326 .concurrent_state_mut() 1327 .futures 1328 .get_mut() 1329 .as_mut() 1330 .unwrap() 1331 .push(future.into_inner()); 1332 } 1333 WorkItem::ResumeFiber(fiber) => { 1334 self.0.resume_fiber(fiber).await?; 1335 } 1336 WorkItem::ResumeThread(_, thread) => { 1337 if let GuestThreadState::Ready(fiber) = mem::replace( 1338 &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state, 1339 GuestThreadState::Running, 1340 ) { 1341 self.0.resume_fiber(fiber).await?; 1342 } else { 1343 bail!("cannot resume non-pending thread {thread:?}"); 1344 } 1345 } 1346 WorkItem::GuestCall(_, call) => { 1347 if call.is_ready(self.0)? { 1348 self.run_on_worker(WorkerItem::GuestCall(call)).await?; 1349 } else { 1350 let state = self.0.concurrent_state_mut(); 1351 let task = state.get_mut(call.thread.task)?; 1352 if !task.starting_sent { 1353 task.starting_sent = true; 1354 if let GuestCallKind::StartImplicit(_) = &call.kind { 1355 Waitable::Guest(call.thread.task).set_event( 1356 state, 1357 Some(Event::Subtask { 1358 status: Status::Starting, 1359 }), 1360 )?; 1361 } 1362 } 1363 1364 let instance = state.get_mut(call.thread.task)?.instance; 1365 self.0 1366 .instance_state(instance) 1367 .concurrent_state() 1368 .pending 1369 .insert(call.thread, call.kind); 1370 } 1371 } 1372 WorkItem::WorkerFunction(fun) => { 1373 self.run_on_worker(WorkerItem::Function(fun)).await?; 1374 } 1375 } 1376 1377 Ok(()) 1378 } 1379 1380 /// Execute the specified guest call on a worker fiber. 1381 async fn run_on_worker(self, item: WorkerItem) -> Result<()> 1382 where 1383 T: Send, 1384 { 1385 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() { 1386 fiber 1387 } else { 1388 fiber::make_fiber(self.0, move |store| { 1389 loop { 1390 match store.concurrent_state_mut().worker_item.take().unwrap() { 1391 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?, 1392 WorkerItem::Function(fun) => fun.into_inner()(store)?, 1393 } 1394 1395 store.suspend(SuspendReason::NeedWork)?; 1396 } 1397 })? 1398 }; 1399 1400 let worker_item = &mut self.0.concurrent_state_mut().worker_item; 1401 assert!(worker_item.is_none()); 1402 *worker_item = Some(item); 1403 1404 self.0.resume_fiber(worker).await 1405 } 1406 1407 /// Wrap the specified host function in a future which will call it, passing 1408 /// it an `&Accessor<T>`. 1409 /// 1410 /// See the `Accessor` documentation for details. 1411 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static 1412 where 1413 T: 'static, 1414 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>> 1415 + Send 1416 + Sync 1417 + 'static, 1418 R: Send + Sync + 'static, 1419 { 1420 let token = StoreToken::new(self); 1421 async move { 1422 let mut accessor = Accessor::new(token); 1423 closure(&mut accessor).await 1424 } 1425 } 1426 } 1427 1428 impl StoreOpaque { 1429 /// Push a `GuestTask` onto the task stack for either a sync-to-sync, 1430 /// guest-to-guest call or a sync host-to-guest call. 1431 /// 1432 /// This task will only be used for the purpose of handling calls to 1433 /// intrinsic functions; both parameter lowering and result lifting are 1434 /// assumed to be taken care of elsewhere. 1435 pub(crate) fn enter_guest_sync_call( 1436 &mut self, 1437 guest_caller: Option<RuntimeInstance>, 1438 callee_async: bool, 1439 callee: RuntimeInstance, 1440 ) -> Result<()> { 1441 log::trace!("enter sync call {callee:?}"); 1442 if !self.concurrency_support() { 1443 return Ok(self.enter_call_not_concurrent()); 1444 } 1445 1446 let state = self.concurrent_state_mut(); 1447 let thread = state.current_thread; 1448 let instance = if let Some(thread) = thread.guest() { 1449 Some(state.get_mut(thread.task)?.instance) 1450 } else { 1451 None 1452 }; 1453 let task = GuestTask::new( 1454 state, 1455 Box::new(move |_, _| unreachable!()), 1456 LiftResult { 1457 lift: Box::new(move |_, _| unreachable!()), 1458 ty: TypeTupleIndex::reserved_value(), 1459 memory: None, 1460 string_encoding: StringEncoding::Utf8, 1461 }, 1462 if let Some(caller) = guest_caller { 1463 assert_eq!(caller, instance.unwrap()); 1464 Caller::Guest { 1465 thread: *thread.guest().unwrap(), 1466 } 1467 } else { 1468 Caller::Host { 1469 tx: None, 1470 host_future_present: false, 1471 caller: thread, 1472 } 1473 }, 1474 None, 1475 callee, 1476 callee_async, 1477 )?; 1478 1479 let guest_task = state.push(task)?; 1480 let new_thread = GuestThread::new_implicit(guest_task); 1481 let guest_thread = state.push(new_thread)?; 1482 Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table( 1483 guest_thread, 1484 self, 1485 callee.index, 1486 )?; 1487 1488 let state = self.concurrent_state_mut(); 1489 state.get_mut(guest_task)?.threads.insert(guest_thread); 1490 1491 self.set_thread(QualifiedThreadId { 1492 task: guest_task, 1493 thread: guest_thread, 1494 }); 1495 1496 Ok(()) 1497 } 1498 1499 /// Pop a `GuestTask` previously pushed using `enter_sync_call`. 1500 pub(crate) fn exit_guest_sync_call(&mut self, guest_caller: bool) -> Result<()> { 1501 if !self.concurrency_support() { 1502 return Ok(self.exit_call_not_concurrent()); 1503 } 1504 let thread = *self.set_thread(CurrentThread::None).guest().unwrap(); 1505 let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance; 1506 log::trace!("exit sync call {instance:?}"); 1507 Instance::from_wasmtime(self, instance.instance).cleanup_thread( 1508 self, 1509 thread, 1510 instance.index, 1511 )?; 1512 1513 let state = self.concurrent_state_mut(); 1514 let task = state.get_mut(thread.task)?; 1515 let caller = match &task.caller { 1516 &Caller::Guest { thread } => { 1517 assert!(guest_caller); 1518 thread.into() 1519 } 1520 &Caller::Host { caller, .. } => { 1521 assert!(!guest_caller); 1522 caller 1523 } 1524 }; 1525 self.set_thread(caller); 1526 1527 let state = self.concurrent_state_mut(); 1528 let task = state.get_mut(thread.task)?; 1529 if task.ready_to_delete() { 1530 state.delete(thread.task)?.dispose(state)?; 1531 } 1532 1533 Ok(()) 1534 } 1535 1536 /// Similar to `enter_guest_sync_call` except for when the guest makes a 1537 /// transition to the host. 1538 /// 1539 /// FIXME: this is called for all guest->host transitions and performs some 1540 /// relatively expensive table manipulations. This would ideally be 1541 /// optimized to avoid the full allocation of a `HostTask` in at least some 1542 /// situations. 1543 pub fn enter_host_call(&mut self) -> Result<()> { 1544 if !self.concurrency_support() { 1545 self.enter_call_not_concurrent(); 1546 return Ok(()); 1547 } 1548 let state = self.concurrent_state_mut(); 1549 let caller = state.unwrap_current_guest_thread(); 1550 let task = state.push(HostTask::new(caller, HostTaskState::CalleeStarted))?; 1551 log::trace!("new host task {task:?}"); 1552 self.set_thread(task); 1553 Ok(()) 1554 } 1555 1556 /// Dual of `enter_host_call` and signifies that the host has finished and 1557 /// will be cleaned up. 1558 /// 1559 /// Note that this isn't invoked when the host is invoked asynchronously and 1560 /// the host isn't complete yet. In that situation the host task persists 1561 /// and will be cleaned up separately. 1562 pub fn exit_host_call(&mut self) -> Result<()> { 1563 if !self.concurrency_support() { 1564 self.exit_call_not_concurrent(); 1565 return Ok(()); 1566 } 1567 let task = self.concurrent_state_mut().unwrap_current_host_thread(); 1568 log::trace!("delete host task {task:?}"); 1569 let task = self.concurrent_state_mut().delete(task)?; 1570 self.set_thread(task.caller); 1571 Ok(()) 1572 } 1573 1574 /// Determine whether the specified instance may be entered from the host. 1575 /// 1576 /// We return `true` here only if all of the following hold: 1577 /// 1578 /// - The top-level instance is not already on the current task's call stack. 1579 /// - The instance is not in need of a post-return function call. 1580 /// - `self` has not been poisoned due to a trap. 1581 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool { 1582 if self.trapped() { 1583 return false; 1584 } 1585 if !self.concurrency_support() { 1586 return true; 1587 } 1588 let state = self.concurrent_state_mut(); 1589 let mut cur = state.current_thread; 1590 loop { 1591 match cur { 1592 CurrentThread::None => break true, 1593 CurrentThread::Guest(thread) => { 1594 let task = state.get_mut(thread.task).unwrap(); 1595 1596 // Note that we only compare top-level instance IDs here. 1597 // The idea is that the host is not allowed to recursively 1598 // enter a top-level instance even if the specific leaf 1599 // instance is not on the stack. This the behavior defined 1600 // in the spec, and it allows us to elide runtime checks in 1601 // guest-to-guest adapters. 1602 if task.instance.instance == instance.instance { 1603 break false; 1604 } 1605 cur = match task.caller { 1606 Caller::Host { caller, .. } => caller, 1607 Caller::Guest { thread } => thread.into(), 1608 }; 1609 } 1610 CurrentThread::Host(id) => { 1611 cur = state.get_mut(id).unwrap().caller.into(); 1612 } 1613 } 1614 } 1615 } 1616 1617 /// Helper function to retrieve the `InstanceState` for the 1618 /// specified instance. 1619 fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState { 1620 self.component_instance_mut(instance.instance) 1621 .instance_state(instance.index) 1622 } 1623 1624 fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> CurrentThread { 1625 // Each time we switch threads, we conservatively set `task_may_block` 1626 // to `false` for the component instance we're switching away from (if 1627 // any), meaning it will be `false` for any new thread created for that 1628 // instance unless explicitly set otherwise. 1629 let state = self.concurrent_state_mut(); 1630 let old_thread = mem::replace(&mut state.current_thread, thread.into()); 1631 if let Some(old_thread) = old_thread.guest() { 1632 let instance = state.get_mut(old_thread.task).unwrap().instance.instance; 1633 self.component_instance_mut(instance) 1634 .set_task_may_block(false) 1635 } 1636 1637 // If we're switching to a new thread, set its component instance's 1638 // `task_may_block` according to where it left off. 1639 if self.concurrent_state_mut().current_thread.guest().is_some() { 1640 self.set_task_may_block(); 1641 } 1642 1643 old_thread 1644 } 1645 1646 /// Set the global variable representing whether the current task may block 1647 /// prior to entering Wasm code. 1648 fn set_task_may_block(&mut self) { 1649 let state = self.concurrent_state_mut(); 1650 let guest_thread = state.unwrap_current_guest_thread(); 1651 let instance = state.get_mut(guest_thread.task).unwrap().instance.instance; 1652 let may_block = self.concurrent_state_mut().may_block(guest_thread.task); 1653 self.component_instance_mut(instance) 1654 .set_task_may_block(may_block) 1655 } 1656 1657 pub(crate) fn check_blocking(&mut self) -> Result<()> { 1658 if !self.concurrency_support() { 1659 return Ok(()); 1660 } 1661 let state = self.concurrent_state_mut(); 1662 let task = state.unwrap_current_guest_thread().task; 1663 let instance = state.get_mut(task).unwrap().instance.instance; 1664 let task_may_block = self.component_instance(instance).get_task_may_block(); 1665 1666 if task_may_block { 1667 Ok(()) 1668 } else { 1669 Err(Trap::CannotBlockSyncTask.into()) 1670 } 1671 } 1672 1673 /// Record that we're about to enter a (sub-)component instance which does 1674 /// not support more than one concurrent, stackful activation, meaning it 1675 /// cannot be entered again until the next call returns. 1676 fn enter_instance(&mut self, instance: RuntimeInstance) { 1677 log::trace!("enter {instance:?}"); 1678 self.instance_state(instance) 1679 .concurrent_state() 1680 .do_not_enter = true; 1681 } 1682 1683 /// Record that we've exited a (sub-)component instance previously entered 1684 /// with `Self::enter_instance` and then calls `Self::partition_pending`. 1685 /// See the documentation for the latter for details. 1686 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> { 1687 log::trace!("exit {instance:?}"); 1688 self.instance_state(instance) 1689 .concurrent_state() 1690 .do_not_enter = false; 1691 self.partition_pending(instance) 1692 } 1693 1694 /// Iterate over `InstanceState::pending`, moving any ready items into the 1695 /// "high priority" work item queue. 1696 /// 1697 /// See `GuestCall::is_ready` for details. 1698 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> { 1699 for (thread, kind) in 1700 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter() 1701 { 1702 let call = GuestCall { thread, kind }; 1703 if call.is_ready(self)? { 1704 self.concurrent_state_mut() 1705 .push_high_priority(WorkItem::GuestCall(instance.index, call)); 1706 } else { 1707 self.instance_state(instance) 1708 .concurrent_state() 1709 .pending 1710 .insert(call.thread, call.kind); 1711 } 1712 } 1713 1714 Ok(()) 1715 } 1716 1717 /// Implements the `backpressure.{inc,dec}` intrinsics. 1718 pub(crate) fn backpressure_modify( 1719 &mut self, 1720 caller_instance: RuntimeInstance, 1721 modify: impl FnOnce(u16) -> Option<u16>, 1722 ) -> Result<()> { 1723 let state = self.instance_state(caller_instance).concurrent_state(); 1724 let old = state.backpressure; 1725 let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?; 1726 state.backpressure = new; 1727 1728 if old > 0 && new == 0 { 1729 // Backpressure was previously enabled and is now disabled; move any 1730 // newly-eligible guest calls to the "high priority" queue. 1731 self.partition_pending(caller_instance)?; 1732 } 1733 1734 Ok(()) 1735 } 1736 1737 /// Resume the specified fiber, giving it exclusive access to the specified 1738 /// store. 1739 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> { 1740 let old_thread = self.concurrent_state_mut().current_thread; 1741 log::trace!("resume_fiber: save current thread {old_thread:?}"); 1742 1743 let fiber = fiber::resolve_or_release(self, fiber).await?; 1744 1745 self.set_thread(old_thread); 1746 1747 let state = self.concurrent_state_mut(); 1748 1749 if let Some(ot) = old_thread.guest() { 1750 state.get_mut(ot.thread)?.state = GuestThreadState::Running; 1751 } 1752 log::trace!("resume_fiber: restore current thread {old_thread:?}"); 1753 1754 if let Some(mut fiber) = fiber { 1755 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason); 1756 // See the `SuspendReason` documentation for what each case means. 1757 match state.suspend_reason.take().unwrap() { 1758 SuspendReason::NeedWork => { 1759 if state.worker.is_none() { 1760 state.worker = Some(fiber); 1761 } else { 1762 fiber.dispose(self); 1763 } 1764 } 1765 SuspendReason::Yielding { thread, .. } => { 1766 state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber); 1767 let instance = state.get_mut(thread.task)?.instance.index; 1768 state.push_low_priority(WorkItem::ResumeThread(instance, thread)); 1769 } 1770 SuspendReason::ExplicitlySuspending { thread, .. } => { 1771 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber); 1772 } 1773 SuspendReason::Waiting { set, thread, .. } => { 1774 let old = state 1775 .get_mut(set)? 1776 .waiting 1777 .insert(thread, WaitMode::Fiber(fiber)); 1778 assert!(old.is_none()); 1779 } 1780 }; 1781 } else { 1782 log::trace!("resume_fiber: fiber has exited"); 1783 } 1784 1785 Ok(()) 1786 } 1787 1788 /// Suspend the current fiber, storing the reason in 1789 /// `ConcurrentState::suspend_reason` to indicate the conditions under which 1790 /// it should be resumed. 1791 /// 1792 /// See the `SuspendReason` documentation for details. 1793 fn suspend(&mut self, reason: SuspendReason) -> Result<()> { 1794 log::trace!("suspend fiber: {reason:?}"); 1795 1796 // If we're yielding or waiting on behalf of a guest thread, we'll need to 1797 // pop the call context which manages resource borrows before suspending 1798 // and then push it again once we've resumed. 1799 let task = match &reason { 1800 SuspendReason::Yielding { thread, .. } 1801 | SuspendReason::Waiting { thread, .. } 1802 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task), 1803 SuspendReason::NeedWork => None, 1804 }; 1805 1806 let old_guest_thread = if task.is_some() { 1807 self.concurrent_state_mut().current_thread 1808 } else { 1809 CurrentThread::None 1810 }; 1811 1812 // We should not have reached here unless either there's no current 1813 // task, or the current task is permitted to block. In addition, we 1814 // special-case `thread.switch-to` and waiting for a subtask to go from 1815 // `starting` to `started`, both of which we consider non-blocking 1816 // operations despite requiring a suspend. 1817 assert!( 1818 matches!( 1819 reason, 1820 SuspendReason::ExplicitlySuspending { 1821 skip_may_block_check: true, 1822 .. 1823 } | SuspendReason::Waiting { 1824 skip_may_block_check: true, 1825 .. 1826 } | SuspendReason::Yielding { 1827 skip_may_block_check: true, 1828 .. 1829 } 1830 ) || old_guest_thread 1831 .guest() 1832 .map(|thread| self.concurrent_state_mut().may_block(thread.task)) 1833 .unwrap_or(true) 1834 ); 1835 1836 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason; 1837 assert!(suspend_reason.is_none()); 1838 *suspend_reason = Some(reason); 1839 1840 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?; 1841 1842 if task.is_some() { 1843 self.set_thread(old_guest_thread); 1844 } 1845 1846 Ok(()) 1847 } 1848 1849 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> { 1850 let state = self.concurrent_state_mut(); 1851 let caller = state.unwrap_current_guest_thread(); 1852 let old_set = waitable.common(state)?.set; 1853 let set = state.get_mut(caller.task)?.sync_call_set; 1854 waitable.join(state, Some(set))?; 1855 self.suspend(SuspendReason::Waiting { 1856 set, 1857 thread: caller, 1858 skip_may_block_check: false, 1859 })?; 1860 let state = self.concurrent_state_mut(); 1861 waitable.join(state, old_set) 1862 } 1863 } 1864 1865 impl Instance { 1866 /// Get the next pending event for the specified task and (optional) 1867 /// waitable set, along with the waitable handle if applicable. 1868 fn get_event( 1869 self, 1870 store: &mut StoreOpaque, 1871 guest_task: TableId<GuestTask>, 1872 set: Option<TableId<WaitableSet>>, 1873 cancellable: bool, 1874 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> { 1875 let state = store.concurrent_state_mut(); 1876 1877 if let Some(event) = state.get_mut(guest_task)?.event.take() { 1878 log::trace!("deliver event {event:?} to {guest_task:?}"); 1879 1880 if cancellable || !matches!(event, Event::Cancelled) { 1881 return Ok(Some((event, None))); 1882 } else { 1883 state.get_mut(guest_task)?.event = Some(event); 1884 } 1885 } 1886 1887 Ok( 1888 if let Some((set, waitable)) = set 1889 .and_then(|set| { 1890 state 1891 .get_mut(set) 1892 .map(|v| v.ready.pop_first().map(|v| (set, v))) 1893 .transpose() 1894 }) 1895 .transpose()? 1896 { 1897 let common = waitable.common(state)?; 1898 let handle = common.handle.unwrap(); 1899 let event = common.event.take().unwrap(); 1900 1901 log::trace!( 1902 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}" 1903 ); 1904 1905 waitable.on_delivery(store, self, event); 1906 1907 Some((event, Some((waitable, handle)))) 1908 } else { 1909 None 1910 }, 1911 ) 1912 } 1913 1914 /// Handle the `CallbackCode` returned from an async-lifted export or its 1915 /// callback. 1916 /// 1917 /// If this returns `Ok(Some(call))`, then `call` should be run immediately 1918 /// using `handle_guest_call`. 1919 fn handle_callback_code( 1920 self, 1921 store: &mut StoreOpaque, 1922 guest_thread: QualifiedThreadId, 1923 runtime_instance: RuntimeComponentInstanceIndex, 1924 code: u32, 1925 ) -> Result<Option<GuestCall>> { 1926 let (code, set) = unpack_callback_code(code); 1927 1928 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})"); 1929 1930 let state = store.concurrent_state_mut(); 1931 1932 let get_set = |store: &mut StoreOpaque, handle| { 1933 if handle == 0 { 1934 bail!("invalid waitable-set handle"); 1935 } 1936 1937 let set = store 1938 .instance_state(RuntimeInstance { 1939 instance: self.id().instance(), 1940 index: runtime_instance, 1941 }) 1942 .handle_table() 1943 .waitable_set_rep(handle)?; 1944 1945 Ok(TableId::<WaitableSet>::new(set)) 1946 }; 1947 1948 Ok(match code { 1949 callback_code::EXIT => { 1950 log::trace!("implicit thread {guest_thread:?} completed"); 1951 self.cleanup_thread(store, guest_thread, runtime_instance)?; 1952 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 1953 if task.threads.is_empty() && !task.returned_or_cancelled() { 1954 bail!(Trap::NoAsyncResult); 1955 } 1956 if let Caller::Guest { .. } = task.caller { 1957 task.exited = true; 1958 task.callback = None; 1959 } 1960 if task.ready_to_delete() { 1961 Waitable::Guest(guest_thread.task).delete_from(store.concurrent_state_mut())?; 1962 } 1963 None 1964 } 1965 callback_code::YIELD => { 1966 let task = state.get_mut(guest_thread.task)?; 1967 // If an `Event::Cancelled` is pending, we'll deliver that; 1968 // otherwise, we'll deliver `Event::None`. Note that 1969 // `GuestTask::event` is only ever set to one of those two 1970 // `Event` variants. 1971 if let Some(event) = task.event { 1972 assert!(matches!(event, Event::None | Event::Cancelled)); 1973 } else { 1974 task.event = Some(Event::None); 1975 } 1976 let call = GuestCall { 1977 thread: guest_thread, 1978 kind: GuestCallKind::DeliverEvent { 1979 instance: self, 1980 set: None, 1981 }, 1982 }; 1983 if state.may_block(guest_thread.task) { 1984 // Push this thread onto the "low priority" queue so it runs 1985 // after any other threads have had a chance to run. 1986 state.push_low_priority(WorkItem::GuestCall(runtime_instance, call)); 1987 None 1988 } else { 1989 // Yielding in a non-blocking context is defined as a no-op 1990 // according to the spec, so we must run this thread 1991 // immediately without allowing any others to run. 1992 Some(call) 1993 } 1994 } 1995 callback_code::WAIT => { 1996 // The task may only return `WAIT` if it was created for a call 1997 // to an async export). Otherwise, we'll trap. 1998 state.check_blocking_for(guest_thread.task)?; 1999 2000 let set = get_set(store, set)?; 2001 let state = store.concurrent_state_mut(); 2002 2003 if state.get_mut(guest_thread.task)?.event.is_some() 2004 || !state.get_mut(set)?.ready.is_empty() 2005 { 2006 // An event is immediately available; deliver it ASAP. 2007 state.push_high_priority(WorkItem::GuestCall( 2008 runtime_instance, 2009 GuestCall { 2010 thread: guest_thread, 2011 kind: GuestCallKind::DeliverEvent { 2012 instance: self, 2013 set: Some(set), 2014 }, 2015 }, 2016 )); 2017 } else { 2018 // No event is immediately available. 2019 // 2020 // We're waiting, so register to be woken up when an event 2021 // is published for this waitable set. 2022 // 2023 // Here we also set `GuestTask::wake_on_cancel` which allows 2024 // `subtask.cancel` to interrupt the wait. 2025 let old = state 2026 .get_mut(guest_thread.thread)? 2027 .wake_on_cancel 2028 .replace(set); 2029 assert!(old.is_none()); 2030 let old = state 2031 .get_mut(set)? 2032 .waiting 2033 .insert(guest_thread, WaitMode::Callback(self)); 2034 assert!(old.is_none()); 2035 } 2036 None 2037 } 2038 _ => bail!("unsupported callback code: {code}"), 2039 }) 2040 } 2041 2042 fn cleanup_thread( 2043 self, 2044 store: &mut StoreOpaque, 2045 guest_thread: QualifiedThreadId, 2046 runtime_instance: RuntimeComponentInstanceIndex, 2047 ) -> Result<()> { 2048 let guest_id = store 2049 .concurrent_state_mut() 2050 .get_mut(guest_thread.thread)? 2051 .instance_rep; 2052 store 2053 .instance_state(RuntimeInstance { 2054 instance: self.id().instance(), 2055 index: runtime_instance, 2056 }) 2057 .thread_handle_table() 2058 .guest_thread_remove(guest_id.unwrap())?; 2059 2060 store.concurrent_state_mut().delete(guest_thread.thread)?; 2061 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 2062 task.threads.remove(&guest_thread.thread); 2063 Ok(()) 2064 } 2065 2066 /// Add the specified guest call to the "high priority" work item queue, to 2067 /// be started as soon as backpressure and/or reentrance rules allow. 2068 /// 2069 /// SAFETY: The raw pointer arguments must be valid references to guest 2070 /// functions (with the appropriate signatures) when the closures queued by 2071 /// this function are called. 2072 unsafe fn queue_call<T: 'static>( 2073 self, 2074 mut store: StoreContextMut<T>, 2075 guest_thread: QualifiedThreadId, 2076 callee: SendSyncPtr<VMFuncRef>, 2077 param_count: usize, 2078 result_count: usize, 2079 async_: bool, 2080 callback: Option<SendSyncPtr<VMFuncRef>>, 2081 post_return: Option<SendSyncPtr<VMFuncRef>>, 2082 ) -> Result<()> { 2083 /// Return a closure which will call the specified function in the scope 2084 /// of the specified task. 2085 /// 2086 /// This will use `GuestTask::lower_params` to lower the parameters, but 2087 /// will not lift the result; instead, it returns a 2088 /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if 2089 /// any, may be lifted. Note that an async-lifted export will have 2090 /// returned its result using the `task.return` intrinsic (or not 2091 /// returned a result at all, in the case of `task.cancel`), in which 2092 /// case the "result" of this call will either be a callback code or 2093 /// nothing. 2094 /// 2095 /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when 2096 /// the returned closure is called. 2097 unsafe fn make_call<T: 'static>( 2098 store: StoreContextMut<T>, 2099 guest_thread: QualifiedThreadId, 2100 callee: SendSyncPtr<VMFuncRef>, 2101 param_count: usize, 2102 result_count: usize, 2103 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]> 2104 + Send 2105 + Sync 2106 + 'static 2107 + use<T> { 2108 let token = StoreToken::new(store); 2109 move |store: &mut dyn VMStore| { 2110 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS]; 2111 2112 store 2113 .concurrent_state_mut() 2114 .get_mut(guest_thread.thread)? 2115 .state = GuestThreadState::Running; 2116 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 2117 let lower = task.lower_params.take().unwrap(); 2118 2119 lower(store, &mut storage[..param_count])?; 2120 2121 let mut store = token.as_context_mut(store); 2122 2123 // SAFETY: Per the contract documented in `make_call's` 2124 // documentation, `callee` must be a valid pointer. 2125 unsafe { 2126 crate::Func::call_unchecked_raw( 2127 &mut store, 2128 callee.as_non_null(), 2129 NonNull::new( 2130 &mut storage[..param_count.max(result_count)] 2131 as *mut [MaybeUninit<ValRaw>] as _, 2132 ) 2133 .unwrap(), 2134 )?; 2135 } 2136 2137 Ok(storage) 2138 } 2139 } 2140 2141 // SAFETY: Per the contract described in this function documentation, 2142 // the `callee` pointer which `call` closes over must be valid when 2143 // called by the closure we queue below. 2144 let call = unsafe { 2145 make_call( 2146 store.as_context_mut(), 2147 guest_thread, 2148 callee, 2149 param_count, 2150 result_count, 2151 ) 2152 }; 2153 2154 let callee_instance = store 2155 .0 2156 .concurrent_state_mut() 2157 .get_mut(guest_thread.task)? 2158 .instance; 2159 2160 let fun = if callback.is_some() { 2161 assert!(async_); 2162 2163 Box::new(move |store: &mut dyn VMStore| { 2164 self.add_guest_thread_to_instance_table( 2165 guest_thread.thread, 2166 store, 2167 callee_instance.index, 2168 )?; 2169 let old_thread = store.set_thread(guest_thread); 2170 log::trace!( 2171 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread" 2172 ); 2173 2174 store.enter_instance(callee_instance); 2175 2176 // SAFETY: See the documentation for `make_call` to review the 2177 // contract we must uphold for `call` here. 2178 // 2179 // Per the contract described in the `queue_call` 2180 // documentation, the `callee` pointer which `call` closes 2181 // over must be valid. 2182 let storage = call(store)?; 2183 2184 store.exit_instance(callee_instance)?; 2185 2186 store.set_thread(old_thread); 2187 let state = store.concurrent_state_mut(); 2188 old_thread 2189 .guest() 2190 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running); 2191 log::trace!("stackless call: restored {old_thread:?} as current thread"); 2192 2193 // SAFETY: `wasmparser` will have validated that the callback 2194 // function returns a `i32` result. 2195 let code = unsafe { storage[0].assume_init() }.get_i32() as u32; 2196 2197 self.handle_callback_code(store, guest_thread, callee_instance.index, code) 2198 }) 2199 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync> 2200 } else { 2201 let token = StoreToken::new(store.as_context_mut()); 2202 Box::new(move |store: &mut dyn VMStore| { 2203 self.add_guest_thread_to_instance_table( 2204 guest_thread.thread, 2205 store, 2206 callee_instance.index, 2207 )?; 2208 let old_thread = store.set_thread(guest_thread); 2209 log::trace!( 2210 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread", 2211 ); 2212 let flags = self.id().get(store).instance_flags(callee_instance.index); 2213 2214 // Unless this is a callback-less (i.e. stackful) 2215 // async-lifted export, we need to record that the instance 2216 // cannot be entered until the call returns. 2217 if !async_ { 2218 store.enter_instance(callee_instance); 2219 } 2220 2221 // SAFETY: See the documentation for `make_call` to review the 2222 // contract we must uphold for `call` here. 2223 // 2224 // Per the contract described in the `queue_call` 2225 // documentation, the `callee` pointer which `call` closes 2226 // over must be valid. 2227 let storage = call(store)?; 2228 2229 if async_ { 2230 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 2231 if task.threads.len() == 1 && !task.returned_or_cancelled() { 2232 bail!(Trap::NoAsyncResult); 2233 } 2234 } else { 2235 // This is a sync-lifted export, so now is when we lift the 2236 // result, optionally call the post-return function, if any, 2237 // and finally notify any current or future waiters that the 2238 // subtask has returned. 2239 2240 let lift = { 2241 store.exit_instance(callee_instance)?; 2242 2243 let state = store.concurrent_state_mut(); 2244 assert!(state.get_mut(guest_thread.task)?.result.is_none()); 2245 2246 state 2247 .get_mut(guest_thread.task)? 2248 .lift_result 2249 .take() 2250 .unwrap() 2251 }; 2252 2253 // SAFETY: `result_count` represents the number of core Wasm 2254 // results returned, per `wasmparser`. 2255 let result = (lift.lift)(store, unsafe { 2256 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>( 2257 &storage[..result_count], 2258 ) 2259 })?; 2260 2261 let post_return_arg = match result_count { 2262 0 => ValRaw::i32(0), 2263 // SAFETY: `result_count` represents the number of 2264 // core Wasm results returned, per `wasmparser`. 2265 1 => unsafe { storage[0].assume_init() }, 2266 _ => unreachable!(), 2267 }; 2268 2269 unsafe { 2270 call_post_return( 2271 token.as_context_mut(store), 2272 post_return.map(|v| v.as_non_null()), 2273 post_return_arg, 2274 flags, 2275 )?; 2276 } 2277 2278 self.task_complete(store, guest_thread.task, result, Status::Returned)?; 2279 } 2280 2281 // This is a callback-less call, so the implicit thread has now completed 2282 self.cleanup_thread(store, guest_thread, callee_instance.index)?; 2283 2284 store.set_thread(old_thread); 2285 2286 let state = store.concurrent_state_mut(); 2287 let task = state.get_mut(guest_thread.task)?; 2288 2289 match &task.caller { 2290 Caller::Host { .. } => { 2291 if task.ready_to_delete() { 2292 Waitable::Guest(guest_thread.task).delete_from(state)?; 2293 } 2294 } 2295 Caller::Guest { .. } => { 2296 task.exited = true; 2297 } 2298 } 2299 2300 Ok(None) 2301 }) 2302 }; 2303 2304 store 2305 .0 2306 .concurrent_state_mut() 2307 .push_high_priority(WorkItem::GuestCall( 2308 callee_instance.index, 2309 GuestCall { 2310 thread: guest_thread, 2311 kind: GuestCallKind::StartImplicit(fun), 2312 }, 2313 )); 2314 2315 Ok(()) 2316 } 2317 2318 /// Prepare (but do not start) a guest->guest call. 2319 /// 2320 /// This is called from fused adapter code generated in 2321 /// `wasmtime_environ::fact::trampoline::Compiler`. `start` and `return_` 2322 /// are synthesized Wasm functions which move the parameters from the caller 2323 /// to the callee and the result from the callee to the caller, 2324 /// respectively. The adapter will call `Self::start_call` immediately 2325 /// after calling this function. 2326 /// 2327 /// SAFETY: All the pointer arguments must be valid pointers to guest 2328 /// entities (and with the expected signatures for the function references 2329 /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details). 2330 unsafe fn prepare_call<T: 'static>( 2331 self, 2332 mut store: StoreContextMut<T>, 2333 start: *mut VMFuncRef, 2334 return_: *mut VMFuncRef, 2335 caller_instance: RuntimeComponentInstanceIndex, 2336 callee_instance: RuntimeComponentInstanceIndex, 2337 task_return_type: TypeTupleIndex, 2338 callee_async: bool, 2339 memory: *mut VMMemoryDefinition, 2340 string_encoding: u8, 2341 caller_info: CallerInfo, 2342 ) -> Result<()> { 2343 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) { 2344 // A task may only call an async-typed function via a sync lower if 2345 // it was created by a call to an async export. Otherwise, we'll 2346 // trap. 2347 store.0.check_blocking()?; 2348 } 2349 2350 enum ResultInfo { 2351 Heap { results: u32 }, 2352 Stack { result_count: u32 }, 2353 } 2354 2355 let result_info = match &caller_info { 2356 CallerInfo::Async { 2357 has_result: true, 2358 params, 2359 } => ResultInfo::Heap { 2360 results: params.last().unwrap().get_u32(), 2361 }, 2362 CallerInfo::Async { 2363 has_result: false, .. 2364 } => ResultInfo::Stack { result_count: 0 }, 2365 CallerInfo::Sync { 2366 result_count, 2367 params, 2368 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap { 2369 results: params.last().unwrap().get_u32(), 2370 }, 2371 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack { 2372 result_count: *result_count, 2373 }, 2374 }; 2375 2376 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. }); 2377 2378 // Create a new guest task for the call, closing over the `start` and 2379 // `return_` functions to lift the parameters and lower the result, 2380 // respectively. 2381 let start = SendSyncPtr::new(NonNull::new(start).unwrap()); 2382 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap()); 2383 let token = StoreToken::new(store.as_context_mut()); 2384 let state = store.0.concurrent_state_mut(); 2385 let old_thread = state.unwrap_current_guest_thread(); 2386 2387 assert_eq!( 2388 state.get_mut(old_thread.task)?.instance, 2389 RuntimeInstance { 2390 instance: self.id().instance(), 2391 index: caller_instance, 2392 } 2393 ); 2394 2395 let new_task = GuestTask::new( 2396 state, 2397 Box::new(move |store, dst| { 2398 let mut store = token.as_context_mut(store); 2399 assert!(dst.len() <= MAX_FLAT_PARAMS); 2400 // The `+ 1` here accounts for the return pointer, if any: 2401 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1]; 2402 let count = match caller_info { 2403 // Async callers, if they have a result, use the last 2404 // parameter as a return pointer so chop that off if 2405 // relevant here. 2406 CallerInfo::Async { params, has_result } => { 2407 let params = ¶ms[..params.len() - usize::from(has_result)]; 2408 for (param, src) in params.iter().zip(&mut src) { 2409 src.write(*param); 2410 } 2411 params.len() 2412 } 2413 2414 // Sync callers forward everything directly. 2415 CallerInfo::Sync { params, .. } => { 2416 for (param, src) in params.iter().zip(&mut src) { 2417 src.write(*param); 2418 } 2419 params.len() 2420 } 2421 }; 2422 // SAFETY: `start` is a valid `*mut VMFuncRef` from 2423 // `wasmtime-cranelift`-generated fused adapter code. Based on 2424 // how it was constructed (see 2425 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter` 2426 // for details) we know it takes count parameters and returns 2427 // `dst.len()` results. 2428 unsafe { 2429 crate::Func::call_unchecked_raw( 2430 &mut store, 2431 start.as_non_null(), 2432 NonNull::new( 2433 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _, 2434 ) 2435 .unwrap(), 2436 )?; 2437 } 2438 dst.copy_from_slice(&src[..dst.len()]); 2439 let state = store.0.concurrent_state_mut(); 2440 Waitable::Guest(state.unwrap_current_guest_thread().task).set_event( 2441 state, 2442 Some(Event::Subtask { 2443 status: Status::Started, 2444 }), 2445 )?; 2446 Ok(()) 2447 }), 2448 LiftResult { 2449 lift: Box::new(move |store, src| { 2450 // SAFETY: See comment in closure passed as `lower_params` 2451 // parameter above. 2452 let mut store = token.as_context_mut(store); 2453 let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation? 2454 if let ResultInfo::Heap { results } = &result_info { 2455 my_src.push(ValRaw::u32(*results)); 2456 } 2457 // SAFETY: `return_` is a valid `*mut VMFuncRef` from 2458 // `wasmtime-cranelift`-generated fused adapter code. Based 2459 // on how it was constructed (see 2460 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter` 2461 // for details) we know it takes `src.len()` parameters and 2462 // returns up to 1 result. 2463 unsafe { 2464 crate::Func::call_unchecked_raw( 2465 &mut store, 2466 return_.as_non_null(), 2467 my_src.as_mut_slice().into(), 2468 )?; 2469 } 2470 let state = store.0.concurrent_state_mut(); 2471 let thread = state.unwrap_current_guest_thread(); 2472 if sync_caller { 2473 state.get_mut(thread.task)?.sync_result = SyncResult::Produced( 2474 if let ResultInfo::Stack { result_count } = &result_info { 2475 match result_count { 2476 0 => None, 2477 1 => Some(my_src[0]), 2478 _ => unreachable!(), 2479 } 2480 } else { 2481 None 2482 }, 2483 ); 2484 } 2485 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>) 2486 }), 2487 ty: task_return_type, 2488 memory: NonNull::new(memory).map(SendSyncPtr::new), 2489 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(), 2490 }, 2491 Caller::Guest { thread: old_thread }, 2492 None, 2493 RuntimeInstance { 2494 instance: self.id().instance(), 2495 index: callee_instance, 2496 }, 2497 callee_async, 2498 )?; 2499 2500 let guest_task = state.push(new_task)?; 2501 let new_thread = GuestThread::new_implicit(guest_task); 2502 let guest_thread = state.push(new_thread)?; 2503 state.get_mut(guest_task)?.threads.insert(guest_thread); 2504 2505 // Make the new thread the current one so that `Self::start_call` knows 2506 // which one to start. 2507 store.0.set_thread(QualifiedThreadId { 2508 task: guest_task, 2509 thread: guest_thread, 2510 }); 2511 log::trace!( 2512 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}" 2513 ); 2514 2515 Ok(()) 2516 } 2517 2518 /// Call the specified callback function for an async-lifted export. 2519 /// 2520 /// SAFETY: `function` must be a valid reference to a guest function of the 2521 /// correct signature for a callback. 2522 unsafe fn call_callback<T>( 2523 self, 2524 mut store: StoreContextMut<T>, 2525 function: SendSyncPtr<VMFuncRef>, 2526 event: Event, 2527 handle: u32, 2528 ) -> Result<u32> { 2529 let (ordinal, result) = event.parts(); 2530 let params = &mut [ 2531 ValRaw::u32(ordinal), 2532 ValRaw::u32(handle), 2533 ValRaw::u32(result), 2534 ]; 2535 // SAFETY: `func` is a valid `*mut VMFuncRef` from either 2536 // `wasmtime-cranelift`-generated fused adapter code or 2537 // `component::Options`. Per `wasmparser` callback signature 2538 // validation, we know it takes three parameters and returns one. 2539 unsafe { 2540 crate::Func::call_unchecked_raw( 2541 &mut store, 2542 function.as_non_null(), 2543 params.as_mut_slice().into(), 2544 )?; 2545 } 2546 Ok(params[0].get_u32()) 2547 } 2548 2549 /// Start a guest->guest call previously prepared using 2550 /// `Self::prepare_call`. 2551 /// 2552 /// This is called from fused adapter code generated in 2553 /// `wasmtime_environ::fact::trampoline::Compiler`. The adapter will call 2554 /// this function immediately after calling `Self::prepare_call`. 2555 /// 2556 /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest 2557 /// functions with the appropriate signatures for the current guest task. 2558 /// If this is a call to an async-lowered import, the actual call may be 2559 /// deferred and run after this function returns, in which case the pointer 2560 /// arguments must also be valid when the call happens. 2561 unsafe fn start_call<T: 'static>( 2562 self, 2563 mut store: StoreContextMut<T>, 2564 callback: *mut VMFuncRef, 2565 post_return: *mut VMFuncRef, 2566 callee: *mut VMFuncRef, 2567 param_count: u32, 2568 result_count: u32, 2569 flags: u32, 2570 storage: Option<&mut [MaybeUninit<ValRaw>]>, 2571 ) -> Result<u32> { 2572 let token = StoreToken::new(store.as_context_mut()); 2573 let async_caller = storage.is_none(); 2574 let state = store.0.concurrent_state_mut(); 2575 let guest_thread = state.unwrap_current_guest_thread(); 2576 let callee_async = state.get_mut(guest_thread.task)?.async_function; 2577 let callee = SendSyncPtr::new(NonNull::new(callee).unwrap()); 2578 let param_count = usize::try_from(param_count).unwrap(); 2579 assert!(param_count <= MAX_FLAT_PARAMS); 2580 let result_count = usize::try_from(result_count).unwrap(); 2581 assert!(result_count <= MAX_FLAT_RESULTS); 2582 2583 let task = state.get_mut(guest_thread.task)?; 2584 if !callback.is_null() { 2585 // We're calling an async-lifted export with a callback, so store 2586 // the callback and related context as part of the task so we can 2587 // call it later when needed. 2588 let callback = SendSyncPtr::new(NonNull::new(callback).unwrap()); 2589 task.callback = Some(Box::new(move |store, event, handle| { 2590 let store = token.as_context_mut(store); 2591 unsafe { self.call_callback::<T>(store, callback, event, handle) } 2592 })); 2593 } 2594 2595 let Caller::Guest { thread: caller } = &task.caller else { 2596 // As of this writing, `start_call` is only used for guest->guest 2597 // calls. 2598 unreachable!() 2599 }; 2600 let caller = *caller; 2601 let caller_instance = state.get_mut(caller.task)?.instance; 2602 2603 // Queue the call as a "high priority" work item. 2604 unsafe { 2605 self.queue_call( 2606 store.as_context_mut(), 2607 guest_thread, 2608 callee, 2609 param_count, 2610 result_count, 2611 (flags & START_FLAG_ASYNC_CALLEE) != 0, 2612 NonNull::new(callback).map(SendSyncPtr::new), 2613 NonNull::new(post_return).map(SendSyncPtr::new), 2614 )?; 2615 } 2616 2617 let state = store.0.concurrent_state_mut(); 2618 2619 // Use the caller's `GuestTask::sync_call_set` to register interest in 2620 // the subtask... 2621 let guest_waitable = Waitable::Guest(guest_thread.task); 2622 let old_set = guest_waitable.common(state)?.set; 2623 let set = state.get_mut(caller.task)?.sync_call_set; 2624 guest_waitable.join(state, Some(set))?; 2625 2626 // ... and suspend this fiber temporarily while we wait for it to start. 2627 // 2628 // Note that we _could_ call the callee directly using the current fiber 2629 // rather than suspend this one, but that would make reasoning about the 2630 // event loop more complicated and is probably only worth doing if 2631 // there's a measurable performance benefit. In addition, it would mean 2632 // blocking the caller if the callee calls a blocking sync-lowered 2633 // import, and as of this writing the spec says we must not do that. 2634 // 2635 // Alternatively, the fused adapter code could be modified to call the 2636 // callee directly without calling a host-provided intrinsic at all (in 2637 // which case it would need to do its own, inline backpressure checks, 2638 // etc.). Again, we'd want to see a measurable performance benefit 2639 // before committing to such an optimization. And again, we'd need to 2640 // update the spec to allow that. 2641 let (status, waitable) = loop { 2642 store.0.suspend(SuspendReason::Waiting { 2643 set, 2644 thread: caller, 2645 // Normally, `StoreOpaque::suspend` would assert it's being 2646 // called from a context where blocking is allowed. However, if 2647 // `async_caller` is `true`, we'll only "block" long enough for 2648 // the callee to start, i.e. we won't repeat this loop, so we 2649 // tell `suspend` it's okay even if we're not allowed to block. 2650 // Alternatively, if the callee is not an async function, then 2651 // we know it won't block anyway. 2652 skip_may_block_check: async_caller || !callee_async, 2653 })?; 2654 2655 let state = store.0.concurrent_state_mut(); 2656 2657 log::trace!("taking event for {:?}", guest_thread.task); 2658 let event = guest_waitable.take_event(state)?; 2659 let Some(Event::Subtask { status }) = event else { 2660 unreachable!(); 2661 }; 2662 2663 log::trace!("status {status:?} for {:?}", guest_thread.task); 2664 2665 if status == Status::Returned { 2666 // It returned, so we can stop waiting. 2667 break (status, None); 2668 } else if async_caller { 2669 // It hasn't returned yet, but the caller is calling via an 2670 // async-lowered import, so we generate a handle for the task 2671 // waitable and return the status. 2672 let handle = store 2673 .0 2674 .instance_state(caller_instance) 2675 .handle_table() 2676 .subtask_insert_guest(guest_thread.task.rep())?; 2677 store 2678 .0 2679 .concurrent_state_mut() 2680 .get_mut(guest_thread.task)? 2681 .common 2682 .handle = Some(handle); 2683 break (status, Some(handle)); 2684 } else { 2685 // The callee hasn't returned yet, and the caller is calling via 2686 // a sync-lowered import, so we loop and keep waiting until the 2687 // callee returns. 2688 } 2689 }; 2690 2691 guest_waitable.join(store.0.concurrent_state_mut(), old_set)?; 2692 2693 // Reset the current thread to point to the caller as it resumes control. 2694 store.0.set_thread(caller); 2695 store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running; 2696 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}"); 2697 2698 if let Some(storage) = storage { 2699 // The caller used a sync-lowered import to call an async-lifted 2700 // export, in which case the result, if any, has been stashed in 2701 // `GuestTask::sync_result`. 2702 let state = store.0.concurrent_state_mut(); 2703 let task = state.get_mut(guest_thread.task)?; 2704 if let Some(result) = task.sync_result.take() { 2705 if let Some(result) = result { 2706 storage[0] = MaybeUninit::new(result); 2707 } 2708 2709 if task.exited && task.ready_to_delete() { 2710 Waitable::Guest(guest_thread.task).delete_from(state)?; 2711 } 2712 } 2713 } 2714 2715 Ok(status.pack(waitable)) 2716 } 2717 2718 /// Poll the specified future once on behalf of a guest->host call using an 2719 /// async-lowered import. 2720 /// 2721 /// If it returns `Ready`, return `Ok(None)`. Otherwise, if it returns 2722 /// `Pending`, add it to the set of futures to be polled as part of this 2723 /// instance's event loop until it completes, and then return 2724 /// `Ok(Some(handle))` where `handle` is the waitable handle to return. 2725 /// 2726 /// Whether the future returns `Ready` immediately or later, the `lower` 2727 /// function will be used to lower the result, if any, into the guest caller's 2728 /// stack and linear memory. The `lower` function is invoked with `None` if 2729 /// the future is cancelled. 2730 pub(crate) fn first_poll<T: 'static, R: Send + 'static>( 2731 self, 2732 mut store: StoreContextMut<'_, T>, 2733 future: impl Future<Output = Result<R>> + Send + 'static, 2734 lower: impl FnOnce(StoreContextMut<T>, Option<R>) -> Result<()> + Send + 'static, 2735 ) -> Result<Option<u32>> { 2736 let token = StoreToken::new(store.as_context_mut()); 2737 let state = store.0.concurrent_state_mut(); 2738 let task = state.unwrap_current_host_thread(); 2739 2740 // Create an abortable future which hooks calls to poll and manages call 2741 // context state for the future. 2742 let (join_handle, future) = JoinHandle::run(future); 2743 { 2744 let state = &mut state.get_mut(task)?.state; 2745 assert!(matches!(state, HostTaskState::CalleeStarted)); 2746 *state = HostTaskState::CalleeRunning(join_handle); 2747 } 2748 2749 let mut future = Box::pin(future); 2750 2751 // Finally, poll the future. We can use a dummy `Waker` here because 2752 // we'll add the future to `ConcurrentState::futures` and poll it 2753 // automatically from the event loop if it doesn't complete immediately 2754 // here. 2755 let poll = tls::set(store.0, || { 2756 future 2757 .as_mut() 2758 .poll(&mut Context::from_waker(&Waker::noop())) 2759 }); 2760 2761 match poll { 2762 // It finished immediately; lower the result and delete the task. 2763 Poll::Ready(Some(result)) => { 2764 lower(store.as_context_mut(), Some(result?))?; 2765 return Ok(None); 2766 } 2767 2768 // Shouldn't be possible since the future isn't cancelled via the 2769 // `join_handle`. 2770 Poll::Ready(None) => unreachable!(), 2771 2772 // Future isn't ready yet, so fall through. 2773 Poll::Pending => {} 2774 } 2775 2776 // It hasn't finished yet; add the future to 2777 // `ConcurrentState::futures` so it will be polled by the event 2778 // loop and allocate a waitable handle to return to the guest. 2779 2780 // Wrap the future in a closure responsible for lowering the result into 2781 // the guest's stack and memory, as well as notifying any waiters that 2782 // the task returned. 2783 let future = Box::pin(async move { 2784 let result = match future.await { 2785 Some(result) => Some(result?), 2786 None => None, 2787 }; 2788 let on_complete = move |store: &mut dyn VMStore| { 2789 // Restore the `current_thread` to be the host so `lower` knows 2790 // how to manipulate borrows and knows which scope of borrows 2791 // to check. 2792 let mut store = token.as_context_mut(store); 2793 let state = store.0.concurrent_state_mut(); 2794 assert!(state.current_thread.is_none()); 2795 store.0.set_thread(task); 2796 2797 let status = if result.is_some() { 2798 Status::Returned 2799 } else { 2800 Status::ReturnCancelled 2801 }; 2802 2803 lower(store.as_context_mut(), result)?; 2804 let state = store.0.concurrent_state_mut(); 2805 state.get_mut(task)?.state = HostTaskState::CalleeDone; 2806 Waitable::Host(task).set_event(state, Some(Event::Subtask { status }))?; 2807 2808 // Go back to "no current thread" at the end. 2809 store.0.set_thread(CurrentThread::None); 2810 Ok(()) 2811 }; 2812 2813 // Here we schedule a task to run on a worker fiber to do the 2814 // lowering since it may involve a call to the guest's realloc 2815 // function. This is necessary because calling the guest while 2816 // there are host embedder frames on the stack is unsound. 2817 tls::get(move |store| { 2818 store 2819 .concurrent_state_mut() 2820 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new( 2821 on_complete, 2822 )))); 2823 Ok(()) 2824 }) 2825 }); 2826 2827 // Make this task visible to the guest and then record what it 2828 // was made visible as. 2829 let state = store.0.concurrent_state_mut(); 2830 state.push_future(future); 2831 let caller = state.get_mut(task)?.caller; 2832 let instance = state.get_mut(caller.task)?.instance; 2833 let handle = store 2834 .0 2835 .instance_state(instance) 2836 .handle_table() 2837 .subtask_insert_host(task.rep())?; 2838 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle); 2839 log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}"); 2840 2841 // Restore the currently running thread to this host task's 2842 // caller. Note that the host task isn't deallocated as it's 2843 // within the store and will get deallocated later. 2844 store.0.set_thread(caller); 2845 Ok(Some(handle)) 2846 } 2847 2848 /// Implements the `task.return` intrinsic, lifting the result for the 2849 /// current guest task. 2850 pub(crate) fn task_return( 2851 self, 2852 store: &mut dyn VMStore, 2853 ty: TypeTupleIndex, 2854 options: OptionsIndex, 2855 storage: &[ValRaw], 2856 ) -> Result<()> { 2857 let state = store.concurrent_state_mut(); 2858 let guest_thread = state.unwrap_current_guest_thread(); 2859 let lift = state 2860 .get_mut(guest_thread.task)? 2861 .lift_result 2862 .take() 2863 .ok_or_else(|| { 2864 format_err!("`task.return` or `task.cancel` called more than once for current task") 2865 })?; 2866 assert!(state.get_mut(guest_thread.task)?.result.is_none()); 2867 2868 let CanonicalOptions { 2869 string_encoding, 2870 data_model, 2871 .. 2872 } = &self.id().get(store).component().env_component().options[options]; 2873 2874 let invalid = ty != lift.ty 2875 || string_encoding != &lift.string_encoding 2876 || match data_model { 2877 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory { 2878 Some(memory) => { 2879 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut()); 2880 let actual = self.id().get(store).runtime_memory(memory); 2881 expected != actual.as_ptr() 2882 } 2883 // Memory not specified, meaning it didn't need to be 2884 // specified per validation, so not invalid. 2885 None => false, 2886 }, 2887 // Always invalid as this isn't supported. 2888 CanonicalOptionsDataModel::Gc { .. } => true, 2889 }; 2890 2891 if invalid { 2892 bail!("invalid `task.return` signature and/or options for current task"); 2893 } 2894 2895 log::trace!("task.return for {guest_thread:?}"); 2896 2897 let result = (lift.lift)(store, storage)?; 2898 self.task_complete(store, guest_thread.task, result, Status::Returned) 2899 } 2900 2901 /// Implements the `task.cancel` intrinsic. 2902 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> { 2903 let state = store.concurrent_state_mut(); 2904 let guest_thread = state.unwrap_current_guest_thread(); 2905 let task = state.get_mut(guest_thread.task)?; 2906 if !task.cancel_sent { 2907 bail!("`task.cancel` called by task which has not been cancelled") 2908 } 2909 _ = task.lift_result.take().ok_or_else(|| { 2910 format_err!("`task.return` or `task.cancel` called more than once for current task") 2911 })?; 2912 2913 assert!(task.result.is_none()); 2914 2915 log::trace!("task.cancel for {guest_thread:?}"); 2916 2917 self.task_complete( 2918 store, 2919 guest_thread.task, 2920 Box::new(DummyResult), 2921 Status::ReturnCancelled, 2922 ) 2923 } 2924 2925 /// Complete the specified guest task (i.e. indicate that it has either 2926 /// returned a (possibly empty) result or cancelled itself). 2927 /// 2928 /// This will return any resource borrows and notify any current or future 2929 /// waiters that the task has completed. 2930 fn task_complete( 2931 self, 2932 store: &mut StoreOpaque, 2933 guest_task: TableId<GuestTask>, 2934 result: Box<dyn Any + Send + Sync>, 2935 status: Status, 2936 ) -> Result<()> { 2937 store 2938 .component_resource_tables(Some(self)) 2939 .validate_scope_exit()?; 2940 2941 let state = store.concurrent_state_mut(); 2942 let task = state.get_mut(guest_task)?; 2943 2944 if let Caller::Host { tx, .. } = &mut task.caller { 2945 if let Some(tx) = tx.take() { 2946 _ = tx.send(result); 2947 } 2948 } else { 2949 task.result = Some(result); 2950 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?; 2951 } 2952 2953 Ok(()) 2954 } 2955 2956 /// Implements the `waitable-set.new` intrinsic. 2957 pub(crate) fn waitable_set_new( 2958 self, 2959 store: &mut StoreOpaque, 2960 caller_instance: RuntimeComponentInstanceIndex, 2961 ) -> Result<u32> { 2962 let set = store.concurrent_state_mut().push(WaitableSet::default())?; 2963 let handle = store 2964 .instance_state(RuntimeInstance { 2965 instance: self.id().instance(), 2966 index: caller_instance, 2967 }) 2968 .handle_table() 2969 .waitable_set_insert(set.rep())?; 2970 log::trace!("new waitable set {set:?} (handle {handle})"); 2971 Ok(handle) 2972 } 2973 2974 /// Implements the `waitable-set.drop` intrinsic. 2975 pub(crate) fn waitable_set_drop( 2976 self, 2977 store: &mut StoreOpaque, 2978 caller_instance: RuntimeComponentInstanceIndex, 2979 set: u32, 2980 ) -> Result<()> { 2981 let rep = store 2982 .instance_state(RuntimeInstance { 2983 instance: self.id().instance(), 2984 index: caller_instance, 2985 }) 2986 .handle_table() 2987 .waitable_set_remove(set)?; 2988 2989 log::trace!("drop waitable set {rep} (handle {set})"); 2990 2991 let set = store 2992 .concurrent_state_mut() 2993 .delete(TableId::<WaitableSet>::new(rep))?; 2994 2995 if !set.waiting.is_empty() { 2996 bail!("cannot drop waitable set with waiters"); 2997 } 2998 2999 Ok(()) 3000 } 3001 3002 /// Implements the `waitable.join` intrinsic. 3003 pub(crate) fn waitable_join( 3004 self, 3005 store: &mut StoreOpaque, 3006 caller_instance: RuntimeComponentInstanceIndex, 3007 waitable_handle: u32, 3008 set_handle: u32, 3009 ) -> Result<()> { 3010 let mut instance = self.id().get_mut(store); 3011 let waitable = 3012 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?; 3013 3014 let set = if set_handle == 0 { 3015 None 3016 } else { 3017 let set = instance.instance_states().0[caller_instance] 3018 .handle_table() 3019 .waitable_set_rep(set_handle)?; 3020 3021 Some(TableId::<WaitableSet>::new(set)) 3022 }; 3023 3024 log::trace!( 3025 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})", 3026 ); 3027 3028 waitable.join(store.concurrent_state_mut(), set) 3029 } 3030 3031 /// Implements the `subtask.drop` intrinsic. 3032 pub(crate) fn subtask_drop( 3033 self, 3034 store: &mut StoreOpaque, 3035 caller_instance: RuntimeComponentInstanceIndex, 3036 task_id: u32, 3037 ) -> Result<()> { 3038 self.waitable_join(store, caller_instance, task_id, 0)?; 3039 3040 let (rep, is_host) = store 3041 .instance_state(RuntimeInstance { 3042 instance: self.id().instance(), 3043 index: caller_instance, 3044 }) 3045 .handle_table() 3046 .subtask_remove(task_id)?; 3047 3048 let concurrent_state = store.concurrent_state_mut(); 3049 let (waitable, expected_caller, delete) = if is_host { 3050 let id = TableId::<HostTask>::new(rep); 3051 let task = concurrent_state.get_mut(id)?; 3052 match &task.state { 3053 HostTaskState::CalleeRunning(_) => { 3054 bail!("cannot drop a subtask which has not yet resolved"); 3055 } 3056 HostTaskState::CalleeDone => {} 3057 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => unreachable!(), 3058 } 3059 (Waitable::Host(id), task.caller, true) 3060 } else { 3061 let id = TableId::<GuestTask>::new(rep); 3062 let task = concurrent_state.get_mut(id)?; 3063 if task.lift_result.is_some() { 3064 bail!("cannot drop a subtask which has not yet resolved"); 3065 } 3066 if let Caller::Guest { thread } = task.caller { 3067 ( 3068 Waitable::Guest(id), 3069 thread, 3070 concurrent_state.get_mut(id)?.exited, 3071 ) 3072 } else { 3073 unreachable!() 3074 } 3075 }; 3076 3077 waitable.common(concurrent_state)?.handle = None; 3078 3079 if waitable.take_event(concurrent_state)?.is_some() { 3080 bail!("cannot drop a subtask with an undelivered event"); 3081 } 3082 3083 if delete { 3084 waitable.delete_from(concurrent_state)?; 3085 } 3086 3087 // Since waitables can neither be passed between instances nor forged, 3088 // this should never fail unless there's a bug in Wasmtime, but we check 3089 // here to be sure: 3090 assert_eq!( 3091 expected_caller, 3092 concurrent_state.unwrap_current_guest_thread(), 3093 ); 3094 log::trace!("subtask_drop {waitable:?} (handle {task_id})"); 3095 Ok(()) 3096 } 3097 3098 /// Implements the `waitable-set.wait` intrinsic. 3099 pub(crate) fn waitable_set_wait( 3100 self, 3101 store: &mut StoreOpaque, 3102 options: OptionsIndex, 3103 set: u32, 3104 payload: u32, 3105 ) -> Result<u32> { 3106 if !self.options(store, options).async_ { 3107 // The caller may only call `waitable-set.wait` from an async task 3108 // (i.e. a task created via a call to an async export). 3109 // Otherwise, we'll trap. 3110 store.check_blocking()?; 3111 } 3112 3113 let &CanonicalOptions { 3114 cancellable, 3115 instance: caller_instance, 3116 .. 3117 } = &self.id().get(store).component().env_component().options[options]; 3118 let rep = store 3119 .instance_state(RuntimeInstance { 3120 instance: self.id().instance(), 3121 index: caller_instance, 3122 }) 3123 .handle_table() 3124 .waitable_set_rep(set)?; 3125 3126 self.waitable_check( 3127 store, 3128 cancellable, 3129 WaitableCheck::Wait, 3130 WaitableCheckParams { 3131 set: TableId::new(rep), 3132 options, 3133 payload, 3134 }, 3135 ) 3136 } 3137 3138 /// Implements the `waitable-set.poll` intrinsic. 3139 pub(crate) fn waitable_set_poll( 3140 self, 3141 store: &mut StoreOpaque, 3142 options: OptionsIndex, 3143 set: u32, 3144 payload: u32, 3145 ) -> Result<u32> { 3146 let &CanonicalOptions { 3147 cancellable, 3148 instance: caller_instance, 3149 .. 3150 } = &self.id().get(store).component().env_component().options[options]; 3151 let rep = store 3152 .instance_state(RuntimeInstance { 3153 instance: self.id().instance(), 3154 index: caller_instance, 3155 }) 3156 .handle_table() 3157 .waitable_set_rep(set)?; 3158 3159 self.waitable_check( 3160 store, 3161 cancellable, 3162 WaitableCheck::Poll, 3163 WaitableCheckParams { 3164 set: TableId::new(rep), 3165 options, 3166 payload, 3167 }, 3168 ) 3169 } 3170 3171 /// Implements the `thread.index` intrinsic. 3172 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> { 3173 let thread_id = store 3174 .concurrent_state_mut() 3175 .unwrap_current_guest_thread() 3176 .thread; 3177 // The unwrap is safe because `instance_rep` must be `Some` by this point 3178 Ok(store 3179 .concurrent_state_mut() 3180 .get_mut(thread_id)? 3181 .instance_rep 3182 .unwrap()) 3183 } 3184 3185 /// Implements the `thread.new-indirect` intrinsic. 3186 pub(crate) fn thread_new_indirect<T: 'static>( 3187 self, 3188 mut store: StoreContextMut<T>, 3189 runtime_instance: RuntimeComponentInstanceIndex, 3190 _func_ty_idx: TypeFuncIndex, // currently unused 3191 start_func_table_idx: RuntimeTableIndex, 3192 start_func_idx: u32, 3193 context: i32, 3194 ) -> Result<u32> { 3195 log::trace!("creating new thread"); 3196 3197 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []); 3198 let (instance, registry) = self.id().get_mut_and_registry(store.0); 3199 let callee = instance 3200 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)? 3201 .ok_or_else(|| { 3202 format_err!("the start function index points to an uninitialized function") 3203 })?; 3204 if callee.type_index(store.0) != start_func_ty.type_index() { 3205 bail!( 3206 "start function does not match expected type (currently only `(i32) -> ()` is supported)" 3207 ); 3208 } 3209 3210 let token = StoreToken::new(store.as_context_mut()); 3211 let start_func = Box::new( 3212 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> { 3213 let old_thread = store.set_thread(guest_thread); 3214 log::trace!( 3215 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread" 3216 ); 3217 3218 let mut store = token.as_context_mut(store); 3219 let mut params = [ValRaw::i32(context)]; 3220 // Use call_unchecked rather than call or call_async, as we don't want to run the function 3221 // on a separate fiber if we're running in an async store. 3222 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? }; 3223 3224 self.cleanup_thread(store.0, guest_thread, runtime_instance)?; 3225 log::trace!("explicit thread {guest_thread:?} completed"); 3226 let state = store.0.concurrent_state_mut(); 3227 let task = state.get_mut(guest_thread.task)?; 3228 if task.threads.is_empty() && !task.returned_or_cancelled() { 3229 bail!(Trap::NoAsyncResult); 3230 } 3231 store.0.set_thread(old_thread); 3232 let state = store.0.concurrent_state_mut(); 3233 old_thread 3234 .guest() 3235 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running); 3236 if state.get_mut(guest_thread.task)?.ready_to_delete() { 3237 Waitable::Guest(guest_thread.task).delete_from(state)?; 3238 } 3239 log::trace!("thread start: restored {old_thread:?} as current thread"); 3240 3241 Ok(()) 3242 }, 3243 ); 3244 3245 let state = store.0.concurrent_state_mut(); 3246 let current_thread = state.unwrap_current_guest_thread(); 3247 let parent_task = current_thread.task; 3248 3249 let new_thread = GuestThread::new_explicit(parent_task, start_func); 3250 let thread_id = state.push(new_thread)?; 3251 state.get_mut(parent_task)?.threads.insert(thread_id); 3252 3253 log::trace!("new thread with id {thread_id:?} created"); 3254 3255 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance) 3256 } 3257 3258 pub(crate) fn resume_thread( 3259 self, 3260 store: &mut StoreOpaque, 3261 runtime_instance: RuntimeComponentInstanceIndex, 3262 thread_idx: u32, 3263 high_priority: bool, 3264 allow_ready: bool, 3265 ) -> Result<()> { 3266 let thread_id = 3267 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?; 3268 let state = store.concurrent_state_mut(); 3269 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?; 3270 let thread = state.get_mut(guest_thread.thread)?; 3271 3272 match mem::replace(&mut thread.state, GuestThreadState::Running) { 3273 GuestThreadState::NotStartedExplicit(start_func) => { 3274 log::trace!("starting thread {guest_thread:?}"); 3275 let guest_call = WorkItem::GuestCall( 3276 runtime_instance, 3277 GuestCall { 3278 thread: guest_thread, 3279 kind: GuestCallKind::StartExplicit(Box::new(move |store| { 3280 start_func(store, guest_thread) 3281 })), 3282 }, 3283 ); 3284 store 3285 .concurrent_state_mut() 3286 .push_work_item(guest_call, high_priority); 3287 } 3288 GuestThreadState::Suspended(fiber) => { 3289 log::trace!("resuming thread {thread_id:?} that was suspended"); 3290 store 3291 .concurrent_state_mut() 3292 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority); 3293 } 3294 GuestThreadState::Ready(fiber) if allow_ready => { 3295 log::trace!("resuming thread {thread_id:?} that was ready"); 3296 thread.state = GuestThreadState::Ready(fiber); 3297 store 3298 .concurrent_state_mut() 3299 .promote_thread_work_item(guest_thread); 3300 } 3301 other => { 3302 thread.state = other; 3303 bail!("cannot resume thread which is not suspended"); 3304 } 3305 } 3306 Ok(()) 3307 } 3308 3309 fn add_guest_thread_to_instance_table( 3310 self, 3311 thread_id: TableId<GuestThread>, 3312 store: &mut StoreOpaque, 3313 runtime_instance: RuntimeComponentInstanceIndex, 3314 ) -> Result<u32> { 3315 let guest_id = store 3316 .instance_state(RuntimeInstance { 3317 instance: self.id().instance(), 3318 index: runtime_instance, 3319 }) 3320 .thread_handle_table() 3321 .guest_thread_insert(thread_id.rep())?; 3322 store 3323 .concurrent_state_mut() 3324 .get_mut(thread_id)? 3325 .instance_rep = Some(guest_id); 3326 Ok(guest_id) 3327 } 3328 3329 /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`, 3330 /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics. 3331 pub(crate) fn suspension_intrinsic( 3332 self, 3333 store: &mut StoreOpaque, 3334 caller: RuntimeComponentInstanceIndex, 3335 cancellable: bool, 3336 yielding: bool, 3337 to_thread: SuspensionTarget, 3338 ) -> Result<WaitResult> { 3339 let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread(); 3340 if to_thread.is_none() { 3341 let state = store.concurrent_state_mut(); 3342 if yielding { 3343 // This is a `thread.yield` call 3344 if !state.may_block(guest_thread.task) { 3345 // In a non-blocking context, a `thread.yield` may trigger 3346 // other threads in the same component instance to run. 3347 if !state.promote_instance_local_thread_work_item(caller) { 3348 // No other threads are runnable, so just return 3349 return Ok(WaitResult::Completed); 3350 } 3351 } 3352 } else { 3353 // The caller may only call `thread.suspend` from an async task 3354 // (i.e. a task created via a call to an async export). 3355 // Otherwise, we'll trap. 3356 store.check_blocking()?; 3357 } 3358 } 3359 3360 // There could be a pending cancellation from a previous uncancellable wait 3361 if cancellable && store.concurrent_state_mut().take_pending_cancellation() { 3362 return Ok(WaitResult::Cancelled); 3363 } 3364 3365 match to_thread { 3366 SuspensionTarget::SomeSuspended(thread) => { 3367 self.resume_thread(store, caller, thread, true, false)? 3368 } 3369 SuspensionTarget::Some(thread) => { 3370 self.resume_thread(store, caller, thread, true, true)? 3371 } 3372 SuspensionTarget::None => { /* nothing to do */ } 3373 } 3374 3375 let reason = if yielding { 3376 SuspendReason::Yielding { 3377 thread: guest_thread, 3378 // Tell `StoreOpaque::suspend` it's okay to suspend here since 3379 // we're handling a `thread.yield-to-suspended` call; otherwise it would 3380 // panic if we called it in a non-blocking context. 3381 skip_may_block_check: to_thread.is_some(), 3382 } 3383 } else { 3384 SuspendReason::ExplicitlySuspending { 3385 thread: guest_thread, 3386 // Tell `StoreOpaque::suspend` it's okay to suspend here since 3387 // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would 3388 // panic if we called it in a non-blocking context. 3389 skip_may_block_check: to_thread.is_some(), 3390 } 3391 }; 3392 3393 store.suspend(reason)?; 3394 3395 if cancellable && store.concurrent_state_mut().take_pending_cancellation() { 3396 Ok(WaitResult::Cancelled) 3397 } else { 3398 Ok(WaitResult::Completed) 3399 } 3400 } 3401 3402 /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics. 3403 fn waitable_check( 3404 self, 3405 store: &mut StoreOpaque, 3406 cancellable: bool, 3407 check: WaitableCheck, 3408 params: WaitableCheckParams, 3409 ) -> Result<u32> { 3410 let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread(); 3411 3412 log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set); 3413 3414 let state = store.concurrent_state_mut(); 3415 let task = state.get_mut(guest_thread.task)?; 3416 3417 // If we're waiting, and there are no events immediately available, 3418 // suspend the fiber until that changes. 3419 match &check { 3420 WaitableCheck::Wait => { 3421 let set = params.set; 3422 3423 if (task.event.is_none() 3424 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable)) 3425 && state.get_mut(set)?.ready.is_empty() 3426 { 3427 if cancellable { 3428 let old = state 3429 .get_mut(guest_thread.thread)? 3430 .wake_on_cancel 3431 .replace(set); 3432 assert!(old.is_none()); 3433 } 3434 3435 store.suspend(SuspendReason::Waiting { 3436 set, 3437 thread: guest_thread, 3438 skip_may_block_check: false, 3439 })?; 3440 } 3441 } 3442 WaitableCheck::Poll => {} 3443 } 3444 3445 log::trace!( 3446 "waitable check for {guest_thread:?}; set {:?}, part two", 3447 params.set 3448 ); 3449 3450 // Deliver any pending events to the guest and return. 3451 let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?; 3452 3453 let (ordinal, handle, result) = match &check { 3454 WaitableCheck::Wait => { 3455 let (event, waitable) = event.unwrap(); 3456 let handle = waitable.map(|(_, v)| v).unwrap_or(0); 3457 let (ordinal, result) = event.parts(); 3458 (ordinal, handle, result) 3459 } 3460 WaitableCheck::Poll => { 3461 if let Some((event, waitable)) = event { 3462 let handle = waitable.map(|(_, v)| v).unwrap_or(0); 3463 let (ordinal, result) = event.parts(); 3464 (ordinal, handle, result) 3465 } else { 3466 log::trace!( 3467 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}", 3468 guest_thread.task, 3469 params.set 3470 ); 3471 let (ordinal, result) = Event::None.parts(); 3472 (ordinal, 0, result) 3473 } 3474 } 3475 }; 3476 let memory = self.options_memory_mut(store, params.options); 3477 let ptr = func::validate_inbounds_dynamic( 3478 &CanonicalAbiInfo::POINTER_PAIR, 3479 memory, 3480 &ValRaw::u32(params.payload), 3481 )?; 3482 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes()); 3483 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes()); 3484 Ok(ordinal) 3485 } 3486 3487 /// Implements the `subtask.cancel` intrinsic. 3488 pub(crate) fn subtask_cancel( 3489 self, 3490 store: &mut StoreOpaque, 3491 caller_instance: RuntimeComponentInstanceIndex, 3492 async_: bool, 3493 task_id: u32, 3494 ) -> Result<u32> { 3495 if !async_ { 3496 // The caller may only sync call `subtask.cancel` from an async task 3497 // (i.e. a task created via a call to an async export). Otherwise, 3498 // we'll trap. 3499 store.check_blocking()?; 3500 } 3501 3502 let (rep, is_host) = store 3503 .instance_state(RuntimeInstance { 3504 instance: self.id().instance(), 3505 index: caller_instance, 3506 }) 3507 .handle_table() 3508 .subtask_rep(task_id)?; 3509 let (waitable, expected_caller) = if is_host { 3510 let id = TableId::<HostTask>::new(rep); 3511 ( 3512 Waitable::Host(id), 3513 store.concurrent_state_mut().get_mut(id)?.caller, 3514 ) 3515 } else { 3516 let id = TableId::<GuestTask>::new(rep); 3517 if let Caller::Guest { thread } = store.concurrent_state_mut().get_mut(id)?.caller { 3518 (Waitable::Guest(id), thread) 3519 } else { 3520 unreachable!() 3521 } 3522 }; 3523 // Since waitables can neither be passed between instances nor forged, 3524 // this should never fail unless there's a bug in Wasmtime, but we check 3525 // here to be sure: 3526 let concurrent_state = store.concurrent_state_mut(); 3527 assert_eq!( 3528 expected_caller, 3529 concurrent_state.unwrap_current_guest_thread(), 3530 ); 3531 3532 log::trace!("subtask_cancel {waitable:?} (handle {task_id})"); 3533 3534 let needs_block; 3535 if let Waitable::Host(host_task) = waitable { 3536 let state = &mut concurrent_state.get_mut(host_task)?.state; 3537 match mem::replace(state, HostTaskState::CalleeDone) { 3538 // If the callee is still running, signal an abort is requested. 3539 // Then fall through to determine what to do next. 3540 HostTaskState::CalleeRunning(handle) => handle.abort(), 3541 3542 // Cancellation was already requested, so fail as the task can't 3543 // be cancelled twice. 3544 HostTaskState::CalleeDone => { 3545 bail!("`subtask.cancel` called after terminal status delivered"); 3546 } 3547 3548 // These states should not be possible for a subtask that's 3549 // visible from the guest, so panic here. 3550 HostTaskState::CalleeStarted | HostTaskState::CalleeFinished(_) => unreachable!(), 3551 } 3552 3553 // Cancelling host tasks always needs to block on them to await the 3554 // result of the completion set up in `first_poll`. This'll resolve 3555 // the race of `handle.abort()` above to see if it actually 3556 // cancelled something or if the future ended up finishing. 3557 needs_block = true; 3558 } else { 3559 let caller = concurrent_state.unwrap_current_guest_thread(); 3560 let guest_task = TableId::<GuestTask>::new(rep); 3561 let task = concurrent_state.get_mut(guest_task)?; 3562 if !task.already_lowered_parameters() { 3563 // The task is in a `starting` state, meaning it hasn't run at 3564 // all yet. Here we update its fields to indicate that it is 3565 // ready to delete immediately once `subtask.drop` is called. 3566 task.lower_params = None; 3567 task.lift_result = None; 3568 task.exited = true; 3569 3570 let instance = task.instance; 3571 3572 assert_eq!(1, task.threads.len()); 3573 let thread = mem::take(&mut task.threads).into_iter().next().unwrap(); 3574 let concurrent_state = store.concurrent_state_mut(); 3575 concurrent_state.delete(thread)?; 3576 assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete()); 3577 3578 // Not yet started; cancel and remove from pending 3579 let pending = &mut store.instance_state(instance).concurrent_state().pending; 3580 let pending_count = pending.len(); 3581 pending.retain(|thread, _| thread.task != guest_task); 3582 // If there were no pending threads for this task, we're in an error state 3583 if pending.len() == pending_count { 3584 bail!("`subtask.cancel` called after terminal status delivered"); 3585 } 3586 return Ok(Status::StartCancelled as u32); 3587 } else if !task.returned_or_cancelled() { 3588 // Started, but not yet returned or cancelled; send the 3589 // `CANCELLED` event 3590 task.cancel_sent = true; 3591 // Note that this might overwrite an event that was set earlier 3592 // (e.g. `Event::None` if the task is yielding, or 3593 // `Event::Cancelled` if it was already cancelled), but that's 3594 // okay -- this should supersede the previous state. 3595 task.event = Some(Event::Cancelled); 3596 let runtime_instance = task.instance.index; 3597 for thread in task.threads.clone() { 3598 let thread = QualifiedThreadId { 3599 task: guest_task, 3600 thread, 3601 }; 3602 if let Some(set) = concurrent_state 3603 .get_mut(thread.thread) 3604 .unwrap() 3605 .wake_on_cancel 3606 .take() 3607 { 3608 let item = match concurrent_state 3609 .get_mut(set)? 3610 .waiting 3611 .remove(&thread) 3612 .unwrap() 3613 { 3614 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber), 3615 WaitMode::Callback(instance) => WorkItem::GuestCall( 3616 runtime_instance, 3617 GuestCall { 3618 thread, 3619 kind: GuestCallKind::DeliverEvent { 3620 instance, 3621 set: None, 3622 }, 3623 }, 3624 ), 3625 }; 3626 concurrent_state.push_high_priority(item); 3627 3628 store.suspend(SuspendReason::Yielding { 3629 thread: caller, 3630 // `subtask.cancel` is not allowed to be called in a 3631 // sync context, so we cannot skip the may-block check. 3632 skip_may_block_check: false, 3633 })?; 3634 break; 3635 } 3636 } 3637 3638 // Guest tasks need to block if they have not yet returned or 3639 // cancelled, even as a result of the event delivery above. 3640 needs_block = !store 3641 .concurrent_state_mut() 3642 .get_mut(guest_task)? 3643 .returned_or_cancelled() 3644 } else { 3645 needs_block = false; 3646 } 3647 }; 3648 3649 // If we need to block waiting on the terminal status of this subtask 3650 // then return immediately in `async` mode, or otherwise wait for the 3651 // event to get signaled through the store. 3652 if needs_block { 3653 if async_ { 3654 return Ok(BLOCKED); 3655 } 3656 3657 // Wait for this waitable to get signaled with its terminal status 3658 // from the completion callback enqueued by `first_poll`. Once 3659 // that's done fall through to the sahred 3660 store.wait_for_event(waitable)?; 3661 3662 // .. fall through to determine what event's in store for us. 3663 } 3664 3665 let event = waitable.take_event(store.concurrent_state_mut())?; 3666 if let Some(Event::Subtask { 3667 status: status @ (Status::Returned | Status::ReturnCancelled), 3668 }) = event 3669 { 3670 Ok(status as u32) 3671 } else { 3672 bail!("`subtask.cancel` called after terminal status delivered"); 3673 } 3674 } 3675 3676 pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> { 3677 store.concurrent_state_mut().context_get(slot) 3678 } 3679 3680 pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> { 3681 store.concurrent_state_mut().context_set(slot, value) 3682 } 3683 } 3684 3685 /// Trait representing component model ABI async intrinsics and fused adapter 3686 /// helper functions. 3687 /// 3688 /// SAFETY (callers): Most of the methods in this trait accept raw pointers, 3689 /// which must be valid for at least the duration of the call (and possibly for 3690 /// as long as the relevant guest task exists, in the case of `*mut VMFuncRef` 3691 /// pointers used for async calls). 3692 pub trait VMComponentAsyncStore { 3693 /// A helper function for fused adapter modules involving calls where the 3694 /// one of the caller or callee is async. 3695 /// 3696 /// This helper is not used when the caller and callee both use the sync 3697 /// ABI, only when at least one is async is this used. 3698 unsafe fn prepare_call( 3699 &mut self, 3700 instance: Instance, 3701 memory: *mut VMMemoryDefinition, 3702 start: *mut VMFuncRef, 3703 return_: *mut VMFuncRef, 3704 caller_instance: RuntimeComponentInstanceIndex, 3705 callee_instance: RuntimeComponentInstanceIndex, 3706 task_return_type: TypeTupleIndex, 3707 callee_async: bool, 3708 string_encoding: u8, 3709 result_count: u32, 3710 storage: *mut ValRaw, 3711 storage_len: usize, 3712 ) -> Result<()>; 3713 3714 /// A helper function for fused adapter modules involving calls where the 3715 /// caller is sync-lowered but the callee is async-lifted. 3716 unsafe fn sync_start( 3717 &mut self, 3718 instance: Instance, 3719 callback: *mut VMFuncRef, 3720 callee: *mut VMFuncRef, 3721 param_count: u32, 3722 storage: *mut MaybeUninit<ValRaw>, 3723 storage_len: usize, 3724 ) -> Result<()>; 3725 3726 /// A helper function for fused adapter modules involving calls where the 3727 /// caller is async-lowered. 3728 unsafe fn async_start( 3729 &mut self, 3730 instance: Instance, 3731 callback: *mut VMFuncRef, 3732 post_return: *mut VMFuncRef, 3733 callee: *mut VMFuncRef, 3734 param_count: u32, 3735 result_count: u32, 3736 flags: u32, 3737 ) -> Result<u32>; 3738 3739 /// The `future.write` intrinsic. 3740 fn future_write( 3741 &mut self, 3742 instance: Instance, 3743 caller: RuntimeComponentInstanceIndex, 3744 ty: TypeFutureTableIndex, 3745 options: OptionsIndex, 3746 future: u32, 3747 address: u32, 3748 ) -> Result<u32>; 3749 3750 /// The `future.read` intrinsic. 3751 fn future_read( 3752 &mut self, 3753 instance: Instance, 3754 caller: RuntimeComponentInstanceIndex, 3755 ty: TypeFutureTableIndex, 3756 options: OptionsIndex, 3757 future: u32, 3758 address: u32, 3759 ) -> Result<u32>; 3760 3761 /// The `future.drop-writable` intrinsic. 3762 fn future_drop_writable( 3763 &mut self, 3764 instance: Instance, 3765 ty: TypeFutureTableIndex, 3766 writer: u32, 3767 ) -> Result<()>; 3768 3769 /// The `stream.write` intrinsic. 3770 fn stream_write( 3771 &mut self, 3772 instance: Instance, 3773 caller: RuntimeComponentInstanceIndex, 3774 ty: TypeStreamTableIndex, 3775 options: OptionsIndex, 3776 stream: u32, 3777 address: u32, 3778 count: u32, 3779 ) -> Result<u32>; 3780 3781 /// The `stream.read` intrinsic. 3782 fn stream_read( 3783 &mut self, 3784 instance: Instance, 3785 caller: RuntimeComponentInstanceIndex, 3786 ty: TypeStreamTableIndex, 3787 options: OptionsIndex, 3788 stream: u32, 3789 address: u32, 3790 count: u32, 3791 ) -> Result<u32>; 3792 3793 /// The "fast-path" implementation of the `stream.write` intrinsic for 3794 /// "flat" (i.e. memcpy-able) payloads. 3795 fn flat_stream_write( 3796 &mut self, 3797 instance: Instance, 3798 caller: RuntimeComponentInstanceIndex, 3799 ty: TypeStreamTableIndex, 3800 options: OptionsIndex, 3801 payload_size: u32, 3802 payload_align: u32, 3803 stream: u32, 3804 address: u32, 3805 count: u32, 3806 ) -> Result<u32>; 3807 3808 /// The "fast-path" implementation of the `stream.read` intrinsic for "flat" 3809 /// (i.e. memcpy-able) payloads. 3810 fn flat_stream_read( 3811 &mut self, 3812 instance: Instance, 3813 caller: RuntimeComponentInstanceIndex, 3814 ty: TypeStreamTableIndex, 3815 options: OptionsIndex, 3816 payload_size: u32, 3817 payload_align: u32, 3818 stream: u32, 3819 address: u32, 3820 count: u32, 3821 ) -> Result<u32>; 3822 3823 /// The `stream.drop-writable` intrinsic. 3824 fn stream_drop_writable( 3825 &mut self, 3826 instance: Instance, 3827 ty: TypeStreamTableIndex, 3828 writer: u32, 3829 ) -> Result<()>; 3830 3831 /// The `error-context.debug-message` intrinsic. 3832 fn error_context_debug_message( 3833 &mut self, 3834 instance: Instance, 3835 ty: TypeComponentLocalErrorContextTableIndex, 3836 options: OptionsIndex, 3837 err_ctx_handle: u32, 3838 debug_msg_address: u32, 3839 ) -> Result<()>; 3840 3841 /// The `thread.new-indirect` intrinsic 3842 fn thread_new_indirect( 3843 &mut self, 3844 instance: Instance, 3845 caller: RuntimeComponentInstanceIndex, 3846 func_ty_idx: TypeFuncIndex, 3847 start_func_table_idx: RuntimeTableIndex, 3848 start_func_idx: u32, 3849 context: i32, 3850 ) -> Result<u32>; 3851 } 3852 3853 /// SAFETY: See trait docs. 3854 impl<T: 'static> VMComponentAsyncStore for StoreInner<T> { 3855 unsafe fn prepare_call( 3856 &mut self, 3857 instance: Instance, 3858 memory: *mut VMMemoryDefinition, 3859 start: *mut VMFuncRef, 3860 return_: *mut VMFuncRef, 3861 caller_instance: RuntimeComponentInstanceIndex, 3862 callee_instance: RuntimeComponentInstanceIndex, 3863 task_return_type: TypeTupleIndex, 3864 callee_async: bool, 3865 string_encoding: u8, 3866 result_count_or_max_if_async: u32, 3867 storage: *mut ValRaw, 3868 storage_len: usize, 3869 ) -> Result<()> { 3870 // SAFETY: The `wasmtime_cranelift`-generated code that calls 3871 // this method will have ensured that `storage` is a valid 3872 // pointer containing at least `storage_len` items. 3873 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec(); 3874 3875 unsafe { 3876 instance.prepare_call( 3877 StoreContextMut(self), 3878 start, 3879 return_, 3880 caller_instance, 3881 callee_instance, 3882 task_return_type, 3883 callee_async, 3884 memory, 3885 string_encoding, 3886 match result_count_or_max_if_async { 3887 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async { 3888 params, 3889 has_result: false, 3890 }, 3891 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async { 3892 params, 3893 has_result: true, 3894 }, 3895 result_count => CallerInfo::Sync { 3896 params, 3897 result_count, 3898 }, 3899 }, 3900 ) 3901 } 3902 } 3903 3904 unsafe fn sync_start( 3905 &mut self, 3906 instance: Instance, 3907 callback: *mut VMFuncRef, 3908 callee: *mut VMFuncRef, 3909 param_count: u32, 3910 storage: *mut MaybeUninit<ValRaw>, 3911 storage_len: usize, 3912 ) -> Result<()> { 3913 unsafe { 3914 instance 3915 .start_call( 3916 StoreContextMut(self), 3917 callback, 3918 ptr::null_mut(), 3919 callee, 3920 param_count, 3921 1, 3922 START_FLAG_ASYNC_CALLEE, 3923 // SAFETY: The `wasmtime_cranelift`-generated code that calls 3924 // this method will have ensured that `storage` is a valid 3925 // pointer containing at least `storage_len` items. 3926 Some(std::slice::from_raw_parts_mut(storage, storage_len)), 3927 ) 3928 .map(drop) 3929 } 3930 } 3931 3932 unsafe fn async_start( 3933 &mut self, 3934 instance: Instance, 3935 callback: *mut VMFuncRef, 3936 post_return: *mut VMFuncRef, 3937 callee: *mut VMFuncRef, 3938 param_count: u32, 3939 result_count: u32, 3940 flags: u32, 3941 ) -> Result<u32> { 3942 unsafe { 3943 instance.start_call( 3944 StoreContextMut(self), 3945 callback, 3946 post_return, 3947 callee, 3948 param_count, 3949 result_count, 3950 flags, 3951 None, 3952 ) 3953 } 3954 } 3955 3956 fn future_write( 3957 &mut self, 3958 instance: Instance, 3959 caller: RuntimeComponentInstanceIndex, 3960 ty: TypeFutureTableIndex, 3961 options: OptionsIndex, 3962 future: u32, 3963 address: u32, 3964 ) -> Result<u32> { 3965 instance 3966 .guest_write( 3967 StoreContextMut(self), 3968 caller, 3969 TransmitIndex::Future(ty), 3970 options, 3971 None, 3972 future, 3973 address, 3974 1, 3975 ) 3976 .map(|result| result.encode()) 3977 } 3978 3979 fn future_read( 3980 &mut self, 3981 instance: Instance, 3982 caller: RuntimeComponentInstanceIndex, 3983 ty: TypeFutureTableIndex, 3984 options: OptionsIndex, 3985 future: u32, 3986 address: u32, 3987 ) -> Result<u32> { 3988 instance 3989 .guest_read( 3990 StoreContextMut(self), 3991 caller, 3992 TransmitIndex::Future(ty), 3993 options, 3994 None, 3995 future, 3996 address, 3997 1, 3998 ) 3999 .map(|result| result.encode()) 4000 } 4001 4002 fn stream_write( 4003 &mut self, 4004 instance: Instance, 4005 caller: RuntimeComponentInstanceIndex, 4006 ty: TypeStreamTableIndex, 4007 options: OptionsIndex, 4008 stream: u32, 4009 address: u32, 4010 count: u32, 4011 ) -> Result<u32> { 4012 instance 4013 .guest_write( 4014 StoreContextMut(self), 4015 caller, 4016 TransmitIndex::Stream(ty), 4017 options, 4018 None, 4019 stream, 4020 address, 4021 count, 4022 ) 4023 .map(|result| result.encode()) 4024 } 4025 4026 fn stream_read( 4027 &mut self, 4028 instance: Instance, 4029 caller: RuntimeComponentInstanceIndex, 4030 ty: TypeStreamTableIndex, 4031 options: OptionsIndex, 4032 stream: u32, 4033 address: u32, 4034 count: u32, 4035 ) -> Result<u32> { 4036 instance 4037 .guest_read( 4038 StoreContextMut(self), 4039 caller, 4040 TransmitIndex::Stream(ty), 4041 options, 4042 None, 4043 stream, 4044 address, 4045 count, 4046 ) 4047 .map(|result| result.encode()) 4048 } 4049 4050 fn future_drop_writable( 4051 &mut self, 4052 instance: Instance, 4053 ty: TypeFutureTableIndex, 4054 writer: u32, 4055 ) -> Result<()> { 4056 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer) 4057 } 4058 4059 fn flat_stream_write( 4060 &mut self, 4061 instance: Instance, 4062 caller: RuntimeComponentInstanceIndex, 4063 ty: TypeStreamTableIndex, 4064 options: OptionsIndex, 4065 payload_size: u32, 4066 payload_align: u32, 4067 stream: u32, 4068 address: u32, 4069 count: u32, 4070 ) -> Result<u32> { 4071 instance 4072 .guest_write( 4073 StoreContextMut(self), 4074 caller, 4075 TransmitIndex::Stream(ty), 4076 options, 4077 Some(FlatAbi { 4078 size: payload_size, 4079 align: payload_align, 4080 }), 4081 stream, 4082 address, 4083 count, 4084 ) 4085 .map(|result| result.encode()) 4086 } 4087 4088 fn flat_stream_read( 4089 &mut self, 4090 instance: Instance, 4091 caller: RuntimeComponentInstanceIndex, 4092 ty: TypeStreamTableIndex, 4093 options: OptionsIndex, 4094 payload_size: u32, 4095 payload_align: u32, 4096 stream: u32, 4097 address: u32, 4098 count: u32, 4099 ) -> Result<u32> { 4100 instance 4101 .guest_read( 4102 StoreContextMut(self), 4103 caller, 4104 TransmitIndex::Stream(ty), 4105 options, 4106 Some(FlatAbi { 4107 size: payload_size, 4108 align: payload_align, 4109 }), 4110 stream, 4111 address, 4112 count, 4113 ) 4114 .map(|result| result.encode()) 4115 } 4116 4117 fn stream_drop_writable( 4118 &mut self, 4119 instance: Instance, 4120 ty: TypeStreamTableIndex, 4121 writer: u32, 4122 ) -> Result<()> { 4123 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer) 4124 } 4125 4126 fn error_context_debug_message( 4127 &mut self, 4128 instance: Instance, 4129 ty: TypeComponentLocalErrorContextTableIndex, 4130 options: OptionsIndex, 4131 err_ctx_handle: u32, 4132 debug_msg_address: u32, 4133 ) -> Result<()> { 4134 instance.error_context_debug_message( 4135 StoreContextMut(self), 4136 ty, 4137 options, 4138 err_ctx_handle, 4139 debug_msg_address, 4140 ) 4141 } 4142 4143 fn thread_new_indirect( 4144 &mut self, 4145 instance: Instance, 4146 caller: RuntimeComponentInstanceIndex, 4147 func_ty_idx: TypeFuncIndex, 4148 start_func_table_idx: RuntimeTableIndex, 4149 start_func_idx: u32, 4150 context: i32, 4151 ) -> Result<u32> { 4152 instance.thread_new_indirect( 4153 StoreContextMut(self), 4154 caller, 4155 func_ty_idx, 4156 start_func_table_idx, 4157 start_func_idx, 4158 context, 4159 ) 4160 } 4161 } 4162 4163 type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>; 4164 4165 /// Represents the state of a pending host task. 4166 /// 4167 /// This is used to represent tasks when the guest calls into the host. 4168 struct HostTask { 4169 common: WaitableCommon, 4170 4171 /// Guest thread which called the host. 4172 caller: QualifiedThreadId, 4173 4174 /// State of borrows/etc the host needs to track. Used when the guest passes 4175 /// borrows to the host, for example. 4176 call_context: CallContext, 4177 4178 state: HostTaskState, 4179 } 4180 4181 enum HostTaskState { 4182 /// A host task has been created and it's considered "started". 4183 /// 4184 /// The host task has yet to enter `first_poll` or `poll_and_block` which 4185 /// is where this will get updated further. 4186 CalleeStarted, 4187 4188 /// State used for tasks in `first_poll` meaning that the guest did an async 4189 /// lower of a host async function which is blocked. The specified handle is 4190 /// linked to the future in the main `FuturesUnordered` of a store which is 4191 /// used to cancel it if the guest requests cancellation. 4192 CalleeRunning(JoinHandle), 4193 4194 /// Terminal state used for tasks in `poll_and_block` to store the result of 4195 /// their computation. Note that this state is not used for tasks in 4196 /// `first_poll`. 4197 CalleeFinished(LiftedResult), 4198 4199 /// Terminal state for host tasks meaning that the task was cancelled or the 4200 /// result was taken. 4201 CalleeDone, 4202 } 4203 4204 impl HostTask { 4205 fn new(caller: QualifiedThreadId, state: HostTaskState) -> Self { 4206 Self { 4207 common: WaitableCommon::default(), 4208 call_context: CallContext::default(), 4209 caller, 4210 state, 4211 } 4212 } 4213 } 4214 4215 impl TableDebug for HostTask { 4216 fn type_name() -> &'static str { 4217 "HostTask" 4218 } 4219 } 4220 4221 type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>; 4222 4223 /// Represents the caller of a given guest task. 4224 enum Caller { 4225 /// The host called the guest task. 4226 Host { 4227 /// If present, may be used to deliver the result. 4228 tx: Option<oneshot::Sender<LiftedResult>>, 4229 /// If true, there's a host future that must be dropped before the task 4230 /// can be deleted. 4231 host_future_present: bool, 4232 /// Represents the caller of the host function which called back into a 4233 /// guest. Note that this thread could belong to an entirely unrelated 4234 /// top-level component instance than the one the host called into. 4235 caller: CurrentThread, 4236 }, 4237 /// Another guest thread called the guest task 4238 Guest { 4239 /// The id of the caller 4240 thread: QualifiedThreadId, 4241 }, 4242 } 4243 4244 /// Represents a closure and related canonical ABI parameters required to 4245 /// validate a `task.return` call at runtime and lift the result. 4246 struct LiftResult { 4247 lift: RawLift, 4248 ty: TypeTupleIndex, 4249 memory: Option<SendSyncPtr<VMMemoryDefinition>>, 4250 string_encoding: StringEncoding, 4251 } 4252 4253 /// The table ID for a guest thread, qualified by the task to which it belongs. 4254 /// 4255 /// This exists to minimize table lookups and the necessity to pass stores around mutably 4256 /// for the common case of identifying the task to which a thread belongs. 4257 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] 4258 struct QualifiedThreadId { 4259 task: TableId<GuestTask>, 4260 thread: TableId<GuestThread>, 4261 } 4262 4263 impl QualifiedThreadId { 4264 fn qualify( 4265 state: &mut ConcurrentState, 4266 thread: TableId<GuestThread>, 4267 ) -> Result<QualifiedThreadId> { 4268 Ok(QualifiedThreadId { 4269 task: state.get_mut(thread)?.parent_task, 4270 thread, 4271 }) 4272 } 4273 } 4274 4275 impl fmt::Debug for QualifiedThreadId { 4276 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 4277 f.debug_tuple("QualifiedThreadId") 4278 .field(&self.task.rep()) 4279 .field(&self.thread.rep()) 4280 .finish() 4281 } 4282 } 4283 4284 enum GuestThreadState { 4285 NotStartedImplicit, 4286 NotStartedExplicit( 4287 Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>, 4288 ), 4289 Running, 4290 Suspended(StoreFiber<'static>), 4291 Ready(StoreFiber<'static>), 4292 Completed, 4293 } 4294 pub struct GuestThread { 4295 /// Context-local state used to implement the `context.{get,set}` 4296 /// intrinsics. 4297 context: [u32; 2], 4298 /// The owning guest task. 4299 parent_task: TableId<GuestTask>, 4300 /// If present, indicates that the thread is currently waiting on the 4301 /// specified set but may be cancelled and woken immediately. 4302 wake_on_cancel: Option<TableId<WaitableSet>>, 4303 /// The execution state of this guest thread 4304 state: GuestThreadState, 4305 /// The index of this thread in the component instance's handle table. 4306 /// This must always be `Some` after initialization. 4307 instance_rep: Option<u32>, 4308 } 4309 4310 impl GuestThread { 4311 /// Retrieve the `GuestThread` corresponding to the specified guest-visible 4312 /// handle. 4313 fn from_instance( 4314 state: Pin<&mut ComponentInstance>, 4315 caller_instance: RuntimeComponentInstanceIndex, 4316 guest_thread: u32, 4317 ) -> Result<TableId<Self>> { 4318 let rep = state.instance_states().0[caller_instance] 4319 .thread_handle_table() 4320 .guest_thread_rep(guest_thread)?; 4321 Ok(TableId::new(rep)) 4322 } 4323 4324 fn new_implicit(parent_task: TableId<GuestTask>) -> Self { 4325 Self { 4326 context: [0; 2], 4327 parent_task, 4328 wake_on_cancel: None, 4329 state: GuestThreadState::NotStartedImplicit, 4330 instance_rep: None, 4331 } 4332 } 4333 4334 fn new_explicit( 4335 parent_task: TableId<GuestTask>, 4336 start_func: Box< 4337 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync, 4338 >, 4339 ) -> Self { 4340 Self { 4341 context: [0; 2], 4342 parent_task, 4343 wake_on_cancel: None, 4344 state: GuestThreadState::NotStartedExplicit(start_func), 4345 instance_rep: None, 4346 } 4347 } 4348 } 4349 4350 impl TableDebug for GuestThread { 4351 fn type_name() -> &'static str { 4352 "GuestThread" 4353 } 4354 } 4355 4356 enum SyncResult { 4357 NotProduced, 4358 Produced(Option<ValRaw>), 4359 Taken, 4360 } 4361 4362 impl SyncResult { 4363 fn take(&mut self) -> Option<Option<ValRaw>> { 4364 match mem::replace(self, SyncResult::Taken) { 4365 SyncResult::NotProduced => None, 4366 SyncResult::Produced(val) => Some(val), 4367 SyncResult::Taken => { 4368 panic!("attempted to take a synchronous result that was already taken") 4369 } 4370 } 4371 } 4372 } 4373 4374 #[derive(Debug)] 4375 enum HostFutureState { 4376 NotApplicable, 4377 Live, 4378 Dropped, 4379 } 4380 4381 /// Represents a pending guest task. 4382 pub(crate) struct GuestTask { 4383 /// See `WaitableCommon` 4384 common: WaitableCommon, 4385 /// Closure to lower the parameters passed to this task. 4386 lower_params: Option<RawLower>, 4387 /// See `LiftResult` 4388 lift_result: Option<LiftResult>, 4389 /// A place to stash the type-erased lifted result if it can't be delivered 4390 /// immediately. 4391 result: Option<LiftedResult>, 4392 /// Closure to call the callback function for an async-lifted export, if 4393 /// provided. 4394 callback: Option<CallbackFn>, 4395 /// See `Caller` 4396 caller: Caller, 4397 /// Borrow state for this task. 4398 /// 4399 /// Keeps track of `borrow<T>` received to this task to ensure that 4400 /// everything is dropped by the time it exits. 4401 call_context: CallContext, 4402 /// A place to stash the lowered result for a sync-to-async call until it 4403 /// can be returned to the caller. 4404 sync_result: SyncResult, 4405 /// Whether or not the task has been cancelled (i.e. whether the task is 4406 /// permitted to call `task.cancel`). 4407 cancel_sent: bool, 4408 /// Whether or not we've sent a `Status::Starting` event to any current or 4409 /// future waiters for this waitable. 4410 starting_sent: bool, 4411 /// Scratch waitable set used to watch subtasks during synchronous calls. 4412 sync_call_set: TableId<WaitableSet>, 4413 /// The runtime instance to which the exported function for this guest task 4414 /// belongs. 4415 /// 4416 /// Note that the task may do a sync->sync call via a fused adapter which 4417 /// results in that task executing code in a different instance, and it may 4418 /// call host functions and intrinsics from that other instance. 4419 instance: RuntimeInstance, 4420 /// If present, a pending `Event::None` or `Event::Cancelled` to be 4421 /// delivered to this task. 4422 event: Option<Event>, 4423 /// Whether or not the task has exited. 4424 exited: bool, 4425 /// Threads belonging to this task 4426 threads: HashSet<TableId<GuestThread>>, 4427 /// The state of the host future that represents an async task, which must 4428 /// be dropped before we can delete the task. 4429 host_future_state: HostFutureState, 4430 /// Indicates whether this task was created for a call to an async-lifted 4431 /// export. 4432 async_function: bool, 4433 } 4434 4435 impl GuestTask { 4436 fn already_lowered_parameters(&self) -> bool { 4437 // We reset `lower_params` after we lower the parameters 4438 self.lower_params.is_none() 4439 } 4440 4441 fn returned_or_cancelled(&self) -> bool { 4442 // We reset `lift_result` after we return or exit 4443 self.lift_result.is_none() 4444 } 4445 4446 fn ready_to_delete(&self) -> bool { 4447 let threads_completed = self.threads.is_empty(); 4448 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_)); 4449 let pending_completion_event = matches!( 4450 self.common.event, 4451 Some(Event::Subtask { 4452 status: Status::Returned | Status::ReturnCancelled 4453 }) 4454 ); 4455 let ready = threads_completed 4456 && !has_sync_result 4457 && !pending_completion_event 4458 && !matches!(self.host_future_state, HostFutureState::Live); 4459 log::trace!( 4460 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})", 4461 threads_completed, 4462 has_sync_result, 4463 pending_completion_event, 4464 self.host_future_state 4465 ); 4466 ready 4467 } 4468 4469 fn new( 4470 state: &mut ConcurrentState, 4471 lower_params: RawLower, 4472 lift_result: LiftResult, 4473 caller: Caller, 4474 callback: Option<CallbackFn>, 4475 instance: RuntimeInstance, 4476 async_function: bool, 4477 ) -> Result<Self> { 4478 let sync_call_set = state.push(WaitableSet::default())?; 4479 let host_future_state = match &caller { 4480 Caller::Guest { .. } => HostFutureState::NotApplicable, 4481 Caller::Host { 4482 host_future_present, 4483 .. 4484 } => { 4485 if *host_future_present { 4486 HostFutureState::Live 4487 } else { 4488 HostFutureState::NotApplicable 4489 } 4490 } 4491 }; 4492 Ok(Self { 4493 common: WaitableCommon::default(), 4494 lower_params: Some(lower_params), 4495 lift_result: Some(lift_result), 4496 result: None, 4497 callback, 4498 caller, 4499 call_context: CallContext::default(), 4500 sync_result: SyncResult::NotProduced, 4501 cancel_sent: false, 4502 starting_sent: false, 4503 sync_call_set, 4504 instance, 4505 event: None, 4506 exited: false, 4507 threads: HashSet::new(), 4508 host_future_state, 4509 async_function, 4510 }) 4511 } 4512 4513 /// Dispose of this guest task, reparenting any pending subtasks to the 4514 /// caller. 4515 fn dispose(self, state: &mut ConcurrentState) -> Result<()> { 4516 // If there are not-yet-delivered completion events for subtasks in 4517 // `self.sync_call_set`, recursively dispose of those subtasks as well. 4518 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) { 4519 if let Some(Event::Subtask { 4520 status: Status::Returned | Status::ReturnCancelled, 4521 }) = waitable.common(state)?.event 4522 { 4523 waitable.delete_from(state)?; 4524 } 4525 } 4526 4527 assert!(self.threads.is_empty()); 4528 4529 state.delete(self.sync_call_set)?; 4530 4531 Ok(()) 4532 } 4533 } 4534 4535 impl TableDebug for GuestTask { 4536 fn type_name() -> &'static str { 4537 "GuestTask" 4538 } 4539 } 4540 4541 /// Represents state common to all kinds of waitables. 4542 #[derive(Default)] 4543 struct WaitableCommon { 4544 /// The currently pending event for this waitable, if any. 4545 event: Option<Event>, 4546 /// The set to which this waitable belongs, if any. 4547 set: Option<TableId<WaitableSet>>, 4548 /// The handle with which the guest refers to this waitable, if any. 4549 handle: Option<u32>, 4550 } 4551 4552 /// Represents a Component Model Async `waitable`. 4553 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] 4554 enum Waitable { 4555 /// A host task 4556 Host(TableId<HostTask>), 4557 /// A guest task 4558 Guest(TableId<GuestTask>), 4559 /// The read or write end of a stream or future 4560 Transmit(TableId<TransmitHandle>), 4561 } 4562 4563 impl Waitable { 4564 /// Retrieve the `Waitable` corresponding to the specified guest-visible 4565 /// handle. 4566 fn from_instance( 4567 state: Pin<&mut ComponentInstance>, 4568 caller_instance: RuntimeComponentInstanceIndex, 4569 waitable: u32, 4570 ) -> Result<Self> { 4571 use crate::runtime::vm::component::Waitable; 4572 4573 let (waitable, kind) = state.instance_states().0[caller_instance] 4574 .handle_table() 4575 .waitable_rep(waitable)?; 4576 4577 Ok(match kind { 4578 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)), 4579 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)), 4580 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)), 4581 }) 4582 } 4583 4584 /// Retrieve the host-visible identifier for this `Waitable`. 4585 fn rep(&self) -> u32 { 4586 match self { 4587 Self::Host(id) => id.rep(), 4588 Self::Guest(id) => id.rep(), 4589 Self::Transmit(id) => id.rep(), 4590 } 4591 } 4592 4593 /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or 4594 /// remove it from any set it may currently belong to (when `set` is 4595 /// `None`). 4596 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> { 4597 log::trace!("waitable {self:?} join set {set:?}",); 4598 4599 let old = mem::replace(&mut self.common(state)?.set, set); 4600 4601 if let Some(old) = old { 4602 match *self { 4603 Waitable::Host(id) => state.remove_child(id, old), 4604 Waitable::Guest(id) => state.remove_child(id, old), 4605 Waitable::Transmit(id) => state.remove_child(id, old), 4606 }?; 4607 4608 state.get_mut(old)?.ready.remove(self); 4609 } 4610 4611 if let Some(set) = set { 4612 match *self { 4613 Waitable::Host(id) => state.add_child(id, set), 4614 Waitable::Guest(id) => state.add_child(id, set), 4615 Waitable::Transmit(id) => state.add_child(id, set), 4616 }?; 4617 4618 if self.common(state)?.event.is_some() { 4619 self.mark_ready(state)?; 4620 } 4621 } 4622 4623 Ok(()) 4624 } 4625 4626 /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`. 4627 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> { 4628 Ok(match self { 4629 Self::Host(id) => &mut state.get_mut(*id)?.common, 4630 Self::Guest(id) => &mut state.get_mut(*id)?.common, 4631 Self::Transmit(id) => &mut state.get_mut(*id)?.common, 4632 }) 4633 } 4634 4635 /// Set or clear the pending event for this waitable and either deliver it 4636 /// to the first waiter, if any, or mark it as ready to be delivered to the 4637 /// next waiter that arrives. 4638 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> { 4639 log::trace!("set event for {self:?}: {event:?}"); 4640 self.common(state)?.event = event; 4641 self.mark_ready(state) 4642 } 4643 4644 /// Take the pending event from this waitable, leaving `None` in its place. 4645 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> { 4646 let common = self.common(state)?; 4647 let event = common.event.take(); 4648 if let Some(set) = self.common(state)?.set { 4649 state.get_mut(set)?.ready.remove(self); 4650 } 4651 4652 Ok(event) 4653 } 4654 4655 /// Deliver the current event for this waitable to the first waiter, if any, 4656 /// or else mark it as ready to be delivered to the next waiter that 4657 /// arrives. 4658 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> { 4659 if let Some(set) = self.common(state)?.set { 4660 state.get_mut(set)?.ready.insert(*self); 4661 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() { 4662 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take(); 4663 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set)); 4664 4665 let item = match mode { 4666 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber), 4667 WaitMode::Callback(instance) => WorkItem::GuestCall( 4668 state.get_mut(thread.task)?.instance.index, 4669 GuestCall { 4670 thread, 4671 kind: GuestCallKind::DeliverEvent { 4672 instance, 4673 set: Some(set), 4674 }, 4675 }, 4676 ), 4677 }; 4678 state.push_high_priority(item); 4679 } 4680 } 4681 Ok(()) 4682 } 4683 4684 /// Remove this waitable from the instance's rep table. 4685 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> { 4686 match self { 4687 Self::Host(task) => { 4688 log::trace!("delete host task {task:?}"); 4689 state.delete(*task)?; 4690 } 4691 Self::Guest(task) => { 4692 log::trace!("delete guest task {task:?}"); 4693 state.delete(*task)?.dispose(state)?; 4694 } 4695 Self::Transmit(task) => { 4696 state.delete(*task)?; 4697 } 4698 } 4699 4700 Ok(()) 4701 } 4702 } 4703 4704 impl fmt::Debug for Waitable { 4705 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 4706 match self { 4707 Self::Host(id) => write!(f, "{id:?}"), 4708 Self::Guest(id) => write!(f, "{id:?}"), 4709 Self::Transmit(id) => write!(f, "{id:?}"), 4710 } 4711 } 4712 } 4713 4714 /// Represents a Component Model Async `waitable-set`. 4715 #[derive(Default)] 4716 struct WaitableSet { 4717 /// Which waitables in this set have pending events, if any. 4718 ready: BTreeSet<Waitable>, 4719 /// Which guest threads are currently waiting on this set, if any. 4720 waiting: BTreeMap<QualifiedThreadId, WaitMode>, 4721 } 4722 4723 impl TableDebug for WaitableSet { 4724 fn type_name() -> &'static str { 4725 "WaitableSet" 4726 } 4727 } 4728 4729 /// Type-erased closure to lower the parameters for a guest task. 4730 type RawLower = 4731 Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>; 4732 4733 /// Type-erased closure to lift the result for a guest task. 4734 type RawLift = Box< 4735 dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync, 4736 >; 4737 4738 /// Type erased result of a guest task which may be downcast to the expected 4739 /// type by a host caller (or simply ignored in the case of a guest caller; see 4740 /// `DummyResult`). 4741 type LiftedResult = Box<dyn Any + Send + Sync>; 4742 4743 /// Used to return a result from a `LiftFn` when the actual result has already 4744 /// been lowered to a guest task's stack and linear memory. 4745 struct DummyResult; 4746 4747 /// Represents the Component Model Async state of a (sub-)component instance. 4748 #[derive(Default)] 4749 pub struct ConcurrentInstanceState { 4750 /// Whether backpressure is set for this instance (enabled if >0) 4751 backpressure: u16, 4752 /// Whether this instance can be entered 4753 do_not_enter: bool, 4754 /// Pending calls for this instance which require `Self::backpressure` to be 4755 /// `true` and/or `Self::do_not_enter` to be false before they can proceed. 4756 pending: BTreeMap<QualifiedThreadId, GuestCallKind>, 4757 } 4758 4759 impl ConcurrentInstanceState { 4760 pub fn pending_is_empty(&self) -> bool { 4761 self.pending.is_empty() 4762 } 4763 } 4764 4765 #[derive(Debug, Copy, Clone)] 4766 enum CurrentThread { 4767 Guest(QualifiedThreadId), 4768 Host(TableId<HostTask>), 4769 None, 4770 } 4771 4772 impl CurrentThread { 4773 fn guest(&self) -> Option<&QualifiedThreadId> { 4774 match self { 4775 Self::Guest(id) => Some(id), 4776 _ => None, 4777 } 4778 } 4779 4780 fn host(&self) -> Option<TableId<HostTask>> { 4781 match self { 4782 Self::Host(id) => Some(*id), 4783 _ => None, 4784 } 4785 } 4786 4787 fn is_none(&self) -> bool { 4788 matches!(self, Self::None) 4789 } 4790 } 4791 4792 impl From<QualifiedThreadId> for CurrentThread { 4793 fn from(id: QualifiedThreadId) -> Self { 4794 Self::Guest(id) 4795 } 4796 } 4797 4798 impl From<TableId<HostTask>> for CurrentThread { 4799 fn from(id: TableId<HostTask>) -> Self { 4800 Self::Host(id) 4801 } 4802 } 4803 4804 /// Represents the Component Model Async state of a store. 4805 pub struct ConcurrentState { 4806 /// The currently running thread, if any. 4807 current_thread: CurrentThread, 4808 4809 /// The set of pending host and background tasks, if any. 4810 /// 4811 /// See `ComponentInstance::poll_until` for where we temporarily take this 4812 /// out, poll it, then put it back to avoid any mutable aliasing hazards. 4813 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>, 4814 /// The table of waitables, waitable sets, etc. 4815 table: AlwaysMut<ResourceTable>, 4816 /// The "high priority" work queue for this store's event loop. 4817 high_priority: Vec<WorkItem>, 4818 /// The "low priority" work queue for this store's event loop. 4819 low_priority: VecDeque<WorkItem>, 4820 /// A place to stash the reason a fiber is suspending so that the code which 4821 /// resumed it will know under what conditions the fiber should be resumed 4822 /// again. 4823 suspend_reason: Option<SuspendReason>, 4824 /// A cached fiber which is waiting for work to do. 4825 /// 4826 /// This helps us avoid creating a new fiber for each `GuestCall` work item. 4827 worker: Option<StoreFiber<'static>>, 4828 /// A place to stash the work item for which we're resuming a worker fiber. 4829 worker_item: Option<WorkerItem>, 4830 4831 /// Reference counts for all component error contexts 4832 /// 4833 /// NOTE: it is possible the global ref count to be *greater* than the sum of 4834 /// (sub)component ref counts as tracked by `error_context_tables`, for 4835 /// example when the host holds one or more references to error contexts. 4836 /// 4837 /// The key of this primary map is often referred to as the "rep" (i.e. host-side 4838 /// component-wide representation) of the index into concurrent state for a given 4839 /// stored `ErrorContext`. 4840 /// 4841 /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same 4842 /// as a `TableId<ErrorContextState>`. 4843 global_error_context_ref_counts: 4844 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>, 4845 } 4846 4847 impl Default for ConcurrentState { 4848 fn default() -> Self { 4849 Self { 4850 current_thread: CurrentThread::None, 4851 table: AlwaysMut::new(ResourceTable::new()), 4852 futures: AlwaysMut::new(Some(FuturesUnordered::new())), 4853 high_priority: Vec::new(), 4854 low_priority: VecDeque::new(), 4855 suspend_reason: None, 4856 worker: None, 4857 worker_item: None, 4858 global_error_context_ref_counts: BTreeMap::new(), 4859 } 4860 } 4861 } 4862 4863 impl ConcurrentState { 4864 /// Take ownership of any fibers and futures owned by this object. 4865 /// 4866 /// This should be used when disposing of the `Store` containing this object 4867 /// in order to gracefully resolve any and all fibers using 4868 /// `StoreFiber::dispose`. This is necessary to avoid possible 4869 /// use-after-free bugs due to fibers which may still have access to the 4870 /// `Store`. 4871 /// 4872 /// Additionally, the futures collected with this function should be dropped 4873 /// within a `tls::set` call, which will ensure than any futures closing 4874 /// over an `&Accessor` will have access to the store when dropped, allowing 4875 /// e.g. `WithAccessor[AndValue]` instances to be disposed of without 4876 /// panicking. 4877 /// 4878 /// Note that this will leave the object in an inconsistent and unusable 4879 /// state, so it should only be used just prior to dropping it. 4880 pub(crate) fn take_fibers_and_futures( 4881 &mut self, 4882 fibers: &mut Vec<StoreFiber<'static>>, 4883 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>, 4884 ) { 4885 for entry in self.table.get_mut().iter_mut() { 4886 if let Some(set) = entry.downcast_mut::<WaitableSet>() { 4887 for mode in mem::take(&mut set.waiting).into_values() { 4888 if let WaitMode::Fiber(fiber) = mode { 4889 fibers.push(fiber); 4890 } 4891 } 4892 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() { 4893 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) = 4894 mem::replace(&mut thread.state, GuestThreadState::Completed) 4895 { 4896 fibers.push(fiber); 4897 } 4898 } 4899 } 4900 4901 if let Some(fiber) = self.worker.take() { 4902 fibers.push(fiber); 4903 } 4904 4905 let mut handle_item = |item| match item { 4906 WorkItem::ResumeFiber(fiber) => { 4907 fibers.push(fiber); 4908 } 4909 WorkItem::PushFuture(future) => { 4910 self.futures 4911 .get_mut() 4912 .as_mut() 4913 .unwrap() 4914 .push(future.into_inner()); 4915 } 4916 _ => {} 4917 }; 4918 4919 for item in mem::take(&mut self.high_priority) { 4920 handle_item(item); 4921 } 4922 for item in mem::take(&mut self.low_priority) { 4923 handle_item(item); 4924 } 4925 4926 if let Some(them) = self.futures.get_mut().take() { 4927 futures.push(them); 4928 } 4929 } 4930 4931 /// Collect the next set of work items to run. This will be either all 4932 /// high-priority items, or a single low-priority item if there are no 4933 /// high-priority items. 4934 fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> { 4935 let mut ready = mem::take(&mut self.high_priority); 4936 if ready.is_empty() { 4937 if let Some(item) = self.low_priority.pop_back() { 4938 ready.push(item); 4939 } 4940 } 4941 ready 4942 } 4943 4944 fn push<V: Send + Sync + 'static>( 4945 &mut self, 4946 value: V, 4947 ) -> Result<TableId<V>, ResourceTableError> { 4948 self.table.get_mut().push(value).map(TableId::from) 4949 } 4950 4951 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> { 4952 self.table.get_mut().get_mut(&Resource::from(id)) 4953 } 4954 4955 pub fn add_child<T: 'static, U: 'static>( 4956 &mut self, 4957 child: TableId<T>, 4958 parent: TableId<U>, 4959 ) -> Result<(), ResourceTableError> { 4960 self.table 4961 .get_mut() 4962 .add_child(Resource::from(child), Resource::from(parent)) 4963 } 4964 4965 pub fn remove_child<T: 'static, U: 'static>( 4966 &mut self, 4967 child: TableId<T>, 4968 parent: TableId<U>, 4969 ) -> Result<(), ResourceTableError> { 4970 self.table 4971 .get_mut() 4972 .remove_child(Resource::from(child), Resource::from(parent)) 4973 } 4974 4975 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> { 4976 self.table.get_mut().delete(Resource::from(id)) 4977 } 4978 4979 fn push_future(&mut self, future: HostTaskFuture) { 4980 // Note that we can't directly push to `ConcurrentState::futures` here 4981 // since this may be called from a future that's being polled inside 4982 // `Self::poll_until`, which temporarily removes the `FuturesUnordered` 4983 // so it has exclusive access while polling it. Therefore, we push a 4984 // work item to the "high priority" queue, which will actually push to 4985 // `ConcurrentState::futures` later. 4986 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future))); 4987 } 4988 4989 fn push_high_priority(&mut self, item: WorkItem) { 4990 log::trace!("push high priority: {item:?}"); 4991 self.high_priority.push(item); 4992 } 4993 4994 fn push_low_priority(&mut self, item: WorkItem) { 4995 log::trace!("push low priority: {item:?}"); 4996 self.low_priority.push_front(item); 4997 } 4998 4999 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) { 5000 if high_priority { 5001 self.push_high_priority(item); 5002 } else { 5003 self.push_low_priority(item); 5004 } 5005 } 5006 5007 fn promote_instance_local_thread_work_item( 5008 &mut self, 5009 current_instance: RuntimeComponentInstanceIndex, 5010 ) -> bool { 5011 self.promote_work_items_matching(|item: &WorkItem| match item { 5012 WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => { 5013 *instance == current_instance 5014 } 5015 _ => false, 5016 }) 5017 } 5018 5019 fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool { 5020 self.promote_work_items_matching(|item: &WorkItem| match item { 5021 WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => { 5022 *t == thread 5023 } 5024 _ => false, 5025 }) 5026 } 5027 5028 fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool 5029 where 5030 F: FnMut(&WorkItem) -> bool, 5031 { 5032 // If there's a high-priority work item to resume the current guest thread, 5033 // we don't need to promote anything, but we return true to indicate that 5034 // work is pending for the current instance. 5035 if self.high_priority.iter().any(&mut predicate) { 5036 true 5037 } 5038 // Otherwise, look for a low-priority work item that matches the current 5039 // instance and promote it to high-priority. 5040 else if let Some(idx) = self.low_priority.iter().position(&mut predicate) { 5041 let item = self.low_priority.remove(idx).unwrap(); 5042 self.push_high_priority(item); 5043 true 5044 } else { 5045 false 5046 } 5047 } 5048 5049 /// Implements the `context.get` intrinsic. 5050 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> { 5051 let thread = self.unwrap_current_guest_thread(); 5052 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()]; 5053 log::trace!("context_get {thread:?} slot {slot} val {val:#x}"); 5054 Ok(val) 5055 } 5056 5057 /// Implements the `context.set` intrinsic. 5058 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> { 5059 let thread = self.unwrap_current_guest_thread(); 5060 log::trace!("context_set {thread:?} slot {slot} val {val:#x}"); 5061 self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val; 5062 Ok(()) 5063 } 5064 5065 /// Returns whether there's a pending cancellation on the current guest thread, 5066 /// consuming the event if so. 5067 fn take_pending_cancellation(&mut self) -> bool { 5068 let thread = self.unwrap_current_guest_thread(); 5069 if let Some(event) = self.get_mut(thread.task).unwrap().event.take() { 5070 assert!(matches!(event, Event::Cancelled)); 5071 true 5072 } else { 5073 false 5074 } 5075 } 5076 5077 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> { 5078 if self.may_block(task) { 5079 Ok(()) 5080 } else { 5081 Err(Trap::CannotBlockSyncTask.into()) 5082 } 5083 } 5084 5085 fn may_block(&mut self, task: TableId<GuestTask>) -> bool { 5086 let task = self.get_mut(task).unwrap(); 5087 task.async_function || task.returned_or_cancelled() 5088 } 5089 5090 /// Used by `ResourceTables` to acquire the current `CallContext` for the 5091 /// specified task. 5092 /// 5093 /// The `task` is bit-packed as returned by `current_call_context_scope_id` 5094 /// below. 5095 pub fn call_context(&mut self, task: u32) -> &mut CallContext { 5096 let (task, is_host) = (task >> 1, task & 1 == 1); 5097 if is_host { 5098 let task: TableId<HostTask> = TableId::new(task); 5099 &mut self.get_mut(task).unwrap().call_context 5100 } else { 5101 let task: TableId<GuestTask> = TableId::new(task); 5102 &mut self.get_mut(task).unwrap().call_context 5103 } 5104 } 5105 5106 /// Used by `ResourceTables` to record the scope of a borrow to get undone 5107 /// in the future. 5108 pub fn current_call_context_scope_id(&self) -> u32 { 5109 let (bits, is_host) = match self.current_thread { 5110 CurrentThread::Guest(id) => (id.task.rep(), false), 5111 CurrentThread::Host(id) => (id.rep(), true), 5112 CurrentThread::None => unreachable!(), 5113 }; 5114 assert_eq!((bits << 1) >> 1, bits); 5115 (bits << 1) | u32::from(is_host) 5116 } 5117 5118 fn unwrap_current_guest_thread(&self) -> QualifiedThreadId { 5119 *self.current_thread.guest().unwrap() 5120 } 5121 5122 fn unwrap_current_host_thread(&self) -> TableId<HostTask> { 5123 self.current_thread.host().unwrap() 5124 } 5125 } 5126 5127 /// Provide a type hint to compiler about the shape of a parameter lower 5128 /// closure. 5129 fn for_any_lower< 5130 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync, 5131 >( 5132 fun: F, 5133 ) -> F { 5134 fun 5135 } 5136 5137 /// Provide a type hint to compiler about the shape of a result lift closure. 5138 fn for_any_lift< 5139 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync, 5140 >( 5141 fun: F, 5142 ) -> F { 5143 fun 5144 } 5145 5146 /// Wrap the specified future in a `poll_fn` which asserts that the future is 5147 /// only polled from the event loop of the specified `Store`. 5148 /// 5149 /// See `StoreContextMut::run_concurrent` for details. 5150 fn checked<F: Future + Send + 'static>( 5151 id: StoreId, 5152 fut: F, 5153 ) -> impl Future<Output = F::Output> + Send + 'static { 5154 async move { 5155 let mut fut = pin!(fut); 5156 future::poll_fn(move |cx| { 5157 let message = "\ 5158 `Future`s which depend on asynchronous component tasks, streams, or \ 5159 futures to complete may only be polled from the event loop of the \ 5160 store to which they belong. Please use \ 5161 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\ 5162 "; 5163 tls::try_get(|store| { 5164 let matched = match store { 5165 tls::TryGet::Some(store) => store.id() == id, 5166 tls::TryGet::Taken | tls::TryGet::None => false, 5167 }; 5168 5169 if !matched { 5170 panic!("{message}") 5171 } 5172 }); 5173 fut.as_mut().poll(cx) 5174 }) 5175 .await 5176 } 5177 } 5178 5179 /// Assert that `StoreContextMut::run_concurrent` has not been called from 5180 /// within an store's event loop. 5181 fn check_recursive_run() { 5182 tls::try_get(|store| { 5183 if !matches!(store, tls::TryGet::None) { 5184 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported") 5185 } 5186 }); 5187 } 5188 5189 fn unpack_callback_code(code: u32) -> (u32, u32) { 5190 (code & 0xF, code >> 4) 5191 } 5192 5193 /// Helper struct for packaging parameters to be passed to 5194 /// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or 5195 /// `waitable-set.poll`. 5196 struct WaitableCheckParams { 5197 set: TableId<WaitableSet>, 5198 options: OptionsIndex, 5199 payload: u32, 5200 } 5201 5202 /// Indicates whether `ComponentInstance::waitable_check` is being called for 5203 /// `waitable-set.wait` or `waitable-set.poll`. 5204 enum WaitableCheck { 5205 Wait, 5206 Poll, 5207 } 5208 5209 /// Represents a guest task called from the host, prepared using `prepare_call`. 5210 pub(crate) struct PreparedCall<R> { 5211 /// The guest export to be called 5212 handle: Func, 5213 /// The guest thread created by `prepare_call` 5214 thread: QualifiedThreadId, 5215 /// The number of lowered core Wasm parameters to pass to the call. 5216 param_count: usize, 5217 /// The `oneshot::Receiver` to which the result of the call will be 5218 /// delivered when it is available. 5219 rx: oneshot::Receiver<LiftedResult>, 5220 _phantom: PhantomData<R>, 5221 } 5222 5223 impl<R> PreparedCall<R> { 5224 /// Get a copy of the `TaskId` for this `PreparedCall`. 5225 pub(crate) fn task_id(&self) -> TaskId { 5226 TaskId { 5227 task: self.thread.task, 5228 } 5229 } 5230 } 5231 5232 /// Represents a task created by `prepare_call`. 5233 pub(crate) struct TaskId { 5234 task: TableId<GuestTask>, 5235 } 5236 5237 impl TaskId { 5238 /// The host future for an async task was dropped. If the parameters have not been lowered yet, 5239 /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case, 5240 /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended 5241 /// and can be resumed by other tasks for this component, so we mark the future as dropped 5242 /// and delete the task when all threads are done. 5243 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> { 5244 let task = store.0.concurrent_state_mut().get_mut(self.task)?; 5245 if !task.already_lowered_parameters() { 5246 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())? 5247 } else { 5248 task.host_future_state = HostFutureState::Dropped; 5249 if task.ready_to_delete() { 5250 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())? 5251 } 5252 } 5253 Ok(()) 5254 } 5255 } 5256 5257 /// Prepare a call to the specified exported Wasm function, providing functions 5258 /// for lowering the parameters and lifting the result. 5259 /// 5260 /// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event 5261 /// loop, use `queue_call`. 5262 pub(crate) fn prepare_call<T, R>( 5263 mut store: StoreContextMut<T>, 5264 handle: Func, 5265 param_count: usize, 5266 host_future_present: bool, 5267 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()> 5268 + Send 5269 + Sync 5270 + 'static, 5271 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> 5272 + Send 5273 + Sync 5274 + 'static, 5275 ) -> Result<PreparedCall<R>> { 5276 let (options, _flags, ty, raw_options) = handle.abi_info(store.0); 5277 5278 let instance = handle.instance().id().get(store.0); 5279 let options = &instance.component().env_component().options[options]; 5280 let ty = &instance.component().types()[ty]; 5281 let async_function = ty.async_; 5282 let task_return_type = ty.results; 5283 let component_instance = raw_options.instance; 5284 let callback = options.callback.map(|i| instance.runtime_callback(i)); 5285 let memory = options 5286 .memory() 5287 .map(|i| instance.runtime_memory(i)) 5288 .map(SendSyncPtr::new); 5289 let string_encoding = options.string_encoding; 5290 let token = StoreToken::new(store.as_context_mut()); 5291 let state = store.0.concurrent_state_mut(); 5292 5293 let (tx, rx) = oneshot::channel(); 5294 5295 let instance = RuntimeInstance { 5296 instance: handle.instance().id().instance(), 5297 index: component_instance, 5298 }; 5299 let caller = state.current_thread; 5300 let task = GuestTask::new( 5301 state, 5302 Box::new(for_any_lower(move |store, params| { 5303 lower_params(handle, token.as_context_mut(store), params) 5304 })), 5305 LiftResult { 5306 lift: Box::new(for_any_lift(move |store, result| { 5307 lift_result(handle, store, result) 5308 })), 5309 ty: task_return_type, 5310 memory, 5311 string_encoding, 5312 }, 5313 Caller::Host { 5314 tx: Some(tx), 5315 host_future_present, 5316 caller, 5317 }, 5318 callback.map(|callback| { 5319 let callback = SendSyncPtr::new(callback); 5320 let instance = handle.instance(); 5321 Box::new(move |store: &mut dyn VMStore, event, handle| { 5322 let store = token.as_context_mut(store); 5323 // SAFETY: Per the contract of `prepare_call`, the callback 5324 // will remain valid at least as long is this task exists. 5325 unsafe { instance.call_callback(store, callback, event, handle) } 5326 }) as CallbackFn 5327 }), 5328 instance, 5329 async_function, 5330 )?; 5331 5332 let task = state.push(task)?; 5333 let thread = state.push(GuestThread::new_implicit(task))?; 5334 state.get_mut(task)?.threads.insert(thread); 5335 5336 if !store.0.may_enter(instance) { 5337 bail!(crate::Trap::CannotEnterComponent); 5338 } 5339 5340 Ok(PreparedCall { 5341 handle, 5342 thread: QualifiedThreadId { task, thread }, 5343 param_count, 5344 rx, 5345 _phantom: PhantomData, 5346 }) 5347 } 5348 5349 /// Queue a call previously prepared using `prepare_call` to be run as part of 5350 /// the associated `ComponentInstance`'s event loop. 5351 /// 5352 /// The returned future will resolve to the result once it is available, but 5353 /// must only be polled via the instance's event loop. See 5354 /// `StoreContextMut::run_concurrent` for details. 5355 pub(crate) fn queue_call<T: 'static, R: Send + 'static>( 5356 mut store: StoreContextMut<T>, 5357 prepared: PreparedCall<R>, 5358 ) -> Result<impl Future<Output = Result<R>> + Send + 'static + use<T, R>> { 5359 let PreparedCall { 5360 handle, 5361 thread, 5362 param_count, 5363 rx, 5364 .. 5365 } = prepared; 5366 5367 queue_call0(store.as_context_mut(), handle, thread, param_count)?; 5368 5369 Ok(checked( 5370 store.0.id(), 5371 rx.map(move |result| { 5372 result 5373 .map(|v| *v.downcast().unwrap()) 5374 .map_err(crate::Error::from) 5375 }), 5376 )) 5377 } 5378 5379 /// Queue a call previously prepared using `prepare_call` to be run as part of 5380 /// the associated `ComponentInstance`'s event loop. 5381 fn queue_call0<T: 'static>( 5382 store: StoreContextMut<T>, 5383 handle: Func, 5384 guest_thread: QualifiedThreadId, 5385 param_count: usize, 5386 ) -> Result<()> { 5387 let (_options, _, _ty, raw_options) = handle.abi_info(store.0); 5388 let is_concurrent = raw_options.async_; 5389 let callback = raw_options.callback; 5390 let instance = handle.instance(); 5391 let callee = handle.lifted_core_func(store.0); 5392 let post_return = handle.post_return_core_func(store.0); 5393 let callback = callback.map(|i| { 5394 let instance = instance.id().get(store.0); 5395 SendSyncPtr::new(instance.runtime_callback(i)) 5396 }); 5397 5398 log::trace!("queueing call {guest_thread:?}"); 5399 5400 // SAFETY: `callee`, `callback`, and `post_return` are valid pointers 5401 // (with signatures appropriate for this call) and will remain valid as 5402 // long as this instance is valid. 5403 unsafe { 5404 instance.queue_call( 5405 store, 5406 guest_thread, 5407 SendSyncPtr::new(callee), 5408 param_count, 5409 1, 5410 is_concurrent, 5411 callback, 5412 post_return.map(SendSyncPtr::new), 5413 ) 5414 } 5415 } 5416