1 use crate::async_functions::{PollOnce, execute_across_threads};
2 use std::pin::Pin;
3 use std::task::{Context, Poll};
4 use wasmtime::Result;
5 use wasmtime::{AsContextMut, Config, Engine, Store, StoreContextMut, Trap, component::*};
6 use wasmtime_component_util::REALLOC_AND_FREE;
7
8 /// This is super::func::thunks, except with an async store.
9 #[tokio::test]
10 #[cfg_attr(miri, ignore)]
smoke() -> Result<()>11 async fn smoke() -> Result<()> {
12 let component = r#"
13 (component
14 (core module $m
15 (func (export "thunk"))
16 (func (export "thunk-trap") unreachable)
17 )
18 (core instance $i (instantiate $m))
19 (func (export "thunk")
20 (canon lift (core func $i "thunk"))
21 )
22 (func (export "thunk-trap")
23 (canon lift (core func $i "thunk-trap"))
24 )
25 )
26 "#;
27
28 let engine = super::async_engine();
29 let component = Component::new(&engine, component)?;
30 let mut store = Store::new(&engine, ());
31 let instance = Linker::new(&engine)
32 .instantiate_async(&mut store, &component)
33 .await?;
34
35 let thunk = instance.get_typed_func::<(), ()>(&mut store, "thunk")?;
36
37 thunk.call_async(&mut store, ()).await?;
38
39 let err = instance
40 .get_typed_func::<(), ()>(&mut store, "thunk-trap")?
41 .call_async(&mut store, ())
42 .await
43 .unwrap_err();
44 assert_eq!(err.downcast::<Trap>()?, Trap::UnreachableCodeReached);
45
46 Ok(())
47 }
48
49 /// Handle an import function, created using component::Linker::func_wrap_async.
50 #[tokio::test]
51 #[cfg_attr(miri, ignore)]
smoke_func_wrap() -> Result<()>52 async fn smoke_func_wrap() -> Result<()> {
53 let component = r#"
54 (component
55 (type $f (func))
56 (import "i" (func $f))
57
58 (core module $m
59 (import "imports" "i" (func $i))
60 (func (export "thunk") call $i)
61 )
62
63 (core func $f (canon lower (func $f)))
64 (core instance $i (instantiate $m
65 (with "imports" (instance
66 (export "i" (func $f))
67 ))
68 ))
69 (func (export "thunk")
70 (canon lift (core func $i "thunk"))
71 )
72 )
73 "#;
74
75 let engine = super::async_engine();
76 let component = Component::new(&engine, component)?;
77 let mut store = Store::new(&engine, ());
78 let mut linker = Linker::new(&engine);
79 let mut root = linker.root();
80 root.func_wrap_async("i", |_: StoreContextMut<()>, _: ()| {
81 Box::new(async { Ok(()) })
82 })?;
83
84 let instance = linker.instantiate_async(&mut store, &component).await?;
85
86 let thunk = instance.get_typed_func::<(), ()>(&mut store, "thunk")?;
87
88 thunk.call_async(&mut store, ()).await?;
89
90 Ok(())
91 }
92
93 // This test stresses TLS management in combination with the `realloc` option
94 // for imported functions. This will create an async computation which invokes a
95 // component that invokes an imported function. The imported function returns a
96 // list which will require invoking malloc.
97 //
98 // As an added stressor all polls are sprinkled across threads through
99 // `execute_across_threads`. Yields are injected liberally by configuring 1
100 // fuel consumption to trigger a yield.
101 //
102 // Overall a yield should happen during malloc which should be an "interesting
103 // situation" with respect to the runtime.
104 #[tokio::test]
105 #[cfg_attr(miri, ignore)]
resume_separate_thread() -> Result<()>106 async fn resume_separate_thread() -> Result<()> {
107 let mut config = wasmtime_test_util::component::config();
108 config.consume_fuel(true);
109 let engine = Engine::new(&config)?;
110 let component = format!(
111 r#"
112 (component
113 (import "yield" (func $yield (result (list u8))))
114 (core module $libc
115 (memory (export "memory") 1)
116 {REALLOC_AND_FREE}
117 )
118 (core instance $libc (instantiate $libc))
119
120 (core func $yield
121 (canon lower
122 (func $yield)
123 (memory $libc "memory")
124 (realloc (func $libc "realloc"))
125 )
126 )
127
128 (core module $m
129 (import "" "yield" (func $yield (param i32)))
130 (import "libc" "memory" (memory 0))
131 (func $start
132 i32.const 8
133 call $yield
134 )
135 (start $start)
136 )
137 (core instance (instantiate $m
138 (with "" (instance (export "yield" (func $yield))))
139 (with "libc" (instance $libc))
140 ))
141 )
142 "#
143 );
144 let component = Component::new(&engine, component)?;
145 let mut linker = Linker::new(&engine);
146 linker
147 .root()
148 .func_wrap_async("yield", |_: StoreContextMut<()>, _: ()| {
149 Box::new(async {
150 tokio::task::yield_now().await;
151 Ok((vec![1u8, 2u8],))
152 })
153 })?;
154
155 execute_across_threads(async move {
156 let mut store = Store::new(&engine, ());
157 store.set_fuel(u64::MAX).unwrap();
158 store.fuel_async_yield_interval(Some(1)).unwrap();
159 linker.instantiate_async(&mut store, &component).await?;
160 Ok::<_, wasmtime::Error>(())
161 })
162 .await?;
163 Ok(())
164 }
165
166 // This test is intended to stress TLS management in the component model around
167 // the management of the `realloc` function. This creates an async computation
168 // representing the execution of a component model function where entry into the
169 // component uses `realloc` and then the component runs. This async computation
170 // is then polled iteratively with another "wasm activation" (in this case a
171 // core wasm function) on the stack. The poll-per-call should work and nothing
172 // should in theory have problems here.
173 //
174 // As an added stressor all polls are sprinkled across threads through
175 // `execute_across_threads`. Yields are injected liberally by configuring 1
176 // fuel consumption to trigger a yield.
177 //
178 // Overall a yield should happen during malloc which should be an "interesting
179 // situation" with respect to the runtime.
180 #[tokio::test]
181 #[cfg_attr(miri, ignore)]
poll_through_wasm_activation() -> Result<()>182 async fn poll_through_wasm_activation() -> Result<()> {
183 let mut config = wasmtime_test_util::component::config();
184 config.consume_fuel(true);
185 let engine = Engine::new(&config)?;
186 let component = format!(
187 r#"
188 (component
189 (core module $m
190 {REALLOC_AND_FREE}
191 (memory (export "memory") 1)
192 (func (export "run") (param i32 i32)
193 )
194 )
195 (core instance $i (instantiate $m))
196 (func (export "run") (param "x" (list u8))
197 (canon lift (core func $i "run")
198 (memory $i "memory")
199 (realloc (func $i "realloc"))))
200 )
201 "#
202 );
203 let component = Component::new(&engine, component)?;
204 let linker = Linker::new(&engine);
205
206 let invoke_component = {
207 let engine = engine.clone();
208 async move {
209 let mut store = Store::new(&engine, ());
210 store.set_fuel(u64::MAX).unwrap();
211 store.fuel_async_yield_interval(Some(1)).unwrap();
212 let instance = linker.instantiate_async(&mut store, &component).await?;
213 let func = instance.get_typed_func::<(Vec<u8>,), ()>(&mut store, "run")?;
214 func.call_async(&mut store, (vec![1, 2, 3],)).await?;
215 Ok::<_, wasmtime::Error>(())
216 }
217 };
218
219 execute_across_threads(async move {
220 let mut store = Store::new(&engine, Some(Box::pin(invoke_component)));
221 let poll_once = wasmtime::Func::wrap_async(&mut store, |mut cx, _: ()| {
222 let invoke_component = cx.data_mut().take().unwrap();
223 Box::new(async move {
224 match PollOnce::new(invoke_component).await {
225 Ok(result) => {
226 result?;
227 Ok(1)
228 }
229 Err(future) => {
230 *cx.data_mut() = Some(future);
231 Ok(0)
232 }
233 }
234 })
235 });
236 let poll_once = poll_once.typed::<(), i32>(&mut store)?;
237 while poll_once.call_async(&mut store, ()).await? != 1 {
238 // loop around to call again
239 }
240 Ok::<_, wasmtime::Error>(())
241 })
242 .await?;
243 Ok(())
244 }
245
246 /// Test async drop method for host resources.
247 #[tokio::test]
248 #[cfg_attr(miri, ignore)]
drop_resource_async() -> Result<()>249 async fn drop_resource_async() -> Result<()> {
250 use std::sync::Arc;
251 use std::sync::Mutex;
252
253 let engine = super::async_engine();
254 let c = Component::new(
255 &engine,
256 r#"
257 (component
258 (import "t" (type $t (sub resource)))
259
260 (core func $drop (canon resource.drop $t))
261
262 (core module $m
263 (import "" "drop" (func $drop (param i32)))
264 (func (export "f") (param i32)
265 (call $drop (local.get 0))
266 )
267 )
268 (core instance $i (instantiate $m
269 (with "" (instance
270 (export "drop" (func $drop))
271 ))
272 ))
273
274 (func (export "f") (param "x" (own $t))
275 (canon lift (core func $i "f")))
276 )
277 "#,
278 )?;
279
280 struct MyType;
281
282 let mut store = Store::new(&engine, ());
283 let mut linker = Linker::new(&engine);
284
285 let drop_status = Arc::new(Mutex::new("not dropped"));
286 let ds = drop_status.clone();
287
288 linker
289 .root()
290 .resource_async("t", ResourceType::host::<MyType>(), move |_, _| {
291 let ds = ds.clone();
292 Box::new(async move {
293 *ds.lock().unwrap() = "before yield";
294 tokio::task::yield_now().await;
295 *ds.lock().unwrap() = "after yield";
296 Ok(())
297 })
298 })?;
299 let i = linker.instantiate_async(&mut store, &c).await?;
300 let f = i.get_typed_func::<(Resource<MyType>,), ()>(&mut store, "f")?;
301
302 execute_across_threads(async move {
303 let resource = Resource::new_own(100);
304 f.call_async(&mut store, (resource,)).await?;
305 Ok::<_, wasmtime::Error>(())
306 })
307 .await?;
308
309 assert_eq!("after yield", *drop_status.lock().unwrap());
310
311 Ok(())
312 }
313
314 /// Test task deletion in three situations, for every combination of lift/lower/(guest/host):
315 /// 1. An explicit thread calls task.return
316 /// 2. An explicit thread suspends indefinitely
317 /// 3. An explicit thread yield loops indefinitely
318 #[tokio::test]
319 #[cfg_attr(miri, ignore)]
task_deletion() -> Result<()>320 async fn task_deletion() -> Result<()> {
321 let mut config = Config::new();
322 config.wasm_component_model_async(true);
323 config.wasm_component_model_threading(true);
324 config.wasm_component_model_async_stackful(true);
325 config.wasm_component_model_async_builtins(true);
326 let engine = Engine::new(&config)?;
327 let component = Component::new(
328 &engine,
329 r#"(component
330 (component $C
331 (core module $Memory (memory (export "mem") 1))
332 (core instance $memory (instantiate $Memory))
333 ;; Defines the table for the thread start functions
334 (core module $libc
335 (table (export "__indirect_function_table") 3 funcref))
336 (core module $CM
337 (import "" "mem" (memory 1))
338 (import "" "task.return" (func $task-return (param i32)))
339 (import "" "task.cancel" (func $task-cancel))
340 (import "" "thread.new-indirect" (func $thread-new-indirect (param i32 i32) (result i32)))
341 (import "" "thread.suspend" (func $thread-suspend (result i32)))
342 (import "" "thread.suspend-cancellable" (func $thread-suspend-cancellable (result i32)))
343 (import "" "thread.yield-to-suspended" (func $thread-yield-to-suspended (param i32) (result i32)))
344 (import "" "thread.yield-to-suspended-cancellable" (func $thread-yield-to-suspended-cancellable (param i32) (result i32)))
345 (import "" "thread.suspend-to" (func $thread-suspend-to (param i32) (result i32)))
346 (import "" "thread.suspend-to-cancellable" (func $thread-suspend-to-cancellable (param i32) (result i32)))
347 (import "" "thread.yield" (func $thread-yield (result i32)))
348 (import "" "thread.yield-cancellable" (func $thread-yield-cancellable (result i32)))
349 (import "" "thread.index" (func $thread-index (result i32)))
350 (import "" "thread.unsuspend" (func $thread-unsuspend (param i32)))
351 (import "" "waitable.join" (func $waitable.join (param i32 i32)))
352 (import "" "waitable-set.new" (func $waitable-set.new (result i32)))
353 (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
354 (import "libc" "__indirect_function_table" (table $indirect-function-table 3 funcref))
355
356 ;; Indices into the function table for the thread start functions
357 (global $call-return-ftbl-idx i32 (i32.const 0))
358 (global $suspend-ftbl-idx i32 (i32.const 1))
359 (global $yield-loop-ftbl-idx i32 (i32.const 2))
360
361 (func $call-return (param i32)
362 (call $task-return (local.get 0)))
363
364 (func $suspend (param i32)
365 (drop (call $thread-suspend)))
366
367 (func $yield-loop (param i32)
368 (loop $top
369 (drop (call $thread-yield))
370 (br $top)))
371
372 (func (export "explicit-thread-calls-return-stackful")
373 (call $thread-unsuspend
374 (call $thread-new-indirect (global.get $call-return-ftbl-idx) (i32.const 42))))
375
376 (func (export "explicit-thread-calls-return-stackless") (result i32)
377 (call $thread-unsuspend
378 (call $thread-new-indirect (global.get $call-return-ftbl-idx) (i32.const 42)))
379 (i32.const 0 (; EXIT ;)))
380
381 (func (export "cb") (param i32 i32 i32) (result i32)
382 (unreachable))
383
384 (func (export "explicit-thread-suspends-sync") (result i32)
385 (call $thread-unsuspend
386 (call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42)))
387 (i32.const 42))
388
389 (func (export "explicit-thread-suspends-stackful")
390 (call $thread-unsuspend
391 (call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42)))
392 (call $task-return (i32.const 42)))
393
394 (func (export "explicit-thread-suspends-stackless") (result i32)
395 (call $thread-unsuspend
396 (call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42)))
397 (call $task-return (i32.const 42))
398 (i32.const 0))
399
400 (func (export "explicit-thread-yield-loops-sync") (result i32)
401 (call $thread-unsuspend
402 (call $thread-new-indirect (global.get $yield-loop-ftbl-idx) (i32.const 42)))
403 (i32.const 42))
404
405 (func (export "explicit-thread-yield-loops-stackful")
406 (call $thread-unsuspend
407 (call $thread-new-indirect (global.get $yield-loop-ftbl-idx) (i32.const 42)))
408 (call $task-return (i32.const 42)))
409
410 (func (export "explicit-thread-yield-loops-stackless") (result i32)
411 (call $thread-unsuspend
412 (call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42)))
413 (call $task-return (i32.const 42))
414 (i32.const 0 (; EXIT ;)))
415
416 ;; Initialize the function table that will be used by thread.new-indirect
417 (elem (table $indirect-function-table) (i32.const 0 (; call-return-ftbl-idx ;)) func $call-return)
418 (elem (table $indirect-function-table) (i32.const 1 (; suspend-ftbl-idx ;)) func $suspend)
419 (elem (table $indirect-function-table) (i32.const 2 (; yield-loop-ftbl-idx ;)) func $yield-loop)
420 )
421
422 ;; Instantiate the libc module to get the table
423 (core instance $libc (instantiate $libc))
424 ;; Get access to `thread.new-indirect` that uses the table from libc
425 (core type $start-func-ty (func (param i32)))
426 (alias core export $libc "__indirect_function_table" (core table $indirect-function-table))
427
428 (core func $task-return (canon task.return (result u32)))
429 (core func $task-cancel (canon task.cancel))
430 (core func $thread-new-indirect
431 (canon thread.new-indirect $start-func-ty (table $indirect-function-table)))
432 (core func $thread-yield (canon thread.yield))
433 (core func $thread-yield-cancellable (canon thread.yield cancellable))
434 (core func $thread-index (canon thread.index))
435 (core func $thread-yield-to-suspended (canon thread.yield-to-suspended))
436 (core func $thread-yield-to-suspended-cancellable (canon thread.yield-to-suspended cancellable))
437 (core func $thread-unsuspend (canon thread.unsuspend))
438 (core func $thread-suspend-to (canon thread.suspend-to))
439 (core func $thread-suspend-to-cancellable (canon thread.suspend-to cancellable))
440 (core func $thread-suspend (canon thread.suspend))
441 (core func $thread-suspend-cancellable (canon thread.suspend cancellable))
442 (core func $waitable-set.new (canon waitable-set.new))
443 (core func $waitable.join (canon waitable.join))
444 (core func $waitable-set.wait (canon waitable-set.wait (memory $memory "mem")))
445
446 ;; Instantiate the main module
447 (core instance $cm (
448 instantiate $CM
449 (with "" (instance
450 (export "mem" (memory $memory "mem"))
451 (export "task.return" (func $task-return))
452 (export "task.cancel" (func $task-cancel))
453 (export "thread.new-indirect" (func $thread-new-indirect))
454 (export "thread.index" (func $thread-index))
455 (export "thread.yield-to-suspended" (func $thread-yield-to-suspended))
456 (export "thread.yield-to-suspended-cancellable" (func $thread-yield-to-suspended-cancellable))
457 (export "thread.yield" (func $thread-yield))
458 (export "thread.yield-cancellable" (func $thread-yield-cancellable))
459 (export "thread.suspend-to" (func $thread-suspend-to))
460 (export "thread.suspend-to-cancellable" (func $thread-suspend-to-cancellable))
461 (export "thread.suspend" (func $thread-suspend))
462 (export "thread.suspend-cancellable" (func $thread-suspend-cancellable))
463 (export "thread.unsuspend" (func $thread-unsuspend))
464 (export "waitable.join" (func $waitable.join))
465 (export "waitable-set.wait" (func $waitable-set.wait))
466 (export "waitable-set.new" (func $waitable-set.new))))
467 (with "libc" (instance $libc))))
468
469 (func (export "explicit-thread-calls-return-stackful") async (result u32)
470 (canon lift (core func $cm "explicit-thread-calls-return-stackful") async))
471 (func (export "explicit-thread-calls-return-stackless") async (result u32)
472 (canon lift (core func $cm "explicit-thread-calls-return-stackless") async (callback (func $cm "cb"))))
473 (func (export "explicit-thread-suspends-sync") async (result u32)
474 (canon lift (core func $cm "explicit-thread-suspends-sync")))
475 (func (export "explicit-thread-suspends-stackful") async (result u32)
476 (canon lift (core func $cm "explicit-thread-suspends-stackful") async))
477 (func (export "explicit-thread-suspends-stackless") async (result u32)
478 (canon lift (core func $cm "explicit-thread-suspends-stackless") async (callback (func $cm "cb"))))
479 (func (export "explicit-thread-yield-loops-sync") async (result u32)
480 (canon lift (core func $cm "explicit-thread-yield-loops-sync")))
481 (func (export "explicit-thread-yield-loops-stackful") async (result u32)
482 (canon lift (core func $cm "explicit-thread-yield-loops-stackful") async))
483 (func (export "explicit-thread-yield-loops-stackless") async (result u32)
484 (canon lift (core func $cm "explicit-thread-yield-loops-stackless") async (callback (func $cm "cb"))))
485 )
486
487 (component $D
488 (import "explicit-thread-calls-return-stackful" (func $explicit-thread-calls-return-stackful async (result u32)))
489 (import "explicit-thread-calls-return-stackless" (func $explicit-thread-calls-return-stackless async (result u32)))
490 (import "explicit-thread-suspends-sync" (func $explicit-thread-suspends-sync async (result u32)))
491 (import "explicit-thread-suspends-stackful" (func $explicit-thread-suspends-stackful async (result u32)))
492 (import "explicit-thread-suspends-stackless" (func $explicit-thread-suspends-stackless async (result u32)))
493 (import "explicit-thread-yield-loops-sync" (func $explicit-thread-yield-loops-sync async (result u32)))
494 (import "explicit-thread-yield-loops-stackful" (func $explicit-thread-yield-loops-stackful async (result u32)))
495 (import "explicit-thread-yield-loops-stackless" (func $explicit-thread-yield-loops-stackless async (result u32)))
496
497 (core module $Memory (memory (export "mem") 1))
498 (core instance $memory (instantiate $Memory))
499 (core module $DM
500 (import "" "mem" (memory 1))
501 (import "" "subtask.cancel" (func $subtask.cancel (param i32) (result i32)))
502 ;; sync lowered
503 (import "" "explicit-thread-calls-return-stackful" (func $explicit-thread-calls-return-stackful (result i32)))
504 (import "" "explicit-thread-calls-return-stackless" (func $explicit-thread-calls-return-stackless (result i32)))
505 (import "" "explicit-thread-suspends-sync" (func $explicit-thread-suspends-sync (result i32)))
506 (import "" "explicit-thread-suspends-stackful" (func $explicit-thread-suspends-stackful (result i32)))
507 (import "" "explicit-thread-suspends-stackless" (func $explicit-thread-suspends-stackless (result i32)))
508 (import "" "explicit-thread-yield-loops-sync" (func $explicit-thread-yield-loops-sync (result i32)))
509 (import "" "explicit-thread-yield-loops-stackful" (func $explicit-thread-yield-loops-stackful (result i32)))
510 (import "" "explicit-thread-yield-loops-stackless" (func $explicit-thread-yield-loops-stackless (result i32)))
511 ;; async lowered
512 (import "" "explicit-thread-calls-return-stackful-async" (func $explicit-thread-calls-return-stackful-async (param i32) (result i32)))
513 (import "" "explicit-thread-calls-return-stackless-async" (func $explicit-thread-calls-return-stackless-async (param i32) (result i32)))
514 (import "" "explicit-thread-suspends-sync-async" (func $explicit-thread-suspends-sync-async (param i32) (result i32)))
515 (import "" "explicit-thread-suspends-stackful-async" (func $explicit-thread-suspends-stackful-async (param i32) (result i32)))
516 (import "" "explicit-thread-suspends-stackless-async" (func $explicit-thread-suspends-stackless-async (param i32) (result i32)))
517 (import "" "explicit-thread-yield-loops-sync-async" (func $explicit-thread-yield-loops-sync-async (param i32) (result i32)))
518 (import "" "explicit-thread-yield-loops-stackful-async" (func $explicit-thread-yield-loops-stackful-async (param i32) (result i32)))
519 (import "" "explicit-thread-yield-loops-stackless-async" (func $explicit-thread-yield-loops-stackless-async (param i32) (result i32)))
520 (import "" "waitable.join" (func $waitable.join (param i32 i32)))
521 (import "" "waitable-set.new" (func $waitable-set.new (result i32)))
522 (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
523 (import "" "thread.yield" (func $thread-yield (result i32)))
524
525 (func $check (param i32)
526 (if (i32.ne (local.get 0) (i32.const 42))
527 (then unreachable))
528 )
529
530 (func $check-async (param i32)
531 (local $retp i32) (local $ws i32) (local $ws-retp i32)
532 (local.set $retp (i32.const 8))
533 (local.set $ws-retp (i32.const 16))
534 (local.set $ws (call $waitable-set.new))
535
536 (if (i32.eq (i32.and (local.get 0) (i32.const 0xF)) (i32.const 2 (; RETURNED ;)))
537 (then (call $check (i32.load (local.get $retp))))
538 (else
539 (call $waitable.join (i32.shr_u (local.get 0) (i32.const 4)) (local.get $ws))
540 (drop (call $waitable-set.wait (local.get $ws) (local.get $ws-retp)))
541 (call $check (i32.load (local.get $retp)))))
542 )
543
544 (func $run (export "run") (result i32)
545 (local $retp i32)
546 (local.set $retp (i32.const 8))
547 (call $check (call $explicit-thread-calls-return-stackless))
548 (call $check (call $explicit-thread-calls-return-stackful))
549 (call $check (call $explicit-thread-suspends-sync))
550 (call $check (call $explicit-thread-suspends-stackful))
551 (call $check (call $explicit-thread-suspends-stackless))
552 (call $check (call $explicit-thread-yield-loops-sync))
553 (call $check (call $explicit-thread-yield-loops-stackful))
554 (call $check (call $explicit-thread-yield-loops-stackless))
555
556 (call $check-async (call $explicit-thread-calls-return-stackless-async (local.get $retp)))
557 (call $check-async (call $explicit-thread-calls-return-stackful-async (local.get $retp)))
558 (call $check-async (call $explicit-thread-suspends-sync-async (local.get $retp)))
559 (call $check-async (call $explicit-thread-suspends-stackful-async (local.get $retp)))
560 (call $check-async (call $explicit-thread-suspends-stackless-async (local.get $retp)))
561 (call $check-async (call $explicit-thread-yield-loops-sync-async (local.get $retp)))
562 (call $check-async (call $explicit-thread-yield-loops-stackful-async (local.get $retp)))
563 (call $check-async (call $explicit-thread-yield-loops-stackless-async (local.get $retp)))
564
565 (i32.const 42)
566 )
567 )
568
569 (core func $waitable-set.new (canon waitable-set.new))
570 (core func $waitable-set.wait (canon waitable-set.wait (memory $memory "mem")))
571 (core func $waitable.join (canon waitable.join))
572 (core func $subtask.cancel (canon subtask.cancel async))
573 (core func $thread.yield (canon thread.yield))
574 ;; sync lowered
575 (canon lower (func $explicit-thread-calls-return-stackful) (memory $memory "mem") (core func $explicit-thread-calls-return-stackful'))
576 (canon lower (func $explicit-thread-calls-return-stackless) (memory $memory "mem") (core func $explicit-thread-calls-return-stackless'))
577 (canon lower (func $explicit-thread-suspends-sync) (memory $memory "mem") (core func $explicit-thread-suspends-sync'))
578 (canon lower (func $explicit-thread-suspends-stackful) (memory $memory "mem") (core func $explicit-thread-suspends-stackful'))
579 (canon lower (func $explicit-thread-suspends-stackless) (memory $memory "mem") (core func $explicit-thread-suspends-stackless'))
580 (canon lower (func $explicit-thread-yield-loops-sync) (memory $memory "mem") (core func $explicit-thread-yield-loops-sync'))
581 (canon lower (func $explicit-thread-yield-loops-stackful) (memory $memory "mem") (core func $explicit-thread-yield-loops-stackful'))
582 (canon lower (func $explicit-thread-yield-loops-stackless) (memory $memory "mem") (core func $explicit-thread-yield-loops-stackless'))
583 ;; async lowered
584 (canon lower (func $explicit-thread-calls-return-stackful) async (memory $memory "mem") (core func $explicit-thread-calls-return-stackful-async'))
585 (canon lower (func $explicit-thread-calls-return-stackless) async (memory $memory "mem") (core func $explicit-thread-calls-return-stackless-async'))
586 (canon lower (func $explicit-thread-suspends-sync) async (memory $memory "mem") (core func $explicit-thread-suspends-sync-async'))
587 (canon lower (func $explicit-thread-suspends-stackful) async (memory $memory "mem") (core func $explicit-thread-suspends-stackful-async'))
588 (canon lower (func $explicit-thread-suspends-stackless) async (memory $memory "mem") (core func $explicit-thread-suspends-stackless-async'))
589 (canon lower (func $explicit-thread-yield-loops-sync) async (memory $memory "mem") (core func $explicit-thread-yield-loops-sync-async'))
590 (canon lower (func $explicit-thread-yield-loops-stackful) async (memory $memory "mem") (core func $explicit-thread-yield-loops-stackful-async'))
591 (canon lower (func $explicit-thread-yield-loops-stackless) async (memory $memory "mem") (core func $explicit-thread-yield-loops-stackless-async'))
592 (core instance $dm (instantiate $DM (with "" (instance
593 (export "mem" (memory $memory "mem"))
594 (export "explicit-thread-calls-return-stackful" (func $explicit-thread-calls-return-stackful'))
595 (export "explicit-thread-calls-return-stackless" (func $explicit-thread-calls-return-stackless'))
596 (export "explicit-thread-suspends-sync" (func $explicit-thread-suspends-sync'))
597 (export "explicit-thread-suspends-stackful" (func $explicit-thread-suspends-stackful'))
598 (export "explicit-thread-suspends-stackless" (func $explicit-thread-suspends-stackless'))
599 (export "explicit-thread-yield-loops-sync" (func $explicit-thread-yield-loops-sync'))
600 (export "explicit-thread-yield-loops-stackful" (func $explicit-thread-yield-loops-stackful'))
601 (export "explicit-thread-yield-loops-stackless" (func $explicit-thread-yield-loops-stackless'))
602 (export "explicit-thread-calls-return-stackful-async" (func $explicit-thread-calls-return-stackful-async'))
603 (export "explicit-thread-calls-return-stackless-async" (func $explicit-thread-calls-return-stackless-async'))
604 (export "explicit-thread-suspends-sync-async" (func $explicit-thread-suspends-sync-async'))
605 (export "explicit-thread-suspends-stackful-async" (func $explicit-thread-suspends-stackful-async'))
606 (export "explicit-thread-suspends-stackless-async" (func $explicit-thread-suspends-stackless-async'))
607 (export "explicit-thread-yield-loops-sync-async" (func $explicit-thread-yield-loops-sync-async'))
608 (export "explicit-thread-yield-loops-stackful-async" (func $explicit-thread-yield-loops-stackful-async'))
609 (export "explicit-thread-yield-loops-stackless-async" (func $explicit-thread-yield-loops-stackless-async'))
610 (export "waitable.join" (func $waitable.join))
611 (export "waitable-set.new" (func $waitable-set.new))
612 (export "waitable-set.wait" (func $waitable-set.wait))
613 (export "subtask.cancel" (func $subtask.cancel))
614 (export "thread.yield" (func $thread.yield))
615 ))))
616 (func (export "run") async (result u32) (canon lift (core func $dm "run")))
617 )
618
619 (instance $c (instantiate $C))
620 (instance $d (instantiate $D
621 (with "explicit-thread-calls-return-stackful" (func $c "explicit-thread-calls-return-stackful"))
622 (with "explicit-thread-calls-return-stackless" (func $c "explicit-thread-calls-return-stackless"))
623 (with "explicit-thread-suspends-sync" (func $c "explicit-thread-suspends-sync"))
624 (with "explicit-thread-suspends-stackful" (func $c "explicit-thread-suspends-stackful"))
625 (with "explicit-thread-suspends-stackless" (func $c "explicit-thread-suspends-stackless"))
626 (with "explicit-thread-yield-loops-sync" (func $c "explicit-thread-yield-loops-sync"))
627 (with "explicit-thread-yield-loops-stackful" (func $c "explicit-thread-yield-loops-stackful"))
628 (with "explicit-thread-yield-loops-stackless" (func $c "explicit-thread-yield-loops-stackless"))
629 ))
630 (func (export "run") (alias export $d "run"))
631 (func (export "explicit-thread-calls-return-stackful") (alias export $c "explicit-thread-calls-return-stackful"))
632 (func (export "explicit-thread-calls-return-stackless") (alias export $c "explicit-thread-calls-return-stackless"))
633 (func (export "explicit-thread-suspends-sync") (alias export $c "explicit-thread-suspends-sync"))
634 (func (export "explicit-thread-suspends-stackful") (alias export $c "explicit-thread-suspends-stackful"))
635 (func (export "explicit-thread-suspends-stackless") (alias export $c "explicit-thread-suspends-stackless"))
636 (func (export "explicit-thread-yield-loops-sync") (alias export $c "explicit-thread-yield-loops-sync"))
637 (func (export "explicit-thread-yield-loops-stackful") (alias export $c "explicit-thread-yield-loops-stackful"))
638 (func (export "explicit-thread-yield-loops-stackless") (alias export $c "explicit-thread-yield-loops-stackless"))
639 )
640 "#,
641 )?
642 .serialize()?;
643
644 let component = unsafe { Component::deserialize(&engine, &component)? };
645 let mut store = Store::new(&engine, ());
646 let instance = Linker::new(&engine)
647 .instantiate_async(&mut store, &component)
648 .await?;
649 let funcs = vec![
650 "run",
651 "explicit-thread-calls-return-stackful",
652 "explicit-thread-calls-return-stackless",
653 "explicit-thread-suspends-sync",
654 "explicit-thread-suspends-stackful",
655 "explicit-thread-suspends-stackless",
656 "explicit-thread-yield-loops-sync",
657 "explicit-thread-yield-loops-stackful",
658 "explicit-thread-yield-loops-stackless",
659 ];
660 for func in funcs {
661 let func = instance.get_typed_func::<(), (u32,)>(&mut store, func)?;
662 assert_eq!(func.call_async(&mut store, ()).await?, (42,));
663 }
664
665 Ok(())
666 }
667
668 #[tokio::test]
669 #[cfg_attr(miri, ignore)]
cancel_host_future() -> Result<()>670 async fn cancel_host_future() -> Result<()> {
671 let mut config = Config::new();
672 config.wasm_component_model_async(true);
673 let engine = Engine::new(&config)?;
674
675 let component = Component::new(
676 &engine,
677 r#"
678 (component
679 (core module $libc (memory (export "memory") 1))
680 (core instance $libc (instantiate $libc))
681 (core module $m
682 (import "" "future.read" (func $future.read (param i32 i32) (result i32)))
683 (import "" "future.cancel-read" (func $future.cancel-read (param i32) (result i32)))
684 (memory (export "memory") 1)
685
686 (func (export "run") (param i32)
687 ;; read/cancel attempt 1
688 (call $future.read (local.get 0) (i32.const 100))
689 i32.const -1 ;; BLOCKED
690 i32.ne
691 if unreachable end
692
693 (call $future.cancel-read (local.get 0))
694 i32.const 2 ;; CANCELLED
695 i32.ne
696 if unreachable end
697
698 ;; read/cancel attempt 2
699 (call $future.read (local.get 0) (i32.const 100))
700 i32.const -1 ;; BLOCKED
701 i32.ne
702 if unreachable end
703
704 (call $future.cancel-read (local.get 0))
705 i32.const 2 ;; CANCELLED
706 i32.ne
707 if unreachable end
708 )
709 )
710
711 (type $f (future u32))
712 (core func $future.read (canon future.read $f async (memory $libc "memory")))
713 (core func $future.cancel-read (canon future.cancel-read $f))
714
715 (core instance $i (instantiate $m
716 (with "" (instance
717 (export "future.read" (func $future.read))
718 (export "future.cancel-read" (func $future.cancel-read))
719 ))
720 ))
721
722 (func (export "run") async (param "f" $f)
723 (canon lift
724 (core func $i "run")
725 (memory $libc "memory")
726 )
727 )
728 )
729 "#,
730 )?;
731
732 let mut store = Store::new(&engine, ());
733 let instance = Linker::new(&engine)
734 .instantiate_async(&mut store, &component)
735 .await?;
736 let func = instance.get_typed_func::<(FutureReader<u32>,), ()>(&mut store, "run")?;
737 let reader = FutureReader::new(&mut store, MyFutureReader)?;
738 func.call_async(&mut store, (reader,)).await?;
739
740 return Ok(());
741
742 struct MyFutureReader;
743
744 impl FutureProducer<()> for MyFutureReader {
745 type Item = u32;
746
747 fn poll_produce(
748 self: Pin<&mut Self>,
749 _cx: &mut Context<'_>,
750 _store: StoreContextMut<()>,
751 finish: bool,
752 ) -> Poll<Result<Option<Self::Item>>> {
753 if finish {
754 Poll::Ready(Ok(None))
755 } else {
756 Poll::Pending
757 }
758 }
759 }
760 }
761
762 #[tokio::test]
763 #[cfg_attr(miri, ignore)]
run_wasm_in_call_async() -> Result<()>764 async fn run_wasm_in_call_async() -> Result<()> {
765 _ = env_logger::try_init();
766
767 let mut config = Config::new();
768 config.wasm_component_model_async(true);
769 let engine = Engine::new(&config)?;
770
771 let a = Component::new(
772 &engine,
773 r#"
774 (component
775 (type $t (func async))
776 (import "a" (func $f (type $t)))
777 (core func $f (canon lower (func $f)))
778 (core module $a
779 (import "" "f" (func $f))
780 (func (export "run") call $f)
781 )
782 (core instance $a (instantiate $a
783 (with "" (instance (export "f" (func $f))))
784 ))
785 (func (export "run") (type $t)
786 (canon lift (core func $a "run")))
787 )
788 "#,
789 )?;
790 let b = Component::new(
791 &engine,
792 r#"
793 (component
794 (type $t (func async))
795 (core module $a
796 (func (export "run"))
797 )
798 (core instance $a (instantiate $a))
799 (func (export "run") (type $t)
800 (canon lift (core func $a "run")))
801 )
802 "#,
803 )?;
804
805 type State = Option<Instance>;
806
807 let mut linker = Linker::new(&engine);
808 linker
809 .root()
810 .func_wrap_concurrent("a", |accessor: &Accessor<State>, (): ()| {
811 Box::pin(async move {
812 let func = accessor.with(|mut access| {
813 access
814 .get()
815 .unwrap()
816 .get_typed_func::<(), ()>(&mut access, "run")
817 })?;
818 func.call_concurrent(accessor, ()).await?;
819 Ok(())
820 })
821 })?;
822 let mut store = Store::new(&engine, None);
823 let instance_a = linker.instantiate_async(&mut store, &a).await?;
824 let instance_b = linker.instantiate_async(&mut store, &b).await?;
825 *store.data_mut() = Some(instance_b);
826 let run = instance_a.get_typed_func::<(), ()>(&mut store, "run")?;
827 run.call_async(&mut store, ()).await?;
828 Ok(())
829 }
830
831 #[tokio::test]
832 #[cfg_attr(miri, ignore)]
require_concurrency_support() -> Result<()>833 async fn require_concurrency_support() -> Result<()> {
834 let mut config = Config::new();
835 config.concurrency_support(false);
836 let engine = Engine::new(&config)?;
837
838 let mut store = Store::new(&engine, ());
839
840 assert!(
841 store
842 .run_concurrent(async |_| wasmtime::error::Ok(()))
843 .await
844 .is_err()
845 );
846
847 assert!(StreamReader::<u32>::new(&mut store, Vec::new()).is_err());
848 assert!(FutureReader::new(&mut store, async { wasmtime::error::Ok(0) }).is_err());
849
850 let mut linker = Linker::<()>::new(&engine);
851 let mut root = linker.root();
852
853 assert!(
854 root.func_wrap_concurrent::<(), (), _>("f1", |_, _| { todo!() })
855 .is_err()
856 );
857 assert!(
858 root.func_new_concurrent("f2", |_, _, _, _| { todo!() })
859 .is_err()
860 );
861 assert!(
862 root.resource_concurrent("f3", ResourceType::host::<u32>(), |_, _| { todo!() })
863 .is_err()
864 );
865
866 Ok(())
867 }
868
869 #[tokio::test]
870 #[cfg_attr(miri, ignore)]
cancel_host_task_does_not_leak() -> Result<()>871 async fn cancel_host_task_does_not_leak() -> Result<()> {
872 let mut config = Config::new();
873 config.wasm_component_model_async(true);
874 let engine = Engine::new(&config)?;
875
876 let mut store = Store::new(&engine, ());
877 let component = Component::new(
878 &engine,
879 r#"(component
880 (import "f" (func $f async))
881
882 (core module $m
883 (import "" "f" (func $f (result i32)))
884 (import "" "cancel" (func $cancel (param i32) (result i32)))
885 (import "" "drop" (func $drop (param i32)))
886 (func (export "run")
887 (local i32)
888
889 ;; start the subtask, asserting it's `STARTED`
890 call $f
891 local.tee 0
892 i32.const 0xf
893 i32.and
894 i32.const 1 ;; STARTED
895 i32.ne
896 if unreachable end
897
898 ;; extract the task id
899 local.get 0
900 i32.const 4
901 i32.shr_u
902 local.set 0
903
904 ;; cancel the subtask asserting it's `RETURN_CANCELLED`
905 local.get 0
906 call $cancel
907 i32.const 4 ;; RETURN_CANCELLED
908 i32.ne
909 if unreachable end
910
911 ;; drop the subtask
912 local.get 0
913 call $drop
914 )
915 )
916 (core func $f (canon lower (func $f) async))
917 (core func $cancel (canon subtask.cancel))
918 (core func $drop (canon subtask.drop))
919 (core instance $i (instantiate $m
920 (with "" (instance
921 (export "f" (func $f))
922 (export "cancel" (func $cancel))
923 (export "drop" (func $drop))
924 ))
925 ))
926
927 (func (export "f") async
928 (canon lift (core func $i "run")))
929
930
931 )"#,
932 )?;
933
934 let mut linker = Linker::new(&engine);
935 linker.root().func_wrap_concurrent("f", |_, ()| {
936 Box::pin(async move {
937 std::future::pending::<()>().await;
938 Ok(())
939 })
940 })?;
941 let instance = linker.instantiate_async(&mut store, &component).await?;
942 let func = instance.get_typed_func::<(), ()>(&mut store, "f")?;
943 store
944 .run_concurrent(async |store| -> wasmtime::Result<()> {
945 func.call_concurrent(store, ()).await?;
946
947 for _ in 0..5 {
948 tokio::task::yield_now().await;
949 }
950 Ok(())
951 })
952 .await??;
953
954 // The host task was cancelled, nothing should remain.
955 store.assert_concurrent_state_empty();
956
957 Ok(())
958 }
959
960 #[tokio::test]
961 #[cfg_attr(miri, ignore)]
sync_lower_async_host_does_not_leak() -> Result<()>962 async fn sync_lower_async_host_does_not_leak() -> Result<()> {
963 let mut config = Config::new();
964 config.wasm_component_model_async(true);
965 let engine = Engine::new(&config)?;
966
967 let mut store = Store::new(&engine, 0);
968 let component = Component::new(
969 &engine,
970 r#"(component
971 (import "f" (func $f async))
972
973 (core module $m
974 (import "" "f" (func $f))
975 (func (export "run")
976 (local $c i32)
977
978 ;; call the host 100 times
979 loop
980 call $f
981 (local.tee $c (i32.add (local.get $c) (i32.const 1)))
982 i32.const 100
983 i32.ne
984 if br 1 end
985 end
986 )
987 )
988 (core func $f (canon lower (func $f) ))
989 (core instance $i (instantiate $m
990 (with "" (instance
991 (export "f" (func $f))
992 ))
993 ))
994
995 (func (export "f") async
996 (canon lift (core func $i "run")))
997
998
999 )"#,
1000 )?;
1001
1002 let mut linker = Linker::<usize>::new(&engine);
1003 linker
1004 .root()
1005 .func_wrap_concurrent("f", |accessor, (): ()| {
1006 Box::pin(async move {
1007 // Ensure that this doesn't hit the fast path of "ready on
1008 // first poll"
1009 for _ in 0..5 {
1010 tokio::task::yield_now().await;
1011 }
1012
1013 // Keep track of the maximum size of the table in
1014 // concurrent_state.
1015 accessor.with(|mut s| {
1016 let cur = s.as_context_mut().concurrent_state_table_size();
1017 let max = s.data_mut();
1018 *max = (*max).max(cur);
1019 });
1020 Ok(())
1021 })
1022 })?;
1023 let instance = linker.instantiate_async(&mut store, &component).await?;
1024 let func = instance.get_typed_func::<(), ()>(&mut store, "f")?;
1025 func.call_async(&mut store, ()).await?;
1026
1027 // First-level assertion: nothing should remain after the guest has exited.
1028 store.assert_concurrent_state_empty();
1029
1030 // Second-level assertion: state should be incrementally cleaned up along
1031 // the way as the guest calls the host. Things shouldn't leak until the
1032 // guest exits at the end, for example.
1033 assert!(
1034 *store.data() < 100,
1035 "the store peaked at over 100 items in the concurrent table which \
1036 indicates that something isn't getting cleaned up between executions \
1037 of the host"
1038 );
1039
1040 Ok(())
1041 }
1042
1043 /// Regression test: `stream.cancel-read` with `async` option must not corrupt
1044 /// the read state when it returns BLOCKED.
1045 ///
1046 /// Bug: cancel_read/cancel_write unconditionally transitioned the read/write
1047 /// state from GuestReady to Open after the cancel, even when the cancel
1048 /// returned BLOCKED. This destroyed the buffer address/count info, causing
1049 /// an error when the host later tried to access the stream state.
1050 #[tokio::test]
1051 #[cfg_attr(miri, ignore)]
stream_cancel_read_async_does_not_corrupt_state() -> Result<()>1052 async fn stream_cancel_read_async_does_not_corrupt_state() -> Result<()> {
1053 _ = env_logger::try_init();
1054
1055 let mut config = Config::new();
1056 config.wasm_component_model_async(true);
1057 config.wasm_component_model_async_builtins(true);
1058 config.wasm_component_model_async_stackful(true);
1059 let engine = Engine::new(&config)?;
1060
1061 let component = Component::new(
1062 &engine,
1063 r#"
1064 (component
1065 (core module $libc (memory (export "memory") 1))
1066 (core instance $libc (instantiate $libc))
1067 (core module $m
1068 (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32)))
1069 (import "" "stream.cancel-read" (func $stream.cancel-read (param i32) (result i32)))
1070 (import "" "stream.drop-readable" (func $stream.drop-readable (param i32)))
1071 (import "" "waitable.join" (func $waitable.join (param i32 i32)))
1072 (import "" "waitable-set.new" (func $waitable-set.new (result i32)))
1073 (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
1074 (import "" "waitable-set.drop" (func $waitable-set.drop (param i32)))
1075 (memory (export "memory") 1)
1076
1077 (func (export "run") (param $sr i32)
1078 (local $cancel_result i32)
1079 (local $ws i32)
1080
1081 ;; Async read into buffer at 0x100, length 4.
1082 ;; Should return BLOCKED (-1) since the host producer never writes.
1083 (call $stream.read (local.get $sr) (i32.const 0x100) (i32.const 4))
1084 i32.const -1 ;; BLOCKED
1085 i32.ne
1086 if unreachable end
1087
1088 ;; Async cancel-read. The host write end is HostReady, so this returns
1089 ;; BLOCKED. Bug: the cancel unconditionally transitions GuestReady -> Open,
1090 ;; destroying the buffer info.
1091 (local.set $cancel_result (call $stream.cancel-read (local.get $sr)))
1092
1093 ;; If cancel returned BLOCKED (-1), wait for the cancel to complete.
1094 ;; This is where the bug manifests: when the host processes the cancel,
1095 ;; it accesses the read state which was corrupted from GuestReady to Open.
1096 (if (i32.eq (local.get $cancel_result) (i32.const -1))
1097 (then
1098 (local.set $ws (call $waitable-set.new))
1099 (call $waitable.join (local.get $sr) (local.get $ws))
1100 ;; Wait for the stream event (cancel completion). Event buffer at 0x200.
1101 (drop (call $waitable-set.wait (local.get $ws) (i32.const 0x200)))
1102 ;; Unjoin stream from waitable-set (join to 0 = unjoin)
1103 (call $waitable.join (local.get $sr) (i32.const 0))
1104 (call $waitable-set.drop (local.get $ws))
1105 )
1106 )
1107
1108 ;; Drop the stream
1109 (call $stream.drop-readable (local.get $sr))
1110 )
1111 )
1112
1113 (type $s (stream u8))
1114 (core func $stream.read (canon stream.read $s async (memory $libc "memory")))
1115 (core func $stream.cancel-read (canon stream.cancel-read $s async))
1116 (core func $stream.drop-readable (canon stream.drop-readable $s))
1117 (canon waitable.join (core func $waitable.join))
1118 (canon waitable-set.new (core func $waitable-set.new))
1119 (canon waitable-set.wait (memory $libc "memory") (core func $waitable-set.wait))
1120 (canon waitable-set.drop (core func $waitable-set.drop))
1121
1122 (core instance $i (instantiate $m
1123 (with "" (instance
1124 (export "stream.read" (func $stream.read))
1125 (export "stream.cancel-read" (func $stream.cancel-read))
1126 (export "stream.drop-readable" (func $stream.drop-readable))
1127 (export "waitable.join" (func $waitable.join))
1128 (export "waitable-set.new" (func $waitable-set.new))
1129 (export "waitable-set.wait" (func $waitable-set.wait))
1130 (export "waitable-set.drop" (func $waitable-set.drop))
1131 ))
1132 ))
1133
1134 (func (export "run") async (param "s" (stream u8))
1135 (canon lift
1136 (core func $i "run")
1137 (memory $libc "memory")
1138 )
1139 )
1140 )
1141 "#,
1142 )?;
1143
1144 let mut store = Store::new(&engine, ());
1145 let instance = Linker::new(&engine)
1146 .instantiate_async(&mut store, &component)
1147 .await?;
1148 let func = instance.get_typed_func::<(StreamReader<u8>,), ()>(&mut store, "run")?;
1149
1150 // Create a host-side stream that never produces data (always Pending).
1151 // When cancel is requested (finish=true), it acknowledges the cancellation.
1152 let reader = StreamReader::new(&mut store, NeverWriteStreamProducer)?;
1153 func.call_async(&mut store, (reader,)).await?;
1154
1155 return Ok(());
1156
1157 struct NeverWriteStreamProducer;
1158
1159 impl StreamProducer<()> for NeverWriteStreamProducer {
1160 type Item = u8;
1161 type Buffer = Option<u8>;
1162
1163 fn poll_produce<'a>(
1164 self: Pin<&mut Self>,
1165 _cx: &mut Context<'_>,
1166 _store: StoreContextMut<'a, ()>,
1167 _destination: Destination<'a, Self::Item, Self::Buffer>,
1168 finish: bool,
1169 ) -> Poll<Result<StreamResult>> {
1170 if finish {
1171 // Cancel requested — acknowledge it.
1172 Poll::Ready(Ok(StreamResult::Cancelled))
1173 } else {
1174 // Never produce data.
1175 Poll::Pending
1176 }
1177 }
1178 }
1179 }
1180
1181 /// Regression test: multiple threads may concurrently make a synchronous
1182 /// call into the same async host function without corrupting state.
1183 ///
1184 /// Bug: waitable sets for host calls used to be shared across all threads, so if two threads
1185 /// called a sync-lowered async host function concurrently, the waitable set state got overwritten.
1186 #[tokio::test]
1187 #[cfg_attr(miri, ignore)]
concurrent_sync_calls_to_async_host() -> Result<()>1188 async fn concurrent_sync_calls_to_async_host() -> Result<()> {
1189 _ = env_logger::try_init();
1190
1191 let mut config = Config::new();
1192 config.wasm_component_model_async(true);
1193 config.wasm_component_model_async_builtins(true);
1194 config.wasm_component_model_async_stackful(true);
1195 config.wasm_component_model_threading(true);
1196 let engine = Engine::new(&config)?;
1197 let mut store = Store::new(&engine, 0);
1198
1199 let component = Component::new(
1200 &engine,
1201 r#"(component
1202 (import "await-three-calls" (func $await-three-calls async))
1203
1204 (core module $libc
1205 (table (export "__indirect_function_table") 1 funcref))
1206
1207 (core module $m
1208 (import "" "await-three-calls" (func $await-three-calls))
1209 (import "" "thread.new-indirect" (func $thread-new-indirect (param i32 i32) (result i32)))
1210 (import "" "thread.unsuspend" (func $thread-unsuspend (param i32)))
1211 (import "libc" "__indirect_function_table" (table $indirect-function-table 1 funcref))
1212
1213 (func (export "run")
1214 (call $thread-new-indirect (i32.const 0) (i32.const 0))
1215 (call $thread-unsuspend)
1216 (call $thread-new-indirect (i32.const 0) (i32.const 0))
1217 (call $thread-unsuspend)
1218 (call $await-three-calls)
1219 )
1220 (func $thread-entry (param i32)
1221 (call $await-three-calls)
1222 )
1223 (elem (table $indirect-function-table) (i32.const 0) func $thread-entry)
1224 )
1225 ;; Instantiate the libc module to get the table
1226 (core instance $libc (instantiate $libc))
1227 ;; Get access to `thread.new-indirect` that uses the table from libc
1228 (core type $start-func-ty (func (param i32)))
1229 (alias core export $libc "__indirect_function_table" (core table $indirect-function-table))
1230 (core func $thread-new-indirect
1231 (canon thread.new-indirect $start-func-ty (table $indirect-function-table)))
1232 (core func $thread-unsuspend (canon thread.unsuspend))
1233
1234 (core func $await-three-calls (canon lower (func $await-three-calls) ))
1235 (core instance $i (instantiate $m
1236 (with "" (instance
1237 (export "await-three-calls" (func $await-three-calls))
1238 (export "thread.new-indirect" (func $thread-new-indirect))
1239 (export "thread.unsuspend" (func $thread-unsuspend))
1240 ))
1241 (with "libc" (instance $libc))
1242 ))
1243 (func (export "run") async
1244 (canon lift (core func $i "run")))
1245 )"#,
1246 )?;
1247
1248 let mut linker = Linker::<i32>::new(&engine);
1249 linker
1250 .root()
1251 .func_wrap_concurrent("await-three-calls", |accessor, (): ()| {
1252 Box::pin(async move {
1253 accessor.with(|mut s| {
1254 *s.data_mut() += 1;
1255 });
1256 while accessor.with(|mut s| *s.data_mut()) < 3 {
1257 tokio::task::yield_now().await;
1258 }
1259 Ok(())
1260 })
1261 })?;
1262 let instance = linker.instantiate_async(&mut store, &component).await?;
1263 let func = instance.get_typed_func::<(), ()>(&mut store, "run")?;
1264 func.call_async(&mut store, ()).await?;
1265
1266 store.assert_concurrent_state_empty();
1267
1268 Ok(())
1269 }
1270
1271 #[tokio::test]
1272 #[cfg_attr(miri, ignore)]
bytes_stream_producer() -> Result<()>1273 async fn bytes_stream_producer() -> Result<()> {
1274 let mut config = Config::new();
1275 config.wasm_component_model_async(true);
1276 let engine = Engine::new(&config)?;
1277
1278 let component = Component::new(
1279 &engine,
1280 r#"
1281 (component
1282 (core module $libc (memory (export "mem") 1))
1283 (core instance $libc (instantiate $libc))
1284 (core module $m
1285 (import "" "mem" (memory 1))
1286 (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32)))
1287
1288 (func (export "read") (param i32 i32) (result i32)
1289 (call $stream.read (local.get 0) (i32.const 0) (local.get 1))
1290 )
1291 )
1292 (type $s (stream u8))
1293 (core func $stream.read (canon stream.read $s async (memory $libc "mem")))
1294 (core instance $i (instantiate $m
1295 (with "" (instance
1296 (export "mem" (memory $libc "mem"))
1297 (export "stream.read" (func $stream.read))
1298 ))
1299 ))
1300 (func (export "read") (param "s" (stream u8)) (param "l" u32) (result u32)
1301 (canon lift (core func $i "read")))
1302 )
1303 "#,
1304 )?;
1305
1306 let linker = Linker::new(&engine);
1307 let mut store = Store::new(&engine, ());
1308 let instance = linker.instantiate_async(&mut store, &component).await?;
1309 let func = instance.get_typed_func::<(StreamReader<u8>, u32), (u32,)>(&mut store, "read")?;
1310
1311 // read less than the capacity
1312 let reader = StreamReader::new(&mut store, bytes::Bytes::from_static(b"hello"))?;
1313 assert_eq!(
1314 func.call_async(&mut store, (reader, 1)).await?,
1315 ((1 << 4) | 0,),
1316 );
1317
1318 // read more than the capacity
1319 let reader = StreamReader::new(&mut store, bytes::Bytes::from_static(b"hello"))?;
1320 assert_eq!(
1321 func.call_async(&mut store, (reader, 100)).await?,
1322 ((5 << 4) | 1,),
1323 );
1324
1325 Ok(())
1326 }
1327