1 //! For a high-level overview of this fuzz target see `fuzz_async.rs` 2 3 use crate::block_on; 4 use crate::generators::component_async::exports::wasmtime_fuzz::fuzz::async_test::Guest; 5 use crate::generators::component_async::wasmtime_fuzz::fuzz::async_test::{self, Command}; 6 use crate::generators::component_async::wasmtime_fuzz::fuzz::types; 7 use crate::generators::component_async::{ComponentAsync, FuzzAsyncPre, Scope}; 8 use futures::channel::oneshot; 9 use std::collections::{HashMap, HashSet}; 10 use std::mem; 11 use std::pin::Pin; 12 use std::sync::{Arc, OnceLock, Weak}; 13 use std::task::{Context, Poll, Waker}; 14 use std::time::Instant; 15 use wasmtime::component::{ 16 Access, Accessor, AccessorTask, Component, Destination, FutureConsumer, FutureProducer, 17 FutureReader, HasSelf, Linker, ResourceTable, Source, StreamConsumer, StreamProducer, 18 StreamReader, StreamResult, VecBuffer, 19 }; 20 use wasmtime::{AsContextMut, Config, Engine, Result, Store, StoreContextMut}; 21 use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView}; 22 23 static STATE: OnceLock<(Engine, FuzzAsyncPre<Data>)> = OnceLock::new(); 24 25 /// Initializes state for future fuzz runs. 26 /// 27 /// This will create an `Engine` to run this fuzzer within and it will 28 /// additionally precompile the component that will be used for fuzzing. 29 /// 30 /// There are a few points of note about this: 31 /// 32 /// * The `misc` fuzzer is manually instrumented with this function as the init 33 /// hook to ensure this runs before any other fuzzing. 34 /// 35 /// * Compilation of the component takes quite some time with 36 /// fuzzing-instrumented Cranelift. To assist with local development this 37 /// implements a cache which is serialized/deserialized via an env var. 38 pub fn init() { 39 crate::init_fuzzing(); 40 41 STATE.get_or_init(|| { 42 let mut config = Config::new(); 43 config.wasm_component_model_async(true); 44 config.async_support(true); 45 let engine = Engine::new(&config).unwrap(); 46 let component = compile(&engine); 47 let mut linker = Linker::new(&engine); 48 wasmtime_wasi::p2::add_to_linker_async(&mut linker).unwrap(); 49 async_test::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap(); 50 types::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap(); 51 52 let pre = linker.instantiate_pre(&component).unwrap(); 53 let pre = FuzzAsyncPre::new(pre).unwrap(); 54 55 (engine, pre) 56 }); 57 58 fn compile(engine: &Engine) -> Component { 59 let wasm = test_programs_artifacts::FUZZ_ASYNC_COMPONENT; 60 let cwasm_cache = std::env::var("COMPONENT_ASYNC_CWASM_CACHE").ok(); 61 if let Some(path) = &cwasm_cache 62 && let Ok(cwasm_mtime) = std::fs::metadata(&path).and_then(|m| m.modified()) 63 && let Ok(wasm_mtime) = std::fs::metadata(wasm).and_then(|m| m.modified()) 64 && cwasm_mtime > wasm_mtime 65 { 66 log::debug!("Using cached component async cwasm at {path}"); 67 unsafe { 68 return Component::deserialize_file(engine, path).unwrap(); 69 } 70 } 71 72 let composition = { 73 let mut config = wasm_compose::config::Config::default(); 74 let tempdir = tempfile::TempDir::new().unwrap(); 75 let path = tempdir.path().join("fuzz-async.wasm"); 76 std::fs::copy(wasm, &path).unwrap(); 77 config.definitions.push(path.clone()); 78 79 wasm_compose::composer::ComponentComposer::new(&path, &config) 80 .compose() 81 .unwrap() 82 }; 83 let start = Instant::now(); 84 let component = Component::new(&engine, &composition).unwrap(); 85 if let Some(path) = cwasm_cache { 86 log::debug!("Caching component async cwasm to {path}"); 87 std::fs::write(path, &component.serialize().unwrap()).unwrap(); 88 } else if start.elapsed() > std::time::Duration::from_secs(1) { 89 eprintln!( 90 " 91 !!!!!!!!!!!!!!!!!!!!!!!!!! 92 93 Component compilation is slow, try setting `COMPONENT_ASYNC_CWASM_CACHE=path` to 94 cache compilation results 95 96 !!!!!!!!!!!!!!!!!!!!!!!!!! 97 " 98 ); 99 } 100 return component; 101 } 102 } 103 104 #[derive(Default)] 105 struct Data { 106 ctx: WasiCtx, 107 table: ResourceTable, 108 wakers: HashMap<Scope, Waker>, 109 commands: Vec<(Scope, Command)>, 110 111 guest_caller_stream: Option<StreamReader<Command>>, 112 guest_callee_stream: Option<StreamReader<Command>>, 113 114 host_pending_async_calls: HashMap<u32, oneshot::Sender<()>>, 115 host_pending_async_calls_cancelled: HashSet<u32>, 116 guest_pending_async_calls_ready: HashSet<u32>, 117 118 // State of futures/streams. Note that while #12091 is unresolved an 119 // `Arc`/`Weak` combo is used to detect when wasmtime drops futures/streams 120 // and the various halves we're interacting with using traits. 121 host_futures: HashMap<u32, FutureReader<u32>>, 122 host_future_producers: HashMap<u32, (HostFutureProducerState, Weak<()>)>, 123 host_future_consumers: HashMap<u32, (HostFutureConsumerState, Weak<()>)>, 124 host_streams: HashMap<u32, StreamReader<u32>>, 125 host_stream_producers: HashMap<u32, (HostStreamProducerState, Weak<()>)>, 126 host_stream_consumers: HashMap<u32, (HostStreamConsumerState, Weak<()>)>, 127 } 128 129 impl WasiView for Data { 130 fn ctx(&mut self) -> WasiCtxView<'_> { 131 WasiCtxView { 132 ctx: &mut self.ctx, 133 table: &mut self.table, 134 } 135 } 136 } 137 138 impl async_test::HostWithStore for HasSelf<Data> { 139 async fn async_ready<T>(_store: &Accessor<T, Self>) {} 140 141 async fn async_pending<T>(store: &Accessor<T, Self>, id: u32) { 142 let (tx, rx) = oneshot::channel(); 143 store.with(|mut s| s.get().host_pending_async_calls.insert(id, tx)); 144 let record = RecordCancelOnDrop { store, id }; 145 rx.await.unwrap(); 146 mem::forget(record); 147 148 struct RecordCancelOnDrop<'a, T: 'static> { 149 store: &'a Accessor<T, HasSelf<Data>>, 150 id: u32, 151 } 152 153 impl<T> Drop for RecordCancelOnDrop<'_, T> { 154 fn drop(&mut self) { 155 self.store.with(|mut s| { 156 s.get().host_pending_async_calls_cancelled.insert(self.id); 157 }); 158 } 159 } 160 } 161 162 async fn init<T>(_store: &Accessor<T, Self>, _scope: types::Scope) {} 163 } 164 165 impl async_test::Host for Data { 166 fn sync_ready(&mut self) {} 167 168 fn future_take(&mut self, id: u32) -> FutureReader<u32> { 169 self.host_futures.remove(&id).unwrap() 170 } 171 172 fn future_receive(&mut self, id: u32, future: FutureReader<u32>) { 173 let prev = self.host_futures.insert(id, future); 174 assert!(prev.is_none()); 175 } 176 177 fn stream_take(&mut self, id: u32) -> StreamReader<u32> { 178 self.host_streams.remove(&id).unwrap() 179 } 180 181 fn stream_receive(&mut self, id: u32, stream: StreamReader<u32>) { 182 let prev = self.host_streams.insert(id, stream); 183 assert!(prev.is_none()); 184 } 185 } 186 187 impl types::HostWithStore for HasSelf<Data> { 188 fn get_commands<T>( 189 mut store: Access<'_, T, Self>, 190 scope: types::Scope, 191 ) -> StreamReader<Command> { 192 let data = store.get(); 193 match scope { 194 types::Scope::Caller => data.guest_caller_stream.take().unwrap(), 195 types::Scope::Callee => data.guest_callee_stream.take().unwrap(), 196 } 197 } 198 } 199 200 impl types::Host for Data {} 201 202 /// Executes the `input` provided, assuming that `init` has been previously 203 /// executed. 204 pub fn run(mut input: ComponentAsync) { 205 log::debug!("Running component async fuzz test with\n{input:?}"); 206 207 // Commands are executed in the order that they're listed in the input, but 208 // to make it easier on the `StreamProducer` implementation below they're 209 // popped off the back. To ensure that they're all delivered in the right 210 // order reverse the list to ensure the correct order is maintained. 211 input.commands.reverse(); 212 213 let (engine, pre) = STATE.get().unwrap(); 214 let mut store = Store::new( 215 engine, 216 Data { 217 ctx: WasiCtx::builder().inherit_stdio().inherit_env().build(), 218 commands: input.commands, 219 ..Data::default() 220 }, 221 ); 222 223 let guest_caller_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCaller)); 224 let guest_callee_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCallee)); 225 store.data_mut().guest_caller_stream = Some(guest_caller_stream); 226 store.data_mut().guest_callee_stream = Some(guest_callee_stream); 227 block_on(async { 228 let instance = pre.instantiate_async(&mut store).await.unwrap(); 229 let test = instance.wasmtime_fuzz_fuzz_async_test(); 230 231 let mut host_caller = SharedStream(Scope::HostCaller); 232 let mut host_callee = SharedStream(Scope::HostCallee); 233 store 234 .run_concurrent(async |store| { 235 // Kick off stream reads in the guest. This function will return 236 // but the tasks in the guest will keep running after they 237 // return to process stream items. 238 test.call_init(store, types::Scope::Caller).await.unwrap(); 239 240 // Simultaneously process commands from both host streams. These 241 // will return once the entire command queue is exhausted. 242 futures::join!( 243 async { 244 while let Some(cmd) = host_caller.next(store).await { 245 host_caller_cmd(&test, store, cmd).await; 246 } 247 }, 248 async { 249 while let Some(cmd) = host_callee.next(store).await { 250 host_callee_cmd(store, cmd).await; 251 } 252 }, 253 ); 254 255 // Note that there may still be pending async work in the guest 256 // (or host). It's intentional that it's not cleaned up here to 257 // help test situations where async work is all abruptly 258 // cancelled by just being dropped in the host. 259 }) 260 .await 261 .unwrap(); 262 }); 263 } 264 265 /// See documentation in `fuzz_async.rs` for what's going on here. 266 async fn test_property<F>(store: &Accessor<Data>, mut f: F) -> bool 267 where 268 F: FnMut(&mut Data) -> bool, 269 { 270 for _ in 0..1000 { 271 let ready = store.with(|mut s| f(s.get())); 272 if ready { 273 return true; 274 } 275 276 crate::YieldN(1).await; 277 } 278 279 return false; 280 } 281 282 async fn await_property<F>(store: &Accessor<Data>, desc: &str, f: F) 283 where 284 F: FnMut(&mut Data) -> bool, 285 { 286 assert!( 287 test_property(store, f).await, 288 "timed out waiting for {desc}", 289 ); 290 } 291 292 async fn host_caller_cmd(test: &Guest, store: &Accessor<Data>, cmd: Command) { 293 match cmd { 294 Command::Ack => {} 295 Command::SyncReadyCall => test.call_sync_ready(store).await.unwrap(), 296 Command::AsyncReadyCall => test.call_async_ready(store).await.unwrap(), 297 Command::AsyncPendingExportComplete(_i) => todo!(), 298 Command::AsyncPendingExportAssertCancelled(_i) => todo!(), 299 Command::AsyncPendingImportCall(i) => { 300 struct RunPendingImport { 301 test: Guest, 302 i: u32, 303 } 304 305 store.spawn(RunPendingImport { 306 test: test.clone(), 307 i, 308 }); 309 310 impl AccessorTask<Data> for RunPendingImport { 311 async fn run(self, store: &Accessor<Data>) -> Result<()> { 312 self.test.call_async_pending(store, self.i).await?; 313 store.with(|mut s| { 314 s.get().guest_pending_async_calls_ready.insert(self.i); 315 }); 316 Ok(()) 317 } 318 } 319 } 320 Command::AsyncPendingImportCancel(_i) => todo!(), 321 Command::AsyncPendingImportAssertReady(i) => { 322 assert!( 323 test_property(store, |s| s.guest_pending_async_calls_ready.remove(&i)).await, 324 "expected async_pending import {i} to be ready", 325 ); 326 } 327 328 Command::FutureTake(i) => { 329 let future = test.call_future_take(store, i).await.unwrap(); 330 store.with(|mut s| { 331 let prev = s.get().host_futures.insert(i, future); 332 assert!(prev.is_none()); 333 }); 334 } 335 Command::FutureGive(i) => { 336 let future = store.with(|mut s| s.get().host_futures.remove(&i).unwrap()); 337 test.call_future_receive(store, i, future).await.unwrap(); 338 } 339 Command::StreamTake(i) => { 340 let stream = test.call_stream_take(store, i).await.unwrap(); 341 store.with(|mut s| { 342 let prev = s.get().host_streams.insert(i, stream); 343 assert!(prev.is_none()); 344 }); 345 } 346 Command::StreamGive(i) => { 347 let stream = store.with(|mut s| s.get().host_streams.remove(&i).unwrap()); 348 test.call_stream_receive(store, i, stream).await.unwrap(); 349 } 350 351 other => future_or_stream_cmd(store, other).await, 352 } 353 } 354 355 async fn host_callee_cmd(store: &Accessor<Data>, cmd: Command) { 356 match cmd { 357 Command::Ack => {} 358 Command::SyncReadyCall => todo!(), 359 Command::AsyncReadyCall => todo!(), 360 Command::AsyncPendingExportComplete(i) => store.with(|mut s| { 361 s.get() 362 .host_pending_async_calls 363 .remove(&i) 364 .unwrap() 365 .send(()) 366 .unwrap(); 367 }), 368 Command::AsyncPendingExportAssertCancelled(i) => { 369 assert!( 370 test_property(store, |s| s.host_pending_async_calls_cancelled.remove(&i)).await, 371 "expected async_pending export {i} to be cancelled", 372 ); 373 } 374 Command::AsyncPendingImportCall(_i) => todo!(), 375 Command::AsyncPendingImportCancel(_i) => todo!(), 376 Command::AsyncPendingImportAssertReady(_i) => todo!(), 377 378 other => future_or_stream_cmd(store, other).await, 379 } 380 } 381 382 async fn future_or_stream_cmd(store: &Accessor<Data>, cmd: Command) { 383 match cmd { 384 // These commands should be handled above 385 Command::Ack 386 | Command::SyncReadyCall 387 | Command::AsyncReadyCall 388 | Command::AsyncPendingExportComplete(_) 389 | Command::AsyncPendingExportAssertCancelled(_) 390 | Command::AsyncPendingImportCall(_) 391 | Command::AsyncPendingImportCancel(_) 392 | Command::FutureTake(_) 393 | Command::FutureGive(_) 394 | Command::StreamTake(_) 395 | Command::StreamGive(_) 396 | Command::AsyncPendingImportAssertReady(_) => unreachable!(), 397 398 Command::FutureNew(id) => { 399 store.with(|mut s| { 400 let arc = Arc::new(()); 401 let weak = Arc::downgrade(&arc); 402 let future = FutureReader::new(&mut s, HostFutureProducer(id, arc)); 403 let data = s.get(); 404 let prev = data.host_futures.insert(id, future); 405 assert!(prev.is_none()); 406 let prev = data 407 .host_future_producers 408 .insert(id, (HostFutureProducerState::Idle, weak)); 409 assert!(prev.is_none()); 410 }); 411 } 412 Command::FutureDropReadable(id) => { 413 store.with(|mut s| match s.get().host_futures.remove(&id) { 414 Some(mut future) => future.close(&mut s), 415 None => { 416 let (mut state, _weak) = s.get().host_future_consumers.remove(&id).unwrap(); 417 state.wake_by_ref(); 418 } 419 }) 420 } 421 Command::FutureWriteReady(payload) => { 422 await_property(store, "future write should be waiting", |s| { 423 matches!( 424 s.host_future_producers.get(&payload.future), 425 Some((HostFutureProducerState::Waiting(_), _)) 426 ) 427 }) 428 .await; 429 store.with(|mut s| { 430 let state = s 431 .get() 432 .host_future_producers 433 .get_mut(&payload.future) 434 .unwrap(); 435 match state { 436 (HostFutureProducerState::Waiting(waker), _) => { 437 waker.wake_by_ref(); 438 state.0 = HostFutureProducerState::Writing(payload.item); 439 } 440 (state, _) => panic!("future not waiting: {state:?}"), 441 } 442 }) 443 } 444 Command::FutureWritePending(payload) => store.with(|mut s| { 445 let state = s 446 .get() 447 .host_future_producers 448 .get_mut(&payload.future) 449 .unwrap(); 450 match state { 451 (HostFutureProducerState::Idle, _) => { 452 state.0 = HostFutureProducerState::Writing(payload.item); 453 } 454 _ => panic!("future not idle"), 455 } 456 }), 457 Command::FutureWriteDropped(id) => store.with(|mut s| { 458 let (state, weak) = s.get().host_future_producers.remove(&id).unwrap(); 459 assert!(matches!(state, HostFutureProducerState::Idle)); 460 assert!(weak.upgrade().is_none()); 461 }), 462 Command::FutureReadReady(payload) => { 463 let id = payload.future; 464 store.with(|mut s| { 465 let arc = Arc::new(()); 466 let weak = Arc::downgrade(&arc); 467 let data = s.get(); 468 let future = data.host_futures.remove(&id).unwrap(); 469 let prev = data 470 .host_future_consumers 471 .insert(id, (HostFutureConsumerState::Consuming, weak)); 472 assert!(prev.is_none()); 473 future.pipe(&mut s, HostFutureConsumer(id, arc)); 474 }); 475 476 await_property(store, "future should be present", |s| { 477 matches!( 478 s.host_future_consumers[&id], 479 (HostFutureConsumerState::Complete(_), _) 480 ) 481 }) 482 .await; 483 484 store.with(|mut s| { 485 let (state, _) = s.get().host_future_consumers.remove(&id).unwrap(); 486 match state { 487 HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item), 488 _ => panic!("future not complete"), 489 } 490 }); 491 } 492 Command::FutureReadPending(id) => { 493 ensure_future_reading(store, id); 494 store.with(|mut s| { 495 let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap(); 496 state.wake_by_ref(); 497 assert!( 498 matches!(state, HostFutureConsumerState::Idle), 499 "bad state: {state:?}", 500 ); 501 *state = HostFutureConsumerState::Consuming; 502 }) 503 } 504 Command::FutureCancelWrite(id) => store.with(|mut s| { 505 let (state, _) = s.get().host_future_producers.get_mut(&id).unwrap(); 506 assert!(matches!(state, HostFutureProducerState::Writing(_))); 507 *state = HostFutureProducerState::Idle; 508 }), 509 Command::FutureCancelRead(id) => store.with(|mut s| { 510 let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap(); 511 assert!(matches!(state, HostFutureConsumerState::Consuming)); 512 *state = HostFutureConsumerState::Idle; 513 }), 514 Command::FutureReadAssertComplete(payload) => { 515 await_property(store, "future read should be complete", |s| { 516 matches!( 517 s.host_future_consumers.get(&payload.future), 518 Some((HostFutureConsumerState::Complete(_), _)) 519 ) 520 }) 521 .await; 522 store.with(|mut s| { 523 let (state, _) = s 524 .get() 525 .host_future_consumers 526 .remove(&payload.future) 527 .unwrap(); 528 match state { 529 HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item), 530 _ => panic!("future not complete"), 531 } 532 }) 533 } 534 Command::FutureWriteAssertComplete(id) => store.with(|mut s| { 535 let (state, weak) = s.get().host_future_producers.remove(&id).unwrap(); 536 assert!(matches!(state, HostFutureProducerState::Complete)); 537 assert!(weak.upgrade().is_none()); 538 }), 539 Command::FutureWriteAssertDropped(id) => store.with(|mut s| { 540 let (state, weak) = s.get().host_future_producers.remove(&id).unwrap(); 541 assert!(matches!(state, HostFutureProducerState::Writing(_))); 542 assert!(weak.upgrade().is_none()); 543 }), 544 545 Command::StreamNew(id) => { 546 store.with(|mut s| { 547 let arc = Arc::new(()); 548 let weak = Arc::downgrade(&arc); 549 let stream = StreamReader::new(&mut s, HostStreamProducer(id, arc)); 550 let data = s.get(); 551 let prev = data.host_streams.insert(id, stream); 552 assert!(prev.is_none()); 553 let prev = data 554 .host_stream_producers 555 .insert(id, (HostStreamProducerState::idle(), weak)); 556 assert!(prev.is_none()); 557 }); 558 } 559 Command::StreamDropReadable(id) => { 560 store.with(|mut s| match s.get().host_streams.remove(&id) { 561 Some(mut stream) => { 562 stream.close(&mut s); 563 } 564 None => { 565 let (mut state, _weak) = s.get().host_stream_consumers.remove(&id).unwrap(); 566 state.wake_by_ref(); 567 } 568 }) 569 } 570 Command::StreamDropWritable(id) => store.with(|mut s| { 571 let (mut state, _weak) = s.get().host_stream_producers.remove(&id).unwrap(); 572 state.wake_by_ref(); 573 }), 574 Command::StreamWriteReady(payload) => { 575 let id = payload.stream; 576 store.with(|mut s| { 577 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap(); 578 state.wake_by_ref(); 579 match state.kind { 580 HostStreamProducerStateKind::Idle => { 581 state.kind = HostStreamProducerStateKind::Writing(stream_payload( 582 payload.item, 583 payload.op_count, 584 )); 585 } 586 _ => panic!("stream not idle: {state:?}"), 587 } 588 }); 589 await_property(store, "stream should complete a write", |s| { 590 matches!( 591 s.host_stream_producers[&id].0.kind, 592 HostStreamProducerStateKind::Wrote(_), 593 ) 594 }) 595 .await; 596 store.with(|mut s| { 597 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap(); 598 match state.kind { 599 HostStreamProducerStateKind::Wrote(amt) => { 600 assert_eq!(amt, payload.ready_count); 601 state.kind = HostStreamProducerStateKind::Idle; 602 } 603 _ => panic!("stream not idle: {state:?}"), 604 } 605 }); 606 } 607 Command::StreamReadReady(payload) => { 608 let id = payload.stream; 609 ensure_stream_reading(store, id); 610 store.with(|mut s| { 611 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap(); 612 state.wake_by_ref(); 613 state.kind = HostStreamConsumerStateKind::Consuming(payload.op_count); 614 }); 615 await_property(store, "stream should complete a read", |s| { 616 matches!( 617 s.host_stream_consumers[&id].0.kind, 618 HostStreamConsumerStateKind::Consumed(_), 619 ) 620 }) 621 .await; 622 623 store.with(|mut s| { 624 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap(); 625 match &state.kind { 626 HostStreamConsumerStateKind::Consumed(last_read) => { 627 assert_eq!( 628 *last_read, 629 stream_payload(payload.item, payload.ready_count) 630 ); 631 state.kind = HostStreamConsumerStateKind::Idle; 632 } 633 _ => panic!("future not complete"), 634 } 635 }); 636 } 637 Command::StreamWritePending(payload) => store.with(|mut s| { 638 let (state, _) = s 639 .get() 640 .host_stream_producers 641 .get_mut(&payload.stream) 642 .unwrap(); 643 state.wake_by_ref(); 644 match state.kind { 645 HostStreamProducerStateKind::Idle => { 646 state.kind = HostStreamProducerStateKind::Writing(stream_payload( 647 payload.item, 648 payload.count, 649 )); 650 } 651 _ => panic!("stream not idle {:?}", state.kind), 652 } 653 }), 654 Command::StreamReadPending(payload) => { 655 ensure_stream_reading(store, payload.stream); 656 store.with(|mut s| { 657 let (state, _) = s 658 .get() 659 .host_stream_consumers 660 .get_mut(&payload.stream) 661 .unwrap(); 662 state.wake_by_ref(); 663 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle)); 664 state.kind = HostStreamConsumerStateKind::Consuming(payload.count); 665 }) 666 } 667 Command::StreamWriteDropped(payload) => store.with(|mut s| { 668 let (state, weak) = s 669 .get() 670 .host_stream_producers 671 .get_mut(&payload.stream) 672 .unwrap(); 673 assert!(matches!(state.kind, HostStreamProducerStateKind::Idle)); 674 assert!(weak.upgrade().is_none()); 675 }), 676 Command::StreamReadDropped(payload) => { 677 ensure_stream_reading(store, payload.stream); 678 await_property(store, "stream read should get dropped", |s| { 679 let weak = &s.host_stream_consumers[&payload.stream].1; 680 weak.upgrade().is_none() 681 }) 682 .await; 683 store.with(|mut s| { 684 let (state, weak) = s 685 .get() 686 .host_stream_consumers 687 .get_mut(&payload.stream) 688 .unwrap(); 689 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle)); 690 assert!(weak.upgrade().is_none()); 691 }) 692 } 693 Command::StreamCancelWrite(id) => store.with(|mut s| { 694 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap(); 695 assert!( 696 matches!(state.kind, HostStreamProducerStateKind::Writing(_)), 697 "invalid state {state:?}", 698 ); 699 state.kind = HostStreamProducerStateKind::Idle; 700 state.wake_by_ref(); 701 }), 702 Command::StreamCancelRead(id) => store.with(|mut s| { 703 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap(); 704 assert!(matches!( 705 state.kind, 706 HostStreamConsumerStateKind::Consuming(_) 707 )); 708 state.kind = HostStreamConsumerStateKind::Idle; 709 }), 710 Command::StreamReadAssertComplete(payload) => store.with(|mut s| { 711 let (state, _) = s 712 .get() 713 .host_stream_consumers 714 .get_mut(&payload.stream) 715 .unwrap(); 716 match &state.kind { 717 HostStreamConsumerStateKind::Consumed(last_read) => { 718 assert_eq!(*last_read, stream_payload(payload.item, payload.count)); 719 state.kind = HostStreamConsumerStateKind::Idle; 720 } 721 _ => panic!("stream not complete"), 722 } 723 }), 724 Command::StreamWriteAssertComplete(payload) => store.with(|mut s| { 725 let (state, _) = s 726 .get() 727 .host_stream_producers 728 .get_mut(&payload.stream) 729 .unwrap(); 730 match state.kind { 731 HostStreamProducerStateKind::Wrote(amt) => { 732 assert_eq!(amt, payload.count); 733 state.kind = HostStreamProducerStateKind::Idle; 734 } 735 _ => panic!("stream not complete: {:?}", state.kind), 736 } 737 }), 738 Command::StreamWriteAssertDropped(payload) => { 739 await_property(store, "stream write should be dropped", |s| { 740 let weak = &s.host_stream_producers[&payload.stream].1; 741 weak.upgrade().is_none() 742 }) 743 .await; 744 store.with(|mut s| { 745 let (state, weak) = s 746 .get() 747 .host_stream_producers 748 .get_mut(&payload.stream) 749 .unwrap(); 750 assert!(matches!( 751 state.kind, 752 HostStreamProducerStateKind::Writing(_) 753 )); 754 assert!(weak.upgrade().is_none()); 755 }) 756 } 757 Command::StreamReadAssertDropped(id) => { 758 await_property(store, "stream read should be dropped", |s| { 759 let weak = &s.host_stream_consumers[&id].1; 760 weak.upgrade().is_none() 761 }) 762 .await; 763 store.with(|mut s| { 764 let (state, weak) = s.get().host_stream_consumers.get_mut(&id).unwrap(); 765 assert!(matches!( 766 state.kind, 767 HostStreamConsumerStateKind::Consuming(_), 768 )); 769 assert!(weak.upgrade().is_none()); 770 }) 771 } 772 } 773 } 774 775 fn stream_payload(item: u32, count: u32) -> Vec<u32> { 776 (item..item + count).collect() 777 } 778 779 fn ensure_future_reading(store: &Accessor<Data>, id: u32) { 780 store.with(|mut s| { 781 let data = s.get(); 782 if !data.host_futures.contains_key(&id) { 783 return; 784 } 785 log::debug!("future consume: start {id}"); 786 let arc = Arc::new(()); 787 let weak = Arc::downgrade(&arc); 788 let data = s.get(); 789 let future = data.host_futures.remove(&id).unwrap(); 790 let prev = data 791 .host_future_consumers 792 .insert(id, (HostFutureConsumerState::Idle, weak)); 793 assert!(prev.is_none()); 794 future.pipe(&mut s, HostFutureConsumer(id, arc)); 795 }); 796 } 797 798 fn ensure_stream_reading(store: &Accessor<Data>, id: u32) { 799 store.with(|mut s| { 800 let data = s.get(); 801 if !data.host_streams.contains_key(&id) { 802 return; 803 } 804 log::debug!("stream consume: start {id}"); 805 let arc = Arc::new(()); 806 let weak = Arc::downgrade(&arc); 807 let prev = data.host_stream_consumers.insert( 808 id, 809 ( 810 HostStreamConsumerState { 811 kind: HostStreamConsumerStateKind::Idle, 812 waker: None, 813 }, 814 weak, 815 ), 816 ); 817 assert!(prev.is_none()); 818 let stream = data.host_streams.remove(&id).unwrap(); 819 stream.pipe(&mut s, HostStreamConsumer(id, arc)); 820 }); 821 } 822 823 struct HostFutureConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>); 824 825 /// Note that this is only created once a read is actually initiated on a 826 /// future. It's also not possible to cancel a host-based read on a future, 827 /// hence why this is simpler than the `HostFutureProducerState` state below. 828 #[derive(Debug)] 829 enum HostFutureConsumerState { 830 Idle, 831 Waiting(Waker), 832 Consuming, 833 Complete(u32), 834 } 835 836 impl HostFutureConsumerState { 837 fn wake_by_ref(&mut self) { 838 if let HostFutureConsumerState::Waiting(waker) = &self { 839 waker.wake_by_ref(); 840 *self = HostFutureConsumerState::Idle; 841 } 842 } 843 } 844 845 impl FutureConsumer<Data> for HostFutureConsumer { 846 type Item = u32; 847 848 fn poll_consume( 849 self: Pin<&mut Self>, 850 cx: &mut Context<'_>, 851 mut store: StoreContextMut<'_, Data>, 852 mut source: Source<'_, Self::Item>, 853 finish: bool, 854 ) -> Poll<Result<()>> { 855 let state = match store.data_mut().host_future_consumers.get_mut(&self.0) { 856 Some(state) => state, 857 None => { 858 log::debug!("consume: closed {}", self.0); 859 return Poll::Ready(Ok(())); 860 } 861 }; 862 match state.0 { 863 HostFutureConsumerState::Idle | HostFutureConsumerState::Waiting(_) => { 864 if finish { 865 log::debug!("consume: cancel {}", self.0); 866 state.0 = HostFutureConsumerState::Idle; 867 Poll::Ready(Ok(())) 868 } else { 869 log::debug!("consume: wait {}", self.0); 870 state.0 = HostFutureConsumerState::Waiting(cx.waker().clone()); 871 Poll::Pending 872 } 873 } 874 HostFutureConsumerState::Consuming => { 875 log::debug!("consume: done {}", self.0); 876 let mut item = None; 877 source.read(&mut store, &mut item).unwrap(); 878 store 879 .data_mut() 880 .host_future_consumers 881 .get_mut(&self.0) 882 .unwrap() 883 .0 = HostFutureConsumerState::Complete(item.unwrap()); 884 Poll::Ready(Ok(())) 885 } 886 HostFutureConsumerState::Complete(_) => unreachable!(), 887 } 888 } 889 } 890 891 struct HostFutureProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>); 892 893 #[derive(Debug)] 894 enum HostFutureProducerState { 895 Idle, 896 Waiting(Waker), 897 Writing(u32), 898 Complete, 899 } 900 901 impl FutureProducer<Data> for HostFutureProducer { 902 type Item = u32; 903 904 fn poll_produce( 905 self: Pin<&mut Self>, 906 cx: &mut Context<'_>, 907 mut store: StoreContextMut<'_, Data>, 908 finish: bool, 909 ) -> Poll<Result<Option<Self::Item>>> { 910 let state = store 911 .data_mut() 912 .host_future_producers 913 .get_mut(&self.0) 914 .unwrap(); 915 match state.0 { 916 HostFutureProducerState::Idle | HostFutureProducerState::Waiting(_) => { 917 if finish { 918 log::debug!("produce: cancel {}", self.0); 919 state.0 = HostFutureProducerState::Idle; 920 Poll::Ready(Ok(None)) 921 } else { 922 log::debug!("produce: wait {}", self.0); 923 state.0 = HostFutureProducerState::Waiting(cx.waker().clone()); 924 Poll::Pending 925 } 926 } 927 HostFutureProducerState::Writing(item) => { 928 log::debug!("produce: done {}", self.0); 929 state.0 = HostFutureProducerState::Complete; 930 Poll::Ready(Ok(Some(item))) 931 } 932 HostFutureProducerState::Complete => unreachable!(), 933 } 934 } 935 } 936 937 struct HostStreamConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>); 938 939 #[derive(Debug)] 940 struct HostStreamConsumerState { 941 waker: Option<Waker>, 942 kind: HostStreamConsumerStateKind, 943 } 944 945 #[derive(Debug)] 946 enum HostStreamConsumerStateKind { 947 Idle, 948 Consuming(u32), 949 Consumed(Vec<u32>), 950 } 951 952 impl HostStreamConsumerState { 953 fn wake_by_ref(&mut self) { 954 if let Some(waker) = self.waker.take() { 955 waker.wake(); 956 } 957 } 958 } 959 960 impl StreamConsumer<Data> for HostStreamConsumer { 961 type Item = u32; 962 963 fn poll_consume( 964 self: Pin<&mut Self>, 965 cx: &mut Context<'_>, 966 mut store: StoreContextMut<'_, Data>, 967 mut source: Source<'_, Self::Item>, 968 finish: bool, 969 ) -> Poll<Result<StreamResult>> { 970 let remaining = source.remaining(&mut store); 971 let state = match store.data_mut().host_stream_consumers.get_mut(&self.0) { 972 Some((state, _)) => state, 973 None => { 974 log::debug!("stream consume: dropped {}", self.0); 975 return Poll::Ready(Ok(StreamResult::Dropped)); 976 } 977 }; 978 match state.kind { 979 HostStreamConsumerStateKind::Idle | HostStreamConsumerStateKind::Consumed(_) => { 980 if finish { 981 log::debug!("stream consume: cancel {}", self.0); 982 state.waker = None; 983 Poll::Ready(Ok(StreamResult::Cancelled)) 984 } else { 985 log::debug!("stream consume: wait {}", self.0); 986 state.waker = Some(cx.waker().clone()); 987 Poll::Pending 988 } 989 } 990 HostStreamConsumerStateKind::Consuming(amt) => { 991 // The writer is performing a zero-length write. We always 992 // complete that without updating our own state. 993 if remaining == 0 { 994 log::debug!("stream consume: completing zero-length write {}", self.0); 995 return Poll::Ready(Ok(StreamResult::Completed)); 996 } 997 998 // If this is a zero-length read then block the writer but update our own state. 999 if amt == 0 { 1000 log::debug!("stream consume: finishing zero-length read {}", self.0); 1001 state.kind = HostStreamConsumerStateKind::Consumed(Vec::new()); 1002 state.waker = Some(cx.waker().clone()); 1003 return Poll::Pending; 1004 } 1005 1006 // For non-zero sizes perform the read/copy. 1007 log::debug!("stream consume: done {}", self.0); 1008 let mut dst = Vec::with_capacity(amt as usize); 1009 source.read(&mut store, &mut dst).unwrap(); 1010 let state = &mut store 1011 .data_mut() 1012 .host_stream_consumers 1013 .get_mut(&self.0) 1014 .unwrap() 1015 .0; 1016 state.kind = HostStreamConsumerStateKind::Consumed(dst); 1017 state.waker = None; 1018 Poll::Ready(Ok(StreamResult::Completed)) 1019 } 1020 } 1021 } 1022 } 1023 1024 impl Drop for HostStreamConsumer { 1025 fn drop(&mut self) { 1026 log::debug!("stream consume: drop {}", self.0); 1027 } 1028 } 1029 1030 struct HostStreamProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>); 1031 1032 #[derive(Debug)] 1033 struct HostStreamProducerState { 1034 kind: HostStreamProducerStateKind, 1035 waker: Option<Waker>, 1036 } 1037 1038 #[derive(Debug)] 1039 enum HostStreamProducerStateKind { 1040 Idle, 1041 Writing(Vec<u32>), 1042 Wrote(u32), 1043 } 1044 1045 impl HostStreamProducerState { 1046 fn idle() -> Self { 1047 HostStreamProducerState { 1048 kind: HostStreamProducerStateKind::Idle, 1049 waker: None, 1050 } 1051 } 1052 1053 fn wake_by_ref(&mut self) { 1054 if let Some(waker) = self.waker.take() { 1055 waker.wake(); 1056 } 1057 } 1058 } 1059 1060 impl StreamProducer<Data> for HostStreamProducer { 1061 type Item = u32; 1062 type Buffer = VecBuffer<u32>; 1063 1064 fn poll_produce( 1065 self: Pin<&mut Self>, 1066 cx: &mut Context<'_>, 1067 mut store: StoreContextMut<'_, Data>, 1068 mut dst: Destination<'_, Self::Item, Self::Buffer>, 1069 finish: bool, 1070 ) -> Poll<Result<StreamResult>> { 1071 let remaining = dst.remaining(&mut store); 1072 let data = store.data_mut(); 1073 let state = match data.host_stream_producers.get_mut(&self.0) { 1074 Some((state, _)) => state, 1075 None => { 1076 log::debug!("stream produce: dropped {}", self.0); 1077 return Poll::Ready(Ok(StreamResult::Dropped)); 1078 } 1079 }; 1080 match &mut state.kind { 1081 HostStreamProducerStateKind::Idle | HostStreamProducerStateKind::Wrote(_) => { 1082 if finish { 1083 log::debug!("stream produce: cancel {}", self.0); 1084 state.waker = None; 1085 Poll::Ready(Ok(StreamResult::Cancelled)) 1086 } else { 1087 log::debug!("stream produce: wait {}", self.0); 1088 state.waker = Some(cx.waker().clone()); 1089 Poll::Pending 1090 } 1091 } 1092 HostStreamProducerStateKind::Writing(buf) => { 1093 // Keep the other side blocked for a zero-length write 1094 // originated from the host. 1095 if buf.len() == 0 { 1096 log::debug!("stream produce: zero-length write {}", self.0); 1097 state.kind = HostStreamProducerStateKind::Wrote(0); 1098 state.waker = Some(cx.waker().clone()); 1099 return Poll::Pending; 1100 } 1101 log::debug!("stream produce: write {}", self.0); 1102 match remaining { 1103 Some(amt) => { 1104 // If the guest is doing a zero-length read then we've 1105 // got some data for them. Complete the read but leave 1106 // ourselves in the same `Writing` state as before. 1107 if amt == 0 { 1108 state.waker = None; 1109 return Poll::Ready(Ok(StreamResult::Completed)); 1110 } 1111 1112 // Don't let wasmtime buffer up data for us, so truncate 1113 // the buffer we're sending over to the amount that the 1114 // reader is requesting. 1115 if amt < buf.len() { 1116 buf.truncate(amt); 1117 } 1118 } 1119 1120 // At this time host<->host stream reads/writes aren't 1121 // fuzzed since that brings up a bunch of weird edge cases 1122 // which aren't fun to deal with and aren't interesting 1123 // either. 1124 None => unreachable!(), 1125 } 1126 let count = buf.len() as u32; 1127 dst.set_buffer(mem::take(buf).into()); 1128 state.kind = HostStreamProducerStateKind::Wrote(count); 1129 state.waker = None; 1130 Poll::Ready(Ok(StreamResult::Completed)) 1131 } 1132 } 1133 } 1134 } 1135 1136 impl Drop for HostStreamProducer { 1137 fn drop(&mut self) { 1138 log::debug!("stream produce: drop {}", self.0); 1139 } 1140 } 1141 1142 struct SharedStream(Scope); 1143 1144 impl SharedStream { 1145 async fn next(&mut self, accessor: &Accessor<Data>) -> Option<Command> { 1146 std::future::poll_fn(|cx| { 1147 accessor.with(|mut store| { 1148 self.poll(cx, store.as_context_mut(), false) 1149 .map(|pair| match pair { 1150 (None, StreamResult::Dropped) => None, 1151 (Some(item), StreamResult::Completed) => Some(item), 1152 _ => unreachable!(), 1153 }) 1154 }) 1155 }) 1156 .await 1157 } 1158 1159 fn poll( 1160 &mut self, 1161 cx: &mut Context<'_>, 1162 mut store: StoreContextMut<'_, Data>, 1163 finish: bool, 1164 ) -> Poll<(Option<Command>, StreamResult)> { 1165 let data = store.data_mut(); 1166 1167 // If no more commands remain then this is a closed and dropped stream. 1168 let Some((scope, command)) = data.commands.last_mut() else { 1169 log::debug!("Stream closed: {:?}", self.0); 1170 return Poll::Ready((None, StreamResult::Dropped)); 1171 }; 1172 1173 // If the next queued up command is for the scope that this stream is 1174 // attached to then send off the command. 1175 if *scope == self.0 { 1176 let ret = Some(*command); 1177 1178 // All commands are followed up with an "ack", and after the "ack" 1179 // is delivered then the command is popped to move on to the next 1180 // command. The reason for this is to guarantee that a command has 1181 // been processed before moving on to the next command. This helps 1182 // make the fuzzing easier to work with by being able to implicitly 1183 // assume that a command has been processed by the time something 1184 // else is. Otherwise it might be possible that wasmtime has a set 1185 // of commands/callbacks that are all delivered at the same time and 1186 // the component model doesn't specify what order they happen 1187 // within. By forcing an "ack" it ensures a more expected ordering 1188 // of execution to assist with fuzzing without losing really all 1189 // that much coverage. 1190 if matches!(command, Command::Ack) { 1191 data.commands.pop(); 1192 } else { 1193 *command = Command::Ack; 1194 } 1195 1196 // After a command was popped other streams may be able to make 1197 // progress so wake them all up. 1198 for (_, waker) in data.wakers.drain() { 1199 waker.wake(); 1200 } 1201 log::debug!("Delivering command {ret:?} for {:?}", self.0); 1202 return Poll::Ready((ret, StreamResult::Completed)); 1203 } 1204 1205 // The command queue is non-empty and the next command isn't meant for 1206 // us, so someone else needs to drain the queue. Enqueue our waker. 1207 if finish { 1208 Poll::Ready((None, StreamResult::Cancelled)) 1209 } else { 1210 data.wakers.insert(self.0, cx.waker().clone()); 1211 Poll::Pending 1212 } 1213 } 1214 } 1215 1216 impl StreamProducer<Data> for SharedStream { 1217 type Item = Command; 1218 type Buffer = Option<Command>; 1219 1220 fn poll_produce<'a>( 1221 mut self: Pin<&mut Self>, 1222 cx: &mut Context<'_>, 1223 store: StoreContextMut<'a, Data>, 1224 mut destination: Destination<'a, Self::Item, Self::Buffer>, 1225 finish: bool, 1226 ) -> Poll<Result<StreamResult>> { 1227 let (item, result) = std::task::ready!(self.poll(cx, store, finish)); 1228 destination.set_buffer(item); 1229 Poll::Ready(Ok(result)) 1230 } 1231 } 1232 1233 #[cfg(test)] 1234 mod tests { 1235 use super::{ComponentAsync, Scope, init, run}; 1236 use crate::oracles::component_async::types::*; 1237 use crate::test::test_n_times; 1238 use Scope::*; 1239 1240 #[test] 1241 fn smoke() { 1242 init(); 1243 1244 test_n_times(50, |c, _| { 1245 run(c); 1246 Ok(()) 1247 }); 1248 } 1249 1250 // ======================================================================== 1251 // A series of fuzz-generated test cases which caused problems during the 1252 // development of this fuzzer. Feel free to delete/edit/etc if the fuzzer 1253 // changes over time. 1254 1255 #[test] 1256 fn simple() { 1257 init(); 1258 1259 run(ComponentAsync { 1260 commands: vec![ 1261 (GuestCaller, Command::AsyncPendingImportCall(0)), 1262 (GuestCallee, Command::AsyncPendingImportCall(1)), 1263 (GuestCallee, Command::AsyncPendingExportComplete(0)), 1264 (GuestCaller, Command::AsyncPendingImportAssertReady(0)), 1265 (GuestCaller, Command::AsyncPendingImportCall(2)), 1266 ], 1267 }); 1268 } 1269 1270 #[test] 1271 fn somewhat_larger() { 1272 static COMMANDS: &[(Scope, Command)] = &[ 1273 (GuestCallee, Command::FutureNew(0)), 1274 (HostCaller, Command::FutureNew(1)), 1275 (GuestCallee, Command::FutureReadPending(0)), 1276 (GuestCaller, Command::AsyncPendingImportCall(2)), 1277 (GuestCaller, Command::AsyncPendingImportCall(3)), 1278 (GuestCaller, Command::AsyncPendingImportCall(4)), 1279 (GuestCaller, Command::AsyncPendingImportCall(5)), 1280 (GuestCallee, Command::AsyncPendingExportComplete(5)), 1281 (GuestCallee, Command::AsyncPendingExportComplete(3)), 1282 (GuestCallee, Command::AsyncPendingExportComplete(4)), 1283 (GuestCallee, Command::AsyncPendingExportComplete(2)), 1284 (GuestCaller, Command::AsyncPendingImportCall(6)), 1285 (GuestCallee, Command::AsyncPendingExportComplete(6)), 1286 (GuestCaller, Command::AsyncPendingImportCall(7)), 1287 (GuestCallee, Command::AsyncPendingExportComplete(7)), 1288 (GuestCaller, Command::AsyncPendingImportCall(8)), 1289 (GuestCallee, Command::AsyncPendingExportComplete(8)), 1290 (GuestCaller, Command::AsyncPendingImportCall(9)), 1291 (GuestCallee, Command::AsyncPendingExportComplete(9)), 1292 (GuestCaller, Command::AsyncPendingImportCall(10)), 1293 (GuestCallee, Command::AsyncPendingExportComplete(10)), 1294 (GuestCaller, Command::AsyncPendingImportCall(11)), 1295 (GuestCallee, Command::AsyncPendingExportComplete(11)), 1296 (GuestCaller, Command::AsyncPendingImportCall(12)), 1297 (GuestCallee, Command::AsyncPendingExportComplete(12)), 1298 (GuestCaller, Command::AsyncPendingImportCall(13)), 1299 (GuestCallee, Command::AsyncPendingExportComplete(13)), 1300 (GuestCaller, Command::AsyncPendingImportCall(14)), 1301 (GuestCallee, Command::AsyncPendingExportComplete(14)), 1302 (GuestCaller, Command::AsyncPendingImportCall(15)), 1303 (GuestCallee, Command::AsyncPendingExportComplete(15)), 1304 (GuestCaller, Command::AsyncPendingImportCall(16)), 1305 (GuestCallee, Command::AsyncPendingExportComplete(16)), 1306 (GuestCaller, Command::AsyncPendingImportCall(17)), 1307 (GuestCallee, Command::AsyncPendingExportComplete(17)), 1308 (GuestCaller, Command::AsyncPendingImportCall(18)), 1309 (GuestCallee, Command::AsyncPendingExportComplete(18)), 1310 (GuestCaller, Command::AsyncPendingImportCall(19)), 1311 (GuestCallee, Command::AsyncPendingExportComplete(19)), 1312 (GuestCaller, Command::AsyncPendingImportCall(20)), 1313 (GuestCallee, Command::AsyncPendingExportComplete(20)), 1314 (GuestCaller, Command::AsyncPendingImportCall(21)), 1315 (GuestCallee, Command::AsyncPendingExportComplete(21)), 1316 (GuestCaller, Command::AsyncPendingImportCall(22)), 1317 (GuestCallee, Command::AsyncPendingExportComplete(22)), 1318 (GuestCaller, Command::AsyncPendingImportCall(23)), 1319 (GuestCallee, Command::AsyncPendingExportComplete(23)), 1320 (GuestCaller, Command::AsyncPendingImportCall(24)), 1321 (GuestCallee, Command::AsyncPendingExportComplete(24)), 1322 (GuestCaller, Command::AsyncPendingImportCall(25)), 1323 (GuestCallee, Command::AsyncPendingExportComplete(25)), 1324 (GuestCaller, Command::AsyncPendingImportCall(26)), 1325 (GuestCallee, Command::AsyncPendingExportComplete(26)), 1326 (GuestCaller, Command::AsyncPendingImportCall(27)), 1327 (GuestCallee, Command::AsyncPendingExportComplete(27)), 1328 (GuestCaller, Command::AsyncPendingImportCall(28)), 1329 (GuestCallee, Command::AsyncPendingExportComplete(28)), 1330 (GuestCaller, Command::AsyncPendingImportCall(29)), 1331 (GuestCallee, Command::AsyncPendingExportComplete(29)), 1332 (GuestCaller, Command::AsyncPendingImportCall(30)), 1333 (GuestCallee, Command::AsyncPendingExportComplete(30)), 1334 (GuestCaller, Command::AsyncPendingImportCall(31)), 1335 (GuestCallee, Command::AsyncPendingExportComplete(31)), 1336 (GuestCaller, Command::AsyncPendingImportCall(32)), 1337 (GuestCallee, Command::AsyncPendingExportComplete(32)), 1338 (GuestCaller, Command::AsyncPendingImportCall(33)), 1339 (GuestCallee, Command::AsyncPendingExportComplete(33)), 1340 (GuestCaller, Command::AsyncPendingImportCall(34)), 1341 (GuestCallee, Command::AsyncPendingExportComplete(34)), 1342 (GuestCaller, Command::AsyncPendingImportCall(35)), 1343 (GuestCallee, Command::AsyncPendingExportComplete(35)), 1344 (GuestCaller, Command::AsyncPendingImportCall(36)), 1345 (GuestCallee, Command::AsyncPendingExportComplete(36)), 1346 (GuestCaller, Command::AsyncPendingImportCall(37)), 1347 (GuestCallee, Command::AsyncPendingExportComplete(37)), 1348 (GuestCaller, Command::AsyncPendingImportAssertReady(36)), 1349 ]; 1350 init(); 1351 1352 run(ComponentAsync { 1353 commands: COMMANDS.to_vec(), 1354 }); 1355 } 1356 1357 #[test] 1358 fn simple_stream1() { 1359 init(); 1360 1361 run(ComponentAsync { 1362 commands: vec![ 1363 (HostCallee, Command::StreamNew(1)), 1364 ( 1365 HostCallee, 1366 Command::StreamReadPending(StreamReadPayload { 1367 stream: 1, 1368 count: 2, 1369 }), 1370 ), 1371 (HostCallee, Command::StreamCancelRead(1)), 1372 (GuestCaller, Command::SyncReadyCall), 1373 ( 1374 HostCallee, 1375 Command::StreamWritePending(StreamWritePayload { 1376 stream: 1, 1377 item: 3, 1378 count: 2, 1379 }), 1380 ), 1381 (HostCallee, Command::StreamCancelWrite(1)), 1382 (HostCallee, Command::StreamDropWritable(1)), 1383 ( 1384 HostCallee, 1385 Command::StreamReadDropped(StreamReadPayload { 1386 stream: 1, 1387 count: 1, 1388 }), 1389 ), 1390 ], 1391 }); 1392 } 1393 1394 #[test] 1395 fn simple_stream3() { 1396 init(); 1397 1398 run(ComponentAsync { 1399 commands: vec![ 1400 (GuestCaller, Command::StreamNew(26)), 1401 ( 1402 GuestCaller, 1403 Command::StreamReadPending(StreamReadPayload { 1404 stream: 26, 1405 count: 10, 1406 }), 1407 ), 1408 (GuestCaller, Command::StreamDropWritable(26)), 1409 (GuestCaller, Command::StreamReadAssertDropped(26)), 1410 ], 1411 }); 1412 } 1413 1414 #[test] 1415 fn simple_stream4() { 1416 init(); 1417 1418 run(ComponentAsync { 1419 commands: vec![ 1420 (GuestCaller, Command::StreamNew(23)), 1421 ( 1422 GuestCaller, 1423 Command::StreamWritePending(StreamWritePayload { 1424 stream: 23, 1425 item: 24, 1426 count: 2, 1427 }), 1428 ), 1429 (GuestCaller, Command::StreamGive(23)), 1430 (GuestCallee, Command::StreamDropReadable(23)), 1431 ( 1432 GuestCaller, 1433 Command::StreamWriteAssertDropped(StreamReadPayload { 1434 stream: 23, 1435 count: 0, 1436 }), 1437 ), 1438 ], 1439 }); 1440 } 1441 1442 #[test] 1443 fn zero_length_behavior() { 1444 init(); 1445 1446 run(ComponentAsync { 1447 commands: vec![ 1448 (GuestCaller, Command::StreamNew(10)), 1449 (HostCaller, Command::StreamTake(10)), 1450 ( 1451 GuestCaller, 1452 Command::StreamWritePending(StreamWritePayload { 1453 stream: 10, 1454 item: 13, 1455 count: 5, 1456 }), 1457 ), 1458 ( 1459 HostCaller, 1460 Command::StreamReadReady(StreamReadyPayload { 1461 stream: 10, 1462 item: 0, 1463 op_count: 0, 1464 ready_count: 0, 1465 }), 1466 ), 1467 ( 1468 HostCaller, 1469 Command::StreamReadReady(StreamReadyPayload { 1470 stream: 10, 1471 item: 0, 1472 op_count: 0, 1473 ready_count: 0, 1474 }), 1475 ), 1476 ], 1477 }); 1478 } 1479 } 1480