1 //! For a high-level overview of this fuzz target see `fuzz_async.rs` 2 3 use crate::block_on; 4 use crate::generators::component_async::exports::wasmtime_fuzz::fuzz::async_test::Guest; 5 use crate::generators::component_async::wasmtime_fuzz::fuzz::async_test::{self, Command}; 6 use crate::generators::component_async::wasmtime_fuzz::fuzz::types; 7 use crate::generators::component_async::{ComponentAsync, FuzzAsyncPre, Scope}; 8 use futures::channel::oneshot; 9 use std::collections::{HashMap, HashSet}; 10 use std::mem; 11 use std::pin::Pin; 12 use std::sync::{Arc, OnceLock, Weak}; 13 use std::task::{Context, Poll, Waker}; 14 use std::time::Instant; 15 use wasmtime::component::{ 16 Access, Accessor, AccessorTask, Component, Destination, FutureConsumer, FutureProducer, 17 FutureReader, HasSelf, Linker, ResourceTable, Source, StreamConsumer, StreamProducer, 18 StreamReader, StreamResult, VecBuffer, 19 }; 20 use wasmtime::{AsContextMut, Config, Engine, Result, Store, StoreContextMut}; 21 use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView}; 22 23 static STATE: OnceLock<(Engine, FuzzAsyncPre<Data>)> = OnceLock::new(); 24 25 /// Initializes state for future fuzz runs. 26 /// 27 /// This will create an `Engine` to run this fuzzer within and it will 28 /// additionally precompile the component that will be used for fuzzing. 29 /// 30 /// There are a few points of note about this: 31 /// 32 /// * The `misc` fuzzer is manually instrumented with this function as the init 33 /// hook to ensure this runs before any other fuzzing. 34 /// 35 /// * Compilation of the component takes quite some time with 36 /// fuzzing-instrumented Cranelift. To assist with local development this 37 /// implements a cache which is serialized/deserialized via an env var. 
38 pub fn init() { 39 crate::init_fuzzing(); 40 41 STATE.get_or_init(|| { 42 let mut config = Config::new(); 43 config.wasm_component_model_async(true); 44 let engine = Engine::new(&config).unwrap(); 45 let component = compile(&engine); 46 let mut linker = Linker::new(&engine); 47 wasmtime_wasi::p2::add_to_linker_async(&mut linker).unwrap(); 48 async_test::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap(); 49 types::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap(); 50 51 let pre = linker.instantiate_pre(&component).unwrap(); 52 let pre = FuzzAsyncPre::new(pre).unwrap(); 53 54 (engine, pre) 55 }); 56 57 fn compile(engine: &Engine) -> Component { 58 let wasm_path = test_programs_artifacts::FUZZ_ASYNC_COMPONENT; 59 let wasm = test_programs_artifacts::fuzz_async_component_bytes!(); 60 let wasm = &wasm[..]; 61 let cwasm_cache = std::env::var("COMPONENT_ASYNC_CWASM_CACHE").ok(); 62 if let Some(path) = &cwasm_cache 63 && let Ok(cwasm_mtime) = std::fs::metadata(&path).and_then(|m| m.modified()) 64 && let Ok(wasm_mtime) = std::fs::metadata(wasm_path).and_then(|m| m.modified()) 65 && cwasm_mtime > wasm_mtime 66 { 67 log::debug!("Using cached component async cwasm at {path}"); 68 unsafe { 69 return Component::deserialize_file(engine, path).unwrap(); 70 } 71 } 72 73 let composition = { 74 let mut config = wasm_compose::config::Config::default(); 75 let tempdir = tempfile::TempDir::new().unwrap(); 76 let path = tempdir.path().join("fuzz-async.wasm"); 77 std::fs::write(&path, wasm).unwrap(); 78 config.definitions.push(path.clone()); 79 80 wasm_compose::composer::ComponentComposer::new(&path, &config) 81 .compose() 82 .unwrap() 83 }; 84 let start = Instant::now(); 85 let component = Component::new(&engine, &composition).unwrap(); 86 if let Some(path) = cwasm_cache { 87 log::debug!("Caching component async cwasm to {path}"); 88 std::fs::write(path, &component.serialize().unwrap()).unwrap(); 89 } else if start.elapsed() > std::time::Duration::from_secs(1) 
{ 90 eprintln!( 91 " 92 !!!!!!!!!!!!!!!!!!!!!!!!!! 93 94 Component compilation is slow, try setting `COMPONENT_ASYNC_CWASM_CACHE=path` to 95 cache compilation results 96 97 !!!!!!!!!!!!!!!!!!!!!!!!!! 98 " 99 ); 100 } 101 return component; 102 } 103 } 104 105 #[derive(Default)] 106 struct Data { 107 ctx: WasiCtx, 108 table: ResourceTable, 109 wakers: HashMap<Scope, Waker>, 110 commands: Vec<(Scope, Command)>, 111 112 guest_caller_stream: Option<StreamReader<Command>>, 113 guest_callee_stream: Option<StreamReader<Command>>, 114 115 host_pending_async_calls: HashMap<u32, oneshot::Sender<()>>, 116 host_pending_async_calls_cancelled: HashSet<u32>, 117 guest_pending_async_calls_ready: HashSet<u32>, 118 119 // State of futures/streams. Note that while #12091 is unresolved an 120 // `Arc`/`Weak` combo is used to detect when wasmtime drops futures/streams 121 // and the various halves we're interacting with using traits. 122 host_futures: HashMap<u32, FutureReader<u32>>, 123 host_future_producers: HashMap<u32, (HostFutureProducerState, Weak<()>)>, 124 host_future_consumers: HashMap<u32, (HostFutureConsumerState, Weak<()>)>, 125 host_streams: HashMap<u32, StreamReader<u32>>, 126 host_stream_producers: HashMap<u32, (HostStreamProducerState, Weak<()>)>, 127 host_stream_consumers: HashMap<u32, (HostStreamConsumerState, Weak<()>)>, 128 } 129 130 impl WasiView for Data { 131 fn ctx(&mut self) -> WasiCtxView<'_> { 132 WasiCtxView { 133 ctx: &mut self.ctx, 134 table: &mut self.table, 135 } 136 } 137 } 138 139 impl async_test::HostWithStore for HasSelf<Data> { 140 async fn async_ready<T>(_store: &Accessor<T, Self>) {} 141 142 async fn async_pending<T>(store: &Accessor<T, Self>, id: u32) { 143 let (tx, rx) = oneshot::channel(); 144 store.with(|mut s| s.get().host_pending_async_calls.insert(id, tx)); 145 let record = RecordCancelOnDrop { store, id }; 146 rx.await.unwrap(); 147 mem::forget(record); 148 149 struct RecordCancelOnDrop<'a, T: 'static> { 150 store: &'a Accessor<T, 
HasSelf<Data>>, 151 id: u32, 152 } 153 154 impl<T> Drop for RecordCancelOnDrop<'_, T> { 155 fn drop(&mut self) { 156 self.store.with(|mut s| { 157 s.get().host_pending_async_calls_cancelled.insert(self.id); 158 }); 159 } 160 } 161 } 162 163 async fn init<T>(_store: &Accessor<T, Self>, _scope: types::Scope) {} 164 } 165 166 impl async_test::Host for Data { 167 fn sync_ready(&mut self) {} 168 169 fn future_take(&mut self, id: u32) -> FutureReader<u32> { 170 self.host_futures.remove(&id).unwrap() 171 } 172 173 fn future_receive(&mut self, id: u32, future: FutureReader<u32>) { 174 let prev = self.host_futures.insert(id, future); 175 assert!(prev.is_none()); 176 } 177 178 fn stream_take(&mut self, id: u32) -> StreamReader<u32> { 179 self.host_streams.remove(&id).unwrap() 180 } 181 182 fn stream_receive(&mut self, id: u32, stream: StreamReader<u32>) { 183 let prev = self.host_streams.insert(id, stream); 184 assert!(prev.is_none()); 185 } 186 } 187 188 impl types::HostWithStore for HasSelf<Data> { 189 fn get_commands<T>( 190 mut store: Access<'_, T, Self>, 191 scope: types::Scope, 192 ) -> StreamReader<Command> { 193 let data = store.get(); 194 match scope { 195 types::Scope::Caller => data.guest_caller_stream.take().unwrap(), 196 types::Scope::Callee => data.guest_callee_stream.take().unwrap(), 197 } 198 } 199 } 200 201 impl types::Host for Data {} 202 203 /// Executes the `input` provided, assuming that `init` has been previously 204 /// executed. 205 pub fn run(mut input: ComponentAsync) { 206 log::debug!("Running component async fuzz test with\n{input:?}"); 207 208 // Commands are executed in the order that they're listed in the input, but 209 // to make it easier on the `StreamProducer` implementation below they're 210 // popped off the back. To ensure that they're all delivered in the right 211 // order reverse the list to ensure the correct order is maintained. 
212 input.commands.reverse(); 213 214 let (engine, pre) = STATE.get().unwrap(); 215 let mut store = Store::new( 216 engine, 217 Data { 218 ctx: WasiCtx::builder().inherit_stdio().inherit_env().build(), 219 commands: input.commands, 220 ..Data::default() 221 }, 222 ); 223 224 let guest_caller_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCaller)); 225 let guest_callee_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCallee)); 226 store.data_mut().guest_caller_stream = Some(guest_caller_stream); 227 store.data_mut().guest_callee_stream = Some(guest_callee_stream); 228 block_on(async { 229 let instance = pre.instantiate_async(&mut store).await.unwrap(); 230 let test = instance.wasmtime_fuzz_fuzz_async_test(); 231 232 let mut host_caller = SharedStream(Scope::HostCaller); 233 let mut host_callee = SharedStream(Scope::HostCallee); 234 store 235 .run_concurrent(async |store| { 236 // Kick off stream reads in the guest. This function will return 237 // but the tasks in the guest will keep running after they 238 // return to process stream items. 239 test.call_init(store, types::Scope::Caller).await.unwrap(); 240 241 // Simultaneously process commands from both host streams. These 242 // will return once the entire command queue is exhausted. 243 futures::join!( 244 async { 245 while let Some(cmd) = host_caller.next(store).await { 246 host_caller_cmd(&test, store, cmd).await; 247 } 248 }, 249 async { 250 while let Some(cmd) = host_callee.next(store).await { 251 host_callee_cmd(store, cmd).await; 252 } 253 }, 254 ); 255 256 // Note that there may still be pending async work in the guest 257 // (or host). It's intentional that it's not cleaned up here to 258 // help test situations where async work is all abruptly 259 // cancelled by just being dropped in the host. 260 }) 261 .await 262 .unwrap(); 263 }); 264 } 265 266 /// See documentation in `fuzz_async.rs` for what's going on here. 
267 async fn test_property<F>(store: &Accessor<Data>, mut f: F) -> bool 268 where 269 F: FnMut(&mut Data) -> bool, 270 { 271 for _ in 0..1000 { 272 let ready = store.with(|mut s| f(s.get())); 273 if ready { 274 return true; 275 } 276 277 crate::YieldN(1).await; 278 } 279 280 return false; 281 } 282 283 async fn await_property<F>(store: &Accessor<Data>, desc: &str, f: F) 284 where 285 F: FnMut(&mut Data) -> bool, 286 { 287 assert!( 288 test_property(store, f).await, 289 "timed out waiting for {desc}", 290 ); 291 } 292 293 async fn host_caller_cmd(test: &Guest, store: &Accessor<Data>, cmd: Command) { 294 match cmd { 295 Command::Ack => {} 296 Command::SyncReadyCall => test.call_sync_ready(store).await.unwrap(), 297 Command::AsyncReadyCall => test.call_async_ready(store).await.unwrap(), 298 Command::AsyncPendingExportComplete(_i) => todo!(), 299 Command::AsyncPendingExportAssertCancelled(_i) => todo!(), 300 Command::AsyncPendingImportCall(i) => { 301 struct RunPendingImport { 302 test: Guest, 303 i: u32, 304 } 305 306 store.spawn(RunPendingImport { 307 test: test.clone(), 308 i, 309 }); 310 311 impl AccessorTask<Data> for RunPendingImport { 312 async fn run(self, store: &Accessor<Data>) -> Result<()> { 313 self.test.call_async_pending(store, self.i).await?; 314 store.with(|mut s| { 315 s.get().guest_pending_async_calls_ready.insert(self.i); 316 }); 317 Ok(()) 318 } 319 } 320 } 321 Command::AsyncPendingImportCancel(_i) => todo!(), 322 Command::AsyncPendingImportAssertReady(i) => { 323 assert!( 324 test_property(store, |s| s.guest_pending_async_calls_ready.remove(&i)).await, 325 "expected async_pending import {i} to be ready", 326 ); 327 } 328 329 Command::FutureTake(i) => { 330 let future = test.call_future_take(store, i).await.unwrap(); 331 store.with(|mut s| { 332 let prev = s.get().host_futures.insert(i, future); 333 assert!(prev.is_none()); 334 }); 335 } 336 Command::FutureGive(i) => { 337 let future = store.with(|mut s| s.get().host_futures.remove(&i).unwrap()); 
338 test.call_future_receive(store, i, future).await.unwrap(); 339 } 340 Command::StreamTake(i) => { 341 let stream = test.call_stream_take(store, i).await.unwrap(); 342 store.with(|mut s| { 343 let prev = s.get().host_streams.insert(i, stream); 344 assert!(prev.is_none()); 345 }); 346 } 347 Command::StreamGive(i) => { 348 let stream = store.with(|mut s| s.get().host_streams.remove(&i).unwrap()); 349 test.call_stream_receive(store, i, stream).await.unwrap(); 350 } 351 352 other => future_or_stream_cmd(store, other).await, 353 } 354 } 355 356 async fn host_callee_cmd(store: &Accessor<Data>, cmd: Command) { 357 match cmd { 358 Command::Ack => {} 359 Command::SyncReadyCall => todo!(), 360 Command::AsyncReadyCall => todo!(), 361 Command::AsyncPendingExportComplete(i) => store.with(|mut s| { 362 s.get() 363 .host_pending_async_calls 364 .remove(&i) 365 .unwrap() 366 .send(()) 367 .unwrap(); 368 }), 369 Command::AsyncPendingExportAssertCancelled(i) => { 370 assert!( 371 test_property(store, |s| s.host_pending_async_calls_cancelled.remove(&i)).await, 372 "expected async_pending export {i} to be cancelled", 373 ); 374 } 375 Command::AsyncPendingImportCall(_i) => todo!(), 376 Command::AsyncPendingImportCancel(_i) => todo!(), 377 Command::AsyncPendingImportAssertReady(_i) => todo!(), 378 379 other => future_or_stream_cmd(store, other).await, 380 } 381 } 382 383 async fn future_or_stream_cmd(store: &Accessor<Data>, cmd: Command) { 384 match cmd { 385 // These commands should be handled above 386 Command::Ack 387 | Command::SyncReadyCall 388 | Command::AsyncReadyCall 389 | Command::AsyncPendingExportComplete(_) 390 | Command::AsyncPendingExportAssertCancelled(_) 391 | Command::AsyncPendingImportCall(_) 392 | Command::AsyncPendingImportCancel(_) 393 | Command::FutureTake(_) 394 | Command::FutureGive(_) 395 | Command::StreamTake(_) 396 | Command::StreamGive(_) 397 | Command::AsyncPendingImportAssertReady(_) => unreachable!(), 398 399 Command::FutureNew(id) => { 400 
store.with(|mut s| { 401 let arc = Arc::new(()); 402 let weak = Arc::downgrade(&arc); 403 let future = FutureReader::new(&mut s, HostFutureProducer(id, arc)); 404 let data = s.get(); 405 let prev = data.host_futures.insert(id, future); 406 assert!(prev.is_none()); 407 let prev = data 408 .host_future_producers 409 .insert(id, (HostFutureProducerState::Idle, weak)); 410 assert!(prev.is_none()); 411 }); 412 } 413 Command::FutureDropReadable(id) => { 414 store.with(|mut s| match s.get().host_futures.remove(&id) { 415 Some(mut future) => future.close(&mut s), 416 None => { 417 let (mut state, _weak) = s.get().host_future_consumers.remove(&id).unwrap(); 418 state.wake_by_ref(); 419 } 420 }) 421 } 422 Command::FutureWriteReady(payload) => { 423 await_property(store, "future write should be waiting", |s| { 424 matches!( 425 s.host_future_producers.get(&payload.future), 426 Some((HostFutureProducerState::Waiting(_), _)) 427 ) 428 }) 429 .await; 430 store.with(|mut s| { 431 let state = s 432 .get() 433 .host_future_producers 434 .get_mut(&payload.future) 435 .unwrap(); 436 match state { 437 (HostFutureProducerState::Waiting(waker), _) => { 438 waker.wake_by_ref(); 439 state.0 = HostFutureProducerState::Writing(payload.item); 440 } 441 (state, _) => panic!("future not waiting: {state:?}"), 442 } 443 }) 444 } 445 Command::FutureWritePending(payload) => store.with(|mut s| { 446 let state = s 447 .get() 448 .host_future_producers 449 .get_mut(&payload.future) 450 .unwrap(); 451 match state { 452 (HostFutureProducerState::Idle, _) => { 453 state.0 = HostFutureProducerState::Writing(payload.item); 454 } 455 _ => panic!("future not idle"), 456 } 457 }), 458 Command::FutureWriteDropped(id) => store.with(|mut s| { 459 let (state, weak) = s.get().host_future_producers.remove(&id).unwrap(); 460 assert!(matches!(state, HostFutureProducerState::Idle)); 461 assert!(weak.upgrade().is_none()); 462 }), 463 Command::FutureReadReady(payload) => { 464 let id = payload.future; 465 
store.with(|mut s| { 466 let arc = Arc::new(()); 467 let weak = Arc::downgrade(&arc); 468 let data = s.get(); 469 let future = data.host_futures.remove(&id).unwrap(); 470 let prev = data 471 .host_future_consumers 472 .insert(id, (HostFutureConsumerState::Consuming, weak)); 473 assert!(prev.is_none()); 474 future.pipe(&mut s, HostFutureConsumer(id, arc)); 475 }); 476 477 await_property(store, "future should be present", |s| { 478 matches!( 479 s.host_future_consumers[&id], 480 (HostFutureConsumerState::Complete(_), _) 481 ) 482 }) 483 .await; 484 485 store.with(|mut s| { 486 let (state, _) = s.get().host_future_consumers.remove(&id).unwrap(); 487 match state { 488 HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item), 489 _ => panic!("future not complete"), 490 } 491 }); 492 } 493 Command::FutureReadPending(id) => { 494 ensure_future_reading(store, id); 495 store.with(|mut s| { 496 let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap(); 497 state.wake_by_ref(); 498 assert!( 499 matches!(state, HostFutureConsumerState::Idle), 500 "bad state: {state:?}", 501 ); 502 *state = HostFutureConsumerState::Consuming; 503 }) 504 } 505 Command::FutureCancelWrite(id) => store.with(|mut s| { 506 let (state, _) = s.get().host_future_producers.get_mut(&id).unwrap(); 507 assert!(matches!(state, HostFutureProducerState::Writing(_))); 508 *state = HostFutureProducerState::Idle; 509 }), 510 Command::FutureCancelRead(id) => store.with(|mut s| { 511 let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap(); 512 assert!(matches!(state, HostFutureConsumerState::Consuming)); 513 *state = HostFutureConsumerState::Idle; 514 }), 515 Command::FutureReadAssertComplete(payload) => { 516 await_property(store, "future read should be complete", |s| { 517 matches!( 518 s.host_future_consumers.get(&payload.future), 519 Some((HostFutureConsumerState::Complete(_), _)) 520 ) 521 }) 522 .await; 523 store.with(|mut s| { 524 let (state, _) = s 525 .get() 526 
.host_future_consumers 527 .remove(&payload.future) 528 .unwrap(); 529 match state { 530 HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item), 531 _ => panic!("future not complete"), 532 } 533 }) 534 } 535 Command::FutureWriteAssertComplete(id) => store.with(|mut s| { 536 let (state, weak) = s.get().host_future_producers.remove(&id).unwrap(); 537 assert!(matches!(state, HostFutureProducerState::Complete)); 538 assert!(weak.upgrade().is_none()); 539 }), 540 Command::FutureWriteAssertDropped(id) => store.with(|mut s| { 541 let (state, weak) = s.get().host_future_producers.remove(&id).unwrap(); 542 assert!(matches!(state, HostFutureProducerState::Writing(_))); 543 assert!(weak.upgrade().is_none()); 544 }), 545 546 Command::StreamNew(id) => { 547 store.with(|mut s| { 548 let arc = Arc::new(()); 549 let weak = Arc::downgrade(&arc); 550 let stream = StreamReader::new(&mut s, HostStreamProducer(id, arc)); 551 let data = s.get(); 552 let prev = data.host_streams.insert(id, stream); 553 assert!(prev.is_none()); 554 let prev = data 555 .host_stream_producers 556 .insert(id, (HostStreamProducerState::idle(), weak)); 557 assert!(prev.is_none()); 558 }); 559 } 560 Command::StreamDropReadable(id) => { 561 store.with(|mut s| match s.get().host_streams.remove(&id) { 562 Some(mut stream) => { 563 stream.close(&mut s); 564 } 565 None => { 566 let (mut state, _weak) = s.get().host_stream_consumers.remove(&id).unwrap(); 567 state.wake_by_ref(); 568 } 569 }) 570 } 571 Command::StreamDropWritable(id) => store.with(|mut s| { 572 let (mut state, _weak) = s.get().host_stream_producers.remove(&id).unwrap(); 573 state.wake_by_ref(); 574 }), 575 Command::StreamWriteReady(payload) => { 576 let id = payload.stream; 577 store.with(|mut s| { 578 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap(); 579 state.wake_by_ref(); 580 match state.kind { 581 HostStreamProducerStateKind::Idle => { 582 state.kind = HostStreamProducerStateKind::Writing(stream_payload( 583 
payload.item, 584 payload.op_count, 585 )); 586 } 587 _ => panic!("stream not idle: {state:?}"), 588 } 589 }); 590 await_property(store, "stream should complete a write", |s| { 591 matches!( 592 s.host_stream_producers[&id].0.kind, 593 HostStreamProducerStateKind::Wrote(_), 594 ) 595 }) 596 .await; 597 store.with(|mut s| { 598 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap(); 599 match state.kind { 600 HostStreamProducerStateKind::Wrote(amt) => { 601 assert_eq!(amt, payload.ready_count); 602 state.kind = HostStreamProducerStateKind::Idle; 603 } 604 _ => panic!("stream not idle: {state:?}"), 605 } 606 }); 607 } 608 Command::StreamReadReady(payload) => { 609 let id = payload.stream; 610 ensure_stream_reading(store, id); 611 store.with(|mut s| { 612 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap(); 613 state.wake_by_ref(); 614 state.kind = HostStreamConsumerStateKind::Consuming(payload.op_count); 615 }); 616 await_property(store, "stream should complete a read", |s| { 617 matches!( 618 s.host_stream_consumers[&id].0.kind, 619 HostStreamConsumerStateKind::Consumed(_), 620 ) 621 }) 622 .await; 623 624 store.with(|mut s| { 625 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap(); 626 match &state.kind { 627 HostStreamConsumerStateKind::Consumed(last_read) => { 628 assert_eq!( 629 *last_read, 630 stream_payload(payload.item, payload.ready_count) 631 ); 632 state.kind = HostStreamConsumerStateKind::Idle; 633 } 634 _ => panic!("future not complete"), 635 } 636 }); 637 } 638 Command::StreamWritePending(payload) => store.with(|mut s| { 639 let (state, _) = s 640 .get() 641 .host_stream_producers 642 .get_mut(&payload.stream) 643 .unwrap(); 644 state.wake_by_ref(); 645 match state.kind { 646 HostStreamProducerStateKind::Idle => { 647 state.kind = HostStreamProducerStateKind::Writing(stream_payload( 648 payload.item, 649 payload.count, 650 )); 651 } 652 _ => panic!("stream not idle {:?}", state.kind), 653 } 654 }), 655 
Command::StreamReadPending(payload) => { 656 ensure_stream_reading(store, payload.stream); 657 store.with(|mut s| { 658 let (state, _) = s 659 .get() 660 .host_stream_consumers 661 .get_mut(&payload.stream) 662 .unwrap(); 663 state.wake_by_ref(); 664 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle)); 665 state.kind = HostStreamConsumerStateKind::Consuming(payload.count); 666 }) 667 } 668 Command::StreamWriteDropped(payload) => store.with(|mut s| { 669 let (state, weak) = s 670 .get() 671 .host_stream_producers 672 .get_mut(&payload.stream) 673 .unwrap(); 674 assert!(matches!(state.kind, HostStreamProducerStateKind::Idle)); 675 assert!(weak.upgrade().is_none()); 676 }), 677 Command::StreamReadDropped(payload) => { 678 ensure_stream_reading(store, payload.stream); 679 await_property(store, "stream read should get dropped", |s| { 680 let weak = &s.host_stream_consumers[&payload.stream].1; 681 weak.upgrade().is_none() 682 }) 683 .await; 684 store.with(|mut s| { 685 let (state, weak) = s 686 .get() 687 .host_stream_consumers 688 .get_mut(&payload.stream) 689 .unwrap(); 690 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle)); 691 assert!(weak.upgrade().is_none()); 692 }) 693 } 694 Command::StreamCancelWrite(id) => store.with(|mut s| { 695 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap(); 696 assert!( 697 matches!(state.kind, HostStreamProducerStateKind::Writing(_)), 698 "invalid state {state:?}", 699 ); 700 state.kind = HostStreamProducerStateKind::Idle; 701 state.wake_by_ref(); 702 }), 703 Command::StreamCancelRead(id) => store.with(|mut s| { 704 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap(); 705 assert!(matches!( 706 state.kind, 707 HostStreamConsumerStateKind::Consuming(_) 708 )); 709 state.kind = HostStreamConsumerStateKind::Idle; 710 }), 711 Command::StreamReadAssertComplete(payload) => store.with(|mut s| { 712 let (state, _) = s 713 .get() 714 .host_stream_consumers 715 .get_mut(&payload.stream) 
716 .unwrap(); 717 match &state.kind { 718 HostStreamConsumerStateKind::Consumed(last_read) => { 719 assert_eq!(*last_read, stream_payload(payload.item, payload.count)); 720 state.kind = HostStreamConsumerStateKind::Idle; 721 } 722 _ => panic!("stream not complete"), 723 } 724 }), 725 Command::StreamWriteAssertComplete(payload) => store.with(|mut s| { 726 let (state, _) = s 727 .get() 728 .host_stream_producers 729 .get_mut(&payload.stream) 730 .unwrap(); 731 match state.kind { 732 HostStreamProducerStateKind::Wrote(amt) => { 733 assert_eq!(amt, payload.count); 734 state.kind = HostStreamProducerStateKind::Idle; 735 } 736 _ => panic!("stream not complete: {:?}", state.kind), 737 } 738 }), 739 Command::StreamWriteAssertDropped(payload) => { 740 await_property(store, "stream write should be dropped", |s| { 741 let weak = &s.host_stream_producers[&payload.stream].1; 742 weak.upgrade().is_none() 743 }) 744 .await; 745 store.with(|mut s| { 746 let (state, weak) = s 747 .get() 748 .host_stream_producers 749 .get_mut(&payload.stream) 750 .unwrap(); 751 assert!(matches!( 752 state.kind, 753 HostStreamProducerStateKind::Writing(_) 754 )); 755 assert!(weak.upgrade().is_none()); 756 }) 757 } 758 Command::StreamReadAssertDropped(id) => { 759 await_property(store, "stream read should be dropped", |s| { 760 let weak = &s.host_stream_consumers[&id].1; 761 weak.upgrade().is_none() 762 }) 763 .await; 764 store.with(|mut s| { 765 let (state, weak) = s.get().host_stream_consumers.get_mut(&id).unwrap(); 766 assert!(matches!( 767 state.kind, 768 HostStreamConsumerStateKind::Consuming(_), 769 )); 770 assert!(weak.upgrade().is_none()); 771 }) 772 } 773 } 774 } 775 776 fn stream_payload(item: u32, count: u32) -> Vec<u32> { 777 (item..item + count).collect() 778 } 779 780 fn ensure_future_reading(store: &Accessor<Data>, id: u32) { 781 store.with(|mut s| { 782 let data = s.get(); 783 if !data.host_futures.contains_key(&id) { 784 return; 785 } 786 log::debug!("future consume: start {id}"); 
787 let arc = Arc::new(()); 788 let weak = Arc::downgrade(&arc); 789 let data = s.get(); 790 let future = data.host_futures.remove(&id).unwrap(); 791 let prev = data 792 .host_future_consumers 793 .insert(id, (HostFutureConsumerState::Idle, weak)); 794 assert!(prev.is_none()); 795 future.pipe(&mut s, HostFutureConsumer(id, arc)); 796 }); 797 } 798 799 fn ensure_stream_reading(store: &Accessor<Data>, id: u32) { 800 store.with(|mut s| { 801 let data = s.get(); 802 if !data.host_streams.contains_key(&id) { 803 return; 804 } 805 log::debug!("stream consume: start {id}"); 806 let arc = Arc::new(()); 807 let weak = Arc::downgrade(&arc); 808 let prev = data.host_stream_consumers.insert( 809 id, 810 ( 811 HostStreamConsumerState { 812 kind: HostStreamConsumerStateKind::Idle, 813 waker: None, 814 }, 815 weak, 816 ), 817 ); 818 assert!(prev.is_none()); 819 let stream = data.host_streams.remove(&id).unwrap(); 820 stream.pipe(&mut s, HostStreamConsumer(id, arc)); 821 }); 822 } 823 824 struct HostFutureConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>); 825 826 /// Note that this is only created once a read is actually initiated on a 827 /// future. It's also not possible to cancel a host-based read on a future, 828 /// hence why this is simpler than the `HostFutureProducerState` state below. 
829 #[derive(Debug)] 830 enum HostFutureConsumerState { 831 Idle, 832 Waiting(Waker), 833 Consuming, 834 Complete(u32), 835 } 836 837 impl HostFutureConsumerState { 838 fn wake_by_ref(&mut self) { 839 if let HostFutureConsumerState::Waiting(waker) = &self { 840 waker.wake_by_ref(); 841 *self = HostFutureConsumerState::Idle; 842 } 843 } 844 } 845 846 impl FutureConsumer<Data> for HostFutureConsumer { 847 type Item = u32; 848 849 fn poll_consume( 850 self: Pin<&mut Self>, 851 cx: &mut Context<'_>, 852 mut store: StoreContextMut<'_, Data>, 853 mut source: Source<'_, Self::Item>, 854 finish: bool, 855 ) -> Poll<Result<()>> { 856 let state = match store.data_mut().host_future_consumers.get_mut(&self.0) { 857 Some(state) => state, 858 None => { 859 log::debug!("consume: closed {}", self.0); 860 return Poll::Ready(Ok(())); 861 } 862 }; 863 match state.0 { 864 HostFutureConsumerState::Idle | HostFutureConsumerState::Waiting(_) => { 865 if finish { 866 log::debug!("consume: cancel {}", self.0); 867 state.0 = HostFutureConsumerState::Idle; 868 Poll::Ready(Ok(())) 869 } else { 870 log::debug!("consume: wait {}", self.0); 871 state.0 = HostFutureConsumerState::Waiting(cx.waker().clone()); 872 Poll::Pending 873 } 874 } 875 HostFutureConsumerState::Consuming => { 876 log::debug!("consume: done {}", self.0); 877 let mut item = None; 878 source.read(&mut store, &mut item).unwrap(); 879 store 880 .data_mut() 881 .host_future_consumers 882 .get_mut(&self.0) 883 .unwrap() 884 .0 = HostFutureConsumerState::Complete(item.unwrap()); 885 Poll::Ready(Ok(())) 886 } 887 HostFutureConsumerState::Complete(_) => unreachable!(), 888 } 889 } 890 } 891 892 struct HostFutureProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>); 893 894 #[derive(Debug)] 895 enum HostFutureProducerState { 896 Idle, 897 Waiting(Waker), 898 Writing(u32), 899 Complete, 900 } 901 902 impl FutureProducer<Data> for HostFutureProducer { 903 type Item = u32; 904 905 fn poll_produce( 906 self: Pin<&mut Self>, 
907 cx: &mut Context<'_>, 908 mut store: StoreContextMut<'_, Data>, 909 finish: bool, 910 ) -> Poll<Result<Option<Self::Item>>> { 911 let state = store 912 .data_mut() 913 .host_future_producers 914 .get_mut(&self.0) 915 .unwrap(); 916 match state.0 { 917 HostFutureProducerState::Idle | HostFutureProducerState::Waiting(_) => { 918 if finish { 919 log::debug!("produce: cancel {}", self.0); 920 state.0 = HostFutureProducerState::Idle; 921 Poll::Ready(Ok(None)) 922 } else { 923 log::debug!("produce: wait {}", self.0); 924 state.0 = HostFutureProducerState::Waiting(cx.waker().clone()); 925 Poll::Pending 926 } 927 } 928 HostFutureProducerState::Writing(item) => { 929 log::debug!("produce: done {}", self.0); 930 state.0 = HostFutureProducerState::Complete; 931 Poll::Ready(Ok(Some(item))) 932 } 933 HostFutureProducerState::Complete => unreachable!(), 934 } 935 } 936 } 937 938 struct HostStreamConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>); 939 940 #[derive(Debug)] 941 struct HostStreamConsumerState { 942 waker: Option<Waker>, 943 kind: HostStreamConsumerStateKind, 944 } 945 946 #[derive(Debug)] 947 enum HostStreamConsumerStateKind { 948 Idle, 949 Consuming(u32), 950 Consumed(Vec<u32>), 951 } 952 953 impl HostStreamConsumerState { 954 fn wake_by_ref(&mut self) { 955 if let Some(waker) = self.waker.take() { 956 waker.wake(); 957 } 958 } 959 } 960 961 impl StreamConsumer<Data> for HostStreamConsumer { 962 type Item = u32; 963 964 fn poll_consume( 965 self: Pin<&mut Self>, 966 cx: &mut Context<'_>, 967 mut store: StoreContextMut<'_, Data>, 968 mut source: Source<'_, Self::Item>, 969 finish: bool, 970 ) -> Poll<Result<StreamResult>> { 971 let remaining = source.remaining(&mut store); 972 let state = match store.data_mut().host_stream_consumers.get_mut(&self.0) { 973 Some((state, _)) => state, 974 None => { 975 log::debug!("stream consume: dropped {}", self.0); 976 return Poll::Ready(Ok(StreamResult::Dropped)); 977 } 978 }; 979 match state.kind { 980 
HostStreamConsumerStateKind::Idle | HostStreamConsumerStateKind::Consumed(_) => { 981 if finish { 982 log::debug!("stream consume: cancel {}", self.0); 983 state.waker = None; 984 Poll::Ready(Ok(StreamResult::Cancelled)) 985 } else { 986 log::debug!("stream consume: wait {}", self.0); 987 state.waker = Some(cx.waker().clone()); 988 Poll::Pending 989 } 990 } 991 HostStreamConsumerStateKind::Consuming(amt) => { 992 // The writer is performing a zero-length write. We always 993 // complete that without updating our own state. 994 if remaining == 0 { 995 log::debug!("stream consume: completing zero-length write {}", self.0); 996 return Poll::Ready(Ok(StreamResult::Completed)); 997 } 998 999 // If this is a zero-length read then block the writer but update our own state. 1000 if amt == 0 { 1001 log::debug!("stream consume: finishing zero-length read {}", self.0); 1002 state.kind = HostStreamConsumerStateKind::Consumed(Vec::new()); 1003 state.waker = Some(cx.waker().clone()); 1004 return Poll::Pending; 1005 } 1006 1007 // For non-zero sizes perform the read/copy. 
log::debug!("stream consume: done {}", self.0);
                let mut dst = Vec::with_capacity(amt as usize);
                source.read(&mut store, &mut dst).unwrap();
                let state = &mut store
                    .data_mut()
                    .host_stream_consumers
                    .get_mut(&self.0)
                    .unwrap()
                    .0;
                state.kind = HostStreamConsumerStateKind::Consumed(dst);
                state.waker = None;
                Poll::Ready(Ok(StreamResult::Completed))
            }
        }
    }
}

/// Logs when the consumer half is dropped; no other cleanup happens here.
impl Drop for HostStreamConsumer {
    fn drop(&mut self) {
        let id = self.0;
        log::debug!("stream consume: drop {}", id);
    }
}

/// Host-side producer for a stream of `u32` items.
///
/// Field 0 is the key used to look up this producer's state in
/// `Data::host_stream_producers`. The `Arc<()>` is never read; per its
/// `expect` reason it is held only for drop tracking.
struct HostStreamProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);

/// Per-stream bookkeeping consulted by `HostStreamProducer::poll_produce`.
#[derive(Debug)]
struct HostStreamProducerState {
    /// Where the host-side write currently stands.
    kind: HostStreamProducerStateKind,
    /// Waker registered by `poll_produce` when it returned `Poll::Pending`.
    waker: Option<Waker>,
}

/// Phases of a host-side stream write.
#[derive(Debug)]
enum HostStreamProducerStateKind {
    /// Nothing is queued to be written.
    Idle,
    /// Items waiting to be handed to the reader on the next `poll_produce`.
    Writing(Vec<u32>),
    /// The most recent write finished; the payload is the number of items
    /// that `poll_produce` handed over (0 for a zero-length write).
    Wrote(u32),
}

impl HostStreamProducerState {
    /// Creates a state with no write queued and no waker registered.
    fn idle() -> Self {
        Self {
            kind: HostStreamProducerStateKind::Idle,
            waker: None,
        }
    }

    /// Wakes the registered waker, if there is one, and clears it.
    fn wake_by_ref(&mut self) {
        let Some(waker) = self.waker.take() else {
            return;
        };
        waker.wake();
    }
}

impl StreamProducer<Data> for HostStreamProducer {
    type Item = u32;
    type Buffer = VecBuffer<u32>;

    /// Drives one step of the host-side write state machine.
    ///
    /// * Returns `Dropped` when no state is registered for this stream id.
    /// * While `Idle`/`Wrote`, returns `Cancelled` if `finish` is set and
    ///   otherwise registers the waker and returns `Pending`.
    /// * While `Writing`, hands at most the reader's requested amount over to
    ///   `dst` and transitions to `Wrote(count)`.
    fn poll_produce(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        mut store: StoreContextMut<'_, Data>,
        mut dst: Destination<'_, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<Result<StreamResult>> {
        let id = self.0;
        // Must be queried before `store` is borrowed for the state lookup.
        let capacity = dst.remaining(&mut store);

        let Some((state, _)) = store.data_mut().host_stream_producers.get_mut(&id) else {
            log::debug!("stream produce: dropped {}", id);
            return Poll::Ready(Ok(StreamResult::Dropped));
        };

        let buf = match &mut state.kind {
            HostStreamProducerStateKind::Writing(buf) => buf,
            HostStreamProducerStateKind::Idle | HostStreamProducerStateKind::Wrote(_) => {
                if finish {
                    log::debug!("stream produce: cancel {}", id);
                    state.waker = None;
                    return Poll::Ready(Ok(StreamResult::Cancelled));
                }
                log::debug!("stream produce: wait {}", id);
                state.waker = Some(cx.waker().clone());
                return Poll::Pending;
            }
        };

        // A zero-length write originated from the host keeps the other side
        // blocked: record it as written and stay pending.
        if buf.is_empty() {
            log::debug!("stream produce: zero-length write {}", id);
            state.kind = HostStreamProducerStateKind::Wrote(0);
            state.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }
        log::debug!("stream produce: write {}", id);

        // At this time host<->host stream reads/writes aren't fuzzed since
        // that brings up a bunch of weird edge cases which aren't fun to deal
        // with and aren't interesting either, so a reader-side amount is
        // always expected here.
        let Some(amt) = capacity else {
            unreachable!()
        };

        // If the guest is doing a zero-length read then we've got some data
        // for them. Complete the read but stay in the same `Writing` state.
        if amt == 0 {
            state.waker = None;
            return Poll::Ready(Ok(StreamResult::Completed));
        }

        // Don't let wasmtime buffer up data for us: send no more than the
        // reader asked for. `truncate` is a no-op when `amt >= buf.len()`.
        buf.truncate(amt);

        let count = buf.len() as u32;
        dst.set_buffer(mem::take(buf).into());
        state.kind = HostStreamProducerStateKind::Wrote(count);
        state.waker = None;
        Poll::Ready(Ok(StreamResult::Completed))
    }
}

/// Logs when the producer half is dropped; no other cleanup happens here.
impl Drop for HostStreamProducer {
    fn drop(&mut self) {
        let id = self.0;
        log::debug!("stream produce: drop {}", id);
    }
}

/// Stream of `Command`s addressed to a single `Scope`, fed from the shared
/// `Data::commands` queue.
struct SharedStream(Scope);

impl SharedStream {
    /// Waits for the next command addressed to this stream's scope.
    ///
    /// Resolves to `None` once the command queue is empty.
    async fn next(&mut self, accessor: &Accessor<Data>) -> Option<Command> {
        std::future::poll_fn(|cx| {
            accessor.with(|mut store| {
                // `finish` is always `false` here, so a cancelled result is
                // never expected.
                match self.poll(cx, store.as_context_mut(), false) {
                    Poll::Pending => Poll::Pending,
                    Poll::Ready((Some(item), StreamResult::Completed)) => Poll::Ready(Some(item)),
                    Poll::Ready((None, StreamResult::Dropped)) => Poll::Ready(None),
                    Poll::Ready(_) => unreachable!(),
                }
            })
        })
        .await
    }

    /// Attempts to take the command at the back of `Data::commands`.
    ///
    /// * Empty queue: `(None, Dropped)`.
    /// * Back command belongs to this scope: `(Some(command), Completed)`.
    /// * Back command belongs to another scope: `(None, Cancelled)` when
    ///   `finish` is set, otherwise the waker is stored and this is `Pending`.
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
        mut store: StoreContextMut<'_, Data>,
        finish: bool,
    ) -> Poll<(Option<Command>, StreamResult)> {
        let data = store.data_mut();

        // If no more commands remain then this is a closed and dropped stream.
        let Some((scope, command)) = data.commands.last_mut() else {
            log::debug!("Stream closed: {:?}", self.0);
            return Poll::Ready((None, StreamResult::Dropped));
        };

        // The command queue is non-empty and the next command isn't meant for
        // us, so someone else needs to drain the queue. Enqueue our waker
        // unless we've been asked to finish.
        if *scope != self.0 {
            if finish {
                return Poll::Ready((None, StreamResult::Cancelled));
            }
            data.wakers.insert(self.0, cx.waker().clone());
            return Poll::Pending;
        }

        // The next queued up command is for the scope that this stream is
        // attached to, so send it off.
        let ret = Some(*command);

        // All commands are followed up with an "ack", and only after the
        // "ack" is delivered is the command popped to move on to the next
        // one. This guarantees that a command has been processed before the
        // next command is handed out, which makes the fuzzing easier to work
        // with: otherwise wasmtime might have a set of commands/callbacks
        // that are all delivered at the same time, and the component model
        // doesn't specify what order they happen within. Forcing an "ack"
        // ensures a more expected ordering of execution without losing really
        // all that much coverage.
        match ret {
            Some(Command::Ack) => {
                data.commands.pop();
            }
            _ => *command = Command::Ack,
        }

        // After a command was delivered other streams may be able to make
        // progress so wake them all up.
        data.wakers.drain().for_each(|(_, waker)| waker.wake());

        log::debug!("Delivering command {ret:?} for {:?}", self.0);
        Poll::Ready((ret, StreamResult::Completed))
    }
}

impl StreamProducer<Data> for SharedStream {
    type Item = Command;
    type Buffer = Option<Command>;

    /// Forwards to `SharedStream::poll` and places the resulting command, if
    /// any, into `destination`.
    fn poll_produce<'a>(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        store: StoreContextMut<'a, Data>,
        mut destination: Destination<'a, Self::Item, Self::Buffer>,
        finish: bool,
    ) -> Poll<Result<StreamResult>> {
        match self.poll(cx, store, finish) {
            Poll::Pending => Poll::Pending,
            Poll::Ready((item, result)) => {
                destination.set_buffer(item);
                Poll::Ready(Ok(result))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{ComponentAsync, Scope, init, run};
    use crate::oracles::component_async::types::*;
    use crate::test::test_n_times;
    use Scope::*;

    /// Runs the oracle against 50 generated inputs.
    #[test]
    fn smoke() {
        init();

        test_n_times(50, |input, _| {
            run(input);
            Ok(())
        });
    }

    //
======================================================================== 1252 // A series of fuzz-generated test cases which caused problems during the 1253 // development of this fuzzer. Feel free to delete/edit/etc if the fuzzer 1254 // changes over time. 1255 1256 #[test] 1257 fn simple() { 1258 init(); 1259 1260 run(ComponentAsync { 1261 commands: vec![ 1262 (GuestCaller, Command::AsyncPendingImportCall(0)), 1263 (GuestCallee, Command::AsyncPendingImportCall(1)), 1264 (GuestCallee, Command::AsyncPendingExportComplete(0)), 1265 (GuestCaller, Command::AsyncPendingImportAssertReady(0)), 1266 (GuestCaller, Command::AsyncPendingImportCall(2)), 1267 ], 1268 }); 1269 } 1270 1271 #[test] 1272 fn somewhat_larger() { 1273 static COMMANDS: &[(Scope, Command)] = &[ 1274 (GuestCallee, Command::FutureNew(0)), 1275 (HostCaller, Command::FutureNew(1)), 1276 (GuestCallee, Command::FutureReadPending(0)), 1277 (GuestCaller, Command::AsyncPendingImportCall(2)), 1278 (GuestCaller, Command::AsyncPendingImportCall(3)), 1279 (GuestCaller, Command::AsyncPendingImportCall(4)), 1280 (GuestCaller, Command::AsyncPendingImportCall(5)), 1281 (GuestCallee, Command::AsyncPendingExportComplete(5)), 1282 (GuestCallee, Command::AsyncPendingExportComplete(3)), 1283 (GuestCallee, Command::AsyncPendingExportComplete(4)), 1284 (GuestCallee, Command::AsyncPendingExportComplete(2)), 1285 (GuestCaller, Command::AsyncPendingImportCall(6)), 1286 (GuestCallee, Command::AsyncPendingExportComplete(6)), 1287 (GuestCaller, Command::AsyncPendingImportCall(7)), 1288 (GuestCallee, Command::AsyncPendingExportComplete(7)), 1289 (GuestCaller, Command::AsyncPendingImportCall(8)), 1290 (GuestCallee, Command::AsyncPendingExportComplete(8)), 1291 (GuestCaller, Command::AsyncPendingImportCall(9)), 1292 (GuestCallee, Command::AsyncPendingExportComplete(9)), 1293 (GuestCaller, Command::AsyncPendingImportCall(10)), 1294 (GuestCallee, Command::AsyncPendingExportComplete(10)), 1295 (GuestCaller, 
Command::AsyncPendingImportCall(11)), 1296 (GuestCallee, Command::AsyncPendingExportComplete(11)), 1297 (GuestCaller, Command::AsyncPendingImportCall(12)), 1298 (GuestCallee, Command::AsyncPendingExportComplete(12)), 1299 (GuestCaller, Command::AsyncPendingImportCall(13)), 1300 (GuestCallee, Command::AsyncPendingExportComplete(13)), 1301 (GuestCaller, Command::AsyncPendingImportCall(14)), 1302 (GuestCallee, Command::AsyncPendingExportComplete(14)), 1303 (GuestCaller, Command::AsyncPendingImportCall(15)), 1304 (GuestCallee, Command::AsyncPendingExportComplete(15)), 1305 (GuestCaller, Command::AsyncPendingImportCall(16)), 1306 (GuestCallee, Command::AsyncPendingExportComplete(16)), 1307 (GuestCaller, Command::AsyncPendingImportCall(17)), 1308 (GuestCallee, Command::AsyncPendingExportComplete(17)), 1309 (GuestCaller, Command::AsyncPendingImportCall(18)), 1310 (GuestCallee, Command::AsyncPendingExportComplete(18)), 1311 (GuestCaller, Command::AsyncPendingImportCall(19)), 1312 (GuestCallee, Command::AsyncPendingExportComplete(19)), 1313 (GuestCaller, Command::AsyncPendingImportCall(20)), 1314 (GuestCallee, Command::AsyncPendingExportComplete(20)), 1315 (GuestCaller, Command::AsyncPendingImportCall(21)), 1316 (GuestCallee, Command::AsyncPendingExportComplete(21)), 1317 (GuestCaller, Command::AsyncPendingImportCall(22)), 1318 (GuestCallee, Command::AsyncPendingExportComplete(22)), 1319 (GuestCaller, Command::AsyncPendingImportCall(23)), 1320 (GuestCallee, Command::AsyncPendingExportComplete(23)), 1321 (GuestCaller, Command::AsyncPendingImportCall(24)), 1322 (GuestCallee, Command::AsyncPendingExportComplete(24)), 1323 (GuestCaller, Command::AsyncPendingImportCall(25)), 1324 (GuestCallee, Command::AsyncPendingExportComplete(25)), 1325 (GuestCaller, Command::AsyncPendingImportCall(26)), 1326 (GuestCallee, Command::AsyncPendingExportComplete(26)), 1327 (GuestCaller, Command::AsyncPendingImportCall(27)), 1328 (GuestCallee, Command::AsyncPendingExportComplete(27)), 1329 
(GuestCaller, Command::AsyncPendingImportCall(28)), 1330 (GuestCallee, Command::AsyncPendingExportComplete(28)), 1331 (GuestCaller, Command::AsyncPendingImportCall(29)), 1332 (GuestCallee, Command::AsyncPendingExportComplete(29)), 1333 (GuestCaller, Command::AsyncPendingImportCall(30)), 1334 (GuestCallee, Command::AsyncPendingExportComplete(30)), 1335 (GuestCaller, Command::AsyncPendingImportCall(31)), 1336 (GuestCallee, Command::AsyncPendingExportComplete(31)), 1337 (GuestCaller, Command::AsyncPendingImportCall(32)), 1338 (GuestCallee, Command::AsyncPendingExportComplete(32)), 1339 (GuestCaller, Command::AsyncPendingImportCall(33)), 1340 (GuestCallee, Command::AsyncPendingExportComplete(33)), 1341 (GuestCaller, Command::AsyncPendingImportCall(34)), 1342 (GuestCallee, Command::AsyncPendingExportComplete(34)), 1343 (GuestCaller, Command::AsyncPendingImportCall(35)), 1344 (GuestCallee, Command::AsyncPendingExportComplete(35)), 1345 (GuestCaller, Command::AsyncPendingImportCall(36)), 1346 (GuestCallee, Command::AsyncPendingExportComplete(36)), 1347 (GuestCaller, Command::AsyncPendingImportCall(37)), 1348 (GuestCallee, Command::AsyncPendingExportComplete(37)), 1349 (GuestCaller, Command::AsyncPendingImportAssertReady(36)), 1350 ]; 1351 init(); 1352 1353 run(ComponentAsync { 1354 commands: COMMANDS.to_vec(), 1355 }); 1356 } 1357 1358 #[test] 1359 fn simple_stream1() { 1360 init(); 1361 1362 run(ComponentAsync { 1363 commands: vec![ 1364 (HostCallee, Command::StreamNew(1)), 1365 ( 1366 HostCallee, 1367 Command::StreamReadPending(StreamReadPayload { 1368 stream: 1, 1369 count: 2, 1370 }), 1371 ), 1372 (HostCallee, Command::StreamCancelRead(1)), 1373 (GuestCaller, Command::SyncReadyCall), 1374 ( 1375 HostCallee, 1376 Command::StreamWritePending(StreamWritePayload { 1377 stream: 1, 1378 item: 3, 1379 count: 2, 1380 }), 1381 ), 1382 (HostCallee, Command::StreamCancelWrite(1)), 1383 (HostCallee, Command::StreamDropWritable(1)), 1384 ( 1385 HostCallee, 1386 
Command::StreamReadDropped(StreamReadPayload { 1387 stream: 1, 1388 count: 1, 1389 }), 1390 ), 1391 ], 1392 }); 1393 } 1394 1395 #[test] 1396 fn simple_stream3() { 1397 init(); 1398 1399 run(ComponentAsync { 1400 commands: vec![ 1401 (GuestCaller, Command::StreamNew(26)), 1402 ( 1403 GuestCaller, 1404 Command::StreamReadPending(StreamReadPayload { 1405 stream: 26, 1406 count: 10, 1407 }), 1408 ), 1409 (GuestCaller, Command::StreamDropWritable(26)), 1410 (GuestCaller, Command::StreamReadAssertDropped(26)), 1411 ], 1412 }); 1413 } 1414 1415 #[test] 1416 fn simple_stream4() { 1417 init(); 1418 1419 run(ComponentAsync { 1420 commands: vec![ 1421 (GuestCaller, Command::StreamNew(23)), 1422 ( 1423 GuestCaller, 1424 Command::StreamWritePending(StreamWritePayload { 1425 stream: 23, 1426 item: 24, 1427 count: 2, 1428 }), 1429 ), 1430 (GuestCaller, Command::StreamGive(23)), 1431 (GuestCallee, Command::StreamDropReadable(23)), 1432 ( 1433 GuestCaller, 1434 Command::StreamWriteAssertDropped(StreamReadPayload { 1435 stream: 23, 1436 count: 0, 1437 }), 1438 ), 1439 ], 1440 }); 1441 } 1442 1443 #[test] 1444 fn zero_length_behavior() { 1445 init(); 1446 1447 run(ComponentAsync { 1448 commands: vec![ 1449 (GuestCaller, Command::StreamNew(10)), 1450 (HostCaller, Command::StreamTake(10)), 1451 ( 1452 GuestCaller, 1453 Command::StreamWritePending(StreamWritePayload { 1454 stream: 10, 1455 item: 13, 1456 count: 5, 1457 }), 1458 ), 1459 ( 1460 HostCaller, 1461 Command::StreamReadReady(StreamReadyPayload { 1462 stream: 10, 1463 item: 0, 1464 op_count: 0, 1465 ready_count: 0, 1466 }), 1467 ), 1468 ( 1469 HostCaller, 1470 Command::StreamReadReady(StreamReadyPayload { 1471 stream: 10, 1472 item: 0, 1473 op_count: 0, 1474 ready_count: 0, 1475 }), 1476 ), 1477 ], 1478 }); 1479 } 1480 } 1481