1 use crate::async_functions::{PollOnce, execute_across_threads}; 2 use std::pin::Pin; 3 use std::task::{Context, Poll}; 4 use wasmtime::Result; 5 use wasmtime::{AsContextMut, Config, Engine, Store, StoreContextMut, Trap, component::*}; 6 use wasmtime_component_util::REALLOC_AND_FREE; 7 8 /// This is super::func::thunks, except with an async store. 9 #[tokio::test] 10 #[cfg_attr(miri, ignore)] 11 async fn smoke() -> Result<()> { 12 let component = r#" 13 (component 14 (core module $m 15 (func (export "thunk")) 16 (func (export "thunk-trap") unreachable) 17 ) 18 (core instance $i (instantiate $m)) 19 (func (export "thunk") 20 (canon lift (core func $i "thunk")) 21 ) 22 (func (export "thunk-trap") 23 (canon lift (core func $i "thunk-trap")) 24 ) 25 ) 26 "#; 27 28 let engine = super::async_engine(); 29 let component = Component::new(&engine, component)?; 30 let mut store = Store::new(&engine, ()); 31 let instance = Linker::new(&engine) 32 .instantiate_async(&mut store, &component) 33 .await?; 34 35 let thunk = instance.get_typed_func::<(), ()>(&mut store, "thunk")?; 36 37 thunk.call_async(&mut store, ()).await?; 38 39 let err = instance 40 .get_typed_func::<(), ()>(&mut store, "thunk-trap")? 41 .call_async(&mut store, ()) 42 .await 43 .unwrap_err(); 44 assert_eq!(err.downcast::<Trap>()?, Trap::UnreachableCodeReached); 45 46 Ok(()) 47 } 48 49 /// Handle an import function, created using component::Linker::func_wrap_async. 50 #[tokio::test] 51 #[cfg_attr(miri, ignore)] 52 async fn smoke_func_wrap() -> Result<()> { 53 let component = r#" 54 (component 55 (type $f (func)) 56 (import "i" (func $f)) 57 58 (core module $m 59 (import "imports" "i" (func $i)) 60 (func (export "thunk") call $i) 61 ) 62 63 (core func $f (canon lower (func $f))) 64 (core instance $i (instantiate $m 65 (with "imports" (instance 66 (export "i" (func $f)) 67 )) 68 )) 69 (func (export "thunk") 70 (canon lift (core func $i "thunk")) 71 ) 72 ) 73 "#; 74 75 let engine = super::async_engine(); 76 let component = Component::new(&engine, component)?; 77 let mut store = Store::new(&engine, ()); 78 let mut linker = Linker::new(&engine); 79 let mut root = linker.root(); 80 root.func_wrap_async("i", |_: StoreContextMut<()>, _: ()| { 81 Box::new(async { Ok(()) }) 82 })?; 83 84 let instance = linker.instantiate_async(&mut store, &component).await?; 85 86 let thunk = instance.get_typed_func::<(), ()>(&mut store, "thunk")?; 87 88 thunk.call_async(&mut store, ()).await?; 89 90 Ok(()) 91 } 92 93 // This test stresses TLS management in combination with the `realloc` option 94 // for imported functions. This will create an async computation which invokes a 95 // component that invokes an imported function. The imported function returns a 96 // list which will require invoking malloc. 97 // 98 // As an added stressor all polls are sprinkled across threads through 99 // `execute_across_threads`. Yields are injected liberally by configuring 1 100 // fuel consumption to trigger a yield. 101 // 102 // Overall a yield should happen during malloc which should be an "interesting 103 // situation" with respect to the runtime. 104 #[tokio::test] 105 #[cfg_attr(miri, ignore)] 106 async fn resume_separate_thread() -> Result<()> { 107 let mut config = wasmtime_test_util::component::config(); 108 config.consume_fuel(true); 109 let engine = Engine::new(&config)?; 110 let component = format!( 111 r#" 112 (component 113 (import "yield" (func $yield (result (list u8)))) 114 (core module $libc 115 (memory (export "memory") 1) 116 {REALLOC_AND_FREE} 117 ) 118 (core instance $libc (instantiate $libc)) 119 120 (core func $yield 121 (canon lower 122 (func $yield) 123 (memory $libc "memory") 124 (realloc (func $libc "realloc")) 125 ) 126 ) 127 128 (core module $m 129 (import "" "yield" (func $yield (param i32))) 130 (import "libc" "memory" (memory 0)) 131 (func $start 132 i32.const 8 133 call $yield 134 ) 135 (start $start) 136 ) 137 (core instance (instantiate $m 138 (with "" (instance (export "yield" (func $yield)))) 139 (with "libc" (instance $libc)) 140 )) 141 ) 142 "# 143 ); 144 let component = Component::new(&engine, component)?; 145 let mut linker = Linker::new(&engine); 146 linker 147 .root() 148 .func_wrap_async("yield", |_: StoreContextMut<()>, _: ()| { 149 Box::new(async { 150 tokio::task::yield_now().await; 151 Ok((vec![1u8, 2u8],)) 152 }) 153 })?; 154 155 execute_across_threads(async move { 156 let mut store = Store::new(&engine, ()); 157 store.set_fuel(u64::MAX).unwrap(); 158 store.fuel_async_yield_interval(Some(1)).unwrap(); 159 linker.instantiate_async(&mut store, &component).await?; 160 Ok::<_, wasmtime::Error>(()) 161 }) 162 .await?; 163 Ok(()) 164 } 165 166 // This test is intended to stress TLS management in the component model around 167 // the management of the `realloc` function. This creates an async computation 168 // representing the execution of a component model function where entry into the 169 // component uses `realloc` and then the component runs. This async computation 170 // is then polled iteratively with another "wasm activation" (in this case a 171 // core wasm function) on the stack. The poll-per-call should work and nothing 172 // should in theory have problems here. 173 // 174 // As an added stressor all polls are sprinkled across threads through 175 // `execute_across_threads`. Yields are injected liberally by configuring 1 176 // fuel consumption to trigger a yield. 177 // 178 // Overall a yield should happen during malloc which should be an "interesting 179 // situation" with respect to the runtime. 180 #[tokio::test] 181 #[cfg_attr(miri, ignore)] 182 async fn poll_through_wasm_activation() -> Result<()> { 183 let mut config = wasmtime_test_util::component::config(); 184 config.consume_fuel(true); 185 let engine = Engine::new(&config)?; 186 let component = format!( 187 r#" 188 (component 189 (core module $m 190 {REALLOC_AND_FREE} 191 (memory (export "memory") 1) 192 (func (export "run") (param i32 i32) 193 ) 194 ) 195 (core instance $i (instantiate $m)) 196 (func (export "run") (param "x" (list u8)) 197 (canon lift (core func $i "run") 198 (memory $i "memory") 199 (realloc (func $i "realloc")))) 200 ) 201 "# 202 ); 203 let component = Component::new(&engine, component)?; 204 let linker = Linker::new(&engine); 205 206 let invoke_component = { 207 let engine = engine.clone(); 208 async move { 209 let mut store = Store::new(&engine, ()); 210 store.set_fuel(u64::MAX).unwrap(); 211 store.fuel_async_yield_interval(Some(1)).unwrap(); 212 let instance = linker.instantiate_async(&mut store, &component).await?; 213 let func = instance.get_typed_func::<(Vec<u8>,), ()>(&mut store, "run")?; 214 func.call_async(&mut store, (vec![1, 2, 3],)).await?; 215 Ok::<_, wasmtime::Error>(()) 216 } 217 }; 218 219 execute_across_threads(async move { 220 let mut store = Store::new(&engine, Some(Box::pin(invoke_component))); 221 let poll_once = wasmtime::Func::wrap_async(&mut store, |mut cx, _: ()| { 222 let invoke_component = cx.data_mut().take().unwrap(); 223 Box::new(async move { 224 match PollOnce::new(invoke_component).await { 225 Ok(result) => { 226 result?; 227 Ok(1) 228 } 229 Err(future) => { 230 *cx.data_mut() = Some(future); 231 Ok(0) 232 } 233 } 234 }) 235 }); 236 let poll_once = poll_once.typed::<(), i32>(&mut store)?; 237 while poll_once.call_async(&mut store, ()).await? != 1 { 238 // loop around to call again 239 } 240 Ok::<_, wasmtime::Error>(()) 241 }) 242 .await?; 243 Ok(()) 244 } 245 246 /// Test async drop method for host resources. 247 #[tokio::test] 248 #[cfg_attr(miri, ignore)] 249 async fn drop_resource_async() -> Result<()> { 250 use std::sync::Arc; 251 use std::sync::Mutex; 252 253 let engine = super::async_engine(); 254 let c = Component::new( 255 &engine, 256 r#" 257 (component 258 (import "t" (type $t (sub resource))) 259 260 (core func $drop (canon resource.drop $t)) 261 262 (core module $m 263 (import "" "drop" (func $drop (param i32))) 264 (func (export "f") (param i32) 265 (call $drop (local.get 0)) 266 ) 267 ) 268 (core instance $i (instantiate $m 269 (with "" (instance 270 (export "drop" (func $drop)) 271 )) 272 )) 273 274 (func (export "f") (param "x" (own $t)) 275 (canon lift (core func $i "f"))) 276 ) 277 "#, 278 )?; 279 280 struct MyType; 281 282 let mut store = Store::new(&engine, ()); 283 let mut linker = Linker::new(&engine); 284 285 let drop_status = Arc::new(Mutex::new("not dropped")); 286 let ds = drop_status.clone(); 287 288 linker 289 .root() 290 .resource_async("t", ResourceType::host::<MyType>(), move |_, _| { 291 let ds = ds.clone(); 292 Box::new(async move { 293 *ds.lock().unwrap() = "before yield"; 294 tokio::task::yield_now().await; 295 *ds.lock().unwrap() = "after yield"; 296 Ok(()) 297 }) 298 })?; 299 let i = linker.instantiate_async(&mut store, &c).await?; 300 let f = i.get_typed_func::<(Resource<MyType>,), ()>(&mut store, "f")?; 301 302 execute_across_threads(async move { 303 let resource = Resource::new_own(100); 304 f.call_async(&mut store, (resource,)).await?; 305 Ok::<_, wasmtime::Error>(()) 306 }) 307 .await?; 308 309 assert_eq!("after yield", *drop_status.lock().unwrap()); 310 311 Ok(()) 312 } 313 314 /// Test task deletion in three situations, for every combination of lift/lower/(guest/host): 315 /// 1. An explicit thread calls task.return 316 /// 2. An explicit thread suspends indefinitely 317 /// 3. An explicit thread yield loops indefinitely 318 #[tokio::test] 319 #[cfg_attr(miri, ignore)] 320 async fn task_deletion() -> Result<()> { 321 let mut config = Config::new(); 322 config.wasm_component_model_async(true); 323 config.wasm_component_model_threading(true); 324 config.wasm_component_model_async_stackful(true); 325 config.wasm_component_model_async_builtins(true); 326 let engine = Engine::new(&config)?; 327 let component = Component::new( 328 &engine, 329 r#"(component 330 (component $C 331 (core module $Memory (memory (export "mem") 1)) 332 (core instance $memory (instantiate $Memory)) 333 ;; Defines the table for the thread start functions 334 (core module $libc 335 (table (export "__indirect_function_table") 3 funcref)) 336 (core module $CM 337 (import "" "mem" (memory 1)) 338 (import "" "task.return" (func $task-return (param i32))) 339 (import "" "task.cancel" (func $task-cancel)) 340 (import "" "thread.new-indirect" (func $thread-new-indirect (param i32 i32) (result i32))) 341 (import "" "thread.suspend" (func $thread-suspend (result i32))) 342 (import "" "thread.suspend-cancellable" (func $thread-suspend-cancellable (result i32))) 343 (import "" "thread.yield-to-suspended" (func $thread-yield-to-suspended (param i32) (result i32))) 344 (import "" "thread.yield-to-suspended-cancellable" (func $thread-yield-to-suspended-cancellable (param i32) (result i32))) 345 (import "" "thread.suspend-to" (func $thread-suspend-to (param i32) (result i32))) 346 (import "" "thread.suspend-to-cancellable" (func $thread-suspend-to-cancellable (param i32) (result i32))) 347 (import "" "thread.yield" (func $thread-yield (result i32))) 348 (import "" "thread.yield-cancellable" (func $thread-yield-cancellable (result i32))) 349 (import "" "thread.index" (func $thread-index (result i32))) 350 (import "" "thread.unsuspend" (func $thread-unsuspend (param i32))) 351 (import "" "waitable.join" (func $waitable.join (param i32 i32))) 352 (import "" "waitable-set.new" (func $waitable-set.new (result i32))) 353 (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) 354 (import "libc" "__indirect_function_table" (table $indirect-function-table 3 funcref)) 355 356 ;; Indices into the function table for the thread start functions 357 (global $call-return-ftbl-idx i32 (i32.const 0)) 358 (global $suspend-ftbl-idx i32 (i32.const 1)) 359 (global $yield-loop-ftbl-idx i32 (i32.const 2)) 360 361 (func $call-return (param i32) 362 (call $task-return (local.get 0))) 363 364 (func $suspend (param i32) 365 (drop (call $thread-suspend))) 366 367 (func $yield-loop (param i32) 368 (loop $top 369 (drop (call $thread-yield)) 370 (br $top))) 371 372 (func (export "explicit-thread-calls-return-stackful") 373 (call $thread-unsuspend 374 (call $thread-new-indirect (global.get $call-return-ftbl-idx) (i32.const 42)))) 375 376 (func (export "explicit-thread-calls-return-stackless") (result i32) 377 (call $thread-unsuspend 378 (call $thread-new-indirect (global.get $call-return-ftbl-idx) (i32.const 42))) 379 (i32.const 0 (; EXIT ;))) 380 381 (func (export "cb") (param i32 i32 i32) (result i32) 382 (unreachable)) 383 384 (func (export "explicit-thread-suspends-sync") (result i32) 385 (call $thread-unsuspend 386 (call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42))) 387 (i32.const 42)) 388 389 (func (export "explicit-thread-suspends-stackful") 390 (call $thread-unsuspend 391 (call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42))) 392 (call $task-return (i32.const 42))) 393 394 (func (export "explicit-thread-suspends-stackless") (result i32) 395 (call $thread-unsuspend 396 (call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42))) 397 (call $task-return (i32.const 42)) 398 (i32.const 0)) 399 400 (func (export "explicit-thread-yield-loops-sync") (result i32) 401 (call $thread-unsuspend 402 (call $thread-new-indirect (global.get $yield-loop-ftbl-idx) (i32.const 42))) 403 (i32.const 42)) 404 405 (func (export "explicit-thread-yield-loops-stackful") 406 (call $thread-unsuspend 407 (call $thread-new-indirect (global.get $yield-loop-ftbl-idx) (i32.const 42))) 408 (call $task-return (i32.const 42))) 409 410 (func (export "explicit-thread-yield-loops-stackless") (result i32) 411 (call $thread-unsuspend 412 (call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42))) 413 (call $task-return (i32.const 42)) 414 (i32.const 0 (; EXIT ;))) 415 416 ;; Initialize the function table that will be used by thread.new-indirect 417 (elem (table $indirect-function-table) (i32.const 0 (; call-return-ftbl-idx ;)) func $call-return) 418 (elem (table $indirect-function-table) (i32.const 1 (; suspend-ftbl-idx ;)) func $suspend) 419 (elem (table $indirect-function-table) (i32.const 2 (; yield-loop-ftbl-idx ;)) func $yield-loop) 420 ) 421 422 ;; Instantiate the libc module to get the table 423 (core instance $libc (instantiate $libc)) 424 ;; Get access to `thread.new-indirect` that uses the table from libc 425 (core type $start-func-ty (func (param i32))) 426 (alias core export $libc "__indirect_function_table" (core table $indirect-function-table)) 427 428 (core func $task-return (canon task.return (result u32))) 429 (core func $task-cancel (canon task.cancel)) 430 (core func $thread-new-indirect 431 (canon thread.new-indirect $start-func-ty (table $indirect-function-table))) 432 (core func $thread-yield (canon thread.yield)) 433 (core func $thread-yield-cancellable (canon thread.yield cancellable)) 434 (core func $thread-index (canon thread.index)) 435 (core func $thread-yield-to-suspended (canon thread.yield-to-suspended)) 436 (core func $thread-yield-to-suspended-cancellable (canon thread.yield-to-suspended cancellable)) 437 (core func $thread-unsuspend (canon thread.unsuspend)) 438 (core func $thread-suspend-to (canon thread.suspend-to)) 439 (core func $thread-suspend-to-cancellable (canon thread.suspend-to cancellable)) 440 (core func $thread-suspend (canon thread.suspend)) 441 (core func $thread-suspend-cancellable (canon thread.suspend cancellable)) 442 (core func $waitable-set.new (canon waitable-set.new)) 443 (core func $waitable.join (canon waitable.join)) 444 (core func $waitable-set.wait (canon waitable-set.wait (memory $memory "mem"))) 445 446 ;; Instantiate the main module 447 (core instance $cm ( 448 instantiate $CM 449 (with "" (instance 450 (export "mem" (memory $memory "mem")) 451 (export "task.return" (func $task-return)) 452 (export "task.cancel" (func $task-cancel)) 453 (export "thread.new-indirect" (func $thread-new-indirect)) 454 (export "thread.index" (func $thread-index)) 455 (export "thread.yield-to-suspended" (func $thread-yield-to-suspended)) 456 (export "thread.yield-to-suspended-cancellable" (func $thread-yield-to-suspended-cancellable)) 457 (export "thread.yield" (func $thread-yield)) 458 (export "thread.yield-cancellable" (func $thread-yield-cancellable)) 459 (export "thread.suspend-to" (func $thread-suspend-to)) 460 (export "thread.suspend-to-cancellable" (func $thread-suspend-to-cancellable)) 461 (export "thread.suspend" (func $thread-suspend)) 462 (export "thread.suspend-cancellable" (func $thread-suspend-cancellable)) 463 (export "thread.unsuspend" (func $thread-unsuspend)) 464 (export "waitable.join" (func $waitable.join)) 465 (export "waitable-set.wait" (func $waitable-set.wait)) 466 (export "waitable-set.new" (func $waitable-set.new)))) 467 (with "libc" (instance $libc)))) 468 469 (func (export "explicit-thread-calls-return-stackful") async (result u32) 470 (canon lift (core func $cm "explicit-thread-calls-return-stackful") async)) 471 (func (export "explicit-thread-calls-return-stackless") async (result u32) 472 (canon lift (core func $cm "explicit-thread-calls-return-stackless") async (callback (func $cm "cb")))) 473 (func (export "explicit-thread-suspends-sync") async (result u32) 474 (canon lift (core func $cm "explicit-thread-suspends-sync"))) 475 (func (export "explicit-thread-suspends-stackful") async (result u32) 476 (canon lift (core func $cm "explicit-thread-suspends-stackful") async)) 477 (func (export "explicit-thread-suspends-stackless") async (result u32) 478 (canon lift (core func $cm "explicit-thread-suspends-stackless") async (callback (func $cm "cb")))) 479 (func (export "explicit-thread-yield-loops-sync") async (result u32) 480 (canon lift (core func $cm "explicit-thread-yield-loops-sync"))) 481 (func (export "explicit-thread-yield-loops-stackful") async (result u32) 482 (canon lift (core func $cm "explicit-thread-yield-loops-stackful") async)) 483 (func (export "explicit-thread-yield-loops-stackless") async (result u32) 484 (canon lift (core func $cm "explicit-thread-yield-loops-stackless") async (callback (func $cm "cb")))) 485 ) 486 487 (component $D 488 (import "explicit-thread-calls-return-stackful" (func $explicit-thread-calls-return-stackful async (result u32))) 489 (import "explicit-thread-calls-return-stackless" (func $explicit-thread-calls-return-stackless async (result u32))) 490 (import "explicit-thread-suspends-sync" (func $explicit-thread-suspends-sync async (result u32))) 491 (import "explicit-thread-suspends-stackful" (func $explicit-thread-suspends-stackful async (result u32))) 492 (import "explicit-thread-suspends-stackless" (func $explicit-thread-suspends-stackless async (result u32))) 493 (import "explicit-thread-yield-loops-sync" (func $explicit-thread-yield-loops-sync async (result u32))) 494 (import "explicit-thread-yield-loops-stackful" (func $explicit-thread-yield-loops-stackful async (result u32))) 495 (import "explicit-thread-yield-loops-stackless" (func $explicit-thread-yield-loops-stackless async (result u32))) 496 497 (core module $Memory (memory (export "mem") 1)) 498 (core instance $memory (instantiate $Memory)) 499 (core module $DM 500 (import "" "mem" (memory 1)) 501 (import "" "subtask.cancel" (func $subtask.cancel (param i32) (result i32))) 502 ;; sync lowered 503 (import "" "explicit-thread-calls-return-stackful" (func $explicit-thread-calls-return-stackful (result i32))) 504 (import "" "explicit-thread-calls-return-stackless" (func $explicit-thread-calls-return-stackless (result i32))) 505 (import "" "explicit-thread-suspends-sync" (func $explicit-thread-suspends-sync (result i32))) 506 (import "" "explicit-thread-suspends-stackful" (func $explicit-thread-suspends-stackful (result i32))) 507 (import "" "explicit-thread-suspends-stackless" (func $explicit-thread-suspends-stackless (result i32))) 508 (import "" "explicit-thread-yield-loops-sync" (func $explicit-thread-yield-loops-sync (result i32))) 509 (import "" "explicit-thread-yield-loops-stackful" (func $explicit-thread-yield-loops-stackful (result i32))) 510 (import "" "explicit-thread-yield-loops-stackless" (func $explicit-thread-yield-loops-stackless (result i32))) 511 ;; async lowered 512 (import "" "explicit-thread-calls-return-stackful-async" (func $explicit-thread-calls-return-stackful-async (param i32) (result i32))) 513 (import "" "explicit-thread-calls-return-stackless-async" (func $explicit-thread-calls-return-stackless-async (param i32) (result i32))) 514 (import "" "explicit-thread-suspends-sync-async" (func $explicit-thread-suspends-sync-async (param i32) (result i32))) 515 (import "" "explicit-thread-suspends-stackful-async" (func $explicit-thread-suspends-stackful-async (param i32) (result i32))) 516 (import "" "explicit-thread-suspends-stackless-async" (func $explicit-thread-suspends-stackless-async (param i32) (result i32))) 517 (import "" "explicit-thread-yield-loops-sync-async" (func $explicit-thread-yield-loops-sync-async (param i32) (result i32))) 518 (import "" "explicit-thread-yield-loops-stackful-async" (func $explicit-thread-yield-loops-stackful-async (param i32) (result i32))) 519 (import "" "explicit-thread-yield-loops-stackless-async" (func $explicit-thread-yield-loops-stackless-async (param i32) (result i32))) 520 (import "" "waitable.join" (func $waitable.join (param i32 i32))) 521 (import "" "waitable-set.new" (func $waitable-set.new (result i32))) 522 (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) 523 (import "" "thread.yield" (func $thread-yield (result i32))) 524 525 (func $check (param i32) 526 (if (i32.ne (local.get 0) (i32.const 42)) 527 (then unreachable)) 528 ) 529 530 (func $check-async (param i32) 531 (local $retp i32) (local $ws i32) (local $ws-retp i32) 532 (local.set $retp (i32.const 8)) 533 (local.set $ws-retp (i32.const 16)) 534 (local.set $ws (call $waitable-set.new)) 535 536 (if (i32.eq (i32.and (local.get 0) (i32.const 0xF)) (i32.const 2 (; RETURNED ;))) 537 (then (call $check (i32.load (local.get $retp)))) 538 (else 539 (call $waitable.join (i32.shr_u (local.get 0) (i32.const 4)) (local.get $ws)) 540 (drop (call $waitable-set.wait (local.get $ws) (local.get $ws-retp))) 541 (call $check (i32.load (local.get $retp))))) 542 ) 543 544 (func $run (export "run") (result i32) 545 (local $retp i32) 546 (local.set $retp (i32.const 8)) 547 (call $check (call $explicit-thread-calls-return-stackless)) 548 (call $check (call $explicit-thread-calls-return-stackful)) 549 (call $check (call $explicit-thread-suspends-sync)) 550 (call $check (call $explicit-thread-suspends-stackful)) 551 (call $check (call $explicit-thread-suspends-stackless)) 552 (call $check (call $explicit-thread-yield-loops-sync)) 553 (call $check (call $explicit-thread-yield-loops-stackful)) 554 (call $check (call $explicit-thread-yield-loops-stackless)) 555 556 (call $check-async (call $explicit-thread-calls-return-stackless-async (local.get $retp))) 557 (call $check-async (call $explicit-thread-calls-return-stackful-async (local.get $retp))) 558 (call $check-async (call $explicit-thread-suspends-sync-async (local.get $retp))) 559 (call $check-async (call $explicit-thread-suspends-stackful-async (local.get $retp))) 560 (call $check-async (call $explicit-thread-suspends-stackless-async (local.get $retp))) 561 (call $check-async (call $explicit-thread-yield-loops-sync-async (local.get $retp))) 562 (call $check-async (call $explicit-thread-yield-loops-stackful-async (local.get $retp))) 563 (call $check-async (call $explicit-thread-yield-loops-stackless-async (local.get $retp))) 564 565 (i32.const 42) 566 ) 567 ) 568 569 (core func $waitable-set.new (canon waitable-set.new)) 570 (core func $waitable-set.wait (canon waitable-set.wait (memory $memory "mem"))) 571 (core func $waitable.join (canon waitable.join)) 572 (core func $subtask.cancel (canon subtask.cancel async)) 573 (core func $thread.yield (canon thread.yield)) 574 ;; sync lowered 575 (canon lower (func $explicit-thread-calls-return-stackful) (memory $memory "mem") (core func $explicit-thread-calls-return-stackful')) 576 (canon lower (func $explicit-thread-calls-return-stackless) (memory $memory "mem") (core func $explicit-thread-calls-return-stackless')) 577 (canon lower (func $explicit-thread-suspends-sync) (memory $memory "mem") (core func $explicit-thread-suspends-sync')) 578 (canon lower (func $explicit-thread-suspends-stackful) (memory $memory "mem") (core func $explicit-thread-suspends-stackful')) 579 (canon lower (func $explicit-thread-suspends-stackless) (memory $memory "mem") (core func $explicit-thread-suspends-stackless')) 580 (canon lower (func $explicit-thread-yield-loops-sync) (memory $memory "mem") (core func $explicit-thread-yield-loops-sync')) 581 (canon lower (func $explicit-thread-yield-loops-stackful) (memory $memory "mem") (core func $explicit-thread-yield-loops-stackful')) 582 (canon lower (func $explicit-thread-yield-loops-stackless) (memory $memory "mem") (core func $explicit-thread-yield-loops-stackless')) 583 ;; async lowered 584 (canon lower (func $explicit-thread-calls-return-stackful) async (memory $memory "mem") (core func $explicit-thread-calls-return-stackful-async')) 585 (canon lower (func $explicit-thread-calls-return-stackless) async (memory $memory "mem") (core func $explicit-thread-calls-return-stackless-async')) 586 (canon lower (func $explicit-thread-suspends-sync) async (memory $memory "mem") (core func $explicit-thread-suspends-sync-async')) 587 (canon lower (func $explicit-thread-suspends-stackful) async (memory $memory "mem") (core func $explicit-thread-suspends-stackful-async')) 588 (canon lower (func $explicit-thread-suspends-stackless) async (memory $memory "mem") (core func $explicit-thread-suspends-stackless-async')) 589 (canon lower (func $explicit-thread-yield-loops-sync) async (memory $memory "mem") (core func $explicit-thread-yield-loops-sync-async')) 590 (canon lower (func $explicit-thread-yield-loops-stackful) async (memory $memory "mem") (core func $explicit-thread-yield-loops-stackful-async')) 591 (canon lower (func $explicit-thread-yield-loops-stackless) async (memory $memory "mem") (core func $explicit-thread-yield-loops-stackless-async')) 592 (core instance $dm (instantiate $DM (with "" (instance 593 (export "mem" (memory $memory "mem")) 594 (export "explicit-thread-calls-return-stackful" (func $explicit-thread-calls-return-stackful')) 595 (export "explicit-thread-calls-return-stackless" (func $explicit-thread-calls-return-stackless')) 596 (export "explicit-thread-suspends-sync" (func $explicit-thread-suspends-sync')) 597 (export "explicit-thread-suspends-stackful" (func $explicit-thread-suspends-stackful')) 598 (export "explicit-thread-suspends-stackless" (func $explicit-thread-suspends-stackless')) 599 (export "explicit-thread-yield-loops-sync" (func $explicit-thread-yield-loops-sync')) 600 (export "explicit-thread-yield-loops-stackful" (func $explicit-thread-yield-loops-stackful')) 601 (export "explicit-thread-yield-loops-stackless" (func $explicit-thread-yield-loops-stackless')) 602 (export "explicit-thread-calls-return-stackful-async" (func $explicit-thread-calls-return-stackful-async')) 603 (export "explicit-thread-calls-return-stackless-async" (func $explicit-thread-calls-return-stackless-async')) 604 (export "explicit-thread-suspends-sync-async" (func $explicit-thread-suspends-sync-async')) 605 (export "explicit-thread-suspends-stackful-async" (func $explicit-thread-suspends-stackful-async')) 606 (export "explicit-thread-suspends-stackless-async" (func $explicit-thread-suspends-stackless-async')) 607 (export "explicit-thread-yield-loops-sync-async" (func $explicit-thread-yield-loops-sync-async')) 608 (export "explicit-thread-yield-loops-stackful-async" (func $explicit-thread-yield-loops-stackful-async')) 609 (export "explicit-thread-yield-loops-stackless-async" (func $explicit-thread-yield-loops-stackless-async')) 610 (export "waitable.join" (func $waitable.join)) 611 (export "waitable-set.new" (func $waitable-set.new)) 612 (export "waitable-set.wait" (func $waitable-set.wait)) 613 (export "subtask.cancel" (func $subtask.cancel)) 614 (export "thread.yield" (func $thread.yield)) 615 )))) 616 (func (export "run") async (result u32) (canon lift (core func $dm "run"))) 617 ) 618 619 (instance $c (instantiate $C)) 620 (instance $d (instantiate $D 621 (with "explicit-thread-calls-return-stackful" (func $c "explicit-thread-calls-return-stackful")) 622 (with "explicit-thread-calls-return-stackless" (func $c "explicit-thread-calls-return-stackless")) 623 (with "explicit-thread-suspends-sync" (func $c "explicit-thread-suspends-sync")) 624 (with "explicit-thread-suspends-stackful" (func $c "explicit-thread-suspends-stackful")) 625 (with "explicit-thread-suspends-stackless" (func $c "explicit-thread-suspends-stackless")) 626 (with "explicit-thread-yield-loops-sync" (func $c "explicit-thread-yield-loops-sync")) 627 (with "explicit-thread-yield-loops-stackful" (func $c "explicit-thread-yield-loops-stackful")) 628 (with "explicit-thread-yield-loops-stackless" (func $c "explicit-thread-yield-loops-stackless")) 629 )) 630 (func (export "run") (alias export $d "run")) 631 (func (export "explicit-thread-calls-return-stackful") (alias export $c "explicit-thread-calls-return-stackful")) 632 (func (export "explicit-thread-calls-return-stackless") (alias export $c "explicit-thread-calls-return-stackless")) 633 (func (export "explicit-thread-suspends-sync") (alias export $c "explicit-thread-suspends-sync")) 634 (func (export "explicit-thread-suspends-stackful") (alias export $c "explicit-thread-suspends-stackful")) 635 (func (export "explicit-thread-suspends-stackless") (alias export $c "explicit-thread-suspends-stackless")) 636 (func (export "explicit-thread-yield-loops-sync") (alias export $c "explicit-thread-yield-loops-sync")) 637 (func (export "explicit-thread-yield-loops-stackful") (alias export $c "explicit-thread-yield-loops-stackful")) 638 (func (export "explicit-thread-yield-loops-stackless") (alias export $c "explicit-thread-yield-loops-stackless")) 639 ) 640 "#, 641 )? 642 .serialize()?; 643 644 let component = unsafe { Component::deserialize(&engine, &component)? }; 645 let mut store = Store::new(&engine, ()); 646 let instance = Linker::new(&engine) 647 .instantiate_async(&mut store, &component) 648 .await?; 649 let funcs = vec![ 650 "run", 651 "explicit-thread-calls-return-stackful", 652 "explicit-thread-calls-return-stackless", 653 "explicit-thread-suspends-sync", 654 "explicit-thread-suspends-stackful", 655 "explicit-thread-suspends-stackless", 656 "explicit-thread-yield-loops-sync", 657 "explicit-thread-yield-loops-stackful", 658 "explicit-thread-yield-loops-stackless", 659 ]; 660 for func in funcs { 661 let func = instance.get_typed_func::<(), (u32,)>(&mut store, func)?; 662 assert_eq!(func.call_async(&mut store, ()).await?, (42,)); 663 } 664 665 Ok(()) 666 } 667 668 #[tokio::test] 669 #[cfg_attr(miri, ignore)] 670 async fn cancel_host_future() -> Result<()> { 671 let mut config = Config::new(); 672 config.wasm_component_model_async(true); 673 let engine = Engine::new(&config)?; 674 675 let component = Component::new( 676 &engine, 677 r#" 678 (component 679 (core module $libc (memory (export "memory") 1)) 680 (core instance $libc (instantiate $libc)) 681 (core module $m 682 (import "" "future.read" (func $future.read (param i32 i32) (result i32))) 683 (import "" "future.cancel-read" (func $future.cancel-read (param i32) (result i32))) 684 (memory (export "memory") 1) 685 686 (func (export "run") (param i32) 687 ;; read/cancel attempt 1 688 (call $future.read (local.get 0) (i32.const 100)) 689 i32.const -1 ;; BLOCKED 690 i32.ne 691 if unreachable end 692 693 (call $future.cancel-read (local.get 0)) 694 i32.const 2 ;; CANCELLED 695 i32.ne 696 if unreachable end 697 698 ;; read/cancel attempt 2 699 (call $future.read (local.get 0) (i32.const 100)) 700 i32.const -1 ;; BLOCKED 701 i32.ne 702 if unreachable end 703 704 (call $future.cancel-read (local.get 0)) 705 i32.const 2 ;; CANCELLED 706 i32.ne 707 if unreachable end 708 ) 709 ) 710 711 (type $f (future u32)) 712 (core func $future.read (canon future.read $f async (memory $libc "memory"))) 713 (core func $future.cancel-read (canon future.cancel-read $f)) 714 715 (core instance $i (instantiate $m 716 (with "" (instance 717 (export "future.read" (func $future.read)) 718 (export "future.cancel-read" (func $future.cancel-read)) 719 )) 720 )) 721 722 (func (export "run") async (param "f" $f) 723 (canon lift 724 (core func $i "run") 725 (memory $libc "memory") 726 ) 727 ) 728 ) 729 "#, 730 )?; 731 732 let mut store = Store::new(&engine, ()); 733 let instance = Linker::new(&engine) 734 .instantiate_async(&mut store, &component) 735 .await?; 736 let func = instance.get_typed_func::<(FutureReader<u32>,), ()>(&mut store, "run")?; 737 let reader = FutureReader::new(&mut store, MyFutureReader)?; 738 func.call_async(&mut store, (reader,)).await?; 739 740 return Ok(()); 741 742 struct MyFutureReader; 743 744 impl FutureProducer<()> for MyFutureReader { 745 type Item = u32; 746 747 fn poll_produce( 748 self: Pin<&mut Self>, 749 _cx: &mut Context<'_>, 750 _store: StoreContextMut<()>, 751 finish: bool, 752 ) -> Poll<Result<Option<Self::Item>>> { 753 if finish { 754 Poll::Ready(Ok(None)) 755 } else { 756 Poll::Pending 757 } 758 } 759 } 760 } 761 762 #[tokio::test] 763 #[cfg_attr(miri, ignore)] 764 async fn run_wasm_in_call_async() -> Result<()> { 765 _ = env_logger::try_init(); 766 767 let mut config = Config::new(); 768 config.wasm_component_model_async(true); 769 let engine = Engine::new(&config)?; 770 771 let a = Component::new( 772 &engine, 773 r#" 774 (component 775 (type $t (func async)) 776 (import "a" (func $f (type $t))) 777 (core func $f (canon lower (func $f))) 778 (core module $a 779 (import "" "f" (func $f)) 780 (func (export "run") call $f) 781 ) 782 (core instance $a (instantiate $a 783 (with "" (instance (export "f" (func $f)))) 784 )) 785 (func (export "run") (type $t) 786 (canon lift (core func $a "run"))) 787 ) 788 "#, 789 )?; 790 let b = Component::new( 791 &engine, 792 r#" 793 (component 794 (type $t (func async)) 795 (core module $a 796 (func (export "run")) 797 ) 798 (core instance $a (instantiate $a)) 799 (func (export "run") (type $t) 800 (canon lift (core func $a "run"))) 801 ) 802 "#, 803 )?; 804 805 type State = Option<Instance>; 806 807 let mut linker = Linker::new(&engine); 808 linker 809 .root() 810 .func_wrap_concurrent("a", |accessor: &Accessor<State>, (): ()| { 811 Box::pin(async move { 812 let func = accessor.with(|mut access| { 813 access 814 .get() 815 .unwrap() 816 .get_typed_func::<(), ()>(&mut access, "run") 817 })?; 818 func.call_concurrent(accessor, ()).await?; 819 Ok(()) 820 }) 821 })?; 822 let mut store = Store::new(&engine, None); 823 let instance_a = linker.instantiate_async(&mut store, &a).await?; 824 let instance_b = linker.instantiate_async(&mut store, &b).await?; 825 *store.data_mut() = Some(instance_b); 826 let run = instance_a.get_typed_func::<(), ()>(&mut store, "run")?; 827 run.call_async(&mut store, ()).await?; 828 Ok(()) 829 } 830 831 #[tokio::test] 832 #[cfg_attr(miri, ignore)] 833 async fn require_concurrency_support() -> Result<()> { 834 let mut config = Config::new(); 835 config.concurrency_support(false); 836 let engine = Engine::new(&config)?; 837 838 let mut store = Store::new(&engine, ()); 839 840 assert!( 841 store 842 .run_concurrent(async |_| wasmtime::error::Ok(())) 843 .await 844 .is_err() 845 ); 846 847 assert!(StreamReader::<u32>::new(&mut store, Vec::new()).is_err()); 848 assert!(FutureReader::new(&mut store, async { wasmtime::error::Ok(0) }).is_err()); 849 850 let mut linker = Linker::<()>::new(&engine); 851 let mut root = linker.root(); 852 853 assert!( 854 root.func_wrap_concurrent::<(), (), _>("f1", |_, _| { todo!() }) 855 .is_err() 856 ); 857 assert!( 858 root.func_new_concurrent("f2", |_, _, _, _| { todo!() }) 859 .is_err() 860 ); 861 assert!( 862 root.resource_concurrent("f3", ResourceType::host::<u32>(), |_, _| { todo!() }) 863 .is_err() 864 ); 865 866 Ok(()) 867 } 868 869 #[tokio::test] 870 #[cfg_attr(miri, ignore)] 871 async fn cancel_host_task_does_not_leak() -> Result<()> { 872 let mut config = Config::new(); 873 config.wasm_component_model_async(true); 874 let engine = Engine::new(&config)?; 875 876 let mut store = Store::new(&engine, ()); 877 let component = Component::new( 878 &engine, 879 r#"(component 880 (import "f" (func $f async)) 881 882 (core module $m 883 (import "" "f" (func $f (result i32))) 884 (import "" "cancel" (func $cancel (param i32) (result i32))) 885 (import "" "drop" (func $drop (param i32))) 886 (func (export "run") 887 (local i32) 888 889 ;; start the subtask, asserting it's `STARTED` 890 call $f 891 local.tee 0 892 i32.const 0xf 893 i32.and 894 i32.const 1 ;; STARTED 895 i32.ne 896 if unreachable end 897 898 ;; extract the task id 899 local.get 0 900 i32.const 4 901 i32.shr_u 902 local.set 0 903 904 ;; cancel the subtask asserting it's `RETURN_CANCELLED` 905 local.get 0 906 call $cancel 907 i32.const 4 ;; RETURN_CANCELLED 908 i32.ne 909 if unreachable end 910 911 ;; drop the subtask 912 local.get 0 913 call $drop 914 ) 915 ) 916 (core func $f (canon lower (func $f) async)) 917 (core func $cancel (canon subtask.cancel)) 918 (core func $drop (canon subtask.drop)) 919 (core instance $i (instantiate $m 920 (with "" (instance 921 (export "f" (func $f)) 922 (export "cancel" (func $cancel)) 923 (export "drop" (func $drop)) 924 )) 925 )) 926 927 (func (export "f") async 928 (canon lift (core func $i "run"))) 929 930 931 )"#, 932 )?; 933 934 let mut linker = Linker::new(&engine); 935 linker.root().func_wrap_concurrent("f", |_, ()| { 936 Box::pin(async move { 937 std::future::pending::<()>().await; 938 Ok(()) 939 }) 940 })?; 941 let instance = linker.instantiate_async(&mut store, &component).await?; 942 let func = instance.get_typed_func::<(), ()>(&mut store, "f")?; 943 store 944 .run_concurrent(async |store| -> wasmtime::Result<()> { 945 func.call_concurrent(store, ()).await?; 946 947 for _ in 0..5 { 948 tokio::task::yield_now().await; 949 } 950 Ok(()) 951 }) 952 .await??; 953 954 // The host task was cancelled, nothing should remain. 955 store.assert_concurrent_state_empty(); 956 957 Ok(()) 958 } 959 960 #[tokio::test] 961 #[cfg_attr(miri, ignore)] 962 async fn sync_lower_async_host_does_not_leak() -> Result<()> { 963 let mut config = Config::new(); 964 config.wasm_component_model_async(true); 965 let engine = Engine::new(&config)?; 966 967 let mut store = Store::new(&engine, 0); 968 let component = Component::new( 969 &engine, 970 r#"(component 971 (import "f" (func $f async)) 972 973 (core module $m 974 (import "" "f" (func $f)) 975 (func (export "run") 976 (local $c i32) 977 978 ;; call the host 100 times 979 loop 980 call $f 981 (local.tee $c (i32.add (local.get $c) (i32.const 1))) 982 i32.const 100 983 i32.ne 984 if br 1 end 985 end 986 ) 987 ) 988 (core func $f (canon lower (func $f) )) 989 (core instance $i (instantiate $m 990 (with "" (instance 991 (export "f" (func $f)) 992 )) 993 )) 994 995 (func (export "f") async 996 (canon lift (core func $i "run"))) 997 998 999 )"#, 1000 )?; 1001 1002 let mut linker = Linker::<usize>::new(&engine); 1003 linker 1004 .root() 1005 .func_wrap_concurrent("f", |accessor, (): ()| { 1006 Box::pin(async move { 1007 // Ensure that this doesn't hit the fast path of "ready on 1008 // first poll" 1009 for _ in 0..5 { 1010 tokio::task::yield_now().await; 1011 } 1012 1013 // Keep track of the maximum size of the table in 1014 // concurrent_state. 1015 accessor.with(|mut s| { 1016 let cur = s.as_context_mut().concurrent_state_table_size(); 1017 let max = s.data_mut(); 1018 *max = (*max).max(cur); 1019 }); 1020 Ok(()) 1021 }) 1022 })?; 1023 let instance = linker.instantiate_async(&mut store, &component).await?; 1024 let func = instance.get_typed_func::<(), ()>(&mut store, "f")?; 1025 func.call_async(&mut store, ()).await?; 1026 1027 // First-level assertion: nothing should remain after the guest has exited. 1028 store.assert_concurrent_state_empty(); 1029 1030 // Second-level assertion: state should be incrementally cleaned up along 1031 // the way as the guest calls the host. Things shouldn't leak until the 1032 // guest exits at the end, for example. 1033 assert!( 1034 *store.data() < 100, 1035 "the store peaked at over 100 items in the concurrent table which \ 1036 indicates that something isn't getting cleaned up between executions \ 1037 of the host" 1038 ); 1039 1040 Ok(()) 1041 } 1042 1043 /// Regression test: `stream.cancel-read` with `async` option must not corrupt 1044 /// the read state when it returns BLOCKED. 1045 /// 1046 /// Bug: cancel_read/cancel_write unconditionally transitioned the read/write 1047 /// state from GuestReady to Open after the cancel, even when the cancel 1048 /// returned BLOCKED. This destroyed the buffer address/count info, causing 1049 /// an error when the host later tried to access the stream state. 1050 #[tokio::test] 1051 #[cfg_attr(miri, ignore)] 1052 async fn stream_cancel_read_async_does_not_corrupt_state() -> Result<()> { 1053 _ = env_logger::try_init(); 1054 1055 let mut config = Config::new(); 1056 config.wasm_component_model_async(true); 1057 config.wasm_component_model_async_builtins(true); 1058 config.wasm_component_model_async_stackful(true); 1059 let engine = Engine::new(&config)?; 1060 1061 let component = Component::new( 1062 &engine, 1063 r#" 1064 (component 1065 (core module $libc (memory (export "memory") 1)) 1066 (core instance $libc (instantiate $libc)) 1067 (core module $m 1068 (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) 1069 (import "" "stream.cancel-read" (func $stream.cancel-read (param i32) (result i32))) 1070 (import "" "stream.drop-readable" (func $stream.drop-readable (param i32))) 1071 (import "" "waitable.join" (func $waitable.join (param i32 i32))) 1072 (import "" "waitable-set.new" (func $waitable-set.new (result i32))) 1073 (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) 1074 (import "" "waitable-set.drop" (func $waitable-set.drop (param i32))) 1075 (memory (export "memory") 1) 1076 1077 (func (export "run") (param $sr i32) 1078 (local $cancel_result i32) 1079 (local $ws i32) 1080 1081 ;; Async read into buffer at 0x100, length 4. 1082 ;; Should return BLOCKED (-1) since the host producer never writes. 1083 (call $stream.read (local.get $sr) (i32.const 0x100) (i32.const 4)) 1084 i32.const -1 ;; BLOCKED 1085 i32.ne 1086 if unreachable end 1087 1088 ;; Async cancel-read. The host write end is HostReady, so this returns 1089 ;; BLOCKED. Bug: the cancel unconditionally transitions GuestReady -> Open, 1090 ;; destroying the buffer info. 1091 (local.set $cancel_result (call $stream.cancel-read (local.get $sr))) 1092 1093 ;; If cancel returned BLOCKED (-1), wait for the cancel to complete. 1094 ;; This is where the bug manifests: when the host processes the cancel, 1095 ;; it accesses the read state which was corrupted from GuestReady to Open. 1096 (if (i32.eq (local.get $cancel_result) (i32.const -1)) 1097 (then 1098 (local.set $ws (call $waitable-set.new)) 1099 (call $waitable.join (local.get $sr) (local.get $ws)) 1100 ;; Wait for the stream event (cancel completion). Event buffer at 0x200. 1101 (drop (call $waitable-set.wait (local.get $ws) (i32.const 0x200))) 1102 ;; Unjoin stream from waitable-set (join to 0 = unjoin) 1103 (call $waitable.join (local.get $sr) (i32.const 0)) 1104 (call $waitable-set.drop (local.get $ws)) 1105 ) 1106 ) 1107 1108 ;; Drop the stream 1109 (call $stream.drop-readable (local.get $sr)) 1110 ) 1111 ) 1112 1113 (type $s (stream u8)) 1114 (core func $stream.read (canon stream.read $s async (memory $libc "memory"))) 1115 (core func $stream.cancel-read (canon stream.cancel-read $s async)) 1116 (core func $stream.drop-readable (canon stream.drop-readable $s)) 1117 (canon waitable.join (core func $waitable.join)) 1118 (canon waitable-set.new (core func $waitable-set.new)) 1119 (canon waitable-set.wait (memory $libc "memory") (core func $waitable-set.wait)) 1120 (canon waitable-set.drop (core func $waitable-set.drop)) 1121 1122 (core instance $i (instantiate $m 1123 (with "" (instance 1124 (export "stream.read" (func $stream.read)) 1125 (export "stream.cancel-read" (func $stream.cancel-read)) 1126 (export "stream.drop-readable" (func $stream.drop-readable)) 1127 (export "waitable.join" (func $waitable.join)) 1128 (export "waitable-set.new" (func $waitable-set.new)) 1129 (export "waitable-set.wait" (func $waitable-set.wait)) 1130 (export "waitable-set.drop" (func $waitable-set.drop)) 1131 )) 1132 )) 1133 1134 (func (export "run") async (param "s" (stream u8)) 1135 (canon lift 1136 (core func $i "run") 1137 (memory $libc "memory") 1138 ) 1139 ) 1140 ) 1141 "#, 1142 )?; 1143 1144 let mut store = Store::new(&engine, ()); 1145 let instance = Linker::new(&engine) 1146 .instantiate_async(&mut store, &component) 1147 .await?; 1148 let func = instance.get_typed_func::<(StreamReader<u8>,), ()>(&mut store, "run")?; 1149 1150 // Create a host-side stream that never produces data (always Pending). 1151 // When cancel is requested (finish=true), it acknowledges the cancellation. 1152 let reader = StreamReader::new(&mut store, NeverWriteStreamProducer)?; 1153 func.call_async(&mut store, (reader,)).await?; 1154 1155 return Ok(()); 1156 1157 struct NeverWriteStreamProducer; 1158 1159 impl StreamProducer<()> for NeverWriteStreamProducer { 1160 type Item = u8; 1161 type Buffer = Option<u8>; 1162 1163 fn poll_produce<'a>( 1164 self: Pin<&mut Self>, 1165 _cx: &mut Context<'_>, 1166 _store: StoreContextMut<'a, ()>, 1167 _destination: Destination<'a, Self::Item, Self::Buffer>, 1168 finish: bool, 1169 ) -> Poll<Result<StreamResult>> { 1170 if finish { 1171 // Cancel requested — acknowledge it. 1172 Poll::Ready(Ok(StreamResult::Cancelled)) 1173 } else { 1174 // Never produce data. 1175 Poll::Pending 1176 } 1177 } 1178 } 1179 } 1180 1181 /// Regression test: multiple threads may concurrently make a synchronous 1182 /// call into the same async host function without corrupting state. 1183 /// 1184 /// Bug: waitable sets for host calls used to be shared across all threads, so if two threads 1185 /// called a sync-lowered async host function concurrently, the waitable set state got overwritten. 1186 #[tokio::test] 1187 #[cfg_attr(miri, ignore)] 1188 async fn concurrent_sync_calls_to_async_host() -> Result<()> { 1189 _ = env_logger::try_init(); 1190 1191 let mut config = Config::new(); 1192 config.wasm_component_model_async(true); 1193 config.wasm_component_model_async_builtins(true); 1194 config.wasm_component_model_async_stackful(true); 1195 config.wasm_component_model_threading(true); 1196 let engine = Engine::new(&config)?; 1197 let mut store = Store::new(&engine, 0); 1198 1199 let component = Component::new( 1200 &engine, 1201 r#"(component 1202 (import "await-three-calls" (func $await-three-calls async)) 1203 1204 (core module $libc 1205 (table (export "__indirect_function_table") 1 funcref)) 1206 1207 (core module $m 1208 (import "" "await-three-calls" (func $await-three-calls)) 1209 (import "" "thread.new-indirect" (func $thread-new-indirect (param i32 i32) (result i32))) 1210 (import "" "thread.unsuspend" (func $thread-unsuspend (param i32))) 1211 (import "libc" "__indirect_function_table" (table $indirect-function-table 1 funcref)) 1212 1213 (func (export "run") 1214 (call $thread-new-indirect (i32.const 0) (i32.const 0)) 1215 (call $thread-unsuspend) 1216 (call $thread-new-indirect (i32.const 0) (i32.const 0)) 1217 (call $thread-unsuspend) 1218 (call $await-three-calls) 1219 ) 1220 (func $thread-entry (param i32) 1221 (call $await-three-calls) 1222 ) 1223 (elem (table $indirect-function-table) (i32.const 0) func $thread-entry) 1224 ) 1225 ;; Instantiate the libc module to get the table 1226 (core instance $libc (instantiate $libc)) 1227 ;; Get access to `thread.new-indirect` that uses the table from libc 1228 (core type $start-func-ty (func (param i32))) 1229 (alias core export $libc "__indirect_function_table" (core table $indirect-function-table)) 1230 (core func $thread-new-indirect 1231 (canon thread.new-indirect $start-func-ty (table $indirect-function-table))) 1232 (core func $thread-unsuspend (canon thread.unsuspend)) 1233 1234 (core func $await-three-calls (canon lower (func $await-three-calls) )) 1235 (core instance $i (instantiate $m 1236 (with "" (instance 1237 (export "await-three-calls" (func $await-three-calls)) 1238 (export "thread.new-indirect" (func $thread-new-indirect)) 1239 (export "thread.unsuspend" (func $thread-unsuspend)) 1240 )) 1241 (with "libc" (instance $libc)) 1242 )) 1243 (func (export "run") async 1244 (canon lift (core func $i "run"))) 1245 )"#, 1246 )?; 1247 1248 let mut linker = Linker::<i32>::new(&engine); 1249 linker 1250 .root() 1251 .func_wrap_concurrent("await-three-calls", |accessor, (): ()| { 1252 Box::pin(async move { 1253 accessor.with(|mut s| { 1254 *s.data_mut() += 1; 1255 }); 1256 while accessor.with(|mut s| *s.data_mut()) < 3 { 1257 tokio::task::yield_now().await; 1258 } 1259 Ok(()) 1260 }) 1261 })?; 1262 let instance = linker.instantiate_async(&mut store, &component).await?; 1263 let func = instance.get_typed_func::<(), ()>(&mut store, "run")?; 1264 func.call_async(&mut store, ()).await?; 1265 1266 store.assert_concurrent_state_empty(); 1267 1268 Ok(()) 1269 } 1270