1 //! Runtime support for the Component Model Async ABI. 2 //! 3 //! This module and its submodules provide host runtime support for Component 4 //! Model Async features such as async-lifted exports, async-lowered imports, 5 //! streams, futures, and related intrinsics. See [the Async 6 //! Explainer](https://github.com/WebAssembly/component-model/blob/main/design/mvp/Concurrency.md) 7 //! for a high-level overview. 8 //! 9 //! At the core of this support is an event loop which schedules and switches 10 //! between guest tasks and any host tasks they create. Each 11 //! `Store` will have at most one event loop running at any given 12 //! time, and that loop may be suspended and resumed by the host embedder using 13 //! e.g. `StoreContextMut::run_concurrent`. The `StoreContextMut::poll_until` 14 //! function contains the loop itself, while the 15 //! `StoreOpaque::concurrent_state` field holds its state. 16 //! 17 //! # Public API Overview 18 //! 19 //! ## Top-level API (e.g. kicking off host->guest calls and driving the event loop) 20 //! 21 //! - `[Typed]Func::call_concurrent`: Start a host->guest call to an 22 //! async-lifted or sync-lifted import, creating a guest task. 23 //! 24 //! - `StoreContextMut::run_concurrent`: Run the event loop for the specified 25 //! instance, allowing any and all tasks belonging to that instance to make 26 //! progress. 27 //! 28 //! - `StoreContextMut::spawn`: Run a background task as part of the event loop 29 //! for the specified instance. 30 //! 31 //! - `{Future,Stream}Reader::new`: Create a new Component Model `future` or 32 //! `stream` which may be passed to the guest. This takes a 33 //! `{Future,Stream}Producer` implementation which will be polled for items when 34 //! the consumer requests them. 35 //! 36 //! - `{Future,Stream}Reader::pipe`: Consume a `future` or `stream` by 37 //! connecting it to a `{Future,Stream}Consumer` which will consume any items 38 //! produced by the write end. 39 //! 40 //! ## Host Task API (e.g. implementing concurrent host functions and background tasks) 41 //! 42 //! - `LinkerInstance::func_wrap_concurrent`: Register a concurrent host 43 //! function with the linker. That function will take an `Accessor` as its 44 //! first parameter, which provides access to the store between (but not across) 45 //! await points. 46 //! 47 //! - `Accessor::with`: Access the store and its associated data. 48 //! 49 //! - `Accessor::spawn`: Run a background task as part of the event loop for the 50 //! store. This is equivalent to `StoreContextMut::spawn` but more convenient to use 51 //! in host functions. 52 53 use crate::component::func::{self, Func, call_post_return}; 54 use crate::component::{ 55 HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance, 56 }; 57 use crate::fiber::{self, StoreFiber, StoreFiberYield}; 58 use crate::prelude::*; 59 use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken}; 60 use crate::vm::component::{CallContext, ComponentInstance, InstanceState}; 61 use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore}; 62 use crate::{ 63 AsContext, AsContextMut, FuncType, Result, StoreContext, StoreContextMut, ValRaw, ValType, 64 bail, error::format_err, 65 }; 66 use error_contexts::GlobalErrorContextRefCount; 67 use futures::channel::oneshot; 68 use futures::future::{self, FutureExt}; 69 use futures::stream::{FuturesUnordered, StreamExt}; 70 use futures_and_streams::{FlatAbi, ReturnCode, TransmitHandle, TransmitIndex}; 71 use std::any::Any; 72 use std::borrow::ToOwned; 73 use std::boxed::Box; 74 use std::cell::UnsafeCell; 75 use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque}; 76 use std::fmt; 77 use std::future::Future; 78 use std::marker::PhantomData; 79 use std::mem::{self, ManuallyDrop, MaybeUninit}; 80 use std::ops::DerefMut; 81 use std::pin::{Pin, pin}; 82 use std::ptr::{self, NonNull}; 83 use std::sync::Arc; 84 use std::task::{Context, Poll, Waker}; 85 use std::vec::Vec; 86 use table::{TableDebug, TableId}; 87 use wasmtime_environ::Trap; 88 use wasmtime_environ::component::{ 89 CanonicalAbiInfo, CanonicalOptions, CanonicalOptionsDataModel, MAX_FLAT_PARAMS, 90 MAX_FLAT_RESULTS, OptionsIndex, PREPARE_ASYNC_NO_RESULT, PREPARE_ASYNC_WITH_RESULT, 91 RuntimeComponentInstanceIndex, RuntimeTableIndex, StringEncoding, 92 TypeComponentGlobalErrorContextTableIndex, TypeComponentLocalErrorContextTableIndex, 93 TypeFuncIndex, TypeFutureTableIndex, TypeStreamTableIndex, TypeTupleIndex, 94 }; 95 use wasmtime_environ::packed_option::ReservedValue; 96 97 pub use abort::JoinHandle; 98 pub use future_stream_any::{FutureAny, StreamAny}; 99 pub use futures_and_streams::{ 100 Destination, DirectDestination, DirectSource, ErrorContext, FutureConsumer, FutureProducer, 101 FutureReader, GuardedFutureReader, GuardedStreamReader, ReadBuffer, Source, StreamConsumer, 102 StreamProducer, StreamReader, StreamResult, VecBuffer, WriteBuffer, 103 }; 104 pub(crate) use futures_and_streams::{ResourcePair, lower_error_context_to_index}; 105 106 mod abort; 107 mod error_contexts; 108 mod future_stream_any; 109 mod futures_and_streams; 110 pub(crate) mod table; 111 pub(crate) mod tls; 112 113 /// Constant defined in the Component Model spec to indicate that the async 114 /// intrinsic (e.g. `future.write`) has not yet completed. 115 const BLOCKED: u32 = 0xffff_ffff; 116 117 /// Corresponds to `CallState` in the upstream spec. 118 #[derive(Clone, Copy, Eq, PartialEq, Debug)] 119 pub enum Status { 120 Starting = 0, 121 Started = 1, 122 Returned = 2, 123 StartCancelled = 3, 124 ReturnCancelled = 4, 125 } 126 127 impl Status { 128 /// Packs this status and the optional `waitable` provided into a 32-bit 129 /// result that the canonical ABI requires. 130 /// 131 /// The low 4 bits are reserved for the status while the upper 28 bits are 132 /// the waitable, if present. 133 pub fn pack(self, waitable: Option<u32>) -> u32 { 134 assert!(matches!(self, Status::Returned) == waitable.is_none()); 135 let waitable = waitable.unwrap_or(0); 136 assert!(waitable < (1 << 28)); 137 (waitable << 4) | (self as u32) 138 } 139 } 140 141 /// Corresponds to `EventCode` in the Component Model spec, plus related payload 142 /// data. 143 #[derive(Clone, Copy, Debug)] 144 enum Event { 145 None, 146 Cancelled, 147 Subtask { 148 status: Status, 149 }, 150 StreamRead { 151 code: ReturnCode, 152 pending: Option<(TypeStreamTableIndex, u32)>, 153 }, 154 StreamWrite { 155 code: ReturnCode, 156 pending: Option<(TypeStreamTableIndex, u32)>, 157 }, 158 FutureRead { 159 code: ReturnCode, 160 pending: Option<(TypeFutureTableIndex, u32)>, 161 }, 162 FutureWrite { 163 code: ReturnCode, 164 pending: Option<(TypeFutureTableIndex, u32)>, 165 }, 166 } 167 168 impl Event { 169 /// Lower this event to core Wasm integers for delivery to the guest. 170 /// 171 /// Note that the waitable handle, if any, is assumed to be lowered 172 /// separately. 173 fn parts(self) -> (u32, u32) { 174 const EVENT_NONE: u32 = 0; 175 const EVENT_SUBTASK: u32 = 1; 176 const EVENT_STREAM_READ: u32 = 2; 177 const EVENT_STREAM_WRITE: u32 = 3; 178 const EVENT_FUTURE_READ: u32 = 4; 179 const EVENT_FUTURE_WRITE: u32 = 5; 180 const EVENT_CANCELLED: u32 = 6; 181 match self { 182 Event::None => (EVENT_NONE, 0), 183 Event::Cancelled => (EVENT_CANCELLED, 0), 184 Event::Subtask { status } => (EVENT_SUBTASK, status as u32), 185 Event::StreamRead { code, .. } => (EVENT_STREAM_READ, code.encode()), 186 Event::StreamWrite { code, .. } => (EVENT_STREAM_WRITE, code.encode()), 187 Event::FutureRead { code, .. } => (EVENT_FUTURE_READ, code.encode()), 188 Event::FutureWrite { code, .. } => (EVENT_FUTURE_WRITE, code.encode()), 189 } 190 } 191 } 192 193 /// Corresponds to `CallbackCode` in the spec. 194 mod callback_code { 195 pub const EXIT: u32 = 0; 196 pub const YIELD: u32 = 1; 197 pub const WAIT: u32 = 2; 198 } 199 200 /// A flag indicating that the callee is an async-lowered export. 201 /// 202 /// This may be passed to the `async-start` intrinsic from a fused adapter. 203 const START_FLAG_ASYNC_CALLEE: u32 = wasmtime_environ::component::START_FLAG_ASYNC_CALLEE as u32; 204 205 /// Provides access to either store data (via the `get` method) or the store 206 /// itself (via [`AsContext`]/[`AsContextMut`]), as well as the component 207 /// instance to which the current host task belongs. 208 /// 209 /// See [`Accessor::with`] for details. 210 pub struct Access<'a, T: 'static, D: HasData + ?Sized = HasSelf<T>> { 211 store: StoreContextMut<'a, T>, 212 get_data: fn(&mut T) -> D::Data<'_>, 213 } 214 215 impl<'a, T, D> Access<'a, T, D> 216 where 217 D: HasData + ?Sized, 218 T: 'static, 219 { 220 /// Creates a new [`Access`] from its component parts. 221 pub fn new(store: StoreContextMut<'a, T>, get_data: fn(&mut T) -> D::Data<'_>) -> Self { 222 Self { store, get_data } 223 } 224 225 /// Get mutable access to the store data. 226 pub fn data_mut(&mut self) -> &mut T { 227 self.store.data_mut() 228 } 229 230 /// Get mutable access to the store data. 231 pub fn get(&mut self) -> D::Data<'_> { 232 (self.get_data)(self.data_mut()) 233 } 234 235 /// Spawn a background task. 236 /// 237 /// See [`Accessor::spawn`] for details. 238 pub fn spawn(&mut self, task: impl AccessorTask<T, D>) -> JoinHandle 239 where 240 T: 'static, 241 { 242 let accessor = Accessor { 243 get_data: self.get_data, 244 token: StoreToken::new(self.store.as_context_mut()), 245 }; 246 self.store 247 .as_context_mut() 248 .spawn_with_accessor(accessor, task) 249 } 250 251 /// Returns the getter this accessor is using to project from `T` into 252 /// `D::Data`. 253 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> { 254 self.get_data 255 } 256 } 257 258 impl<'a, T, D> AsContext for Access<'a, T, D> 259 where 260 D: HasData + ?Sized, 261 T: 'static, 262 { 263 type Data = T; 264 265 fn as_context(&self) -> StoreContext<'_, T> { 266 self.store.as_context() 267 } 268 } 269 270 impl<'a, T, D> AsContextMut for Access<'a, T, D> 271 where 272 D: HasData + ?Sized, 273 T: 'static, 274 { 275 fn as_context_mut(&mut self) -> StoreContextMut<'_, T> { 276 self.store.as_context_mut() 277 } 278 } 279 280 /// Provides scoped mutable access to store data in the context of a concurrent 281 /// host task future. 282 /// 283 /// This allows multiple host task futures to execute concurrently and access 284 /// the store between (but not across) `await` points. 285 /// 286 /// # Rationale 287 /// 288 /// This structure is sort of like `&mut T` plus a projection from `&mut T` to 289 /// `D::Data<'_>`. The problem this is solving, however, is that it does not 290 /// literally store these values. The basic problem is that when a concurrent 291 /// host future is being polled it has access to `&mut T` (and the whole 292 /// `Store`) but when it's not being polled it does not have access to these 293 /// values. This reflects how the store is only ever polling one future at a 294 /// time so the store is effectively being passed between futures. 295 /// 296 /// Rust's `Future` trait, however, has no means of passing a `Store` 297 /// temporarily between futures. The [`Context`](std::task::Context) type does 298 /// not have the ability to attach arbitrary information to it at this time. 299 /// This type, [`Accessor`], is used to bridge this expressivity gap. 300 /// 301 /// The [`Accessor`] type here represents the ability to acquire, temporarily in 302 /// a synchronous manner, the current store. The [`Accessor::with`] function 303 /// yields an [`Access`] which can be used to access [`StoreContextMut`], `&mut 304 /// T`, or `D::Data<'_>`. Note though that [`Accessor::with`] intentionally does 305 /// not take an `async` closure as its argument, instead it's a synchronous 306 /// closure which must complete during on run of `Future::poll`. This reflects 307 /// how the store is temporarily made available while a host future is being 308 /// polled. 309 /// 310 /// # Implementation 311 /// 312 /// This type does not actually store `&mut T` nor `StoreContextMut<T>`, and 313 /// this type additionally doesn't even have a lifetime parameter. This is 314 /// instead a representation of proof of the ability to acquire these while a 315 /// future is being polled. Wasmtime will, when it polls a host future, 316 /// configure ambient state such that the `Accessor` that a future closes over 317 /// will work and be able to access the store. 318 /// 319 /// This has a number of implications for users such as: 320 /// 321 /// * It's intentional that `Accessor` cannot be cloned, it needs to stay within 322 /// the lifetime of a single future. 323 /// * A future is expected to, however, close over an `Accessor` and keep it 324 /// alive probably for the duration of the entire future. 325 /// * Different host futures will be given different `Accessor`s, and that's 326 /// intentional. 327 /// * The `Accessor` type is `Send` and `Sync` irrespective of `T` which 328 /// alleviates some otherwise required bounds to be written down. 329 /// 330 /// # Using `Accessor` in `Drop` 331 /// 332 /// The methods on `Accessor` are only expected to work in the context of 333 /// `Future::poll` and are not guaranteed to work in `Drop`. This is because a 334 /// host future can be dropped at any time throughout the system and Wasmtime 335 /// store context is not necessarily available at that time. It's recommended to 336 /// not use `Accessor` methods in anything connected to a `Drop` implementation 337 /// as they will panic and have unintended results. If you run into this though 338 /// feel free to file an issue on the Wasmtime repository. 339 pub struct Accessor<T: 'static, D = HasSelf<T>> 340 where 341 D: HasData + ?Sized, 342 { 343 token: StoreToken<T>, 344 get_data: fn(&mut T) -> D::Data<'_>, 345 } 346 347 /// A helper trait to take any type of accessor-with-data in functions. 348 /// 349 /// This trait is similar to [`AsContextMut`] except that it's used when 350 /// working with an [`Accessor`] instead of a [`StoreContextMut`]. The 351 /// [`Accessor`] is the main type used in concurrent settings and is passed to 352 /// functions such as [`Func::call_concurrent`]. 353 /// 354 /// This trait is implemented for [`Accessor`] and `&T` where `T` implements 355 /// this trait. This effectively means that regardless of the `D` in 356 /// `Accessor<T, D>` it can still be passed to a function which just needs a 357 /// store accessor. 358 /// 359 /// Acquiring an [`Accessor`] can be done through 360 /// [`StoreContextMut::run_concurrent`] for example or in a host function 361 /// through 362 /// [`Linker::func_wrap_concurrent`](crate::component::LinkerInstance::func_wrap_concurrent). 363 pub trait AsAccessor { 364 /// The `T` in `Store<T>` that this accessor refers to. 365 type Data: 'static; 366 367 /// The `D` in `Accessor<T, D>`, or the projection out of 368 /// `Self::Data`. 369 type AccessorData: HasData + ?Sized; 370 371 /// Returns the accessor that this is referring to. 372 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData>; 373 } 374 375 impl<T: AsAccessor + ?Sized> AsAccessor for &T { 376 type Data = T::Data; 377 type AccessorData = T::AccessorData; 378 379 fn as_accessor(&self) -> &Accessor<Self::Data, Self::AccessorData> { 380 T::as_accessor(self) 381 } 382 } 383 384 impl<T, D: HasData + ?Sized> AsAccessor for Accessor<T, D> { 385 type Data = T; 386 type AccessorData = D; 387 388 fn as_accessor(&self) -> &Accessor<T, D> { 389 self 390 } 391 } 392 393 // Note that it is intentional at this time that `Accessor` does not actually 394 // store `&mut T` or anything similar. This distinctly enables the `Accessor` 395 // structure to be both `Send` and `Sync` regardless of what `T` is (or `D` for 396 // that matter). This is used to ergonomically simplify bindings where the 397 // majority of the time `Accessor` is closed over in a future which then needs 398 // to be `Send` and `Sync`. To avoid needing to write `T: Send` everywhere (as 399 // you already have to write `T: 'static`...) it helps to avoid this. 400 // 401 // Note as well that `Accessor` doesn't actually store its data at all. Instead 402 // it's more of a "proof" of what can be accessed from TLS. API design around 403 // `Accessor` and functions like `Linker::func_wrap_concurrent` are 404 // intentionally made to ensure that `Accessor` is ideally only used in the 405 // context that TLS variables are actually set. For example host functions are 406 // given `&Accessor`, not `Accessor`, and this prevents them from persisting 407 // the value outside of a future. Within the future the TLS variables are all 408 // guaranteed to be set while the future is being polled. 409 // 410 // Finally though this is not an ironclad guarantee, but nor does it need to be. 411 // The TLS APIs are designed to panic or otherwise model usage where they're 412 // called recursively or similar. It's hoped that code cannot be constructed to 413 // actually hit this at runtime but this is not a safety requirement at this 414 // time. 415 const _: () = { 416 const fn assert<T: Send + Sync>() {} 417 assert::<Accessor<UnsafeCell<u32>>>(); 418 }; 419 420 impl<T> Accessor<T> { 421 /// Creates a new `Accessor` backed by the specified functions. 422 /// 423 /// - `get`: used to retrieve the store 424 /// 425 /// - `get_data`: used to "project" from the store's associated data to 426 /// another type (e.g. a field of that data or a wrapper around it). 427 /// 428 /// - `spawn`: used to queue spawned background tasks to be run later 429 pub(crate) fn new(token: StoreToken<T>) -> Self { 430 Self { 431 token, 432 get_data: |x| x, 433 } 434 } 435 } 436 437 impl<T, D> Accessor<T, D> 438 where 439 D: HasData + ?Sized, 440 { 441 /// Run the specified closure, passing it mutable access to the store. 442 /// 443 /// This function is one of the main building blocks of the [`Accessor`] 444 /// type. This yields synchronous, blocking, access to the store via an 445 /// [`Access`]. The [`Access`] implements [`AsContextMut`] in addition to 446 /// providing the ability to access `D` via [`Access::get`]. Note that the 447 /// `fun` here is given only temporary access to the store and `T`/`D` 448 /// meaning that the return value `R` here is not allowed to capture borrows 449 /// into the two. If access is needed to data within `T` or `D` outside of 450 /// this closure then it must be `clone`d out, for example. 451 /// 452 /// # Panics 453 /// 454 /// This function will panic if it is call recursively with any other 455 /// accessor already in scope. For example if `with` is called within `fun`, 456 /// then this function will panic. It is up to the embedder to ensure that 457 /// this does not happen. 458 pub fn with<R>(&self, fun: impl FnOnce(Access<'_, T, D>) -> R) -> R { 459 tls::get(|vmstore| { 460 fun(Access { 461 store: self.token.as_context_mut(vmstore), 462 get_data: self.get_data, 463 }) 464 }) 465 } 466 467 /// Returns the getter this accessor is using to project from `T` into 468 /// `D::Data`. 469 pub fn getter(&self) -> fn(&mut T) -> D::Data<'_> { 470 self.get_data 471 } 472 473 /// Changes this accessor to access `D2` instead of the current type 474 /// parameter `D`. 475 /// 476 /// This changes the underlying data access from `T` to `D2::Data<'_>`. 477 /// 478 /// # Panics 479 /// 480 /// When using this API the returned value is disconnected from `&self` and 481 /// the lifetime binding the `self` argument. An `Accessor` only works 482 /// within the context of the closure or async closure that it was 483 /// originally given to, however. This means that due to the fact that the 484 /// returned value has no lifetime connection it's possible to use the 485 /// accessor outside of `&self`, the original accessor, and panic. 486 /// 487 /// The returned value should only be used within the scope of the original 488 /// `Accessor` that `self` refers to. 489 pub fn with_getter<D2: HasData>( 490 &self, 491 get_data: fn(&mut T) -> D2::Data<'_>, 492 ) -> Accessor<T, D2> { 493 Accessor { 494 token: self.token, 495 get_data, 496 } 497 } 498 499 /// Spawn a background task which will receive an `&Accessor<T, D>` and 500 /// run concurrently with any other tasks in progress for the current 501 /// store. 502 /// 503 /// This is particularly useful for host functions which return a `stream` 504 /// or `future` such that the code to write to the write end of that 505 /// `stream` or `future` must run after the function returns. 506 /// 507 /// The returned [`JoinHandle`] may be used to cancel the task. 508 /// 509 /// # Panics 510 /// 511 /// Panics if called within a closure provided to the [`Accessor::with`] 512 /// function. This can only be called outside an active invocation of 513 /// [`Accessor::with`]. 514 pub fn spawn(&self, task: impl AccessorTask<T, D>) -> JoinHandle 515 where 516 T: 'static, 517 { 518 let accessor = self.clone_for_spawn(); 519 self.with(|mut access| access.as_context_mut().spawn_with_accessor(accessor, task)) 520 } 521 522 fn clone_for_spawn(&self) -> Self { 523 Self { 524 token: self.token, 525 get_data: self.get_data, 526 } 527 } 528 } 529 530 /// Represents a task which may be provided to `Accessor::spawn`, 531 /// `Accessor::forward`, or `StorecContextMut::spawn`. 532 // TODO: Replace this with `std::ops::AsyncFnOnce` when that becomes a viable 533 // option. 534 // 535 // As of this writing, it's not possible to specify e.g. `Send` and `Sync` 536 // bounds on the `Future` type returned by an `AsyncFnOnce`. Also, using `F: 537 // Future<Output = Result<()>> + Send + Sync, FN: FnOnce(&Accessor<T>) -> F + 538 // Send + Sync + 'static` fails with a type mismatch error when we try to pass 539 // it an async closure (e.g. `async move |_| { ... }`). So this seems to be the 540 // best we can do for the time being. 541 pub trait AccessorTask<T, D = HasSelf<T>>: Send + 'static 542 where 543 D: HasData + ?Sized, 544 { 545 /// Run the task. 546 fn run(self, accessor: &Accessor<T, D>) -> impl Future<Output = Result<()>> + Send; 547 } 548 549 /// Represents parameter and result metadata for the caller side of a 550 /// guest->guest call orchestrated by a fused adapter. 551 enum CallerInfo { 552 /// Metadata for a call to an async-lowered import 553 Async { 554 params: Vec<ValRaw>, 555 has_result: bool, 556 }, 557 /// Metadata for a call to an sync-lowered import 558 Sync { 559 params: Vec<ValRaw>, 560 result_count: u32, 561 }, 562 } 563 564 /// Indicates how a guest task is waiting on a waitable set. 565 enum WaitMode { 566 /// The guest task is waiting using `task.wait` 567 Fiber(StoreFiber<'static>), 568 /// The guest task is waiting via a callback declared as part of an 569 /// async-lifted export. 570 Callback(Instance), 571 } 572 573 /// Represents the reason a fiber is suspending itself. 574 #[derive(Debug)] 575 enum SuspendReason { 576 /// The fiber is waiting for an event to be delivered to the specified 577 /// waitable set or task. 578 Waiting { 579 set: TableId<WaitableSet>, 580 thread: QualifiedThreadId, 581 skip_may_block_check: bool, 582 }, 583 /// The fiber has finished handling its most recent work item and is waiting 584 /// for another (or to be dropped if it is no longer needed). 585 NeedWork, 586 /// The fiber is yielding and should be resumed once other tasks have had a 587 /// chance to run. 588 Yielding { 589 thread: QualifiedThreadId, 590 skip_may_block_check: bool, 591 }, 592 /// The fiber was explicitly suspended with a call to `thread.suspend` or `thread.switch-to`. 593 ExplicitlySuspending { 594 thread: QualifiedThreadId, 595 skip_may_block_check: bool, 596 }, 597 } 598 599 /// Represents a pending call into guest code for a given guest task. 600 enum GuestCallKind { 601 /// Indicates there's an event to deliver to the task, possibly related to a 602 /// waitable set the task has been waiting on or polling. 603 DeliverEvent { 604 /// The instance to which the task belongs. 605 instance: Instance, 606 /// The waitable set the event belongs to, if any. 607 /// 608 /// If this is `None` the event will be waiting in the 609 /// `GuestTask::event` field for the task. 610 set: Option<TableId<WaitableSet>>, 611 }, 612 /// Indicates that a new guest task call is pending and may be executed 613 /// using the specified closure. 614 /// 615 /// If the closure returns `Ok(Some(call))`, the `call` should be run 616 /// immediately using `handle_guest_call`. 617 StartImplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync>), 618 StartExplicit(Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send + Sync>), 619 } 620 621 impl fmt::Debug for GuestCallKind { 622 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 623 match self { 624 Self::DeliverEvent { instance, set } => f 625 .debug_struct("DeliverEvent") 626 .field("instance", instance) 627 .field("set", set) 628 .finish(), 629 Self::StartImplicit(_) => f.debug_tuple("StartImplicit").finish(), 630 Self::StartExplicit(_) => f.debug_tuple("StartExplicit").finish(), 631 } 632 } 633 } 634 635 /// The target of a suspension intrinsic. 636 #[derive(Copy, Clone, Debug)] 637 pub enum SuspensionTarget { 638 SomeSuspended(u32), 639 Some(u32), 640 None, 641 } 642 643 impl SuspensionTarget { 644 fn is_none(&self) -> bool { 645 matches!(self, SuspensionTarget::None) 646 } 647 fn is_some(&self) -> bool { 648 !self.is_none() 649 } 650 } 651 652 /// Represents a pending call into guest code for a given guest thread. 653 #[derive(Debug)] 654 struct GuestCall { 655 thread: QualifiedThreadId, 656 kind: GuestCallKind, 657 } 658 659 impl GuestCall { 660 /// Returns whether or not the call is ready to run. 661 /// 662 /// A call will not be ready to run if either: 663 /// 664 /// - the (sub-)component instance to be called has already been entered and 665 /// cannot be reentered until an in-progress call completes 666 /// 667 /// - the call is for a not-yet started task and the (sub-)component 668 /// instance to be called has backpressure enabled 669 fn is_ready(&self, store: &mut StoreOpaque) -> Result<bool> { 670 let instance = store 671 .concurrent_state_mut() 672 .get_mut(self.thread.task)? 673 .instance; 674 let state = store.instance_state(instance).concurrent_state(); 675 676 let ready = match &self.kind { 677 GuestCallKind::DeliverEvent { .. } => !state.do_not_enter, 678 GuestCallKind::StartImplicit(_) => !(state.do_not_enter || state.backpressure > 0), 679 GuestCallKind::StartExplicit(_) => true, 680 }; 681 log::trace!( 682 "call {self:?} ready? {ready} (do_not_enter: {}; backpressure: {})", 683 state.do_not_enter, 684 state.backpressure 685 ); 686 Ok(ready) 687 } 688 } 689 690 /// Job to be run on a worker fiber. 691 enum WorkerItem { 692 GuestCall(GuestCall), 693 Function(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>), 694 } 695 696 /// Represents a pending work item to be handled by the event loop for a given 697 /// component instance. 698 enum WorkItem { 699 /// A host task to be pushed to `ConcurrentState::futures`. 700 PushFuture(AlwaysMut<HostTaskFuture>), 701 /// A fiber to resume. 702 ResumeFiber(StoreFiber<'static>), 703 /// A thread to resume. 704 ResumeThread(RuntimeComponentInstanceIndex, QualifiedThreadId), 705 /// A pending call into guest code for a given guest task. 706 GuestCall(RuntimeComponentInstanceIndex, GuestCall), 707 /// A job to run on a worker fiber. 708 WorkerFunction(AlwaysMut<Box<dyn FnOnce(&mut dyn VMStore) -> Result<()> + Send>>), 709 } 710 711 impl fmt::Debug for WorkItem { 712 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 713 match self { 714 Self::PushFuture(_) => f.debug_tuple("PushFuture").finish(), 715 Self::ResumeFiber(_) => f.debug_tuple("ResumeFiber").finish(), 716 Self::ResumeThread(instance, thread) => f 717 .debug_tuple("ResumeThread") 718 .field(instance) 719 .field(thread) 720 .finish(), 721 Self::GuestCall(instance, call) => f 722 .debug_tuple("GuestCall") 723 .field(instance) 724 .field(call) 725 .finish(), 726 Self::WorkerFunction(_) => f.debug_tuple("WorkerFunction").finish(), 727 } 728 } 729 } 730 731 /// Whether a suspension intrinsic was cancelled or completed 732 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] 733 pub(crate) enum WaitResult { 734 Cancelled, 735 Completed, 736 } 737 738 /// Poll the specified future until it completes on behalf of a guest->host call 739 /// using a sync-lowered import. 740 /// 741 /// This is similar to `Instance::first_poll` except it's for sync-lowered 742 /// imports, meaning we don't need to handle cancellation and we can block the 743 /// caller until the task completes, at which point the caller can handle 744 /// lowering the result to the guest's stack and linear memory. 745 pub(crate) fn poll_and_block<R: Send + Sync + 'static>( 746 store: &mut dyn VMStore, 747 future: impl Future<Output = Result<R>> + Send + 'static, 748 ) -> Result<R> { 749 let state = store.concurrent_state_mut(); 750 let task = state.unwrap_current_host_thread(); 751 752 // Wrap the future in a closure which will take care of stashing the result 753 // in `GuestTask::result` and resuming this fiber when the host task 754 // completes. 755 let mut future = Box::pin(async move { 756 let result = future.await?; 757 tls::get(move |store| { 758 let state = store.concurrent_state_mut(); 759 state.get_mut(task)?.result = Some(Box::new(result) as _); 760 761 Waitable::Host(task).set_event( 762 state, 763 Some(Event::Subtask { 764 status: Status::Returned, 765 }), 766 )?; 767 768 Ok(()) 769 }) 770 }) as HostTaskFuture; 771 772 // Finally, poll the future. We can use a dummy `Waker` here because we'll 773 // add the future to `ConcurrentState::futures` and poll it automatically 774 // from the event loop if it doesn't complete immediately here. 775 let poll = tls::set(store, || { 776 future 777 .as_mut() 778 .poll(&mut Context::from_waker(&Waker::noop())) 779 }); 780 781 match poll { 782 // It completed immediately; check the result and delete the task. 783 Poll::Ready(result) => result?, 784 785 // It did not complete immediately; add it to 786 // `ConcurrentState::futures` so it will be polled via the event loop; 787 // then use `GuestTask::sync_call_set` to wait for the task to 788 // complete, suspending the current fiber until it does so. 789 Poll::Pending => { 790 let state = store.concurrent_state_mut(); 791 state.push_future(future); 792 793 let caller = state.get_mut(task)?.caller; 794 let set = state.get_mut(caller.task)?.sync_call_set; 795 Waitable::Host(task).join(state, Some(set))?; 796 797 store.suspend(SuspendReason::Waiting { 798 set, 799 thread: caller, 800 skip_may_block_check: false, 801 })?; 802 803 // Remove the `task` from the `sync_call_set` to ensure that when 804 // this function returns and the task is deleted that there are no 805 // more lingering references to this host task. 806 Waitable::Host(task).join(store.concurrent_state_mut(), None)?; 807 } 808 } 809 810 // Retrieve and return the result. 811 Ok(*store 812 .concurrent_state_mut() 813 .get_mut(task)? 814 .result 815 .take() 816 .unwrap() 817 .downcast() 818 .unwrap()) 819 } 820 821 /// Execute the specified guest call. 822 fn handle_guest_call(store: &mut dyn VMStore, call: GuestCall) -> Result<()> { 823 let mut next = Some(call); 824 while let Some(call) = next.take() { 825 match call.kind { 826 GuestCallKind::DeliverEvent { instance, set } => { 827 let (event, waitable) = instance 828 .get_event(store, call.thread.task, set, true)? 829 .unwrap(); 830 let state = store.concurrent_state_mut(); 831 let task = state.get_mut(call.thread.task)?; 832 let runtime_instance = task.instance; 833 let handle = waitable.map(|(_, v)| v).unwrap_or(0); 834 835 log::trace!( 836 "use callback to deliver event {event:?} to {:?} for {waitable:?}", 837 call.thread, 838 ); 839 840 let old_thread = store.set_thread(call.thread); 841 log::trace!( 842 "GuestCallKind::DeliverEvent: replaced {old_thread:?} with {:?} as current thread", 843 call.thread 844 ); 845 846 store.enter_instance(runtime_instance); 847 848 let callback = store 849 .concurrent_state_mut() 850 .get_mut(call.thread.task)? 851 .callback 852 .take() 853 .unwrap(); 854 855 let code = callback(store, event, handle)?; 856 857 store 858 .concurrent_state_mut() 859 .get_mut(call.thread.task)? 860 .callback = Some(callback); 861 862 store.exit_instance(runtime_instance)?; 863 864 store.set_thread(old_thread); 865 866 next = instance.handle_callback_code( 867 store, 868 call.thread, 869 runtime_instance.index, 870 code, 871 )?; 872 873 log::trace!( 874 "GuestCallKind::DeliverEvent: restored {old_thread:?} as current thread" 875 ); 876 } 877 GuestCallKind::StartImplicit(fun) => { 878 next = fun(store)?; 879 } 880 GuestCallKind::StartExplicit(fun) => { 881 fun(store)?; 882 } 883 } 884 } 885 886 Ok(()) 887 } 888 889 impl<T> Store<T> { 890 /// Convenience wrapper for [`StoreContextMut::run_concurrent`]. 891 pub async fn run_concurrent<R>(&mut self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R> 892 where 893 T: Send + 'static, 894 { 895 ensure!( 896 self.as_context().0.concurrency_support(), 897 "cannot use `run_concurrent` when Config::concurrency_support disabled", 898 ); 899 self.as_context_mut().run_concurrent(fun).await 900 } 901 902 #[doc(hidden)] 903 pub fn assert_concurrent_state_empty(&mut self) { 904 self.as_context_mut().assert_concurrent_state_empty(); 905 } 906 907 #[doc(hidden)] 908 pub fn concurrent_state_table_size(&mut self) -> usize { 909 self.as_context_mut().concurrent_state_table_size() 910 } 911 912 /// Convenience wrapper for [`StoreContextMut::spawn`]. 913 pub fn spawn(&mut self, task: impl AccessorTask<T, HasSelf<T>>) -> JoinHandle 914 where 915 T: 'static, 916 { 917 self.as_context_mut().spawn(task) 918 } 919 } 920 921 impl<T> StoreContextMut<'_, T> { 922 /// Assert that all the relevant tables and queues in the concurrent state 923 /// for this store are empty. 924 /// 925 /// This is for sanity checking in integration tests 926 /// (e.g. `component-async-tests`) that the relevant state has been cleared 927 /// after each test concludes. This should help us catch leaks, e.g. guest 928 /// tasks which haven't been deleted despite having completed and having 929 /// been dropped by their supertasks. 930 /// 931 /// Only intended for use in Wasmtime's own testing. 932 #[doc(hidden)] 933 pub fn assert_concurrent_state_empty(self) { 934 let store = self.0; 935 store 936 .store_data_mut() 937 .components 938 .assert_instance_states_empty(); 939 let state = store.concurrent_state_mut(); 940 assert!( 941 state.table.get_mut().is_empty(), 942 "non-empty table: {:?}", 943 state.table.get_mut() 944 ); 945 assert!(state.high_priority.is_empty()); 946 assert!(state.low_priority.is_empty()); 947 assert!(state.current_thread.is_none()); 948 assert!(state.futures.get_mut().as_ref().unwrap().is_empty()); 949 assert!(state.global_error_context_ref_counts.is_empty()); 950 } 951 952 /// Helper function to perform tests over the size of the concurrent state 953 /// table which can be useful for detecting leaks. 954 /// 955 /// Only intended for use in Wasmtime's own testing. 956 #[doc(hidden)] 957 pub fn concurrent_state_table_size(&mut self) -> usize { 958 self.0 959 .concurrent_state_mut() 960 .table 961 .get_mut() 962 .iter_mut() 963 .count() 964 } 965 966 /// Spawn a background task to run as part of this instance's event loop. 967 /// 968 /// The task will receive an `&Accessor<U>` and run concurrently with 969 /// any other tasks in progress for the instance. 970 /// 971 /// Note that the task will only make progress if and when the event loop 972 /// for this instance is run. 973 /// 974 /// The returned [`JoinHandle`] may be used to cancel the task. 975 pub fn spawn(mut self, task: impl AccessorTask<T>) -> JoinHandle 976 where 977 T: 'static, 978 { 979 let accessor = Accessor::new(StoreToken::new(self.as_context_mut())); 980 self.spawn_with_accessor(accessor, task) 981 } 982 983 /// Internal implementation of `spawn` functions where a `store` is 984 /// available along with an `Accessor`. 985 fn spawn_with_accessor<D>( 986 self, 987 accessor: Accessor<T, D>, 988 task: impl AccessorTask<T, D>, 989 ) -> JoinHandle 990 where 991 T: 'static, 992 D: HasData + ?Sized, 993 { 994 // Create an "abortable future" here where internally the future will 995 // hook calls to poll and possibly spawn more background tasks on each 996 // iteration. 997 let (handle, future) = JoinHandle::run(async move { task.run(&accessor).await }); 998 self.0 999 .concurrent_state_mut() 1000 .push_future(Box::pin(async move { future.await.unwrap_or(Ok(())) })); 1001 handle 1002 } 1003 1004 /// Run the specified closure `fun` to completion as part of this store's 1005 /// event loop. 1006 /// 1007 /// This will run `fun` as part of this store's event loop until it 1008 /// yields a result. `fun` is provided an [`Accessor`], which provides 1009 /// controlled access to the store and its data. 1010 /// 1011 /// This function can be used to invoke [`Func::call_concurrent`] for 1012 /// example within the async closure provided here. 1013 /// 1014 /// This function will unconditionally return an error if 1015 /// [`Config::concurrency_support`] is disabled. 1016 /// 1017 /// [`Config::concurrency_support`]: crate::Config::concurrency_support 1018 /// 1019 /// # Store-blocking behavior 1020 /// 1021 /// At this time there are certain situations in which the `Future` returned 1022 /// by the `AsyncFnOnce` passed to this function will not be polled for an 1023 /// extended period of time, despite one or more `Waker::wake` events having 1024 /// occurred for the task to which it belongs. This can manifest as the 1025 /// `Future` seeming to be "blocked" or "locked up", but is actually due to 1026 /// the `Store` being held by e.g. a blocking host function, preventing the 1027 /// `Future` from being polled. A canonical example of this is when the 1028 /// `fun` provided to this function attempts to set a timeout for an 1029 /// invocation of a wasm function. In this situation the async closure is 1030 /// waiting both on (a) the wasm computation to finish, and (b) the timeout 1031 /// to elapse. At this time this setup will not always work and the timeout 1032 /// may not reliably fire. 1033 /// 1034 /// This function will not block the current thread and as such is always 1035 /// suitable to run in an `async` context, but the current implementation of 1036 /// Wasmtime can lead to situations where a certain wasm computation is 1037 /// required to make progress the closure to make progress. This is an 1038 /// artifact of Wasmtime's historical implementation of `async` functions 1039 /// and is the topic of [#11869] and [#11870]. In the timeout example from 1040 /// above it means that Wasmtime can get "wedged" for a bit where (a) must 1041 /// progress for a readiness notification of (b) to get delivered. 1042 /// 1043 /// This effectively means that it's not possible to reliably perform a 1044 /// "select" operation within the `fun` closure, which timeouts for example 1045 /// are based on. Fixing this requires some relatively major refactoring 1046 /// work within Wasmtime itself. This is a known pitfall otherwise and one 1047 /// that is intended to be fixed one day. In the meantime it's recommended 1048 /// to apply timeouts or such to the entire `run_concurrent` call itself 1049 /// rather than internally. 1050 /// 1051 /// [#11869]: https://github.com/bytecodealliance/wasmtime/issues/11869 1052 /// [#11870]: https://github.com/bytecodealliance/wasmtime/issues/11870 1053 /// 1054 /// # Example 1055 /// 1056 /// ``` 1057 /// # use { 1058 /// # wasmtime::{ 1059 /// # error::{Result}, 1060 /// # component::{ Component, Linker, Resource, ResourceTable}, 1061 /// # Config, Engine, Store 1062 /// # }, 1063 /// # }; 1064 /// # 1065 /// # struct MyResource(u32); 1066 /// # struct Ctx { table: ResourceTable } 1067 /// # 1068 /// # async fn foo() -> Result<()> { 1069 /// # let mut config = Config::new(); 1070 /// # let engine = Engine::new(&config)?; 1071 /// # let mut store = Store::new(&engine, Ctx { table: ResourceTable::new() }); 1072 /// # let mut linker = Linker::new(&engine); 1073 /// # let component = Component::new(&engine, "")?; 1074 /// # let instance = linker.instantiate_async(&mut store, &component).await?; 1075 /// # let foo = instance.get_typed_func::<(Resource<MyResource>,), (Resource<MyResource>,)>(&mut store, "foo")?; 1076 /// # let bar = instance.get_typed_func::<(u32,), ()>(&mut store, "bar")?; 1077 /// store.run_concurrent(async |accessor| -> wasmtime::Result<_> { 1078 /// let resource = accessor.with(|mut access| access.get().table.push(MyResource(42)))?; 1079 /// let (another_resource,) = foo.call_concurrent(accessor, (resource,)).await?.0; 1080 /// let value = accessor.with(|mut access| access.get().table.delete(another_resource))?; 1081 /// bar.call_concurrent(accessor, (value.0,)).await?; 1082 /// Ok(()) 1083 /// }).await??; 1084 /// # Ok(()) 1085 /// # } 1086 /// ``` 1087 pub async fn run_concurrent<R>(self, fun: impl AsyncFnOnce(&Accessor<T>) -> R) -> Result<R> 1088 where 1089 T: Send + 'static, 1090 { 1091 ensure!( 1092 self.0.concurrency_support(), 1093 "cannot use `run_concurrent` when Config::concurrency_support disabled", 1094 ); 1095 self.do_run_concurrent(fun, false).await 1096 } 1097 1098 pub(super) async fn run_concurrent_trap_on_idle<R>( 1099 self, 1100 fun: impl AsyncFnOnce(&Accessor<T>) -> R, 1101 ) -> Result<R> 1102 where 1103 T: Send + 'static, 1104 { 1105 self.do_run_concurrent(fun, true).await 1106 } 1107 1108 async fn do_run_concurrent<R>( 1109 mut self, 1110 fun: impl AsyncFnOnce(&Accessor<T>) -> R, 1111 trap_on_idle: bool, 1112 ) -> Result<R> 1113 where 1114 T: Send + 'static, 1115 { 1116 debug_assert!(self.0.concurrency_support()); 1117 check_recursive_run(); 1118 let token = StoreToken::new(self.as_context_mut()); 1119 1120 struct Dropper<'a, T: 'static, V> { 1121 store: StoreContextMut<'a, T>, 1122 value: ManuallyDrop<V>, 1123 } 1124 1125 impl<'a, T, V> Drop for Dropper<'a, T, V> { 1126 fn drop(&mut self) { 1127 tls::set(self.store.0, || { 1128 // SAFETY: Here we drop the value without moving it for the 1129 // first and only time -- per the contract for `Drop::drop`, 1130 // this code won't run again, and the `value` field will no 1131 // longer be accessible. 1132 unsafe { ManuallyDrop::drop(&mut self.value) } 1133 }); 1134 } 1135 } 1136 1137 let accessor = &Accessor::new(token); 1138 let dropper = &mut Dropper { 1139 store: self, 1140 value: ManuallyDrop::new(fun(accessor)), 1141 }; 1142 // SAFETY: We never move `dropper` nor its `value` field. 1143 let future = unsafe { Pin::new_unchecked(dropper.value.deref_mut()) }; 1144 1145 dropper 1146 .store 1147 .as_context_mut() 1148 .poll_until(future, trap_on_idle) 1149 .await 1150 } 1151 1152 /// Run this store's event loop. 1153 /// 1154 /// The returned future will resolve when the specified future completes or, 1155 /// if `trap_on_idle` is true, when the event loop can't make further 1156 /// progress. 1157 async fn poll_until<R>( 1158 mut self, 1159 mut future: Pin<&mut impl Future<Output = R>>, 1160 trap_on_idle: bool, 1161 ) -> Result<R> 1162 where 1163 T: Send + 'static, 1164 { 1165 struct Reset<'a, T: 'static> { 1166 store: StoreContextMut<'a, T>, 1167 futures: Option<FuturesUnordered<HostTaskFuture>>, 1168 } 1169 1170 impl<'a, T> Drop for Reset<'a, T> { 1171 fn drop(&mut self) { 1172 if let Some(futures) = self.futures.take() { 1173 *self.store.0.concurrent_state_mut().futures.get_mut() = Some(futures); 1174 } 1175 } 1176 } 1177 1178 loop { 1179 // Take `ConcurrentState::futures` out of the store so we can poll 1180 // it while also safely giving any of the futures inside access to 1181 // `self`. 1182 let futures = self.0.concurrent_state_mut().futures.get_mut().take(); 1183 let mut reset = Reset { 1184 store: self.as_context_mut(), 1185 futures, 1186 }; 1187 let mut next = pin!(reset.futures.as_mut().unwrap().next()); 1188 1189 enum PollResult<R> { 1190 Complete(R), 1191 ProcessWork(Vec<WorkItem>), 1192 } 1193 let result = future::poll_fn(|cx| { 1194 // First, poll the future we were passed as an argument and 1195 // return immediately if it's ready. 1196 if let Poll::Ready(value) = tls::set(reset.store.0, || future.as_mut().poll(cx)) { 1197 return Poll::Ready(Ok(PollResult::Complete(value))); 1198 } 1199 1200 // Next, poll `ConcurrentState::futures` (which includes any 1201 // pending host tasks and/or background tasks), returning 1202 // immediately if one of them fails. 1203 let next = match tls::set(reset.store.0, || next.as_mut().poll(cx)) { 1204 Poll::Ready(Some(output)) => { 1205 match output { 1206 Err(e) => return Poll::Ready(Err(e)), 1207 Ok(()) => {} 1208 } 1209 Poll::Ready(true) 1210 } 1211 Poll::Ready(None) => Poll::Ready(false), 1212 Poll::Pending => Poll::Pending, 1213 }; 1214 1215 // Next, collect the next batch of work items to process, if any. 1216 // This will be either all of the high-priority work items, or if 1217 // there are none, a single low-priority work item. 1218 let state = reset.store.0.concurrent_state_mut(); 1219 let ready = state.collect_work_items_to_run(); 1220 if !ready.is_empty() { 1221 return Poll::Ready(Ok(PollResult::ProcessWork(ready))); 1222 } 1223 1224 // Finally, if we have nothing else to do right now, determine what to do 1225 // based on whether there are any pending futures in 1226 // `ConcurrentState::futures`. 1227 return match next { 1228 Poll::Ready(true) => { 1229 // In this case, one of the futures in 1230 // `ConcurrentState::futures` completed 1231 // successfully, so we return now and continue 1232 // the outer loop in case there is another one 1233 // ready to complete. 1234 Poll::Ready(Ok(PollResult::ProcessWork(Vec::new()))) 1235 } 1236 Poll::Ready(false) => { 1237 // Poll the future we were passed one last time 1238 // in case one of `ConcurrentState::futures` had 1239 // the side effect of unblocking it. 1240 if let Poll::Ready(value) = 1241 tls::set(reset.store.0, || future.as_mut().poll(cx)) 1242 { 1243 Poll::Ready(Ok(PollResult::Complete(value))) 1244 } else { 1245 // In this case, there are no more pending 1246 // futures in `ConcurrentState::futures`, 1247 // there are no remaining work items, _and_ 1248 // the future we were passed as an argument 1249 // still hasn't completed. 1250 if trap_on_idle { 1251 // `trap_on_idle` is true, so we exit 1252 // immediately. 1253 Poll::Ready(Err(format_err!(crate::Trap::AsyncDeadlock))) 1254 } else { 1255 // `trap_on_idle` is false, so we assume 1256 // that future will wake up and give us 1257 // more work to do when it's ready to. 1258 Poll::Pending 1259 } 1260 } 1261 } 1262 // There is at least one pending future in 1263 // `ConcurrentState::futures` and we have nothing 1264 // else to do but wait for now, so we return 1265 // `Pending`. 1266 Poll::Pending => Poll::Pending, 1267 }; 1268 }) 1269 .await; 1270 1271 // Put the `ConcurrentState::futures` back into the store before we 1272 // return or handle any work items since one or more of those items 1273 // might append more futures. 1274 drop(reset); 1275 1276 match result? { 1277 // The future we were passed as an argument completed, so we 1278 // return the result. 1279 PollResult::Complete(value) => break Ok(value), 1280 // The future we were passed has not yet completed, so handle 1281 // any work items and then loop again. 1282 PollResult::ProcessWork(ready) => { 1283 struct Dispose<'a, T: 'static, I: Iterator<Item = WorkItem>> { 1284 store: StoreContextMut<'a, T>, 1285 ready: I, 1286 } 1287 1288 impl<'a, T, I: Iterator<Item = WorkItem>> Drop for Dispose<'a, T, I> { 1289 fn drop(&mut self) { 1290 while let Some(item) = self.ready.next() { 1291 match item { 1292 WorkItem::ResumeFiber(mut fiber) => fiber.dispose(self.store.0), 1293 WorkItem::PushFuture(future) => { 1294 tls::set(self.store.0, move || drop(future)) 1295 } 1296 _ => {} 1297 } 1298 } 1299 } 1300 } 1301 1302 let mut dispose = Dispose { 1303 store: self.as_context_mut(), 1304 ready: ready.into_iter(), 1305 }; 1306 1307 while let Some(item) = dispose.ready.next() { 1308 dispose 1309 .store 1310 .as_context_mut() 1311 .handle_work_item(item) 1312 .await?; 1313 } 1314 } 1315 } 1316 } 1317 } 1318 1319 /// Handle the specified work item, possibly resuming a fiber if applicable. 1320 async fn handle_work_item(self, item: WorkItem) -> Result<()> 1321 where 1322 T: Send, 1323 { 1324 log::trace!("handle work item {item:?}"); 1325 match item { 1326 WorkItem::PushFuture(future) => { 1327 self.0 1328 .concurrent_state_mut() 1329 .futures 1330 .get_mut() 1331 .as_mut() 1332 .unwrap() 1333 .push(future.into_inner()); 1334 } 1335 WorkItem::ResumeFiber(fiber) => { 1336 self.0.resume_fiber(fiber).await?; 1337 } 1338 WorkItem::ResumeThread(_, thread) => { 1339 if let GuestThreadState::Ready(fiber) = mem::replace( 1340 &mut self.0.concurrent_state_mut().get_mut(thread.thread)?.state, 1341 GuestThreadState::Running, 1342 ) { 1343 self.0.resume_fiber(fiber).await?; 1344 } else { 1345 bail!("cannot resume non-pending thread {thread:?}"); 1346 } 1347 } 1348 WorkItem::GuestCall(_, call) => { 1349 if call.is_ready(self.0)? { 1350 self.run_on_worker(WorkerItem::GuestCall(call)).await?; 1351 } else { 1352 let state = self.0.concurrent_state_mut(); 1353 let task = state.get_mut(call.thread.task)?; 1354 if !task.starting_sent { 1355 task.starting_sent = true; 1356 if let GuestCallKind::StartImplicit(_) = &call.kind { 1357 Waitable::Guest(call.thread.task).set_event( 1358 state, 1359 Some(Event::Subtask { 1360 status: Status::Starting, 1361 }), 1362 )?; 1363 } 1364 } 1365 1366 let instance = state.get_mut(call.thread.task)?.instance; 1367 self.0 1368 .instance_state(instance) 1369 .concurrent_state() 1370 .pending 1371 .insert(call.thread, call.kind); 1372 } 1373 } 1374 WorkItem::WorkerFunction(fun) => { 1375 self.run_on_worker(WorkerItem::Function(fun)).await?; 1376 } 1377 } 1378 1379 Ok(()) 1380 } 1381 1382 /// Execute the specified guest call on a worker fiber. 1383 async fn run_on_worker(self, item: WorkerItem) -> Result<()> 1384 where 1385 T: Send, 1386 { 1387 let worker = if let Some(fiber) = self.0.concurrent_state_mut().worker.take() { 1388 fiber 1389 } else { 1390 fiber::make_fiber(self.0, move |store| { 1391 loop { 1392 match store.concurrent_state_mut().worker_item.take().unwrap() { 1393 WorkerItem::GuestCall(call) => handle_guest_call(store, call)?, 1394 WorkerItem::Function(fun) => fun.into_inner()(store)?, 1395 } 1396 1397 store.suspend(SuspendReason::NeedWork)?; 1398 } 1399 })? 1400 }; 1401 1402 let worker_item = &mut self.0.concurrent_state_mut().worker_item; 1403 assert!(worker_item.is_none()); 1404 *worker_item = Some(item); 1405 1406 self.0.resume_fiber(worker).await 1407 } 1408 1409 /// Wrap the specified host function in a future which will call it, passing 1410 /// it an `&Accessor<T>`. 1411 /// 1412 /// See the `Accessor` documentation for details. 1413 pub(crate) fn wrap_call<F, R>(self, closure: F) -> impl Future<Output = Result<R>> + 'static 1414 where 1415 T: 'static, 1416 F: FnOnce(&Accessor<T>) -> Pin<Box<dyn Future<Output = Result<R>> + Send + '_>> 1417 + Send 1418 + Sync 1419 + 'static, 1420 R: Send + Sync + 'static, 1421 { 1422 let token = StoreToken::new(self); 1423 async move { 1424 let mut accessor = Accessor::new(token); 1425 closure(&mut accessor).await 1426 } 1427 } 1428 } 1429 1430 impl StoreOpaque { 1431 /// Push a `GuestTask` onto the task stack for either a sync-to-sync, 1432 /// guest-to-guest call or a sync host-to-guest call. 1433 /// 1434 /// This task will only be used for the purpose of handling calls to 1435 /// intrinsic functions; both parameter lowering and result lifting are 1436 /// assumed to be taken care of elsewhere. 1437 pub(crate) fn enter_guest_sync_call( 1438 &mut self, 1439 guest_caller: Option<RuntimeInstance>, 1440 callee_async: bool, 1441 callee: RuntimeInstance, 1442 ) -> Result<()> { 1443 log::trace!("enter sync call {callee:?}"); 1444 if !self.concurrency_support() { 1445 return Ok(self.enter_call_not_concurrent()); 1446 } 1447 1448 let state = self.concurrent_state_mut(); 1449 let thread = state.current_thread; 1450 let instance = if let Some(thread) = thread.guest() { 1451 Some(state.get_mut(thread.task)?.instance) 1452 } else { 1453 None 1454 }; 1455 let task = GuestTask::new( 1456 state, 1457 Box::new(move |_, _| unreachable!()), 1458 LiftResult { 1459 lift: Box::new(move |_, _| unreachable!()), 1460 ty: TypeTupleIndex::reserved_value(), 1461 memory: None, 1462 string_encoding: StringEncoding::Utf8, 1463 }, 1464 if let Some(caller) = guest_caller { 1465 assert_eq!(caller, instance.unwrap()); 1466 Caller::Guest { 1467 thread: *thread.guest().unwrap(), 1468 } 1469 } else { 1470 Caller::Host { 1471 tx: None, 1472 exit_tx: Arc::new(oneshot::channel().0), 1473 host_future_present: false, 1474 caller: thread, 1475 } 1476 }, 1477 None, 1478 callee, 1479 callee_async, 1480 )?; 1481 1482 let guest_task = state.push(task)?; 1483 let new_thread = GuestThread::new_implicit(guest_task); 1484 let guest_thread = state.push(new_thread)?; 1485 Instance::from_wasmtime(self, callee.instance).add_guest_thread_to_instance_table( 1486 guest_thread, 1487 self, 1488 callee.index, 1489 )?; 1490 1491 let state = self.concurrent_state_mut(); 1492 state.get_mut(guest_task)?.threads.insert(guest_thread); 1493 if guest_caller.is_some() { 1494 let thread = thread.guest().unwrap(); 1495 state.get_mut(thread.task)?.subtasks.insert(guest_task); 1496 } 1497 1498 self.set_thread(QualifiedThreadId { 1499 task: guest_task, 1500 thread: guest_thread, 1501 }); 1502 1503 Ok(()) 1504 } 1505 1506 /// Pop a `GuestTask` previously pushed using `enter_sync_call`. 1507 pub(crate) fn exit_guest_sync_call(&mut self, guest_caller: bool) -> Result<()> { 1508 if !self.concurrency_support() { 1509 return Ok(self.exit_call_not_concurrent()); 1510 } 1511 let thread = *self.set_thread(CurrentThread::None).guest().unwrap(); 1512 let instance = self.concurrent_state_mut().get_mut(thread.task)?.instance; 1513 log::trace!("exit sync call {instance:?}"); 1514 Instance::from_wasmtime(self, instance.instance).cleanup_thread( 1515 self, 1516 thread, 1517 instance.index, 1518 )?; 1519 1520 let state = self.concurrent_state_mut(); 1521 let task = state.get_mut(thread.task)?; 1522 let caller = match &task.caller { 1523 &Caller::Guest { thread } => { 1524 assert!(guest_caller); 1525 thread.into() 1526 } 1527 &Caller::Host { caller, .. } => { 1528 assert!(!guest_caller); 1529 caller 1530 } 1531 }; 1532 self.set_thread(caller); 1533 1534 let state = self.concurrent_state_mut(); 1535 let task = state.get_mut(thread.task)?; 1536 if task.ready_to_delete() { 1537 state.delete(thread.task)?.dispose(state, thread.task)?; 1538 } 1539 1540 Ok(()) 1541 } 1542 1543 /// Similar to `enter_guest_sync_call` except for when the guest makes a 1544 /// transition to the host. 1545 /// 1546 /// FIXME: this is called for all guest->host transitions and performs some 1547 /// relatively expensive table manipulations. This would ideally be 1548 /// optimized to avoid the full allocation of a `HostTask` in at least some 1549 /// situations. 1550 pub fn enter_host_call(&mut self) -> Result<()> { 1551 let state = self.concurrent_state_mut(); 1552 let caller = state.unwrap_current_guest_thread(); 1553 let task = state.push(HostTask::new(caller))?; 1554 log::trace!("new host task {task:?}"); 1555 self.set_thread(task); 1556 Ok(()) 1557 } 1558 1559 /// Dual of `enter_host_call` and signifies that the host has finished and 1560 /// will be cleaned up. 1561 /// 1562 /// Note that this isn't invoked when the host is invoked asynchronously and 1563 /// the host isn't complete yet. In that situation the host task persists 1564 /// and will be cleaned up separately. 1565 pub fn exit_host_call(&mut self) -> Result<()> { 1566 let task = self.concurrent_state_mut().unwrap_current_host_thread(); 1567 log::trace!("delete host task {task:?}"); 1568 let task = self.concurrent_state_mut().delete(task)?; 1569 self.set_thread(task.caller); 1570 Ok(()) 1571 } 1572 1573 /// Determine whether the specified instance may be entered from the host. 1574 /// 1575 /// We return `true` here only if all of the following hold: 1576 /// 1577 /// - The top-level instance is not already on the current task's call stack. 1578 /// - The instance is not in need of a post-return function call. 1579 /// - `self` has not been poisoned due to a trap. 1580 pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool { 1581 if self.trapped() { 1582 return false; 1583 } 1584 if !self.concurrency_support() { 1585 return true; 1586 } 1587 let state = self.concurrent_state_mut(); 1588 let mut cur = state.current_thread; 1589 loop { 1590 match cur { 1591 CurrentThread::None => break true, 1592 CurrentThread::Guest(thread) => { 1593 let task = state.get_mut(thread.task).unwrap(); 1594 1595 // Note that we only compare top-level instance IDs here. 1596 // The idea is that the host is not allowed to recursively 1597 // enter a top-level instance even if the specific leaf 1598 // instance is not on the stack. This the behavior defined 1599 // in the spec, and it allows us to elide runtime checks in 1600 // guest-to-guest adapters. 1601 if task.instance.instance == instance.instance { 1602 break false; 1603 } 1604 cur = match task.caller { 1605 Caller::Host { caller, .. } => caller, 1606 Caller::Guest { thread } => thread.into(), 1607 }; 1608 } 1609 CurrentThread::Host(id) => { 1610 cur = state.get_mut(id).unwrap().caller.into(); 1611 } 1612 } 1613 } 1614 } 1615 1616 /// Helper function to retrieve the `InstanceState` for the 1617 /// specified instance. 1618 fn instance_state(&mut self, instance: RuntimeInstance) -> &mut InstanceState { 1619 self.component_instance_mut(instance.instance) 1620 .instance_state(instance.index) 1621 } 1622 1623 fn set_thread(&mut self, thread: impl Into<CurrentThread>) -> CurrentThread { 1624 // Each time we switch threads, we conservatively set `task_may_block` 1625 // to `false` for the component instance we're switching away from (if 1626 // any), meaning it will be `false` for any new thread created for that 1627 // instance unless explicitly set otherwise. 1628 let state = self.concurrent_state_mut(); 1629 let old_thread = mem::replace(&mut state.current_thread, thread.into()); 1630 if let Some(old_thread) = old_thread.guest() { 1631 let instance = state.get_mut(old_thread.task).unwrap().instance.instance; 1632 self.component_instance_mut(instance) 1633 .set_task_may_block(false) 1634 } 1635 1636 // If we're switching to a new thread, set its component instance's 1637 // `task_may_block` according to where it left off. 1638 if self.concurrent_state_mut().current_thread.guest().is_some() { 1639 self.set_task_may_block(); 1640 } 1641 1642 old_thread 1643 } 1644 1645 /// Set the global variable representing whether the current task may block 1646 /// prior to entering Wasm code. 1647 fn set_task_may_block(&mut self) { 1648 let state = self.concurrent_state_mut(); 1649 let guest_thread = state.unwrap_current_guest_thread(); 1650 let instance = state.get_mut(guest_thread.task).unwrap().instance.instance; 1651 let may_block = self.concurrent_state_mut().may_block(guest_thread.task); 1652 self.component_instance_mut(instance) 1653 .set_task_may_block(may_block) 1654 } 1655 1656 pub(crate) fn check_blocking(&mut self) -> Result<()> { 1657 if !self.concurrency_support() { 1658 return Ok(()); 1659 } 1660 let state = self.concurrent_state_mut(); 1661 let task = state.unwrap_current_guest_thread().task; 1662 let instance = state.get_mut(task).unwrap().instance.instance; 1663 let task_may_block = self.component_instance(instance).get_task_may_block(); 1664 1665 if task_may_block { 1666 Ok(()) 1667 } else { 1668 Err(Trap::CannotBlockSyncTask.into()) 1669 } 1670 } 1671 1672 /// Record that we're about to enter a (sub-)component instance which does 1673 /// not support more than one concurrent, stackful activation, meaning it 1674 /// cannot be entered again until the next call returns. 1675 fn enter_instance(&mut self, instance: RuntimeInstance) { 1676 log::trace!("enter {instance:?}"); 1677 self.instance_state(instance) 1678 .concurrent_state() 1679 .do_not_enter = true; 1680 } 1681 1682 /// Record that we've exited a (sub-)component instance previously entered 1683 /// with `Self::enter_instance` and then calls `Self::partition_pending`. 1684 /// See the documentation for the latter for details. 1685 fn exit_instance(&mut self, instance: RuntimeInstance) -> Result<()> { 1686 log::trace!("exit {instance:?}"); 1687 self.instance_state(instance) 1688 .concurrent_state() 1689 .do_not_enter = false; 1690 self.partition_pending(instance) 1691 } 1692 1693 /// Iterate over `InstanceState::pending`, moving any ready items into the 1694 /// "high priority" work item queue. 1695 /// 1696 /// See `GuestCall::is_ready` for details. 1697 fn partition_pending(&mut self, instance: RuntimeInstance) -> Result<()> { 1698 for (thread, kind) in 1699 mem::take(&mut self.instance_state(instance).concurrent_state().pending).into_iter() 1700 { 1701 let call = GuestCall { thread, kind }; 1702 if call.is_ready(self)? { 1703 self.concurrent_state_mut() 1704 .push_high_priority(WorkItem::GuestCall(instance.index, call)); 1705 } else { 1706 self.instance_state(instance) 1707 .concurrent_state() 1708 .pending 1709 .insert(call.thread, call.kind); 1710 } 1711 } 1712 1713 Ok(()) 1714 } 1715 1716 /// Implements the `backpressure.{inc,dec}` intrinsics. 1717 pub(crate) fn backpressure_modify( 1718 &mut self, 1719 caller_instance: RuntimeInstance, 1720 modify: impl FnOnce(u16) -> Option<u16>, 1721 ) -> Result<()> { 1722 let state = self.instance_state(caller_instance).concurrent_state(); 1723 let old = state.backpressure; 1724 let new = modify(old).ok_or_else(|| format_err!("backpressure counter overflow"))?; 1725 state.backpressure = new; 1726 1727 if old > 0 && new == 0 { 1728 // Backpressure was previously enabled and is now disabled; move any 1729 // newly-eligible guest calls to the "high priority" queue. 1730 self.partition_pending(caller_instance)?; 1731 } 1732 1733 Ok(()) 1734 } 1735 1736 /// Resume the specified fiber, giving it exclusive access to the specified 1737 /// store. 1738 async fn resume_fiber(&mut self, fiber: StoreFiber<'static>) -> Result<()> { 1739 let old_thread = self.concurrent_state_mut().current_thread; 1740 log::trace!("resume_fiber: save current thread {old_thread:?}"); 1741 1742 let fiber = fiber::resolve_or_release(self, fiber).await?; 1743 1744 self.set_thread(old_thread); 1745 1746 let state = self.concurrent_state_mut(); 1747 1748 if let Some(ot) = old_thread.guest() { 1749 state.get_mut(ot.thread)?.state = GuestThreadState::Running; 1750 } 1751 log::trace!("resume_fiber: restore current thread {old_thread:?}"); 1752 1753 if let Some(mut fiber) = fiber { 1754 log::trace!("resume_fiber: suspend reason {:?}", &state.suspend_reason); 1755 // See the `SuspendReason` documentation for what each case means. 1756 match state.suspend_reason.take().unwrap() { 1757 SuspendReason::NeedWork => { 1758 if state.worker.is_none() { 1759 state.worker = Some(fiber); 1760 } else { 1761 fiber.dispose(self); 1762 } 1763 } 1764 SuspendReason::Yielding { thread, .. } => { 1765 state.get_mut(thread.thread)?.state = GuestThreadState::Ready(fiber); 1766 let instance = state.get_mut(thread.task)?.instance.index; 1767 state.push_low_priority(WorkItem::ResumeThread(instance, thread)); 1768 } 1769 SuspendReason::ExplicitlySuspending { thread, .. } => { 1770 state.get_mut(thread.thread)?.state = GuestThreadState::Suspended(fiber); 1771 } 1772 SuspendReason::Waiting { set, thread, .. } => { 1773 let old = state 1774 .get_mut(set)? 1775 .waiting 1776 .insert(thread, WaitMode::Fiber(fiber)); 1777 assert!(old.is_none()); 1778 } 1779 }; 1780 } else { 1781 log::trace!("resume_fiber: fiber has exited"); 1782 } 1783 1784 Ok(()) 1785 } 1786 1787 /// Suspend the current fiber, storing the reason in 1788 /// `ConcurrentState::suspend_reason` to indicate the conditions under which 1789 /// it should be resumed. 1790 /// 1791 /// See the `SuspendReason` documentation for details. 1792 fn suspend(&mut self, reason: SuspendReason) -> Result<()> { 1793 log::trace!("suspend fiber: {reason:?}"); 1794 1795 // If we're yielding or waiting on behalf of a guest thread, we'll need to 1796 // pop the call context which manages resource borrows before suspending 1797 // and then push it again once we've resumed. 1798 let task = match &reason { 1799 SuspendReason::Yielding { thread, .. } 1800 | SuspendReason::Waiting { thread, .. } 1801 | SuspendReason::ExplicitlySuspending { thread, .. } => Some(thread.task), 1802 SuspendReason::NeedWork => None, 1803 }; 1804 1805 let old_guest_thread = if task.is_some() { 1806 self.concurrent_state_mut().current_thread 1807 } else { 1808 CurrentThread::None 1809 }; 1810 1811 // We should not have reached here unless either there's no current 1812 // task, or the current task is permitted to block. In addition, we 1813 // special-case `thread.switch-to` and waiting for a subtask to go from 1814 // `starting` to `started`, both of which we consider non-blocking 1815 // operations despite requiring a suspend. 1816 assert!( 1817 matches!( 1818 reason, 1819 SuspendReason::ExplicitlySuspending { 1820 skip_may_block_check: true, 1821 .. 1822 } | SuspendReason::Waiting { 1823 skip_may_block_check: true, 1824 .. 1825 } | SuspendReason::Yielding { 1826 skip_may_block_check: true, 1827 .. 1828 } 1829 ) || old_guest_thread 1830 .guest() 1831 .map(|thread| self.concurrent_state_mut().may_block(thread.task)) 1832 .unwrap_or(true) 1833 ); 1834 1835 let suspend_reason = &mut self.concurrent_state_mut().suspend_reason; 1836 assert!(suspend_reason.is_none()); 1837 *suspend_reason = Some(reason); 1838 1839 self.with_blocking(|_, cx| cx.suspend(StoreFiberYield::ReleaseStore))?; 1840 1841 if task.is_some() { 1842 self.set_thread(old_guest_thread); 1843 } 1844 1845 Ok(()) 1846 } 1847 1848 fn wait_for_event(&mut self, waitable: Waitable) -> Result<()> { 1849 let state = self.concurrent_state_mut(); 1850 let caller = state.unwrap_current_guest_thread(); 1851 let old_set = waitable.common(state)?.set; 1852 let set = state.get_mut(caller.task)?.sync_call_set; 1853 waitable.join(state, Some(set))?; 1854 self.suspend(SuspendReason::Waiting { 1855 set, 1856 thread: caller, 1857 skip_may_block_check: false, 1858 })?; 1859 let state = self.concurrent_state_mut(); 1860 waitable.join(state, old_set) 1861 } 1862 } 1863 1864 impl Instance { 1865 /// Get the next pending event for the specified task and (optional) 1866 /// waitable set, along with the waitable handle if applicable. 1867 fn get_event( 1868 self, 1869 store: &mut StoreOpaque, 1870 guest_task: TableId<GuestTask>, 1871 set: Option<TableId<WaitableSet>>, 1872 cancellable: bool, 1873 ) -> Result<Option<(Event, Option<(Waitable, u32)>)>> { 1874 let state = store.concurrent_state_mut(); 1875 1876 if let Some(event) = state.get_mut(guest_task)?.event.take() { 1877 log::trace!("deliver event {event:?} to {guest_task:?}"); 1878 1879 if cancellable || !matches!(event, Event::Cancelled) { 1880 return Ok(Some((event, None))); 1881 } else { 1882 state.get_mut(guest_task)?.event = Some(event); 1883 } 1884 } 1885 1886 Ok( 1887 if let Some((set, waitable)) = set 1888 .and_then(|set| { 1889 state 1890 .get_mut(set) 1891 .map(|v| v.ready.pop_first().map(|v| (set, v))) 1892 .transpose() 1893 }) 1894 .transpose()? 1895 { 1896 let common = waitable.common(state)?; 1897 let handle = common.handle.unwrap(); 1898 let event = common.event.take().unwrap(); 1899 1900 log::trace!( 1901 "deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}" 1902 ); 1903 1904 waitable.on_delivery(store, self, event); 1905 1906 Some((event, Some((waitable, handle)))) 1907 } else { 1908 None 1909 }, 1910 ) 1911 } 1912 1913 /// Handle the `CallbackCode` returned from an async-lifted export or its 1914 /// callback. 1915 /// 1916 /// If this returns `Ok(Some(call))`, then `call` should be run immediately 1917 /// using `handle_guest_call`. 1918 fn handle_callback_code( 1919 self, 1920 store: &mut StoreOpaque, 1921 guest_thread: QualifiedThreadId, 1922 runtime_instance: RuntimeComponentInstanceIndex, 1923 code: u32, 1924 ) -> Result<Option<GuestCall>> { 1925 let (code, set) = unpack_callback_code(code); 1926 1927 log::trace!("received callback code from {guest_thread:?}: {code} (set: {set})"); 1928 1929 let state = store.concurrent_state_mut(); 1930 1931 let get_set = |store: &mut StoreOpaque, handle| { 1932 if handle == 0 { 1933 bail!("invalid waitable-set handle"); 1934 } 1935 1936 let set = store 1937 .instance_state(RuntimeInstance { 1938 instance: self.id().instance(), 1939 index: runtime_instance, 1940 }) 1941 .handle_table() 1942 .waitable_set_rep(handle)?; 1943 1944 Ok(TableId::<WaitableSet>::new(set)) 1945 }; 1946 1947 Ok(match code { 1948 callback_code::EXIT => { 1949 log::trace!("implicit thread {guest_thread:?} completed"); 1950 self.cleanup_thread(store, guest_thread, runtime_instance)?; 1951 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 1952 if task.threads.is_empty() && !task.returned_or_cancelled() { 1953 bail!(Trap::NoAsyncResult); 1954 } 1955 match &task.caller { 1956 Caller::Host { .. } => { 1957 if task.ready_to_delete() { 1958 Waitable::Guest(guest_thread.task) 1959 .delete_from(store.concurrent_state_mut())?; 1960 } 1961 } 1962 Caller::Guest { .. } => { 1963 task.exited = true; 1964 task.callback = None; 1965 } 1966 } 1967 None 1968 } 1969 callback_code::YIELD => { 1970 let task = state.get_mut(guest_thread.task)?; 1971 // If an `Event::Cancelled` is pending, we'll deliver that; 1972 // otherwise, we'll deliver `Event::None`. Note that 1973 // `GuestTask::event` is only ever set to one of those two 1974 // `Event` variants. 1975 if let Some(event) = task.event { 1976 assert!(matches!(event, Event::None | Event::Cancelled)); 1977 } else { 1978 task.event = Some(Event::None); 1979 } 1980 let call = GuestCall { 1981 thread: guest_thread, 1982 kind: GuestCallKind::DeliverEvent { 1983 instance: self, 1984 set: None, 1985 }, 1986 }; 1987 if state.may_block(guest_thread.task) { 1988 // Push this thread onto the "low priority" queue so it runs 1989 // after any other threads have had a chance to run. 1990 state.push_low_priority(WorkItem::GuestCall(runtime_instance, call)); 1991 None 1992 } else { 1993 // Yielding in a non-blocking context is defined as a no-op 1994 // according to the spec, so we must run this thread 1995 // immediately without allowing any others to run. 1996 Some(call) 1997 } 1998 } 1999 callback_code::WAIT => { 2000 // The task may only return `WAIT` if it was created for a call 2001 // to an async export). Otherwise, we'll trap. 2002 state.check_blocking_for(guest_thread.task)?; 2003 2004 let set = get_set(store, set)?; 2005 let state = store.concurrent_state_mut(); 2006 2007 if state.get_mut(guest_thread.task)?.event.is_some() 2008 || !state.get_mut(set)?.ready.is_empty() 2009 { 2010 // An event is immediately available; deliver it ASAP. 2011 state.push_high_priority(WorkItem::GuestCall( 2012 runtime_instance, 2013 GuestCall { 2014 thread: guest_thread, 2015 kind: GuestCallKind::DeliverEvent { 2016 instance: self, 2017 set: Some(set), 2018 }, 2019 }, 2020 )); 2021 } else { 2022 // No event is immediately available. 2023 // 2024 // We're waiting, so register to be woken up when an event 2025 // is published for this waitable set. 2026 // 2027 // Here we also set `GuestTask::wake_on_cancel` which allows 2028 // `subtask.cancel` to interrupt the wait. 2029 let old = state 2030 .get_mut(guest_thread.thread)? 2031 .wake_on_cancel 2032 .replace(set); 2033 assert!(old.is_none()); 2034 let old = state 2035 .get_mut(set)? 2036 .waiting 2037 .insert(guest_thread, WaitMode::Callback(self)); 2038 assert!(old.is_none()); 2039 } 2040 None 2041 } 2042 _ => bail!("unsupported callback code: {code}"), 2043 }) 2044 } 2045 2046 fn cleanup_thread( 2047 self, 2048 store: &mut StoreOpaque, 2049 guest_thread: QualifiedThreadId, 2050 runtime_instance: RuntimeComponentInstanceIndex, 2051 ) -> Result<()> { 2052 let guest_id = store 2053 .concurrent_state_mut() 2054 .get_mut(guest_thread.thread)? 2055 .instance_rep; 2056 store 2057 .instance_state(RuntimeInstance { 2058 instance: self.id().instance(), 2059 index: runtime_instance, 2060 }) 2061 .thread_handle_table() 2062 .guest_thread_remove(guest_id.unwrap())?; 2063 2064 store.concurrent_state_mut().delete(guest_thread.thread)?; 2065 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 2066 task.threads.remove(&guest_thread.thread); 2067 Ok(()) 2068 } 2069 2070 /// Add the specified guest call to the "high priority" work item queue, to 2071 /// be started as soon as backpressure and/or reentrance rules allow. 2072 /// 2073 /// SAFETY: The raw pointer arguments must be valid references to guest 2074 /// functions (with the appropriate signatures) when the closures queued by 2075 /// this function are called. 2076 unsafe fn queue_call<T: 'static>( 2077 self, 2078 mut store: StoreContextMut<T>, 2079 guest_thread: QualifiedThreadId, 2080 callee: SendSyncPtr<VMFuncRef>, 2081 param_count: usize, 2082 result_count: usize, 2083 async_: bool, 2084 callback: Option<SendSyncPtr<VMFuncRef>>, 2085 post_return: Option<SendSyncPtr<VMFuncRef>>, 2086 ) -> Result<()> { 2087 /// Return a closure which will call the specified function in the scope 2088 /// of the specified task. 2089 /// 2090 /// This will use `GuestTask::lower_params` to lower the parameters, but 2091 /// will not lift the result; instead, it returns a 2092 /// `[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]` from which the result, if 2093 /// any, may be lifted. Note that an async-lifted export will have 2094 /// returned its result using the `task.return` intrinsic (or not 2095 /// returned a result at all, in the case of `task.cancel`), in which 2096 /// case the "result" of this call will either be a callback code or 2097 /// nothing. 2098 /// 2099 /// SAFETY: `callee` must be a valid `*mut VMFuncRef` at the time when 2100 /// the returned closure is called. 2101 unsafe fn make_call<T: 'static>( 2102 store: StoreContextMut<T>, 2103 guest_thread: QualifiedThreadId, 2104 callee: SendSyncPtr<VMFuncRef>, 2105 param_count: usize, 2106 result_count: usize, 2107 ) -> impl FnOnce(&mut dyn VMStore) -> Result<[MaybeUninit<ValRaw>; MAX_FLAT_PARAMS]> 2108 + Send 2109 + Sync 2110 + 'static 2111 + use<T> { 2112 let token = StoreToken::new(store); 2113 move |store: &mut dyn VMStore| { 2114 let mut storage = [MaybeUninit::uninit(); MAX_FLAT_PARAMS]; 2115 2116 store 2117 .concurrent_state_mut() 2118 .get_mut(guest_thread.thread)? 2119 .state = GuestThreadState::Running; 2120 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 2121 let lower = task.lower_params.take().unwrap(); 2122 2123 lower(store, &mut storage[..param_count])?; 2124 2125 let mut store = token.as_context_mut(store); 2126 2127 // SAFETY: Per the contract documented in `make_call's` 2128 // documentation, `callee` must be a valid pointer. 2129 unsafe { 2130 crate::Func::call_unchecked_raw( 2131 &mut store, 2132 callee.as_non_null(), 2133 NonNull::new( 2134 &mut storage[..param_count.max(result_count)] 2135 as *mut [MaybeUninit<ValRaw>] as _, 2136 ) 2137 .unwrap(), 2138 )?; 2139 } 2140 2141 Ok(storage) 2142 } 2143 } 2144 2145 // SAFETY: Per the contract described in this function documentation, 2146 // the `callee` pointer which `call` closes over must be valid when 2147 // called by the closure we queue below. 2148 let call = unsafe { 2149 make_call( 2150 store.as_context_mut(), 2151 guest_thread, 2152 callee, 2153 param_count, 2154 result_count, 2155 ) 2156 }; 2157 2158 let callee_instance = store 2159 .0 2160 .concurrent_state_mut() 2161 .get_mut(guest_thread.task)? 2162 .instance; 2163 2164 let fun = if callback.is_some() { 2165 assert!(async_); 2166 2167 Box::new(move |store: &mut dyn VMStore| { 2168 self.add_guest_thread_to_instance_table( 2169 guest_thread.thread, 2170 store, 2171 callee_instance.index, 2172 )?; 2173 let old_thread = store.set_thread(guest_thread); 2174 log::trace!( 2175 "stackless call: replaced {old_thread:?} with {guest_thread:?} as current thread" 2176 ); 2177 2178 store.enter_instance(callee_instance); 2179 2180 // SAFETY: See the documentation for `make_call` to review the 2181 // contract we must uphold for `call` here. 2182 // 2183 // Per the contract described in the `queue_call` 2184 // documentation, the `callee` pointer which `call` closes 2185 // over must be valid. 2186 let storage = call(store)?; 2187 2188 store.exit_instance(callee_instance)?; 2189 2190 store.set_thread(old_thread); 2191 let state = store.concurrent_state_mut(); 2192 old_thread 2193 .guest() 2194 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running); 2195 log::trace!("stackless call: restored {old_thread:?} as current thread"); 2196 2197 // SAFETY: `wasmparser` will have validated that the callback 2198 // function returns a `i32` result. 2199 let code = unsafe { storage[0].assume_init() }.get_i32() as u32; 2200 2201 self.handle_callback_code(store, guest_thread, callee_instance.index, code) 2202 }) 2203 as Box<dyn FnOnce(&mut dyn VMStore) -> Result<Option<GuestCall>> + Send + Sync> 2204 } else { 2205 let token = StoreToken::new(store.as_context_mut()); 2206 Box::new(move |store: &mut dyn VMStore| { 2207 self.add_guest_thread_to_instance_table( 2208 guest_thread.thread, 2209 store, 2210 callee_instance.index, 2211 )?; 2212 let old_thread = store.set_thread(guest_thread); 2213 log::trace!( 2214 "sync/async-stackful call: replaced {old_thread:?} with {guest_thread:?} as current thread", 2215 ); 2216 let flags = self.id().get(store).instance_flags(callee_instance.index); 2217 2218 // Unless this is a callback-less (i.e. stackful) 2219 // async-lifted export, we need to record that the instance 2220 // cannot be entered until the call returns. 2221 if !async_ { 2222 store.enter_instance(callee_instance); 2223 } 2224 2225 // SAFETY: See the documentation for `make_call` to review the 2226 // contract we must uphold for `call` here. 2227 // 2228 // Per the contract described in the `queue_call` 2229 // documentation, the `callee` pointer which `call` closes 2230 // over must be valid. 2231 let storage = call(store)?; 2232 2233 if async_ { 2234 let task = store.concurrent_state_mut().get_mut(guest_thread.task)?; 2235 if task.threads.len() == 1 && !task.returned_or_cancelled() { 2236 bail!(Trap::NoAsyncResult); 2237 } 2238 } else { 2239 // This is a sync-lifted export, so now is when we lift the 2240 // result, optionally call the post-return function, if any, 2241 // and finally notify any current or future waiters that the 2242 // subtask has returned. 2243 2244 let lift = { 2245 store.exit_instance(callee_instance)?; 2246 2247 let state = store.concurrent_state_mut(); 2248 assert!(state.get_mut(guest_thread.task)?.result.is_none()); 2249 2250 state 2251 .get_mut(guest_thread.task)? 2252 .lift_result 2253 .take() 2254 .unwrap() 2255 }; 2256 2257 // SAFETY: `result_count` represents the number of core Wasm 2258 // results returned, per `wasmparser`. 2259 let result = (lift.lift)(store, unsafe { 2260 mem::transmute::<&[MaybeUninit<ValRaw>], &[ValRaw]>( 2261 &storage[..result_count], 2262 ) 2263 })?; 2264 2265 let post_return_arg = match result_count { 2266 0 => ValRaw::i32(0), 2267 // SAFETY: `result_count` represents the number of 2268 // core Wasm results returned, per `wasmparser`. 2269 1 => unsafe { storage[0].assume_init() }, 2270 _ => unreachable!(), 2271 }; 2272 2273 unsafe { 2274 call_post_return( 2275 token.as_context_mut(store), 2276 post_return.map(|v| v.as_non_null()), 2277 post_return_arg, 2278 flags, 2279 )?; 2280 } 2281 2282 self.task_complete(store, guest_thread.task, result, Status::Returned)?; 2283 } 2284 2285 // This is a callback-less call, so the implicit thread has now completed 2286 self.cleanup_thread(store, guest_thread, callee_instance.index)?; 2287 2288 store.set_thread(old_thread); 2289 2290 let state = store.concurrent_state_mut(); 2291 let task = state.get_mut(guest_thread.task)?; 2292 2293 match &task.caller { 2294 Caller::Host { .. } => { 2295 if task.ready_to_delete() { 2296 Waitable::Guest(guest_thread.task).delete_from(state)?; 2297 } 2298 } 2299 Caller::Guest { .. } => { 2300 task.exited = true; 2301 } 2302 } 2303 2304 Ok(None) 2305 }) 2306 }; 2307 2308 store 2309 .0 2310 .concurrent_state_mut() 2311 .push_high_priority(WorkItem::GuestCall( 2312 callee_instance.index, 2313 GuestCall { 2314 thread: guest_thread, 2315 kind: GuestCallKind::StartImplicit(fun), 2316 }, 2317 )); 2318 2319 Ok(()) 2320 } 2321 2322 /// Prepare (but do not start) a guest->guest call. 2323 /// 2324 /// This is called from fused adapter code generated in 2325 /// `wasmtime_environ::fact::trampoline::Compiler`. `start` and `return_` 2326 /// are synthesized Wasm functions which move the parameters from the caller 2327 /// to the callee and the result from the callee to the caller, 2328 /// respectively. The adapter will call `Self::start_call` immediately 2329 /// after calling this function. 2330 /// 2331 /// SAFETY: All the pointer arguments must be valid pointers to guest 2332 /// entities (and with the expected signatures for the function references 2333 /// -- see `wasmtime_environ::fact::trampoline::Compiler` for details). 2334 unsafe fn prepare_call<T: 'static>( 2335 self, 2336 mut store: StoreContextMut<T>, 2337 start: *mut VMFuncRef, 2338 return_: *mut VMFuncRef, 2339 caller_instance: RuntimeComponentInstanceIndex, 2340 callee_instance: RuntimeComponentInstanceIndex, 2341 task_return_type: TypeTupleIndex, 2342 callee_async: bool, 2343 memory: *mut VMMemoryDefinition, 2344 string_encoding: u8, 2345 caller_info: CallerInfo, 2346 ) -> Result<()> { 2347 if let (CallerInfo::Sync { .. }, true) = (&caller_info, callee_async) { 2348 // A task may only call an async-typed function via a sync lower if 2349 // it was created by a call to an async export. Otherwise, we'll 2350 // trap. 2351 store.0.check_blocking()?; 2352 } 2353 2354 enum ResultInfo { 2355 Heap { results: u32 }, 2356 Stack { result_count: u32 }, 2357 } 2358 2359 let result_info = match &caller_info { 2360 CallerInfo::Async { 2361 has_result: true, 2362 params, 2363 } => ResultInfo::Heap { 2364 results: params.last().unwrap().get_u32(), 2365 }, 2366 CallerInfo::Async { 2367 has_result: false, .. 2368 } => ResultInfo::Stack { result_count: 0 }, 2369 CallerInfo::Sync { 2370 result_count, 2371 params, 2372 } if *result_count > u32::try_from(MAX_FLAT_RESULTS).unwrap() => ResultInfo::Heap { 2373 results: params.last().unwrap().get_u32(), 2374 }, 2375 CallerInfo::Sync { result_count, .. } => ResultInfo::Stack { 2376 result_count: *result_count, 2377 }, 2378 }; 2379 2380 let sync_caller = matches!(caller_info, CallerInfo::Sync { .. }); 2381 2382 // Create a new guest task for the call, closing over the `start` and 2383 // `return_` functions to lift the parameters and lower the result, 2384 // respectively. 2385 let start = SendSyncPtr::new(NonNull::new(start).unwrap()); 2386 let return_ = SendSyncPtr::new(NonNull::new(return_).unwrap()); 2387 let token = StoreToken::new(store.as_context_mut()); 2388 let state = store.0.concurrent_state_mut(); 2389 let old_thread = state.unwrap_current_guest_thread(); 2390 2391 assert_eq!( 2392 state.get_mut(old_thread.task)?.instance, 2393 RuntimeInstance { 2394 instance: self.id().instance(), 2395 index: caller_instance, 2396 } 2397 ); 2398 2399 let new_task = GuestTask::new( 2400 state, 2401 Box::new(move |store, dst| { 2402 let mut store = token.as_context_mut(store); 2403 assert!(dst.len() <= MAX_FLAT_PARAMS); 2404 // The `+ 1` here accounts for the return pointer, if any: 2405 let mut src = [MaybeUninit::uninit(); MAX_FLAT_PARAMS + 1]; 2406 let count = match caller_info { 2407 // Async callers, if they have a result, use the last 2408 // parameter as a return pointer so chop that off if 2409 // relevant here. 2410 CallerInfo::Async { params, has_result } => { 2411 let params = ¶ms[..params.len() - usize::from(has_result)]; 2412 for (param, src) in params.iter().zip(&mut src) { 2413 src.write(*param); 2414 } 2415 params.len() 2416 } 2417 2418 // Sync callers forward everything directly. 2419 CallerInfo::Sync { params, .. } => { 2420 for (param, src) in params.iter().zip(&mut src) { 2421 src.write(*param); 2422 } 2423 params.len() 2424 } 2425 }; 2426 // SAFETY: `start` is a valid `*mut VMFuncRef` from 2427 // `wasmtime-cranelift`-generated fused adapter code. Based on 2428 // how it was constructed (see 2429 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_start_adapter` 2430 // for details) we know it takes count parameters and returns 2431 // `dst.len()` results. 2432 unsafe { 2433 crate::Func::call_unchecked_raw( 2434 &mut store, 2435 start.as_non_null(), 2436 NonNull::new( 2437 &mut src[..count.max(dst.len())] as *mut [MaybeUninit<ValRaw>] as _, 2438 ) 2439 .unwrap(), 2440 )?; 2441 } 2442 dst.copy_from_slice(&src[..dst.len()]); 2443 let state = store.0.concurrent_state_mut(); 2444 Waitable::Guest(state.unwrap_current_guest_thread().task).set_event( 2445 state, 2446 Some(Event::Subtask { 2447 status: Status::Started, 2448 }), 2449 )?; 2450 Ok(()) 2451 }), 2452 LiftResult { 2453 lift: Box::new(move |store, src| { 2454 // SAFETY: See comment in closure passed as `lower_params` 2455 // parameter above. 2456 let mut store = token.as_context_mut(store); 2457 let mut my_src = src.to_owned(); // TODO: use stack to avoid allocation? 2458 if let ResultInfo::Heap { results } = &result_info { 2459 my_src.push(ValRaw::u32(*results)); 2460 } 2461 // SAFETY: `return_` is a valid `*mut VMFuncRef` from 2462 // `wasmtime-cranelift`-generated fused adapter code. Based 2463 // on how it was constructed (see 2464 // `wasmtime_environ::fact::trampoline::Compiler::compile_async_return_adapter` 2465 // for details) we know it takes `src.len()` parameters and 2466 // returns up to 1 result. 2467 unsafe { 2468 crate::Func::call_unchecked_raw( 2469 &mut store, 2470 return_.as_non_null(), 2471 my_src.as_mut_slice().into(), 2472 )?; 2473 } 2474 let state = store.0.concurrent_state_mut(); 2475 let thread = state.unwrap_current_guest_thread(); 2476 if sync_caller { 2477 state.get_mut(thread.task)?.sync_result = SyncResult::Produced( 2478 if let ResultInfo::Stack { result_count } = &result_info { 2479 match result_count { 2480 0 => None, 2481 1 => Some(my_src[0]), 2482 _ => unreachable!(), 2483 } 2484 } else { 2485 None 2486 }, 2487 ); 2488 } 2489 Ok(Box::new(DummyResult) as Box<dyn Any + Send + Sync>) 2490 }), 2491 ty: task_return_type, 2492 memory: NonNull::new(memory).map(SendSyncPtr::new), 2493 string_encoding: StringEncoding::from_u8(string_encoding).unwrap(), 2494 }, 2495 Caller::Guest { thread: old_thread }, 2496 None, 2497 RuntimeInstance { 2498 instance: self.id().instance(), 2499 index: callee_instance, 2500 }, 2501 callee_async, 2502 )?; 2503 2504 let guest_task = state.push(new_task)?; 2505 let new_thread = GuestThread::new_implicit(guest_task); 2506 let guest_thread = state.push(new_thread)?; 2507 state.get_mut(guest_task)?.threads.insert(guest_thread); 2508 2509 store 2510 .0 2511 .concurrent_state_mut() 2512 .get_mut(old_thread.task)? 2513 .subtasks 2514 .insert(guest_task); 2515 2516 // Make the new thread the current one so that `Self::start_call` knows 2517 // which one to start. 2518 store.0.set_thread(QualifiedThreadId { 2519 task: guest_task, 2520 thread: guest_thread, 2521 }); 2522 log::trace!( 2523 "pushed {guest_task:?}:{guest_thread:?} as current thread; old thread was {old_thread:?}" 2524 ); 2525 2526 Ok(()) 2527 } 2528 2529 /// Call the specified callback function for an async-lifted export. 2530 /// 2531 /// SAFETY: `function` must be a valid reference to a guest function of the 2532 /// correct signature for a callback. 2533 unsafe fn call_callback<T>( 2534 self, 2535 mut store: StoreContextMut<T>, 2536 function: SendSyncPtr<VMFuncRef>, 2537 event: Event, 2538 handle: u32, 2539 ) -> Result<u32> { 2540 let (ordinal, result) = event.parts(); 2541 let params = &mut [ 2542 ValRaw::u32(ordinal), 2543 ValRaw::u32(handle), 2544 ValRaw::u32(result), 2545 ]; 2546 // SAFETY: `func` is a valid `*mut VMFuncRef` from either 2547 // `wasmtime-cranelift`-generated fused adapter code or 2548 // `component::Options`. Per `wasmparser` callback signature 2549 // validation, we know it takes three parameters and returns one. 2550 unsafe { 2551 crate::Func::call_unchecked_raw( 2552 &mut store, 2553 function.as_non_null(), 2554 params.as_mut_slice().into(), 2555 )?; 2556 } 2557 Ok(params[0].get_u32()) 2558 } 2559 2560 /// Start a guest->guest call previously prepared using 2561 /// `Self::prepare_call`. 2562 /// 2563 /// This is called from fused adapter code generated in 2564 /// `wasmtime_environ::fact::trampoline::Compiler`. The adapter will call 2565 /// this function immediately after calling `Self::prepare_call`. 2566 /// 2567 /// SAFETY: The `*mut VMFuncRef` arguments must be valid pointers to guest 2568 /// functions with the appropriate signatures for the current guest task. 2569 /// If this is a call to an async-lowered import, the actual call may be 2570 /// deferred and run after this function returns, in which case the pointer 2571 /// arguments must also be valid when the call happens. 2572 unsafe fn start_call<T: 'static>( 2573 self, 2574 mut store: StoreContextMut<T>, 2575 callback: *mut VMFuncRef, 2576 post_return: *mut VMFuncRef, 2577 callee: *mut VMFuncRef, 2578 param_count: u32, 2579 result_count: u32, 2580 flags: u32, 2581 storage: Option<&mut [MaybeUninit<ValRaw>]>, 2582 ) -> Result<u32> { 2583 let token = StoreToken::new(store.as_context_mut()); 2584 let async_caller = storage.is_none(); 2585 let state = store.0.concurrent_state_mut(); 2586 let guest_thread = state.unwrap_current_guest_thread(); 2587 let callee_async = state.get_mut(guest_thread.task)?.async_function; 2588 let callee = SendSyncPtr::new(NonNull::new(callee).unwrap()); 2589 let param_count = usize::try_from(param_count).unwrap(); 2590 assert!(param_count <= MAX_FLAT_PARAMS); 2591 let result_count = usize::try_from(result_count).unwrap(); 2592 assert!(result_count <= MAX_FLAT_RESULTS); 2593 2594 let task = state.get_mut(guest_thread.task)?; 2595 if !callback.is_null() { 2596 // We're calling an async-lifted export with a callback, so store 2597 // the callback and related context as part of the task so we can 2598 // call it later when needed. 2599 let callback = SendSyncPtr::new(NonNull::new(callback).unwrap()); 2600 task.callback = Some(Box::new(move |store, event, handle| { 2601 let store = token.as_context_mut(store); 2602 unsafe { self.call_callback::<T>(store, callback, event, handle) } 2603 })); 2604 } 2605 2606 let Caller::Guest { thread: caller } = &task.caller else { 2607 // As of this writing, `start_call` is only used for guest->guest 2608 // calls. 2609 unreachable!() 2610 }; 2611 let caller = *caller; 2612 let caller_instance = state.get_mut(caller.task)?.instance; 2613 2614 // Queue the call as a "high priority" work item. 2615 unsafe { 2616 self.queue_call( 2617 store.as_context_mut(), 2618 guest_thread, 2619 callee, 2620 param_count, 2621 result_count, 2622 (flags & START_FLAG_ASYNC_CALLEE) != 0, 2623 NonNull::new(callback).map(SendSyncPtr::new), 2624 NonNull::new(post_return).map(SendSyncPtr::new), 2625 )?; 2626 } 2627 2628 let state = store.0.concurrent_state_mut(); 2629 2630 // Use the caller's `GuestTask::sync_call_set` to register interest in 2631 // the subtask... 2632 let guest_waitable = Waitable::Guest(guest_thread.task); 2633 let old_set = guest_waitable.common(state)?.set; 2634 let set = state.get_mut(caller.task)?.sync_call_set; 2635 guest_waitable.join(state, Some(set))?; 2636 2637 // ... and suspend this fiber temporarily while we wait for it to start. 2638 // 2639 // Note that we _could_ call the callee directly using the current fiber 2640 // rather than suspend this one, but that would make reasoning about the 2641 // event loop more complicated and is probably only worth doing if 2642 // there's a measurable performance benefit. In addition, it would mean 2643 // blocking the caller if the callee calls a blocking sync-lowered 2644 // import, and as of this writing the spec says we must not do that. 2645 // 2646 // Alternatively, the fused adapter code could be modified to call the 2647 // callee directly without calling a host-provided intrinsic at all (in 2648 // which case it would need to do its own, inline backpressure checks, 2649 // etc.). Again, we'd want to see a measurable performance benefit 2650 // before committing to such an optimization. And again, we'd need to 2651 // update the spec to allow that. 2652 let (status, waitable) = loop { 2653 store.0.suspend(SuspendReason::Waiting { 2654 set, 2655 thread: caller, 2656 // Normally, `StoreOpaque::suspend` would assert it's being 2657 // called from a context where blocking is allowed. However, if 2658 // `async_caller` is `true`, we'll only "block" long enough for 2659 // the callee to start, i.e. we won't repeat this loop, so we 2660 // tell `suspend` it's okay even if we're not allowed to block. 2661 // Alternatively, if the callee is not an async function, then 2662 // we know it won't block anyway. 2663 skip_may_block_check: async_caller || !callee_async, 2664 })?; 2665 2666 let state = store.0.concurrent_state_mut(); 2667 2668 log::trace!("taking event for {:?}", guest_thread.task); 2669 let event = guest_waitable.take_event(state)?; 2670 let Some(Event::Subtask { status }) = event else { 2671 unreachable!(); 2672 }; 2673 2674 log::trace!("status {status:?} for {:?}", guest_thread.task); 2675 2676 if status == Status::Returned { 2677 // It returned, so we can stop waiting. 2678 break (status, None); 2679 } else if async_caller { 2680 // It hasn't returned yet, but the caller is calling via an 2681 // async-lowered import, so we generate a handle for the task 2682 // waitable and return the status. 2683 let handle = store 2684 .0 2685 .instance_state(caller_instance) 2686 .handle_table() 2687 .subtask_insert_guest(guest_thread.task.rep())?; 2688 store 2689 .0 2690 .concurrent_state_mut() 2691 .get_mut(guest_thread.task)? 2692 .common 2693 .handle = Some(handle); 2694 break (status, Some(handle)); 2695 } else { 2696 // The callee hasn't returned yet, and the caller is calling via 2697 // a sync-lowered import, so we loop and keep waiting until the 2698 // callee returns. 2699 } 2700 }; 2701 2702 guest_waitable.join(store.0.concurrent_state_mut(), old_set)?; 2703 2704 // Reset the current thread to point to the caller as it resumes control. 2705 store.0.set_thread(caller); 2706 store.0.concurrent_state_mut().get_mut(caller.thread)?.state = GuestThreadState::Running; 2707 log::trace!("popped current thread {guest_thread:?}; new thread is {caller:?}"); 2708 2709 if let Some(storage) = storage { 2710 // The caller used a sync-lowered import to call an async-lifted 2711 // export, in which case the result, if any, has been stashed in 2712 // `GuestTask::sync_result`. 2713 let state = store.0.concurrent_state_mut(); 2714 let task = state.get_mut(guest_thread.task)?; 2715 if let Some(result) = task.sync_result.take() { 2716 if let Some(result) = result { 2717 storage[0] = MaybeUninit::new(result); 2718 } 2719 2720 if task.exited && task.ready_to_delete() { 2721 Waitable::Guest(guest_thread.task).delete_from(state)?; 2722 } 2723 } 2724 } 2725 2726 Ok(status.pack(waitable)) 2727 } 2728 2729 /// Poll the specified future once on behalf of a guest->host call using an 2730 /// async-lowered import. 2731 /// 2732 /// If it returns `Ready`, return `Ok(None)`. Otherwise, if it returns 2733 /// `Pending`, add it to the set of futures to be polled as part of this 2734 /// instance's event loop until it completes, and then return 2735 /// `Ok(Some(handle))` where `handle` is the waitable handle to return. 2736 /// 2737 /// Whether the future returns `Ready` immediately or later, the `lower` 2738 /// function will be used to lower the result, if any, into the guest caller's 2739 /// stack and linear memory unless the task has been cancelled. 2740 pub(crate) fn first_poll<T: 'static, R: Send + 'static>( 2741 self, 2742 mut store: StoreContextMut<'_, T>, 2743 future: impl Future<Output = Result<R>> + Send + 'static, 2744 lower: impl FnOnce(StoreContextMut<T>, R) -> Result<()> + Send + 'static, 2745 ) -> Result<Option<u32>> { 2746 let token = StoreToken::new(store.as_context_mut()); 2747 let state = store.0.concurrent_state_mut(); 2748 let task = state.unwrap_current_host_thread(); 2749 2750 // Create an abortable future which hooks calls to poll and manages call 2751 // context state for the future. 2752 let (join_handle, future) = JoinHandle::run(future); 2753 { 2754 let task = state.get_mut(task)?; 2755 assert!(task.join_handle.is_none()); 2756 task.join_handle = Some(join_handle); 2757 } 2758 2759 let mut future = Box::pin(future); 2760 2761 // Finally, poll the future. We can use a dummy `Waker` here because 2762 // we'll add the future to `ConcurrentState::futures` and poll it 2763 // automatically from the event loop if it doesn't complete immediately 2764 // here. 2765 let poll = tls::set(store.0, || { 2766 future 2767 .as_mut() 2768 .poll(&mut Context::from_waker(&Waker::noop())) 2769 }); 2770 2771 match poll { 2772 // It finished immediately; lower the result and delete the task. 2773 Poll::Ready(Some(result)) => { 2774 lower(store.as_context_mut(), result?)?; 2775 return Ok(None); 2776 } 2777 2778 // Shouldn't be possible since the future isn't cancelled via the 2779 // `join_handle`. 2780 Poll::Ready(None) => unreachable!(), 2781 2782 // Future isn't ready yet, so fall through. 2783 Poll::Pending => {} 2784 } 2785 2786 // It hasn't finished yet; add the future to 2787 // `ConcurrentState::futures` so it will be polled by the event 2788 // loop and allocate a waitable handle to return to the guest. 2789 2790 // Wrap the future in a closure responsible for lowering the result into 2791 // the guest's stack and memory, as well as notifying any waiters that 2792 // the task returned. 2793 let future = Box::pin(async move { 2794 let result = match future.await { 2795 Some(result) => result?, 2796 // Task was cancelled; nothing left to do. 2797 None => return Ok(()), 2798 }; 2799 let on_complete = move |store: &mut dyn VMStore| { 2800 // Restore the `current_thread` to be the host so `lower` knows 2801 // how to manipulate borrows and knows which scope of borrows 2802 // to check. 2803 let mut store = token.as_context_mut(store); 2804 let state = store.0.concurrent_state_mut(); 2805 assert!(state.current_thread.is_none()); 2806 store.0.set_thread(task); 2807 2808 lower(store.as_context_mut(), result)?; 2809 let state = store.0.concurrent_state_mut(); 2810 state.get_mut(task)?.join_handle.take(); 2811 Waitable::Host(task).set_event( 2812 state, 2813 Some(Event::Subtask { 2814 status: Status::Returned, 2815 }), 2816 )?; 2817 2818 // Go back to "no current thread" at the end. 2819 store.0.set_thread(CurrentThread::None); 2820 Ok(()) 2821 }; 2822 2823 // Here we schedule a task to run on a worker fiber to do the 2824 // lowering since it may involve a call to the guest's realloc 2825 // function. This is necessary because calling the guest while 2826 // there are host embedder frames on the stack is unsound. 2827 tls::get(move |store| { 2828 store 2829 .concurrent_state_mut() 2830 .push_high_priority(WorkItem::WorkerFunction(AlwaysMut::new(Box::new( 2831 on_complete, 2832 )))); 2833 Ok(()) 2834 }) 2835 }); 2836 2837 // Make this task visible to the guest and then record what it 2838 // was made visible as. 2839 let state = store.0.concurrent_state_mut(); 2840 state.push_future(future); 2841 let caller = state.get_mut(task)?.caller; 2842 let instance = state.get_mut(caller.task)?.instance; 2843 let handle = store 2844 .0 2845 .instance_state(instance) 2846 .handle_table() 2847 .subtask_insert_host(task.rep())?; 2848 store.0.concurrent_state_mut().get_mut(task)?.common.handle = Some(handle); 2849 log::trace!("assign {task:?} handle {handle} for {caller:?} instance {instance:?}"); 2850 2851 // Restore the currently running thread to this host task's 2852 // caller. Note that the host task isn't deallocated as it's 2853 // within the store and will get deallocated later. 2854 store.0.set_thread(caller); 2855 Ok(Some(handle)) 2856 } 2857 2858 /// Implements the `task.return` intrinsic, lifting the result for the 2859 /// current guest task. 2860 pub(crate) fn task_return( 2861 self, 2862 store: &mut dyn VMStore, 2863 ty: TypeTupleIndex, 2864 options: OptionsIndex, 2865 storage: &[ValRaw], 2866 ) -> Result<()> { 2867 let state = store.concurrent_state_mut(); 2868 let guest_thread = state.unwrap_current_guest_thread(); 2869 let lift = state 2870 .get_mut(guest_thread.task)? 2871 .lift_result 2872 .take() 2873 .ok_or_else(|| { 2874 format_err!("`task.return` or `task.cancel` called more than once for current task") 2875 })?; 2876 assert!(state.get_mut(guest_thread.task)?.result.is_none()); 2877 2878 let CanonicalOptions { 2879 string_encoding, 2880 data_model, 2881 .. 2882 } = &self.id().get(store).component().env_component().options[options]; 2883 2884 let invalid = ty != lift.ty 2885 || string_encoding != &lift.string_encoding 2886 || match data_model { 2887 CanonicalOptionsDataModel::LinearMemory(opts) => match opts.memory { 2888 Some(memory) => { 2889 let expected = lift.memory.map(|v| v.as_ptr()).unwrap_or(ptr::null_mut()); 2890 let actual = self.id().get(store).runtime_memory(memory); 2891 expected != actual.as_ptr() 2892 } 2893 // Memory not specified, meaning it didn't need to be 2894 // specified per validation, so not invalid. 2895 None => false, 2896 }, 2897 // Always invalid as this isn't supported. 2898 CanonicalOptionsDataModel::Gc { .. } => true, 2899 }; 2900 2901 if invalid { 2902 bail!("invalid `task.return` signature and/or options for current task"); 2903 } 2904 2905 log::trace!("task.return for {guest_thread:?}"); 2906 2907 let result = (lift.lift)(store, storage)?; 2908 self.task_complete(store, guest_thread.task, result, Status::Returned) 2909 } 2910 2911 /// Implements the `task.cancel` intrinsic. 2912 pub(crate) fn task_cancel(self, store: &mut StoreOpaque) -> Result<()> { 2913 let state = store.concurrent_state_mut(); 2914 let guest_thread = state.unwrap_current_guest_thread(); 2915 let task = state.get_mut(guest_thread.task)?; 2916 if !task.cancel_sent { 2917 bail!("`task.cancel` called by task which has not been cancelled") 2918 } 2919 _ = task.lift_result.take().ok_or_else(|| { 2920 format_err!("`task.return` or `task.cancel` called more than once for current task") 2921 })?; 2922 2923 assert!(task.result.is_none()); 2924 2925 log::trace!("task.cancel for {guest_thread:?}"); 2926 2927 self.task_complete( 2928 store, 2929 guest_thread.task, 2930 Box::new(DummyResult), 2931 Status::ReturnCancelled, 2932 ) 2933 } 2934 2935 /// Complete the specified guest task (i.e. indicate that it has either 2936 /// returned a (possibly empty) result or cancelled itself). 2937 /// 2938 /// This will return any resource borrows and notify any current or future 2939 /// waiters that the task has completed. 2940 fn task_complete( 2941 self, 2942 store: &mut StoreOpaque, 2943 guest_task: TableId<GuestTask>, 2944 result: Box<dyn Any + Send + Sync>, 2945 status: Status, 2946 ) -> Result<()> { 2947 store 2948 .component_resource_tables(Some(self)) 2949 .validate_scope_exit()?; 2950 2951 let state = store.concurrent_state_mut(); 2952 let task = state.get_mut(guest_task)?; 2953 2954 if let Caller::Host { tx, .. } = &mut task.caller { 2955 if let Some(tx) = tx.take() { 2956 _ = tx.send(result); 2957 } 2958 } else { 2959 task.result = Some(result); 2960 Waitable::Guest(guest_task).set_event(state, Some(Event::Subtask { status }))?; 2961 } 2962 2963 Ok(()) 2964 } 2965 2966 /// Implements the `waitable-set.new` intrinsic. 2967 pub(crate) fn waitable_set_new( 2968 self, 2969 store: &mut StoreOpaque, 2970 caller_instance: RuntimeComponentInstanceIndex, 2971 ) -> Result<u32> { 2972 let set = store.concurrent_state_mut().push(WaitableSet::default())?; 2973 let handle = store 2974 .instance_state(RuntimeInstance { 2975 instance: self.id().instance(), 2976 index: caller_instance, 2977 }) 2978 .handle_table() 2979 .waitable_set_insert(set.rep())?; 2980 log::trace!("new waitable set {set:?} (handle {handle})"); 2981 Ok(handle) 2982 } 2983 2984 /// Implements the `waitable-set.drop` intrinsic. 2985 pub(crate) fn waitable_set_drop( 2986 self, 2987 store: &mut StoreOpaque, 2988 caller_instance: RuntimeComponentInstanceIndex, 2989 set: u32, 2990 ) -> Result<()> { 2991 let rep = store 2992 .instance_state(RuntimeInstance { 2993 instance: self.id().instance(), 2994 index: caller_instance, 2995 }) 2996 .handle_table() 2997 .waitable_set_remove(set)?; 2998 2999 log::trace!("drop waitable set {rep} (handle {set})"); 3000 3001 let set = store 3002 .concurrent_state_mut() 3003 .delete(TableId::<WaitableSet>::new(rep))?; 3004 3005 if !set.waiting.is_empty() { 3006 bail!("cannot drop waitable set with waiters"); 3007 } 3008 3009 Ok(()) 3010 } 3011 3012 /// Implements the `waitable.join` intrinsic. 3013 pub(crate) fn waitable_join( 3014 self, 3015 store: &mut StoreOpaque, 3016 caller_instance: RuntimeComponentInstanceIndex, 3017 waitable_handle: u32, 3018 set_handle: u32, 3019 ) -> Result<()> { 3020 let mut instance = self.id().get_mut(store); 3021 let waitable = 3022 Waitable::from_instance(instance.as_mut(), caller_instance, waitable_handle)?; 3023 3024 let set = if set_handle == 0 { 3025 None 3026 } else { 3027 let set = instance.instance_states().0[caller_instance] 3028 .handle_table() 3029 .waitable_set_rep(set_handle)?; 3030 3031 Some(TableId::<WaitableSet>::new(set)) 3032 }; 3033 3034 log::trace!( 3035 "waitable {waitable:?} (handle {waitable_handle}) join set {set:?} (handle {set_handle})", 3036 ); 3037 3038 waitable.join(store.concurrent_state_mut(), set) 3039 } 3040 3041 /// Implements the `subtask.drop` intrinsic. 3042 pub(crate) fn subtask_drop( 3043 self, 3044 store: &mut StoreOpaque, 3045 caller_instance: RuntimeComponentInstanceIndex, 3046 task_id: u32, 3047 ) -> Result<()> { 3048 self.waitable_join(store, caller_instance, task_id, 0)?; 3049 3050 let (rep, is_host) = store 3051 .instance_state(RuntimeInstance { 3052 instance: self.id().instance(), 3053 index: caller_instance, 3054 }) 3055 .handle_table() 3056 .subtask_remove(task_id)?; 3057 3058 let concurrent_state = store.concurrent_state_mut(); 3059 let (waitable, expected_caller, delete) = if is_host { 3060 let id = TableId::<HostTask>::new(rep); 3061 let task = concurrent_state.get_mut(id)?; 3062 if task.join_handle.is_some() { 3063 bail!("cannot drop a subtask which has not yet resolved"); 3064 } 3065 (Waitable::Host(id), task.caller, true) 3066 } else { 3067 let id = TableId::<GuestTask>::new(rep); 3068 let task = concurrent_state.get_mut(id)?; 3069 if task.lift_result.is_some() { 3070 bail!("cannot drop a subtask which has not yet resolved"); 3071 } 3072 if let Caller::Guest { thread } = task.caller { 3073 ( 3074 Waitable::Guest(id), 3075 thread, 3076 concurrent_state.get_mut(id)?.exited, 3077 ) 3078 } else { 3079 unreachable!() 3080 } 3081 }; 3082 3083 waitable.common(concurrent_state)?.handle = None; 3084 3085 if waitable.take_event(concurrent_state)?.is_some() { 3086 bail!("cannot drop a subtask with an undelivered event"); 3087 } 3088 3089 if delete { 3090 waitable.delete_from(concurrent_state)?; 3091 } 3092 3093 // Since waitables can neither be passed between instances nor forged, 3094 // this should never fail unless there's a bug in Wasmtime, but we check 3095 // here to be sure: 3096 assert_eq!( 3097 expected_caller, 3098 concurrent_state.unwrap_current_guest_thread(), 3099 ); 3100 log::trace!("subtask_drop {waitable:?} (handle {task_id})"); 3101 Ok(()) 3102 } 3103 3104 /// Implements the `waitable-set.wait` intrinsic. 3105 pub(crate) fn waitable_set_wait( 3106 self, 3107 store: &mut StoreOpaque, 3108 options: OptionsIndex, 3109 set: u32, 3110 payload: u32, 3111 ) -> Result<u32> { 3112 if !self.options(store, options).async_ { 3113 // The caller may only call `waitable-set.wait` from an async task 3114 // (i.e. a task created via a call to an async export). 3115 // Otherwise, we'll trap. 3116 store.check_blocking()?; 3117 } 3118 3119 let &CanonicalOptions { 3120 cancellable, 3121 instance: caller_instance, 3122 .. 3123 } = &self.id().get(store).component().env_component().options[options]; 3124 let rep = store 3125 .instance_state(RuntimeInstance { 3126 instance: self.id().instance(), 3127 index: caller_instance, 3128 }) 3129 .handle_table() 3130 .waitable_set_rep(set)?; 3131 3132 self.waitable_check( 3133 store, 3134 cancellable, 3135 WaitableCheck::Wait, 3136 WaitableCheckParams { 3137 set: TableId::new(rep), 3138 options, 3139 payload, 3140 }, 3141 ) 3142 } 3143 3144 /// Implements the `waitable-set.poll` intrinsic. 3145 pub(crate) fn waitable_set_poll( 3146 self, 3147 store: &mut StoreOpaque, 3148 options: OptionsIndex, 3149 set: u32, 3150 payload: u32, 3151 ) -> Result<u32> { 3152 let &CanonicalOptions { 3153 cancellable, 3154 instance: caller_instance, 3155 .. 3156 } = &self.id().get(store).component().env_component().options[options]; 3157 let rep = store 3158 .instance_state(RuntimeInstance { 3159 instance: self.id().instance(), 3160 index: caller_instance, 3161 }) 3162 .handle_table() 3163 .waitable_set_rep(set)?; 3164 3165 self.waitable_check( 3166 store, 3167 cancellable, 3168 WaitableCheck::Poll, 3169 WaitableCheckParams { 3170 set: TableId::new(rep), 3171 options, 3172 payload, 3173 }, 3174 ) 3175 } 3176 3177 /// Implements the `thread.index` intrinsic. 3178 pub(crate) fn thread_index(&self, store: &mut dyn VMStore) -> Result<u32> { 3179 let thread_id = store 3180 .concurrent_state_mut() 3181 .unwrap_current_guest_thread() 3182 .thread; 3183 // The unwrap is safe because `instance_rep` must be `Some` by this point 3184 Ok(store 3185 .concurrent_state_mut() 3186 .get_mut(thread_id)? 3187 .instance_rep 3188 .unwrap()) 3189 } 3190 3191 /// Implements the `thread.new-indirect` intrinsic. 3192 pub(crate) fn thread_new_indirect<T: 'static>( 3193 self, 3194 mut store: StoreContextMut<T>, 3195 runtime_instance: RuntimeComponentInstanceIndex, 3196 _func_ty_idx: TypeFuncIndex, // currently unused 3197 start_func_table_idx: RuntimeTableIndex, 3198 start_func_idx: u32, 3199 context: i32, 3200 ) -> Result<u32> { 3201 log::trace!("creating new thread"); 3202 3203 let start_func_ty = FuncType::new(store.engine(), [ValType::I32], []); 3204 let (instance, registry) = self.id().get_mut_and_registry(store.0); 3205 let callee = instance 3206 .index_runtime_func_table(registry, start_func_table_idx, start_func_idx as u64)? 3207 .ok_or_else(|| { 3208 format_err!("the start function index points to an uninitialized function") 3209 })?; 3210 if callee.type_index(store.0) != start_func_ty.type_index() { 3211 bail!( 3212 "start function does not match expected type (currently only `(i32) -> ()` is supported)" 3213 ); 3214 } 3215 3216 let token = StoreToken::new(store.as_context_mut()); 3217 let start_func = Box::new( 3218 move |store: &mut dyn VMStore, guest_thread: QualifiedThreadId| -> Result<()> { 3219 let old_thread = store.set_thread(guest_thread); 3220 log::trace!( 3221 "thread start: replaced {old_thread:?} with {guest_thread:?} as current thread" 3222 ); 3223 3224 let mut store = token.as_context_mut(store); 3225 let mut params = [ValRaw::i32(context)]; 3226 // Use call_unchecked rather than call or call_async, as we don't want to run the function 3227 // on a separate fiber if we're running in an async store. 3228 unsafe { callee.call_unchecked(store.as_context_mut(), &mut params)? }; 3229 3230 self.cleanup_thread(store.0, guest_thread, runtime_instance)?; 3231 log::trace!("explicit thread {guest_thread:?} completed"); 3232 let state = store.0.concurrent_state_mut(); 3233 let task = state.get_mut(guest_thread.task)?; 3234 if task.threads.is_empty() && !task.returned_or_cancelled() { 3235 bail!(Trap::NoAsyncResult); 3236 } 3237 store.0.set_thread(old_thread); 3238 let state = store.0.concurrent_state_mut(); 3239 old_thread 3240 .guest() 3241 .map(|t| state.get_mut(t.thread).unwrap().state = GuestThreadState::Running); 3242 if state.get_mut(guest_thread.task)?.ready_to_delete() { 3243 Waitable::Guest(guest_thread.task).delete_from(state)?; 3244 } 3245 log::trace!("thread start: restored {old_thread:?} as current thread"); 3246 3247 Ok(()) 3248 }, 3249 ); 3250 3251 let state = store.0.concurrent_state_mut(); 3252 let current_thread = state.unwrap_current_guest_thread(); 3253 let parent_task = current_thread.task; 3254 3255 let new_thread = GuestThread::new_explicit(parent_task, start_func); 3256 let thread_id = state.push(new_thread)?; 3257 state.get_mut(parent_task)?.threads.insert(thread_id); 3258 3259 log::trace!("new thread with id {thread_id:?} created"); 3260 3261 self.add_guest_thread_to_instance_table(thread_id, store.0, runtime_instance) 3262 } 3263 3264 pub(crate) fn resume_thread( 3265 self, 3266 store: &mut StoreOpaque, 3267 runtime_instance: RuntimeComponentInstanceIndex, 3268 thread_idx: u32, 3269 high_priority: bool, 3270 allow_ready: bool, 3271 ) -> Result<()> { 3272 let thread_id = 3273 GuestThread::from_instance(self.id().get_mut(store), runtime_instance, thread_idx)?; 3274 let state = store.concurrent_state_mut(); 3275 let guest_thread = QualifiedThreadId::qualify(state, thread_id)?; 3276 let thread = state.get_mut(guest_thread.thread)?; 3277 3278 match mem::replace(&mut thread.state, GuestThreadState::Running) { 3279 GuestThreadState::NotStartedExplicit(start_func) => { 3280 log::trace!("starting thread {guest_thread:?}"); 3281 let guest_call = WorkItem::GuestCall( 3282 runtime_instance, 3283 GuestCall { 3284 thread: guest_thread, 3285 kind: GuestCallKind::StartExplicit(Box::new(move |store| { 3286 start_func(store, guest_thread) 3287 })), 3288 }, 3289 ); 3290 store 3291 .concurrent_state_mut() 3292 .push_work_item(guest_call, high_priority); 3293 } 3294 GuestThreadState::Suspended(fiber) => { 3295 log::trace!("resuming thread {thread_id:?} that was suspended"); 3296 store 3297 .concurrent_state_mut() 3298 .push_work_item(WorkItem::ResumeFiber(fiber), high_priority); 3299 } 3300 GuestThreadState::Ready(fiber) if allow_ready => { 3301 log::trace!("resuming thread {thread_id:?} that was ready"); 3302 thread.state = GuestThreadState::Ready(fiber); 3303 store 3304 .concurrent_state_mut() 3305 .promote_thread_work_item(guest_thread); 3306 } 3307 other => { 3308 thread.state = other; 3309 bail!("cannot resume thread which is not suspended"); 3310 } 3311 } 3312 Ok(()) 3313 } 3314 3315 fn add_guest_thread_to_instance_table( 3316 self, 3317 thread_id: TableId<GuestThread>, 3318 store: &mut StoreOpaque, 3319 runtime_instance: RuntimeComponentInstanceIndex, 3320 ) -> Result<u32> { 3321 let guest_id = store 3322 .instance_state(RuntimeInstance { 3323 instance: self.id().instance(), 3324 index: runtime_instance, 3325 }) 3326 .thread_handle_table() 3327 .guest_thread_insert(thread_id.rep())?; 3328 store 3329 .concurrent_state_mut() 3330 .get_mut(thread_id)? 3331 .instance_rep = Some(guest_id); 3332 Ok(guest_id) 3333 } 3334 3335 /// Helper function for the `thread.yield`, `thread.yield-to-suspended`, `thread.suspend`, 3336 /// `thread.suspend-to`, and `thread.suspend-to-suspended` intrinsics. 3337 pub(crate) fn suspension_intrinsic( 3338 self, 3339 store: &mut StoreOpaque, 3340 caller: RuntimeComponentInstanceIndex, 3341 cancellable: bool, 3342 yielding: bool, 3343 to_thread: SuspensionTarget, 3344 ) -> Result<WaitResult> { 3345 let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread(); 3346 if to_thread.is_none() { 3347 let state = store.concurrent_state_mut(); 3348 if yielding { 3349 // This is a `thread.yield` call 3350 if !state.may_block(guest_thread.task) { 3351 // In a non-blocking context, a `thread.yield` may trigger 3352 // other threads in the same component instance to run. 3353 if !state.promote_instance_local_thread_work_item(caller) { 3354 // No other threads are runnable, so just return 3355 return Ok(WaitResult::Completed); 3356 } 3357 } 3358 } else { 3359 // The caller may only call `thread.suspend` from an async task 3360 // (i.e. a task created via a call to an async export). 3361 // Otherwise, we'll trap. 3362 store.check_blocking()?; 3363 } 3364 } 3365 3366 // There could be a pending cancellation from a previous uncancellable wait 3367 if cancellable && store.concurrent_state_mut().take_pending_cancellation() { 3368 return Ok(WaitResult::Cancelled); 3369 } 3370 3371 match to_thread { 3372 SuspensionTarget::SomeSuspended(thread) => { 3373 self.resume_thread(store, caller, thread, true, false)? 3374 } 3375 SuspensionTarget::Some(thread) => { 3376 self.resume_thread(store, caller, thread, true, true)? 3377 } 3378 SuspensionTarget::None => { /* nothing to do */ } 3379 } 3380 3381 let reason = if yielding { 3382 SuspendReason::Yielding { 3383 thread: guest_thread, 3384 // Tell `StoreOpaque::suspend` it's okay to suspend here since 3385 // we're handling a `thread.yield-to-suspended` call; otherwise it would 3386 // panic if we called it in a non-blocking context. 3387 skip_may_block_check: to_thread.is_some(), 3388 } 3389 } else { 3390 SuspendReason::ExplicitlySuspending { 3391 thread: guest_thread, 3392 // Tell `StoreOpaque::suspend` it's okay to suspend here since 3393 // we're handling a `thread.suspend-to(-suspended)` call; otherwise it would 3394 // panic if we called it in a non-blocking context. 3395 skip_may_block_check: to_thread.is_some(), 3396 } 3397 }; 3398 3399 store.suspend(reason)?; 3400 3401 if cancellable && store.concurrent_state_mut().take_pending_cancellation() { 3402 Ok(WaitResult::Cancelled) 3403 } else { 3404 Ok(WaitResult::Completed) 3405 } 3406 } 3407 3408 /// Helper function for the `waitable-set.wait` and `waitable-set.poll` intrinsics. 3409 fn waitable_check( 3410 self, 3411 store: &mut StoreOpaque, 3412 cancellable: bool, 3413 check: WaitableCheck, 3414 params: WaitableCheckParams, 3415 ) -> Result<u32> { 3416 let guest_thread = store.concurrent_state_mut().unwrap_current_guest_thread(); 3417 3418 log::trace!("waitable check for {guest_thread:?}; set {:?}", params.set); 3419 3420 let state = store.concurrent_state_mut(); 3421 let task = state.get_mut(guest_thread.task)?; 3422 3423 // If we're waiting, and there are no events immediately available, 3424 // suspend the fiber until that changes. 3425 match &check { 3426 WaitableCheck::Wait => { 3427 let set = params.set; 3428 3429 if (task.event.is_none() 3430 || (matches!(task.event, Some(Event::Cancelled)) && !cancellable)) 3431 && state.get_mut(set)?.ready.is_empty() 3432 { 3433 if cancellable { 3434 let old = state 3435 .get_mut(guest_thread.thread)? 3436 .wake_on_cancel 3437 .replace(set); 3438 assert!(old.is_none()); 3439 } 3440 3441 store.suspend(SuspendReason::Waiting { 3442 set, 3443 thread: guest_thread, 3444 skip_may_block_check: false, 3445 })?; 3446 } 3447 } 3448 WaitableCheck::Poll => {} 3449 } 3450 3451 log::trace!( 3452 "waitable check for {guest_thread:?}; set {:?}, part two", 3453 params.set 3454 ); 3455 3456 // Deliver any pending events to the guest and return. 3457 let event = self.get_event(store, guest_thread.task, Some(params.set), cancellable)?; 3458 3459 let (ordinal, handle, result) = match &check { 3460 WaitableCheck::Wait => { 3461 let (event, waitable) = event.unwrap(); 3462 let handle = waitable.map(|(_, v)| v).unwrap_or(0); 3463 let (ordinal, result) = event.parts(); 3464 (ordinal, handle, result) 3465 } 3466 WaitableCheck::Poll => { 3467 if let Some((event, waitable)) = event { 3468 let handle = waitable.map(|(_, v)| v).unwrap_or(0); 3469 let (ordinal, result) = event.parts(); 3470 (ordinal, handle, result) 3471 } else { 3472 log::trace!( 3473 "no events ready to deliver via waitable-set.poll to {:?}; set {:?}", 3474 guest_thread.task, 3475 params.set 3476 ); 3477 let (ordinal, result) = Event::None.parts(); 3478 (ordinal, 0, result) 3479 } 3480 } 3481 }; 3482 let memory = self.options_memory_mut(store, params.options); 3483 let ptr = func::validate_inbounds_dynamic( 3484 &CanonicalAbiInfo::POINTER_PAIR, 3485 memory, 3486 &ValRaw::u32(params.payload), 3487 )?; 3488 memory[ptr + 0..][..4].copy_from_slice(&handle.to_le_bytes()); 3489 memory[ptr + 4..][..4].copy_from_slice(&result.to_le_bytes()); 3490 Ok(ordinal) 3491 } 3492 3493 /// Implements the `subtask.cancel` intrinsic. 3494 pub(crate) fn subtask_cancel( 3495 self, 3496 store: &mut StoreOpaque, 3497 caller_instance: RuntimeComponentInstanceIndex, 3498 async_: bool, 3499 task_id: u32, 3500 ) -> Result<u32> { 3501 if !async_ { 3502 // The caller may only sync call `subtask.cancel` from an async task 3503 // (i.e. a task created via a call to an async export). Otherwise, 3504 // we'll trap. 3505 store.check_blocking()?; 3506 } 3507 3508 let (rep, is_host) = store 3509 .instance_state(RuntimeInstance { 3510 instance: self.id().instance(), 3511 index: caller_instance, 3512 }) 3513 .handle_table() 3514 .subtask_rep(task_id)?; 3515 let (waitable, expected_caller) = if is_host { 3516 let id = TableId::<HostTask>::new(rep); 3517 ( 3518 Waitable::Host(id), 3519 store.concurrent_state_mut().get_mut(id)?.caller, 3520 ) 3521 } else { 3522 let id = TableId::<GuestTask>::new(rep); 3523 if let Caller::Guest { thread } = store.concurrent_state_mut().get_mut(id)?.caller { 3524 (Waitable::Guest(id), thread) 3525 } else { 3526 unreachable!() 3527 } 3528 }; 3529 // Since waitables can neither be passed between instances nor forged, 3530 // this should never fail unless there's a bug in Wasmtime, but we check 3531 // here to be sure: 3532 let concurrent_state = store.concurrent_state_mut(); 3533 assert_eq!( 3534 expected_caller, 3535 concurrent_state.unwrap_current_guest_thread(), 3536 ); 3537 3538 log::trace!("subtask_cancel {waitable:?} (handle {task_id})"); 3539 3540 if let Waitable::Host(host_task) = waitable { 3541 if let Some(handle) = concurrent_state.get_mut(host_task)?.join_handle.take() { 3542 handle.abort(); 3543 return Ok(Status::ReturnCancelled as u32); 3544 } 3545 } else { 3546 let caller = concurrent_state.unwrap_current_guest_thread(); 3547 let guest_task = TableId::<GuestTask>::new(rep); 3548 let task = concurrent_state.get_mut(guest_task)?; 3549 if !task.already_lowered_parameters() { 3550 // The task is in a `starting` state, meaning it hasn't run at 3551 // all yet. Here we update its fields to indicate that it is 3552 // ready to delete immediately once `subtask.drop` is called. 3553 task.lower_params = None; 3554 task.lift_result = None; 3555 task.exited = true; 3556 3557 let instance = task.instance; 3558 3559 assert_eq!(1, task.threads.len()); 3560 let thread = mem::take(&mut task.threads).into_iter().next().unwrap(); 3561 let concurrent_state = store.concurrent_state_mut(); 3562 concurrent_state.delete(thread)?; 3563 assert!(concurrent_state.get_mut(guest_task)?.ready_to_delete()); 3564 3565 // Not yet started; cancel and remove from pending 3566 let pending = &mut store.instance_state(instance).concurrent_state().pending; 3567 let pending_count = pending.len(); 3568 pending.retain(|thread, _| thread.task != guest_task); 3569 // If there were no pending threads for this task, we're in an error state 3570 if pending.len() == pending_count { 3571 bail!("`subtask.cancel` called after terminal status delivered"); 3572 } 3573 return Ok(Status::StartCancelled as u32); 3574 } else if !task.returned_or_cancelled() { 3575 // Started, but not yet returned or cancelled; send the 3576 // `CANCELLED` event 3577 task.cancel_sent = true; 3578 // Note that this might overwrite an event that was set earlier 3579 // (e.g. `Event::None` if the task is yielding, or 3580 // `Event::Cancelled` if it was already cancelled), but that's 3581 // okay -- this should supersede the previous state. 3582 task.event = Some(Event::Cancelled); 3583 let runtime_instance = task.instance.index; 3584 for thread in task.threads.clone() { 3585 let thread = QualifiedThreadId { 3586 task: guest_task, 3587 thread, 3588 }; 3589 if let Some(set) = concurrent_state 3590 .get_mut(thread.thread) 3591 .unwrap() 3592 .wake_on_cancel 3593 .take() 3594 { 3595 let item = match concurrent_state 3596 .get_mut(set)? 3597 .waiting 3598 .remove(&thread) 3599 .unwrap() 3600 { 3601 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber), 3602 WaitMode::Callback(instance) => WorkItem::GuestCall( 3603 runtime_instance, 3604 GuestCall { 3605 thread, 3606 kind: GuestCallKind::DeliverEvent { 3607 instance, 3608 set: None, 3609 }, 3610 }, 3611 ), 3612 }; 3613 concurrent_state.push_high_priority(item); 3614 3615 store.suspend(SuspendReason::Yielding { 3616 thread: caller, 3617 // `subtask.cancel` is not allowed to be called in a 3618 // sync context, so we cannot skip the may-block check. 3619 skip_may_block_check: false, 3620 })?; 3621 break; 3622 } 3623 } 3624 3625 let concurrent_state = store.concurrent_state_mut(); 3626 let task = concurrent_state.get_mut(guest_task)?; 3627 if !task.returned_or_cancelled() { 3628 if async_ { 3629 return Ok(BLOCKED); 3630 } else { 3631 store.wait_for_event(Waitable::Guest(guest_task))?; 3632 } 3633 } 3634 } 3635 } 3636 3637 let event = waitable.take_event(store.concurrent_state_mut())?; 3638 if let Some(Event::Subtask { 3639 status: status @ (Status::Returned | Status::ReturnCancelled), 3640 }) = event 3641 { 3642 Ok(status as u32) 3643 } else { 3644 bail!("`subtask.cancel` called after terminal status delivered"); 3645 } 3646 } 3647 3648 pub(crate) fn context_get(self, store: &mut StoreOpaque, slot: u32) -> Result<u32> { 3649 store.concurrent_state_mut().context_get(slot) 3650 } 3651 3652 pub(crate) fn context_set(self, store: &mut StoreOpaque, slot: u32, value: u32) -> Result<()> { 3653 store.concurrent_state_mut().context_set(slot, value) 3654 } 3655 } 3656 3657 /// Trait representing component model ABI async intrinsics and fused adapter 3658 /// helper functions. 3659 /// 3660 /// SAFETY (callers): Most of the methods in this trait accept raw pointers, 3661 /// which must be valid for at least the duration of the call (and possibly for 3662 /// as long as the relevant guest task exists, in the case of `*mut VMFuncRef` 3663 /// pointers used for async calls). 3664 pub trait VMComponentAsyncStore { 3665 /// A helper function for fused adapter modules involving calls where the 3666 /// one of the caller or callee is async. 3667 /// 3668 /// This helper is not used when the caller and callee both use the sync 3669 /// ABI, only when at least one is async is this used. 3670 unsafe fn prepare_call( 3671 &mut self, 3672 instance: Instance, 3673 memory: *mut VMMemoryDefinition, 3674 start: *mut VMFuncRef, 3675 return_: *mut VMFuncRef, 3676 caller_instance: RuntimeComponentInstanceIndex, 3677 callee_instance: RuntimeComponentInstanceIndex, 3678 task_return_type: TypeTupleIndex, 3679 callee_async: bool, 3680 string_encoding: u8, 3681 result_count: u32, 3682 storage: *mut ValRaw, 3683 storage_len: usize, 3684 ) -> Result<()>; 3685 3686 /// A helper function for fused adapter modules involving calls where the 3687 /// caller is sync-lowered but the callee is async-lifted. 3688 unsafe fn sync_start( 3689 &mut self, 3690 instance: Instance, 3691 callback: *mut VMFuncRef, 3692 callee: *mut VMFuncRef, 3693 param_count: u32, 3694 storage: *mut MaybeUninit<ValRaw>, 3695 storage_len: usize, 3696 ) -> Result<()>; 3697 3698 /// A helper function for fused adapter modules involving calls where the 3699 /// caller is async-lowered. 3700 unsafe fn async_start( 3701 &mut self, 3702 instance: Instance, 3703 callback: *mut VMFuncRef, 3704 post_return: *mut VMFuncRef, 3705 callee: *mut VMFuncRef, 3706 param_count: u32, 3707 result_count: u32, 3708 flags: u32, 3709 ) -> Result<u32>; 3710 3711 /// The `future.write` intrinsic. 3712 fn future_write( 3713 &mut self, 3714 instance: Instance, 3715 caller: RuntimeComponentInstanceIndex, 3716 ty: TypeFutureTableIndex, 3717 options: OptionsIndex, 3718 future: u32, 3719 address: u32, 3720 ) -> Result<u32>; 3721 3722 /// The `future.read` intrinsic. 3723 fn future_read( 3724 &mut self, 3725 instance: Instance, 3726 caller: RuntimeComponentInstanceIndex, 3727 ty: TypeFutureTableIndex, 3728 options: OptionsIndex, 3729 future: u32, 3730 address: u32, 3731 ) -> Result<u32>; 3732 3733 /// The `future.drop-writable` intrinsic. 3734 fn future_drop_writable( 3735 &mut self, 3736 instance: Instance, 3737 ty: TypeFutureTableIndex, 3738 writer: u32, 3739 ) -> Result<()>; 3740 3741 /// The `stream.write` intrinsic. 3742 fn stream_write( 3743 &mut self, 3744 instance: Instance, 3745 caller: RuntimeComponentInstanceIndex, 3746 ty: TypeStreamTableIndex, 3747 options: OptionsIndex, 3748 stream: u32, 3749 address: u32, 3750 count: u32, 3751 ) -> Result<u32>; 3752 3753 /// The `stream.read` intrinsic. 3754 fn stream_read( 3755 &mut self, 3756 instance: Instance, 3757 caller: RuntimeComponentInstanceIndex, 3758 ty: TypeStreamTableIndex, 3759 options: OptionsIndex, 3760 stream: u32, 3761 address: u32, 3762 count: u32, 3763 ) -> Result<u32>; 3764 3765 /// The "fast-path" implementation of the `stream.write` intrinsic for 3766 /// "flat" (i.e. memcpy-able) payloads. 3767 fn flat_stream_write( 3768 &mut self, 3769 instance: Instance, 3770 caller: RuntimeComponentInstanceIndex, 3771 ty: TypeStreamTableIndex, 3772 options: OptionsIndex, 3773 payload_size: u32, 3774 payload_align: u32, 3775 stream: u32, 3776 address: u32, 3777 count: u32, 3778 ) -> Result<u32>; 3779 3780 /// The "fast-path" implementation of the `stream.read` intrinsic for "flat" 3781 /// (i.e. memcpy-able) payloads. 3782 fn flat_stream_read( 3783 &mut self, 3784 instance: Instance, 3785 caller: RuntimeComponentInstanceIndex, 3786 ty: TypeStreamTableIndex, 3787 options: OptionsIndex, 3788 payload_size: u32, 3789 payload_align: u32, 3790 stream: u32, 3791 address: u32, 3792 count: u32, 3793 ) -> Result<u32>; 3794 3795 /// The `stream.drop-writable` intrinsic. 3796 fn stream_drop_writable( 3797 &mut self, 3798 instance: Instance, 3799 ty: TypeStreamTableIndex, 3800 writer: u32, 3801 ) -> Result<()>; 3802 3803 /// The `error-context.debug-message` intrinsic. 3804 fn error_context_debug_message( 3805 &mut self, 3806 instance: Instance, 3807 ty: TypeComponentLocalErrorContextTableIndex, 3808 options: OptionsIndex, 3809 err_ctx_handle: u32, 3810 debug_msg_address: u32, 3811 ) -> Result<()>; 3812 3813 /// The `thread.new-indirect` intrinsic 3814 fn thread_new_indirect( 3815 &mut self, 3816 instance: Instance, 3817 caller: RuntimeComponentInstanceIndex, 3818 func_ty_idx: TypeFuncIndex, 3819 start_func_table_idx: RuntimeTableIndex, 3820 start_func_idx: u32, 3821 context: i32, 3822 ) -> Result<u32>; 3823 } 3824 3825 /// SAFETY: See trait docs. 3826 impl<T: 'static> VMComponentAsyncStore for StoreInner<T> { 3827 unsafe fn prepare_call( 3828 &mut self, 3829 instance: Instance, 3830 memory: *mut VMMemoryDefinition, 3831 start: *mut VMFuncRef, 3832 return_: *mut VMFuncRef, 3833 caller_instance: RuntimeComponentInstanceIndex, 3834 callee_instance: RuntimeComponentInstanceIndex, 3835 task_return_type: TypeTupleIndex, 3836 callee_async: bool, 3837 string_encoding: u8, 3838 result_count_or_max_if_async: u32, 3839 storage: *mut ValRaw, 3840 storage_len: usize, 3841 ) -> Result<()> { 3842 // SAFETY: The `wasmtime_cranelift`-generated code that calls 3843 // this method will have ensured that `storage` is a valid 3844 // pointer containing at least `storage_len` items. 3845 let params = unsafe { std::slice::from_raw_parts(storage, storage_len) }.to_vec(); 3846 3847 unsafe { 3848 instance.prepare_call( 3849 StoreContextMut(self), 3850 start, 3851 return_, 3852 caller_instance, 3853 callee_instance, 3854 task_return_type, 3855 callee_async, 3856 memory, 3857 string_encoding, 3858 match result_count_or_max_if_async { 3859 PREPARE_ASYNC_NO_RESULT => CallerInfo::Async { 3860 params, 3861 has_result: false, 3862 }, 3863 PREPARE_ASYNC_WITH_RESULT => CallerInfo::Async { 3864 params, 3865 has_result: true, 3866 }, 3867 result_count => CallerInfo::Sync { 3868 params, 3869 result_count, 3870 }, 3871 }, 3872 ) 3873 } 3874 } 3875 3876 unsafe fn sync_start( 3877 &mut self, 3878 instance: Instance, 3879 callback: *mut VMFuncRef, 3880 callee: *mut VMFuncRef, 3881 param_count: u32, 3882 storage: *mut MaybeUninit<ValRaw>, 3883 storage_len: usize, 3884 ) -> Result<()> { 3885 unsafe { 3886 instance 3887 .start_call( 3888 StoreContextMut(self), 3889 callback, 3890 ptr::null_mut(), 3891 callee, 3892 param_count, 3893 1, 3894 START_FLAG_ASYNC_CALLEE, 3895 // SAFETY: The `wasmtime_cranelift`-generated code that calls 3896 // this method will have ensured that `storage` is a valid 3897 // pointer containing at least `storage_len` items. 3898 Some(std::slice::from_raw_parts_mut(storage, storage_len)), 3899 ) 3900 .map(drop) 3901 } 3902 } 3903 3904 unsafe fn async_start( 3905 &mut self, 3906 instance: Instance, 3907 callback: *mut VMFuncRef, 3908 post_return: *mut VMFuncRef, 3909 callee: *mut VMFuncRef, 3910 param_count: u32, 3911 result_count: u32, 3912 flags: u32, 3913 ) -> Result<u32> { 3914 unsafe { 3915 instance.start_call( 3916 StoreContextMut(self), 3917 callback, 3918 post_return, 3919 callee, 3920 param_count, 3921 result_count, 3922 flags, 3923 None, 3924 ) 3925 } 3926 } 3927 3928 fn future_write( 3929 &mut self, 3930 instance: Instance, 3931 caller: RuntimeComponentInstanceIndex, 3932 ty: TypeFutureTableIndex, 3933 options: OptionsIndex, 3934 future: u32, 3935 address: u32, 3936 ) -> Result<u32> { 3937 instance 3938 .guest_write( 3939 StoreContextMut(self), 3940 caller, 3941 TransmitIndex::Future(ty), 3942 options, 3943 None, 3944 future, 3945 address, 3946 1, 3947 ) 3948 .map(|result| result.encode()) 3949 } 3950 3951 fn future_read( 3952 &mut self, 3953 instance: Instance, 3954 caller: RuntimeComponentInstanceIndex, 3955 ty: TypeFutureTableIndex, 3956 options: OptionsIndex, 3957 future: u32, 3958 address: u32, 3959 ) -> Result<u32> { 3960 instance 3961 .guest_read( 3962 StoreContextMut(self), 3963 caller, 3964 TransmitIndex::Future(ty), 3965 options, 3966 None, 3967 future, 3968 address, 3969 1, 3970 ) 3971 .map(|result| result.encode()) 3972 } 3973 3974 fn stream_write( 3975 &mut self, 3976 instance: Instance, 3977 caller: RuntimeComponentInstanceIndex, 3978 ty: TypeStreamTableIndex, 3979 options: OptionsIndex, 3980 stream: u32, 3981 address: u32, 3982 count: u32, 3983 ) -> Result<u32> { 3984 instance 3985 .guest_write( 3986 StoreContextMut(self), 3987 caller, 3988 TransmitIndex::Stream(ty), 3989 options, 3990 None, 3991 stream, 3992 address, 3993 count, 3994 ) 3995 .map(|result| result.encode()) 3996 } 3997 3998 fn stream_read( 3999 &mut self, 4000 instance: Instance, 4001 caller: RuntimeComponentInstanceIndex, 4002 ty: TypeStreamTableIndex, 4003 options: OptionsIndex, 4004 stream: u32, 4005 address: u32, 4006 count: u32, 4007 ) -> Result<u32> { 4008 instance 4009 .guest_read( 4010 StoreContextMut(self), 4011 caller, 4012 TransmitIndex::Stream(ty), 4013 options, 4014 None, 4015 stream, 4016 address, 4017 count, 4018 ) 4019 .map(|result| result.encode()) 4020 } 4021 4022 fn future_drop_writable( 4023 &mut self, 4024 instance: Instance, 4025 ty: TypeFutureTableIndex, 4026 writer: u32, 4027 ) -> Result<()> { 4028 instance.guest_drop_writable(self, TransmitIndex::Future(ty), writer) 4029 } 4030 4031 fn flat_stream_write( 4032 &mut self, 4033 instance: Instance, 4034 caller: RuntimeComponentInstanceIndex, 4035 ty: TypeStreamTableIndex, 4036 options: OptionsIndex, 4037 payload_size: u32, 4038 payload_align: u32, 4039 stream: u32, 4040 address: u32, 4041 count: u32, 4042 ) -> Result<u32> { 4043 instance 4044 .guest_write( 4045 StoreContextMut(self), 4046 caller, 4047 TransmitIndex::Stream(ty), 4048 options, 4049 Some(FlatAbi { 4050 size: payload_size, 4051 align: payload_align, 4052 }), 4053 stream, 4054 address, 4055 count, 4056 ) 4057 .map(|result| result.encode()) 4058 } 4059 4060 fn flat_stream_read( 4061 &mut self, 4062 instance: Instance, 4063 caller: RuntimeComponentInstanceIndex, 4064 ty: TypeStreamTableIndex, 4065 options: OptionsIndex, 4066 payload_size: u32, 4067 payload_align: u32, 4068 stream: u32, 4069 address: u32, 4070 count: u32, 4071 ) -> Result<u32> { 4072 instance 4073 .guest_read( 4074 StoreContextMut(self), 4075 caller, 4076 TransmitIndex::Stream(ty), 4077 options, 4078 Some(FlatAbi { 4079 size: payload_size, 4080 align: payload_align, 4081 }), 4082 stream, 4083 address, 4084 count, 4085 ) 4086 .map(|result| result.encode()) 4087 } 4088 4089 fn stream_drop_writable( 4090 &mut self, 4091 instance: Instance, 4092 ty: TypeStreamTableIndex, 4093 writer: u32, 4094 ) -> Result<()> { 4095 instance.guest_drop_writable(self, TransmitIndex::Stream(ty), writer) 4096 } 4097 4098 fn error_context_debug_message( 4099 &mut self, 4100 instance: Instance, 4101 ty: TypeComponentLocalErrorContextTableIndex, 4102 options: OptionsIndex, 4103 err_ctx_handle: u32, 4104 debug_msg_address: u32, 4105 ) -> Result<()> { 4106 instance.error_context_debug_message( 4107 StoreContextMut(self), 4108 ty, 4109 options, 4110 err_ctx_handle, 4111 debug_msg_address, 4112 ) 4113 } 4114 4115 fn thread_new_indirect( 4116 &mut self, 4117 instance: Instance, 4118 caller: RuntimeComponentInstanceIndex, 4119 func_ty_idx: TypeFuncIndex, 4120 start_func_table_idx: RuntimeTableIndex, 4121 start_func_idx: u32, 4122 context: i32, 4123 ) -> Result<u32> { 4124 instance.thread_new_indirect( 4125 StoreContextMut(self), 4126 caller, 4127 func_ty_idx, 4128 start_func_table_idx, 4129 start_func_idx, 4130 context, 4131 ) 4132 } 4133 } 4134 4135 type HostTaskFuture = Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>; 4136 4137 /// Represents the state of a pending host task. 4138 /// 4139 /// This is used to represent tasks when the guest calls into the host. 4140 struct HostTask { 4141 common: WaitableCommon, 4142 4143 /// Guest thread which called the host. 4144 caller: QualifiedThreadId, 4145 4146 /// State of borrows/etc the host needs to track. Used when the guest passes 4147 /// borrows to the host, for example. 4148 call_context: CallContext, 4149 4150 /// For host tasks which end up doing some asynchronous work (e.g. 4151 /// async-lowered and didn't complete on the first poll) this handle is used 4152 /// as a signal to cancel the future as it resides in the store's 4153 /// `FuturesUnordered`. 4154 join_handle: Option<JoinHandle>, 4155 4156 /// Box<Any> of the result of this host task. 4157 result: Option<LiftedResult>, 4158 } 4159 4160 impl HostTask { 4161 fn new(caller: QualifiedThreadId) -> Self { 4162 Self { 4163 common: WaitableCommon::default(), 4164 call_context: CallContext::default(), 4165 caller, 4166 join_handle: None, 4167 result: None, 4168 } 4169 } 4170 } 4171 4172 impl TableDebug for HostTask { 4173 fn type_name() -> &'static str { 4174 "HostTask" 4175 } 4176 } 4177 4178 type CallbackFn = Box<dyn Fn(&mut dyn VMStore, Event, u32) -> Result<u32> + Send + Sync + 'static>; 4179 4180 /// Represents the caller of a given guest task. 4181 enum Caller { 4182 /// The host called the guest task. 4183 Host { 4184 /// If present, may be used to deliver the result. 4185 tx: Option<oneshot::Sender<LiftedResult>>, 4186 /// Channel to notify once all subtasks spawned by this caller have 4187 /// completed. 4188 /// 4189 /// Note that we'll never actually send anything to this channel; 4190 /// dropping it when the refcount goes to zero is sufficient to notify 4191 /// the receiver. 4192 exit_tx: Arc<oneshot::Sender<()>>, 4193 /// If true, there's a host future that must be dropped before the task 4194 /// can be deleted. 4195 host_future_present: bool, 4196 /// Represents the caller of the host function which called back into a 4197 /// guest. Note that this thread could belong to an entirely unrelated 4198 /// top-level component instance than the one the host called into. 4199 caller: CurrentThread, 4200 }, 4201 /// Another guest thread called the guest task 4202 Guest { 4203 /// The id of the caller 4204 thread: QualifiedThreadId, 4205 }, 4206 } 4207 4208 /// Represents a closure and related canonical ABI parameters required to 4209 /// validate a `task.return` call at runtime and lift the result. 4210 struct LiftResult { 4211 lift: RawLift, 4212 ty: TypeTupleIndex, 4213 memory: Option<SendSyncPtr<VMMemoryDefinition>>, 4214 string_encoding: StringEncoding, 4215 } 4216 4217 /// The table ID for a guest thread, qualified by the task to which it belongs. 4218 /// 4219 /// This exists to minimize table lookups and the necessity to pass stores around mutably 4220 /// for the common case of identifying the task to which a thread belongs. 4221 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] 4222 struct QualifiedThreadId { 4223 task: TableId<GuestTask>, 4224 thread: TableId<GuestThread>, 4225 } 4226 4227 impl QualifiedThreadId { 4228 fn qualify( 4229 state: &mut ConcurrentState, 4230 thread: TableId<GuestThread>, 4231 ) -> Result<QualifiedThreadId> { 4232 Ok(QualifiedThreadId { 4233 task: state.get_mut(thread)?.parent_task, 4234 thread, 4235 }) 4236 } 4237 } 4238 4239 impl fmt::Debug for QualifiedThreadId { 4240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 4241 f.debug_tuple("QualifiedThreadId") 4242 .field(&self.task.rep()) 4243 .field(&self.thread.rep()) 4244 .finish() 4245 } 4246 } 4247 4248 enum GuestThreadState { 4249 NotStartedImplicit, 4250 NotStartedExplicit( 4251 Box<dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync>, 4252 ), 4253 Running, 4254 Suspended(StoreFiber<'static>), 4255 Ready(StoreFiber<'static>), 4256 Completed, 4257 } 4258 pub struct GuestThread { 4259 /// Context-local state used to implement the `context.{get,set}` 4260 /// intrinsics. 4261 context: [u32; 2], 4262 /// The owning guest task. 4263 parent_task: TableId<GuestTask>, 4264 /// If present, indicates that the thread is currently waiting on the 4265 /// specified set but may be cancelled and woken immediately. 4266 wake_on_cancel: Option<TableId<WaitableSet>>, 4267 /// The execution state of this guest thread 4268 state: GuestThreadState, 4269 /// The index of this thread in the component instance's handle table. 4270 /// This must always be `Some` after initialization. 4271 instance_rep: Option<u32>, 4272 } 4273 4274 impl GuestThread { 4275 /// Retrieve the `GuestThread` corresponding to the specified guest-visible 4276 /// handle. 4277 fn from_instance( 4278 state: Pin<&mut ComponentInstance>, 4279 caller_instance: RuntimeComponentInstanceIndex, 4280 guest_thread: u32, 4281 ) -> Result<TableId<Self>> { 4282 let rep = state.instance_states().0[caller_instance] 4283 .thread_handle_table() 4284 .guest_thread_rep(guest_thread)?; 4285 Ok(TableId::new(rep)) 4286 } 4287 4288 fn new_implicit(parent_task: TableId<GuestTask>) -> Self { 4289 Self { 4290 context: [0; 2], 4291 parent_task, 4292 wake_on_cancel: None, 4293 state: GuestThreadState::NotStartedImplicit, 4294 instance_rep: None, 4295 } 4296 } 4297 4298 fn new_explicit( 4299 parent_task: TableId<GuestTask>, 4300 start_func: Box< 4301 dyn FnOnce(&mut dyn VMStore, QualifiedThreadId) -> Result<()> + Send + Sync, 4302 >, 4303 ) -> Self { 4304 Self { 4305 context: [0; 2], 4306 parent_task, 4307 wake_on_cancel: None, 4308 state: GuestThreadState::NotStartedExplicit(start_func), 4309 instance_rep: None, 4310 } 4311 } 4312 } 4313 4314 impl TableDebug for GuestThread { 4315 fn type_name() -> &'static str { 4316 "GuestThread" 4317 } 4318 } 4319 4320 enum SyncResult { 4321 NotProduced, 4322 Produced(Option<ValRaw>), 4323 Taken, 4324 } 4325 4326 impl SyncResult { 4327 fn take(&mut self) -> Option<Option<ValRaw>> { 4328 match mem::replace(self, SyncResult::Taken) { 4329 SyncResult::NotProduced => None, 4330 SyncResult::Produced(val) => Some(val), 4331 SyncResult::Taken => { 4332 panic!("attempted to take a synchronous result that was already taken") 4333 } 4334 } 4335 } 4336 } 4337 4338 #[derive(Debug)] 4339 enum HostFutureState { 4340 NotApplicable, 4341 Live, 4342 Dropped, 4343 } 4344 4345 /// Represents a pending guest task. 4346 pub(crate) struct GuestTask { 4347 /// See `WaitableCommon` 4348 common: WaitableCommon, 4349 /// Closure to lower the parameters passed to this task. 4350 lower_params: Option<RawLower>, 4351 /// See `LiftResult` 4352 lift_result: Option<LiftResult>, 4353 /// A place to stash the type-erased lifted result if it can't be delivered 4354 /// immediately. 4355 result: Option<LiftedResult>, 4356 /// Closure to call the callback function for an async-lifted export, if 4357 /// provided. 4358 callback: Option<CallbackFn>, 4359 /// See `Caller` 4360 caller: Caller, 4361 /// Borrow state for this task. 4362 /// 4363 /// Keeps track of `borrow<T>` received to this task to ensure that 4364 /// everything is dropped by the time it exits. 4365 call_context: CallContext, 4366 /// A place to stash the lowered result for a sync-to-async call until it 4367 /// can be returned to the caller. 4368 sync_result: SyncResult, 4369 /// Whether or not the task has been cancelled (i.e. whether the task is 4370 /// permitted to call `task.cancel`). 4371 cancel_sent: bool, 4372 /// Whether or not we've sent a `Status::Starting` event to any current or 4373 /// future waiters for this waitable. 4374 starting_sent: bool, 4375 /// Pending guest subtasks created by this task (directly or indirectly). 4376 /// 4377 /// This is used to re-parent subtasks which are still running when their 4378 /// parent task is disposed. 4379 subtasks: HashSet<TableId<GuestTask>>, 4380 /// Scratch waitable set used to watch subtasks during synchronous calls. 4381 sync_call_set: TableId<WaitableSet>, 4382 /// The runtime instance to which the exported function for this guest task 4383 /// belongs. 4384 /// 4385 /// Note that the task may do a sync->sync call via a fused adapter which 4386 /// results in that task executing code in a different instance, and it may 4387 /// call host functions and intrinsics from that other instance. 4388 instance: RuntimeInstance, 4389 /// If present, a pending `Event::None` or `Event::Cancelled` to be 4390 /// delivered to this task. 4391 event: Option<Event>, 4392 /// Whether or not the task has exited. 4393 exited: bool, 4394 /// Threads belonging to this task 4395 threads: HashSet<TableId<GuestThread>>, 4396 /// The state of the host future that represents an async task, which must 4397 /// be dropped before we can delete the task. 4398 host_future_state: HostFutureState, 4399 /// Indicates whether this task was created for a call to an async-lifted 4400 /// export. 4401 async_function: bool, 4402 } 4403 4404 impl GuestTask { 4405 fn already_lowered_parameters(&self) -> bool { 4406 // We reset `lower_params` after we lower the parameters 4407 self.lower_params.is_none() 4408 } 4409 4410 fn returned_or_cancelled(&self) -> bool { 4411 // We reset `lift_result` after we return or exit 4412 self.lift_result.is_none() 4413 } 4414 4415 fn ready_to_delete(&self) -> bool { 4416 let threads_completed = self.threads.is_empty(); 4417 let has_sync_result = matches!(self.sync_result, SyncResult::Produced(_)); 4418 let pending_completion_event = matches!( 4419 self.common.event, 4420 Some(Event::Subtask { 4421 status: Status::Returned | Status::ReturnCancelled 4422 }) 4423 ); 4424 let ready = threads_completed 4425 && !has_sync_result 4426 && !pending_completion_event 4427 && !matches!(self.host_future_state, HostFutureState::Live); 4428 log::trace!( 4429 "ready to delete? {ready} (threads_completed: {}, has_sync_result: {}, pending_completion_event: {}, host_future_state: {:?})", 4430 threads_completed, 4431 has_sync_result, 4432 pending_completion_event, 4433 self.host_future_state 4434 ); 4435 ready 4436 } 4437 4438 fn new( 4439 state: &mut ConcurrentState, 4440 lower_params: RawLower, 4441 lift_result: LiftResult, 4442 caller: Caller, 4443 callback: Option<CallbackFn>, 4444 instance: RuntimeInstance, 4445 async_function: bool, 4446 ) -> Result<Self> { 4447 let sync_call_set = state.push(WaitableSet::default())?; 4448 let host_future_state = match &caller { 4449 Caller::Guest { .. } => HostFutureState::NotApplicable, 4450 Caller::Host { 4451 host_future_present, 4452 .. 4453 } => { 4454 if *host_future_present { 4455 HostFutureState::Live 4456 } else { 4457 HostFutureState::NotApplicable 4458 } 4459 } 4460 }; 4461 Ok(Self { 4462 common: WaitableCommon::default(), 4463 lower_params: Some(lower_params), 4464 lift_result: Some(lift_result), 4465 result: None, 4466 callback, 4467 caller, 4468 call_context: CallContext::default(), 4469 sync_result: SyncResult::NotProduced, 4470 cancel_sent: false, 4471 starting_sent: false, 4472 subtasks: HashSet::new(), 4473 sync_call_set, 4474 instance, 4475 event: None, 4476 exited: false, 4477 threads: HashSet::new(), 4478 host_future_state, 4479 async_function, 4480 }) 4481 } 4482 4483 /// Dispose of this guest task, reparenting any pending subtasks to the 4484 /// caller. 4485 fn dispose(self, state: &mut ConcurrentState, me: TableId<GuestTask>) -> Result<()> { 4486 // If there are not-yet-delivered completion events for subtasks in 4487 // `self.sync_call_set`, recursively dispose of those subtasks as well. 4488 for waitable in mem::take(&mut state.get_mut(self.sync_call_set)?.ready) { 4489 if let Some(Event::Subtask { 4490 status: Status::Returned | Status::ReturnCancelled, 4491 }) = waitable.common(state)?.event 4492 { 4493 waitable.delete_from(state)?; 4494 } 4495 } 4496 4497 assert!(self.threads.is_empty()); 4498 4499 state.delete(self.sync_call_set)?; 4500 4501 // Reparent any pending subtasks to the caller. 4502 match &self.caller { 4503 Caller::Guest { thread } => { 4504 let task_mut = state.get_mut(thread.task)?; 4505 let present = task_mut.subtasks.remove(&me); 4506 assert!(present); 4507 4508 for subtask in &self.subtasks { 4509 task_mut.subtasks.insert(*subtask); 4510 } 4511 4512 for subtask in &self.subtasks { 4513 state.get_mut(*subtask)?.caller = Caller::Guest { thread: *thread }; 4514 } 4515 } 4516 Caller::Host { 4517 exit_tx, caller, .. 4518 } => { 4519 for subtask in &self.subtasks { 4520 state.get_mut(*subtask)?.caller = Caller::Host { 4521 tx: None, 4522 // Clone `exit_tx` to ensure that it is only dropped 4523 // once all transitive subtasks of the host call have 4524 // exited: 4525 exit_tx: exit_tx.clone(), 4526 host_future_present: false, 4527 caller: *caller, 4528 }; 4529 } 4530 } 4531 } 4532 4533 for subtask in self.subtasks { 4534 let task = state.get_mut(subtask)?; 4535 if task.exited && task.ready_to_delete() { 4536 Waitable::Guest(subtask).delete_from(state)?; 4537 } 4538 } 4539 4540 Ok(()) 4541 } 4542 } 4543 4544 impl TableDebug for GuestTask { 4545 fn type_name() -> &'static str { 4546 "GuestTask" 4547 } 4548 } 4549 4550 /// Represents state common to all kinds of waitables. 4551 #[derive(Default)] 4552 struct WaitableCommon { 4553 /// The currently pending event for this waitable, if any. 4554 event: Option<Event>, 4555 /// The set to which this waitable belongs, if any. 4556 set: Option<TableId<WaitableSet>>, 4557 /// The handle with which the guest refers to this waitable, if any. 4558 handle: Option<u32>, 4559 } 4560 4561 /// Represents a Component Model Async `waitable`. 4562 #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] 4563 enum Waitable { 4564 /// A host task 4565 Host(TableId<HostTask>), 4566 /// A guest task 4567 Guest(TableId<GuestTask>), 4568 /// The read or write end of a stream or future 4569 Transmit(TableId<TransmitHandle>), 4570 } 4571 4572 impl Waitable { 4573 /// Retrieve the `Waitable` corresponding to the specified guest-visible 4574 /// handle. 4575 fn from_instance( 4576 state: Pin<&mut ComponentInstance>, 4577 caller_instance: RuntimeComponentInstanceIndex, 4578 waitable: u32, 4579 ) -> Result<Self> { 4580 use crate::runtime::vm::component::Waitable; 4581 4582 let (waitable, kind) = state.instance_states().0[caller_instance] 4583 .handle_table() 4584 .waitable_rep(waitable)?; 4585 4586 Ok(match kind { 4587 Waitable::Subtask { is_host: true } => Self::Host(TableId::new(waitable)), 4588 Waitable::Subtask { is_host: false } => Self::Guest(TableId::new(waitable)), 4589 Waitable::Stream | Waitable::Future => Self::Transmit(TableId::new(waitable)), 4590 }) 4591 } 4592 4593 /// Retrieve the host-visible identifier for this `Waitable`. 4594 fn rep(&self) -> u32 { 4595 match self { 4596 Self::Host(id) => id.rep(), 4597 Self::Guest(id) => id.rep(), 4598 Self::Transmit(id) => id.rep(), 4599 } 4600 } 4601 4602 /// Move this `Waitable` to the specified set (when `set` is `Some(_)`) or 4603 /// remove it from any set it may currently belong to (when `set` is 4604 /// `None`). 4605 fn join(&self, state: &mut ConcurrentState, set: Option<TableId<WaitableSet>>) -> Result<()> { 4606 log::trace!("waitable {self:?} join set {set:?}",); 4607 4608 let old = mem::replace(&mut self.common(state)?.set, set); 4609 4610 if let Some(old) = old { 4611 match *self { 4612 Waitable::Host(id) => state.remove_child(id, old), 4613 Waitable::Guest(id) => state.remove_child(id, old), 4614 Waitable::Transmit(id) => state.remove_child(id, old), 4615 }?; 4616 4617 state.get_mut(old)?.ready.remove(self); 4618 } 4619 4620 if let Some(set) = set { 4621 match *self { 4622 Waitable::Host(id) => state.add_child(id, set), 4623 Waitable::Guest(id) => state.add_child(id, set), 4624 Waitable::Transmit(id) => state.add_child(id, set), 4625 }?; 4626 4627 if self.common(state)?.event.is_some() { 4628 self.mark_ready(state)?; 4629 } 4630 } 4631 4632 Ok(()) 4633 } 4634 4635 /// Retrieve mutable access to the `WaitableCommon` for this `Waitable`. 4636 fn common<'a>(&self, state: &'a mut ConcurrentState) -> Result<&'a mut WaitableCommon> { 4637 Ok(match self { 4638 Self::Host(id) => &mut state.get_mut(*id)?.common, 4639 Self::Guest(id) => &mut state.get_mut(*id)?.common, 4640 Self::Transmit(id) => &mut state.get_mut(*id)?.common, 4641 }) 4642 } 4643 4644 /// Set or clear the pending event for this waitable and either deliver it 4645 /// to the first waiter, if any, or mark it as ready to be delivered to the 4646 /// next waiter that arrives. 4647 fn set_event(&self, state: &mut ConcurrentState, event: Option<Event>) -> Result<()> { 4648 log::trace!("set event for {self:?}: {event:?}"); 4649 self.common(state)?.event = event; 4650 self.mark_ready(state) 4651 } 4652 4653 /// Take the pending event from this waitable, leaving `None` in its place. 4654 fn take_event(&self, state: &mut ConcurrentState) -> Result<Option<Event>> { 4655 let common = self.common(state)?; 4656 let event = common.event.take(); 4657 if let Some(set) = self.common(state)?.set { 4658 state.get_mut(set)?.ready.remove(self); 4659 } 4660 4661 Ok(event) 4662 } 4663 4664 /// Deliver the current event for this waitable to the first waiter, if any, 4665 /// or else mark it as ready to be delivered to the next waiter that 4666 /// arrives. 4667 fn mark_ready(&self, state: &mut ConcurrentState) -> Result<()> { 4668 if let Some(set) = self.common(state)?.set { 4669 state.get_mut(set)?.ready.insert(*self); 4670 if let Some((thread, mode)) = state.get_mut(set)?.waiting.pop_first() { 4671 let wake_on_cancel = state.get_mut(thread.thread)?.wake_on_cancel.take(); 4672 assert!(wake_on_cancel.is_none() || wake_on_cancel == Some(set)); 4673 4674 let item = match mode { 4675 WaitMode::Fiber(fiber) => WorkItem::ResumeFiber(fiber), 4676 WaitMode::Callback(instance) => WorkItem::GuestCall( 4677 state.get_mut(thread.task)?.instance.index, 4678 GuestCall { 4679 thread, 4680 kind: GuestCallKind::DeliverEvent { 4681 instance, 4682 set: Some(set), 4683 }, 4684 }, 4685 ), 4686 }; 4687 state.push_high_priority(item); 4688 } 4689 } 4690 Ok(()) 4691 } 4692 4693 /// Remove this waitable from the instance's rep table. 4694 fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> { 4695 match self { 4696 Self::Host(task) => { 4697 log::trace!("delete host task {task:?}"); 4698 state.delete(*task)?; 4699 } 4700 Self::Guest(task) => { 4701 log::trace!("delete guest task {task:?}"); 4702 state.delete(*task)?.dispose(state, *task)?; 4703 } 4704 Self::Transmit(task) => { 4705 state.delete(*task)?; 4706 } 4707 } 4708 4709 Ok(()) 4710 } 4711 } 4712 4713 impl fmt::Debug for Waitable { 4714 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 4715 match self { 4716 Self::Host(id) => write!(f, "{id:?}"), 4717 Self::Guest(id) => write!(f, "{id:?}"), 4718 Self::Transmit(id) => write!(f, "{id:?}"), 4719 } 4720 } 4721 } 4722 4723 /// Represents a Component Model Async `waitable-set`. 4724 #[derive(Default)] 4725 struct WaitableSet { 4726 /// Which waitables in this set have pending events, if any. 4727 ready: BTreeSet<Waitable>, 4728 /// Which guest threads are currently waiting on this set, if any. 4729 waiting: BTreeMap<QualifiedThreadId, WaitMode>, 4730 } 4731 4732 impl TableDebug for WaitableSet { 4733 fn type_name() -> &'static str { 4734 "WaitableSet" 4735 } 4736 } 4737 4738 /// Type-erased closure to lower the parameters for a guest task. 4739 type RawLower = 4740 Box<dyn FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync>; 4741 4742 /// Type-erased closure to lift the result for a guest task. 4743 type RawLift = Box< 4744 dyn FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync, 4745 >; 4746 4747 /// Type erased result of a guest task which may be downcast to the expected 4748 /// type by a host caller (or simply ignored in the case of a guest caller; see 4749 /// `DummyResult`). 4750 type LiftedResult = Box<dyn Any + Send + Sync>; 4751 4752 /// Used to return a result from a `LiftFn` when the actual result has already 4753 /// been lowered to a guest task's stack and linear memory. 4754 struct DummyResult; 4755 4756 /// Represents the Component Model Async state of a (sub-)component instance. 4757 #[derive(Default)] 4758 pub struct ConcurrentInstanceState { 4759 /// Whether backpressure is set for this instance (enabled if >0) 4760 backpressure: u16, 4761 /// Whether this instance can be entered 4762 do_not_enter: bool, 4763 /// Pending calls for this instance which require `Self::backpressure` to be 4764 /// `true` and/or `Self::do_not_enter` to be false before they can proceed. 4765 pending: BTreeMap<QualifiedThreadId, GuestCallKind>, 4766 } 4767 4768 impl ConcurrentInstanceState { 4769 pub fn pending_is_empty(&self) -> bool { 4770 self.pending.is_empty() 4771 } 4772 } 4773 4774 #[derive(Debug, Copy, Clone)] 4775 enum CurrentThread { 4776 Guest(QualifiedThreadId), 4777 Host(TableId<HostTask>), 4778 None, 4779 } 4780 4781 impl CurrentThread { 4782 fn guest(&self) -> Option<&QualifiedThreadId> { 4783 match self { 4784 Self::Guest(id) => Some(id), 4785 _ => None, 4786 } 4787 } 4788 4789 fn host(&self) -> Option<TableId<HostTask>> { 4790 match self { 4791 Self::Host(id) => Some(*id), 4792 _ => None, 4793 } 4794 } 4795 4796 fn is_none(&self) -> bool { 4797 matches!(self, Self::None) 4798 } 4799 } 4800 4801 impl From<QualifiedThreadId> for CurrentThread { 4802 fn from(id: QualifiedThreadId) -> Self { 4803 Self::Guest(id) 4804 } 4805 } 4806 4807 impl From<TableId<HostTask>> for CurrentThread { 4808 fn from(id: TableId<HostTask>) -> Self { 4809 Self::Host(id) 4810 } 4811 } 4812 4813 /// Represents the Component Model Async state of a store. 4814 pub struct ConcurrentState { 4815 /// The currently running thread, if any. 4816 current_thread: CurrentThread, 4817 4818 /// The set of pending host and background tasks, if any. 4819 /// 4820 /// See `ComponentInstance::poll_until` for where we temporarily take this 4821 /// out, poll it, then put it back to avoid any mutable aliasing hazards. 4822 futures: AlwaysMut<Option<FuturesUnordered<HostTaskFuture>>>, 4823 /// The table of waitables, waitable sets, etc. 4824 table: AlwaysMut<ResourceTable>, 4825 /// The "high priority" work queue for this store's event loop. 4826 high_priority: Vec<WorkItem>, 4827 /// The "low priority" work queue for this store's event loop. 4828 low_priority: VecDeque<WorkItem>, 4829 /// A place to stash the reason a fiber is suspending so that the code which 4830 /// resumed it will know under what conditions the fiber should be resumed 4831 /// again. 4832 suspend_reason: Option<SuspendReason>, 4833 /// A cached fiber which is waiting for work to do. 4834 /// 4835 /// This helps us avoid creating a new fiber for each `GuestCall` work item. 4836 worker: Option<StoreFiber<'static>>, 4837 /// A place to stash the work item for which we're resuming a worker fiber. 4838 worker_item: Option<WorkerItem>, 4839 4840 /// Reference counts for all component error contexts 4841 /// 4842 /// NOTE: it is possible the global ref count to be *greater* than the sum of 4843 /// (sub)component ref counts as tracked by `error_context_tables`, for 4844 /// example when the host holds one or more references to error contexts. 4845 /// 4846 /// The key of this primary map is often referred to as the "rep" (i.e. host-side 4847 /// component-wide representation) of the index into concurrent state for a given 4848 /// stored `ErrorContext`. 4849 /// 4850 /// Stated another way, `TypeComponentGlobalErrorContextTableIndex` is essentially the same 4851 /// as a `TableId<ErrorContextState>`. 4852 global_error_context_ref_counts: 4853 BTreeMap<TypeComponentGlobalErrorContextTableIndex, GlobalErrorContextRefCount>, 4854 } 4855 4856 impl Default for ConcurrentState { 4857 fn default() -> Self { 4858 Self { 4859 current_thread: CurrentThread::None, 4860 table: AlwaysMut::new(ResourceTable::new()), 4861 futures: AlwaysMut::new(Some(FuturesUnordered::new())), 4862 high_priority: Vec::new(), 4863 low_priority: VecDeque::new(), 4864 suspend_reason: None, 4865 worker: None, 4866 worker_item: None, 4867 global_error_context_ref_counts: BTreeMap::new(), 4868 } 4869 } 4870 } 4871 4872 impl ConcurrentState { 4873 /// Take ownership of any fibers and futures owned by this object. 4874 /// 4875 /// This should be used when disposing of the `Store` containing this object 4876 /// in order to gracefully resolve any and all fibers using 4877 /// `StoreFiber::dispose`. This is necessary to avoid possible 4878 /// use-after-free bugs due to fibers which may still have access to the 4879 /// `Store`. 4880 /// 4881 /// Additionally, the futures collected with this function should be dropped 4882 /// within a `tls::set` call, which will ensure than any futures closing 4883 /// over an `&Accessor` will have access to the store when dropped, allowing 4884 /// e.g. `WithAccessor[AndValue]` instances to be disposed of without 4885 /// panicking. 4886 /// 4887 /// Note that this will leave the object in an inconsistent and unusable 4888 /// state, so it should only be used just prior to dropping it. 4889 pub(crate) fn take_fibers_and_futures( 4890 &mut self, 4891 fibers: &mut Vec<StoreFiber<'static>>, 4892 futures: &mut Vec<FuturesUnordered<HostTaskFuture>>, 4893 ) { 4894 for entry in self.table.get_mut().iter_mut() { 4895 if let Some(set) = entry.downcast_mut::<WaitableSet>() { 4896 for mode in mem::take(&mut set.waiting).into_values() { 4897 if let WaitMode::Fiber(fiber) = mode { 4898 fibers.push(fiber); 4899 } 4900 } 4901 } else if let Some(thread) = entry.downcast_mut::<GuestThread>() { 4902 if let GuestThreadState::Suspended(fiber) | GuestThreadState::Ready(fiber) = 4903 mem::replace(&mut thread.state, GuestThreadState::Completed) 4904 { 4905 fibers.push(fiber); 4906 } 4907 } 4908 } 4909 4910 if let Some(fiber) = self.worker.take() { 4911 fibers.push(fiber); 4912 } 4913 4914 let mut handle_item = |item| match item { 4915 WorkItem::ResumeFiber(fiber) => { 4916 fibers.push(fiber); 4917 } 4918 WorkItem::PushFuture(future) => { 4919 self.futures 4920 .get_mut() 4921 .as_mut() 4922 .unwrap() 4923 .push(future.into_inner()); 4924 } 4925 _ => {} 4926 }; 4927 4928 for item in mem::take(&mut self.high_priority) { 4929 handle_item(item); 4930 } 4931 for item in mem::take(&mut self.low_priority) { 4932 handle_item(item); 4933 } 4934 4935 if let Some(them) = self.futures.get_mut().take() { 4936 futures.push(them); 4937 } 4938 } 4939 4940 /// Collect the next set of work items to run. This will be either all 4941 /// high-priority items, or a single low-priority item if there are no 4942 /// high-priority items. 4943 fn collect_work_items_to_run(&mut self) -> Vec<WorkItem> { 4944 let mut ready = mem::take(&mut self.high_priority); 4945 if ready.is_empty() { 4946 if let Some(item) = self.low_priority.pop_back() { 4947 ready.push(item); 4948 } 4949 } 4950 ready 4951 } 4952 4953 fn push<V: Send + Sync + 'static>( 4954 &mut self, 4955 value: V, 4956 ) -> Result<TableId<V>, ResourceTableError> { 4957 self.table.get_mut().push(value).map(TableId::from) 4958 } 4959 4960 fn get_mut<V: 'static>(&mut self, id: TableId<V>) -> Result<&mut V, ResourceTableError> { 4961 self.table.get_mut().get_mut(&Resource::from(id)) 4962 } 4963 4964 pub fn add_child<T: 'static, U: 'static>( 4965 &mut self, 4966 child: TableId<T>, 4967 parent: TableId<U>, 4968 ) -> Result<(), ResourceTableError> { 4969 self.table 4970 .get_mut() 4971 .add_child(Resource::from(child), Resource::from(parent)) 4972 } 4973 4974 pub fn remove_child<T: 'static, U: 'static>( 4975 &mut self, 4976 child: TableId<T>, 4977 parent: TableId<U>, 4978 ) -> Result<(), ResourceTableError> { 4979 self.table 4980 .get_mut() 4981 .remove_child(Resource::from(child), Resource::from(parent)) 4982 } 4983 4984 fn delete<V: 'static>(&mut self, id: TableId<V>) -> Result<V, ResourceTableError> { 4985 self.table.get_mut().delete(Resource::from(id)) 4986 } 4987 4988 fn push_future(&mut self, future: HostTaskFuture) { 4989 // Note that we can't directly push to `ConcurrentState::futures` here 4990 // since this may be called from a future that's being polled inside 4991 // `Self::poll_until`, which temporarily removes the `FuturesUnordered` 4992 // so it has exclusive access while polling it. Therefore, we push a 4993 // work item to the "high priority" queue, which will actually push to 4994 // `ConcurrentState::futures` later. 4995 self.push_high_priority(WorkItem::PushFuture(AlwaysMut::new(future))); 4996 } 4997 4998 fn push_high_priority(&mut self, item: WorkItem) { 4999 log::trace!("push high priority: {item:?}"); 5000 self.high_priority.push(item); 5001 } 5002 5003 fn push_low_priority(&mut self, item: WorkItem) { 5004 log::trace!("push low priority: {item:?}"); 5005 self.low_priority.push_front(item); 5006 } 5007 5008 fn push_work_item(&mut self, item: WorkItem, high_priority: bool) { 5009 if high_priority { 5010 self.push_high_priority(item); 5011 } else { 5012 self.push_low_priority(item); 5013 } 5014 } 5015 5016 fn promote_instance_local_thread_work_item( 5017 &mut self, 5018 current_instance: RuntimeComponentInstanceIndex, 5019 ) -> bool { 5020 self.promote_work_items_matching(|item: &WorkItem| match item { 5021 WorkItem::ResumeThread(instance, _) | WorkItem::GuestCall(instance, _) => { 5022 *instance == current_instance 5023 } 5024 _ => false, 5025 }) 5026 } 5027 5028 fn promote_thread_work_item(&mut self, thread: QualifiedThreadId) -> bool { 5029 self.promote_work_items_matching(|item: &WorkItem| match item { 5030 WorkItem::ResumeThread(_, t) | WorkItem::GuestCall(_, GuestCall { thread: t, .. }) => { 5031 *t == thread 5032 } 5033 _ => false, 5034 }) 5035 } 5036 5037 fn promote_work_items_matching<F>(&mut self, mut predicate: F) -> bool 5038 where 5039 F: FnMut(&WorkItem) -> bool, 5040 { 5041 // If there's a high-priority work item to resume the current guest thread, 5042 // we don't need to promote anything, but we return true to indicate that 5043 // work is pending for the current instance. 5044 if self.high_priority.iter().any(&mut predicate) { 5045 true 5046 } 5047 // Otherwise, look for a low-priority work item that matches the current 5048 // instance and promote it to high-priority. 5049 else if let Some(idx) = self.low_priority.iter().position(&mut predicate) { 5050 let item = self.low_priority.remove(idx).unwrap(); 5051 self.push_high_priority(item); 5052 true 5053 } else { 5054 false 5055 } 5056 } 5057 5058 /// Implements the `context.get` intrinsic. 5059 pub(crate) fn context_get(&mut self, slot: u32) -> Result<u32> { 5060 let thread = self.unwrap_current_guest_thread(); 5061 let val = self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()]; 5062 log::trace!("context_get {thread:?} slot {slot} val {val:#x}"); 5063 Ok(val) 5064 } 5065 5066 /// Implements the `context.set` intrinsic. 5067 pub(crate) fn context_set(&mut self, slot: u32, val: u32) -> Result<()> { 5068 let thread = self.unwrap_current_guest_thread(); 5069 log::trace!("context_set {thread:?} slot {slot} val {val:#x}"); 5070 self.get_mut(thread.thread)?.context[usize::try_from(slot).unwrap()] = val; 5071 Ok(()) 5072 } 5073 5074 /// Returns whether there's a pending cancellation on the current guest thread, 5075 /// consuming the event if so. 5076 fn take_pending_cancellation(&mut self) -> bool { 5077 let thread = self.unwrap_current_guest_thread(); 5078 if let Some(event) = self.get_mut(thread.task).unwrap().event.take() { 5079 assert!(matches!(event, Event::Cancelled)); 5080 true 5081 } else { 5082 false 5083 } 5084 } 5085 5086 fn check_blocking_for(&mut self, task: TableId<GuestTask>) -> Result<()> { 5087 if self.may_block(task) { 5088 Ok(()) 5089 } else { 5090 Err(Trap::CannotBlockSyncTask.into()) 5091 } 5092 } 5093 5094 fn may_block(&mut self, task: TableId<GuestTask>) -> bool { 5095 let task = self.get_mut(task).unwrap(); 5096 task.async_function || task.returned_or_cancelled() 5097 } 5098 5099 /// Used by `ResourceTables` to acquire the current `CallContext` for the 5100 /// specified task. 5101 /// 5102 /// The `task` is bit-packed as returned by `current_call_context_scope_id` 5103 /// below. 5104 pub fn call_context(&mut self, task: u32) -> &mut CallContext { 5105 let (task, is_host) = (task >> 1, task & 1 == 1); 5106 if is_host { 5107 let task: TableId<HostTask> = TableId::new(task); 5108 &mut self.get_mut(task).unwrap().call_context 5109 } else { 5110 let task: TableId<GuestTask> = TableId::new(task); 5111 &mut self.get_mut(task).unwrap().call_context 5112 } 5113 } 5114 5115 /// Used by `ResourceTables` to record the scope of a borrow to get undone 5116 /// in the future. 5117 pub fn current_call_context_scope_id(&self) -> u32 { 5118 let (bits, is_host) = match self.current_thread { 5119 CurrentThread::Guest(id) => (id.task.rep(), false), 5120 CurrentThread::Host(id) => (id.rep(), true), 5121 CurrentThread::None => unreachable!(), 5122 }; 5123 assert_eq!((bits << 1) >> 1, bits); 5124 (bits << 1) | u32::from(is_host) 5125 } 5126 5127 fn unwrap_current_guest_thread(&self) -> QualifiedThreadId { 5128 *self.current_thread.guest().unwrap() 5129 } 5130 5131 fn unwrap_current_host_thread(&self) -> TableId<HostTask> { 5132 self.current_thread.host().unwrap() 5133 } 5134 } 5135 5136 /// Provide a type hint to compiler about the shape of a parameter lower 5137 /// closure. 5138 fn for_any_lower< 5139 F: FnOnce(&mut dyn VMStore, &mut [MaybeUninit<ValRaw>]) -> Result<()> + Send + Sync, 5140 >( 5141 fun: F, 5142 ) -> F { 5143 fun 5144 } 5145 5146 /// Provide a type hint to compiler about the shape of a result lift closure. 5147 fn for_any_lift< 5148 F: FnOnce(&mut dyn VMStore, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> + Send + Sync, 5149 >( 5150 fun: F, 5151 ) -> F { 5152 fun 5153 } 5154 5155 /// Wrap the specified future in a `poll_fn` which asserts that the future is 5156 /// only polled from the event loop of the specified `Store`. 5157 /// 5158 /// See `StoreContextMut::run_concurrent` for details. 5159 fn checked<F: Future + Send + 'static>( 5160 id: StoreId, 5161 fut: F, 5162 ) -> impl Future<Output = F::Output> + Send + 'static { 5163 async move { 5164 let mut fut = pin!(fut); 5165 future::poll_fn(move |cx| { 5166 let message = "\ 5167 `Future`s which depend on asynchronous component tasks, streams, or \ 5168 futures to complete may only be polled from the event loop of the \ 5169 store to which they belong. Please use \ 5170 `StoreContextMut::{run_concurrent,spawn}` to poll or await them.\ 5171 "; 5172 tls::try_get(|store| { 5173 let matched = match store { 5174 tls::TryGet::Some(store) => store.id() == id, 5175 tls::TryGet::Taken | tls::TryGet::None => false, 5176 }; 5177 5178 if !matched { 5179 panic!("{message}") 5180 } 5181 }); 5182 fut.as_mut().poll(cx) 5183 }) 5184 .await 5185 } 5186 } 5187 5188 /// Assert that `StoreContextMut::run_concurrent` has not been called from 5189 /// within an store's event loop. 5190 fn check_recursive_run() { 5191 tls::try_get(|store| { 5192 if !matches!(store, tls::TryGet::None) { 5193 panic!("Recursive `StoreContextMut::run_concurrent` calls not supported") 5194 } 5195 }); 5196 } 5197 5198 fn unpack_callback_code(code: u32) -> (u32, u32) { 5199 (code & 0xF, code >> 4) 5200 } 5201 5202 /// Helper struct for packaging parameters to be passed to 5203 /// `ComponentInstance::waitable_check` for calls to `waitable-set.wait` or 5204 /// `waitable-set.poll`. 5205 struct WaitableCheckParams { 5206 set: TableId<WaitableSet>, 5207 options: OptionsIndex, 5208 payload: u32, 5209 } 5210 5211 /// Indicates whether `ComponentInstance::waitable_check` is being called for 5212 /// `waitable-set.wait` or `waitable-set.poll`. 5213 enum WaitableCheck { 5214 Wait, 5215 Poll, 5216 } 5217 5218 /// Represents a guest task called from the host, prepared using `prepare_call`. 5219 pub(crate) struct PreparedCall<R> { 5220 /// The guest export to be called 5221 handle: Func, 5222 /// The guest thread created by `prepare_call` 5223 thread: QualifiedThreadId, 5224 /// The number of lowered core Wasm parameters to pass to the call. 5225 param_count: usize, 5226 /// The `oneshot::Receiver` to which the result of the call will be 5227 /// delivered when it is available. 5228 rx: oneshot::Receiver<LiftedResult>, 5229 /// The `oneshot::Receiver` which will resolve when the task -- and any 5230 /// transitive subtasks -- have all exited. 5231 exit_rx: oneshot::Receiver<()>, 5232 _phantom: PhantomData<R>, 5233 } 5234 5235 impl<R> PreparedCall<R> { 5236 /// Get a copy of the `TaskId` for this `PreparedCall`. 5237 pub(crate) fn task_id(&self) -> TaskId { 5238 TaskId { 5239 task: self.thread.task, 5240 } 5241 } 5242 } 5243 5244 /// Represents a task created by `prepare_call`. 5245 pub(crate) struct TaskId { 5246 task: TableId<GuestTask>, 5247 } 5248 5249 impl TaskId { 5250 /// The host future for an async task was dropped. If the parameters have not been lowered yet, 5251 /// it is no longer valid to do so, as the lowering closure would see a dangling pointer. In this case, 5252 /// we delete the task eagerly. Otherwise, there may be running threads, or ones that are suspended 5253 /// and can be resumed by other tasks for this component, so we mark the future as dropped 5254 /// and delete the task when all threads are done. 5255 pub(crate) fn host_future_dropped<T>(&self, store: StoreContextMut<T>) -> Result<()> { 5256 let task = store.0.concurrent_state_mut().get_mut(self.task)?; 5257 if !task.already_lowered_parameters() { 5258 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())? 5259 } else { 5260 task.host_future_state = HostFutureState::Dropped; 5261 if task.ready_to_delete() { 5262 Waitable::Guest(self.task).delete_from(store.0.concurrent_state_mut())? 5263 } 5264 } 5265 Ok(()) 5266 } 5267 } 5268 5269 /// Prepare a call to the specified exported Wasm function, providing functions 5270 /// for lowering the parameters and lifting the result. 5271 /// 5272 /// To enqueue the returned `PreparedCall` in the `ComponentInstance`'s event 5273 /// loop, use `queue_call`. 5274 pub(crate) fn prepare_call<T, R>( 5275 mut store: StoreContextMut<T>, 5276 handle: Func, 5277 param_count: usize, 5278 host_future_present: bool, 5279 lower_params: impl FnOnce(Func, StoreContextMut<T>, &mut [MaybeUninit<ValRaw>]) -> Result<()> 5280 + Send 5281 + Sync 5282 + 'static, 5283 lift_result: impl FnOnce(Func, &mut StoreOpaque, &[ValRaw]) -> Result<Box<dyn Any + Send + Sync>> 5284 + Send 5285 + Sync 5286 + 'static, 5287 ) -> Result<PreparedCall<R>> { 5288 let (options, _flags, ty, raw_options) = handle.abi_info(store.0); 5289 5290 let instance = handle.instance().id().get(store.0); 5291 let options = &instance.component().env_component().options[options]; 5292 let ty = &instance.component().types()[ty]; 5293 let async_function = ty.async_; 5294 let task_return_type = ty.results; 5295 let component_instance = raw_options.instance; 5296 let callback = options.callback.map(|i| instance.runtime_callback(i)); 5297 let memory = options 5298 .memory() 5299 .map(|i| instance.runtime_memory(i)) 5300 .map(SendSyncPtr::new); 5301 let string_encoding = options.string_encoding; 5302 let token = StoreToken::new(store.as_context_mut()); 5303 let state = store.0.concurrent_state_mut(); 5304 5305 let (tx, rx) = oneshot::channel(); 5306 let (exit_tx, exit_rx) = oneshot::channel(); 5307 5308 let instance = RuntimeInstance { 5309 instance: handle.instance().id().instance(), 5310 index: component_instance, 5311 }; 5312 let caller = state.current_thread; 5313 let task = GuestTask::new( 5314 state, 5315 Box::new(for_any_lower(move |store, params| { 5316 lower_params(handle, token.as_context_mut(store), params) 5317 })), 5318 LiftResult { 5319 lift: Box::new(for_any_lift(move |store, result| { 5320 lift_result(handle, store, result) 5321 })), 5322 ty: task_return_type, 5323 memory, 5324 string_encoding, 5325 }, 5326 Caller::Host { 5327 tx: Some(tx), 5328 exit_tx: Arc::new(exit_tx), 5329 host_future_present, 5330 caller, 5331 }, 5332 callback.map(|callback| { 5333 let callback = SendSyncPtr::new(callback); 5334 let instance = handle.instance(); 5335 Box::new(move |store: &mut dyn VMStore, event, handle| { 5336 let store = token.as_context_mut(store); 5337 // SAFETY: Per the contract of `prepare_call`, the callback 5338 // will remain valid at least as long is this task exists. 5339 unsafe { instance.call_callback(store, callback, event, handle) } 5340 }) as CallbackFn 5341 }), 5342 instance, 5343 async_function, 5344 )?; 5345 5346 let task = state.push(task)?; 5347 let thread = state.push(GuestThread::new_implicit(task))?; 5348 state.get_mut(task)?.threads.insert(thread); 5349 5350 if !store.0.may_enter(instance) { 5351 bail!(crate::Trap::CannotEnterComponent); 5352 } 5353 5354 Ok(PreparedCall { 5355 handle, 5356 thread: QualifiedThreadId { task, thread }, 5357 param_count, 5358 rx, 5359 exit_rx, 5360 _phantom: PhantomData, 5361 }) 5362 } 5363 5364 /// Queue a call previously prepared using `prepare_call` to be run as part of 5365 /// the associated `ComponentInstance`'s event loop. 5366 /// 5367 /// The returned future will resolve to the result once it is available, but 5368 /// must only be polled via the instance's event loop. See 5369 /// `StoreContextMut::run_concurrent` for details. 5370 pub(crate) fn queue_call<T: 'static, R: Send + 'static>( 5371 mut store: StoreContextMut<T>, 5372 prepared: PreparedCall<R>, 5373 ) -> Result<impl Future<Output = Result<(R, oneshot::Receiver<()>)>> + Send + 'static + use<T, R>> { 5374 let PreparedCall { 5375 handle, 5376 thread, 5377 param_count, 5378 rx, 5379 exit_rx, 5380 .. 5381 } = prepared; 5382 5383 queue_call0(store.as_context_mut(), handle, thread, param_count)?; 5384 5385 Ok(checked( 5386 store.0.id(), 5387 rx.map(move |result| { 5388 result 5389 .map(|v| (*v.downcast().unwrap(), exit_rx)) 5390 .map_err(crate::Error::from) 5391 }), 5392 )) 5393 } 5394 5395 /// Queue a call previously prepared using `prepare_call` to be run as part of 5396 /// the associated `ComponentInstance`'s event loop. 5397 fn queue_call0<T: 'static>( 5398 store: StoreContextMut<T>, 5399 handle: Func, 5400 guest_thread: QualifiedThreadId, 5401 param_count: usize, 5402 ) -> Result<()> { 5403 let (_options, _, _ty, raw_options) = handle.abi_info(store.0); 5404 let is_concurrent = raw_options.async_; 5405 let callback = raw_options.callback; 5406 let instance = handle.instance(); 5407 let callee = handle.lifted_core_func(store.0); 5408 let post_return = handle.post_return_core_func(store.0); 5409 let callback = callback.map(|i| { 5410 let instance = instance.id().get(store.0); 5411 SendSyncPtr::new(instance.runtime_callback(i)) 5412 }); 5413 5414 log::trace!("queueing call {guest_thread:?}"); 5415 5416 // SAFETY: `callee`, `callback`, and `post_return` are valid pointers 5417 // (with signatures appropriate for this call) and will remain valid as 5418 // long as this instance is valid. 5419 unsafe { 5420 instance.queue_call( 5421 store, 5422 guest_thread, 5423 SendSyncPtr::new(callee), 5424 param_count, 5425 1, 5426 is_concurrent, 5427 callback, 5428 post_return.map(SendSyncPtr::new), 5429 ) 5430 } 5431 } 5432