1 use crate::async_functions::{PollOnce, execute_across_threads};
2 use std::pin::Pin;
3 use std::task::{Context, Poll};
4 use wasmtime::Result;
5 use wasmtime::{AsContextMut, Config, Engine, Store, StoreContextMut, Trap, component::*};
6 use wasmtime_component_util::REALLOC_AND_FREE;
7 
8 /// This is super::func::thunks, except with an async store.
9 #[tokio::test]
10 #[cfg_attr(miri, ignore)]
smoke() -> Result<()>11 async fn smoke() -> Result<()> {
12     let component = r#"
13         (component
14             (core module $m
15                 (func (export "thunk"))
16                 (func (export "thunk-trap") unreachable)
17             )
18             (core instance $i (instantiate $m))
19             (func (export "thunk")
20                 (canon lift (core func $i "thunk"))
21             )
22             (func (export "thunk-trap")
23                 (canon lift (core func $i "thunk-trap"))
24             )
25         )
26     "#;
27 
28     let engine = super::async_engine();
29     let component = Component::new(&engine, component)?;
30     let mut store = Store::new(&engine, ());
31     let instance = Linker::new(&engine)
32         .instantiate_async(&mut store, &component)
33         .await?;
34 
35     let thunk = instance.get_typed_func::<(), ()>(&mut store, "thunk")?;
36 
37     thunk.call_async(&mut store, ()).await?;
38 
39     let err = instance
40         .get_typed_func::<(), ()>(&mut store, "thunk-trap")?
41         .call_async(&mut store, ())
42         .await
43         .unwrap_err();
44     assert_eq!(err.downcast::<Trap>()?, Trap::UnreachableCodeReached);
45 
46     Ok(())
47 }
48 
49 /// Handle an import function, created using component::Linker::func_wrap_async.
50 #[tokio::test]
51 #[cfg_attr(miri, ignore)]
smoke_func_wrap() -> Result<()>52 async fn smoke_func_wrap() -> Result<()> {
53     let component = r#"
54         (component
55             (type $f (func))
56             (import "i" (func $f))
57 
58             (core module $m
59                 (import "imports" "i" (func $i))
60                 (func (export "thunk") call $i)
61             )
62 
63             (core func $f (canon lower (func $f)))
64             (core instance $i (instantiate $m
65                 (with "imports" (instance
66                     (export "i" (func $f))
67                 ))
68              ))
69             (func (export "thunk")
70                 (canon lift (core func $i "thunk"))
71             )
72         )
73     "#;
74 
75     let engine = super::async_engine();
76     let component = Component::new(&engine, component)?;
77     let mut store = Store::new(&engine, ());
78     let mut linker = Linker::new(&engine);
79     let mut root = linker.root();
80     root.func_wrap_async("i", |_: StoreContextMut<()>, _: ()| {
81         Box::new(async { Ok(()) })
82     })?;
83 
84     let instance = linker.instantiate_async(&mut store, &component).await?;
85 
86     let thunk = instance.get_typed_func::<(), ()>(&mut store, "thunk")?;
87 
88     thunk.call_async(&mut store, ()).await?;
89 
90     Ok(())
91 }
92 
93 // This test stresses TLS management in combination with the `realloc` option
94 // for imported functions. This will create an async computation which invokes a
95 // component that invokes an imported function. The imported function returns a
96 // list which will require invoking malloc.
97 //
98 // As an added stressor all polls are sprinkled across threads through
99 // `execute_across_threads`. Yields are injected liberally by configuring 1
100 // fuel consumption to trigger a yield.
101 //
102 // Overall a yield should happen during malloc which should be an "interesting
103 // situation" with respect to the runtime.
104 #[tokio::test]
105 #[cfg_attr(miri, ignore)]
resume_separate_thread() -> Result<()>106 async fn resume_separate_thread() -> Result<()> {
107     let mut config = wasmtime_test_util::component::config();
108     config.consume_fuel(true);
109     let engine = Engine::new(&config)?;
110     let component = format!(
111         r#"
112             (component
113                 (import "yield" (func $yield (result (list u8))))
114                 (core module $libc
115                     (memory (export "memory") 1)
116                     {REALLOC_AND_FREE}
117                 )
118                 (core instance $libc (instantiate $libc))
119 
120                 (core func $yield
121                     (canon lower
122                         (func $yield)
123                         (memory $libc "memory")
124                         (realloc (func $libc "realloc"))
125                     )
126                 )
127 
128                 (core module $m
129                     (import "" "yield" (func $yield (param i32)))
130                     (import "libc" "memory" (memory 0))
131                     (func $start
132                         i32.const 8
133                         call $yield
134                     )
135                     (start $start)
136                 )
137                 (core instance (instantiate $m
138                     (with "" (instance (export "yield" (func $yield))))
139                     (with "libc" (instance $libc))
140                 ))
141             )
142         "#
143     );
144     let component = Component::new(&engine, component)?;
145     let mut linker = Linker::new(&engine);
146     linker
147         .root()
148         .func_wrap_async("yield", |_: StoreContextMut<()>, _: ()| {
149             Box::new(async {
150                 tokio::task::yield_now().await;
151                 Ok((vec![1u8, 2u8],))
152             })
153         })?;
154 
155     execute_across_threads(async move {
156         let mut store = Store::new(&engine, ());
157         store.set_fuel(u64::MAX).unwrap();
158         store.fuel_async_yield_interval(Some(1)).unwrap();
159         linker.instantiate_async(&mut store, &component).await?;
160         Ok::<_, wasmtime::Error>(())
161     })
162     .await?;
163     Ok(())
164 }
165 
166 // This test is intended to stress TLS management in the component model around
167 // the management of the `realloc` function. This creates an async computation
168 // representing the execution of a component model function where entry into the
169 // component uses `realloc` and then the component runs. This async computation
170 // is then polled iteratively with another "wasm activation" (in this case a
171 // core wasm function) on the stack. The poll-per-call should work and nothing
172 // should in theory have problems here.
173 //
174 // As an added stressor all polls are sprinkled across threads through
175 // `execute_across_threads`. Yields are injected liberally by configuring 1
176 // fuel consumption to trigger a yield.
177 //
178 // Overall a yield should happen during malloc which should be an "interesting
179 // situation" with respect to the runtime.
180 #[tokio::test]
181 #[cfg_attr(miri, ignore)]
poll_through_wasm_activation() -> Result<()>182 async fn poll_through_wasm_activation() -> Result<()> {
183     let mut config = wasmtime_test_util::component::config();
184     config.consume_fuel(true);
185     let engine = Engine::new(&config)?;
186     let component = format!(
187         r#"
188             (component
189                 (core module $m
190                     {REALLOC_AND_FREE}
191                     (memory (export "memory") 1)
192                     (func (export "run") (param i32 i32)
193                     )
194                 )
195                 (core instance $i (instantiate $m))
196                 (func (export "run") (param "x" (list u8))
197                     (canon lift (core func $i "run")
198                                 (memory $i "memory")
199                                 (realloc (func $i "realloc"))))
200             )
201         "#
202     );
203     let component = Component::new(&engine, component)?;
204     let linker = Linker::new(&engine);
205 
206     let invoke_component = {
207         let engine = engine.clone();
208         async move {
209             let mut store = Store::new(&engine, ());
210             store.set_fuel(u64::MAX).unwrap();
211             store.fuel_async_yield_interval(Some(1)).unwrap();
212             let instance = linker.instantiate_async(&mut store, &component).await?;
213             let func = instance.get_typed_func::<(Vec<u8>,), ()>(&mut store, "run")?;
214             func.call_async(&mut store, (vec![1, 2, 3],)).await?;
215             Ok::<_, wasmtime::Error>(())
216         }
217     };
218 
219     execute_across_threads(async move {
220         let mut store = Store::new(&engine, Some(Box::pin(invoke_component)));
221         let poll_once = wasmtime::Func::wrap_async(&mut store, |mut cx, _: ()| {
222             let invoke_component = cx.data_mut().take().unwrap();
223             Box::new(async move {
224                 match PollOnce::new(invoke_component).await {
225                     Ok(result) => {
226                         result?;
227                         Ok(1)
228                     }
229                     Err(future) => {
230                         *cx.data_mut() = Some(future);
231                         Ok(0)
232                     }
233                 }
234             })
235         });
236         let poll_once = poll_once.typed::<(), i32>(&mut store)?;
237         while poll_once.call_async(&mut store, ()).await? != 1 {
238             // loop around to call again
239         }
240         Ok::<_, wasmtime::Error>(())
241     })
242     .await?;
243     Ok(())
244 }
245 
246 /// Test async drop method for host resources.
247 #[tokio::test]
248 #[cfg_attr(miri, ignore)]
drop_resource_async() -> Result<()>249 async fn drop_resource_async() -> Result<()> {
250     use std::sync::Arc;
251     use std::sync::Mutex;
252 
253     let engine = super::async_engine();
254     let c = Component::new(
255         &engine,
256         r#"
257             (component
258                 (import "t" (type $t (sub resource)))
259 
260                 (core func $drop (canon resource.drop $t))
261 
262                 (core module $m
263                     (import "" "drop" (func $drop (param i32)))
264                     (func (export "f") (param i32)
265                         (call $drop (local.get 0))
266                     )
267                 )
268                 (core instance $i (instantiate $m
269                     (with "" (instance
270                         (export "drop" (func $drop))
271                     ))
272                 ))
273 
274                 (func (export "f") (param "x" (own $t))
275                     (canon lift (core func $i "f")))
276             )
277         "#,
278     )?;
279 
280     struct MyType;
281 
282     let mut store = Store::new(&engine, ());
283     let mut linker = Linker::new(&engine);
284 
285     let drop_status = Arc::new(Mutex::new("not dropped"));
286     let ds = drop_status.clone();
287 
288     linker
289         .root()
290         .resource_async("t", ResourceType::host::<MyType>(), move |_, _| {
291             let ds = ds.clone();
292             Box::new(async move {
293                 *ds.lock().unwrap() = "before yield";
294                 tokio::task::yield_now().await;
295                 *ds.lock().unwrap() = "after yield";
296                 Ok(())
297             })
298         })?;
299     let i = linker.instantiate_async(&mut store, &c).await?;
300     let f = i.get_typed_func::<(Resource<MyType>,), ()>(&mut store, "f")?;
301 
302     execute_across_threads(async move {
303         let resource = Resource::new_own(100);
304         f.call_async(&mut store, (resource,)).await?;
305         Ok::<_, wasmtime::Error>(())
306     })
307     .await?;
308 
309     assert_eq!("after yield", *drop_status.lock().unwrap());
310 
311     Ok(())
312 }
313 
314 /// Test task deletion in three situations, for every combination of lift/lower/(guest/host):
315 /// 1. An explicit thread calls task.return
316 /// 2. An explicit thread suspends indefinitely
317 /// 3. An explicit thread yield loops indefinitely
318 #[tokio::test]
319 #[cfg_attr(miri, ignore)]
task_deletion() -> Result<()>320 async fn task_deletion() -> Result<()> {
321     let mut config = Config::new();
322     config.wasm_component_model_async(true);
323     config.wasm_component_model_threading(true);
324     config.wasm_component_model_async_stackful(true);
325     config.wasm_component_model_async_builtins(true);
326     let engine = Engine::new(&config)?;
327     let component = Component::new(
328         &engine,
329         r#"(component
330     (component $C
331         (core module $Memory (memory (export "mem") 1))
332         (core instance $memory (instantiate $Memory))
333         ;; Defines the table for the thread start functions
334         (core module $libc
335             (table (export "__indirect_function_table") 3 funcref))
336         (core module $CM
337             (import "" "mem" (memory 1))
338             (import "" "task.return" (func $task-return (param i32)))
339             (import "" "task.cancel" (func $task-cancel))
340             (import "" "thread.new-indirect" (func $thread-new-indirect (param i32 i32) (result i32)))
341             (import "" "thread.suspend" (func $thread-suspend (result i32)))
342             (import "" "thread.suspend-cancellable" (func $thread-suspend-cancellable (result i32)))
343             (import "" "thread.yield-to-suspended" (func $thread-yield-to-suspended (param i32) (result i32)))
344             (import "" "thread.yield-to-suspended-cancellable" (func $thread-yield-to-suspended-cancellable (param i32) (result i32)))
345             (import "" "thread.suspend-to" (func $thread-suspend-to (param i32) (result i32)))
346             (import "" "thread.suspend-to-cancellable" (func $thread-suspend-to-cancellable (param i32) (result i32)))
347             (import "" "thread.yield" (func $thread-yield (result i32)))
348             (import "" "thread.yield-cancellable" (func $thread-yield-cancellable (result i32)))
349             (import "" "thread.index" (func $thread-index (result i32)))
350             (import "" "thread.unsuspend" (func $thread-unsuspend (param i32)))
351             (import "" "waitable.join" (func $waitable.join (param i32 i32)))
352             (import "" "waitable-set.new" (func $waitable-set.new (result i32)))
353             (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
354             (import "libc" "__indirect_function_table" (table $indirect-function-table 3 funcref))
355 
356             ;; Indices into the function table for the thread start functions
357             (global $call-return-ftbl-idx i32 (i32.const 0))
358             (global $suspend-ftbl-idx i32 (i32.const 1))
359             (global $yield-loop-ftbl-idx i32 (i32.const 2))
360 
361             (func $call-return (param i32)
362                 (call $task-return (local.get 0)))
363 
364             (func $suspend (param i32)
365                 (drop (call $thread-suspend)))
366 
367             (func $yield-loop (param i32)
368                 (loop $top
369                     (drop (call $thread-yield))
370                     (br $top)))
371 
372             (func (export "explicit-thread-calls-return-stackful")
373                 (call $thread-unsuspend
374                     (call $thread-new-indirect (global.get $call-return-ftbl-idx) (i32.const 42))))
375 
376             (func (export "explicit-thread-calls-return-stackless") (result i32)
377                 (call $thread-unsuspend
378                     (call $thread-new-indirect (global.get $call-return-ftbl-idx) (i32.const 42)))
379                 (i32.const 0 (; EXIT ;)))
380 
381             (func (export "cb") (param i32 i32 i32) (result i32)
382                 (unreachable))
383 
384             (func (export "explicit-thread-suspends-sync") (result i32)
385                 (call $thread-unsuspend
386                     (call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42)))
387                 (i32.const 42))
388 
389             (func (export "explicit-thread-suspends-stackful")
390                 (call $thread-unsuspend
391                     (call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42)))
392                 (call $task-return (i32.const 42)))
393 
394             (func (export "explicit-thread-suspends-stackless") (result i32)
395                 (call $thread-unsuspend
396                     (call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42)))
397                 (call $task-return (i32.const 42))
398                 (i32.const 0))
399 
400             (func (export "explicit-thread-yield-loops-sync") (result i32)
401                 (call $thread-unsuspend
402                     (call $thread-new-indirect (global.get $yield-loop-ftbl-idx) (i32.const 42)))
403                 (i32.const 42))
404 
405             (func (export "explicit-thread-yield-loops-stackful")
406                 (call $thread-unsuspend
407                     (call $thread-new-indirect (global.get $yield-loop-ftbl-idx) (i32.const 42)))
408                 (call $task-return (i32.const 42)))
409 
410             (func (export "explicit-thread-yield-loops-stackless") (result i32)
411                 (call $thread-unsuspend
412                     (call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42)))
413                 (call $task-return (i32.const 42))
414                 (i32.const 0 (; EXIT ;)))
415 
416             ;; Initialize the function table that will be used by thread.new-indirect
417             (elem (table $indirect-function-table) (i32.const 0 (; call-return-ftbl-idx ;)) func $call-return)
418             (elem (table $indirect-function-table) (i32.const 1 (; suspend-ftbl-idx ;)) func $suspend)
419             (elem (table $indirect-function-table) (i32.const 2 (; yield-loop-ftbl-idx ;)) func $yield-loop)
420         )
421 
422         ;; Instantiate the libc module to get the table
423         (core instance $libc (instantiate $libc))
424         ;; Get access to `thread.new-indirect` that uses the table from libc
425         (core type $start-func-ty (func (param i32)))
426         (alias core export $libc "__indirect_function_table" (core table $indirect-function-table))
427 
428         (core func $task-return (canon task.return (result u32)))
429         (core func $task-cancel (canon task.cancel))
430         (core func $thread-new-indirect
431             (canon thread.new-indirect $start-func-ty (table $indirect-function-table)))
432         (core func $thread-yield (canon thread.yield))
433         (core func $thread-yield-cancellable (canon thread.yield cancellable))
434         (core func $thread-index (canon thread.index))
435         (core func $thread-yield-to-suspended (canon thread.yield-to-suspended))
436         (core func $thread-yield-to-suspended-cancellable (canon thread.yield-to-suspended cancellable))
437         (core func $thread-unsuspend (canon thread.unsuspend))
438         (core func $thread-suspend-to (canon thread.suspend-to))
439         (core func $thread-suspend-to-cancellable (canon thread.suspend-to cancellable))
440         (core func $thread-suspend (canon thread.suspend))
441         (core func $thread-suspend-cancellable (canon thread.suspend cancellable))
442         (core func $waitable-set.new (canon waitable-set.new))
443         (core func $waitable.join (canon waitable.join))
444         (core func $waitable-set.wait (canon waitable-set.wait (memory $memory "mem")))
445 
446         ;; Instantiate the main module
447         (core instance $cm (
448             instantiate $CM
449                 (with "" (instance
450                     (export "mem" (memory $memory "mem"))
451                     (export "task.return" (func $task-return))
452                     (export "task.cancel" (func $task-cancel))
453                     (export "thread.new-indirect" (func $thread-new-indirect))
454                     (export "thread.index" (func $thread-index))
455                     (export "thread.yield-to-suspended" (func $thread-yield-to-suspended))
456                     (export "thread.yield-to-suspended-cancellable" (func $thread-yield-to-suspended-cancellable))
457                     (export "thread.yield" (func $thread-yield))
458                     (export "thread.yield-cancellable" (func $thread-yield-cancellable))
459                     (export "thread.suspend-to" (func $thread-suspend-to))
460                     (export "thread.suspend-to-cancellable" (func $thread-suspend-to-cancellable))
461                     (export "thread.suspend" (func $thread-suspend))
462                     (export "thread.suspend-cancellable" (func $thread-suspend-cancellable))
463                     (export "thread.unsuspend" (func $thread-unsuspend))
464                     (export "waitable.join" (func $waitable.join))
465                     (export "waitable-set.wait" (func $waitable-set.wait))
466                     (export "waitable-set.new" (func $waitable-set.new))))
467                 (with "libc" (instance $libc))))
468 
469         (func (export "explicit-thread-calls-return-stackful") async (result u32)
470             (canon lift (core func $cm "explicit-thread-calls-return-stackful") async))
471         (func (export "explicit-thread-calls-return-stackless") async (result u32)
472             (canon lift (core func $cm "explicit-thread-calls-return-stackless") async (callback (func $cm "cb"))))
473         (func (export "explicit-thread-suspends-sync") async (result u32)
474             (canon lift (core func $cm "explicit-thread-suspends-sync")))
475         (func (export "explicit-thread-suspends-stackful") async (result u32)
476             (canon lift (core func $cm "explicit-thread-suspends-stackful") async))
477         (func (export "explicit-thread-suspends-stackless") async (result u32)
478             (canon lift (core func $cm "explicit-thread-suspends-stackless") async (callback (func $cm "cb"))))
479         (func (export "explicit-thread-yield-loops-sync") async (result u32)
480             (canon lift (core func $cm "explicit-thread-yield-loops-sync")))
481         (func (export "explicit-thread-yield-loops-stackful") async (result u32)
482             (canon lift (core func $cm "explicit-thread-yield-loops-stackful") async))
483         (func (export "explicit-thread-yield-loops-stackless") async (result u32)
484             (canon lift (core func $cm "explicit-thread-yield-loops-stackless") async (callback (func $cm "cb"))))
485     )
486 
487     (component $D
488         (import "explicit-thread-calls-return-stackful" (func $explicit-thread-calls-return-stackful async (result u32)))
489         (import "explicit-thread-calls-return-stackless" (func $explicit-thread-calls-return-stackless async (result u32)))
490         (import "explicit-thread-suspends-sync" (func $explicit-thread-suspends-sync async (result u32)))
491         (import "explicit-thread-suspends-stackful" (func $explicit-thread-suspends-stackful async (result u32)))
492         (import "explicit-thread-suspends-stackless" (func $explicit-thread-suspends-stackless async (result u32)))
493         (import "explicit-thread-yield-loops-sync" (func $explicit-thread-yield-loops-sync async (result u32)))
494         (import "explicit-thread-yield-loops-stackful" (func $explicit-thread-yield-loops-stackful async (result u32)))
495         (import "explicit-thread-yield-loops-stackless" (func $explicit-thread-yield-loops-stackless async (result u32)))
496 
497         (core module $Memory (memory (export "mem") 1))
498         (core instance $memory (instantiate $Memory))
499         (core module $DM
500             (import "" "mem" (memory 1))
501             (import "" "subtask.cancel" (func $subtask.cancel (param i32) (result i32)))
502             ;; sync lowered
503             (import "" "explicit-thread-calls-return-stackful" (func $explicit-thread-calls-return-stackful (result i32)))
504             (import "" "explicit-thread-calls-return-stackless" (func $explicit-thread-calls-return-stackless (result i32)))
505             (import "" "explicit-thread-suspends-sync" (func $explicit-thread-suspends-sync (result i32)))
506             (import "" "explicit-thread-suspends-stackful" (func $explicit-thread-suspends-stackful (result i32)))
507             (import "" "explicit-thread-suspends-stackless" (func $explicit-thread-suspends-stackless (result i32)))
508             (import "" "explicit-thread-yield-loops-sync" (func $explicit-thread-yield-loops-sync (result i32)))
509             (import "" "explicit-thread-yield-loops-stackful" (func $explicit-thread-yield-loops-stackful (result i32)))
510             (import "" "explicit-thread-yield-loops-stackless" (func $explicit-thread-yield-loops-stackless (result i32)))
511             ;; async lowered
512             (import "" "explicit-thread-calls-return-stackful-async" (func $explicit-thread-calls-return-stackful-async (param i32) (result i32)))
513             (import "" "explicit-thread-calls-return-stackless-async" (func $explicit-thread-calls-return-stackless-async (param i32) (result i32)))
514             (import "" "explicit-thread-suspends-sync-async" (func $explicit-thread-suspends-sync-async (param i32) (result i32)))
515             (import "" "explicit-thread-suspends-stackful-async" (func $explicit-thread-suspends-stackful-async (param i32) (result i32)))
516             (import "" "explicit-thread-suspends-stackless-async" (func $explicit-thread-suspends-stackless-async (param i32) (result i32)))
517             (import "" "explicit-thread-yield-loops-sync-async" (func $explicit-thread-yield-loops-sync-async (param i32) (result i32)))
518             (import "" "explicit-thread-yield-loops-stackful-async" (func $explicit-thread-yield-loops-stackful-async (param i32) (result i32)))
519             (import "" "explicit-thread-yield-loops-stackless-async" (func $explicit-thread-yield-loops-stackless-async (param i32) (result i32)))
520             (import "" "waitable.join" (func $waitable.join (param i32 i32)))
521             (import "" "waitable-set.new" (func $waitable-set.new (result i32)))
522             (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
523             (import "" "thread.yield" (func $thread-yield (result i32)))
524 
525             (func $check (param i32)
526                 (if (i32.ne (local.get 0) (i32.const 42))
527                     (then unreachable))
528             )
529 
530             (func $check-async (param i32)
531                 (local $retp i32) (local $ws i32) (local $ws-retp i32)
532                 (local.set $retp (i32.const 8))
533                 (local.set $ws-retp (i32.const 16))
534                 (local.set $ws (call $waitable-set.new))
535 
536                 (if (i32.eq (i32.and (local.get 0) (i32.const 0xF)) (i32.const 2 (; RETURNED ;)))
537                     (then (call $check (i32.load (local.get $retp))))
538                     (else
539                         (call $waitable.join (i32.shr_u (local.get 0) (i32.const 4)) (local.get $ws))
540                         (drop (call $waitable-set.wait (local.get $ws) (local.get $ws-retp)))
541                         (call $check (i32.load (local.get $retp)))))
542             )
543 
544             (func $run (export "run") (result i32)
545                 (local $retp i32)
546                 (local.set $retp (i32.const 8))
547                 (call $check (call $explicit-thread-calls-return-stackless))
548                 (call $check (call $explicit-thread-calls-return-stackful))
549                 (call $check (call $explicit-thread-suspends-sync))
550                 (call $check (call $explicit-thread-suspends-stackful))
551                 (call $check (call $explicit-thread-suspends-stackless))
552                 (call $check (call $explicit-thread-yield-loops-sync))
553                 (call $check (call $explicit-thread-yield-loops-stackful))
554                 (call $check (call $explicit-thread-yield-loops-stackless))
555 
556                 (call $check-async (call $explicit-thread-calls-return-stackless-async (local.get $retp)))
557                 (call $check-async (call $explicit-thread-calls-return-stackful-async (local.get $retp)))
558                 (call $check-async (call $explicit-thread-suspends-sync-async (local.get $retp)))
559                 (call $check-async (call $explicit-thread-suspends-stackful-async (local.get $retp)))
560                 (call $check-async (call $explicit-thread-suspends-stackless-async (local.get $retp)))
561                 (call $check-async (call $explicit-thread-yield-loops-sync-async (local.get $retp)))
562                 (call $check-async (call $explicit-thread-yield-loops-stackful-async (local.get $retp)))
563                 (call $check-async (call $explicit-thread-yield-loops-stackless-async (local.get $retp)))
564 
565                 (i32.const 42)
566             )
567         )
568 
569         (core func $waitable-set.new (canon waitable-set.new))
570         (core func $waitable-set.wait (canon waitable-set.wait (memory $memory "mem")))
571         (core func $waitable.join (canon waitable.join))
572         (core func $subtask.cancel (canon subtask.cancel async))
573         (core func $thread.yield (canon thread.yield))
574         ;; sync lowered
575         (canon lower (func $explicit-thread-calls-return-stackful) (memory $memory "mem") (core func $explicit-thread-calls-return-stackful'))
576         (canon lower (func $explicit-thread-calls-return-stackless) (memory $memory "mem") (core func $explicit-thread-calls-return-stackless'))
577         (canon lower (func $explicit-thread-suspends-sync) (memory $memory "mem") (core func $explicit-thread-suspends-sync'))
578         (canon lower (func $explicit-thread-suspends-stackful) (memory $memory "mem") (core func $explicit-thread-suspends-stackful'))
579         (canon lower (func $explicit-thread-suspends-stackless) (memory $memory "mem") (core func $explicit-thread-suspends-stackless'))
580         (canon lower (func $explicit-thread-yield-loops-sync) (memory $memory "mem") (core func $explicit-thread-yield-loops-sync'))
581         (canon lower (func $explicit-thread-yield-loops-stackful) (memory $memory "mem") (core func $explicit-thread-yield-loops-stackful'))
582         (canon lower (func $explicit-thread-yield-loops-stackless) (memory $memory "mem") (core func $explicit-thread-yield-loops-stackless'))
583         ;; async lowered
584         (canon lower (func $explicit-thread-calls-return-stackful) async (memory $memory "mem") (core func $explicit-thread-calls-return-stackful-async'))
585         (canon lower (func $explicit-thread-calls-return-stackless) async (memory $memory "mem") (core func $explicit-thread-calls-return-stackless-async'))
586         (canon lower (func $explicit-thread-suspends-sync) async (memory $memory "mem") (core func $explicit-thread-suspends-sync-async'))
587         (canon lower (func $explicit-thread-suspends-stackful) async (memory $memory "mem") (core func $explicit-thread-suspends-stackful-async'))
588         (canon lower (func $explicit-thread-suspends-stackless) async (memory $memory "mem") (core func $explicit-thread-suspends-stackless-async'))
589         (canon lower (func $explicit-thread-yield-loops-sync) async (memory $memory "mem") (core func $explicit-thread-yield-loops-sync-async'))
590         (canon lower (func $explicit-thread-yield-loops-stackful) async (memory $memory "mem") (core func $explicit-thread-yield-loops-stackful-async'))
591         (canon lower (func $explicit-thread-yield-loops-stackless) async (memory $memory "mem") (core func $explicit-thread-yield-loops-stackless-async'))
592         (core instance $dm (instantiate $DM (with "" (instance
593             (export "mem" (memory $memory "mem"))
594             (export "explicit-thread-calls-return-stackful" (func $explicit-thread-calls-return-stackful'))
595             (export "explicit-thread-calls-return-stackless" (func $explicit-thread-calls-return-stackless'))
596             (export "explicit-thread-suspends-sync" (func $explicit-thread-suspends-sync'))
597             (export "explicit-thread-suspends-stackful" (func $explicit-thread-suspends-stackful'))
598             (export "explicit-thread-suspends-stackless" (func $explicit-thread-suspends-stackless'))
599             (export "explicit-thread-yield-loops-sync" (func $explicit-thread-yield-loops-sync'))
600             (export "explicit-thread-yield-loops-stackful" (func $explicit-thread-yield-loops-stackful'))
601             (export "explicit-thread-yield-loops-stackless" (func $explicit-thread-yield-loops-stackless'))
602             (export "explicit-thread-calls-return-stackful-async" (func $explicit-thread-calls-return-stackful-async'))
603             (export "explicit-thread-calls-return-stackless-async" (func $explicit-thread-calls-return-stackless-async'))
604             (export "explicit-thread-suspends-sync-async" (func $explicit-thread-suspends-sync-async'))
605             (export "explicit-thread-suspends-stackful-async" (func $explicit-thread-suspends-stackful-async'))
606             (export "explicit-thread-suspends-stackless-async" (func $explicit-thread-suspends-stackless-async'))
607             (export "explicit-thread-yield-loops-sync-async" (func $explicit-thread-yield-loops-sync-async'))
608             (export "explicit-thread-yield-loops-stackful-async" (func $explicit-thread-yield-loops-stackful-async'))
609             (export "explicit-thread-yield-loops-stackless-async" (func $explicit-thread-yield-loops-stackless-async'))
610             (export "waitable.join" (func $waitable.join))
611             (export "waitable-set.new" (func $waitable-set.new))
612             (export "waitable-set.wait" (func $waitable-set.wait))
613             (export "subtask.cancel" (func $subtask.cancel))
614             (export "thread.yield" (func $thread.yield))
615         ))))
616         (func (export "run") async (result u32) (canon lift (core func $dm "run")))
617     )
618 
619     (instance $c (instantiate $C))
620     (instance $d (instantiate $D
621         (with "explicit-thread-calls-return-stackful" (func $c "explicit-thread-calls-return-stackful"))
622         (with "explicit-thread-calls-return-stackless" (func $c "explicit-thread-calls-return-stackless"))
623         (with "explicit-thread-suspends-sync" (func $c "explicit-thread-suspends-sync"))
624         (with "explicit-thread-suspends-stackful" (func $c "explicit-thread-suspends-stackful"))
625         (with "explicit-thread-suspends-stackless" (func $c "explicit-thread-suspends-stackless"))
626         (with "explicit-thread-yield-loops-sync" (func $c "explicit-thread-yield-loops-sync"))
627         (with "explicit-thread-yield-loops-stackful" (func $c "explicit-thread-yield-loops-stackful"))
628         (with "explicit-thread-yield-loops-stackless" (func $c "explicit-thread-yield-loops-stackless"))
629     ))
630   (func (export "run") (alias export $d "run"))
631   (func (export "explicit-thread-calls-return-stackful") (alias export $c "explicit-thread-calls-return-stackful"))
632   (func (export "explicit-thread-calls-return-stackless") (alias export $c "explicit-thread-calls-return-stackless"))
633   (func (export "explicit-thread-suspends-sync") (alias export $c "explicit-thread-suspends-sync"))
634   (func (export "explicit-thread-suspends-stackful") (alias export $c "explicit-thread-suspends-stackful"))
635   (func (export "explicit-thread-suspends-stackless") (alias export $c "explicit-thread-suspends-stackless"))
636   (func (export "explicit-thread-yield-loops-sync") (alias export $c "explicit-thread-yield-loops-sync"))
637   (func (export "explicit-thread-yield-loops-stackful") (alias export $c "explicit-thread-yield-loops-stackful"))
638   (func (export "explicit-thread-yield-loops-stackless") (alias export $c "explicit-thread-yield-loops-stackless"))
639 )
640         "#,
641     )?
642     .serialize()?;
643 
644     let component = unsafe { Component::deserialize(&engine, &component)? };
645     let mut store = Store::new(&engine, ());
646     let instance = Linker::new(&engine)
647         .instantiate_async(&mut store, &component)
648         .await?;
649     let funcs = vec![
650         "run",
651         "explicit-thread-calls-return-stackful",
652         "explicit-thread-calls-return-stackless",
653         "explicit-thread-suspends-sync",
654         "explicit-thread-suspends-stackful",
655         "explicit-thread-suspends-stackless",
656         "explicit-thread-yield-loops-sync",
657         "explicit-thread-yield-loops-stackful",
658         "explicit-thread-yield-loops-stackless",
659     ];
660     for func in funcs {
661         let func = instance.get_typed_func::<(), (u32,)>(&mut store, func)?;
662         assert_eq!(func.call_async(&mut store, ()).await?, (42,));
663     }
664 
665     Ok(())
666 }
667 
668 #[tokio::test]
669 #[cfg_attr(miri, ignore)]
cancel_host_future() -> Result<()>670 async fn cancel_host_future() -> Result<()> {
671     let mut config = Config::new();
672     config.wasm_component_model_async(true);
673     let engine = Engine::new(&config)?;
674 
675     let component = Component::new(
676         &engine,
677         r#"
678 (component
679   (core module $libc (memory (export "memory") 1))
680   (core instance $libc (instantiate $libc))
681   (core module $m
682     (import "" "future.read" (func $future.read (param i32 i32) (result i32)))
683     (import "" "future.cancel-read" (func $future.cancel-read (param i32) (result i32)))
684     (memory (export "memory") 1)
685 
686     (func (export "run") (param i32)
687       ;; read/cancel attempt 1
688       (call $future.read (local.get 0) (i32.const 100))
689       i32.const -1 ;; BLOCKED
690       i32.ne
691       if unreachable end
692 
693       (call $future.cancel-read (local.get 0))
694       i32.const 2 ;; CANCELLED
695       i32.ne
696       if unreachable end
697 
698       ;; read/cancel attempt 2
699       (call $future.read (local.get 0) (i32.const 100))
700       i32.const -1 ;; BLOCKED
701       i32.ne
702       if unreachable end
703 
704       (call $future.cancel-read (local.get 0))
705       i32.const 2 ;; CANCELLED
706       i32.ne
707       if unreachable end
708     )
709   )
710 
711   (type $f (future u32))
712   (core func $future.read (canon future.read $f async (memory $libc "memory")))
713   (core func $future.cancel-read (canon future.cancel-read $f))
714 
715   (core instance $i (instantiate $m
716     (with "" (instance
717       (export "future.read" (func $future.read))
718       (export "future.cancel-read" (func $future.cancel-read))
719     ))
720   ))
721 
722   (func (export "run") async (param "f" $f)
723     (canon lift
724       (core func $i "run")
725       (memory $libc "memory")
726     )
727   )
728 )
729         "#,
730     )?;
731 
732     let mut store = Store::new(&engine, ());
733     let instance = Linker::new(&engine)
734         .instantiate_async(&mut store, &component)
735         .await?;
736     let func = instance.get_typed_func::<(FutureReader<u32>,), ()>(&mut store, "run")?;
737     let reader = FutureReader::new(&mut store, MyFutureReader)?;
738     func.call_async(&mut store, (reader,)).await?;
739 
740     return Ok(());
741 
742     struct MyFutureReader;
743 
744     impl FutureProducer<()> for MyFutureReader {
745         type Item = u32;
746 
747         fn poll_produce(
748             self: Pin<&mut Self>,
749             _cx: &mut Context<'_>,
750             _store: StoreContextMut<()>,
751             finish: bool,
752         ) -> Poll<Result<Option<Self::Item>>> {
753             if finish {
754                 Poll::Ready(Ok(None))
755             } else {
756                 Poll::Pending
757             }
758         }
759     }
760 }
761 
762 #[tokio::test]
763 #[cfg_attr(miri, ignore)]
run_wasm_in_call_async() -> Result<()>764 async fn run_wasm_in_call_async() -> Result<()> {
765     _ = env_logger::try_init();
766 
767     let mut config = Config::new();
768     config.wasm_component_model_async(true);
769     let engine = Engine::new(&config)?;
770 
771     let a = Component::new(
772         &engine,
773         r#"
774 (component
775   (type $t (func async))
776   (import "a" (func $f (type $t)))
777   (core func $f (canon lower (func $f)))
778   (core module $a
779     (import "" "f" (func $f))
780     (func (export "run") call $f)
781   )
782   (core instance $a (instantiate $a
783     (with "" (instance (export "f" (func $f))))
784   ))
785   (func (export "run") (type $t)
786     (canon lift (core func $a "run")))
787 )
788         "#,
789     )?;
790     let b = Component::new(
791         &engine,
792         r#"
793 (component
794   (type $t (func async))
795   (core module $a
796     (func (export "run"))
797   )
798   (core instance $a (instantiate $a))
799   (func (export "run") (type $t)
800     (canon lift (core func $a "run")))
801 )
802         "#,
803     )?;
804 
805     type State = Option<Instance>;
806 
807     let mut linker = Linker::new(&engine);
808     linker
809         .root()
810         .func_wrap_concurrent("a", |accessor: &Accessor<State>, (): ()| {
811             Box::pin(async move {
812                 let func = accessor.with(|mut access| {
813                     access
814                         .get()
815                         .unwrap()
816                         .get_typed_func::<(), ()>(&mut access, "run")
817                 })?;
818                 func.call_concurrent(accessor, ()).await?;
819                 Ok(())
820             })
821         })?;
822     let mut store = Store::new(&engine, None);
823     let instance_a = linker.instantiate_async(&mut store, &a).await?;
824     let instance_b = linker.instantiate_async(&mut store, &b).await?;
825     *store.data_mut() = Some(instance_b);
826     let run = instance_a.get_typed_func::<(), ()>(&mut store, "run")?;
827     run.call_async(&mut store, ()).await?;
828     Ok(())
829 }
830 
831 #[tokio::test]
832 #[cfg_attr(miri, ignore)]
require_concurrency_support() -> Result<()>833 async fn require_concurrency_support() -> Result<()> {
834     let mut config = Config::new();
835     config.concurrency_support(false);
836     let engine = Engine::new(&config)?;
837 
838     let mut store = Store::new(&engine, ());
839 
840     assert!(
841         store
842             .run_concurrent(async |_| wasmtime::error::Ok(()))
843             .await
844             .is_err()
845     );
846 
847     assert!(StreamReader::<u32>::new(&mut store, Vec::new()).is_err());
848     assert!(FutureReader::new(&mut store, async { wasmtime::error::Ok(0) }).is_err());
849 
850     let mut linker = Linker::<()>::new(&engine);
851     let mut root = linker.root();
852 
853     assert!(
854         root.func_wrap_concurrent::<(), (), _>("f1", |_, _| { todo!() })
855             .is_err()
856     );
857     assert!(
858         root.func_new_concurrent("f2", |_, _, _, _| { todo!() })
859             .is_err()
860     );
861     assert!(
862         root.resource_concurrent("f3", ResourceType::host::<u32>(), |_, _| { todo!() })
863             .is_err()
864     );
865 
866     Ok(())
867 }
868 
869 #[tokio::test]
870 #[cfg_attr(miri, ignore)]
cancel_host_task_does_not_leak() -> Result<()>871 async fn cancel_host_task_does_not_leak() -> Result<()> {
872     let mut config = Config::new();
873     config.wasm_component_model_async(true);
874     let engine = Engine::new(&config)?;
875 
876     let mut store = Store::new(&engine, ());
877     let component = Component::new(
878         &engine,
879         r#"(component
880             (import "f" (func $f async))
881 
882             (core module $m
883                 (import "" "f" (func $f (result i32)))
884                 (import "" "cancel" (func $cancel (param i32) (result i32)))
885                 (import "" "drop" (func $drop (param i32)))
886                 (func (export "run")
887                     (local i32)
888 
889                     ;; start the subtask, asserting it's `STARTED`
890                     call $f
891                     local.tee 0
892                     i32.const 0xf
893                     i32.and
894                     i32.const 1 ;; STARTED
895                     i32.ne
896                     if unreachable end
897 
898                     ;; extract the task id
899                     local.get 0
900                     i32.const 4
901                     i32.shr_u
902                     local.set 0
903 
904                     ;; cancel the subtask asserting it's `RETURN_CANCELLED`
905                     local.get 0
906                     call $cancel
907                     i32.const 4 ;; RETURN_CANCELLED
908                     i32.ne
909                     if unreachable end
910 
911                     ;; drop the subtask
912                     local.get 0
913                     call $drop
914                 )
915             )
916             (core func $f (canon lower (func $f) async))
917             (core func $cancel (canon subtask.cancel))
918             (core func $drop (canon subtask.drop))
919             (core instance $i (instantiate $m
920                 (with "" (instance
921                     (export "f" (func $f))
922                     (export "cancel" (func $cancel))
923                     (export "drop" (func $drop))
924                 ))
925             ))
926 
927             (func (export "f") async
928                 (canon lift (core func $i "run")))
929 
930 
931         )"#,
932     )?;
933 
934     let mut linker = Linker::new(&engine);
935     linker.root().func_wrap_concurrent("f", |_, ()| {
936         Box::pin(async move {
937             std::future::pending::<()>().await;
938             Ok(())
939         })
940     })?;
941     let instance = linker.instantiate_async(&mut store, &component).await?;
942     let func = instance.get_typed_func::<(), ()>(&mut store, "f")?;
943     store
944         .run_concurrent(async |store| -> wasmtime::Result<()> {
945             func.call_concurrent(store, ()).await?;
946 
947             for _ in 0..5 {
948                 tokio::task::yield_now().await;
949             }
950             Ok(())
951         })
952         .await??;
953 
954     // The host task was cancelled, nothing should remain.
955     store.assert_concurrent_state_empty();
956 
957     Ok(())
958 }
959 
960 #[tokio::test]
961 #[cfg_attr(miri, ignore)]
sync_lower_async_host_does_not_leak() -> Result<()>962 async fn sync_lower_async_host_does_not_leak() -> Result<()> {
963     let mut config = Config::new();
964     config.wasm_component_model_async(true);
965     let engine = Engine::new(&config)?;
966 
967     let mut store = Store::new(&engine, 0);
968     let component = Component::new(
969         &engine,
970         r#"(component
971             (import "f" (func $f async))
972 
973             (core module $m
974                 (import "" "f" (func $f))
975                 (func (export "run")
976                     (local $c i32)
977 
978                     ;; call the host 100 times
979                     loop
980                       call $f
981                       (local.tee $c (i32.add (local.get $c) (i32.const 1)))
982                       i32.const 100
983                       i32.ne
984                       if br 1 end
985                     end
986                 )
987             )
988             (core func $f (canon lower (func $f) ))
989             (core instance $i (instantiate $m
990                 (with "" (instance
991                     (export "f" (func $f))
992                 ))
993             ))
994 
995             (func (export "f") async
996                 (canon lift (core func $i "run")))
997 
998 
999         )"#,
1000     )?;
1001 
1002     let mut linker = Linker::<usize>::new(&engine);
1003     linker
1004         .root()
1005         .func_wrap_concurrent("f", |accessor, (): ()| {
1006             Box::pin(async move {
1007                 // Ensure that this doesn't hit the fast path of "ready on
1008                 // first poll"
1009                 for _ in 0..5 {
1010                     tokio::task::yield_now().await;
1011                 }
1012 
1013                 // Keep track of the maximum size of the table in
1014                 // concurrent_state.
1015                 accessor.with(|mut s| {
1016                     let cur = s.as_context_mut().concurrent_state_table_size();
1017                     let max = s.data_mut();
1018                     *max = (*max).max(cur);
1019                 });
1020                 Ok(())
1021             })
1022         })?;
1023     let instance = linker.instantiate_async(&mut store, &component).await?;
1024     let func = instance.get_typed_func::<(), ()>(&mut store, "f")?;
1025     func.call_async(&mut store, ()).await?;
1026 
1027     // First-level assertion: nothing should remain after the guest has exited.
1028     store.assert_concurrent_state_empty();
1029 
1030     // Second-level assertion: state should be incrementally cleaned up along
1031     // the way as the guest calls the host. Things shouldn't leak until the
1032     // guest exits at the end, for example.
1033     assert!(
1034         *store.data() < 100,
1035         "the store peaked at over 100 items in the concurrent table which \
1036          indicates that something isn't getting cleaned up between executions \
1037          of the host"
1038     );
1039 
1040     Ok(())
1041 }
1042 
1043 /// Regression test: `stream.cancel-read` with `async` option must not corrupt
1044 /// the read state when it returns BLOCKED.
1045 ///
1046 /// Bug: cancel_read/cancel_write unconditionally transitioned the read/write
1047 /// state from GuestReady to Open after the cancel, even when the cancel
1048 /// returned BLOCKED. This destroyed the buffer address/count info, causing
1049 /// an error when the host later tried to access the stream state.
1050 #[tokio::test]
1051 #[cfg_attr(miri, ignore)]
stream_cancel_read_async_does_not_corrupt_state() -> Result<()>1052 async fn stream_cancel_read_async_does_not_corrupt_state() -> Result<()> {
1053     _ = env_logger::try_init();
1054 
1055     let mut config = Config::new();
1056     config.wasm_component_model_async(true);
1057     config.wasm_component_model_async_builtins(true);
1058     config.wasm_component_model_async_stackful(true);
1059     let engine = Engine::new(&config)?;
1060 
1061     let component = Component::new(
1062         &engine,
1063         r#"
1064 (component
1065   (core module $libc (memory (export "memory") 1))
1066   (core instance $libc (instantiate $libc))
1067   (core module $m
1068     (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32)))
1069     (import "" "stream.cancel-read" (func $stream.cancel-read (param i32) (result i32)))
1070     (import "" "stream.drop-readable" (func $stream.drop-readable (param i32)))
1071     (import "" "waitable.join" (func $waitable.join (param i32 i32)))
1072     (import "" "waitable-set.new" (func $waitable-set.new (result i32)))
1073     (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
1074     (import "" "waitable-set.drop" (func $waitable-set.drop (param i32)))
1075     (memory (export "memory") 1)
1076 
1077     (func (export "run") (param $sr i32)
1078       (local $cancel_result i32)
1079       (local $ws i32)
1080 
1081       ;; Async read into buffer at 0x100, length 4.
1082       ;; Should return BLOCKED (-1) since the host producer never writes.
1083       (call $stream.read (local.get $sr) (i32.const 0x100) (i32.const 4))
1084       i32.const -1 ;; BLOCKED
1085       i32.ne
1086       if unreachable end
1087 
1088       ;; Async cancel-read. The host write end is HostReady, so this returns
1089       ;; BLOCKED. Bug: the cancel unconditionally transitions GuestReady -> Open,
1090       ;; destroying the buffer info.
1091       (local.set $cancel_result (call $stream.cancel-read (local.get $sr)))
1092 
1093       ;; If cancel returned BLOCKED (-1), wait for the cancel to complete.
1094       ;; This is where the bug manifests: when the host processes the cancel,
1095       ;; it accesses the read state which was corrupted from GuestReady to Open.
1096       (if (i32.eq (local.get $cancel_result) (i32.const -1))
1097         (then
1098           (local.set $ws (call $waitable-set.new))
1099           (call $waitable.join (local.get $sr) (local.get $ws))
1100           ;; Wait for the stream event (cancel completion). Event buffer at 0x200.
1101           (drop (call $waitable-set.wait (local.get $ws) (i32.const 0x200)))
1102           ;; Unjoin stream from waitable-set (join to 0 = unjoin)
1103           (call $waitable.join (local.get $sr) (i32.const 0))
1104           (call $waitable-set.drop (local.get $ws))
1105         )
1106       )
1107 
1108       ;; Drop the stream
1109       (call $stream.drop-readable (local.get $sr))
1110     )
1111   )
1112 
1113   (type $s (stream u8))
1114   (core func $stream.read (canon stream.read $s async (memory $libc "memory")))
1115   (core func $stream.cancel-read (canon stream.cancel-read $s async))
1116   (core func $stream.drop-readable (canon stream.drop-readable $s))
1117   (canon waitable.join (core func $waitable.join))
1118   (canon waitable-set.new (core func $waitable-set.new))
1119   (canon waitable-set.wait (memory $libc "memory") (core func $waitable-set.wait))
1120   (canon waitable-set.drop (core func $waitable-set.drop))
1121 
1122   (core instance $i (instantiate $m
1123     (with "" (instance
1124       (export "stream.read" (func $stream.read))
1125       (export "stream.cancel-read" (func $stream.cancel-read))
1126       (export "stream.drop-readable" (func $stream.drop-readable))
1127       (export "waitable.join" (func $waitable.join))
1128       (export "waitable-set.new" (func $waitable-set.new))
1129       (export "waitable-set.wait" (func $waitable-set.wait))
1130       (export "waitable-set.drop" (func $waitable-set.drop))
1131     ))
1132   ))
1133 
1134   (func (export "run") async (param "s" (stream u8))
1135     (canon lift
1136       (core func $i "run")
1137       (memory $libc "memory")
1138     )
1139   )
1140 )
1141         "#,
1142     )?;
1143 
1144     let mut store = Store::new(&engine, ());
1145     let instance = Linker::new(&engine)
1146         .instantiate_async(&mut store, &component)
1147         .await?;
1148     let func = instance.get_typed_func::<(StreamReader<u8>,), ()>(&mut store, "run")?;
1149 
1150     // Create a host-side stream that never produces data (always Pending).
1151     // When cancel is requested (finish=true), it acknowledges the cancellation.
1152     let reader = StreamReader::new(&mut store, NeverWriteStreamProducer)?;
1153     func.call_async(&mut store, (reader,)).await?;
1154 
1155     return Ok(());
1156 
1157     struct NeverWriteStreamProducer;
1158 
1159     impl StreamProducer<()> for NeverWriteStreamProducer {
1160         type Item = u8;
1161         type Buffer = Option<u8>;
1162 
1163         fn poll_produce<'a>(
1164             self: Pin<&mut Self>,
1165             _cx: &mut Context<'_>,
1166             _store: StoreContextMut<'a, ()>,
1167             _destination: Destination<'a, Self::Item, Self::Buffer>,
1168             finish: bool,
1169         ) -> Poll<Result<StreamResult>> {
1170             if finish {
1171                 // Cancel requested — acknowledge it.
1172                 Poll::Ready(Ok(StreamResult::Cancelled))
1173             } else {
1174                 // Never produce data.
1175                 Poll::Pending
1176             }
1177         }
1178     }
1179 }
1180 
1181 /// Regression test: multiple threads may concurrently make a synchronous
1182 /// call into the same async host function without corrupting state.
1183 ///
1184 /// Bug: waitable sets for host calls used to be shared across all threads, so if two threads
1185 /// called a sync-lowered async host function concurrently, the waitable set state got overwritten.
1186 #[tokio::test]
1187 #[cfg_attr(miri, ignore)]
concurrent_sync_calls_to_async_host() -> Result<()>1188 async fn concurrent_sync_calls_to_async_host() -> Result<()> {
1189     _ = env_logger::try_init();
1190 
1191     let mut config = Config::new();
1192     config.wasm_component_model_async(true);
1193     config.wasm_component_model_async_builtins(true);
1194     config.wasm_component_model_async_stackful(true);
1195     config.wasm_component_model_threading(true);
1196     let engine = Engine::new(&config)?;
1197     let mut store = Store::new(&engine, 0);
1198 
1199     let component = Component::new(
1200         &engine,
1201         r#"(component
1202             (import "await-three-calls" (func $await-three-calls async))
1203 
1204             (core module $libc
1205                 (table (export "__indirect_function_table") 1 funcref))
1206 
1207             (core module $m
1208                 (import "" "await-three-calls" (func $await-three-calls))
1209                 (import "" "thread.new-indirect" (func $thread-new-indirect (param i32 i32) (result i32)))
1210                 (import "" "thread.unsuspend" (func $thread-unsuspend (param i32)))
1211                 (import "libc" "__indirect_function_table" (table $indirect-function-table 1 funcref))
1212 
1213                 (func (export "run")
1214                     (call $thread-new-indirect (i32.const 0) (i32.const 0))
1215                     (call $thread-unsuspend)
1216                     (call $thread-new-indirect (i32.const 0) (i32.const 0))
1217                     (call $thread-unsuspend)
1218                     (call $await-three-calls)
1219                 )
1220                 (func $thread-entry (param i32)
1221                     (call $await-three-calls)
1222                 )
1223                 (elem (table $indirect-function-table) (i32.const 0) func $thread-entry)
1224             )
1225             ;; Instantiate the libc module to get the table
1226             (core instance $libc (instantiate $libc))
1227             ;; Get access to `thread.new-indirect` that uses the table from libc
1228             (core type $start-func-ty (func (param i32)))
1229             (alias core export $libc "__indirect_function_table" (core table $indirect-function-table))
1230             (core func $thread-new-indirect
1231                 (canon thread.new-indirect $start-func-ty (table $indirect-function-table)))
1232             (core func $thread-unsuspend (canon thread.unsuspend))
1233 
1234             (core func $await-three-calls (canon lower (func $await-three-calls) ))
1235             (core instance $i (instantiate $m
1236                 (with "" (instance
1237                     (export "await-three-calls" (func $await-three-calls))
1238                     (export "thread.new-indirect" (func $thread-new-indirect))
1239                     (export "thread.unsuspend" (func $thread-unsuspend))
1240                 ))
1241                 (with "libc" (instance $libc))
1242             ))
1243             (func (export "run") async
1244                 (canon lift (core func $i "run")))
1245         )"#,
1246     )?;
1247 
1248     let mut linker = Linker::<i32>::new(&engine);
1249     linker
1250         .root()
1251         .func_wrap_concurrent("await-three-calls", |accessor, (): ()| {
1252             Box::pin(async move {
1253                 accessor.with(|mut s| {
1254                     *s.data_mut() += 1;
1255                 });
1256                 while accessor.with(|mut s| *s.data_mut()) < 3 {
1257                     tokio::task::yield_now().await;
1258                 }
1259                 Ok(())
1260             })
1261         })?;
1262     let instance = linker.instantiate_async(&mut store, &component).await?;
1263     let func = instance.get_typed_func::<(), ()>(&mut store, "run")?;
1264     func.call_async(&mut store, ()).await?;
1265 
1266     store.assert_concurrent_state_empty();
1267 
1268     Ok(())
1269 }
1270 
1271 #[tokio::test]
1272 #[cfg_attr(miri, ignore)]
bytes_stream_producer() -> Result<()>1273 async fn bytes_stream_producer() -> Result<()> {
1274     let mut config = Config::new();
1275     config.wasm_component_model_async(true);
1276     let engine = Engine::new(&config)?;
1277 
1278     let component = Component::new(
1279         &engine,
1280         r#"
1281         (component
1282             (core module $libc (memory (export "mem") 1))
1283             (core instance $libc (instantiate $libc))
1284             (core module $m
1285                 (import "" "mem" (memory 1))
1286                 (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32)))
1287 
1288                 (func (export "read") (param i32 i32) (result i32)
1289                     (call $stream.read (local.get 0) (i32.const 0) (local.get 1))
1290                 )
1291             )
1292             (type $s (stream u8))
1293             (core func $stream.read (canon stream.read $s async (memory $libc "mem")))
1294             (core instance $i (instantiate $m
1295                 (with "" (instance
1296                     (export "mem" (memory $libc "mem"))
1297                     (export "stream.read" (func $stream.read))
1298                 ))
1299             ))
1300             (func (export "read") (param "s" (stream u8)) (param "l" u32) (result u32)
1301                 (canon lift (core func $i "read")))
1302         )
1303     "#,
1304     )?;
1305 
1306     let linker = Linker::new(&engine);
1307     let mut store = Store::new(&engine, ());
1308     let instance = linker.instantiate_async(&mut store, &component).await?;
1309     let func = instance.get_typed_func::<(StreamReader<u8>, u32), (u32,)>(&mut store, "read")?;
1310 
1311     // read less than the capacity
1312     let reader = StreamReader::new(&mut store, bytes::Bytes::from_static(b"hello"))?;
1313     assert_eq!(
1314         func.call_async(&mut store, (reader, 1)).await?,
1315         ((1 << 4) | 0,),
1316     );
1317 
1318     // read more than the capacity
1319     let reader = StreamReader::new(&mut store, bytes::Bytes::from_static(b"hello"))?;
1320     assert_eq!(
1321         func.call_async(&mut store, (reader, 100)).await?,
1322         ((5 << 4) | 1,),
1323     );
1324 
1325     Ok(())
1326 }
1327