1 use crate::prelude::*;
2 use crate::store::{AsStoreOpaque, Asyncness, Executor, StoreId, StoreOpaque};
3 use crate::vm::mpk::{self, ProtectionMask};
4 use crate::vm::{AlwaysMut, AsyncWasmCallState};
5 use crate::{Engine, StoreContextMut};
6 use core::mem;
7 use core::ops::Range;
8 use core::pin::Pin;
9 use core::ptr::{self, NonNull};
10 use core::task::{Context, Poll};
11 use wasmtime_fiber::{Fiber, FiberStack, Suspend};
12 
/// Value handed to a fiber each time it is resumed.
///
/// `Ok` carries a pointer to the `Context` of the `Future::poll` call that is
/// resuming the fiber; `Err` tells the fiber that it has been cancelled (for
/// example `StoreFiber::dispose` resumes with a "future dropped" error).
type WasmtimeResume = Result<NonNull<Context<'static>>>;
/// Value a fiber supplies when it suspends; see `StoreFiberYield`.
type WasmtimeYield = StoreFiberYield;
/// Final value of a fiber.
// NOTE(review): presumably the result produced when the fiber's closure
// returns -- the producing code is not in this part of the file; confirm.
type WasmtimeComplete = Result<()>;
/// `wasmtime_fiber::Suspend` specialized to the resume/yield/complete types
/// above.
type WasmtimeSuspend = Suspend<WasmtimeResume, WasmtimeYield, WasmtimeComplete>;
/// `wasmtime_fiber::Fiber` specialized to the resume/yield/complete types
/// above.
type WasmtimeFiber<'a> = Fiber<'a, WasmtimeResume, WasmtimeYield, WasmtimeComplete>;
18 
/// State related to asynchronous computations stored within a `Store<T>`.
///
/// This structure resides inside of a `Store<T>` and is used to manage the
/// various pieces of state associated with asynchronous computations. Chiefly
/// this manages the `WasmtimeSuspend` pointer as well as `&mut Context<'_>`
/// when polling futures. This serves as storage to use these pointers across a
/// WebAssembly function boundary, for example, where the values cannot
/// otherwise be explicitly threaded through.
pub(crate) struct AsyncState {
    /// The `Suspend` for the current fiber (or `None` if no such fiber is
    /// running).
    ///
    /// This pointer is provided by the `wasmtime_fiber` crate when a fiber
    /// first starts, but this pointer is unable to be carried through
    /// WebAssembly frames for example. This serves as an alternative storage
    /// location for the pointer provided by `wasmtime_fiber` within a fiber's
    /// execution.
    ///
    /// This pointer is `None` when a fiber is not executing, but it is also
    /// `None` while a `BlockingContext` exists. Note that when a fiber is
    /// suspended it's always through a `BlockingContext` so this field is
    /// `None` whenever a fiber is suspended as well. Fiber resumption saves
    /// the prior value out of the store and then sets the store's field to
    /// `None` (see `FiberResumeState::replace`), where suspension will then
    /// restore what was previously in the store.
    current_suspend: Option<NonNull<WasmtimeSuspend>>,

    /// The `Context` pointer last provided in `Future for FiberFuture`.
    ///
    /// Like `current_suspend` above this is an example of a piece of context
    /// which needs to be carried over a WebAssembly function frame which
    /// otherwise doesn't take this as a parameter. This differs from
    /// `current_suspend` though in that it is provided as part of a `Future`
    /// poll operation but is "gone" after that poll operation completes. That
    /// means that while `current_suspend` is the same for the lifetime of a
    /// future this field is always changing.
    ///
    /// Like `current_suspend` though this is `None` either when a fiber isn't
    /// running or while a `BlockingContext` exists (in which case this is
    /// "take"en). That means that this is `None` on suspension/resumption of a
    /// fiber.
    ///
    /// The value for this pointer is threaded directly through the
    /// `WasmtimeResume` type which is how a pointer flows into this field from
    /// a future-related poll call. This means that the `BlockingContext`
    /// creation may take one value of a pointer here but restore another. That
    /// would represent suspending in one call to `Future::poll` and then
    /// resuming some time later in a different call to `Future::poll`.
    ///
    /// # Safety
    ///
    /// Note that this is a pretty unsafe field for two reasons. One is that
    /// it's a raw pointer to a `Context` provided ephemerally to some call to
    /// `Future::poll` on the stack. Another reason is that the lifetime
    /// parameter of `Context` is unsafely changed to `'static` here which is
    /// not correct. The ephemeral nature of this pointer is managed through the
    /// take-style operations in `BlockingContext` and the `'static` lifetime is
    /// handled by ensuring the signatures that work with `BlockingContext` all
    /// use constrained anonymous lifetimes that are guaranteed to be shorter
    /// than the original `Context` lifetime.
    current_future_cx: Option<NonNull<Context<'static>>>,

    /// The last fiber stack that was in use by the store.
    ///
    /// We use this to cache and reuse stacks as a performance optimization.
    // TODO: With stack switching and the Component Model Async ABI, there may
    // be multiple concurrent fibers in play; consider caching more than one
    // stack at a time and making the number tunable via `Config`.
    last_fiber_stack: Option<wasmtime_fiber::FiberStack>,

    /// Whether or not this store has async host functions defined somewhere
    /// within it or some other store-related configuration (e.g. epochs
    /// yielding) which requires that wasm is executed on a fiber, thus async
    /// entrypoints are required.
    ///
    /// This starts out `false` and is only ever set to `true` by
    /// `StoreOpaque::set_async_required`.
    pub(crate) async_required: bool,
}
94 
// SAFETY (applies to both impls below): it's known that `std::task::Context`
// is neither `Send` nor `Sync`, and the pointers to it and to the `Suspend`
// are stored as raw `NonNull` pointers, but despite this the storage here is
// purely temporary in getting these pointers across function frames. The
// actual types are not sent across threads as when a store isn't polling
// anything the pointer values are all set to `None`. Thus if a store is being
// sent across threads that's done because no fibers are active, and once
// fibers are active everything will stick within the same thread.
unsafe impl Send for AsyncState {}
unsafe impl Sync for AsyncState {}
104 
105 impl Default for AsyncState {
default() -> Self106     fn default() -> Self {
107         Self {
108             current_suspend: None,
109             current_future_cx: None,
110             last_fiber_stack: None,
111             async_required: false,
112         }
113     }
114 }
115 
116 impl AsyncState {
last_fiber_stack(&mut self) -> &mut Option<wasmtime_fiber::FiberStack>117     pub(crate) fn last_fiber_stack(&mut self) -> &mut Option<wasmtime_fiber::FiberStack> {
118         &mut self.last_fiber_stack
119     }
120 
121     /// Returns whether `block_on` will succeed or panic.
122     #[inline]
can_block(&mut self) -> bool123     pub(crate) fn can_block(&mut self) -> bool {
124         self.current_future_cx.is_some()
125     }
126 }
127 
/// A helper structure used to block a fiber.
///
/// This is acquired via either `StoreContextMut::with_blocking` or
/// `StoreOpaque::with_blocking`. This structure represents the "taken" state of
/// pointers from a store's `AsyncState`, then modeling them as safe pointers.
///
/// Note that the lifetimes here are carefully controlled in instances of this
/// structure through the construction of the `BlockingContext::with` function,
/// which is the only place a value of this type is created.
pub(crate) struct BlockingContext<'a, 'b> {
    /// Pointer to `wasmtime_fiber::Suspend` which was supplied when a fiber
    /// first started.
    ///
    /// When a `BlockingContext` is first created this pointer is "taken" from
    /// the store (the store's field is set to `None`) and then the raw pointer
    /// previously in the store is unsafely transformed to this safe pointer.
    /// This represents how a `BlockingContext` temporarily has access to this
    /// suspend but when the `BlockingContext` goes away this'll make its way
    /// back into the store.
    suspend: &'a mut WasmtimeSuspend,

    /// Pointer to the future `Context` that this fiber is being polled with.
    ///
    /// Similar to `suspend` above this is taken from a store when a
    /// `BlockingContext` is created and it's restored when the
    /// `BlockingContext` goes away. Note though that unlike `suspend`, as
    /// alluded to in the documentation on `AsyncState`, this value changes over
    /// time as calls to poll are made. This field becomes `None` during a
    /// suspension because that means that the context is released and no longer
    /// available. Upon resumption the context here is *optionally* provided.
    /// Cancellation is a case where it isn't passed back and a re-poll is a
    /// case where it's passed back.
    future_cx: Option<&'a mut Context<'b>>,
}
161 
impl<'a, 'b> BlockingContext<'a, 'b> {
    /// Method to go from a `store` provided (which internally contains a
    /// `StoreOpaque`) to a `BlockingContext`.
    ///
    /// This function will "take" context from `store`'s `AsyncState` field. It
    /// will then construct a `BlockingContext` and yield it to the closure `f`
    /// provided. The closure can then block on futures, suspend, etc.
    ///
    /// Upon return of the closure `f` the state from `BlockingContext` is
    /// restored within the store. The return value of `f` is the return value
    /// of this function.
    ///
    /// Note that the `store` must be provided to this function as an argument
    /// to originally acquire state from `AsyncState`. This store is then
    /// supplied back to the closure `f` provided here so the store can be used
    /// to construct an asynchronous or blocking computation which the
    /// `BlockingContext` tries to block on.
    ///
    /// # Panics
    ///
    /// Panics if the store's `AsyncState` currently holds no future `Context`
    /// or no `Suspend` pointer, i.e. when this is not running on a fiber or a
    /// `BlockingContext` has already taken them.
    ///
    /// # Safety
    ///
    /// This method is safe to call at any time, but it's worth noting that the
    /// safety of this function relies on the signature of this function.
    /// Notably the lifetime parameters of `BlockingContext` in the `f` closure
    /// here must be anonymous. That ensures that the `BlockingContext` that
    /// callers get access to cannot be persisted outside of that closure call
    /// and everything is scoped to just the closure `f` provided with nothing
    /// escaping.
    fn with<S, R>(store: &mut S, f: impl FnOnce(&mut S, &mut BlockingContext<'_, '_>) -> R) -> R
    where
        S: AsStoreOpaque,
    {
        let opaque = store.as_store_opaque();

        let state = opaque.fiber_async_state_mut();

        // SAFETY: this is taking pointers from `AsyncState` and then unsafely
        // turning them into safe references. Lifetime-wise this should be safe
        // because the inferred lifetimes for all these pointers is constrained
        // by the signature of `f` provided here. That ensures that everything
        // is scoped purely to the closure `f` and nothing should be persisted
        // outside of this function call. This, for example, ensures that the
        // `Context<'static>` doesn't leak out, it's only with an anonymous
        // lifetime that's forcibly shorter.
        //
        // Provenance-wise this should be safe as if these fields in the store
        // are non-null then the pointers are provided up-the-stack on this
        // fiber and for this fiber. The "take" pattern here ensures that if
        // this `BlockingContext` context acquires the pointers then there are
        // no other instances of these pointers in use anywhere else.
        let future_cx = unsafe { Some(state.current_future_cx.take().unwrap().as_mut()) };
        let suspend = unsafe { state.current_suspend.take().unwrap().as_mut() };

        // `reset` is a drop guard: when it goes out of scope after `f` has
        // run, its `Drop` impl below puts the pointers back into the store.
        let mut reset = ResetBlockingContext {
            store,
            cx: BlockingContext { future_cx, suspend },
        };
        return f(&mut reset.store, &mut reset.cx);

        /// Guard pairing the store with the `BlockingContext` built from it so
        /// the taken pointers can be restored on drop.
        struct ResetBlockingContext<'a, 'b, S: AsStoreOpaque> {
            store: &'a mut S,
            cx: BlockingContext<'a, 'b>,
        }

        impl<S: AsStoreOpaque> Drop for ResetBlockingContext<'_, '_, S> {
            fn drop(&mut self) {
                let store = self.store.as_store_opaque();
                let state = store.fiber_async_state_mut();

                // Both slots were emptied by `with` above and are expected to
                // still be empty here.
                debug_assert!(state.current_future_cx.is_none());
                debug_assert!(state.current_suspend.is_none());
                state.current_suspend = Some(NonNull::from(&mut *self.cx.suspend));

                // The context may be absent: `suspend` discards it before
                // switching away and only re-populates it when the fiber is
                // resumed with a new context (not when resumed with an error).
                if let Some(cx) = &mut self.cx.future_cx {
                    // SAFETY: while this is changing the lifetime to `'static`
                    // it should never be used while it's `'static` given this
                    // `BlockingContext` abstraction.
                    state.current_future_cx =
                        Some(NonNull::from(unsafe { change_context_lifetime(cx) }));
                }
            }
        }
    }

    /// Blocks on the asynchronous computation represented by `future` and
    /// produces the result here, in-line.
    ///
    /// This function is designed to only work when it's currently executing on
    /// a native fiber. This fiber provides the ability for us to handle the
    /// future's `Pending` state as "jump back to whomever called the fiber in
    /// an asynchronous fashion and propagate `Pending`". This tight coupling
    /// with `on_fiber` below is what powers the asynchronicity of calling wasm.
    ///
    /// This function takes a `future` and will (appear to) synchronously wait
    /// on the result. While this function is executing it will fiber switch
    /// to-and-from the original frame calling `on_fiber` which should be a
    /// guarantee due to how async stores are configured.
    ///
    /// The return value here is either the output of the future,
    /// `F::Output`, or a trap which represents that the asynchronous
    /// computation was cancelled. It is not recommended to catch the trap and
    /// try to keep executing wasm, so we've tried to liberally document this.
    ///
    /// Note that this function suspends (if needed) with
    /// `StoreFiberYield::KeepStore`, indicating that the store must not be used
    /// (and that no other fibers may be resumed) until this fiber resumes.
    /// Therefore, it is not appropriate for use in e.g. guest calls to
    /// async-lowered imports implemented as host functions, since it will
    /// prevent any other tasks from being run.  Use `Instance::suspend` to
    /// suspend and release the store to allow other tasks to run before this
    /// fiber is resumed.
    ///
    /// # Return Value
    ///
    /// A return value of `Ok(value)` means that the future completed with
    /// `value`. A return value of `Err(e)` means that the fiber and its future
    /// have been cancelled and the fiber needs to exit and complete ASAP.
    ///
    /// # Panics
    ///
    /// Panics if this `BlockingContext` has no future `Context` available,
    /// which is the case after a previous `suspend` returned an error.
    ///
    /// # Safety
    ///
    /// This function is safe to call at any time but relies on a trait bound
    /// that is manually placed here the compiler does not otherwise require.
    /// Notably the `Send` bound on the future provided here is not required
    /// insofar as things compile without that. The purpose of this, however, is
    /// to make the `unsafe impl Send for StoreFiber` more safe. The `future`
    /// here is state that is stored on the stack during the suspension of this
    /// fiber and is otherwise not visible to the compiler. By having a `Send`
    /// bound here it ensures that the future doesn't have things like `Rc` or
    /// similar pointing into thread locals which would not be sound if this
    /// fiber crosses threads.
    pub(crate) fn block_on<F>(&mut self, future: F) -> Result<F::Output>
    where
        F: Future + Send,
    {
        let mut future = core::pin::pin!(future);
        loop {
            // Poll with whichever `Context` is current; each successful
            // `suspend` below installs the context of the resuming poll.
            match future.as_mut().poll(self.future_cx.as_mut().unwrap()) {
                Poll::Ready(v) => break Ok(v),
                Poll::Pending => self.suspend(StoreFiberYield::KeepStore)?,
            }
        }
    }

    /// Suspend this fiber with `yield_` as the reason.
    ///
    /// This function will suspend the current fiber and only return after the
    /// fiber has resumed. This function returns `Ok(())` if the fiber was
    /// resumed to be completed, and `Err(e)` indicates that the fiber has been
    /// cancelled and needs to exit/complete ASAP.
    ///
    /// On `Err` the future `Context` of this `BlockingContext` is left as
    /// `None`.
    pub(crate) fn suspend(&mut self, yield_: StoreFiberYield) -> Result<()> {
        // Over a suspension point we're guaranteed that the `Context` provided
        // here is no longer valid, so discard it. If we're supposed to be able
        // to poll afterwards this will be given back as part of the resume
        // value given back.
        self.future_cx.take();

        let mut new_future_cx: NonNull<Context<'static>> = self.suspend.suspend(yield_)?;

        // SAFETY: this function is unsafe as we're doing "funky" things to the
        // `new_future_cx` we have been given. The safety here relies on the
        // fact that the lifetimes of `BlockingContext` are all "smaller" than
        // the original `Context` itself, and that should be guaranteed through
        // the exclusive constructor of this type `BlockingContext::with`.
        unsafe {
            self.future_cx = Some(change_context_lifetime(new_future_cx.as_mut()));
        }
        Ok(())
    }
}
330 
331 impl<T> StoreContextMut<'_, T> {
332     /// Blocks on the future computed by `f`.
333     ///
334     /// # Panics
335     ///
336     /// Panics if this is invoked outside the context of a fiber.
337     #[cfg(feature = "component-model")]
block_on<R>( self, f: impl FnOnce(StoreContextMut<'_, T>) -> Pin<Box<dyn Future<Output = R> + Send + '_>>, ) -> Result<R>338     pub(crate) fn block_on<R>(
339         self,
340         f: impl FnOnce(StoreContextMut<'_, T>) -> Pin<Box<dyn Future<Output = R> + Send + '_>>,
341     ) -> Result<R> {
342         self.with_blocking(|store, cx| cx.block_on(f(store).as_mut()))
343     }
344 
345     /// Creates a `BlockingContext` suitable for blocking on futures or
346     /// suspending the current fiber.
347     ///
348     /// # Panics
349     ///
350     /// Panics if this is invoked outside the context of a fiber.
with_blocking<R>( self, f: impl FnOnce(StoreContextMut<'_, T>, &mut BlockingContext<'_, '_>) -> R, ) -> R351     pub(crate) fn with_blocking<R>(
352         self,
353         f: impl FnOnce(StoreContextMut<'_, T>, &mut BlockingContext<'_, '_>) -> R,
354     ) -> R {
355         BlockingContext::with(self.0, |store, cx| f(StoreContextMut(store), cx))
356     }
357 }
358 
359 impl StoreOpaque {
360     /// Creates a `BlockingContext` suitable for blocking on futures or
361     /// suspending the current fiber.
362     ///
363     /// # Panics
364     ///
365     /// Panics if this is invoked outside the context of a fiber.
with_blocking<R>( &mut self, f: impl FnOnce(&mut Self, &mut BlockingContext<'_, '_>) -> R, ) -> R366     pub(crate) fn with_blocking<R>(
367         &mut self,
368         f: impl FnOnce(&mut Self, &mut BlockingContext<'_, '_>) -> R,
369     ) -> R {
370         BlockingContext::with(self, |store, cx| f(store, cx))
371     }
372 
373     /// Used when any configuration option that affects a store, as a side
374     /// effect, disallows further use of sync APIs in Wasmtime.
375     ///
376     /// For example enabling async yielding epochs, async yielding fuel, or
377     /// async resource limiters all require that wasm is invoked on fibers.
378     /// These options, when enabled, will all set this flag.
379     ///
380     /// Note that this specifically only models the transition from "some
381     /// previous state" to "async is now required". There's no reasonable way to
382     /// iterate through a store and recompute this if epoch settings, for
383     /// example, are dynamically changed.
set_async_required(&mut self, asyncness: Asyncness)384     pub(crate) fn set_async_required(&mut self, asyncness: Asyncness) {
385         match asyncness {
386             Asyncness::Yes => {
387                 self.fiber_async_state_mut().async_required = true;
388             }
389             Asyncness::No => {}
390         }
391     }
392 }
393 
/// Indicates whether or not a fiber needs to retain exclusive access to its
/// store across a suspend/resume interval.
///
/// A value of this type is passed as the yield value whenever a fiber
/// suspends (see `BlockingContext::suspend`).
pub(crate) enum StoreFiberYield {
    /// Indicates the fiber needs to retain exclusive access, meaning the store
    /// should not be used outside of the fiber until after the fiber either
    /// suspends with `ReleaseStore` or resolves.
    ///
    /// This is what `BlockingContext::block_on` suspends with.
    KeepStore,
    /// Indicates the fiber does _not_ need exclusive access across the
    /// suspend/resume interval, meaning the store may be used as needed until
    /// the fiber is resumed.
    ///
    /// Only available with the `component-model-async` feature.
    #[cfg(feature = "component-model-async")]
    ReleaseStore,
}
407 
/// A fiber created for a particular store, bundled with the state needed to
/// resume it and to release its stack.
pub(crate) struct StoreFiber<'a> {
    /// The raw `wasmtime_fiber::Fiber`.
    ///
    /// Note that using `StoreFiberYield` as the `Yield` type parameter allows
    /// the fiber to indicate whether it needs exclusive access to the store
    /// across suspend points (in which case it will pass `KeepStore` when
    /// suspending, meaning the store must not be used at all until the fiber
    /// is resumed again) or whether it is giving up exclusive access (in which
    /// case it will pass `ReleaseStore` when yielding, meaning exclusive access
    /// may be given to another fiber that runs concurrently).
    ///
    /// Note also that every `StoreFiber` is implicitly granted exclusive access
    /// to the store when it is resumed.
    ///
    /// This is `None` once the fiber has been consumed to recover its stack
    /// (see `take_fiber_stack`).
    fiber: Option<AlwaysMut<RawFiber<'a>>>,
    /// Saved state to install when this fiber is resumed; see
    /// `FiberResumeState`.
    ///
    /// This is taken and disposed of when the `StoreFiber` is dropped.
    state: Option<AlwaysMut<FiberResumeState>>,
    /// The Wasmtime `Engine` to which this fiber belongs.
    ///
    /// On drop the fiber's stack is handed back to this engine's allocator.
    engine: Engine,
    /// The id of the store with which this fiber was created.
    ///
    /// Any attempt to resume a fiber with a different store than the one with
    /// which it was created will panic.
    id: StoreId,
}
432 
/// Newtype around the raw `WasmtimeFiber`.
///
/// This wrapper exists so that `Send` can be unsafely implemented for the
/// fiber; see the `unsafe impl Send for RawFiber` in this file for the
/// justification.
struct RawFiber<'a>(WasmtimeFiber<'a>);
434 
435 impl<'a> StoreFiber<'a> {
436     /// Convenience method to peel off some layers of abstraction around the raw
437     /// `wasmtime_fiber::Fiber`.
fiber(&mut self) -> Option<&mut WasmtimeFiber<'a>>438     fn fiber(&mut self) -> Option<&mut WasmtimeFiber<'a>> {
439         Some(&mut self.fiber.as_mut()?.get_mut().0)
440     }
441 
442     /// Convenience method take the internal fiber and consume it, yielding its
443     /// original stack.
take_fiber_stack(&mut self) -> Option<FiberStack>444     fn take_fiber_stack(&mut self) -> Option<FiberStack> {
445         self.fiber.take().map(|f| f.into_inner().0.into_stack())
446     }
447 
dispose(&mut self, store: &mut StoreOpaque)448     pub(crate) fn dispose(&mut self, store: &mut StoreOpaque) {
449         if let Some(fiber) = self.fiber() {
450             if !fiber.done() {
451                 let result = resume_fiber(store, self, Err(format_err!("future dropped")));
452                 debug_assert!(result.is_ok());
453             }
454         }
455     }
456 }
457 
458 // Note that this implementation will panic if the fiber is in-progress, which
459 // will abort the process if there is already a panic being unwound.  That
460 // should only happen if we failed to call `StoreFiber::dispose` on the
461 // in-progress fiber prior to dropping it, which indicates a bug in this crate
462 // which must be fixed.
463 impl Drop for StoreFiber<'_> {
drop(&mut self)464     fn drop(&mut self) {
465         if self.fiber.is_none() {
466             return;
467         }
468 
469         assert!(
470             self.fiber().unwrap().done(),
471             "attempted to drop in-progress fiber without first calling `StoreFiber::dispose`"
472         );
473 
474         self.state.take().unwrap().into_inner().dispose();
475 
476         unsafe {
477             let stack = self.take_fiber_stack().unwrap();
478             self.engine.allocator().deallocate_fiber_stack(stack);
479         }
480     }
481 }
482 
483 // This is surely the most dangerous `unsafe impl Send` in the entire
484 // crate. There are two members in `StoreFiber` which cause it to not be
485 // `Send`. One is `suspend` and is entirely uninteresting.  This is just used to
486 // manage `Suspend` when resuming, and requires raw pointers to get it to happen
487 // easily.  Nothing too weird about the `Send`-ness, values aren't actually
488 // crossing threads.
489 //
490 // The really interesting piece is `fiber`. Now the "fiber" here is actual
491 // honest-to-god Rust code which we're moving around. What we're doing is the
492 // equivalent of moving our thread's stack to another OS thread. Turns out we,
493 // in general, have no idea what's on the stack and would generally have no way
494 // to verify that this is actually safe to do!
495 //
496 // Thankfully, though, Wasmtime has the power. Without being glib it's actually
497 // worth examining what's on the stack. It's unfortunately not super-local to
498 // this function itself. Our closure to `Fiber::new` runs `func`, which is given
499 // to us from the outside. Thankfully, though, we have tight control over
500 // this. Usage of `on_fiber` or `Instance::resume_fiber` is typically done
501 // *just* before entering WebAssembly itself, so we'll have a few stack frames
502 // of Rust code (all in Wasmtime itself) before we enter wasm.
503 //
504 // Once we've entered wasm, well then we have a whole bunch of wasm frames on
505 // the stack. We've got this nifty thing called Cranelift, though, which allows
506 // us to also have complete control over everything on the stack!
507 //
508 // Finally, when wasm switches back to the fiber's starting pointer (this future
509 // we're returning) then it means wasm has reentered Rust.  Suspension can only
510 // happen via either `block_on` or `Instance::suspend`. This, conveniently, also
511 // happens entirely in Wasmtime controlled code!
512 //
513 // There's an extremely important point that should be called out here.
514 // User-provided futures **are not on the stack** during suspension points. This
515 // is extremely crucial because we in general cannot reason about Send/Sync for
516 // stack-local variables since rustc doesn't analyze them at all. With our
517 // construction, though, we are guaranteed that Wasmtime owns all stack frames
518 // between the stack of a fiber and when the fiber suspends (and it could move
519 // across threads). At this time the only user-provided piece of data on the
520 // stack is the future itself given to us. Lo-and-behold as you might notice the
521 // future is required to be `Send`!
522 //
523 // What this all boils down to is that we, as the authors of Wasmtime, need to
524 // be extremely careful that on the async fiber stack we only store Send
525 // things. For example we can't start using `Rc` willy nilly by accident and
526 // leave a copy in TLS somewhere. (similarly we have to be ready for TLS to
527 // change while we're executing wasm code between suspension points).
528 //
529 // While somewhat onerous it shouldn't be too too hard (the TLS bit is the
530 // hardest bit so far). This does mean, though, that no user should ever have to
531 // worry about the `Send`-ness of Wasmtime. If rustc says it's ok, then it's ok.
532 //
533 // With all that in mind we unsafely assert here that Wasmtime is correct. We
534 // declare the fiber as only containing Send data on its stack, despite not
535 // knowing for sure at compile time that this is correct. That's what `unsafe`
536 // in Rust is all about, though, right?
// SAFETY: per the rationale above, Wasmtime controls every frame on a
// suspended fiber's stack and the only user-provided data there is a future
// that is required to be `Send`.
unsafe impl Send for RawFiber<'_> {}
538 
/// State of the world when a fiber last suspended.
///
/// This structure represents global state that a fiber clobbers during its
/// execution. For example TLS variables are updated, system resources like MPK
/// masks are updated, etc. The purpose of this structure is to track all of
/// this state and appropriately save/restore it around fiber suspension points.
struct FiberResumeState {
    /// Saved list of `CallThreadState` activations that are stored on a fiber
    /// stack.
    ///
    /// This is a linked list that references stack-stored nodes on the fiber
    /// stack that is currently suspended. The `AsyncWasmCallState` type
    /// documents this more thoroughly but the general gist is that when this
    /// fiber is resumed this linked list needs to be pushed on to the current
    /// thread's linked list of activations.
    tls: crate::runtime::vm::AsyncWasmCallState,

    /// Saved MPK protection mask, if enabled.
    ///
    /// When MPK is enabled then executing WebAssembly will modify the
    /// processor's current mask of addressable protection keys. This means that
    /// our current state may get clobbered when a fiber suspends. To ensure
    /// that context is preserved, when MPK is enabled the current mask is
    /// saved when the fiber is resumed and then restored when the fiber
    /// suspends.
    mpk: Option<ProtectionMask>,

    /// The current wasm stack limit, if in use.
    ///
    /// This field stores the value of `VMStoreContext::stack_limit` that this
    /// fiber should be using during its execution. This is saved/restored when
    /// a fiber is suspended/resumed to ensure that when there are multiple
    /// fibers within the store they all maintain an appropriate fiber-relative
    /// stack limit.
    stack_limit: usize,

    /// The executor (e.g. the Pulley interpreter state) belonging to this
    /// fiber.
    ///
    /// This is swapped with `StoreOpaque::executor` whenever this fiber is
    /// resumed, suspended, or resolved.
    executor: Executor,
}
582 
impl FiberResumeState {
    /// Installs this saved state as the current state and returns the state
    /// that was in place beforehand.
    ///
    /// In order, this pushes the saved TLS activations, swaps the MPK state,
    /// swaps this fiber's executor with the store's, installs this fiber's
    /// stack limit and stack guard range in the store, and clears the store's
    /// current suspend/future-context pointers. The prior value of each is
    /// collected into the returned `PriorFiberResumeState`.
    ///
    /// # Panics
    ///
    /// Panics if `fiber` no longer holds its underlying fiber.
    ///
    /// # Safety
    ///
    /// NOTE(review): the caller-side contract is not spelled out in this part
    /// of the file. The only explicitly unsafe operation in the body is
    /// `AsyncWasmCallState::push`, so callers presumably need to uphold that
    /// method's requirements -- confirm against its documentation.
    unsafe fn replace(
        self,
        store: &mut StoreOpaque,
        fiber: &mut StoreFiber<'_>,
    ) -> PriorFiberResumeState {
        let tls = unsafe { self.tls.push() };
        let mpk = swap_mpk_states(self.mpk);
        // A stack without a guard range is represented by an empty null range.
        let async_guard_range = fiber
            .fiber()
            .unwrap()
            .stack()
            .guard_range()
            .unwrap_or(ptr::null_mut()..ptr::null_mut());
        // After the swap `executor` holds what the store was using before.
        let mut executor = self.executor;
        store.swap_executor(&mut executor);
        PriorFiberResumeState {
            tls,
            mpk,
            executor,
            stack_limit: store.replace_stack_limit(self.stack_limit),
            async_guard_range: store.replace_async_guard_range(async_guard_range),

            // The current suspend/future_cx are always null upon resumption, so
            // insert null. Save the old values though to get preserved across
            // this resume/suspend.
            current_suspend: store.replace_current_suspend(None),
            current_future_cx: store.replace_current_future_cx(None),
        }
    }

    /// Consumes this state when its fiber is being destroyed.
    ///
    /// This only checks, via `assert_null`, the saved TLS state.
    // NOTE(review): presumably `assert_null` verifies that no activations
    // remain saved for the fiber -- confirm in `AsyncWasmCallState`.
    fn dispose(self) {
        self.tls.assert_null();
    }
}
618 
619 impl StoreOpaque {
620     /// Helper function to swap the `stack_limit` field in the `VMStoreContext`
621     /// within this store.
replace_stack_limit(&mut self, stack_limit: usize) -> usize622     fn replace_stack_limit(&mut self, stack_limit: usize) -> usize {
623         mem::replace(
624             &mut self.vm_store_context_mut().stack_limit.get_mut(),
625             stack_limit,
626         )
627     }
628 
629     /// Helper function to swap the `async_guard_range` field in the `VMStoreContext`
630     /// within this store.
replace_async_guard_range(&mut self, range: Range<*mut u8>) -> Range<*mut u8>631     fn replace_async_guard_range(&mut self, range: Range<*mut u8>) -> Range<*mut u8> {
632         mem::replace(&mut self.vm_store_context_mut().async_guard_range, range)
633     }
634 
replace_current_suspend( &mut self, ptr: Option<NonNull<WasmtimeSuspend>>, ) -> Option<NonNull<WasmtimeSuspend>>635     fn replace_current_suspend(
636         &mut self,
637         ptr: Option<NonNull<WasmtimeSuspend>>,
638     ) -> Option<NonNull<WasmtimeSuspend>> {
639         mem::replace(&mut self.fiber_async_state_mut().current_suspend, ptr)
640     }
641 
replace_current_future_cx( &mut self, ptr: Option<NonNull<Context<'static>>>, ) -> Option<NonNull<Context<'static>>>642     fn replace_current_future_cx(
643         &mut self,
644         ptr: Option<NonNull<Context<'static>>>,
645     ) -> Option<NonNull<Context<'static>>> {
646         mem::replace(&mut self.fiber_async_state_mut().current_future_cx, ptr)
647     }
648 }
649 
/// The store-side state displaced by `FiberResumeState::replace` when a
/// fiber's own state was installed.
///
/// `PriorFiberResumeState::replace` writes these values back into the store
/// and produces a fresh `FiberResumeState` for the fiber in exchange.
struct PriorFiberResumeState {
    /// Value returned by pushing the fiber's `AsyncWasmCallState`; consumed
    /// with `restore()` when this state is put back.
    tls: crate::runtime::vm::PreviousAsyncWasmCallState,
    /// Result of `swap_mpk_states` on the fiber's mask: the mask that was
    /// current beforehand, or `None` if the fiber carried no mask.
    mpk: Option<ProtectionMask>,
    /// The `stack_limit` that was in the store's `VMStoreContext` before the
    /// fiber's limit was written.
    stack_limit: usize,
    /// The `async_guard_range` that was in the store's `VMStoreContext` before
    /// the fiber's guard range was written.
    async_guard_range: Range<*mut u8>,
    /// The store's `current_suspend` pointer before it was replaced with
    /// `None` for the resumed fiber.
    current_suspend: Option<NonNull<WasmtimeSuspend>>,
    /// The store's `current_future_cx` pointer before it was replaced with
    /// `None` for the resumed fiber.
    current_future_cx: Option<NonNull<Context<'static>>>,
    /// The executor left over after `StoreOpaque::swap_executor` was called
    /// with the fiber's executor -- presumably the store's previous executor;
    /// `swap_executor` is defined elsewhere, confirm there.
    executor: Executor,
}
659 
660 impl PriorFiberResumeState {
replace(self, store: &mut StoreOpaque) -> FiberResumeState661     unsafe fn replace(self, store: &mut StoreOpaque) -> FiberResumeState {
662         let tls = unsafe { self.tls.restore() };
663         let mpk = swap_mpk_states(self.mpk);
664         // No need to save `_my_guard` since we can re-infer it from the fiber
665         // that this state is attached to.
666         let _my_guard = store.replace_async_guard_range(self.async_guard_range);
667 
668         // Restore the previous values of current_{suspend,future_cx} but we
669         // should be guaranteed that the prior values are null, so double-check
670         // that here.
671         let prev = store.replace_current_suspend(self.current_suspend);
672         assert!(prev.is_none());
673         let prev = store.replace_current_future_cx(self.current_future_cx);
674         assert!(prev.is_none());
675 
676         let mut executor = self.executor;
677         store.swap_executor(&mut executor);
678 
679         FiberResumeState {
680             tls,
681             mpk,
682             executor,
683             stack_limit: store.replace_stack_limit(self.stack_limit),
684         }
685     }
686 }
687 
swap_mpk_states(mask: Option<ProtectionMask>) -> Option<ProtectionMask>688 fn swap_mpk_states(mask: Option<ProtectionMask>) -> Option<ProtectionMask> {
689     mask.map(|mask| {
690         let current = mpk::current_mask();
691         mpk::allow(mask);
692         current
693     })
694 }
695 
696 /// Resume the specified fiber, granting it exclusive access to the store with
697 /// which it was created.
698 ///
699 /// This will return `Ok(result)` if the fiber resolved, where `result` is the
700 /// returned value; it will return `Err(yield_)` if the fiber suspended, where
701 /// `yield_` indicates whether it released access to the store or not.  See
702 /// `StoreFiber::fiber` for details.
resume_fiber<'a>( store: &mut StoreOpaque, fiber: &mut StoreFiber<'a>, result: WasmtimeResume, ) -> Result<WasmtimeComplete, StoreFiberYield>703 fn resume_fiber<'a>(
704     store: &mut StoreOpaque,
705     fiber: &mut StoreFiber<'a>,
706     result: WasmtimeResume,
707 ) -> Result<WasmtimeComplete, StoreFiberYield> {
708     assert_eq!(store.id(), fiber.id);
709 
710     struct Restore<'a, 'b> {
711         store: &'b mut StoreOpaque,
712         fiber: &'b mut StoreFiber<'a>,
713         state: Option<PriorFiberResumeState>,
714     }
715 
716     impl Drop for Restore<'_, '_> {
717         fn drop(&mut self) {
718             self.fiber.state =
719                 Some(unsafe { self.state.take().unwrap().replace(self.store).into() });
720         }
721     }
722     let result = unsafe {
723         let prev = fiber
724             .state
725             .take()
726             .unwrap()
727             .into_inner()
728             .replace(store, fiber);
729         let restore = Restore {
730             store,
731             fiber,
732             state: Some(prev),
733         };
734         restore.fiber.fiber().unwrap().resume(result)
735     };
736 
737     match &result {
738         // The fiber has finished, so recycle its stack by disposing of the
739         // underlying fiber itself.
740         Ok(_) => {
741             if let Some(stack) = fiber.take_fiber_stack() {
742                 store.deallocate_fiber_stack(stack);
743             }
744         }
745 
746         // The fiber has not yet finished, so it stays as-is.
747         Err(_) => {
748             // If `Err` is returned that means the fiber suspended, so we
749             // propagate that here.
750             //
751             // An additional safety check is performed when leaving this
752             // function to help bolster the guarantees of `unsafe impl Send`
753             // above. Notably this future may get re-polled on a different
754             // thread. Wasmtime's thread-local state points to the stack,
755             // however, meaning that it would be incorrect to leave a pointer in
756             // TLS when this function returns. This function performs a runtime
757             // assert to verify that this is the case, notably that the one TLS
758             // pointer Wasmtime uses is not pointing anywhere within the
759             // stack. If it is then that's a bug indicating that TLS management
760             // in Wasmtime is incorrect.
761             if let Some(range) = fiber.fiber().unwrap().stack().range() {
762                 AsyncWasmCallState::assert_current_state_not_in_range(range);
763             }
764         }
765     }
766 
767     result
768 }
769 
770 /// Create a new `StoreFiber` which runs the specified closure.
771 ///
772 /// # Safety
773 ///
774 /// The returned `StoreFiber<'a>` structure is unconditionally `Send` but the
775 /// send-ness is actually a function of `S`. When `S` is statically known to be
776 /// `Send` then use the safe [`make_fiber`] function.
make_fiber_unchecked<'a, S>( store: &mut S, fun: impl FnOnce(&mut S) -> Result<()> + Send + Sync + 'a, ) -> Result<StoreFiber<'a>> where S: AsStoreOpaque + ?Sized + 'a,777 pub(crate) unsafe fn make_fiber_unchecked<'a, S>(
778     store: &mut S,
779     fun: impl FnOnce(&mut S) -> Result<()> + Send + Sync + 'a,
780 ) -> Result<StoreFiber<'a>>
781 where
782     S: AsStoreOpaque + ?Sized + 'a,
783 {
784     let opaque = store.as_store_opaque();
785     let engine = opaque.engine().clone();
786     let executor = Executor::new(&engine)?;
787     let id = opaque.id();
788     let stack = opaque.allocate_fiber_stack()?;
789     let track_pkey_context_switch = opaque.has_pkey();
790     let store = &raw mut *store;
791     let fiber = Fiber::new(stack, move |result: WasmtimeResume, suspend| {
792         let future_cx = match result {
793             Ok(cx) => cx,
794             // Cancelled before we started? Just return.
795             Err(_) => return Ok(()),
796         };
797 
798         // SAFETY: This fiber will only be resumed using `resume_fiber`, which
799         // takes a `&mut StoreOpaque` parameter and has given us exclusive
800         // access to the store until we exit or yield it back to the resumer.
801         let store_ref = unsafe { &mut *store };
802 
803         // It should be a guarantee that the store has null pointers here upon
804         // starting a fiber, so now's the time to fill in the pointers now that
805         // the fiber is running and `future_cx` and `suspend` are both in scope.
806         // Note that these pointers are removed when this function returns as
807         // that's when they fall out of scope.
808         let async_state = store_ref.as_store_opaque().fiber_async_state_mut();
809         assert!(async_state.current_suspend.is_none());
810         assert!(async_state.current_future_cx.is_none());
811         async_state.current_suspend = Some(NonNull::from(suspend));
812         async_state.current_future_cx = Some(future_cx);
813 
814         struct ResetCurrentPointersToNull<'a, S>(&'a mut S)
815         where
816             S: AsStoreOpaque + ?Sized;
817 
818         impl<S> Drop for ResetCurrentPointersToNull<'_, S>
819         where
820             S: AsStoreOpaque + ?Sized,
821         {
822             fn drop(&mut self) {
823                 let state = self.0.as_store_opaque().fiber_async_state_mut();
824 
825                 // Double-check that the current suspension isn't null (it
826                 // should be what's in this closure). Note though that we
827                 // can't check `current_future_cx` because it may either be
828                 // here or not be here depending on whether this was
829                 // cancelled or not.
830                 debug_assert!(state.current_suspend.is_some());
831 
832                 state.current_suspend = None;
833                 state.current_future_cx = None;
834             }
835         }
836         let reset = ResetCurrentPointersToNull(store_ref);
837 
838         fun(reset.0)
839     })?;
840     Ok(StoreFiber {
841         state: Some(
842             FiberResumeState {
843                 tls: crate::runtime::vm::AsyncWasmCallState::new(),
844                 mpk: if track_pkey_context_switch {
845                     Some(ProtectionMask::all())
846                 } else {
847                     None
848                 },
849                 stack_limit: usize::MAX,
850                 executor,
851             }
852             .into(),
853         ),
854         engine,
855         id,
856         fiber: Some(RawFiber(fiber).into()),
857     })
858 }
859 
/// Safe wrapper around [`make_fiber_unchecked`] which requires that `S` is
/// `Send`.
///
/// `store` and `fun` are forwarded unchanged, and the result of
/// [`make_fiber_unchecked`] is returned as-is.
#[cfg(feature = "component-model-async")]
pub(crate) fn make_fiber<'a, S>(
    store: &mut S,
    fun: impl FnOnce(&mut S) -> Result<()> + Send + Sync + 'a,
) -> Result<StoreFiber<'a>>
where
    S: AsStoreOpaque + Send + ?Sized + 'a,
{
    // SAFETY: `make_fiber_unchecked` documents that the unconditional `Send`
    // of the returned `StoreFiber` is only justified when `S` is `Send`, which
    // the `S: Send` bound on this function guarantees.
    unsafe { make_fiber_unchecked(store, fun) }
}
872 
873 /// Run the specified function on a newly-created fiber and `.await` its
874 /// completion.
on_fiber<S, R>( store: &mut S, func: impl FnOnce(&mut S) -> R + Send + Sync, ) -> Result<R> where S: AsStoreOpaque + ?Sized, R: Send + Sync,875 pub(crate) async fn on_fiber<S, R>(
876     store: &mut S,
877     func: impl FnOnce(&mut S) -> R + Send + Sync,
878 ) -> Result<R>
879 where
880     S: AsStoreOpaque + ?Sized,
881     R: Send + Sync,
882 {
883     let opaque = store.as_store_opaque();
884     let config = opaque.engine().config();
885     debug_assert!(config.async_stack_size > 0);
886 
887     let mut result = None;
888 
889     // SAFETY: the `StoreFiber` returned by `make_fiber_unchecked` is `Send`
890     // despite we not actually knowing here whether `S` is `Send` or not. That
891     // is safe here, however, because this function is already conditionally
892     // `Send` based on `S`. Additionally `fiber` doesn't escape this function,
893     // so the future-of-this-function is still correctly `Send`-vs-not.
894     let fiber = unsafe {
895         make_fiber_unchecked(store, |store| {
896             result = Some(func(store));
897             Ok(())
898         })?
899     };
900 
901     {
902         let fiber = FiberFuture {
903             store: store.as_store_opaque(),
904             fiber: Some(fiber),
905             #[cfg(feature = "component-model-async")]
906             on_release: OnRelease::ReturnPending,
907         }
908         .await
909         .unwrap();
910 
911         debug_assert!(fiber.is_none());
912     }
913 
914     Ok(result.unwrap())
915 }
916 
/// Drive `fiber` until it either resolves or suspends with
/// `StoreFiberYield::ReleaseStore`.
///
/// Returns `Some` with the fiber if it suspended with
/// `StoreFiberYield::ReleaseStore`, or `None` if it resolved.
#[cfg(feature = "component-model-async")]
pub(crate) async fn resolve_or_release<'a>(
    store: &mut StoreOpaque,
    fiber: StoreFiber<'a>,
) -> Result<Option<StoreFiber<'a>>> {
    let future = FiberFuture {
        store,
        fiber: Some(fiber),
        on_release: OnRelease::ReturnReady,
    };
    future.await
}
934 
/// Tells a `FiberFuture` what to do if `poll_fiber` returns
/// `Err(StoreFiberYield::ReleaseStore)`.
///
/// Within this file `on_fiber` uses `ReturnPending` and `resolve_or_release`
/// uses `ReturnReady`.
#[cfg(feature = "component-model-async")]
enum OnRelease {
    /// Return `Poll::Pending` from `FiberFuture::poll`
    ReturnPending,
    /// Return `Poll::Ready` from `FiberFuture::poll`, handing ownership of the
    /// `StoreFiber` to the caller.
    ReturnReady,
}
945 
/// A `Future` implementation for running a `StoreFiber` to completion, giving
/// it exclusive access to its store until it resolves.
struct FiberFuture<'a, 'b> {
    /// The store passed to `resume_fiber` on every poll, and to
    /// `StoreFiber::dispose` when this future is dropped.
    store: &'a mut StoreOpaque,
    /// The fiber being driven. This becomes `None` only when `poll` hands the
    /// fiber back to the caller (`OnRelease::ReturnReady`).
    fiber: Option<StoreFiber<'b>>,
    /// What `poll` does when the fiber suspends with
    /// `StoreFiberYield::ReleaseStore`.
    #[cfg(feature = "component-model-async")]
    on_release: OnRelease,
}
954 
955 impl<'b> Future for FiberFuture<'_, 'b> {
956     type Output = Result<Option<StoreFiber<'b>>>;
957 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>958     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
959         let me = self.get_mut();
960 
961         // SAFETY: We need to carry over this `cx` into our fiber's runtime for
962         // when it tries to poll sub-futures that are created. Doing this must
963         // be done unsafely, however, since `cx` is only alive for this one
964         // singular function call. Here we do a `transmute` to extend the
965         // lifetime of `Context` so it can be stored in our `Store`, and then we
966         // replace the current polling context with this one.
967         //
968         // The safety of this extension relies on never actually using
969         // `Context<'static>` with `'static` actually there, which should be
970         // satisfied by the users of this in the `BlockingContext` structure
971         // where the lifetime parameters there are always more constrained than
972         // they are here.
973         let cx: &mut Context<'static> = unsafe { change_context_lifetime(cx) };
974         let cx = NonNull::from(cx);
975 
976         match resume_fiber(me.store, me.fiber.as_mut().unwrap(), Ok(cx)) {
977             Ok(Ok(())) => Poll::Ready(Ok(None)),
978             Ok(Err(e)) => Poll::Ready(Err(e)),
979             Err(StoreFiberYield::KeepStore) => Poll::Pending,
980             #[cfg(feature = "component-model-async")]
981             Err(StoreFiberYield::ReleaseStore) => match &me.on_release {
982                 OnRelease::ReturnPending => Poll::Pending,
983                 OnRelease::ReturnReady => Poll::Ready(Ok(me.fiber.take())),
984             },
985         }
986     }
987 }
988 
989 impl Drop for FiberFuture<'_, '_> {
drop(&mut self)990     fn drop(&mut self) {
991         if let Some(fiber) = &mut self.fiber {
992             fiber.dispose(self.store);
993         }
994     }
995 }
996 
/// Reinterprets a `Context<'l>` reference as a `Context<'b>` reference for a
/// caller-chosen lifetime `'b`.
///
/// The returned reference points at the same `Context` as `cx`.
///
/// # Safety
///
/// Not a safe operation. Whether a given use is sound depends on external
/// knowledge of how the resulting reference is used; see the documentation at
/// each call site. This function exists to keep the lifetime change confined
/// to as small an operation as possible.
unsafe fn change_context_lifetime<'a, 'b>(cx: &'a mut Context<'_>) -> &'a mut Context<'b> {
    let raw: *mut Context<'_> = cx;
    // SAFETY: See the function documentation, this is not safe in general.
    // Only the lifetime parameter of the pointee type changes.
    unsafe { &mut *(raw as *mut Context<'b>) }
}
1009