1 use crate::prelude::*;
2 use crate::store::{AsStoreOpaque, Asyncness, Executor, StoreId, StoreOpaque};
3 use crate::vm::mpk::{self, ProtectionMask};
4 use crate::vm::{AlwaysMut, AsyncWasmCallState};
5 use crate::{Engine, StoreContextMut};
6 use core::mem;
7 use core::ops::Range;
8 use core::pin::Pin;
9 use core::ptr::{self, NonNull};
10 use core::task::{Context, Poll};
11 use wasmtime_fiber::{Fiber, FiberStack, Suspend};
12
// Instantiations of the generic `wasmtime_fiber` types used by this module.

/// Value handed to a fiber when it is resumed: `Ok` carries the pointer to the
/// `Context` of the current poll, while `Err` tells the fiber that it has been
/// cancelled and must exit.
type WasmtimeResume = Result<NonNull<Context<'static>>>;
/// Value a fiber hands back to its resumer each time it suspends.
type WasmtimeYield = StoreFiberYield;
/// Final value produced by a fiber once it has run to completion.
type WasmtimeComplete = Result<()>;
/// The `Suspend` handle type used by fibers in this module.
type WasmtimeSuspend = Suspend<WasmtimeResume, WasmtimeYield, WasmtimeComplete>;
/// The `Fiber` type used by fibers in this module.
type WasmtimeFiber<'a> = Fiber<'a, WasmtimeResume, WasmtimeYield, WasmtimeComplete>;
18
/// State related to asynchronous computations stored within a `Store<T>`.
///
/// This structure resides inside of a `Store<T>` and is used to manage the
/// various pieces of state associated with asynchronous computations. Chiefly
/// this manages the `WasmtimeSuspend` pointer as well as the `&mut Context<'_>`
/// used when polling futures. This serves as storage to carry these pointers
/// across a WebAssembly function boundary, for example, where the values
/// cannot otherwise be explicitly threaded through.
pub(crate) struct AsyncState {
    /// The `Suspend` for the current fiber (or `None` if no such fiber is
    /// running).
    ///
    /// This pointer is provided by the `wasmtime_fiber` crate when a fiber
    /// first starts, but this pointer is unable to be carried through
    /// WebAssembly frames for example. This serves as an alternative storage
    /// location for the pointer provided by `wasmtime_fiber` within a fiber's
    /// execution.
    ///
    /// This pointer is `None` when a fiber is not executing, but it is also
    /// `None` while a `BlockingContext` exists. Note that when a fiber is
    /// suspended it's always through a `BlockingContext` so this field is
    /// `None` whenever a fiber is suspended as well. Fiber resumption will
    /// save the prior value found in the store and then set it to `None`,
    /// where suspension will then restore what was previously in the store.
    current_suspend: Option<NonNull<WasmtimeSuspend>>,

    /// The `Context` pointer last provided in `Future for FiberFuture`.
    ///
    /// Like `current_suspend` above this is an example of a piece of context
    /// which needs to be carried over a WebAssembly function frame which
    /// otherwise doesn't take this as a parameter. This differs from
    /// `current_suspend` though in that it is provided as part of a `Future`
    /// poll operation but is "gone" after that poll operation completes. That
    /// means that while `current_suspend` is the same for the lifetime of a
    /// future this field is always changing.
    ///
    /// Like `current_suspend` though this is `None` either when a fiber isn't
    /// running or when a `BlockingContext` is created (in which case this is
    /// taken out of the store). That means that this is `None` on
    /// suspension/resumption of a fiber.
    ///
    /// The value for this pointer is threaded directly through the
    /// `WasmtimeResume` type which is how a pointer flows into this field from
    /// a future-related poll call. This means that the `BlockingContext`
    /// creation may take one value of a pointer here but restore another. That
    /// would represent suspending in one call to `Future::poll` and then
    /// resuming some time later in a different call to `Future::poll`.
    ///
    /// # Safety
    ///
    /// Note that this is a pretty unsafe field for two reasons. One is that
    /// it's a raw pointer to a `Context` provided ephemerally to some call to
    /// `Future::poll` on the stack. Another reason is that the lifetime
    /// parameter of `Context` is unsafely changed to `'static` here which is
    /// not correct. The ephemeral nature of this pointer is managed through the
    /// take-style operations in `BlockingContext` and the `'static` lifetime is
    /// handled by ensuring the signatures that work with `BlockingContext` all
    /// use constrained anonymous lifetimes that are guaranteed to be shorter
    /// than the original `Context` lifetime.
    current_future_cx: Option<NonNull<Context<'static>>>,

    /// The last fiber stack that was in use by the store.
    ///
    /// We use this to cache and reuse stacks as a performance optimization.
    // TODO: With stack switching and the Component Model Async ABI, there may
    // be multiple concurrent fibers in play; consider caching more than one
    // stack at a time and making the number tunable via `Config`.
    last_fiber_stack: Option<wasmtime_fiber::FiberStack>,

    /// Whether or not this store has async host functions defined somewhere
    /// within it or some other store-related configuration (e.g. epochs
    /// yielding) which requires that wasm is executed on a fiber, thus async
    /// entrypoints are required.
    pub(crate) async_required: bool,
}
94
// SAFETY: it's known that `std::task::Context` is neither `Send` nor `Sync`,
// but despite this the storage here is purely temporary in getting these
// pointers across function frames. The actual types are not sent across threads
// as when a store isn't polling anything the pointer values are all set to
// `None`. Thus if a store is being sent across threads that's done because no
// fibers are active, and once fibers are active everything will stick within
// the same thread.
unsafe impl Send for AsyncState {}
// NOTE(review): no separate justification is written for `Sync`; presumably the
// argument above (pointers are only populated while a fiber is running on a
// single thread) is meant to cover it as well — confirm.
unsafe impl Sync for AsyncState {}
104
105 impl Default for AsyncState {
default() -> Self106 fn default() -> Self {
107 Self {
108 current_suspend: None,
109 current_future_cx: None,
110 last_fiber_stack: None,
111 async_required: false,
112 }
113 }
114 }
115
116 impl AsyncState {
last_fiber_stack(&mut self) -> &mut Option<wasmtime_fiber::FiberStack>117 pub(crate) fn last_fiber_stack(&mut self) -> &mut Option<wasmtime_fiber::FiberStack> {
118 &mut self.last_fiber_stack
119 }
120
121 /// Returns whether `block_on` will succeed or panic.
122 #[inline]
can_block(&mut self) -> bool123 pub(crate) fn can_block(&mut self) -> bool {
124 self.current_future_cx.is_some()
125 }
126 }
127
/// A helper structure used to block a fiber.
///
/// This is acquired via either `StoreContextMut::with_blocking` or
/// `StoreOpaque::with_blocking`. This structure represents the "taken" state of
/// pointers from a store's `AsyncState`, then modeling them as safe references.
///
/// Note that the lifetimes here are carefully controlled in instances of this
/// structure through the construction of the `with` function: `'a` bounds how
/// long the borrowed pointers may be used and `'b` is the lifetime parameter of
/// the future `Context` itself.
pub(crate) struct BlockingContext<'a, 'b> {
    /// Pointer to `wasmtime_fiber::Suspend` which was supplied when a fiber
    /// first started.
    ///
    /// When a `BlockingContext` is first created this pointer is "taken" from
    /// the store (the store's copy is set to `None`) and then the raw pointer
    /// previously in the store is unsafely transformed to this safe reference.
    /// This represents how a `BlockingContext` temporarily has access to this
    /// suspend but when the `BlockingContext` goes away this'll make its way
    /// back into the store.
    suspend: &'a mut WasmtimeSuspend,

    /// Pointer to the future `Context` that this fiber is being polled with.
    ///
    /// Similar to `suspend` above this is taken from a store when a
    /// `BlockingContext` is created and it's restored when the
    /// `BlockingContext` goes away. Note though that unlike `suspend`, as
    /// alluded to in the documentation on `AsyncState`, this value changes over
    /// time as calls to poll are made. This field becomes `None` during a
    /// suspension because that means that the context is released and no longer
    /// available. Upon resumption the context here is *optionally* provided.
    /// Cancellation is a case where it isn't passed back and a re-poll is a
    /// case where it's passed back.
    future_cx: Option<&'a mut Context<'b>>,
}
161
162 impl<'a, 'b> BlockingContext<'a, 'b> {
/// Method to go from a `store` provided (which internally contains a
/// `StoreOpaque`) to a `BlockingContext`.
///
/// This function will "take" context from `store`'s `AsyncState` field. It
/// will then construct a `BlockingContext` and yield it to the closure `f`
/// provided. The closure can then block on futures, suspend, etc.
///
/// Upon return of the closure `f` the state from `BlockingContext` is
/// restored within the store. The restoration is performed by a drop guard,
/// so it also happens if `f` unwinds. The return value of `f` is the return
/// value of this function.
///
/// Note that the `store` must be provided to this function as an argument
/// to originally acquire state from `AsyncState`. This store is then
/// supplied back to the closure `f` provided here so the store can be used
/// to construct an asynchronous or blocking computation which the
/// `BlockingContext` tries to block on.
///
/// # Panics
///
/// Panics if either the future `Context` pointer or the `Suspend` pointer is
/// absent from the store's `AsyncState`, which per the documentation on
/// `AsyncState` is the case when no fiber is running or when another
/// `BlockingContext` is already live.
///
/// # Safety
///
/// This method is safe to call at any time, but it's worth noting that the
/// safety of this function relies on the signature of this function.
/// Notably the lifetime parameters of `BlockingContext` in the `f` closure
/// here must be anonymous. That ensures that the `BlockingContext` that
/// callers get access to cannot be persisted outside of that closure call
/// and everything is scoped to just the closure `f` provided with nothing
/// escaping.
fn with<S, R>(store: &mut S, f: impl FnOnce(&mut S, &mut BlockingContext<'_, '_>) -> R) -> R
where
    S: AsStoreOpaque,
{
    let opaque = store.as_store_opaque();

    let state = opaque.fiber_async_state_mut();

    // SAFETY: this is taking pointers from `AsyncState` and then unsafely
    // turning them into safe references. Lifetime-wise this should be safe
    // because the inferred lifetimes for all these pointers is constrained
    // by the signature of `f` provided here. That ensures that everything
    // is scoped purely to the closure `f` and nothing should be persisted
    // outside of this function call. This, for example, ensures that the
    // `Context<'static>` doesn't leak out, it's only with an anonymous
    // lifetime that's forcibly shorter.
    //
    // Provenance-wise this should be safe as if these fields in the store
    // are non-null then the pointers are provided up-the-stack on this
    // fiber and for this fiber. The "take" pattern here ensures that if
    // this `BlockingContext` context acquires the pointers then there are
    // no other instances of these pointers in use anywhere else.
    let future_cx = unsafe { Some(state.current_future_cx.take().unwrap().as_mut()) };
    let suspend = unsafe { state.current_suspend.take().unwrap().as_mut() };

    // Bundle the store together with the freshly built context so that the
    // `Drop` implementation below can write the pointers back into the store
    // no matter how `f` exits.
    let mut reset = ResetBlockingContext {
        store,
        cx: BlockingContext { future_cx, suspend },
    };
    return f(&mut reset.store, &mut reset.cx);

    /// Drop guard which returns the pointers held by `cx` to `store`.
    struct ResetBlockingContext<'a, 'b, S: AsStoreOpaque> {
        store: &'a mut S,
        cx: BlockingContext<'a, 'b>,
    }

    impl<S: AsStoreOpaque> Drop for ResetBlockingContext<'_, '_, S> {
        fn drop(&mut self) {
            let store = self.store.as_store_opaque();
            let state = store.fiber_async_state_mut();

            // Both slots were emptied when the context was created and
            // nothing should have refilled them in the meantime.
            debug_assert!(state.current_future_cx.is_none());
            debug_assert!(state.current_suspend.is_none());
            state.current_suspend = Some(NonNull::from(&mut *self.cx.suspend));

            // The future context is only written back if one is still
            // held; it is `None` if the fiber's last suspension did not
            // hand a new context back.
            if let Some(cx) = &mut self.cx.future_cx {
                // SAFETY: while this is changing the lifetime to `'static`
                // it should never be used while it's `'static` given this
                // `BlockingContext` abstraction.
                state.current_future_cx =
                    Some(NonNull::from(unsafe { change_context_lifetime(cx) }));
            }
        }
    }
}
244
245 /// Blocks on the asynchronous computation represented by `future` and
246 /// produces the result here, in-line.
247 ///
248 /// This function is designed to only work when it's currently executing on
249 /// a native fiber. This fiber provides the ability for us to handle the
250 /// future's `Pending` state as "jump back to whomever called the fiber in
251 /// an asynchronous fashion and propagate `Pending`". This tight coupling
252 /// with `on_fiber` below is what powers the asynchronicity of calling wasm.
253 ///
254 /// This function takes a `future` and will (appear to) synchronously wait
255 /// on the result. While this function is executing it will fiber switch
256 /// to-and-from the original frame calling `on_fiber` which should be a
257 /// guarantee due to how async stores are configured.
258 ///
259 /// The return value here is either the output of the future `T`, or a trap
260 /// which represents that the asynchronous computation was cancelled. It is
261 /// not recommended to catch the trap and try to keep executing wasm, so
262 /// we've tried to liberally document this.
263 ///
264 /// Note that this function suspends (if needed) with
265 /// `StoreFiberYield::KeepStore`, indicating that the store must not be used
266 /// (and that no other fibers may be resumed) until this fiber resumes.
267 /// Therefore, it is not appropriate for use in e.g. guest calls to
268 /// async-lowered imports implemented as host functions, since it will
269 /// prevent any other tasks from being run. Use `Instance::suspend` to
270 /// suspend and release the store to allow other tasks to run before this
271 /// fiber is resumed.
272 ///
273 /// # Return Value
274 ///
275 /// A return value of `Ok(value)` means that the future completed with
276 /// `value`. A return value of `Err(e)` means that the fiber and its future
277 /// have been cancelled and the fiber needs to exit and complete ASAP.
278 ///
279 /// # Safety
280 ///
281 /// This function is safe to call at any time but relies on a trait bound
282 /// that is manually placed here the compiler does not otherwise require.
283 /// Notably the `Send` bound on the future provided here is not required
284 /// insofar as things compile without that. The purpose of this, however, is
285 /// to make the `unsafe impl Send for StoreFiber` more safe. The `future`
286 /// here is state that is stored on the stack during the suspension of this
287 /// fiber and is otherwise not visible to the compiler. By having a `Send`
288 /// bound here it ensures that the future doesn't have things like `Rc` or
289 /// similar pointing into thread locals which would not be sound if this
290 /// fiber crosses threads.
block_on<F>(&mut self, future: F) -> Result<F::Output> where F: Future + Send,291 pub(crate) fn block_on<F>(&mut self, future: F) -> Result<F::Output>
292 where
293 F: Future + Send,
294 {
295 let mut future = core::pin::pin!(future);
296 loop {
297 match future.as_mut().poll(self.future_cx.as_mut().unwrap()) {
298 Poll::Ready(v) => break Ok(v),
299 Poll::Pending => self.suspend(StoreFiberYield::KeepStore)?,
300 }
301 }
302 }
303
304 /// Suspend this fiber with `yield_` as the reason.
305 ///
306 /// This function will suspend the current fiber and only return after the
307 /// fiber has resumed. This function return `Ok(())` if the fiber was
308 /// resumed to be completed, and `Err(e)` indicates that the fiber has been
309 /// cancelled and needs to exit/complete ASAP.
suspend(&mut self, yield_: StoreFiberYield) -> Result<()>310 pub(crate) fn suspend(&mut self, yield_: StoreFiberYield) -> Result<()> {
311 // Over a suspension point we're guaranteed that the `Context` provided
312 // here is no longer valid, so discard it. If we're supposed to be able
313 // to poll afterwards this will be given back as part of the resume
314 // value given back.
315 self.future_cx.take();
316
317 let mut new_future_cx: NonNull<Context<'static>> = self.suspend.suspend(yield_)?;
318
319 // SAFETY: this function is unsafe as we're doing "funky" things to the
320 // `new_future_cx` we have been given. The safety here relies on the
321 // fact that the lifetimes of `BlockingContext` are all "smaller" than
322 // the original `Context` itself, and that should be guaranteed through
323 // the exclusive constructor of this type `BlockingContext::with`.
324 unsafe {
325 self.future_cx = Some(change_context_lifetime(new_future_cx.as_mut()));
326 }
327 Ok(())
328 }
329 }
330
331 impl<T> StoreContextMut<'_, T> {
332 /// Blocks on the future computed by `f`.
333 ///
334 /// # Panics
335 ///
336 /// Panics if this is invoked outside the context of a fiber.
337 #[cfg(feature = "component-model")]
block_on<R>( self, f: impl FnOnce(StoreContextMut<'_, T>) -> Pin<Box<dyn Future<Output = R> + Send + '_>>, ) -> Result<R>338 pub(crate) fn block_on<R>(
339 self,
340 f: impl FnOnce(StoreContextMut<'_, T>) -> Pin<Box<dyn Future<Output = R> + Send + '_>>,
341 ) -> Result<R> {
342 self.with_blocking(|store, cx| cx.block_on(f(store).as_mut()))
343 }
344
345 /// Creates a `BlockingContext` suitable for blocking on futures or
346 /// suspending the current fiber.
347 ///
348 /// # Panics
349 ///
350 /// Panics if this is invoked outside the context of a fiber.
with_blocking<R>( self, f: impl FnOnce(StoreContextMut<'_, T>, &mut BlockingContext<'_, '_>) -> R, ) -> R351 pub(crate) fn with_blocking<R>(
352 self,
353 f: impl FnOnce(StoreContextMut<'_, T>, &mut BlockingContext<'_, '_>) -> R,
354 ) -> R {
355 BlockingContext::with(self.0, |store, cx| f(StoreContextMut(store), cx))
356 }
357 }
358
359 impl StoreOpaque {
360 /// Creates a `BlockingContext` suitable for blocking on futures or
361 /// suspending the current fiber.
362 ///
363 /// # Panics
364 ///
365 /// Panics if this is invoked outside the context of a fiber.
with_blocking<R>( &mut self, f: impl FnOnce(&mut Self, &mut BlockingContext<'_, '_>) -> R, ) -> R366 pub(crate) fn with_blocking<R>(
367 &mut self,
368 f: impl FnOnce(&mut Self, &mut BlockingContext<'_, '_>) -> R,
369 ) -> R {
370 BlockingContext::with(self, |store, cx| f(store, cx))
371 }
372
373 /// Used when any configuration option that affects a store, as a side
374 /// effect, disallows further use of sync APIs in Wasmtime.
375 ///
376 /// For example enabling async yielding epochs, async yielding fuel, or
377 /// async resource limiters all require that wasm is invoked on fibers.
378 /// These options, when enabled, will all set this flag.
379 ///
380 /// Note that this specifically only models the transition from "some
381 /// previous state" to "async is now required". There's no reasonable way to
382 /// iterate through a store and recompute this if epoch settings, for
383 /// example, are dynamically changed.
set_async_required(&mut self, asyncness: Asyncness)384 pub(crate) fn set_async_required(&mut self, asyncness: Asyncness) {
385 match asyncness {
386 Asyncness::Yes => {
387 self.fiber_async_state_mut().async_required = true;
388 }
389 Asyncness::No => {}
390 }
391 }
392 }
393
/// Indicates whether or not a fiber needs to retain exclusive access to its
/// store across a suspend/resume interval.
///
/// This is the value a fiber yields to its resumer each time it suspends.
pub(crate) enum StoreFiberYield {
    /// Indicates the fiber needs to retain exclusive access, meaning the store
    /// should not be used outside of the fiber until after the fiber either
    /// suspends with `ReleaseStore` or resolves.
    KeepStore,
    /// Indicates the fiber does _not_ need exclusive access across the
    /// suspend/resume interval, meaning the store may be used as needed until
    /// the fiber is resumed.
    ///
    /// Only available when the `component-model-async` feature is enabled.
    #[cfg(feature = "component-model-async")]
    ReleaseStore,
}
407
/// A fiber bound to a particular store, together with the state that must be
/// saved and restored each time the fiber is resumed or suspended.
pub(crate) struct StoreFiber<'a> {
    /// The raw `wasmtime_fiber::Fiber`.
    ///
    /// Note that using `StoreFiberYield` as the `Yield` type parameter allows
    /// the fiber to indicate whether it needs exclusive access to the store
    /// across suspend points (in which case it will pass `KeepStore` when
    /// suspending, meaning the store must not be used at all until the fiber
    /// is resumed again) or whether it is giving up exclusive access (in which
    /// case it will pass `ReleaseStore` when yielding, meaning exclusive access
    /// may be given to another fiber that runs concurrently).
    ///
    /// Note also that every `StoreFiber` is implicitly granted exclusive access
    /// to the store when it is resumed.
    ///
    /// This is `None` once the fiber's stack has been taken back out of it.
    fiber: Option<AlwaysMut<RawFiber<'a>>>,
    /// See `FiberResumeState`
    state: Option<AlwaysMut<FiberResumeState>>,
    /// The Wasmtime `Engine` to which this fiber belongs.
    engine: Engine,
    /// The id of the store with which this fiber was created.
    ///
    /// Any attempt to resume a fiber with a different store than the one with
    /// which it was created will panic.
    id: StoreId,
}
432
/// Newtype wrapper around `WasmtimeFiber` whose only purpose is to carry the
/// `unsafe impl Send` defined for it in this file.
struct RawFiber<'a>(WasmtimeFiber<'a>);
434
435 impl<'a> StoreFiber<'a> {
436 /// Convenience method to peel off some layers of abstraction around the raw
437 /// `wasmtime_fiber::Fiber`.
fiber(&mut self) -> Option<&mut WasmtimeFiber<'a>>438 fn fiber(&mut self) -> Option<&mut WasmtimeFiber<'a>> {
439 Some(&mut self.fiber.as_mut()?.get_mut().0)
440 }
441
442 /// Convenience method take the internal fiber and consume it, yielding its
443 /// original stack.
take_fiber_stack(&mut self) -> Option<FiberStack>444 fn take_fiber_stack(&mut self) -> Option<FiberStack> {
445 self.fiber.take().map(|f| f.into_inner().0.into_stack())
446 }
447
dispose(&mut self, store: &mut StoreOpaque)448 pub(crate) fn dispose(&mut self, store: &mut StoreOpaque) {
449 if let Some(fiber) = self.fiber() {
450 if !fiber.done() {
451 let result = resume_fiber(store, self, Err(format_err!("future dropped")));
452 debug_assert!(result.is_ok());
453 }
454 }
455 }
456 }
457
458 // Note that this implementation will panic if the fiber is in-progress, which
459 // will abort the process if there is already a panic being unwound. That
460 // should only happen if we failed to call `StoreFiber::dispose` on the
461 // in-progress fiber prior to dropping it, which indicates a bug in this crate
462 // which must be fixed.
463 impl Drop for StoreFiber<'_> {
drop(&mut self)464 fn drop(&mut self) {
465 if self.fiber.is_none() {
466 return;
467 }
468
469 assert!(
470 self.fiber().unwrap().done(),
471 "attempted to drop in-progress fiber without first calling `StoreFiber::dispose`"
472 );
473
474 self.state.take().unwrap().into_inner().dispose();
475
476 unsafe {
477 let stack = self.take_fiber_stack().unwrap();
478 self.engine.allocator().deallocate_fiber_stack(stack);
479 }
480 }
481 }
482
483 // This is surely the most dangerous `unsafe impl Send` in the entire
484 // crate. There are two members in `StoreFiber` which cause it to not be
485 // `Send`. One is `suspend` and is entirely uninteresting. This is just used to
486 // manage `Suspend` when resuming, and requires raw pointers to get it to happen
487 // easily. Nothing too weird about the `Send`-ness, values aren't actually
488 // crossing threads.
489 //
490 // The really interesting piece is `fiber`. Now the "fiber" here is actual
491 // honest-to-god Rust code which we're moving around. What we're doing is the
492 // equivalent of moving our thread's stack to another OS thread. Turns out we,
493 // in general, have no idea what's on the stack and would generally have no way
494 // to verify that this is actually safe to do!
495 //
496 // Thankfully, though, Wasmtime has the power. Without being glib it's actually
497 // worth examining what's on the stack. It's unfortunately not super-local to
498 // this function itself. Our closure to `Fiber::new` runs `func`, which is given
499 // to us from the outside. Thankfully, though, we have tight control over
500 // this. Usage of `on_fiber` or `Instance::resume_fiber` is typically done
501 // *just* before entering WebAssembly itself, so we'll have a few stack frames
502 // of Rust code (all in Wasmtime itself) before we enter wasm.
503 //
504 // Once we've entered wasm, well then we have a whole bunch of wasm frames on
505 // the stack. We've got this nifty thing called Cranelift, though, which allows
506 // us to also have complete control over everything on the stack!
507 //
508 // Finally, when wasm switches back to the fiber's starting pointer (this future
509 // we're returning) then it means wasm has reentered Rust. Suspension can only
510 // happen via either `block_on` or `Instance::suspend`. This, conveniently, also
511 // happens entirely in Wasmtime controlled code!
512 //
513 // There's an extremely important point that should be called out here.
514 // User-provided futures **are not on the stack** during suspension points. This
515 // is extremely crucial because we in general cannot reason about Send/Sync for
516 // stack-local variables since rustc doesn't analyze them at all. With our
// construction, though, we are guaranteed that Wasmtime owns all stack frames
// between the base of a fiber's stack and the point where the fiber suspends
// (at which point it could move across threads). At this time the only
// user-provided piece of data on the stack is the future itself given to us.
// Lo-and-behold as you might notice the future is required to be `Send`!
522 //
523 // What this all boils down to is that we, as the authors of Wasmtime, need to
524 // be extremely careful that on the async fiber stack we only store Send
525 // things. For example we can't start using `Rc` willy nilly by accident and
526 // leave a copy in TLS somewhere. (similarly we have to be ready for TLS to
527 // change while we're executing wasm code between suspension points).
528 //
529 // While somewhat onerous it shouldn't be too too hard (the TLS bit is the
530 // hardest bit so far). This does mean, though, that no user should ever have to
531 // worry about the `Send`-ness of Wasmtime. If rustc says it's ok, then it's ok.
532 //
// SAFETY: with all that in mind we unsafely assert here that Wasmtime is
// correct. We declare the fiber as only containing `Send` data on its stack,
// despite not knowing for sure at compile time that this is correct. That's
// what `unsafe` in Rust is all about, though, right?
unsafe impl Send for RawFiber<'_> {}
538
/// State of the world when a fiber last suspended.
///
/// This structure represents global state that a fiber clobbers during its
/// execution. For example TLS variables are updated, system resources like MPK
/// masks are updated, etc. The purpose of this structure is to track all of
/// this state and appropriately save/restore it around fiber suspension points.
struct FiberResumeState {
    /// Saved list of `CallThreadState` activations that are stored on a fiber
    /// stack.
    ///
    /// This is a linked list that references stack-stored nodes on the fiber
    /// stack that is currently suspended. The `AsyncWasmCallState` type
    /// documents this more thoroughly but the general gist is that when this
    /// fiber is resumed this linked list needs to be pushed on to the current
    /// thread's linked list of activations.
    tls: crate::runtime::vm::AsyncWasmCallState,

    /// Saved MPK protection mask, if enabled.
    ///
    /// When MPK is enabled then executing WebAssembly will modify the
    /// processor's current mask of addressable protection keys. This means that
    /// our current state may get clobbered when a fiber suspends. To preserve
    /// context, when MPK is enabled, the current mask is saved when the fiber
    /// is resumed and then restored when the fiber suspends.
    mpk: Option<ProtectionMask>,

    /// The current wasm stack limit, if in use.
    ///
    /// This field stores the value of `VMStoreContext::stack_limit` that this
    /// fiber should be using during its execution. This is saved/restored when
    /// a fiber is suspended/resumed to ensure that when there are multiple
    /// fibers within the store they all maintain an appropriate fiber-relative
    /// stack limit.
    stack_limit: usize,

    /// The executor (e.g. the Pulley interpreter state) belonging to this
    /// fiber.
    ///
    /// This is swapped with `StoreOpaque::executor` whenever this fiber is
    /// resumed, suspended, or resolved.
    executor: Executor,
}
582
583 impl FiberResumeState {
replace( self, store: &mut StoreOpaque, fiber: &mut StoreFiber<'_>, ) -> PriorFiberResumeState584 unsafe fn replace(
585 self,
586 store: &mut StoreOpaque,
587 fiber: &mut StoreFiber<'_>,
588 ) -> PriorFiberResumeState {
589 let tls = unsafe { self.tls.push() };
590 let mpk = swap_mpk_states(self.mpk);
591 let async_guard_range = fiber
592 .fiber()
593 .unwrap()
594 .stack()
595 .guard_range()
596 .unwrap_or(ptr::null_mut()..ptr::null_mut());
597 let mut executor = self.executor;
598 store.swap_executor(&mut executor);
599 PriorFiberResumeState {
600 tls,
601 mpk,
602 executor,
603 stack_limit: store.replace_stack_limit(self.stack_limit),
604 async_guard_range: store.replace_async_guard_range(async_guard_range),
605
606 // The current suspend/future_cx are always null upon resumption, so
607 // insert null. Save the old values through to get preserved across
608 // this resume/suspend.
609 current_suspend: store.replace_current_suspend(None),
610 current_future_cx: store.replace_current_future_cx(None),
611 }
612 }
613
dispose(self)614 fn dispose(self) {
615 self.tls.assert_null();
616 }
617 }
618
619 impl StoreOpaque {
620 /// Helper function to swap the `stack_limit` field in the `VMStoreContext`
621 /// within this store.
replace_stack_limit(&mut self, stack_limit: usize) -> usize622 fn replace_stack_limit(&mut self, stack_limit: usize) -> usize {
623 mem::replace(
624 &mut self.vm_store_context_mut().stack_limit.get_mut(),
625 stack_limit,
626 )
627 }
628
629 /// Helper function to swap the `async_guard_range` field in the `VMStoreContext`
630 /// within this store.
replace_async_guard_range(&mut self, range: Range<*mut u8>) -> Range<*mut u8>631 fn replace_async_guard_range(&mut self, range: Range<*mut u8>) -> Range<*mut u8> {
632 mem::replace(&mut self.vm_store_context_mut().async_guard_range, range)
633 }
634
replace_current_suspend( &mut self, ptr: Option<NonNull<WasmtimeSuspend>>, ) -> Option<NonNull<WasmtimeSuspend>>635 fn replace_current_suspend(
636 &mut self,
637 ptr: Option<NonNull<WasmtimeSuspend>>,
638 ) -> Option<NonNull<WasmtimeSuspend>> {
639 mem::replace(&mut self.fiber_async_state_mut().current_suspend, ptr)
640 }
641
replace_current_future_cx( &mut self, ptr: Option<NonNull<Context<'static>>>, ) -> Option<NonNull<Context<'static>>>642 fn replace_current_future_cx(
643 &mut self,
644 ptr: Option<NonNull<Context<'static>>>,
645 ) -> Option<NonNull<Context<'static>>> {
646 mem::replace(&mut self.fiber_async_state_mut().current_future_cx, ptr)
647 }
648 }
649
/// State that was active in the store (and on the current thread) just before
/// a fiber was resumed.
///
/// This is produced by `FiberResumeState::replace` and consumed by
/// `PriorFiberResumeState::replace`, which puts these values back and hands
/// the fiber's own state back out as a `FiberResumeState`.
struct PriorFiberResumeState {
    /// Token returned by `AsyncWasmCallState::push`, used to restore the
    /// previous TLS state.
    tls: crate::runtime::vm::PreviousAsyncWasmCallState,
    /// MPK mask that was active before the fiber's mask was applied, if the
    /// fiber had a mask saved.
    mpk: Option<ProtectionMask>,
    /// Previous value of the store's wasm stack limit.
    stack_limit: usize,
    /// Previous value of the store's async guard range.
    async_guard_range: Range<*mut u8>,
    /// Previous value of `AsyncState::current_suspend`.
    current_suspend: Option<NonNull<WasmtimeSuspend>>,
    /// Previous value of `AsyncState::current_future_cx`.
    current_future_cx: Option<NonNull<Context<'static>>>,
    /// Executor that was installed in the store before the fiber's executor
    /// was swapped in.
    executor: Executor,
}
659
660 impl PriorFiberResumeState {
replace(self, store: &mut StoreOpaque) -> FiberResumeState661 unsafe fn replace(self, store: &mut StoreOpaque) -> FiberResumeState {
662 let tls = unsafe { self.tls.restore() };
663 let mpk = swap_mpk_states(self.mpk);
664 // No need to save `_my_guard` since we can re-infer it from the fiber
665 // that this state is attached to.
666 let _my_guard = store.replace_async_guard_range(self.async_guard_range);
667
668 // Restore the previous values of current_{suspend,future_cx} but we
669 // should be guaranteed that the prior values are null, so double-check
670 // that here.
671 let prev = store.replace_current_suspend(self.current_suspend);
672 assert!(prev.is_none());
673 let prev = store.replace_current_future_cx(self.current_future_cx);
674 assert!(prev.is_none());
675
676 let mut executor = self.executor;
677 store.swap_executor(&mut executor);
678
679 FiberResumeState {
680 tls,
681 mpk,
682 executor,
683 stack_limit: store.replace_stack_limit(self.stack_limit),
684 }
685 }
686 }
687
swap_mpk_states(mask: Option<ProtectionMask>) -> Option<ProtectionMask>688 fn swap_mpk_states(mask: Option<ProtectionMask>) -> Option<ProtectionMask> {
689 mask.map(|mask| {
690 let current = mpk::current_mask();
691 mpk::allow(mask);
692 current
693 })
694 }
695
696 /// Resume the specified fiber, granting it exclusive access to the store with
697 /// which it was created.
698 ///
699 /// This will return `Ok(result)` if the fiber resolved, where `result` is the
700 /// returned value; it will return `Err(yield_)` if the fiber suspended, where
701 /// `yield_` indicates whether it released access to the store or not. See
702 /// `StoreFiber::fiber` for details.
resume_fiber<'a>( store: &mut StoreOpaque, fiber: &mut StoreFiber<'a>, result: WasmtimeResume, ) -> Result<WasmtimeComplete, StoreFiberYield>703 fn resume_fiber<'a>(
704 store: &mut StoreOpaque,
705 fiber: &mut StoreFiber<'a>,
706 result: WasmtimeResume,
707 ) -> Result<WasmtimeComplete, StoreFiberYield> {
708 assert_eq!(store.id(), fiber.id);
709
710 struct Restore<'a, 'b> {
711 store: &'b mut StoreOpaque,
712 fiber: &'b mut StoreFiber<'a>,
713 state: Option<PriorFiberResumeState>,
714 }
715
716 impl Drop for Restore<'_, '_> {
717 fn drop(&mut self) {
718 self.fiber.state =
719 Some(unsafe { self.state.take().unwrap().replace(self.store).into() });
720 }
721 }
722 let result = unsafe {
723 let prev = fiber
724 .state
725 .take()
726 .unwrap()
727 .into_inner()
728 .replace(store, fiber);
729 let restore = Restore {
730 store,
731 fiber,
732 state: Some(prev),
733 };
734 restore.fiber.fiber().unwrap().resume(result)
735 };
736
737 match &result {
738 // The fiber has finished, so recycle its stack by disposing of the
739 // underlying fiber itself.
740 Ok(_) => {
741 if let Some(stack) = fiber.take_fiber_stack() {
742 store.deallocate_fiber_stack(stack);
743 }
744 }
745
746 // The fiber has not yet finished, so it stays as-is.
747 Err(_) => {
748 // If `Err` is returned that means the fiber suspended, so we
749 // propagate that here.
750 //
751 // An additional safety check is performed when leaving this
752 // function to help bolster the guarantees of `unsafe impl Send`
753 // above. Notably this future may get re-polled on a different
754 // thread. Wasmtime's thread-local state points to the stack,
755 // however, meaning that it would be incorrect to leave a pointer in
756 // TLS when this function returns. This function performs a runtime
757 // assert to verify that this is the case, notably that the one TLS
758 // pointer Wasmtime uses is not pointing anywhere within the
759 // stack. If it is then that's a bug indicating that TLS management
760 // in Wasmtime is incorrect.
761 if let Some(range) = fiber.fiber().unwrap().stack().range() {
762 AsyncWasmCallState::assert_current_state_not_in_range(range);
763 }
764 }
765 }
766
767 result
768 }
769
770 /// Create a new `StoreFiber` which runs the specified closure.
771 ///
772 /// # Safety
773 ///
774 /// The returned `StoreFiber<'a>` structure is unconditionally `Send` but the
775 /// send-ness is actually a function of `S`. When `S` is statically known to be
776 /// `Send` then use the safe [`make_fiber`] function.
make_fiber_unchecked<'a, S>( store: &mut S, fun: impl FnOnce(&mut S) -> Result<()> + Send + Sync + 'a, ) -> Result<StoreFiber<'a>> where S: AsStoreOpaque + ?Sized + 'a,777 pub(crate) unsafe fn make_fiber_unchecked<'a, S>(
778 store: &mut S,
779 fun: impl FnOnce(&mut S) -> Result<()> + Send + Sync + 'a,
780 ) -> Result<StoreFiber<'a>>
781 where
782 S: AsStoreOpaque + ?Sized + 'a,
783 {
784 let opaque = store.as_store_opaque();
785 let engine = opaque.engine().clone();
786 let executor = Executor::new(&engine)?;
787 let id = opaque.id();
788 let stack = opaque.allocate_fiber_stack()?;
789 let track_pkey_context_switch = opaque.has_pkey();
790 let store = &raw mut *store;
791 let fiber = Fiber::new(stack, move |result: WasmtimeResume, suspend| {
792 let future_cx = match result {
793 Ok(cx) => cx,
794 // Cancelled before we started? Just return.
795 Err(_) => return Ok(()),
796 };
797
798 // SAFETY: This fiber will only be resumed using `resume_fiber`, which
799 // takes a `&mut StoreOpaque` parameter and has given us exclusive
800 // access to the store until we exit or yield it back to the resumer.
801 let store_ref = unsafe { &mut *store };
802
803 // It should be a guarantee that the store has null pointers here upon
804 // starting a fiber, so now's the time to fill in the pointers now that
805 // the fiber is running and `future_cx` and `suspend` are both in scope.
806 // Note that these pointers are removed when this function returns as
807 // that's when they fall out of scope.
808 let async_state = store_ref.as_store_opaque().fiber_async_state_mut();
809 assert!(async_state.current_suspend.is_none());
810 assert!(async_state.current_future_cx.is_none());
811 async_state.current_suspend = Some(NonNull::from(suspend));
812 async_state.current_future_cx = Some(future_cx);
813
814 struct ResetCurrentPointersToNull<'a, S>(&'a mut S)
815 where
816 S: AsStoreOpaque + ?Sized;
817
818 impl<S> Drop for ResetCurrentPointersToNull<'_, S>
819 where
820 S: AsStoreOpaque + ?Sized,
821 {
822 fn drop(&mut self) {
823 let state = self.0.as_store_opaque().fiber_async_state_mut();
824
825 // Double-check that the current suspension isn't null (it
826 // should be what's in this closure). Note though that we
827 // can't check `current_future_cx` because it may either be
828 // here or not be here depending on whether this was
829 // cancelled or not.
830 debug_assert!(state.current_suspend.is_some());
831
832 state.current_suspend = None;
833 state.current_future_cx = None;
834 }
835 }
836 let reset = ResetCurrentPointersToNull(store_ref);
837
838 fun(reset.0)
839 })?;
840 Ok(StoreFiber {
841 state: Some(
842 FiberResumeState {
843 tls: crate::runtime::vm::AsyncWasmCallState::new(),
844 mpk: if track_pkey_context_switch {
845 Some(ProtectionMask::all())
846 } else {
847 None
848 },
849 stack_limit: usize::MAX,
850 executor,
851 }
852 .into(),
853 ),
854 engine,
855 id,
856 fiber: Some(RawFiber(fiber).into()),
857 })
858 }
859
/// Safe counterpart to [`make_fiber_unchecked`], available whenever `S` is
/// known to be `Send`.
///
/// Arguments, return value and errors are exactly those of
/// [`make_fiber_unchecked`].
#[cfg(feature = "component-model-async")]
pub(crate) fn make_fiber<'a, S>(
    store: &mut S,
    fun: impl FnOnce(&mut S) -> Result<()> + Send + Sync + 'a,
) -> Result<StoreFiber<'a>>
where
    S: AsStoreOpaque + Send + ?Sized + 'a,
{
    // SAFETY: `make_fiber_unchecked` is only unsafe because the returned
    // fiber is `Send` regardless of `S`; the `S: Send` bound on this function
    // covers exactly that.
    unsafe { make_fiber_unchecked(store, fun) }
}
872
873 /// Run the specified function on a newly-created fiber and `.await` its
874 /// completion.
on_fiber<S, R>( store: &mut S, func: impl FnOnce(&mut S) -> R + Send + Sync, ) -> Result<R> where S: AsStoreOpaque + ?Sized, R: Send + Sync,875 pub(crate) async fn on_fiber<S, R>(
876 store: &mut S,
877 func: impl FnOnce(&mut S) -> R + Send + Sync,
878 ) -> Result<R>
879 where
880 S: AsStoreOpaque + ?Sized,
881 R: Send + Sync,
882 {
883 let opaque = store.as_store_opaque();
884 let config = opaque.engine().config();
885 debug_assert!(config.async_stack_size > 0);
886
887 let mut result = None;
888
889 // SAFETY: the `StoreFiber` returned by `make_fiber_unchecked` is `Send`
890 // despite we not actually knowing here whether `S` is `Send` or not. That
891 // is safe here, however, because this function is already conditionally
892 // `Send` based on `S`. Additionally `fiber` doesn't escape this function,
893 // so the future-of-this-function is still correctly `Send`-vs-not.
894 let fiber = unsafe {
895 make_fiber_unchecked(store, |store| {
896 result = Some(func(store));
897 Ok(())
898 })?
899 };
900
901 {
902 let fiber = FiberFuture {
903 store: store.as_store_opaque(),
904 fiber: Some(fiber),
905 #[cfg(feature = "component-model-async")]
906 on_release: OnRelease::ReturnPending,
907 }
908 .await
909 .unwrap();
910
911 debug_assert!(fiber.is_none());
912 }
913
914 Ok(result.unwrap())
915 }
916
/// Drives `fiber` until it either resolves or suspends with
/// `StoreFiberYield::ReleaseStore`.
///
/// Yields `Some(fiber)` when the fiber suspended with
/// `StoreFiberYield::ReleaseStore`, and `None` when it resolved.
#[cfg(feature = "component-model-async")]
pub(crate) async fn resolve_or_release<'a>(
    store: &mut StoreOpaque,
    fiber: StoreFiber<'a>,
) -> Result<Option<StoreFiber<'a>>> {
    let driver = FiberFuture {
        store,
        fiber: Some(fiber),
        on_release: OnRelease::ReturnReady,
    };
    driver.await
}
934
/// Policy a `FiberFuture` follows when resuming its fiber yields
/// `StoreFiberYield::ReleaseStore`.
#[cfg(feature = "component-model-async")]
enum OnRelease {
    /// `FiberFuture::poll` reports `Poll::Pending` and keeps the fiber.
    ReturnPending,
    /// `FiberFuture::poll` reports `Poll::Ready` and passes ownership of the
    /// `StoreFiber` on to the caller.
    ReturnReady,
}
945
946 /// A `Future` implementation for running a `StoreFiber` to completion, giving
947 /// it exclusive access to its store until it resolves.
948 struct FiberFuture<'a, 'b> {
949 store: &'a mut StoreOpaque,
950 fiber: Option<StoreFiber<'b>>,
951 #[cfg(feature = "component-model-async")]
952 on_release: OnRelease,
953 }
954
955 impl<'b> Future for FiberFuture<'_, 'b> {
956 type Output = Result<Option<StoreFiber<'b>>>;
957
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>958 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
959 let me = self.get_mut();
960
961 // SAFETY: We need to carry over this `cx` into our fiber's runtime for
962 // when it tries to poll sub-futures that are created. Doing this must
963 // be done unsafely, however, since `cx` is only alive for this one
964 // singular function call. Here we do a `transmute` to extend the
965 // lifetime of `Context` so it can be stored in our `Store`, and then we
966 // replace the current polling context with this one.
967 //
968 // The safety of this extension relies on never actually using
969 // `Context<'static>` with `'static` actually there, which should be
970 // satisfied by the users of this in the `BlockingContext` structure
971 // where the lifetime parameters there are always more constrained than
972 // they are here.
973 let cx: &mut Context<'static> = unsafe { change_context_lifetime(cx) };
974 let cx = NonNull::from(cx);
975
976 match resume_fiber(me.store, me.fiber.as_mut().unwrap(), Ok(cx)) {
977 Ok(Ok(())) => Poll::Ready(Ok(None)),
978 Ok(Err(e)) => Poll::Ready(Err(e)),
979 Err(StoreFiberYield::KeepStore) => Poll::Pending,
980 #[cfg(feature = "component-model-async")]
981 Err(StoreFiberYield::ReleaseStore) => match &me.on_release {
982 OnRelease::ReturnPending => Poll::Pending,
983 OnRelease::ReturnReady => Poll::Ready(Ok(me.fiber.take())),
984 },
985 }
986 }
987 }
988
989 impl Drop for FiberFuture<'_, '_> {
drop(&mut self)990 fn drop(&mut self) {
991 if let Some(fiber) = &mut self.fiber {
992 fiber.dispose(self.store);
993 }
994 }
995 }
996
/// Reinterprets a `Context<'l>` reference as a `Context<'b>` reference for a
/// caller-chosen `'b`.
///
/// # Safety
///
/// This is unsound in general. Whether a given use is acceptable depends on
/// how the resulting reference is used afterwards, which only the caller
/// knows; see the comments at each call site. This function exists so that
/// the `transmute` is confined to the smallest possible operation.
unsafe fn change_context_lifetime<'a, 'b>(cx: &'a mut Context<'_>) -> &'a mut Context<'b> {
    // SAFETY: not safe in general; upholding the contract described in the
    // function documentation is the caller's responsibility.
    let relabelled: &'a mut Context<'b> =
        unsafe { mem::transmute::<&mut Context<'_>, &mut Context<'b>>(cx) };
    relabelled
}
1009