1 //! For a high-level overview of this fuzz target see `fuzz_async.rs` 2 3 use crate::block_on; 4 use crate::generators::component_async::exports::wasmtime_fuzz::fuzz::async_test::Guest; 5 use crate::generators::component_async::wasmtime_fuzz::fuzz::async_test::{self, Command}; 6 use crate::generators::component_async::wasmtime_fuzz::fuzz::types; 7 use crate::generators::component_async::{ComponentAsync, FuzzAsyncPre, Scope}; 8 use futures::channel::oneshot; 9 use std::collections::{HashMap, HashSet}; 10 use std::mem; 11 use std::pin::Pin; 12 use std::sync::{Arc, OnceLock, Weak}; 13 use std::task::{Context, Poll, Waker}; 14 use std::time::Instant; 15 use wasmtime::component::{ 16 Access, Accessor, AccessorTask, Component, Destination, FutureConsumer, FutureProducer, 17 FutureReader, HasSelf, Linker, ResourceTable, Source, StreamConsumer, StreamProducer, 18 StreamReader, StreamResult, VecBuffer, 19 }; 20 use wasmtime::{AsContextMut, Config, Engine, Result, Store, StoreContextMut}; 21 use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView}; 22 23 static STATE: OnceLock<(Engine, FuzzAsyncPre<Data>)> = OnceLock::new(); 24 25 /// Initializes state for future fuzz runs. 26 /// 27 /// This will create an `Engine` to run this fuzzer within and it will 28 /// additionally precompile the component that will be used for fuzzing. 29 /// 30 /// There are a few points of note about this: 31 /// 32 /// * The `misc` fuzzer is manually instrumented with this function as the init 33 /// hook to ensure this runs before any other fuzzing. 34 /// 35 /// * Compilation of the component takes quite some time with 36 /// fuzzing-instrumented Cranelift. To assist with local development this 37 /// implements a cache which is serialized/deserialized via an env var. 
38 pub fn init() { 39 crate::init_fuzzing(); 40 41 STATE.get_or_init(|| { 42 let mut config = Config::new(); 43 config.wasm_component_model_async(true); 44 let engine = Engine::new(&config).unwrap(); 45 let component = compile(&engine); 46 let mut linker = Linker::new(&engine); 47 wasmtime_wasi::p2::add_to_linker_async(&mut linker).unwrap(); 48 async_test::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap(); 49 types::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap(); 50 51 let pre = linker.instantiate_pre(&component).unwrap(); 52 let pre = FuzzAsyncPre::new(pre).unwrap(); 53 54 (engine, pre) 55 }); 56 57 fn compile(engine: &Engine) -> Component { 58 let wasm = test_programs_artifacts::FUZZ_ASYNC_COMPONENT; 59 let cwasm_cache = std::env::var("COMPONENT_ASYNC_CWASM_CACHE").ok(); 60 if let Some(path) = &cwasm_cache 61 && let Ok(cwasm_mtime) = std::fs::metadata(&path).and_then(|m| m.modified()) 62 && let Ok(wasm_mtime) = std::fs::metadata(wasm).and_then(|m| m.modified()) 63 && cwasm_mtime > wasm_mtime 64 { 65 log::debug!("Using cached component async cwasm at {path}"); 66 unsafe { 67 return Component::deserialize_file(engine, path).unwrap(); 68 } 69 } 70 71 let composition = { 72 let mut config = wasm_compose::config::Config::default(); 73 let tempdir = tempfile::TempDir::new().unwrap(); 74 let path = tempdir.path().join("fuzz-async.wasm"); 75 std::fs::copy(wasm, &path).unwrap(); 76 config.definitions.push(path.clone()); 77 78 wasm_compose::composer::ComponentComposer::new(&path, &config) 79 .compose() 80 .unwrap() 81 }; 82 let start = Instant::now(); 83 let component = Component::new(&engine, &composition).unwrap(); 84 if let Some(path) = cwasm_cache { 85 log::debug!("Caching component async cwasm to {path}"); 86 std::fs::write(path, &component.serialize().unwrap()).unwrap(); 87 } else if start.elapsed() > std::time::Duration::from_secs(1) { 88 eprintln!( 89 " 90 !!!!!!!!!!!!!!!!!!!!!!!!!! 
91 92 Component compilation is slow, try setting `COMPONENT_ASYNC_CWASM_CACHE=path` to 93 cache compilation results 94 95 !!!!!!!!!!!!!!!!!!!!!!!!!! 96 " 97 ); 98 } 99 return component; 100 } 101 } 102 103 #[derive(Default)] 104 struct Data { 105 ctx: WasiCtx, 106 table: ResourceTable, 107 wakers: HashMap<Scope, Waker>, 108 commands: Vec<(Scope, Command)>, 109 110 guest_caller_stream: Option<StreamReader<Command>>, 111 guest_callee_stream: Option<StreamReader<Command>>, 112 113 host_pending_async_calls: HashMap<u32, oneshot::Sender<()>>, 114 host_pending_async_calls_cancelled: HashSet<u32>, 115 guest_pending_async_calls_ready: HashSet<u32>, 116 117 // State of futures/streams. Note that while #12091 is unresolved an 118 // `Arc`/`Weak` combo is used to detect when wasmtime drops futures/streams 119 // and the various halves we're interacting with using traits. 120 host_futures: HashMap<u32, FutureReader<u32>>, 121 host_future_producers: HashMap<u32, (HostFutureProducerState, Weak<()>)>, 122 host_future_consumers: HashMap<u32, (HostFutureConsumerState, Weak<()>)>, 123 host_streams: HashMap<u32, StreamReader<u32>>, 124 host_stream_producers: HashMap<u32, (HostStreamProducerState, Weak<()>)>, 125 host_stream_consumers: HashMap<u32, (HostStreamConsumerState, Weak<()>)>, 126 } 127 128 impl WasiView for Data { 129 fn ctx(&mut self) -> WasiCtxView<'_> { 130 WasiCtxView { 131 ctx: &mut self.ctx, 132 table: &mut self.table, 133 } 134 } 135 } 136 137 impl async_test::HostWithStore for HasSelf<Data> { 138 async fn async_ready<T>(_store: &Accessor<T, Self>) {} 139 140 async fn async_pending<T>(store: &Accessor<T, Self>, id: u32) { 141 let (tx, rx) = oneshot::channel(); 142 store.with(|mut s| s.get().host_pending_async_calls.insert(id, tx)); 143 let record = RecordCancelOnDrop { store, id }; 144 rx.await.unwrap(); 145 mem::forget(record); 146 147 struct RecordCancelOnDrop<'a, T: 'static> { 148 store: &'a Accessor<T, HasSelf<Data>>, 149 id: u32, 150 } 151 152 impl<T> Drop for 
RecordCancelOnDrop<'_, T> { 153 fn drop(&mut self) { 154 self.store.with(|mut s| { 155 s.get().host_pending_async_calls_cancelled.insert(self.id); 156 }); 157 } 158 } 159 } 160 161 async fn init<T>(_store: &Accessor<T, Self>, _scope: types::Scope) {} 162 } 163 164 impl async_test::Host for Data { 165 fn sync_ready(&mut self) {} 166 167 fn future_take(&mut self, id: u32) -> FutureReader<u32> { 168 self.host_futures.remove(&id).unwrap() 169 } 170 171 fn future_receive(&mut self, id: u32, future: FutureReader<u32>) { 172 let prev = self.host_futures.insert(id, future); 173 assert!(prev.is_none()); 174 } 175 176 fn stream_take(&mut self, id: u32) -> StreamReader<u32> { 177 self.host_streams.remove(&id).unwrap() 178 } 179 180 fn stream_receive(&mut self, id: u32, stream: StreamReader<u32>) { 181 let prev = self.host_streams.insert(id, stream); 182 assert!(prev.is_none()); 183 } 184 } 185 186 impl types::HostWithStore for HasSelf<Data> { 187 fn get_commands<T>( 188 mut store: Access<'_, T, Self>, 189 scope: types::Scope, 190 ) -> StreamReader<Command> { 191 let data = store.get(); 192 match scope { 193 types::Scope::Caller => data.guest_caller_stream.take().unwrap(), 194 types::Scope::Callee => data.guest_callee_stream.take().unwrap(), 195 } 196 } 197 } 198 199 impl types::Host for Data {} 200 201 /// Executes the `input` provided, assuming that `init` has been previously 202 /// executed. 203 pub fn run(mut input: ComponentAsync) { 204 log::debug!("Running component async fuzz test with\n{input:?}"); 205 206 // Commands are executed in the order that they're listed in the input, but 207 // to make it easier on the `StreamProducer` implementation below they're 208 // popped off the back. To ensure that they're all delivered in the right 209 // order reverse the list to ensure the correct order is maintained. 
210 input.commands.reverse(); 211 212 let (engine, pre) = STATE.get().unwrap(); 213 let mut store = Store::new( 214 engine, 215 Data { 216 ctx: WasiCtx::builder().inherit_stdio().inherit_env().build(), 217 commands: input.commands, 218 ..Data::default() 219 }, 220 ); 221 222 let guest_caller_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCaller)); 223 let guest_callee_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCallee)); 224 store.data_mut().guest_caller_stream = Some(guest_caller_stream); 225 store.data_mut().guest_callee_stream = Some(guest_callee_stream); 226 block_on(async { 227 let instance = pre.instantiate_async(&mut store).await.unwrap(); 228 let test = instance.wasmtime_fuzz_fuzz_async_test(); 229 230 let mut host_caller = SharedStream(Scope::HostCaller); 231 let mut host_callee = SharedStream(Scope::HostCallee); 232 store 233 .run_concurrent(async |store| { 234 // Kick off stream reads in the guest. This function will return 235 // but the tasks in the guest will keep running after they 236 // return to process stream items. 237 test.call_init(store, types::Scope::Caller).await.unwrap(); 238 239 // Simultaneously process commands from both host streams. These 240 // will return once the entire command queue is exhausted. 241 futures::join!( 242 async { 243 while let Some(cmd) = host_caller.next(store).await { 244 host_caller_cmd(&test, store, cmd).await; 245 } 246 }, 247 async { 248 while let Some(cmd) = host_callee.next(store).await { 249 host_callee_cmd(store, cmd).await; 250 } 251 }, 252 ); 253 254 // Note that there may still be pending async work in the guest 255 // (or host). It's intentional that it's not cleaned up here to 256 // help test situations where async work is all abruptly 257 // cancelled by just being dropped in the host. 258 }) 259 .await 260 .unwrap(); 261 }); 262 } 263 264 /// See documentation in `fuzz_async.rs` for what's going on here. 
265 async fn test_property<F>(store: &Accessor<Data>, mut f: F) -> bool 266 where 267 F: FnMut(&mut Data) -> bool, 268 { 269 for _ in 0..1000 { 270 let ready = store.with(|mut s| f(s.get())); 271 if ready { 272 return true; 273 } 274 275 crate::YieldN(1).await; 276 } 277 278 return false; 279 } 280 281 async fn await_property<F>(store: &Accessor<Data>, desc: &str, f: F) 282 where 283 F: FnMut(&mut Data) -> bool, 284 { 285 assert!( 286 test_property(store, f).await, 287 "timed out waiting for {desc}", 288 ); 289 } 290 291 async fn host_caller_cmd(test: &Guest, store: &Accessor<Data>, cmd: Command) { 292 match cmd { 293 Command::Ack => {} 294 Command::SyncReadyCall => test.call_sync_ready(store).await.unwrap(), 295 Command::AsyncReadyCall => test.call_async_ready(store).await.unwrap(), 296 Command::AsyncPendingExportComplete(_i) => todo!(), 297 Command::AsyncPendingExportAssertCancelled(_i) => todo!(), 298 Command::AsyncPendingImportCall(i) => { 299 struct RunPendingImport { 300 test: Guest, 301 i: u32, 302 } 303 304 store.spawn(RunPendingImport { 305 test: test.clone(), 306 i, 307 }); 308 309 impl AccessorTask<Data> for RunPendingImport { 310 async fn run(self, store: &Accessor<Data>) -> Result<()> { 311 self.test.call_async_pending(store, self.i).await?; 312 store.with(|mut s| { 313 s.get().guest_pending_async_calls_ready.insert(self.i); 314 }); 315 Ok(()) 316 } 317 } 318 } 319 Command::AsyncPendingImportCancel(_i) => todo!(), 320 Command::AsyncPendingImportAssertReady(i) => { 321 assert!( 322 test_property(store, |s| s.guest_pending_async_calls_ready.remove(&i)).await, 323 "expected async_pending import {i} to be ready", 324 ); 325 } 326 327 Command::FutureTake(i) => { 328 let future = test.call_future_take(store, i).await.unwrap(); 329 store.with(|mut s| { 330 let prev = s.get().host_futures.insert(i, future); 331 assert!(prev.is_none()); 332 }); 333 } 334 Command::FutureGive(i) => { 335 let future = store.with(|mut s| s.get().host_futures.remove(&i).unwrap()); 
336 test.call_future_receive(store, i, future).await.unwrap(); 337 } 338 Command::StreamTake(i) => { 339 let stream = test.call_stream_take(store, i).await.unwrap(); 340 store.with(|mut s| { 341 let prev = s.get().host_streams.insert(i, stream); 342 assert!(prev.is_none()); 343 }); 344 } 345 Command::StreamGive(i) => { 346 let stream = store.with(|mut s| s.get().host_streams.remove(&i).unwrap()); 347 test.call_stream_receive(store, i, stream).await.unwrap(); 348 } 349 350 other => future_or_stream_cmd(store, other).await, 351 } 352 } 353 354 async fn host_callee_cmd(store: &Accessor<Data>, cmd: Command) { 355 match cmd { 356 Command::Ack => {} 357 Command::SyncReadyCall => todo!(), 358 Command::AsyncReadyCall => todo!(), 359 Command::AsyncPendingExportComplete(i) => store.with(|mut s| { 360 s.get() 361 .host_pending_async_calls 362 .remove(&i) 363 .unwrap() 364 .send(()) 365 .unwrap(); 366 }), 367 Command::AsyncPendingExportAssertCancelled(i) => { 368 assert!( 369 test_property(store, |s| s.host_pending_async_calls_cancelled.remove(&i)).await, 370 "expected async_pending export {i} to be cancelled", 371 ); 372 } 373 Command::AsyncPendingImportCall(_i) => todo!(), 374 Command::AsyncPendingImportCancel(_i) => todo!(), 375 Command::AsyncPendingImportAssertReady(_i) => todo!(), 376 377 other => future_or_stream_cmd(store, other).await, 378 } 379 } 380 381 async fn future_or_stream_cmd(store: &Accessor<Data>, cmd: Command) { 382 match cmd { 383 // These commands should be handled above 384 Command::Ack 385 | Command::SyncReadyCall 386 | Command::AsyncReadyCall 387 | Command::AsyncPendingExportComplete(_) 388 | Command::AsyncPendingExportAssertCancelled(_) 389 | Command::AsyncPendingImportCall(_) 390 | Command::AsyncPendingImportCancel(_) 391 | Command::FutureTake(_) 392 | Command::FutureGive(_) 393 | Command::StreamTake(_) 394 | Command::StreamGive(_) 395 | Command::AsyncPendingImportAssertReady(_) => unreachable!(), 396 397 Command::FutureNew(id) => { 398 
store.with(|mut s| { 399 let arc = Arc::new(()); 400 let weak = Arc::downgrade(&arc); 401 let future = FutureReader::new(&mut s, HostFutureProducer(id, arc)); 402 let data = s.get(); 403 let prev = data.host_futures.insert(id, future); 404 assert!(prev.is_none()); 405 let prev = data 406 .host_future_producers 407 .insert(id, (HostFutureProducerState::Idle, weak)); 408 assert!(prev.is_none()); 409 }); 410 } 411 Command::FutureDropReadable(id) => { 412 store.with(|mut s| match s.get().host_futures.remove(&id) { 413 Some(mut future) => future.close(&mut s), 414 None => { 415 let (mut state, _weak) = s.get().host_future_consumers.remove(&id).unwrap(); 416 state.wake_by_ref(); 417 } 418 }) 419 } 420 Command::FutureWriteReady(payload) => { 421 await_property(store, "future write should be waiting", |s| { 422 matches!( 423 s.host_future_producers.get(&payload.future), 424 Some((HostFutureProducerState::Waiting(_), _)) 425 ) 426 }) 427 .await; 428 store.with(|mut s| { 429 let state = s 430 .get() 431 .host_future_producers 432 .get_mut(&payload.future) 433 .unwrap(); 434 match state { 435 (HostFutureProducerState::Waiting(waker), _) => { 436 waker.wake_by_ref(); 437 state.0 = HostFutureProducerState::Writing(payload.item); 438 } 439 (state, _) => panic!("future not waiting: {state:?}"), 440 } 441 }) 442 } 443 Command::FutureWritePending(payload) => store.with(|mut s| { 444 let state = s 445 .get() 446 .host_future_producers 447 .get_mut(&payload.future) 448 .unwrap(); 449 match state { 450 (HostFutureProducerState::Idle, _) => { 451 state.0 = HostFutureProducerState::Writing(payload.item); 452 } 453 _ => panic!("future not idle"), 454 } 455 }), 456 Command::FutureWriteDropped(id) => store.with(|mut s| { 457 let (state, weak) = s.get().host_future_producers.remove(&id).unwrap(); 458 assert!(matches!(state, HostFutureProducerState::Idle)); 459 assert!(weak.upgrade().is_none()); 460 }), 461 Command::FutureReadReady(payload) => { 462 let id = payload.future; 463 
store.with(|mut s| { 464 let arc = Arc::new(()); 465 let weak = Arc::downgrade(&arc); 466 let data = s.get(); 467 let future = data.host_futures.remove(&id).unwrap(); 468 let prev = data 469 .host_future_consumers 470 .insert(id, (HostFutureConsumerState::Consuming, weak)); 471 assert!(prev.is_none()); 472 future.pipe(&mut s, HostFutureConsumer(id, arc)); 473 }); 474 475 await_property(store, "future should be present", |s| { 476 matches!( 477 s.host_future_consumers[&id], 478 (HostFutureConsumerState::Complete(_), _) 479 ) 480 }) 481 .await; 482 483 store.with(|mut s| { 484 let (state, _) = s.get().host_future_consumers.remove(&id).unwrap(); 485 match state { 486 HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item), 487 _ => panic!("future not complete"), 488 } 489 }); 490 } 491 Command::FutureReadPending(id) => { 492 ensure_future_reading(store, id); 493 store.with(|mut s| { 494 let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap(); 495 state.wake_by_ref(); 496 assert!( 497 matches!(state, HostFutureConsumerState::Idle), 498 "bad state: {state:?}", 499 ); 500 *state = HostFutureConsumerState::Consuming; 501 }) 502 } 503 Command::FutureCancelWrite(id) => store.with(|mut s| { 504 let (state, _) = s.get().host_future_producers.get_mut(&id).unwrap(); 505 assert!(matches!(state, HostFutureProducerState::Writing(_))); 506 *state = HostFutureProducerState::Idle; 507 }), 508 Command::FutureCancelRead(id) => store.with(|mut s| { 509 let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap(); 510 assert!(matches!(state, HostFutureConsumerState::Consuming)); 511 *state = HostFutureConsumerState::Idle; 512 }), 513 Command::FutureReadAssertComplete(payload) => { 514 await_property(store, "future read should be complete", |s| { 515 matches!( 516 s.host_future_consumers.get(&payload.future), 517 Some((HostFutureConsumerState::Complete(_), _)) 518 ) 519 }) 520 .await; 521 store.with(|mut s| { 522 let (state, _) = s 523 .get() 524 
.host_future_consumers 525 .remove(&payload.future) 526 .unwrap(); 527 match state { 528 HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item), 529 _ => panic!("future not complete"), 530 } 531 }) 532 } 533 Command::FutureWriteAssertComplete(id) => store.with(|mut s| { 534 let (state, weak) = s.get().host_future_producers.remove(&id).unwrap(); 535 assert!(matches!(state, HostFutureProducerState::Complete)); 536 assert!(weak.upgrade().is_none()); 537 }), 538 Command::FutureWriteAssertDropped(id) => store.with(|mut s| { 539 let (state, weak) = s.get().host_future_producers.remove(&id).unwrap(); 540 assert!(matches!(state, HostFutureProducerState::Writing(_))); 541 assert!(weak.upgrade().is_none()); 542 }), 543 544 Command::StreamNew(id) => { 545 store.with(|mut s| { 546 let arc = Arc::new(()); 547 let weak = Arc::downgrade(&arc); 548 let stream = StreamReader::new(&mut s, HostStreamProducer(id, arc)); 549 let data = s.get(); 550 let prev = data.host_streams.insert(id, stream); 551 assert!(prev.is_none()); 552 let prev = data 553 .host_stream_producers 554 .insert(id, (HostStreamProducerState::idle(), weak)); 555 assert!(prev.is_none()); 556 }); 557 } 558 Command::StreamDropReadable(id) => { 559 store.with(|mut s| match s.get().host_streams.remove(&id) { 560 Some(mut stream) => { 561 stream.close(&mut s); 562 } 563 None => { 564 let (mut state, _weak) = s.get().host_stream_consumers.remove(&id).unwrap(); 565 state.wake_by_ref(); 566 } 567 }) 568 } 569 Command::StreamDropWritable(id) => store.with(|mut s| { 570 let (mut state, _weak) = s.get().host_stream_producers.remove(&id).unwrap(); 571 state.wake_by_ref(); 572 }), 573 Command::StreamWriteReady(payload) => { 574 let id = payload.stream; 575 store.with(|mut s| { 576 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap(); 577 state.wake_by_ref(); 578 match state.kind { 579 HostStreamProducerStateKind::Idle => { 580 state.kind = HostStreamProducerStateKind::Writing(stream_payload( 581 
payload.item, 582 payload.op_count, 583 )); 584 } 585 _ => panic!("stream not idle: {state:?}"), 586 } 587 }); 588 await_property(store, "stream should complete a write", |s| { 589 matches!( 590 s.host_stream_producers[&id].0.kind, 591 HostStreamProducerStateKind::Wrote(_), 592 ) 593 }) 594 .await; 595 store.with(|mut s| { 596 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap(); 597 match state.kind { 598 HostStreamProducerStateKind::Wrote(amt) => { 599 assert_eq!(amt, payload.ready_count); 600 state.kind = HostStreamProducerStateKind::Idle; 601 } 602 _ => panic!("stream not idle: {state:?}"), 603 } 604 }); 605 } 606 Command::StreamReadReady(payload) => { 607 let id = payload.stream; 608 ensure_stream_reading(store, id); 609 store.with(|mut s| { 610 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap(); 611 state.wake_by_ref(); 612 state.kind = HostStreamConsumerStateKind::Consuming(payload.op_count); 613 }); 614 await_property(store, "stream should complete a read", |s| { 615 matches!( 616 s.host_stream_consumers[&id].0.kind, 617 HostStreamConsumerStateKind::Consumed(_), 618 ) 619 }) 620 .await; 621 622 store.with(|mut s| { 623 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap(); 624 match &state.kind { 625 HostStreamConsumerStateKind::Consumed(last_read) => { 626 assert_eq!( 627 *last_read, 628 stream_payload(payload.item, payload.ready_count) 629 ); 630 state.kind = HostStreamConsumerStateKind::Idle; 631 } 632 _ => panic!("future not complete"), 633 } 634 }); 635 } 636 Command::StreamWritePending(payload) => store.with(|mut s| { 637 let (state, _) = s 638 .get() 639 .host_stream_producers 640 .get_mut(&payload.stream) 641 .unwrap(); 642 state.wake_by_ref(); 643 match state.kind { 644 HostStreamProducerStateKind::Idle => { 645 state.kind = HostStreamProducerStateKind::Writing(stream_payload( 646 payload.item, 647 payload.count, 648 )); 649 } 650 _ => panic!("stream not idle {:?}", state.kind), 651 } 652 }), 653 
Command::StreamReadPending(payload) => { 654 ensure_stream_reading(store, payload.stream); 655 store.with(|mut s| { 656 let (state, _) = s 657 .get() 658 .host_stream_consumers 659 .get_mut(&payload.stream) 660 .unwrap(); 661 state.wake_by_ref(); 662 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle)); 663 state.kind = HostStreamConsumerStateKind::Consuming(payload.count); 664 }) 665 } 666 Command::StreamWriteDropped(payload) => store.with(|mut s| { 667 let (state, weak) = s 668 .get() 669 .host_stream_producers 670 .get_mut(&payload.stream) 671 .unwrap(); 672 assert!(matches!(state.kind, HostStreamProducerStateKind::Idle)); 673 assert!(weak.upgrade().is_none()); 674 }), 675 Command::StreamReadDropped(payload) => { 676 ensure_stream_reading(store, payload.stream); 677 await_property(store, "stream read should get dropped", |s| { 678 let weak = &s.host_stream_consumers[&payload.stream].1; 679 weak.upgrade().is_none() 680 }) 681 .await; 682 store.with(|mut s| { 683 let (state, weak) = s 684 .get() 685 .host_stream_consumers 686 .get_mut(&payload.stream) 687 .unwrap(); 688 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle)); 689 assert!(weak.upgrade().is_none()); 690 }) 691 } 692 Command::StreamCancelWrite(id) => store.with(|mut s| { 693 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap(); 694 assert!( 695 matches!(state.kind, HostStreamProducerStateKind::Writing(_)), 696 "invalid state {state:?}", 697 ); 698 state.kind = HostStreamProducerStateKind::Idle; 699 state.wake_by_ref(); 700 }), 701 Command::StreamCancelRead(id) => store.with(|mut s| { 702 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap(); 703 assert!(matches!( 704 state.kind, 705 HostStreamConsumerStateKind::Consuming(_) 706 )); 707 state.kind = HostStreamConsumerStateKind::Idle; 708 }), 709 Command::StreamReadAssertComplete(payload) => store.with(|mut s| { 710 let (state, _) = s 711 .get() 712 .host_stream_consumers 713 .get_mut(&payload.stream) 
714 .unwrap(); 715 match &state.kind { 716 HostStreamConsumerStateKind::Consumed(last_read) => { 717 assert_eq!(*last_read, stream_payload(payload.item, payload.count)); 718 state.kind = HostStreamConsumerStateKind::Idle; 719 } 720 _ => panic!("stream not complete"), 721 } 722 }), 723 Command::StreamWriteAssertComplete(payload) => store.with(|mut s| { 724 let (state, _) = s 725 .get() 726 .host_stream_producers 727 .get_mut(&payload.stream) 728 .unwrap(); 729 match state.kind { 730 HostStreamProducerStateKind::Wrote(amt) => { 731 assert_eq!(amt, payload.count); 732 state.kind = HostStreamProducerStateKind::Idle; 733 } 734 _ => panic!("stream not complete: {:?}", state.kind), 735 } 736 }), 737 Command::StreamWriteAssertDropped(payload) => { 738 await_property(store, "stream write should be dropped", |s| { 739 let weak = &s.host_stream_producers[&payload.stream].1; 740 weak.upgrade().is_none() 741 }) 742 .await; 743 store.with(|mut s| { 744 let (state, weak) = s 745 .get() 746 .host_stream_producers 747 .get_mut(&payload.stream) 748 .unwrap(); 749 assert!(matches!( 750 state.kind, 751 HostStreamProducerStateKind::Writing(_) 752 )); 753 assert!(weak.upgrade().is_none()); 754 }) 755 } 756 Command::StreamReadAssertDropped(id) => { 757 await_property(store, "stream read should be dropped", |s| { 758 let weak = &s.host_stream_consumers[&id].1; 759 weak.upgrade().is_none() 760 }) 761 .await; 762 store.with(|mut s| { 763 let (state, weak) = s.get().host_stream_consumers.get_mut(&id).unwrap(); 764 assert!(matches!( 765 state.kind, 766 HostStreamConsumerStateKind::Consuming(_), 767 )); 768 assert!(weak.upgrade().is_none()); 769 }) 770 } 771 } 772 } 773 774 fn stream_payload(item: u32, count: u32) -> Vec<u32> { 775 (item..item + count).collect() 776 } 777 778 fn ensure_future_reading(store: &Accessor<Data>, id: u32) { 779 store.with(|mut s| { 780 let data = s.get(); 781 if !data.host_futures.contains_key(&id) { 782 return; 783 } 784 log::debug!("future consume: start {id}"); 
785 let arc = Arc::new(()); 786 let weak = Arc::downgrade(&arc); 787 let data = s.get(); 788 let future = data.host_futures.remove(&id).unwrap(); 789 let prev = data 790 .host_future_consumers 791 .insert(id, (HostFutureConsumerState::Idle, weak)); 792 assert!(prev.is_none()); 793 future.pipe(&mut s, HostFutureConsumer(id, arc)); 794 }); 795 } 796 797 fn ensure_stream_reading(store: &Accessor<Data>, id: u32) { 798 store.with(|mut s| { 799 let data = s.get(); 800 if !data.host_streams.contains_key(&id) { 801 return; 802 } 803 log::debug!("stream consume: start {id}"); 804 let arc = Arc::new(()); 805 let weak = Arc::downgrade(&arc); 806 let prev = data.host_stream_consumers.insert( 807 id, 808 ( 809 HostStreamConsumerState { 810 kind: HostStreamConsumerStateKind::Idle, 811 waker: None, 812 }, 813 weak, 814 ), 815 ); 816 assert!(prev.is_none()); 817 let stream = data.host_streams.remove(&id).unwrap(); 818 stream.pipe(&mut s, HostStreamConsumer(id, arc)); 819 }); 820 } 821 822 struct HostFutureConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>); 823 824 /// Note that this is only created once a read is actually initiated on a 825 /// future. It's also not possible to cancel a host-based read on a future, 826 /// hence why this is simpler than the `HostFutureProducerState` state below. 
827 #[derive(Debug)] 828 enum HostFutureConsumerState { 829 Idle, 830 Waiting(Waker), 831 Consuming, 832 Complete(u32), 833 } 834 835 impl HostFutureConsumerState { 836 fn wake_by_ref(&mut self) { 837 if let HostFutureConsumerState::Waiting(waker) = &self { 838 waker.wake_by_ref(); 839 *self = HostFutureConsumerState::Idle; 840 } 841 } 842 } 843 844 impl FutureConsumer<Data> for HostFutureConsumer { 845 type Item = u32; 846 847 fn poll_consume( 848 self: Pin<&mut Self>, 849 cx: &mut Context<'_>, 850 mut store: StoreContextMut<'_, Data>, 851 mut source: Source<'_, Self::Item>, 852 finish: bool, 853 ) -> Poll<Result<()>> { 854 let state = match store.data_mut().host_future_consumers.get_mut(&self.0) { 855 Some(state) => state, 856 None => { 857 log::debug!("consume: closed {}", self.0); 858 return Poll::Ready(Ok(())); 859 } 860 }; 861 match state.0 { 862 HostFutureConsumerState::Idle | HostFutureConsumerState::Waiting(_) => { 863 if finish { 864 log::debug!("consume: cancel {}", self.0); 865 state.0 = HostFutureConsumerState::Idle; 866 Poll::Ready(Ok(())) 867 } else { 868 log::debug!("consume: wait {}", self.0); 869 state.0 = HostFutureConsumerState::Waiting(cx.waker().clone()); 870 Poll::Pending 871 } 872 } 873 HostFutureConsumerState::Consuming => { 874 log::debug!("consume: done {}", self.0); 875 let mut item = None; 876 source.read(&mut store, &mut item).unwrap(); 877 store 878 .data_mut() 879 .host_future_consumers 880 .get_mut(&self.0) 881 .unwrap() 882 .0 = HostFutureConsumerState::Complete(item.unwrap()); 883 Poll::Ready(Ok(())) 884 } 885 HostFutureConsumerState::Complete(_) => unreachable!(), 886 } 887 } 888 } 889 890 struct HostFutureProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>); 891 892 #[derive(Debug)] 893 enum HostFutureProducerState { 894 Idle, 895 Waiting(Waker), 896 Writing(u32), 897 Complete, 898 } 899 900 impl FutureProducer<Data> for HostFutureProducer { 901 type Item = u32; 902 903 fn poll_produce( 904 self: Pin<&mut Self>, 
905 cx: &mut Context<'_>, 906 mut store: StoreContextMut<'_, Data>, 907 finish: bool, 908 ) -> Poll<Result<Option<Self::Item>>> { 909 let state = store 910 .data_mut() 911 .host_future_producers 912 .get_mut(&self.0) 913 .unwrap(); 914 match state.0 { 915 HostFutureProducerState::Idle | HostFutureProducerState::Waiting(_) => { 916 if finish { 917 log::debug!("produce: cancel {}", self.0); 918 state.0 = HostFutureProducerState::Idle; 919 Poll::Ready(Ok(None)) 920 } else { 921 log::debug!("produce: wait {}", self.0); 922 state.0 = HostFutureProducerState::Waiting(cx.waker().clone()); 923 Poll::Pending 924 } 925 } 926 HostFutureProducerState::Writing(item) => { 927 log::debug!("produce: done {}", self.0); 928 state.0 = HostFutureProducerState::Complete; 929 Poll::Ready(Ok(Some(item))) 930 } 931 HostFutureProducerState::Complete => unreachable!(), 932 } 933 } 934 } 935 936 struct HostStreamConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>); 937 938 #[derive(Debug)] 939 struct HostStreamConsumerState { 940 waker: Option<Waker>, 941 kind: HostStreamConsumerStateKind, 942 } 943 944 #[derive(Debug)] 945 enum HostStreamConsumerStateKind { 946 Idle, 947 Consuming(u32), 948 Consumed(Vec<u32>), 949 } 950 951 impl HostStreamConsumerState { 952 fn wake_by_ref(&mut self) { 953 if let Some(waker) = self.waker.take() { 954 waker.wake(); 955 } 956 } 957 } 958 959 impl StreamConsumer<Data> for HostStreamConsumer { 960 type Item = u32; 961 962 fn poll_consume( 963 self: Pin<&mut Self>, 964 cx: &mut Context<'_>, 965 mut store: StoreContextMut<'_, Data>, 966 mut source: Source<'_, Self::Item>, 967 finish: bool, 968 ) -> Poll<Result<StreamResult>> { 969 let remaining = source.remaining(&mut store); 970 let state = match store.data_mut().host_stream_consumers.get_mut(&self.0) { 971 Some((state, _)) => state, 972 None => { 973 log::debug!("stream consume: dropped {}", self.0); 974 return Poll::Ready(Ok(StreamResult::Dropped)); 975 } 976 }; 977 match state.kind { 978 
HostStreamConsumerStateKind::Idle | HostStreamConsumerStateKind::Consumed(_) => { 979 if finish { 980 log::debug!("stream consume: cancel {}", self.0); 981 state.waker = None; 982 Poll::Ready(Ok(StreamResult::Cancelled)) 983 } else { 984 log::debug!("stream consume: wait {}", self.0); 985 state.waker = Some(cx.waker().clone()); 986 Poll::Pending 987 } 988 } 989 HostStreamConsumerStateKind::Consuming(amt) => { 990 // The writer is performing a zero-length write. We always 991 // complete that without updating our own state. 992 if remaining == 0 { 993 log::debug!("stream consume: completing zero-length write {}", self.0); 994 return Poll::Ready(Ok(StreamResult::Completed)); 995 } 996 997 // If this is a zero-length read then block the writer but update our own state. 998 if amt == 0 { 999 log::debug!("stream consume: finishing zero-length read {}", self.0); 1000 state.kind = HostStreamConsumerStateKind::Consumed(Vec::new()); 1001 state.waker = Some(cx.waker().clone()); 1002 return Poll::Pending; 1003 } 1004 1005 // For non-zero sizes perform the read/copy. 
log::debug!("stream consume: done {}", self.0);
                let mut dst = Vec::with_capacity(amt as usize);
                source.read(&mut store, &mut dst).unwrap();
                let state = &mut store
                    .data_mut()
                    .host_stream_consumers
                    .get_mut(&self.0)
                    .unwrap()
                    .0;
                state.kind = HostStreamConsumerStateKind::Consumed(dst);
                state.waker = None;
                Poll::Ready(Ok(StreamResult::Completed))
            }
        }
    }
}

impl Drop for HostStreamConsumer {
    /// Logs the drop of this consumer; no other cleanup happens here.
    fn drop(&mut self) {
        log::debug!("stream consume: drop {}", self.0);
    }
}

/// Host-side producer end of a `u32` stream.
///
/// The `u32` is the key used to find this producer's state in
/// `Data::host_stream_producers`. The `Arc<()>` is never read; it is only held
/// so that dropping this value can be tracked.
struct HostStreamProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);

/// State for one host stream producer, looked up by `poll_produce`.
#[derive(Debug)]
struct HostStreamProducerState {
    /// What the producer is currently doing.
    kind: HostStreamProducerStateKind,
    /// Waker registered by the most recent `poll_produce` call that returned
    /// `Poll::Pending`, if any.
    waker: Option<Waker>,
}

/// The phases `poll_produce` distinguishes for a host stream producer.
#[derive(Debug)]
enum HostStreamProducerStateKind {
    /// No write is queued.
    Idle,
    /// Items queued up which `poll_produce` will hand to the reader.
    Writing(Vec<u32>),
    /// A write was handed off by `poll_produce`; holds the number of items
    /// that were handed off (`0` for a zero-length write).
    Wrote(u32),
}

impl HostStreamProducerState {
    /// Creates a state with nothing queued and no waker registered.
    fn idle() -> Self {
        HostStreamProducerState {
            kind: HostStreamProducerStateKind::Idle,
            waker: None,
        }
    }

    /// Wakes the registered waker, if there is one.
    ///
    /// Note that despite the name the waker is taken out of `self`, so a
    /// second call is a no-op until `poll_produce` registers a new waker.
    fn wake_by_ref(&mut self) {
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

impl StreamProducer<Data> for HostStreamProducer {
    type Item = u32;
    type Buffer = VecBuffer<u32>;

    /// Drives the host side of a stream write.
    ///
    /// * If this producer has no entry in `host_stream_producers` the result
    ///   is `StreamResult::Dropped`.
    /// * In the `Idle` and `Wrote` states there is nothing to send: with
    ///   `finish` set this resolves to `StreamResult::Cancelled`, otherwise the
    ///   waker is registered and `Poll::Pending` is returned.
    /// * In the `Writing` state the queued items, truncated to what the reader
    ///   asked for, are placed in `dst` and the state becomes `Wrote(count)`.
    fn poll_produce(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        mut store: StoreContextMut<'_, Data>,
        mut dst: Destination<'_, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<Result<StreamResult>> {
        // Query the reader's capacity first, this needs the store which is
        // borrowed for the rest of the function below.
        let remaining = dst.remaining(&mut store);
        let data = store.data_mut();
        let Some((state, _)) = data.host_stream_producers.get_mut(&self.0) else {
            log::debug!("stream produce: dropped {}", self.0);
            return Poll::Ready(Ok(StreamResult::Dropped));
        };
        match &mut state.kind {
            HostStreamProducerStateKind::Idle | HostStreamProducerStateKind::Wrote(_) => {
                if finish {
                    log::debug!("stream produce: cancel {}", self.0);
                    state.waker = None;
                    Poll::Ready(Ok(StreamResult::Cancelled))
                } else {
                    log::debug!("stream produce: wait {}", self.0);
                    state.waker = Some(cx.waker().clone());
                    Poll::Pending
                }
            }
            HostStreamProducerStateKind::Writing(buf) => {
                // Keep the other side blocked for a zero-length write
                // originated from the host.
                if buf.is_empty() {
                    log::debug!("stream produce: zero-length write {}", self.0);
                    state.kind = HostStreamProducerStateKind::Wrote(0);
                    state.waker = Some(cx.waker().clone());
                    return Poll::Pending;
                }
                log::debug!("stream produce: write {}", self.0);
                match remaining {
                    Some(amt) => {
                        // If the guest is doing a zero-length read then we've
                        // got some data for them. Complete the read but leave
                        // ourselves in the same `Writing` state as before.
                        if amt == 0 {
                            state.waker = None;
                            return Poll::Ready(Ok(StreamResult::Completed));
                        }

                        // Don't let wasmtime buffer up data for us, so truncate
                        // the buffer we're sending over to the amount that the
                        // reader is requesting. `truncate` does nothing when
                        // the buffer is already short enough.
                        buf.truncate(amt);
                    }

                    // At this time host<->host stream reads/writes aren't
                    // fuzzed since that brings up a bunch of weird edge cases
                    // which aren't fun to deal with and aren't interesting
                    // either.
                    None => unreachable!(),
                }
                let count =
                    u32::try_from(buf.len()).expect("stream write length should fit in a u32");
                dst.set_buffer(mem::take(buf).into());
                state.kind = HostStreamProducerStateKind::Wrote(count);
                state.waker = None;
                Poll::Ready(Ok(StreamResult::Completed))
            }
        }
    }
}

impl Drop for HostStreamProducer {
    /// Logs the drop of this producer; no other cleanup happens here.
    fn drop(&mut self) {
        log::debug!("stream produce: drop {}", self.0);
    }
}

/// A stream of `Command`s destined for a single `Scope`.
///
/// All `SharedStream`s read from the same `Data::commands` list, where the
/// last element of the list is the next command to deliver.
struct SharedStream(Scope);

impl SharedStream {
    /// Waits for the next command addressed to this stream's scope.
    ///
    /// Returns `None` once the command list is empty.
    async fn next(&mut self, accessor: &Accessor<Data>) -> Option<Command> {
        std::future::poll_fn(|cx| {
            accessor.with(|mut store| {
                self.poll(cx, store.as_context_mut(), false)
                    .map(|pair| match pair {
                        (None, StreamResult::Dropped) => None,
                        (Some(item), StreamResult::Completed) => Some(item),
                        // `finish` is `false` above so `Cancelled` is never
                        // produced here.
                        _ => unreachable!(),
                    })
            })
        })
        .await
    }

    /// Attempts to deliver the next command for this stream's scope.
    ///
    /// * `(None, Dropped)` when no commands remain.
    /// * `(Some(cmd), Completed)` when the next command is for this scope.
    /// * `(None, Cancelled)` when the next command is for another scope and
    ///   `finish` is set.
    /// * `Poll::Pending`, after registering `cx`'s waker in `Data::wakers`,
    ///   when the next command is for another scope and `finish` is unset.
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
        mut store: StoreContextMut<'_, Data>,
        finish: bool,
    ) -> Poll<(Option<Command>, StreamResult)> {
        let data = store.data_mut();

        // If no more commands remain then this is a closed and dropped stream.
        let Some((scope, command)) = data.commands.last_mut() else {
            log::debug!("Stream closed: {:?}", self.0);
            return Poll::Ready((None, StreamResult::Dropped));
        };

        // If the next queued up command is for the scope that this stream is
        // attached to then send off the command.
        if *scope == self.0 {
            let ret = Some(*command);

            // All commands are followed up with an "ack", and after the "ack"
            // is delivered then the command is popped to move on to the next
            // command. The reason for this is to guarantee that a command has
            // been processed before moving on to the next command. This helps
            // make the fuzzing easier to work with by being able to implicitly
            // assume that a command has been processed by the time something
            // else is. Otherwise it might be possible that wasmtime has a set
            // of commands/callbacks that are all delivered at the same time and
            // the component model doesn't specify what order they happen
            // within. By forcing an "ack" it ensures a more expected ordering
            // of execution to assist with fuzzing without losing really all
            // that much coverage.
            if matches!(command, Command::Ack) {
                data.commands.pop();
            } else {
                *command = Command::Ack;
            }

            // After a command was popped other streams may be able to make
            // progress so wake them all up.
            for (_, waker) in data.wakers.drain() {
                waker.wake();
            }
            log::debug!("Delivering command {ret:?} for {:?}", self.0);
            return Poll::Ready((ret, StreamResult::Completed));
        }

        // The command queue is non-empty and the next command isn't meant for
        // us, so someone else needs to drain the queue. If we were asked to
        // finish then report a cancellation, otherwise enqueue our waker.
        if finish {
            Poll::Ready((None, StreamResult::Cancelled))
        } else {
            data.wakers.insert(self.0, cx.waker().clone());
            Poll::Pending
        }
    }
}

impl StreamProducer<Data> for SharedStream {
    type Item = Command;
    type Buffer = Option<Command>;

    /// Forwards to `SharedStream::poll` and places the resulting command, if
    /// any, into `destination`.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'a, Data>,
        mut destination: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<Result<StreamResult>> {
        let (item, result) = std::task::ready!(self.poll(cx, store, finish));
        destination.set_buffer(item);
        Poll::Ready(Ok(result))
    }
}

#[cfg(test)]
mod tests {
    use super::{ComponentAsync, Scope, init, run};
    use crate::oracles::component_async::types::*;
    use crate::test::test_n_times;
    use Scope::*;

    #[test]
    fn smoke() {
        init();

        test_n_times(50, |c, _| {
            run(c);
            Ok(())
        });
    }

    // ========================================================================
    // A series of fuzz-generated test cases which caused problems during the
    // development of this fuzzer. Feel free to delete/edit/etc if the fuzzer
    // changes over time.

    #[test]
    fn simple() {
        init();

        run(ComponentAsync {
            commands: vec![
                (GuestCaller, Command::AsyncPendingImportCall(0)),
                (GuestCallee, Command::AsyncPendingImportCall(1)),
                (GuestCallee, Command::AsyncPendingExportComplete(0)),
                (GuestCaller, Command::AsyncPendingImportAssertReady(0)),
                (GuestCaller, Command::AsyncPendingImportCall(2)),
            ],
        });
    }

    #[test]
    fn somewhat_larger() {
        static COMMANDS: &[(Scope, Command)] = &[
            (GuestCallee, Command::FutureNew(0)),
            (HostCaller, Command::FutureNew(1)),
            (GuestCallee, Command::FutureReadPending(0)),
            (GuestCaller, Command::AsyncPendingImportCall(2)),
            (GuestCaller, Command::AsyncPendingImportCall(3)),
            (GuestCaller, Command::AsyncPendingImportCall(4)),
            (GuestCaller, Command::AsyncPendingImportCall(5)),
            (GuestCallee, Command::AsyncPendingExportComplete(5)),
            (GuestCallee, Command::AsyncPendingExportComplete(3)),
            (GuestCallee, Command::AsyncPendingExportComplete(4)),
            (GuestCallee, Command::AsyncPendingExportComplete(2)),
            (GuestCaller, Command::AsyncPendingImportCall(6)),
            (GuestCallee, Command::AsyncPendingExportComplete(6)),
            (GuestCaller, Command::AsyncPendingImportCall(7)),
            (GuestCallee, Command::AsyncPendingExportComplete(7)),
            (GuestCaller, Command::AsyncPendingImportCall(8)),
            (GuestCallee, Command::AsyncPendingExportComplete(8)),
            (GuestCaller, Command::AsyncPendingImportCall(9)),
            (GuestCallee, Command::AsyncPendingExportComplete(9)),
            (GuestCaller, Command::AsyncPendingImportCall(10)),
            (GuestCallee, Command::AsyncPendingExportComplete(10)),
            (GuestCaller, Command::AsyncPendingImportCall(11)),
            (GuestCallee, Command::AsyncPendingExportComplete(11)),
            (GuestCaller, Command::AsyncPendingImportCall(12)),
            (GuestCallee, Command::AsyncPendingExportComplete(12)),
            (GuestCaller, Command::AsyncPendingImportCall(13)),
            (GuestCallee, Command::AsyncPendingExportComplete(13)),
            (GuestCaller, Command::AsyncPendingImportCall(14)),
            (GuestCallee, Command::AsyncPendingExportComplete(14)),
            (GuestCaller, Command::AsyncPendingImportCall(15)),
            (GuestCallee, Command::AsyncPendingExportComplete(15)),
            (GuestCaller, Command::AsyncPendingImportCall(16)),
            (GuestCallee, Command::AsyncPendingExportComplete(16)),
            (GuestCaller, Command::AsyncPendingImportCall(17)),
            (GuestCallee, Command::AsyncPendingExportComplete(17)),
            (GuestCaller, Command::AsyncPendingImportCall(18)),
            (GuestCallee, Command::AsyncPendingExportComplete(18)),
            (GuestCaller, Command::AsyncPendingImportCall(19)),
            (GuestCallee, Command::AsyncPendingExportComplete(19)),
            (GuestCaller, Command::AsyncPendingImportCall(20)),
            (GuestCallee, Command::AsyncPendingExportComplete(20)),
            (GuestCaller, Command::AsyncPendingImportCall(21)),
            (GuestCallee, Command::AsyncPendingExportComplete(21)),
            (GuestCaller, Command::AsyncPendingImportCall(22)),
            (GuestCallee, Command::AsyncPendingExportComplete(22)),
            (GuestCaller, Command::AsyncPendingImportCall(23)),
            (GuestCallee, Command::AsyncPendingExportComplete(23)),
            (GuestCaller, Command::AsyncPendingImportCall(24)),
            (GuestCallee, Command::AsyncPendingExportComplete(24)),
            (GuestCaller, Command::AsyncPendingImportCall(25)),
            (GuestCallee, Command::AsyncPendingExportComplete(25)),
            (GuestCaller, Command::AsyncPendingImportCall(26)),
            (GuestCallee, Command::AsyncPendingExportComplete(26)),
            (GuestCaller, Command::AsyncPendingImportCall(27)),
            (GuestCallee, Command::AsyncPendingExportComplete(27)),
            (GuestCaller, Command::AsyncPendingImportCall(28)),
            (GuestCallee, Command::AsyncPendingExportComplete(28)),
            (GuestCaller, Command::AsyncPendingImportCall(29)),
            (GuestCallee, Command::AsyncPendingExportComplete(29)),
            (GuestCaller, Command::AsyncPendingImportCall(30)),
            (GuestCallee, Command::AsyncPendingExportComplete(30)),
            (GuestCaller, Command::AsyncPendingImportCall(31)),
            (GuestCallee, Command::AsyncPendingExportComplete(31)),
            (GuestCaller, Command::AsyncPendingImportCall(32)),
            (GuestCallee, Command::AsyncPendingExportComplete(32)),
            (GuestCaller, Command::AsyncPendingImportCall(33)),
            (GuestCallee, Command::AsyncPendingExportComplete(33)),
            (GuestCaller, Command::AsyncPendingImportCall(34)),
            (GuestCallee, Command::AsyncPendingExportComplete(34)),
            (GuestCaller, Command::AsyncPendingImportCall(35)),
            (GuestCallee, Command::AsyncPendingExportComplete(35)),
            (GuestCaller, Command::AsyncPendingImportCall(36)),
            (GuestCallee, Command::AsyncPendingExportComplete(36)),
            (GuestCaller, Command::AsyncPendingImportCall(37)),
            (GuestCallee, Command::AsyncPendingExportComplete(37)),
            (GuestCaller, Command::AsyncPendingImportAssertReady(36)),
        ];
        init();

        run(ComponentAsync {
            commands: COMMANDS.to_vec(),
        });
    }

    #[test]
    fn simple_stream1() {
        init();

        run(ComponentAsync {
            commands: vec![
                (HostCallee, Command::StreamNew(1)),
                (
                    HostCallee,
                    Command::StreamReadPending(StreamReadPayload {
                        stream: 1,
                        count: 2,
                    }),
                ),
                (HostCallee, Command::StreamCancelRead(1)),
                (GuestCaller, Command::SyncReadyCall),
                (
                    HostCallee,
                    Command::StreamWritePending(StreamWritePayload {
                        stream: 1,
                        item: 3,
                        count: 2,
                    }),
                ),
                (HostCallee, Command::StreamCancelWrite(1)),
                (HostCallee, Command::StreamDropWritable(1)),
                (
                    HostCallee,
                    Command::StreamReadDropped(StreamReadPayload {
                        stream: 1,
                        count: 1,
                    }),
                ),
            ],
        });
    }

    #[test]
    fn simple_stream3() {
        init();

        run(ComponentAsync {
            commands: vec![
                (GuestCaller, Command::StreamNew(26)),
                (
                    GuestCaller,
                    Command::StreamReadPending(StreamReadPayload {
                        stream: 26,
                        count: 10,
                    }),
                ),
                (GuestCaller, Command::StreamDropWritable(26)),
                (GuestCaller, Command::StreamReadAssertDropped(26)),
            ],
        });
    }

    #[test]
    fn simple_stream4() {
        init();

        run(ComponentAsync {
            commands: vec![
                (GuestCaller, Command::StreamNew(23)),
                (
                    GuestCaller,
                    Command::StreamWritePending(StreamWritePayload {
                        stream: 23,
                        item: 24,
                        count: 2,
                    }),
                ),
                (GuestCaller, Command::StreamGive(23)),
                (GuestCallee, Command::StreamDropReadable(23)),
                (
                    GuestCaller,
                    Command::StreamWriteAssertDropped(StreamReadPayload {
                        stream: 23,
                        count: 0,
                    }),
                ),
            ],
        });
    }

    #[test]
    fn zero_length_behavior() {
        init();

        run(ComponentAsync {
            commands: vec![
                (GuestCaller, Command::StreamNew(10)),
                (HostCaller, Command::StreamTake(10)),
                (
                    GuestCaller,
                    Command::StreamWritePending(StreamWritePayload {
                        stream: 10,
                        item: 13,
                        count: 5,
                    }),
                ),
                (
                    HostCaller,
                    Command::StreamReadReady(StreamReadyPayload {
                        stream: 10,
                        item: 0,
                        op_count: 0,
                        ready_count: 0,
                    }),
                ),
                (
                    HostCaller,
                    Command::StreamReadReady(StreamReadyPayload {
                        stream: 10,
                        item: 0,
                        op_count: 0,
                        ready_count: 0,
                    }),
                ),
            ],
        });
    }
}