1 use std::future::Future;
2 use std::pin::Pin;
3 use std::sync::atomic::AtomicUsize;
4 use std::sync::atomic::Ordering;
5 use std::sync::{Arc, Mutex};
6 use std::task::{Context, Poll, Waker};
7 use wasmtime::*;
8 
async_store() -> Store<()>9 fn async_store() -> Store<()> {
10     Store::new(&Engine::default(), ())
11 }
12 
run_smoke_test(store: &mut Store<()>, func: Func)13 async fn run_smoke_test(store: &mut Store<()>, func: Func) {
14     func.call_async(&mut *store, &[], &mut []).await.unwrap();
15     func.call_async(&mut *store, &[], &mut []).await.unwrap();
16 }
17 
run_smoke_typed_test(store: &mut Store<()>, func: Func)18 async fn run_smoke_typed_test(store: &mut Store<()>, func: Func) {
19     let func = func.typed::<(), ()>(&store).unwrap();
20     func.call_async(&mut *store, ()).await.unwrap();
21     func.call_async(&mut *store, ()).await.unwrap();
22 }
23 
24 #[tokio::test]
smoke()25 async fn smoke() {
26     let mut store = async_store();
27     let func_ty = FuncType::new(store.engine(), None, None);
28     let func = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
29         Box::new(async { Ok(()) })
30     });
31     run_smoke_test(&mut store, func).await;
32     run_smoke_typed_test(&mut store, func).await;
33 
34     let func = Func::wrap_async(&mut store, move |_caller, _: ()| Box::new(async { Ok(()) }));
35     run_smoke_test(&mut store, func).await;
36     run_smoke_typed_test(&mut store, func).await;
37 }
38 
39 #[tokio::test]
smoke_host_func() -> Result<()>40 async fn smoke_host_func() -> Result<()> {
41     let mut store = async_store();
42     let mut linker = Linker::new(store.engine());
43 
44     linker.func_new_async(
45         "",
46         "first",
47         FuncType::new(store.engine(), None, None),
48         move |_caller, _params, _results| Box::new(async { Ok(()) }),
49     )?;
50 
51     linker.func_wrap_async("", "second", move |_caller, _: ()| {
52         Box::new(async { Ok(()) })
53     })?;
54 
55     let func = linker
56         .get(&mut store, "", "first")
57         .unwrap()
58         .into_func()
59         .unwrap();
60     run_smoke_test(&mut store, func).await;
61     run_smoke_typed_test(&mut store, func).await;
62 
63     let func = linker
64         .get(&mut store, "", "second")
65         .unwrap()
66         .into_func()
67         .unwrap();
68     run_smoke_test(&mut store, func).await;
69     run_smoke_typed_test(&mut store, func).await;
70 
71     Ok(())
72 }
73 
74 #[tokio::test]
smoke_with_suspension()75 async fn smoke_with_suspension() {
76     let mut store = async_store();
77     let func_ty = FuncType::new(store.engine(), None, None);
78     let func = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
79         Box::new(async {
80             tokio::task::yield_now().await;
81             Ok(())
82         })
83     });
84     run_smoke_test(&mut store, func).await;
85     run_smoke_typed_test(&mut store, func).await;
86 
87     let func = Func::wrap_async(&mut store, move |_caller, _: ()| {
88         Box::new(async {
89             tokio::task::yield_now().await;
90             Ok(())
91         })
92     });
93     run_smoke_test(&mut store, func).await;
94     run_smoke_typed_test(&mut store, func).await;
95 }
96 
97 #[tokio::test]
smoke_host_func_with_suspension() -> Result<()>98 async fn smoke_host_func_with_suspension() -> Result<()> {
99     let mut store = async_store();
100     let mut linker = Linker::new(store.engine());
101 
102     linker.func_new_async(
103         "",
104         "first",
105         FuncType::new(store.engine(), None, None),
106         move |_caller, _params, _results| {
107             Box::new(async {
108                 tokio::task::yield_now().await;
109                 Ok(())
110             })
111         },
112     )?;
113 
114     linker.func_wrap_async("", "second", move |_caller, _: ()| {
115         Box::new(async {
116             tokio::task::yield_now().await;
117             Ok(())
118         })
119     })?;
120 
121     let func = linker
122         .get(&mut store, "", "first")
123         .unwrap()
124         .into_func()
125         .unwrap();
126     run_smoke_test(&mut store, func).await;
127     run_smoke_typed_test(&mut store, func).await;
128 
129     let func = linker
130         .get(&mut store, "", "second")
131         .unwrap()
132         .into_func()
133         .unwrap();
134     run_smoke_test(&mut store, func).await;
135     run_smoke_typed_test(&mut store, func).await;
136 
137     Ok(())
138 }
139 
140 #[tokio::test]
141 #[cfg_attr(miri, ignore)]
recursive_call()142 async fn recursive_call() {
143     let mut store = async_store();
144     let func_ty = FuncType::new(store.engine(), None, None);
145     let async_wasm_func = Func::new_async(&mut store, func_ty, |_caller, _params, _results| {
146         Box::new(async {
147             tokio::task::yield_now().await;
148             Ok(())
149         })
150     });
151 
152     // Create an imported function which recursively invokes another wasm
153     // function asynchronously, although this one is just our own host function
154     // which suffices for this test.
155     let func_ty = FuncType::new(store.engine(), None, None);
156     let func2 = Func::new_async(&mut store, func_ty, move |mut caller, _params, _results| {
157         Box::new(async move {
158             async_wasm_func
159                 .call_async(&mut caller, &[], &mut [])
160                 .await?;
161             Ok(())
162         })
163     });
164 
165     // Create an instance which calls an async import twice.
166     let module = Module::new(
167         store.engine(),
168         "
169             (module
170                 (import \"\" \"\" (func))
171                 (func (export \"\")
172                     ;; call imported function which recursively does an async
173                     ;; call
174                     call 0
175                     ;; do it again, and our various pointers all better align
176                     call 0))
177         ",
178     )
179     .unwrap();
180 
181     let instance = Instance::new_async(&mut store, &module, &[func2.into()])
182         .await
183         .unwrap();
184     let func = instance.get_func(&mut store, "").unwrap();
185     func.call_async(&mut store, &[], &mut []).await.unwrap();
186 }
187 
/// A wasm export first calls a *synchronous* host import that manually polls
/// an async call to completion, and then calls a genuinely suspending async
/// import. The whole export call is expected to succeed.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn suspend_while_suspending() {
    let mut store = async_store();

    // Create a synchronous function which calls our asynchronous function and
    // runs it locally. This shouldn't generally happen but we know everything
    // is synchronous in this test so it's fine for us to do this.
    //
    // The purpose of this test is intended to stress various cases in how
    // we manage pointers in ways that are not necessarily common but are still
    // possible in safe code.
    let func_ty = FuncType::new(store.engine(), None, None);
    let async_thunk = Func::new_async(&mut store, func_ty, |_caller, _params, _results| {
        Box::new(async { Ok(()) })
    });
    let func_ty = FuncType::new(store.engine(), None, None);
    let sync_call_async_thunk =
        Func::new(&mut store, func_ty, move |mut caller, _params, _results| {
            // Poll the nested async call exactly once with a no-op waker; the
            // thunk never awaits, so a single poll is asserted to finish it.
            let mut future = Box::pin(async_thunk.call_async(&mut caller, &[], &mut []));
            let poll = future
                .as_mut()
                .poll(&mut Context::from_waker(Waker::noop()));
            assert!(poll.is_ready());
            Ok(())
        });

    // A small async function that simply awaits once to pump the loops and
    // then finishes.
    let func_ty = FuncType::new(store.engine(), None, None);
    let async_import = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
        Box::new(async move {
            tokio::task::yield_now().await;
            Ok(())
        })
    });

    let module = Module::new(
        store.engine(),
        "
            (module
                (import \"\" \"\" (func $sync_call_async_thunk))
                (import \"\" \"\" (func $async_import))
                (func (export \"\")
                    ;; Set some store-local state and pointers
                    call $sync_call_async_thunk
                    ;; .. and hopefully it's all still configured correctly
                    call $async_import))
        ",
    )
    .unwrap();
    // Imports are supplied positionally, in the order the module declares them.
    let instance = Instance::new_async(
        &mut store,
        &module,
        &[sync_call_async_thunk.into(), async_import.into()],
    )
    .await
    .unwrap();
    let func = instance.get_func(&mut store, "").unwrap();
    func.call_async(&mut store, &[], &mut []).await.unwrap();
}
249 
/// Dropping a suspended `call_async` future must still run the destructors of
/// values owned by the in-flight host future.
///
/// The store's `usize` data tracks progress: `0` before anything runs, `1`
/// once the host function has been entered, and `2` after `SetOnDrop` has
/// been dropped.
#[tokio::test]
async fn cancel_during_run() {
    let mut store = Store::new(&Engine::default(), 0);

    let func_ty = FuncType::new(store.engine(), None, None);
    let async_thunk = Func::new_async(&mut store, func_ty, move |mut caller, _params, _results| {
        assert_eq!(*caller.data(), 0);
        *caller.data_mut() = 1;
        let dtor = SetOnDrop(caller);
        Box::new(async move {
            // SetOnDrop is not destroyed when dropping the reference of it
            // here. Instead, it is moved into the future where it's forced
            // to live in and will be destroyed at the end of the future.
            let _ = &dtor;
            tokio::task::yield_now().await;
            Ok(())
        })
    });
    // Shouldn't have called anything yet...
    assert_eq!(*store.data(), 0);

    // Create our future, but as per async conventions this still doesn't
    // actually do anything. No wasm or host function has been called yet.
    let future = Box::pin(async_thunk.call_async(&mut store, &[], &mut []));

    // Push the future forward one tick, which actually runs the host code in
    // our async func. Our future is designed to be pending once, however.
    let future = PollOnce::new(future).await;

    // Now that our future is running (on a separate, now-suspended fiber), drop
    // the future and that should deallocate all the Rust bits as well.
    drop(future);
    assert_eq!(*store.data(), 2);

    /// Guard that owns the `Caller` and, when dropped, checks the store data
    /// is `1` and advances it to `2`.
    struct SetOnDrop<'a>(Caller<'a, usize>);

    impl Drop for SetOnDrop<'_> {
        fn drop(&mut self) {
            assert_eq!(*self.0.data(), 1);
            *self.0.data_mut() = 2;
        }
    }
}
293 
294 #[tokio::test]
295 #[cfg_attr(miri, ignore)]
iloop_with_fuel()296 async fn iloop_with_fuel() {
297     let engine = Engine::new(Config::new().consume_fuel(true)).unwrap();
298     let mut store = Store::new(&engine, ());
299     store.set_fuel(10_000).unwrap();
300     store.fuel_async_yield_interval(Some(100)).unwrap();
301     let module = Module::new(
302         &engine,
303         "
304             (module
305                 (func (loop br 0))
306                 (start 0)
307             )
308         ",
309     )
310     .unwrap();
311     let instance = Instance::new_async(&mut store, &module, &[]);
312 
313     // This should yield a bunch of times but eventually finish
314     let (_, pending) = CountPending::new(Box::pin(instance)).await;
315     assert_eq!(pending, 99);
316 }
317 
318 #[tokio::test]
319 #[cfg_attr(miri, ignore)]
fuel_eventually_finishes()320 async fn fuel_eventually_finishes() {
321     let engine = Engine::new(Config::new().consume_fuel(true)).unwrap();
322     let mut store = Store::new(&engine, ());
323     store.set_fuel(u64::MAX).unwrap();
324     store.fuel_async_yield_interval(Some(10)).unwrap();
325     let module = Module::new(
326         &engine,
327         "
328             (module
329                 (func
330                     (local i32)
331                     i32.const 100
332                     local.set 0
333                     (loop
334                         local.get 0
335                         i32.const -1
336                         i32.add
337                         local.tee 0
338                         br_if 0)
339                 )
340                 (start 0)
341             )
342         ",
343     )
344     .unwrap();
345     let instance = Instance::new_async(&mut store, &module, &[]);
346     instance.await.unwrap();
347 }
348 
349 #[tokio::test]
async_with_pooling_stacks()350 async fn async_with_pooling_stacks() {
351     let mut pool = crate::small_pool_config();
352     pool.total_stacks(1)
353         .max_memory_size(1 << 16)
354         .table_elements(0);
355     let mut config = Config::new();
356     config.allocation_strategy(InstanceAllocationStrategy::Pooling(pool));
357     config.memory_guard_size(0);
358     config.memory_reservation(1 << 16);
359 
360     let engine = Engine::new(&config).unwrap();
361     let mut store = Store::new(&engine, ());
362     let func_ty = FuncType::new(store.engine(), None, None);
363     let func = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
364         Box::new(async { Ok(()) })
365     });
366 
367     run_smoke_test(&mut store, func).await;
368     run_smoke_typed_test(&mut store, func).await;
369 }
370 
371 #[tokio::test]
async_host_func_with_pooling_stacks() -> Result<()>372 async fn async_host_func_with_pooling_stacks() -> Result<()> {
373     let mut pooling = crate::small_pool_config();
374     pooling
375         .total_stacks(1)
376         .max_memory_size(1 << 16)
377         .table_elements(0);
378     let mut config = Config::new();
379     config.allocation_strategy(InstanceAllocationStrategy::Pooling(pooling));
380     config.memory_guard_size(0);
381     config.memory_reservation(1 << 16);
382 
383     let mut store = Store::new(&Engine::new(&config)?, ());
384     let mut linker = Linker::new(store.engine());
385     linker.func_new_async(
386         "",
387         "",
388         FuncType::new(store.engine(), None, None),
389         move |_caller, _params, _results| Box::new(async { Ok(()) }),
390     )?;
391 
392     let func = linker.get(&mut store, "", "").unwrap().into_func().unwrap();
393     run_smoke_test(&mut store, func).await;
394     run_smoke_typed_test(&mut store, func).await;
395     Ok(())
396 }
397 
398 #[tokio::test]
399 #[cfg_attr(miri, ignore)]
async_mpk_protection() -> Result<()>400 async fn async_mpk_protection() -> Result<()> {
401     let _ = env_logger::try_init();
402 
403     // Construct a pool with MPK protection enabled; note that the MPK
404     // protection is configured in `small_pool_config`.
405     let mut pooling = crate::small_pool_config();
406     pooling
407         .total_memories(10)
408         .total_stacks(2)
409         .max_memory_size(1 << 16)
410         .table_elements(0);
411     let mut config = Config::new();
412     config.allocation_strategy(InstanceAllocationStrategy::Pooling(pooling));
413     config.memory_reservation(1 << 26);
414     config.epoch_interruption(true);
415     let engine = Engine::new(&config)?;
416 
417     // Craft a module that loops for several iterations and checks whether it
418     // has access to its memory range (0x0-0x10000).
419     const WAT: &str = "
420     (module
421         (func $start
422             (local $i i32)
423             (local.set $i (i32.const 3))
424             (loop $cont
425                 (drop (i32.load (i32.const 0)))
426                 (drop (i32.load (i32.const 0xfffc)))
427                 (br_if $cont (local.tee $i (i32.sub (local.get $i) (i32.const 1))))))
428         (memory 1)
429         (start $start))
430     ";
431 
432     // Start two instances of the module in separate fibers, `a` and `b`.
433     async fn run_instance(engine: &Engine, name: &str) -> Instance {
434         let mut store = Store::new(&engine, ());
435         store.set_epoch_deadline(0);
436         store.epoch_deadline_async_yield_and_update(0);
437         let module = Module::new(store.engine(), WAT).unwrap();
438         println!("[{name}] building instance");
439         Instance::new_async(&mut store, &module, &[]).await.unwrap()
440     }
441     let mut a = Box::pin(run_instance(&engine, "a"));
442     let mut b = Box::pin(run_instance(&engine, "b"));
443 
444     // Alternately poll each instance until completion. This should exercise
445     // fiber suspensions requiring the `Store` to appropriately save and restore
446     // the PKRU context between suspensions (see `AsyncCx::block_on`).
447     for i in 0..10 {
448         if i % 2 == 0 {
449             match PollOnce::new(a).await {
450                 Ok(_) => {
451                     println!("[a] done");
452                     break;
453                 }
454                 Err(a_) => {
455                     println!("[a] not done");
456                     a = a_;
457                 }
458             }
459         } else {
460             match PollOnce::new(b).await {
461                 Ok(_) => {
462                     println!("[b] done");
463                     break;
464                 }
465                 Err(b_) => {
466                     println!("[b] not done");
467                     b = b_;
468                 }
469             }
470         }
471     }
472 
473     Ok(())
474 }
475 
476 /// This will execute the `future` provided to completion and each invocation of
477 /// `poll` for the future will be executed on a separate thread.
execute_across_threads<F>(future: F) -> F::Output where F: Future + Send + 'static, F::Output: Send,478 pub async fn execute_across_threads<F>(future: F) -> F::Output
479 where
480     F: Future + Send + 'static,
481     F::Output: Send,
482 {
483     let mut future = Box::pin(future);
484     loop {
485         let once = PollOnce::new(future);
486         let handle = tokio::runtime::Handle::current();
487         let result = std::thread::spawn(move || handle.block_on(once))
488             .join()
489             .unwrap();
490         match result {
491             Ok(val) => break val,
492             Err(f) => future = f,
493         }
494     }
495 }
496 
497 #[tokio::test]
498 #[cfg_attr(miri, ignore)]
resume_separate_thread()499 async fn resume_separate_thread() {
500     // This test will poll the following future on two threads. Simulating a
501     // trap requires accessing TLS info, so that should be preserved correctly.
502     execute_across_threads(async {
503         let mut store = async_store();
504         let module = Module::new(
505             store.engine(),
506             "
507             (module
508                 (import \"\" \"\" (func))
509                 (start 0)
510             )
511             ",
512         )
513         .unwrap();
514         let func = Func::wrap_async(&mut store, |_, _: ()| {
515             Box::new(async {
516                 tokio::task::yield_now().await;
517                 Err::<(), _>(format_err!("test"))
518             })
519         });
520         let result = Instance::new_async(&mut store, &module, &[func.into()]).await;
521         assert!(result.is_err());
522     })
523     .await;
524 }
525 
526 #[tokio::test]
527 #[cfg_attr(miri, ignore)]
resume_separate_thread2()528 async fn resume_separate_thread2() {
529     // This test will poll the following future on two threads. Catching a
530     // signal requires looking up TLS information to determine whether it's a
531     // trap to handle or not, so that must be preserved correctly across threads.
532     execute_across_threads(async {
533         let mut store = async_store();
534         let module = Module::new(
535             store.engine(),
536             "
537             (module
538                 (import \"\" \"\" (func))
539                 (func $start
540                     call 0
541                     unreachable)
542                 (start $start)
543             )
544             ",
545         )
546         .unwrap();
547         let func = Func::wrap_async(&mut store, |_, _: ()| {
548             Box::new(async {
549                 tokio::task::yield_now().await;
550             })
551         });
552         let result = Instance::new_async(&mut store, &module, &[func.into()]).await;
553         assert!(result.is_err());
554     })
555     .await;
556 }
557 
/// A synchronous host call starts an async instantiation in a second store,
/// polls it once (leaving it suspended), stashes the suspended future in the
/// outer store's data, and then returns an error. The outer call is expected
/// to fail with that error.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn resume_separate_thread3() {
    let _ = env_logger::try_init();

    // This test doesn't actually do anything with cross-thread polls, but
    // instead it deals with scheduling futures at "odd" times.
    //
    // First we'll set up a *synchronous* call which will initialize TLS info.
    // This call is simply to a host-defined function, but it still has the same
    // "enter into wasm" semantics since it's just calling a trampoline. In this
    // situation we'll set up the TLS info so it's in place while the body of
    // the function executes...
    let mut store = Store::new(&Engine::default(), None);
    let f = Func::wrap(&mut store, move |mut caller: Caller<'_, _>| -> Result<()> {
        // ... and the execution of this host-defined function (while the TLS
        // info is initialized), will set up a recursive call into wasm. This
        // recursive call will be done asynchronously so we can suspend it
        // halfway through.
        let f = async {
            let mut store = async_store();
            let module = Module::new(
                store.engine(),
                "
                    (module
                        (import \"\" \"\" (func))
                        (start 0)
                    )
                ",
            )
            .unwrap();
            let func = Func::wrap_async(&mut store, |_, _: ()| {
                Box::new(async {
                    tokio::task::yield_now().await;
                })
            });
            drop(Instance::new_async(&mut store, &module, &[func.into()]).await);
            // Not expected to run: this future is polled only once below and
            // is then stored away rather than driven to completion.
            unreachable!()
        };
        // Poll the future a single time with a no-op waker; the yielding
        // import is asserted to leave it pending.
        let mut future = Box::pin(f);
        let poll = future
            .as_mut()
            .poll(&mut Context::from_waker(Waker::noop()));
        assert!(poll.is_pending());

        // ... so at this point our call into wasm is suspended. The call into
        // wasm will have overwritten TLS info, and we sure hope that the
        // information is restored at this point. Note that we squirrel away the
        // future somewhere else to get dropped later. If we were to drop it
        // here then we would reenter the future's suspended stack to clean it
        // up, which would do more alterations of TLS information we're not
        // testing here.
        *caller.data_mut() = Some(future);

        // ... all in all this function will need access to the original TLS
        // information to raise the trap. This TLS information should be
        // restored even though the asynchronous execution is suspended.
        bail!("")
    });
    assert!(f.call(&mut store, &[], &mut []).is_err());
}
619 
620 #[tokio::test]
621 #[cfg_attr(miri, ignore)]
resume_separate_thread_tls()622 async fn resume_separate_thread_tls() {
623     static COUNTER: AtomicUsize = AtomicUsize::new(0);
624 
625     struct IncOnDrop;
626     impl Drop for IncOnDrop {
627         fn drop(&mut self) {
628             COUNTER.fetch_add(1, Ordering::SeqCst);
629         }
630     }
631 
632     thread_local!(static FOO: IncOnDrop = IncOnDrop);
633 
634     // This test will poll the following future on two threads.
635     // We verify that TLS destructors are run correctly.
636     execute_across_threads(async move {
637         let mut store = async_store();
638         let module = Module::new(
639             store.engine(),
640             "
641             (module
642                 (import \"\" \"\" (func))
643                 (start 0)
644             )
645             ",
646         )
647         .unwrap();
648         let func = Func::wrap_async(&mut store, |_, _: ()| {
649             Box::new(async {
650                 tokio::task::yield_now().await;
651                 FOO.with(|_f| {});
652                 Err::<(), _>(format_err!("test"))
653             })
654         });
655         let result = Instance::new_async(&mut store, &module, &[func.into()]).await;
656         assert!(result.is_err());
657     })
658     .await;
659 
660     assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
661 }
662 
663 #[tokio::test]
664 #[cfg_attr(miri, ignore)]
recursive_async() -> Result<()>665 async fn recursive_async() -> Result<()> {
666     let _ = env_logger::try_init();
667     let mut store = async_store();
668     let m = Module::new(
669         store.engine(),
670         "(module
671             (func (export \"overflow\") call 0)
672             (func (export \"normal\"))
673         )",
674     )?;
675     let i = Instance::new_async(&mut store, &m, &[]).await?;
676     let overflow = i.get_typed_func::<(), ()>(&mut store, "overflow")?;
677     let normal = i.get_typed_func::<(), ()>(&mut store, "normal")?;
678     let f2 = Func::wrap_async(&mut store, move |mut caller, _: ()| {
679         let normal = normal.clone();
680         let overflow = overflow.clone();
681         Box::new(async move {
682             // recursive async calls shouldn't immediately stack overflow...
683             normal.call_async(&mut caller, ()).await?;
684 
685             // ... but calls that actually stack overflow should indeed stack
686             // overflow
687             let err = overflow
688                 .call_async(&mut caller, ())
689                 .await
690                 .unwrap_err()
691                 .downcast::<Trap>()?;
692             assert_eq!(err, Trap::StackOverflow);
693             Ok(())
694         })
695     });
696     f2.call_async(&mut store, &[], &mut []).await?;
697     Ok(())
698 }
699 
700 #[tokio::test]
701 #[cfg_attr(miri, ignore)]
linker_module_command() -> Result<()>702 async fn linker_module_command() -> Result<()> {
703     let mut store = async_store();
704     let mut linker = Linker::new(store.engine());
705 
706     let module1 = Module::new(
707         store.engine(),
708         r#"
709             (module
710                 (global $g (mut i32) (i32.const 0))
711 
712                 (func (export "_start"))
713 
714                 (func (export "g") (result i32)
715                     global.get $g
716                     i32.const 1
717                     global.set $g)
718             )
719         "#,
720     )?;
721 
722     let module2 = Module::new(
723         store.engine(),
724         r#"
725             (module
726                 (import "" "g" (func (result i32)))
727 
728                 (func (export "get") (result i32)
729                     call 0)
730             )
731         "#,
732     )?;
733 
734     linker.module_async(&mut store, "", &module1).await?;
735     let instance = linker.instantiate_async(&mut store, &module2).await?;
736     let f = instance.get_typed_func::<(), i32>(&mut store, "get")?;
737     assert_eq!(f.call_async(&mut store, ()).await?, 0);
738     assert_eq!(f.call_async(&mut store, ()).await?, 0);
739 
740     Ok(())
741 }
742 
743 #[tokio::test]
744 #[cfg_attr(miri, ignore)]
linker_module_reactor() -> Result<()>745 async fn linker_module_reactor() -> Result<()> {
746     let mut store = async_store();
747     let mut linker = Linker::new(store.engine());
748     let module1 = Module::new(
749         store.engine(),
750         r#"
751             (module
752                 (global $g (mut i32) (i32.const 0))
753 
754                 (func (export "g") (result i32)
755                     global.get $g
756                     i32.const 1
757                     global.set $g)
758             )
759         "#,
760     )?;
761     let module2 = Module::new(
762         store.engine(),
763         r#"
764             (module
765                 (import "" "g" (func (result i32)))
766 
767                 (func (export "get") (result i32)
768                     call 0)
769             )
770         "#,
771     )?;
772 
773     linker.module_async(&mut store, "", &module1).await?;
774     let instance = linker.instantiate_async(&mut store, &module2).await?;
775     let f = instance.get_typed_func::<(), i32>(&mut store, "get")?;
776     assert_eq!(f.call_async(&mut store, ()).await?, 0);
777     assert_eq!(f.call_async(&mut store, ()).await?, 1);
778 
779     Ok(())
780 }
781 
/// Future adapter that tallies how many times the wrapped future returns
/// `Poll::Pending` before it completes.
pub struct CountPending<F> {
    /// The future being driven.
    future: F,
    /// Number of `Pending` results observed so far.
    yields: usize,
}

impl<F> CountPending<F> {
    /// Wraps `future` with the pending counter starting at zero.
    pub fn new(future: F) -> CountPending<F> {
        CountPending { future, yields: 0 }
    }
}

impl<F> Future for CountPending<F>
where
    F: Future + Unpin,
{
    /// The inner future's output paired with the number of `Pending` polls
    /// that preceded it.
    type Output = (F::Output, usize);

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        if let Poll::Ready(output) = Pin::new(&mut this.future).poll(cx) {
            return Poll::Ready((output, this.yields));
        }
        // Another suspension: record it and propagate `Pending` unchanged.
        this.yields += 1;
        Poll::Pending
    }
}
809 
/// Future adapter that polls the wrapped future exactly once and then
/// resolves immediately.
///
/// The output is `Ok(value)` if the inner future completed on that single
/// poll, or `Err(future)` handing the still-pending future back to the caller
/// so it can be polled again later or dropped.
pub struct PollOnce<F>(Option<F>);

impl<F> PollOnce<F> {
    /// Wraps `future` so that awaiting the result polls it a single time.
    pub fn new(future: F) -> PollOnce<F> {
        PollOnce(Some(future))
    }
}

impl<F> Future for PollOnce<F>
where
    F: Future + Unpin,
{
    type Output = Result<F::Output, F>;

    /// Polls the inner future once; this adapter itself never returns
    /// `Poll::Pending`.
    ///
    /// # Panics
    ///
    /// Panics if called again after it has already resolved, since the inner
    /// future has been moved out by then.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut future = self
            .0
            .take()
            .expect("`PollOnce` polled again after it already resolved");
        Poll::Ready(match Pin::new(&mut future).poll(cx) {
            Poll::Ready(val) => Ok(val),
            Poll::Pending => Err(future),
        })
    }
}
832 
// Exercises two stores whose async wasm activations overlap without nesting
// cleanly: a call into `store2` is started from inside a host function of
// `store1`, polled once, and only driven to completion after the `store1`
// call has already returned. A backtrace is recorded at each interesting
// point and the frame names are checked at the end.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn non_stacky_async_activations() -> Result<()> {
    let engine = Engine::default();
    // The store's data slot is used to smuggle the half-finished `store2`
    // future out of the `start_async_instance` host function.
    let mut store1: Store<Option<Pin<Box<dyn Future<Output = Result<()>> + Send>>>> =
        Store::new(&engine, None);
    let mut linker1 = Linker::new(&engine);

    // Exports `capture_stack` (calls the `host_capture_stack` import) and
    // `run_sync` (calls the `start_async_instance` import).
    let module1 = Module::new(
        &engine,
        r#"
            (module $m1
                (import "" "host_capture_stack" (func $host_capture_stack))
                (import "" "start_async_instance" (func $start_async_instance))
                (func $capture_stack (export "capture_stack")
                    call $host_capture_stack
                )
                (func $run_sync (export "run_sync")
                    call $start_async_instance
                )
            )
        "#,
    )?;

    // Exports `run_async`, which only calls the imported `yield`.
    let module2 = Module::new(
        &engine,
        r#"
            (module $m2
                (import "" "yield" (func $yield))

                (func $run_async (export "run_async")
                    call $yield
                )
            )
        "#,
    )?;

    // Every backtrace captured during the test, in capture order.
    let stacks = Arc::new(Mutex::new(vec![]));
    // Appends a freshly captured backtrace for `store` to the shared list.
    fn capture_stack(stacks: &Arc<Mutex<Vec<WasmBacktrace>>>, store: impl AsContext) {
        let mut stacks = stacks.lock().unwrap();
        stacks.push(wasmtime::WasmBacktrace::force_capture(store));
    }

    // Host import: record a backtrace, then complete immediately.
    linker1.func_wrap_async("", "host_capture_stack", {
        let stacks = stacks.clone();
        move |caller, _: ()| {
            capture_stack(&stacks, &caller);
            Box::new(async { Ok(()) })
        }
    })?;

    // Host import: start `run_async` in a brand-new store, poll it a single
    // time, and stash the still-pending future in `store1`'s data.
    linker1.func_wrap_async("", "start_async_instance", {
        let stacks = stacks.clone();
        move |mut caller, _: ()| {
            let stacks = stacks.clone();
            // Capture #1: taken with `store1`'s caller while `run_sync` is running.
            capture_stack(&stacks, &caller);

            let module2 = module2.clone();
            let mut store2 = Store::new(caller.engine(), ());
            let mut linker2 = Linker::<()>::new(caller.engine());
            linker2
                .func_wrap_async("", "yield", {
                    let stacks = stacks.clone();
                    move |caller, _: ()| {
                        let stacks = stacks.clone();
                        Box::new(async move {
                            // Capture #2: before yielding to the executor.
                            capture_stack(&stacks, &caller);
                            tokio::task::yield_now().await;
                            // Capture #4: after being resumed, which only
                            // happens once the test awaits the stashed future.
                            capture_stack(&stacks, &caller);
                            Ok(())
                        })
                    }
                })
                .unwrap();

            Box::new(async move {
                // `PollOnce` resolves to `Err(future)` when the inner future
                // is still pending after one poll; `.err().unwrap()` asserts
                // that this is what happened and recovers the future.
                let future = PollOnce::new(Box::pin({
                    let stacks = stacks.clone();
                    async move {
                        let instance2 = linker2.instantiate_async(&mut store2, &module2).await?;

                        instance2
                            .get_func(&mut store2, "run_async")
                            .unwrap()
                            .call_async(&mut store2, &[], &mut [])
                            .await?;

                        // Capture #5: `run_async` has returned, so no wasm
                        // frames are expected for `store2` here.
                        capture_stack(&stacks, &store2);
                        wasmtime::error::Ok(())
                    }
                }) as _)
                .await
                .err()
                .unwrap();
                // Capture #3: back in `store1` while `store2`'s call is suspended.
                capture_stack(&stacks, &caller);
                *caller.data_mut() = Some(future);
                Ok(())
            })
        }
    })?;

    let instance1 = linker1.instantiate_async(&mut store1, &module1).await?;
    instance1
        .get_typed_func::<(), ()>(&mut store1, "run_sync")?
        .call_async(&mut store1, ())
        .await?;
    // `run_sync` has returned; now finish the `store2` call that it started.
    let future = store1.data_mut().take().unwrap();
    future.await?;

    // Capture #6: an ordinary call into `store1` after everything else is done.
    instance1
        .get_typed_func::<(), ()>(&mut store1, "capture_stack")?
        .call_async(&mut store1, ())
        .await?;

    let stacks = stacks.lock().unwrap();
    eprintln!("stacks = {stacks:#?}");

    // Each capture must contain exactly the frames of the store it was taken
    // from, listed here in capture order (#1 through #6).
    assert_eq!(stacks.len(), 6);
    for (actual, expected) in stacks.iter().zip(vec![
        vec!["run_sync"],
        vec!["run_async"],
        vec!["run_sync"],
        vec!["run_async"],
        vec![],
        vec!["capture_stack"],
    ]) {
        eprintln!("expected = {expected:?}");
        eprintln!("actual = {actual:?}");
        assert_eq!(actual.frames().len(), expected.len());
        for (actual, expected) in actual.frames().iter().zip(expected) {
            assert_eq!(actual.func_name(), Some(expected));
        }
    }

    Ok(())
}
969 
970 #[tokio::test]
971 #[cfg_attr(miri, ignore)]
gc_preserves_externref_on_historical_async_stacks() -> Result<()>972 async fn gc_preserves_externref_on_historical_async_stacks() -> Result<()> {
973     let _ = env_logger::try_init();
974 
975     let engine = Engine::default();
976 
977     let module = Module::new(
978         &engine,
979         r#"
980             (module $m1
981                 (import "" "gc" (func $gc))
982                 (import "" "recurse" (func $recurse (param i32)))
983                 (import "" "test" (func $test (param i32 externref)))
984                 (func (export "run") (param i32 externref)
985                     local.get 0
986                     if
987                         local.get 0
988                         i32.const -1
989                         i32.add
990                         call $recurse
991                     else
992                         call $gc
993                     end
994 
995                     local.get 0
996                     local.get 1
997                     call $test
998                 )
999             )
1000         "#,
1001     )?;
1002 
1003     type F = TypedFunc<(i32, Option<Rooted<ExternRef>>), ()>;
1004 
1005     let mut store = Store::new(&engine, None);
1006     let mut linker = Linker::<Option<F>>::new(&engine);
1007     linker.func_wrap_async("", "gc", |mut cx: Caller<'_, _>, ()| {
1008         Box::new(async move { cx.gc_async(None).await })
1009     })?;
1010     linker.func_wrap(
1011         "",
1012         "test",
1013         |cx: Caller<'_, _>, val: i32, handle: Option<Rooted<ExternRef>>| -> Result<()> {
1014             assert_eq!(
1015                 handle.unwrap().data(&cx)?.unwrap().downcast_ref(),
1016                 Some(&val)
1017             );
1018             Ok(())
1019         },
1020     )?;
1021     linker.func_wrap_async(
1022         "",
1023         "recurse",
1024         |mut cx: Caller<'_, Option<F>>, (val,): (i32,)| {
1025             let func = cx.data().clone().unwrap();
1026             Box::new(async move {
1027                 let r = Some(ExternRef::new_async(&mut cx, val).await?);
1028                 Ok(func.call_async(&mut cx, (val, r)).await)
1029             })
1030         },
1031     )?;
1032     let instance = linker.instantiate_async(&mut store, &module).await?;
1033     let func: F = instance.get_typed_func(&mut store, "run")?;
1034     *store.data_mut() = Some(func.clone());
1035 
1036     let r = Some(ExternRef::new_async(&mut store, 5).await?);
1037     func.call_async(&mut store, (5, r)).await?;
1038 
1039     Ok(())
1040 }
1041 
1042 #[tokio::test]
1043 #[cfg_attr(miri, ignore)]
async_gc_with_func_new_and_func_wrap() -> Result<()>1044 async fn async_gc_with_func_new_and_func_wrap() -> Result<()> {
1045     let _ = env_logger::try_init();
1046 
1047     let mut config = Config::new();
1048     config.wasm_gc(true);
1049     let engine = Engine::new(&config)?;
1050 
1051     let module = Module::new(
1052         &engine,
1053         r#"
1054             (module $m1
1055                 (import "" "a" (func $a (result externref structref arrayref)))
1056                 (import "" "b" (func $b (result externref structref arrayref)))
1057 
1058                 (table 2 funcref)
1059                 (elem (i32.const 0) func $a $b)
1060 
1061                 (func (export "a")
1062                     (call $call (i32.const 0))
1063                 )
1064                 (func (export "b")
1065                     (call $call (i32.const 1))
1066                 )
1067 
1068                 (func $call (param i32)
1069                     (local $cnt i32)
1070                     (loop $l
1071                         (call_indirect (result externref structref arrayref) (local.get 0))
1072                         drop
1073                         drop
1074                         drop
1075 
1076                         (local.set $cnt (i32.add (local.get $cnt) (i32.const 1)))
1077 
1078                         (if (i32.lt_u (local.get $cnt) (i32.const 5000))
1079                          (then (br $l)))
1080                     )
1081                 )
1082             )
1083         "#,
1084     )?;
1085 
1086     let mut linker = Linker::<()>::new(&engine);
1087     linker.func_wrap_async("", "a", |mut cx: Caller<'_, _>, ()| {
1088         Box::new(async move {
1089             let externref = ExternRef::new_async(&mut cx, 100).await?;
1090 
1091             let struct_ty = StructType::new(cx.engine(), [])?;
1092             let struct_pre = StructRefPre::new(&mut cx, struct_ty);
1093             let structref = StructRef::new_async(&mut cx, &struct_pre, &[]).await?;
1094 
1095             let array_ty = ArrayType::new(
1096                 cx.engine(),
1097                 FieldType::new(Mutability::Var, ValType::I32.into()),
1098             );
1099             let array_pre = ArrayRefPre::new(&mut cx, array_ty);
1100             let arrayref = ArrayRef::new_fixed_async(&mut cx, &array_pre, &[]).await?;
1101 
1102             Ok((Some(externref), Some(structref), Some(arrayref)))
1103         })
1104     })?;
1105     let ty = FuncType::new(
1106         &engine,
1107         [],
1108         [ValType::EXTERNREF, ValType::STRUCTREF, ValType::ARRAYREF],
1109     );
1110     linker.func_new_async("", "b", ty, |mut cx, _, results| {
1111         Box::new(async move {
1112             results[0] = ExternRef::new_async(&mut cx, 100).await?.into();
1113 
1114             let struct_ty = StructType::new(cx.engine(), [])?;
1115             let struct_pre = StructRefPre::new(&mut cx, struct_ty);
1116             results[1] = StructRef::new_async(&mut cx, &struct_pre, &[])
1117                 .await?
1118                 .into();
1119 
1120             let array_ty = ArrayType::new(
1121                 cx.engine(),
1122                 FieldType::new(Mutability::Var, ValType::I32.into()),
1123             );
1124             let array_pre = ArrayRefPre::new(&mut cx, array_ty);
1125             results[2] = ArrayRef::new_fixed_async(&mut cx, &array_pre, &[])
1126                 .await?
1127                 .into();
1128 
1129             Ok(())
1130         })
1131     })?;
1132 
1133     let mut store = Store::new(&engine, ());
1134     let instance = linker.instantiate_async(&mut store, &module).await?;
1135     let a = instance.get_typed_func::<(), ()>(&mut store, "a")?;
1136     a.call_async(&mut store, ()).await?;
1137 
1138     let mut store = Store::new(&engine, ());
1139     let instance = linker.instantiate_async(&mut store, &module).await?;
1140     let b = instance.get_typed_func::<(), ()>(&mut store, "b")?;
1141     b.call_async(&mut store, ()).await?;
1142 
1143     Ok(())
1144 }
1145