1 use std::future::Future;
2 use std::pin::Pin;
3 use std::sync::atomic::AtomicUsize;
4 use std::sync::atomic::Ordering;
5 use std::sync::{Arc, Mutex};
6 use std::task::{Context, Poll, Waker};
7 use wasmtime::*;
8
async_store() -> Store<()>9 fn async_store() -> Store<()> {
10 Store::new(&Engine::default(), ())
11 }
12
run_smoke_test(store: &mut Store<()>, func: Func)13 async fn run_smoke_test(store: &mut Store<()>, func: Func) {
14 func.call_async(&mut *store, &[], &mut []).await.unwrap();
15 func.call_async(&mut *store, &[], &mut []).await.unwrap();
16 }
17
run_smoke_typed_test(store: &mut Store<()>, func: Func)18 async fn run_smoke_typed_test(store: &mut Store<()>, func: Func) {
19 let func = func.typed::<(), ()>(&store).unwrap();
20 func.call_async(&mut *store, ()).await.unwrap();
21 func.call_async(&mut *store, ()).await.unwrap();
22 }
23
24 #[tokio::test]
smoke()25 async fn smoke() {
26 let mut store = async_store();
27 let func_ty = FuncType::new(store.engine(), None, None);
28 let func = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
29 Box::new(async { Ok(()) })
30 });
31 run_smoke_test(&mut store, func).await;
32 run_smoke_typed_test(&mut store, func).await;
33
34 let func = Func::wrap_async(&mut store, move |_caller, _: ()| Box::new(async { Ok(()) }));
35 run_smoke_test(&mut store, func).await;
36 run_smoke_typed_test(&mut store, func).await;
37 }
38
39 #[tokio::test]
smoke_host_func() -> Result<()>40 async fn smoke_host_func() -> Result<()> {
41 let mut store = async_store();
42 let mut linker = Linker::new(store.engine());
43
44 linker.func_new_async(
45 "",
46 "first",
47 FuncType::new(store.engine(), None, None),
48 move |_caller, _params, _results| Box::new(async { Ok(()) }),
49 )?;
50
51 linker.func_wrap_async("", "second", move |_caller, _: ()| {
52 Box::new(async { Ok(()) })
53 })?;
54
55 let func = linker
56 .get(&mut store, "", "first")
57 .unwrap()
58 .into_func()
59 .unwrap();
60 run_smoke_test(&mut store, func).await;
61 run_smoke_typed_test(&mut store, func).await;
62
63 let func = linker
64 .get(&mut store, "", "second")
65 .unwrap()
66 .into_func()
67 .unwrap();
68 run_smoke_test(&mut store, func).await;
69 run_smoke_typed_test(&mut store, func).await;
70
71 Ok(())
72 }
73
74 #[tokio::test]
smoke_with_suspension()75 async fn smoke_with_suspension() {
76 let mut store = async_store();
77 let func_ty = FuncType::new(store.engine(), None, None);
78 let func = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
79 Box::new(async {
80 tokio::task::yield_now().await;
81 Ok(())
82 })
83 });
84 run_smoke_test(&mut store, func).await;
85 run_smoke_typed_test(&mut store, func).await;
86
87 let func = Func::wrap_async(&mut store, move |_caller, _: ()| {
88 Box::new(async {
89 tokio::task::yield_now().await;
90 Ok(())
91 })
92 });
93 run_smoke_test(&mut store, func).await;
94 run_smoke_typed_test(&mut store, func).await;
95 }
96
97 #[tokio::test]
smoke_host_func_with_suspension() -> Result<()>98 async fn smoke_host_func_with_suspension() -> Result<()> {
99 let mut store = async_store();
100 let mut linker = Linker::new(store.engine());
101
102 linker.func_new_async(
103 "",
104 "first",
105 FuncType::new(store.engine(), None, None),
106 move |_caller, _params, _results| {
107 Box::new(async {
108 tokio::task::yield_now().await;
109 Ok(())
110 })
111 },
112 )?;
113
114 linker.func_wrap_async("", "second", move |_caller, _: ()| {
115 Box::new(async {
116 tokio::task::yield_now().await;
117 Ok(())
118 })
119 })?;
120
121 let func = linker
122 .get(&mut store, "", "first")
123 .unwrap()
124 .into_func()
125 .unwrap();
126 run_smoke_test(&mut store, func).await;
127 run_smoke_typed_test(&mut store, func).await;
128
129 let func = linker
130 .get(&mut store, "", "second")
131 .unwrap()
132 .into_func()
133 .unwrap();
134 run_smoke_test(&mut store, func).await;
135 run_smoke_typed_test(&mut store, func).await;
136
137 Ok(())
138 }
139
140 #[tokio::test]
141 #[cfg_attr(miri, ignore)]
recursive_call()142 async fn recursive_call() {
143 let mut store = async_store();
144 let func_ty = FuncType::new(store.engine(), None, None);
145 let async_wasm_func = Func::new_async(&mut store, func_ty, |_caller, _params, _results| {
146 Box::new(async {
147 tokio::task::yield_now().await;
148 Ok(())
149 })
150 });
151
152 // Create an imported function which recursively invokes another wasm
153 // function asynchronously, although this one is just our own host function
154 // which suffices for this test.
155 let func_ty = FuncType::new(store.engine(), None, None);
156 let func2 = Func::new_async(&mut store, func_ty, move |mut caller, _params, _results| {
157 Box::new(async move {
158 async_wasm_func
159 .call_async(&mut caller, &[], &mut [])
160 .await?;
161 Ok(())
162 })
163 });
164
165 // Create an instance which calls an async import twice.
166 let module = Module::new(
167 store.engine(),
168 "
169 (module
170 (import \"\" \"\" (func))
171 (func (export \"\")
172 ;; call imported function which recursively does an async
173 ;; call
174 call 0
175 ;; do it again, and our various pointers all better align
176 call 0))
177 ",
178 )
179 .unwrap();
180
181 let instance = Instance::new_async(&mut store, &module, &[func2.into()])
182 .await
183 .unwrap();
184 let func = instance.get_func(&mut store, "").unwrap();
185 func.call_async(&mut store, &[], &mut []).await.unwrap();
186 }
187
/// Within a single async wasm call, first runs an async host function to
/// completion by polling it manually from inside a *synchronous* host function,
/// then calls a second async host function that yields once.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn suspend_while_suspending() {
    let mut store = async_store();

    // Create a synchronous function which calls our asynchronous function and
    // runs it locally. This shouldn't generally happen but we know everything
    // is synchronous in this test so it's fine for us to do this.
    //
    // The purpose of this test is intended to stress various cases in how
    // we manage pointers in ways that are not necessarily common but are still
    // possible in safe code.
    let func_ty = FuncType::new(store.engine(), None, None);
    let async_thunk = Func::new_async(&mut store, func_ty, |_caller, _params, _results| {
        Box::new(async { Ok(()) })
    });
    let func_ty = FuncType::new(store.engine(), None, None);
    let sync_call_async_thunk =
        Func::new(&mut store, func_ty, move |mut caller, _params, _results| {
            // Poll exactly once with a no-op waker: `async_thunk` never
            // awaits anything, so a single poll is expected to finish it.
            let mut future = Box::pin(async_thunk.call_async(&mut caller, &[], &mut []));
            let poll = future
                .as_mut()
                .poll(&mut Context::from_waker(Waker::noop()));
            assert!(poll.is_ready());
            Ok(())
        });

    // A small async function that simply awaits once to pump the loops and
    // then finishes.
    let func_ty = FuncType::new(store.engine(), None, None);
    let async_import = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
        Box::new(async move {
            tokio::task::yield_now().await;
            Ok(())
        })
    });

    // The exported function calls the sync import first, then the async one.
    let module = Module::new(
        store.engine(),
        "
            (module
                (import \"\" \"\" (func $sync_call_async_thunk))
                (import \"\" \"\" (func $async_import))
                (func (export \"\")
                    ;; Set some store-local state and pointers
                    call $sync_call_async_thunk
                    ;; .. and hopefully it's all still configured correctly
                    call $async_import))
        ",
    )
    .unwrap();
    // Imports are supplied positionally, in the order the module declares them.
    let instance = Instance::new_async(
        &mut store,
        &module,
        &[sync_call_async_thunk.into(), async_import.into()],
    )
    .await
    .unwrap();
    let func = instance.get_func(&mut store, "").unwrap();
    func.call_async(&mut store, &[], &mut []).await.unwrap();
}
249
250 #[tokio::test]
cancel_during_run()251 async fn cancel_during_run() {
252 let mut store = Store::new(&Engine::default(), 0);
253
254 let func_ty = FuncType::new(store.engine(), None, None);
255 let async_thunk = Func::new_async(&mut store, func_ty, move |mut caller, _params, _results| {
256 assert_eq!(*caller.data(), 0);
257 *caller.data_mut() = 1;
258 let dtor = SetOnDrop(caller);
259 Box::new(async move {
260 // SetOnDrop is not destroyed when dropping the reference of it
261 // here. Instead, it is moved into the future where it's forced
262 // to live in and will be destroyed at the end of the future.
263 let _ = &dtor;
264 tokio::task::yield_now().await;
265 Ok(())
266 })
267 });
268 // Shouldn't have called anything yet...
269 assert_eq!(*store.data(), 0);
270
271 // Create our future, but as per async conventions this still doesn't
272 // actually do anything. No wasm or host function has been called yet.
273 let future = Box::pin(async_thunk.call_async(&mut store, &[], &mut []));
274
275 // Push the future forward one tick, which actually runs the host code in
276 // our async func. Our future is designed to be pending once, however.
277 let future = PollOnce::new(future).await;
278
279 // Now that our future is running (on a separate, now-suspended fiber), drop
280 // the future and that should deallocate all the Rust bits as well.
281 drop(future);
282 assert_eq!(*store.data(), 2);
283
284 struct SetOnDrop<'a>(Caller<'a, usize>);
285
286 impl Drop for SetOnDrop<'_> {
287 fn drop(&mut self) {
288 assert_eq!(*self.0.data(), 1);
289 *self.0.data_mut() = 2;
290 }
291 }
292 }
293
294 #[tokio::test]
295 #[cfg_attr(miri, ignore)]
iloop_with_fuel()296 async fn iloop_with_fuel() {
297 let engine = Engine::new(Config::new().consume_fuel(true)).unwrap();
298 let mut store = Store::new(&engine, ());
299 store.set_fuel(10_000).unwrap();
300 store.fuel_async_yield_interval(Some(100)).unwrap();
301 let module = Module::new(
302 &engine,
303 "
304 (module
305 (func (loop br 0))
306 (start 0)
307 )
308 ",
309 )
310 .unwrap();
311 let instance = Instance::new_async(&mut store, &module, &[]);
312
313 // This should yield a bunch of times but eventually finish
314 let (_, pending) = CountPending::new(Box::pin(instance)).await;
315 assert_eq!(pending, 99);
316 }
317
318 #[tokio::test]
319 #[cfg_attr(miri, ignore)]
fuel_eventually_finishes()320 async fn fuel_eventually_finishes() {
321 let engine = Engine::new(Config::new().consume_fuel(true)).unwrap();
322 let mut store = Store::new(&engine, ());
323 store.set_fuel(u64::MAX).unwrap();
324 store.fuel_async_yield_interval(Some(10)).unwrap();
325 let module = Module::new(
326 &engine,
327 "
328 (module
329 (func
330 (local i32)
331 i32.const 100
332 local.set 0
333 (loop
334 local.get 0
335 i32.const -1
336 i32.add
337 local.tee 0
338 br_if 0)
339 )
340 (start 0)
341 )
342 ",
343 )
344 .unwrap();
345 let instance = Instance::new_async(&mut store, &module, &[]);
346 instance.await.unwrap();
347 }
348
349 #[tokio::test]
async_with_pooling_stacks()350 async fn async_with_pooling_stacks() {
351 let mut pool = crate::small_pool_config();
352 pool.total_stacks(1)
353 .max_memory_size(1 << 16)
354 .table_elements(0);
355 let mut config = Config::new();
356 config.allocation_strategy(InstanceAllocationStrategy::Pooling(pool));
357 config.memory_guard_size(0);
358 config.memory_reservation(1 << 16);
359
360 let engine = Engine::new(&config).unwrap();
361 let mut store = Store::new(&engine, ());
362 let func_ty = FuncType::new(store.engine(), None, None);
363 let func = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
364 Box::new(async { Ok(()) })
365 });
366
367 run_smoke_test(&mut store, func).await;
368 run_smoke_typed_test(&mut store, func).await;
369 }
370
371 #[tokio::test]
async_host_func_with_pooling_stacks() -> Result<()>372 async fn async_host_func_with_pooling_stacks() -> Result<()> {
373 let mut pooling = crate::small_pool_config();
374 pooling
375 .total_stacks(1)
376 .max_memory_size(1 << 16)
377 .table_elements(0);
378 let mut config = Config::new();
379 config.allocation_strategy(InstanceAllocationStrategy::Pooling(pooling));
380 config.memory_guard_size(0);
381 config.memory_reservation(1 << 16);
382
383 let mut store = Store::new(&Engine::new(&config)?, ());
384 let mut linker = Linker::new(store.engine());
385 linker.func_new_async(
386 "",
387 "",
388 FuncType::new(store.engine(), None, None),
389 move |_caller, _params, _results| Box::new(async { Ok(()) }),
390 )?;
391
392 let func = linker.get(&mut store, "", "").unwrap().into_func().unwrap();
393 run_smoke_test(&mut store, func).await;
394 run_smoke_typed_test(&mut store, func).await;
395 Ok(())
396 }
397
398 #[tokio::test]
399 #[cfg_attr(miri, ignore)]
async_mpk_protection() -> Result<()>400 async fn async_mpk_protection() -> Result<()> {
401 let _ = env_logger::try_init();
402
403 // Construct a pool with MPK protection enabled; note that the MPK
404 // protection is configured in `small_pool_config`.
405 let mut pooling = crate::small_pool_config();
406 pooling
407 .total_memories(10)
408 .total_stacks(2)
409 .max_memory_size(1 << 16)
410 .table_elements(0);
411 let mut config = Config::new();
412 config.allocation_strategy(InstanceAllocationStrategy::Pooling(pooling));
413 config.memory_reservation(1 << 26);
414 config.epoch_interruption(true);
415 let engine = Engine::new(&config)?;
416
417 // Craft a module that loops for several iterations and checks whether it
418 // has access to its memory range (0x0-0x10000).
419 const WAT: &str = "
420 (module
421 (func $start
422 (local $i i32)
423 (local.set $i (i32.const 3))
424 (loop $cont
425 (drop (i32.load (i32.const 0)))
426 (drop (i32.load (i32.const 0xfffc)))
427 (br_if $cont (local.tee $i (i32.sub (local.get $i) (i32.const 1))))))
428 (memory 1)
429 (start $start))
430 ";
431
432 // Start two instances of the module in separate fibers, `a` and `b`.
433 async fn run_instance(engine: &Engine, name: &str) -> Instance {
434 let mut store = Store::new(&engine, ());
435 store.set_epoch_deadline(0);
436 store.epoch_deadline_async_yield_and_update(0);
437 let module = Module::new(store.engine(), WAT).unwrap();
438 println!("[{name}] building instance");
439 Instance::new_async(&mut store, &module, &[]).await.unwrap()
440 }
441 let mut a = Box::pin(run_instance(&engine, "a"));
442 let mut b = Box::pin(run_instance(&engine, "b"));
443
444 // Alternately poll each instance until completion. This should exercise
445 // fiber suspensions requiring the `Store` to appropriately save and restore
446 // the PKRU context between suspensions (see `AsyncCx::block_on`).
447 for i in 0..10 {
448 if i % 2 == 0 {
449 match PollOnce::new(a).await {
450 Ok(_) => {
451 println!("[a] done");
452 break;
453 }
454 Err(a_) => {
455 println!("[a] not done");
456 a = a_;
457 }
458 }
459 } else {
460 match PollOnce::new(b).await {
461 Ok(_) => {
462 println!("[b] done");
463 break;
464 }
465 Err(b_) => {
466 println!("[b] not done");
467 b = b_;
468 }
469 }
470 }
471 }
472
473 Ok(())
474 }
475
476 /// This will execute the `future` provided to completion and each invocation of
477 /// `poll` for the future will be executed on a separate thread.
execute_across_threads<F>(future: F) -> F::Output where F: Future + Send + 'static, F::Output: Send,478 pub async fn execute_across_threads<F>(future: F) -> F::Output
479 where
480 F: Future + Send + 'static,
481 F::Output: Send,
482 {
483 let mut future = Box::pin(future);
484 loop {
485 let once = PollOnce::new(future);
486 let handle = tokio::runtime::Handle::current();
487 let result = std::thread::spawn(move || handle.block_on(once))
488 .join()
489 .unwrap();
490 match result {
491 Ok(val) => break val,
492 Err(f) => future = f,
493 }
494 }
495 }
496
497 #[tokio::test]
498 #[cfg_attr(miri, ignore)]
resume_separate_thread()499 async fn resume_separate_thread() {
500 // This test will poll the following future on two threads. Simulating a
501 // trap requires accessing TLS info, so that should be preserved correctly.
502 execute_across_threads(async {
503 let mut store = async_store();
504 let module = Module::new(
505 store.engine(),
506 "
507 (module
508 (import \"\" \"\" (func))
509 (start 0)
510 )
511 ",
512 )
513 .unwrap();
514 let func = Func::wrap_async(&mut store, |_, _: ()| {
515 Box::new(async {
516 tokio::task::yield_now().await;
517 Err::<(), _>(format_err!("test"))
518 })
519 });
520 let result = Instance::new_async(&mut store, &module, &[func.into()]).await;
521 assert!(result.is_err());
522 })
523 .await;
524 }
525
526 #[tokio::test]
527 #[cfg_attr(miri, ignore)]
resume_separate_thread2()528 async fn resume_separate_thread2() {
529 // This test will poll the following future on two threads. Catching a
530 // signal requires looking up TLS information to determine whether it's a
531 // trap to handle or not, so that must be preserved correctly across threads.
532 execute_across_threads(async {
533 let mut store = async_store();
534 let module = Module::new(
535 store.engine(),
536 "
537 (module
538 (import \"\" \"\" (func))
539 (func $start
540 call 0
541 unreachable)
542 (start $start)
543 )
544 ",
545 )
546 .unwrap();
547 let func = Func::wrap_async(&mut store, |_, _: ()| {
548 Box::new(async {
549 tokio::task::yield_now().await;
550 })
551 });
552 let result = Instance::new_async(&mut store, &module, &[func.into()]).await;
553 assert!(result.is_err());
554 })
555 .await;
556 }
557
/// Starts an async instantiation from inside a synchronous host call, leaves
/// it suspended, and then has the outer host function return an error.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn resume_separate_thread3() {
    let _ = env_logger::try_init();

    // This test doesn't actually do anything with cross-thread polls, but
    // instead it deals with scheduling futures at "odd" times.
    //
    // First we'll set up a *synchronous* call which will initialize TLS info.
    // This call is simply to a host-defined function, but it still has the same
    // "enter into wasm" semantics since it's just calling a trampoline. In this
    // situation we'll set up the TLS info so it's in place while the body of
    // the function executes...
    //
    // The store data starts as `None` and later holds the suspended future.
    let mut store = Store::new(&Engine::default(), None);
    let f = Func::wrap(&mut store, move |mut caller: Caller<'_, _>| -> Result<()> {
        // ... and the execution of this host-defined function (while the TLS
        // info is initialized), will set up a recursive call into wasm. This
        // recursive call will be done asynchronously so we can suspend it
        // halfway through.
        let f = async {
            let mut store = async_store();
            let module = Module::new(
                store.engine(),
                "
                    (module
                        (import \"\" \"\" (func))
                        (start 0)
                    )
                ",
            )
            .unwrap();
            let func = Func::wrap_async(&mut store, |_, _: ()| {
                Box::new(async {
                    tokio::task::yield_now().await;
                })
            });
            drop(Instance::new_async(&mut store, &module, &[func.into()]).await);
            // This future is polled only once below and is pending at that
            // point, so execution is never expected to get here.
            unreachable!()
        };
        // Poll exactly once with a no-op waker; the import's `yield_now` is
        // what leaves the future pending.
        let mut future = Box::pin(f);
        let poll = future
            .as_mut()
            .poll(&mut Context::from_waker(Waker::noop()));
        assert!(poll.is_pending());

        // ... so at this point our call into wasm is suspended. The call into
        // wasm will have overwritten TLS info, and we sure hope that the
        // information is restored at this point. Note that we squirrel away the
        // future somewhere else to get dropped later. If we were to drop it
        // here then we would reenter the future's suspended stack to clean it
        // up, which would do more alterations of TLS information we're not
        // testing here.
        *caller.data_mut() = Some(future);

        // ... all in all this function will need access to the original TLS
        // information to raise the trap. This TLS information should be
        // restored even though the asynchronous execution is suspended.
        bail!("")
    });
    // The host function always bails, so the synchronous call must fail.
    assert!(f.call(&mut store, &[], &mut []).is_err());
}
619
620 #[tokio::test]
621 #[cfg_attr(miri, ignore)]
resume_separate_thread_tls()622 async fn resume_separate_thread_tls() {
623 static COUNTER: AtomicUsize = AtomicUsize::new(0);
624
625 struct IncOnDrop;
626 impl Drop for IncOnDrop {
627 fn drop(&mut self) {
628 COUNTER.fetch_add(1, Ordering::SeqCst);
629 }
630 }
631
632 thread_local!(static FOO: IncOnDrop = IncOnDrop);
633
634 // This test will poll the following future on two threads.
635 // We verify that TLS destructors are run correctly.
636 execute_across_threads(async move {
637 let mut store = async_store();
638 let module = Module::new(
639 store.engine(),
640 "
641 (module
642 (import \"\" \"\" (func))
643 (start 0)
644 )
645 ",
646 )
647 .unwrap();
648 let func = Func::wrap_async(&mut store, |_, _: ()| {
649 Box::new(async {
650 tokio::task::yield_now().await;
651 FOO.with(|_f| {});
652 Err::<(), _>(format_err!("test"))
653 })
654 });
655 let result = Instance::new_async(&mut store, &module, &[func.into()]).await;
656 assert!(result.is_err());
657 })
658 .await;
659
660 assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
661 }
662
663 #[tokio::test]
664 #[cfg_attr(miri, ignore)]
recursive_async() -> Result<()>665 async fn recursive_async() -> Result<()> {
666 let _ = env_logger::try_init();
667 let mut store = async_store();
668 let m = Module::new(
669 store.engine(),
670 "(module
671 (func (export \"overflow\") call 0)
672 (func (export \"normal\"))
673 )",
674 )?;
675 let i = Instance::new_async(&mut store, &m, &[]).await?;
676 let overflow = i.get_typed_func::<(), ()>(&mut store, "overflow")?;
677 let normal = i.get_typed_func::<(), ()>(&mut store, "normal")?;
678 let f2 = Func::wrap_async(&mut store, move |mut caller, _: ()| {
679 let normal = normal.clone();
680 let overflow = overflow.clone();
681 Box::new(async move {
682 // recursive async calls shouldn't immediately stack overflow...
683 normal.call_async(&mut caller, ()).await?;
684
685 // ... but calls that actually stack overflow should indeed stack
686 // overflow
687 let err = overflow
688 .call_async(&mut caller, ())
689 .await
690 .unwrap_err()
691 .downcast::<Trap>()?;
692 assert_eq!(err, Trap::StackOverflow);
693 Ok(())
694 })
695 });
696 f2.call_async(&mut store, &[], &mut []).await?;
697 Ok(())
698 }
699
700 #[tokio::test]
701 #[cfg_attr(miri, ignore)]
linker_module_command() -> Result<()>702 async fn linker_module_command() -> Result<()> {
703 let mut store = async_store();
704 let mut linker = Linker::new(store.engine());
705
706 let module1 = Module::new(
707 store.engine(),
708 r#"
709 (module
710 (global $g (mut i32) (i32.const 0))
711
712 (func (export "_start"))
713
714 (func (export "g") (result i32)
715 global.get $g
716 i32.const 1
717 global.set $g)
718 )
719 "#,
720 )?;
721
722 let module2 = Module::new(
723 store.engine(),
724 r#"
725 (module
726 (import "" "g" (func (result i32)))
727
728 (func (export "get") (result i32)
729 call 0)
730 )
731 "#,
732 )?;
733
734 linker.module_async(&mut store, "", &module1).await?;
735 let instance = linker.instantiate_async(&mut store, &module2).await?;
736 let f = instance.get_typed_func::<(), i32>(&mut store, "get")?;
737 assert_eq!(f.call_async(&mut store, ()).await?, 0);
738 assert_eq!(f.call_async(&mut store, ()).await?, 0);
739
740 Ok(())
741 }
742
743 #[tokio::test]
744 #[cfg_attr(miri, ignore)]
linker_module_reactor() -> Result<()>745 async fn linker_module_reactor() -> Result<()> {
746 let mut store = async_store();
747 let mut linker = Linker::new(store.engine());
748 let module1 = Module::new(
749 store.engine(),
750 r#"
751 (module
752 (global $g (mut i32) (i32.const 0))
753
754 (func (export "g") (result i32)
755 global.get $g
756 i32.const 1
757 global.set $g)
758 )
759 "#,
760 )?;
761 let module2 = Module::new(
762 store.engine(),
763 r#"
764 (module
765 (import "" "g" (func (result i32)))
766
767 (func (export "get") (result i32)
768 call 0)
769 )
770 "#,
771 )?;
772
773 linker.module_async(&mut store, "", &module1).await?;
774 let instance = linker.instantiate_async(&mut store, &module2).await?;
775 let f = instance.get_typed_func::<(), i32>(&mut store, "get")?;
776 assert_eq!(f.call_async(&mut store, ()).await?, 0);
777 assert_eq!(f.call_async(&mut store, ()).await?, 1);
778
779 Ok(())
780 }
781
/// Future adapter that counts how many times the wrapped future returned
/// `Poll::Pending` before it completed.
pub struct CountPending<F> {
    /// The wrapped future.
    future: F,
    /// Number of `Pending` results observed so far.
    yields: usize,
}

impl<F> CountPending<F> {
    /// Wraps `future` with the pending counter starting at zero.
    pub fn new(future: F) -> CountPending<F> {
        CountPending { future, yields: 0 }
    }
}

impl<F> Future for CountPending<F>
where
    F: Future + Unpin,
{
    /// The wrapped future's output paired with the number of pending polls.
    type Output = (F::Output, usize);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        let polled = Pin::new(&mut this.future).poll(cx);
        if polled.is_pending() {
            this.yields += 1;
        }
        polled.map(|output| (output, this.yields))
    }
}
809
/// Future adapter that polls the wrapped future exactly once and always
/// resolves immediately: `Ok(output)` if the inner future finished, or
/// `Err(future)` handing the still-pending future back to the caller.
///
/// Polling a `PollOnce` again after it has resolved panics.
pub struct PollOnce<F>(Option<F>);

impl<F> PollOnce<F> {
    /// Wraps `future` so that awaiting the result performs a single poll.
    pub fn new(future: F) -> PollOnce<F> {
        PollOnce(Some(future))
    }
}

impl<F> Future for PollOnce<F>
where
    F: Future + Unpin,
{
    type Output = Result<F::Output, F>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        // Taking the future out leaves `None` behind, so a second poll panics.
        let mut inner = this.0.take().unwrap();
        let outcome = match Pin::new(&mut inner).poll(cx) {
            Poll::Ready(output) => Ok(output),
            Poll::Pending => Err(inner),
        };
        Poll::Ready(outcome)
    }
}
832
/// Captures wasm backtraces at six points while two stores' async activations
/// are interleaved in a non-nested order, and checks the frames seen at each
/// point.
///
/// `store1` runs `run_sync`, whose host import starts an async call in a
/// separate `store2`, polls it once, and stashes the still-pending future in
/// `store1`'s data. The test then finishes that future after `run_sync` has
/// returned and finally calls `capture_stack` in `store1`.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn non_stacky_async_activations() -> Result<()> {
    let engine = Engine::default();
    // The store data is a slot for the suspended `store2` future.
    let mut store1: Store<Option<Pin<Box<dyn Future<Output = Result<()>> + Send>>>> =
        Store::new(&engine, None);
    let mut linker1 = Linker::new(&engine);

    let module1 = Module::new(
        &engine,
        r#"
            (module $m1
                (import "" "host_capture_stack" (func $host_capture_stack))
                (import "" "start_async_instance" (func $start_async_instance))
                (func $capture_stack (export "capture_stack")
                    call $host_capture_stack
                )
                (func $run_sync (export "run_sync")
                    call $start_async_instance
                )
            )
        "#,
    )?;

    let module2 = Module::new(
        &engine,
        r#"
            (module $m2
                (import "" "yield" (func $yield))

                (func $run_async (export "run_async")
                    call $yield
                )
            )
        "#,
    )?;

    // Backtraces are appended here in the order they are captured.
    let stacks = Arc::new(Mutex::new(vec![]));
    // Force-captures a backtrace from `store` and appends it to `stacks`.
    fn capture_stack(stacks: &Arc<Mutex<Vec<WasmBacktrace>>>, store: impl AsContext) {
        let mut stacks = stacks.lock().unwrap();
        stacks.push(wasmtime::WasmBacktrace::force_capture(store));
    }

    linker1.func_wrap_async("", "host_capture_stack", {
        let stacks = stacks.clone();
        move |caller, _: ()| {
            capture_stack(&stacks, &caller);
            Box::new(async { Ok(()) })
        }
    })?;

    linker1.func_wrap_async("", "start_async_instance", {
        let stacks = stacks.clone();
        move |mut caller, _: ()| {
            let stacks = stacks.clone();
            // Capture taken on entry to the import called by `run_sync`.
            capture_stack(&stacks, &caller);

            // Build a second, independent store and linker on the same engine.
            let module2 = module2.clone();
            let mut store2 = Store::new(caller.engine(), ());
            let mut linker2 = Linker::<()>::new(caller.engine());
            linker2
                .func_wrap_async("", "yield", {
                    let stacks = stacks.clone();
                    move |caller, _: ()| {
                        let stacks = stacks.clone();
                        Box::new(async move {
                            // Captured in `store2` both before and after the
                            // suspension point.
                            capture_stack(&stacks, &caller);
                            tokio::task::yield_now().await;
                            capture_stack(&stacks, &caller);
                            Ok(())
                        })
                    }
                })
                .unwrap();

            Box::new(async move {
                // Poll the `store2` work exactly once; `.err().unwrap()`
                // asserts that it is still pending and hands the future back.
                let future = PollOnce::new(Box::pin({
                    let stacks = stacks.clone();
                    async move {
                        let instance2 = linker2.instantiate_async(&mut store2, &module2).await?;

                        instance2
                            .get_func(&mut store2, "run_async")
                            .unwrap()
                            .call_async(&mut store2, &[], &mut [])
                            .await?;

                        // Captured in `store2` after its wasm call returned.
                        capture_stack(&stacks, &store2);
                        wasmtime::error::Ok(())
                    }
                }) as _)
                .await
                .err()
                .unwrap();
                // Captured in `store1` while the `store2` future is suspended.
                capture_stack(&stacks, &caller);
                // Stash the suspended future so the test body can finish it
                // after `run_sync` has returned.
                *caller.data_mut() = Some(future);
                Ok(())
            })
        }
    })?;

    let instance1 = linker1.instantiate_async(&mut store1, &module1).await?;
    instance1
        .get_typed_func::<(), ()>(&mut store1, "run_sync")?
        .call_async(&mut store1, ())
        .await?;
    // Resume the `store2` call now that `store1` is no longer executing wasm.
    let future = store1.data_mut().take().unwrap();
    future.await?;

    instance1
        .get_typed_func::<(), ()>(&mut store1, "capture_stack")?
        .call_async(&mut store1, ())
        .await?;

    let stacks = stacks.lock().unwrap();
    eprintln!("stacks = {stacks:#?}");

    // Expected function names per capture, in capture order:
    //   1. entry of `start_async_instance` (called from `run_sync`)
    //   2. `yield` import before suspending (called from `run_async`)
    //   3. `start_async_instance` after the single poll returned pending
    //   4. `yield` import after being resumed
    //   5. `store2` after `run_async` returned (no wasm frames)
    //   6. `host_capture_stack` (called from `capture_stack`)
    assert_eq!(stacks.len(), 6);
    for (actual, expected) in stacks.iter().zip(vec![
        vec!["run_sync"],
        vec!["run_async"],
        vec!["run_sync"],
        vec!["run_async"],
        vec![],
        vec!["capture_stack"],
    ]) {
        eprintln!("expected = {expected:?}");
        eprintln!("actual = {actual:?}");
        assert_eq!(actual.frames().len(), expected.len());
        for (actual, expected) in actual.frames().iter().zip(expected) {
            assert_eq!(actual.func_name(), Some(expected));
        }
    }

    Ok(())
}
969
970 #[tokio::test]
971 #[cfg_attr(miri, ignore)]
gc_preserves_externref_on_historical_async_stacks() -> Result<()>972 async fn gc_preserves_externref_on_historical_async_stacks() -> Result<()> {
973 let _ = env_logger::try_init();
974
975 let engine = Engine::default();
976
977 let module = Module::new(
978 &engine,
979 r#"
980 (module $m1
981 (import "" "gc" (func $gc))
982 (import "" "recurse" (func $recurse (param i32)))
983 (import "" "test" (func $test (param i32 externref)))
984 (func (export "run") (param i32 externref)
985 local.get 0
986 if
987 local.get 0
988 i32.const -1
989 i32.add
990 call $recurse
991 else
992 call $gc
993 end
994
995 local.get 0
996 local.get 1
997 call $test
998 )
999 )
1000 "#,
1001 )?;
1002
1003 type F = TypedFunc<(i32, Option<Rooted<ExternRef>>), ()>;
1004
1005 let mut store = Store::new(&engine, None);
1006 let mut linker = Linker::<Option<F>>::new(&engine);
1007 linker.func_wrap_async("", "gc", |mut cx: Caller<'_, _>, ()| {
1008 Box::new(async move { cx.gc_async(None).await })
1009 })?;
1010 linker.func_wrap(
1011 "",
1012 "test",
1013 |cx: Caller<'_, _>, val: i32, handle: Option<Rooted<ExternRef>>| -> Result<()> {
1014 assert_eq!(
1015 handle.unwrap().data(&cx)?.unwrap().downcast_ref(),
1016 Some(&val)
1017 );
1018 Ok(())
1019 },
1020 )?;
1021 linker.func_wrap_async(
1022 "",
1023 "recurse",
1024 |mut cx: Caller<'_, Option<F>>, (val,): (i32,)| {
1025 let func = cx.data().clone().unwrap();
1026 Box::new(async move {
1027 let r = Some(ExternRef::new_async(&mut cx, val).await?);
1028 Ok(func.call_async(&mut cx, (val, r)).await)
1029 })
1030 },
1031 )?;
1032 let instance = linker.instantiate_async(&mut store, &module).await?;
1033 let func: F = instance.get_typed_func(&mut store, "run")?;
1034 *store.data_mut() = Some(func.clone());
1035
1036 let r = Some(ExternRef::new_async(&mut store, 5).await?);
1037 func.call_async(&mut store, (5, r)).await?;
1038
1039 Ok(())
1040 }
1041
1042 #[tokio::test]
1043 #[cfg_attr(miri, ignore)]
async_gc_with_func_new_and_func_wrap() -> Result<()>1044 async fn async_gc_with_func_new_and_func_wrap() -> Result<()> {
1045 let _ = env_logger::try_init();
1046
1047 let mut config = Config::new();
1048 config.wasm_gc(true);
1049 let engine = Engine::new(&config)?;
1050
1051 let module = Module::new(
1052 &engine,
1053 r#"
1054 (module $m1
1055 (import "" "a" (func $a (result externref structref arrayref)))
1056 (import "" "b" (func $b (result externref structref arrayref)))
1057
1058 (table 2 funcref)
1059 (elem (i32.const 0) func $a $b)
1060
1061 (func (export "a")
1062 (call $call (i32.const 0))
1063 )
1064 (func (export "b")
1065 (call $call (i32.const 1))
1066 )
1067
1068 (func $call (param i32)
1069 (local $cnt i32)
1070 (loop $l
1071 (call_indirect (result externref structref arrayref) (local.get 0))
1072 drop
1073 drop
1074 drop
1075
1076 (local.set $cnt (i32.add (local.get $cnt) (i32.const 1)))
1077
1078 (if (i32.lt_u (local.get $cnt) (i32.const 5000))
1079 (then (br $l)))
1080 )
1081 )
1082 )
1083 "#,
1084 )?;
1085
1086 let mut linker = Linker::<()>::new(&engine);
1087 linker.func_wrap_async("", "a", |mut cx: Caller<'_, _>, ()| {
1088 Box::new(async move {
1089 let externref = ExternRef::new_async(&mut cx, 100).await?;
1090
1091 let struct_ty = StructType::new(cx.engine(), [])?;
1092 let struct_pre = StructRefPre::new(&mut cx, struct_ty);
1093 let structref = StructRef::new_async(&mut cx, &struct_pre, &[]).await?;
1094
1095 let array_ty = ArrayType::new(
1096 cx.engine(),
1097 FieldType::new(Mutability::Var, ValType::I32.into()),
1098 );
1099 let array_pre = ArrayRefPre::new(&mut cx, array_ty);
1100 let arrayref = ArrayRef::new_fixed_async(&mut cx, &array_pre, &[]).await?;
1101
1102 Ok((Some(externref), Some(structref), Some(arrayref)))
1103 })
1104 })?;
1105 let ty = FuncType::new(
1106 &engine,
1107 [],
1108 [ValType::EXTERNREF, ValType::STRUCTREF, ValType::ARRAYREF],
1109 );
1110 linker.func_new_async("", "b", ty, |mut cx, _, results| {
1111 Box::new(async move {
1112 results[0] = ExternRef::new_async(&mut cx, 100).await?.into();
1113
1114 let struct_ty = StructType::new(cx.engine(), [])?;
1115 let struct_pre = StructRefPre::new(&mut cx, struct_ty);
1116 results[1] = StructRef::new_async(&mut cx, &struct_pre, &[])
1117 .await?
1118 .into();
1119
1120 let array_ty = ArrayType::new(
1121 cx.engine(),
1122 FieldType::new(Mutability::Var, ValType::I32.into()),
1123 );
1124 let array_pre = ArrayRefPre::new(&mut cx, array_ty);
1125 results[2] = ArrayRef::new_fixed_async(&mut cx, &array_pre, &[])
1126 .await?
1127 .into();
1128
1129 Ok(())
1130 })
1131 })?;
1132
1133 let mut store = Store::new(&engine, ());
1134 let instance = linker.instantiate_async(&mut store, &module).await?;
1135 let a = instance.get_typed_func::<(), ()>(&mut store, "a")?;
1136 a.call_async(&mut store, ()).await?;
1137
1138 let mut store = Store::new(&engine, ());
1139 let instance = linker.instantiate_async(&mut store, &module).await?;
1140 let b = instance.get_typed_func::<(), ()>(&mut store, "b")?;
1141 b.call_async(&mut store, ()).await?;
1142
1143 Ok(())
1144 }
1145