1 //! For a high-level overview of this fuzz target see `fuzz_async.rs`
2 
3 use crate::block_on;
4 use crate::generators::component_async::exports::wasmtime_fuzz::fuzz::async_test::Guest;
5 use crate::generators::component_async::wasmtime_fuzz::fuzz::async_test::{self, Command};
6 use crate::generators::component_async::wasmtime_fuzz::fuzz::types;
7 use crate::generators::component_async::{ComponentAsync, FuzzAsyncPre, Scope};
8 use futures::channel::oneshot;
9 use std::collections::{HashMap, HashSet};
10 use std::mem;
11 use std::pin::Pin;
12 use std::sync::{Arc, OnceLock, Weak};
13 use std::task::{Context, Poll, Waker};
14 use std::time::Instant;
15 use wasmtime::component::{
16     Access, Accessor, AccessorTask, Component, Destination, FutureConsumer, FutureProducer,
17     FutureReader, HasSelf, Linker, ResourceTable, Source, StreamConsumer, StreamProducer,
18     StreamReader, StreamResult, VecBuffer,
19 };
20 use wasmtime::{AsContextMut, Config, Engine, Result, Store, StoreContextMut};
21 use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
22 
23 static STATE: OnceLock<(Engine, FuzzAsyncPre<Data>)> = OnceLock::new();
24 
25 /// Initializes state for future fuzz runs.
26 ///
27 /// This will create an `Engine` to run this fuzzer within and it will
28 /// additionally precompile the component that will be used for fuzzing.
29 ///
30 /// There are a few points of note about this:
31 ///
32 /// * The `misc` fuzzer is manually instrumented with this function as the init
33 ///   hook to ensure this runs before any other fuzzing.
34 ///
35 /// * Compilation of the component takes quite some time with
36 ///   fuzzing-instrumented Cranelift. To assist with local development this
37 ///   implements a cache which is serialized/deserialized via an env var.
38 pub fn init() {
39     crate::init_fuzzing();
40 
41     STATE.get_or_init(|| {
42         let mut config = Config::new();
43         config.wasm_component_model_async(true);
44         config.async_support(true);
45         let engine = Engine::new(&config).unwrap();
46         let component = compile(&engine);
47         let mut linker = Linker::new(&engine);
48         wasmtime_wasi::p2::add_to_linker_async(&mut linker).unwrap();
49         async_test::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap();
50         types::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap();
51 
52         let pre = linker.instantiate_pre(&component).unwrap();
53         let pre = FuzzAsyncPre::new(pre).unwrap();
54 
55         (engine, pre)
56     });
57 
58     fn compile(engine: &Engine) -> Component {
59         let wasm = test_programs_artifacts::FUZZ_ASYNC_COMPONENT;
60         let cwasm_cache = std::env::var("COMPONENT_ASYNC_CWASM_CACHE").ok();
61         if let Some(path) = &cwasm_cache
62             && let Ok(cwasm_mtime) = std::fs::metadata(&path).and_then(|m| m.modified())
63             && let Ok(wasm_mtime) = std::fs::metadata(wasm).and_then(|m| m.modified())
64             && cwasm_mtime > wasm_mtime
65         {
66             log::debug!("Using cached component async cwasm at {path}");
67             unsafe {
68                 return Component::deserialize_file(engine, path).unwrap();
69             }
70         }
71 
72         let composition = {
73             let mut config = wasm_compose::config::Config::default();
74             let tempdir = tempfile::TempDir::new().unwrap();
75             let path = tempdir.path().join("fuzz-async.wasm");
76             std::fs::copy(wasm, &path).unwrap();
77             config.definitions.push(path.clone());
78 
79             wasm_compose::composer::ComponentComposer::new(&path, &config)
80                 .compose()
81                 .unwrap()
82         };
83         let start = Instant::now();
84         let component = Component::new(&engine, &composition).unwrap();
85         if let Some(path) = cwasm_cache {
86             log::debug!("Caching component async cwasm to {path}");
87             std::fs::write(path, &component.serialize().unwrap()).unwrap();
88         } else if start.elapsed() > std::time::Duration::from_secs(1) {
89             eprintln!(
90                 "
91 !!!!!!!!!!!!!!!!!!!!!!!!!!
92 
93 Component compilation is slow, try setting `COMPONENT_ASYNC_CWASM_CACHE=path` to
94 cache compilation results
95 
96 !!!!!!!!!!!!!!!!!!!!!!!!!!
97 "
98             );
99         }
100         return component;
101     }
102 }
103 
104 #[derive(Default)]
105 struct Data {
106     ctx: WasiCtx,
107     table: ResourceTable,
108     wakers: HashMap<Scope, Waker>,
109     commands: Vec<(Scope, Command)>,
110 
111     guest_caller_stream: Option<StreamReader<Command>>,
112     guest_callee_stream: Option<StreamReader<Command>>,
113 
114     host_pending_async_calls: HashMap<u32, oneshot::Sender<()>>,
115     host_pending_async_calls_cancelled: HashSet<u32>,
116     guest_pending_async_calls_ready: HashSet<u32>,
117 
118     // State of futures/streams. Note that while #12091 is unresolved an
119     // `Arc`/`Weak` combo is used to detect when wasmtime drops futures/streams
120     // and the various halves we're interacting with using traits.
121     host_futures: HashMap<u32, FutureReader<u32>>,
122     host_future_producers: HashMap<u32, (HostFutureProducerState, Weak<()>)>,
123     host_future_consumers: HashMap<u32, (HostFutureConsumerState, Weak<()>)>,
124     host_streams: HashMap<u32, StreamReader<u32>>,
125     host_stream_producers: HashMap<u32, (HostStreamProducerState, Weak<()>)>,
126     host_stream_consumers: HashMap<u32, (HostStreamConsumerState, Weak<()>)>,
127 }
128 
129 impl WasiView for Data {
130     fn ctx(&mut self) -> WasiCtxView<'_> {
131         WasiCtxView {
132             ctx: &mut self.ctx,
133             table: &mut self.table,
134         }
135     }
136 }
137 
138 impl async_test::HostWithStore for HasSelf<Data> {
139     async fn async_ready<T>(_store: &Accessor<T, Self>) {}
140 
141     async fn async_pending<T>(store: &Accessor<T, Self>, id: u32) {
142         let (tx, rx) = oneshot::channel();
143         store.with(|mut s| s.get().host_pending_async_calls.insert(id, tx));
144         let record = RecordCancelOnDrop { store, id };
145         rx.await.unwrap();
146         mem::forget(record);
147 
148         struct RecordCancelOnDrop<'a, T: 'static> {
149             store: &'a Accessor<T, HasSelf<Data>>,
150             id: u32,
151         }
152 
153         impl<T> Drop for RecordCancelOnDrop<'_, T> {
154             fn drop(&mut self) {
155                 self.store.with(|mut s| {
156                     s.get().host_pending_async_calls_cancelled.insert(self.id);
157                 });
158             }
159         }
160     }
161 
162     async fn init<T>(_store: &Accessor<T, Self>, _scope: types::Scope) {}
163 }
164 
165 impl async_test::Host for Data {
166     fn sync_ready(&mut self) {}
167 
168     fn future_take(&mut self, id: u32) -> FutureReader<u32> {
169         self.host_futures.remove(&id).unwrap()
170     }
171 
172     fn future_receive(&mut self, id: u32, future: FutureReader<u32>) {
173         let prev = self.host_futures.insert(id, future);
174         assert!(prev.is_none());
175     }
176 
177     fn stream_take(&mut self, id: u32) -> StreamReader<u32> {
178         self.host_streams.remove(&id).unwrap()
179     }
180 
181     fn stream_receive(&mut self, id: u32, stream: StreamReader<u32>) {
182         let prev = self.host_streams.insert(id, stream);
183         assert!(prev.is_none());
184     }
185 }
186 
187 impl types::HostWithStore for HasSelf<Data> {
188     fn get_commands<T>(
189         mut store: Access<'_, T, Self>,
190         scope: types::Scope,
191     ) -> StreamReader<Command> {
192         let data = store.get();
193         match scope {
194             types::Scope::Caller => data.guest_caller_stream.take().unwrap(),
195             types::Scope::Callee => data.guest_callee_stream.take().unwrap(),
196         }
197     }
198 }
199 
200 impl types::Host for Data {}
201 
202 /// Executes the `input` provided, assuming that `init` has been previously
203 /// executed.
204 pub fn run(mut input: ComponentAsync) {
205     log::debug!("Running component async fuzz test with\n{input:?}");
206 
207     // Commands are executed in the order that they're listed in the input, but
208     // to make it easier on the `StreamProducer` implementation below they're
209     // popped off the back. To ensure that they're all delivered in the right
210     // order reverse the list to ensure the correct order is maintained.
211     input.commands.reverse();
212 
213     let (engine, pre) = STATE.get().unwrap();
214     let mut store = Store::new(
215         engine,
216         Data {
217             ctx: WasiCtx::builder().inherit_stdio().inherit_env().build(),
218             commands: input.commands,
219             ..Data::default()
220         },
221     );
222 
223     let guest_caller_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCaller));
224     let guest_callee_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCallee));
225     store.data_mut().guest_caller_stream = Some(guest_caller_stream);
226     store.data_mut().guest_callee_stream = Some(guest_callee_stream);
227     block_on(async {
228         let instance = pre.instantiate_async(&mut store).await.unwrap();
229         let test = instance.wasmtime_fuzz_fuzz_async_test();
230 
231         let mut host_caller = SharedStream(Scope::HostCaller);
232         let mut host_callee = SharedStream(Scope::HostCallee);
233         store
234             .run_concurrent(async |store| {
235                 // Kick off stream reads in the guest. This function will return
236                 // but the tasks in the guest will keep running after they
237                 // return to process stream items.
238                 test.call_init(store, types::Scope::Caller).await.unwrap();
239 
240                 // Simultaneously process commands from both host streams. These
241                 // will return once the entire command queue is exhausted.
242                 futures::join!(
243                     async {
244                         while let Some(cmd) = host_caller.next(store).await {
245                             host_caller_cmd(&test, store, cmd).await;
246                         }
247                     },
248                     async {
249                         while let Some(cmd) = host_callee.next(store).await {
250                             host_callee_cmd(store, cmd).await;
251                         }
252                     },
253                 );
254 
255                 // Note that there may still be pending async work in the guest
256                 // (or host). It's intentional that it's not cleaned up here to
257                 // help test situations where async work is all abruptly
258                 // cancelled by just being dropped in the host.
259             })
260             .await
261             .unwrap();
262     });
263 }
264 
265 /// See documentation in `fuzz_async.rs` for what's going on here.
266 async fn test_property<F>(store: &Accessor<Data>, mut f: F) -> bool
267 where
268     F: FnMut(&mut Data) -> bool,
269 {
270     for _ in 0..1000 {
271         let ready = store.with(|mut s| f(s.get()));
272         if ready {
273             return true;
274         }
275 
276         crate::YieldN(1).await;
277     }
278 
279     return false;
280 }
281 
282 async fn await_property<F>(store: &Accessor<Data>, desc: &str, f: F)
283 where
284     F: FnMut(&mut Data) -> bool,
285 {
286     assert!(
287         test_property(store, f).await,
288         "timed out waiting for {desc}",
289     );
290 }
291 
292 async fn host_caller_cmd(test: &Guest, store: &Accessor<Data>, cmd: Command) {
293     match cmd {
294         Command::Ack => {}
295         Command::SyncReadyCall => test.call_sync_ready(store).await.unwrap(),
296         Command::AsyncReadyCall => test.call_async_ready(store).await.unwrap(),
297         Command::AsyncPendingExportComplete(_i) => todo!(),
298         Command::AsyncPendingExportAssertCancelled(_i) => todo!(),
299         Command::AsyncPendingImportCall(i) => {
300             struct RunPendingImport {
301                 test: Guest,
302                 i: u32,
303             }
304 
305             store.spawn(RunPendingImport {
306                 test: test.clone(),
307                 i,
308             });
309 
310             impl AccessorTask<Data> for RunPendingImport {
311                 async fn run(self, store: &Accessor<Data>) -> Result<()> {
312                     self.test.call_async_pending(store, self.i).await?;
313                     store.with(|mut s| {
314                         s.get().guest_pending_async_calls_ready.insert(self.i);
315                     });
316                     Ok(())
317                 }
318             }
319         }
320         Command::AsyncPendingImportCancel(_i) => todo!(),
321         Command::AsyncPendingImportAssertReady(i) => {
322             assert!(
323                 test_property(store, |s| s.guest_pending_async_calls_ready.remove(&i)).await,
324                 "expected async_pending import {i} to be ready",
325             );
326         }
327 
328         Command::FutureTake(i) => {
329             let future = test.call_future_take(store, i).await.unwrap();
330             store.with(|mut s| {
331                 let prev = s.get().host_futures.insert(i, future);
332                 assert!(prev.is_none());
333             });
334         }
335         Command::FutureGive(i) => {
336             let future = store.with(|mut s| s.get().host_futures.remove(&i).unwrap());
337             test.call_future_receive(store, i, future).await.unwrap();
338         }
339         Command::StreamTake(i) => {
340             let stream = test.call_stream_take(store, i).await.unwrap();
341             store.with(|mut s| {
342                 let prev = s.get().host_streams.insert(i, stream);
343                 assert!(prev.is_none());
344             });
345         }
346         Command::StreamGive(i) => {
347             let stream = store.with(|mut s| s.get().host_streams.remove(&i).unwrap());
348             test.call_stream_receive(store, i, stream).await.unwrap();
349         }
350 
351         other => future_or_stream_cmd(store, other).await,
352     }
353 }
354 
355 async fn host_callee_cmd(store: &Accessor<Data>, cmd: Command) {
356     match cmd {
357         Command::Ack => {}
358         Command::SyncReadyCall => todo!(),
359         Command::AsyncReadyCall => todo!(),
360         Command::AsyncPendingExportComplete(i) => store.with(|mut s| {
361             s.get()
362                 .host_pending_async_calls
363                 .remove(&i)
364                 .unwrap()
365                 .send(())
366                 .unwrap();
367         }),
368         Command::AsyncPendingExportAssertCancelled(i) => {
369             assert!(
370                 test_property(store, |s| s.host_pending_async_calls_cancelled.remove(&i)).await,
371                 "expected async_pending export {i} to be cancelled",
372             );
373         }
374         Command::AsyncPendingImportCall(_i) => todo!(),
375         Command::AsyncPendingImportCancel(_i) => todo!(),
376         Command::AsyncPendingImportAssertReady(_i) => todo!(),
377 
378         other => future_or_stream_cmd(store, other).await,
379     }
380 }
381 
382 async fn future_or_stream_cmd(store: &Accessor<Data>, cmd: Command) {
383     match cmd {
384         // These commands should be handled above
385         Command::Ack
386         | Command::SyncReadyCall
387         | Command::AsyncReadyCall
388         | Command::AsyncPendingExportComplete(_)
389         | Command::AsyncPendingExportAssertCancelled(_)
390         | Command::AsyncPendingImportCall(_)
391         | Command::AsyncPendingImportCancel(_)
392         | Command::FutureTake(_)
393         | Command::FutureGive(_)
394         | Command::StreamTake(_)
395         | Command::StreamGive(_)
396         | Command::AsyncPendingImportAssertReady(_) => unreachable!(),
397 
398         Command::FutureNew(id) => {
399             store.with(|mut s| {
400                 let arc = Arc::new(());
401                 let weak = Arc::downgrade(&arc);
402                 let future = FutureReader::new(&mut s, HostFutureProducer(id, arc));
403                 let data = s.get();
404                 let prev = data.host_futures.insert(id, future);
405                 assert!(prev.is_none());
406                 let prev = data
407                     .host_future_producers
408                     .insert(id, (HostFutureProducerState::Idle, weak));
409                 assert!(prev.is_none());
410             });
411         }
412         Command::FutureDropReadable(id) => {
413             store.with(|mut s| match s.get().host_futures.remove(&id) {
414                 Some(mut future) => future.close(&mut s),
415                 None => {
416                     let (mut state, _weak) = s.get().host_future_consumers.remove(&id).unwrap();
417                     state.wake_by_ref();
418                 }
419             })
420         }
421         Command::FutureWriteReady(payload) => {
422             await_property(store, "future write should be waiting", |s| {
423                 matches!(
424                     s.host_future_producers.get(&payload.future),
425                     Some((HostFutureProducerState::Waiting(_), _))
426                 )
427             })
428             .await;
429             store.with(|mut s| {
430                 let state = s
431                     .get()
432                     .host_future_producers
433                     .get_mut(&payload.future)
434                     .unwrap();
435                 match state {
436                     (HostFutureProducerState::Waiting(waker), _) => {
437                         waker.wake_by_ref();
438                         state.0 = HostFutureProducerState::Writing(payload.item);
439                     }
440                     (state, _) => panic!("future not waiting: {state:?}"),
441                 }
442             })
443         }
444         Command::FutureWritePending(payload) => store.with(|mut s| {
445             let state = s
446                 .get()
447                 .host_future_producers
448                 .get_mut(&payload.future)
449                 .unwrap();
450             match state {
451                 (HostFutureProducerState::Idle, _) => {
452                     state.0 = HostFutureProducerState::Writing(payload.item);
453                 }
454                 _ => panic!("future not idle"),
455             }
456         }),
457         Command::FutureWriteDropped(id) => store.with(|mut s| {
458             let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
459             assert!(matches!(state, HostFutureProducerState::Idle));
460             assert!(weak.upgrade().is_none());
461         }),
462         Command::FutureReadReady(payload) => {
463             let id = payload.future;
464             store.with(|mut s| {
465                 let arc = Arc::new(());
466                 let weak = Arc::downgrade(&arc);
467                 let data = s.get();
468                 let future = data.host_futures.remove(&id).unwrap();
469                 let prev = data
470                     .host_future_consumers
471                     .insert(id, (HostFutureConsumerState::Consuming, weak));
472                 assert!(prev.is_none());
473                 future.pipe(&mut s, HostFutureConsumer(id, arc));
474             });
475 
476             await_property(store, "future should be present", |s| {
477                 matches!(
478                     s.host_future_consumers[&id],
479                     (HostFutureConsumerState::Complete(_), _)
480                 )
481             })
482             .await;
483 
484             store.with(|mut s| {
485                 let (state, _) = s.get().host_future_consumers.remove(&id).unwrap();
486                 match state {
487                     HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item),
488                     _ => panic!("future not complete"),
489                 }
490             });
491         }
492         Command::FutureReadPending(id) => {
493             ensure_future_reading(store, id);
494             store.with(|mut s| {
495                 let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap();
496                 state.wake_by_ref();
497                 assert!(
498                     matches!(state, HostFutureConsumerState::Idle),
499                     "bad state: {state:?}",
500                 );
501                 *state = HostFutureConsumerState::Consuming;
502             })
503         }
504         Command::FutureCancelWrite(id) => store.with(|mut s| {
505             let (state, _) = s.get().host_future_producers.get_mut(&id).unwrap();
506             assert!(matches!(state, HostFutureProducerState::Writing(_)));
507             *state = HostFutureProducerState::Idle;
508         }),
509         Command::FutureCancelRead(id) => store.with(|mut s| {
510             let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap();
511             assert!(matches!(state, HostFutureConsumerState::Consuming));
512             *state = HostFutureConsumerState::Idle;
513         }),
514         Command::FutureReadAssertComplete(payload) => {
515             await_property(store, "future read should be complete", |s| {
516                 matches!(
517                     s.host_future_consumers.get(&payload.future),
518                     Some((HostFutureConsumerState::Complete(_), _))
519                 )
520             })
521             .await;
522             store.with(|mut s| {
523                 let (state, _) = s
524                     .get()
525                     .host_future_consumers
526                     .remove(&payload.future)
527                     .unwrap();
528                 match state {
529                     HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item),
530                     _ => panic!("future not complete"),
531                 }
532             })
533         }
534         Command::FutureWriteAssertComplete(id) => store.with(|mut s| {
535             let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
536             assert!(matches!(state, HostFutureProducerState::Complete));
537             assert!(weak.upgrade().is_none());
538         }),
539         Command::FutureWriteAssertDropped(id) => store.with(|mut s| {
540             let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
541             assert!(matches!(state, HostFutureProducerState::Writing(_)));
542             assert!(weak.upgrade().is_none());
543         }),
544 
545         Command::StreamNew(id) => {
546             store.with(|mut s| {
547                 let arc = Arc::new(());
548                 let weak = Arc::downgrade(&arc);
549                 let stream = StreamReader::new(&mut s, HostStreamProducer(id, arc));
550                 let data = s.get();
551                 let prev = data.host_streams.insert(id, stream);
552                 assert!(prev.is_none());
553                 let prev = data
554                     .host_stream_producers
555                     .insert(id, (HostStreamProducerState::idle(), weak));
556                 assert!(prev.is_none());
557             });
558         }
559         Command::StreamDropReadable(id) => {
560             store.with(|mut s| match s.get().host_streams.remove(&id) {
561                 Some(mut stream) => {
562                     stream.close(&mut s);
563                 }
564                 None => {
565                     let (mut state, _weak) = s.get().host_stream_consumers.remove(&id).unwrap();
566                     state.wake_by_ref();
567                 }
568             })
569         }
570         Command::StreamDropWritable(id) => store.with(|mut s| {
571             let (mut state, _weak) = s.get().host_stream_producers.remove(&id).unwrap();
572             state.wake_by_ref();
573         }),
574         Command::StreamWriteReady(payload) => {
575             let id = payload.stream;
576             store.with(|mut s| {
577                 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
578                 state.wake_by_ref();
579                 match state.kind {
580                     HostStreamProducerStateKind::Idle => {
581                         state.kind = HostStreamProducerStateKind::Writing(stream_payload(
582                             payload.item,
583                             payload.op_count,
584                         ));
585                     }
586                     _ => panic!("stream not idle: {state:?}"),
587                 }
588             });
589             await_property(store, "stream should complete a write", |s| {
590                 matches!(
591                     s.host_stream_producers[&id].0.kind,
592                     HostStreamProducerStateKind::Wrote(_),
593                 )
594             })
595             .await;
596             store.with(|mut s| {
597                 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
598                 match state.kind {
599                     HostStreamProducerStateKind::Wrote(amt) => {
600                         assert_eq!(amt, payload.ready_count);
601                         state.kind = HostStreamProducerStateKind::Idle;
602                     }
603                     _ => panic!("stream not idle: {state:?}"),
604                 }
605             });
606         }
607         Command::StreamReadReady(payload) => {
608             let id = payload.stream;
609             ensure_stream_reading(store, id);
610             store.with(|mut s| {
611                 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
612                 state.wake_by_ref();
613                 state.kind = HostStreamConsumerStateKind::Consuming(payload.op_count);
614             });
615             await_property(store, "stream should complete a read", |s| {
616                 matches!(
617                     s.host_stream_consumers[&id].0.kind,
618                     HostStreamConsumerStateKind::Consumed(_),
619                 )
620             })
621             .await;
622 
623             store.with(|mut s| {
624                 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
625                 match &state.kind {
626                     HostStreamConsumerStateKind::Consumed(last_read) => {
627                         assert_eq!(
628                             *last_read,
629                             stream_payload(payload.item, payload.ready_count)
630                         );
631                         state.kind = HostStreamConsumerStateKind::Idle;
632                     }
633                     _ => panic!("future not complete"),
634                 }
635             });
636         }
637         Command::StreamWritePending(payload) => store.with(|mut s| {
638             let (state, _) = s
639                 .get()
640                 .host_stream_producers
641                 .get_mut(&payload.stream)
642                 .unwrap();
643             state.wake_by_ref();
644             match state.kind {
645                 HostStreamProducerStateKind::Idle => {
646                     state.kind = HostStreamProducerStateKind::Writing(stream_payload(
647                         payload.item,
648                         payload.count,
649                     ));
650                 }
651                 _ => panic!("stream not idle {:?}", state.kind),
652             }
653         }),
654         Command::StreamReadPending(payload) => {
655             ensure_stream_reading(store, payload.stream);
656             store.with(|mut s| {
657                 let (state, _) = s
658                     .get()
659                     .host_stream_consumers
660                     .get_mut(&payload.stream)
661                     .unwrap();
662                 state.wake_by_ref();
663                 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle));
664                 state.kind = HostStreamConsumerStateKind::Consuming(payload.count);
665             })
666         }
667         Command::StreamWriteDropped(payload) => store.with(|mut s| {
668             let (state, weak) = s
669                 .get()
670                 .host_stream_producers
671                 .get_mut(&payload.stream)
672                 .unwrap();
673             assert!(matches!(state.kind, HostStreamProducerStateKind::Idle));
674             assert!(weak.upgrade().is_none());
675         }),
676         Command::StreamReadDropped(payload) => {
677             ensure_stream_reading(store, payload.stream);
678             await_property(store, "stream read should get dropped", |s| {
679                 let weak = &s.host_stream_consumers[&payload.stream].1;
680                 weak.upgrade().is_none()
681             })
682             .await;
683             store.with(|mut s| {
684                 let (state, weak) = s
685                     .get()
686                     .host_stream_consumers
687                     .get_mut(&payload.stream)
688                     .unwrap();
689                 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle));
690                 assert!(weak.upgrade().is_none());
691             })
692         }
693         Command::StreamCancelWrite(id) => store.with(|mut s| {
694             let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
695             assert!(
696                 matches!(state.kind, HostStreamProducerStateKind::Writing(_)),
697                 "invalid state {state:?}",
698             );
699             state.kind = HostStreamProducerStateKind::Idle;
700             state.wake_by_ref();
701         }),
702         Command::StreamCancelRead(id) => store.with(|mut s| {
703             let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
704             assert!(matches!(
705                 state.kind,
706                 HostStreamConsumerStateKind::Consuming(_)
707             ));
708             state.kind = HostStreamConsumerStateKind::Idle;
709         }),
710         Command::StreamReadAssertComplete(payload) => store.with(|mut s| {
711             let (state, _) = s
712                 .get()
713                 .host_stream_consumers
714                 .get_mut(&payload.stream)
715                 .unwrap();
716             match &state.kind {
717                 HostStreamConsumerStateKind::Consumed(last_read) => {
718                     assert_eq!(*last_read, stream_payload(payload.item, payload.count));
719                     state.kind = HostStreamConsumerStateKind::Idle;
720                 }
721                 _ => panic!("stream not complete"),
722             }
723         }),
724         Command::StreamWriteAssertComplete(payload) => store.with(|mut s| {
725             let (state, _) = s
726                 .get()
727                 .host_stream_producers
728                 .get_mut(&payload.stream)
729                 .unwrap();
730             match state.kind {
731                 HostStreamProducerStateKind::Wrote(amt) => {
732                     assert_eq!(amt, payload.count);
733                     state.kind = HostStreamProducerStateKind::Idle;
734                 }
735                 _ => panic!("stream not complete: {:?}", state.kind),
736             }
737         }),
738         Command::StreamWriteAssertDropped(payload) => {
739             await_property(store, "stream write should be dropped", |s| {
740                 let weak = &s.host_stream_producers[&payload.stream].1;
741                 weak.upgrade().is_none()
742             })
743             .await;
744             store.with(|mut s| {
745                 let (state, weak) = s
746                     .get()
747                     .host_stream_producers
748                     .get_mut(&payload.stream)
749                     .unwrap();
750                 assert!(matches!(
751                     state.kind,
752                     HostStreamProducerStateKind::Writing(_)
753                 ));
754                 assert!(weak.upgrade().is_none());
755             })
756         }
757         Command::StreamReadAssertDropped(id) => {
758             await_property(store, "stream read should be dropped", |s| {
759                 let weak = &s.host_stream_consumers[&id].1;
760                 weak.upgrade().is_none()
761             })
762             .await;
763             store.with(|mut s| {
764                 let (state, weak) = s.get().host_stream_consumers.get_mut(&id).unwrap();
765                 assert!(matches!(
766                     state.kind,
767                     HostStreamConsumerStateKind::Consuming(_),
768                 ));
769                 assert!(weak.upgrade().is_none());
770             })
771         }
772     }
773 }
774 
775 fn stream_payload(item: u32, count: u32) -> Vec<u32> {
776     (item..item + count).collect()
777 }
778 
779 fn ensure_future_reading(store: &Accessor<Data>, id: u32) {
780     store.with(|mut s| {
781         let data = s.get();
782         if !data.host_futures.contains_key(&id) {
783             return;
784         }
785         log::debug!("future consume: start {id}");
786         let arc = Arc::new(());
787         let weak = Arc::downgrade(&arc);
788         let data = s.get();
789         let future = data.host_futures.remove(&id).unwrap();
790         let prev = data
791             .host_future_consumers
792             .insert(id, (HostFutureConsumerState::Idle, weak));
793         assert!(prev.is_none());
794         future.pipe(&mut s, HostFutureConsumer(id, arc));
795     });
796 }
797 
798 fn ensure_stream_reading(store: &Accessor<Data>, id: u32) {
799     store.with(|mut s| {
800         let data = s.get();
801         if !data.host_streams.contains_key(&id) {
802             return;
803         }
804         log::debug!("stream consume: start {id}");
805         let arc = Arc::new(());
806         let weak = Arc::downgrade(&arc);
807         let prev = data.host_stream_consumers.insert(
808             id,
809             (
810                 HostStreamConsumerState {
811                     kind: HostStreamConsumerStateKind::Idle,
812                     waker: None,
813                 },
814                 weak,
815             ),
816         );
817         assert!(prev.is_none());
818         let stream = data.host_streams.remove(&id).unwrap();
819         stream.pipe(&mut s, HostStreamConsumer(id, arc));
820     });
821 }
822 
823 struct HostFutureConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
824 
825 /// Note that this is only created once a read is actually initiated on a
826 /// future. It's also not possible to cancel a host-based read on a future,
827 /// hence why this is simpler than the `HostFutureProducerState` state below.
828 #[derive(Debug)]
829 enum HostFutureConsumerState {
830     Idle,
831     Waiting(Waker),
832     Consuming,
833     Complete(u32),
834 }
835 
836 impl HostFutureConsumerState {
837     fn wake_by_ref(&mut self) {
838         if let HostFutureConsumerState::Waiting(waker) = &self {
839             waker.wake_by_ref();
840             *self = HostFutureConsumerState::Idle;
841         }
842     }
843 }
844 
845 impl FutureConsumer<Data> for HostFutureConsumer {
846     type Item = u32;
847 
848     fn poll_consume(
849         self: Pin<&mut Self>,
850         cx: &mut Context<'_>,
851         mut store: StoreContextMut<'_, Data>,
852         mut source: Source<'_, Self::Item>,
853         finish: bool,
854     ) -> Poll<Result<()>> {
855         let state = match store.data_mut().host_future_consumers.get_mut(&self.0) {
856             Some(state) => state,
857             None => {
858                 log::debug!("consume: closed {}", self.0);
859                 return Poll::Ready(Ok(()));
860             }
861         };
862         match state.0 {
863             HostFutureConsumerState::Idle | HostFutureConsumerState::Waiting(_) => {
864                 if finish {
865                     log::debug!("consume: cancel {}", self.0);
866                     state.0 = HostFutureConsumerState::Idle;
867                     Poll::Ready(Ok(()))
868                 } else {
869                     log::debug!("consume: wait {}", self.0);
870                     state.0 = HostFutureConsumerState::Waiting(cx.waker().clone());
871                     Poll::Pending
872                 }
873             }
874             HostFutureConsumerState::Consuming => {
875                 log::debug!("consume: done {}", self.0);
876                 let mut item = None;
877                 source.read(&mut store, &mut item).unwrap();
878                 store
879                     .data_mut()
880                     .host_future_consumers
881                     .get_mut(&self.0)
882                     .unwrap()
883                     .0 = HostFutureConsumerState::Complete(item.unwrap());
884                 Poll::Ready(Ok(()))
885             }
886             HostFutureConsumerState::Complete(_) => unreachable!(),
887         }
888     }
889 }
890 
891 struct HostFutureProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
892 
893 #[derive(Debug)]
894 enum HostFutureProducerState {
895     Idle,
896     Waiting(Waker),
897     Writing(u32),
898     Complete,
899 }
900 
901 impl FutureProducer<Data> for HostFutureProducer {
902     type Item = u32;
903 
904     fn poll_produce(
905         self: Pin<&mut Self>,
906         cx: &mut Context<'_>,
907         mut store: StoreContextMut<'_, Data>,
908         finish: bool,
909     ) -> Poll<Result<Option<Self::Item>>> {
910         let state = store
911             .data_mut()
912             .host_future_producers
913             .get_mut(&self.0)
914             .unwrap();
915         match state.0 {
916             HostFutureProducerState::Idle | HostFutureProducerState::Waiting(_) => {
917                 if finish {
918                     log::debug!("produce: cancel {}", self.0);
919                     state.0 = HostFutureProducerState::Idle;
920                     Poll::Ready(Ok(None))
921                 } else {
922                     log::debug!("produce: wait {}", self.0);
923                     state.0 = HostFutureProducerState::Waiting(cx.waker().clone());
924                     Poll::Pending
925                 }
926             }
927             HostFutureProducerState::Writing(item) => {
928                 log::debug!("produce: done {}", self.0);
929                 state.0 = HostFutureProducerState::Complete;
930                 Poll::Ready(Ok(Some(item)))
931             }
932             HostFutureProducerState::Complete => unreachable!(),
933         }
934     }
935 }
936 
937 struct HostStreamConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
938 
939 #[derive(Debug)]
940 struct HostStreamConsumerState {
941     waker: Option<Waker>,
942     kind: HostStreamConsumerStateKind,
943 }
944 
945 #[derive(Debug)]
946 enum HostStreamConsumerStateKind {
947     Idle,
948     Consuming(u32),
949     Consumed(Vec<u32>),
950 }
951 
952 impl HostStreamConsumerState {
953     fn wake_by_ref(&mut self) {
954         if let Some(waker) = self.waker.take() {
955             waker.wake();
956         }
957     }
958 }
959 
960 impl StreamConsumer<Data> for HostStreamConsumer {
961     type Item = u32;
962 
963     fn poll_consume(
964         self: Pin<&mut Self>,
965         cx: &mut Context<'_>,
966         mut store: StoreContextMut<'_, Data>,
967         mut source: Source<'_, Self::Item>,
968         finish: bool,
969     ) -> Poll<Result<StreamResult>> {
970         let remaining = source.remaining(&mut store);
971         let state = match store.data_mut().host_stream_consumers.get_mut(&self.0) {
972             Some((state, _)) => state,
973             None => {
974                 log::debug!("stream consume: dropped {}", self.0);
975                 return Poll::Ready(Ok(StreamResult::Dropped));
976             }
977         };
978         match state.kind {
979             HostStreamConsumerStateKind::Idle | HostStreamConsumerStateKind::Consumed(_) => {
980                 if finish {
981                     log::debug!("stream consume: cancel {}", self.0);
982                     state.waker = None;
983                     Poll::Ready(Ok(StreamResult::Cancelled))
984                 } else {
985                     log::debug!("stream consume: wait {}", self.0);
986                     state.waker = Some(cx.waker().clone());
987                     Poll::Pending
988                 }
989             }
990             HostStreamConsumerStateKind::Consuming(amt) => {
991                 // The writer is performing a zero-length write. We always
992                 // complete that without updating our own state.
993                 if remaining == 0 {
994                     log::debug!("stream consume: completing zero-length write {}", self.0);
995                     return Poll::Ready(Ok(StreamResult::Completed));
996                 }
997 
998                 // If this is a zero-length read then block the writer but update our own state.
999                 if amt == 0 {
1000                     log::debug!("stream consume: finishing zero-length read {}", self.0);
1001                     state.kind = HostStreamConsumerStateKind::Consumed(Vec::new());
1002                     state.waker = Some(cx.waker().clone());
1003                     return Poll::Pending;
1004                 }
1005 
1006                 // For non-zero sizes perform the read/copy.
1007                 log::debug!("stream consume: done {}", self.0);
1008                 let mut dst = Vec::with_capacity(amt as usize);
1009                 source.read(&mut store, &mut dst).unwrap();
1010                 let state = &mut store
1011                     .data_mut()
1012                     .host_stream_consumers
1013                     .get_mut(&self.0)
1014                     .unwrap()
1015                     .0;
1016                 state.kind = HostStreamConsumerStateKind::Consumed(dst);
1017                 state.waker = None;
1018                 Poll::Ready(Ok(StreamResult::Completed))
1019             }
1020         }
1021     }
1022 }
1023 
1024 impl Drop for HostStreamConsumer {
1025     fn drop(&mut self) {
1026         log::debug!("stream consume: drop {}", self.0);
1027     }
1028 }
1029 
1030 struct HostStreamProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
1031 
1032 #[derive(Debug)]
1033 struct HostStreamProducerState {
1034     kind: HostStreamProducerStateKind,
1035     waker: Option<Waker>,
1036 }
1037 
1038 #[derive(Debug)]
1039 enum HostStreamProducerStateKind {
1040     Idle,
1041     Writing(Vec<u32>),
1042     Wrote(u32),
1043 }
1044 
1045 impl HostStreamProducerState {
1046     fn idle() -> Self {
1047         HostStreamProducerState {
1048             kind: HostStreamProducerStateKind::Idle,
1049             waker: None,
1050         }
1051     }
1052 
1053     fn wake_by_ref(&mut self) {
1054         if let Some(waker) = self.waker.take() {
1055             waker.wake();
1056         }
1057     }
1058 }
1059 
1060 impl StreamProducer<Data> for HostStreamProducer {
1061     type Item = u32;
1062     type Buffer = VecBuffer<u32>;
1063 
1064     fn poll_produce(
1065         self: Pin<&mut Self>,
1066         cx: &mut Context<'_>,
1067         mut store: StoreContextMut<'_, Data>,
1068         mut dst: Destination<'_, Self::Item, Self::Buffer>,
1069         finish: bool,
1070     ) -> Poll<Result<StreamResult>> {
1071         let remaining = dst.remaining(&mut store);
1072         let data = store.data_mut();
1073         let state = match data.host_stream_producers.get_mut(&self.0) {
1074             Some((state, _)) => state,
1075             None => {
1076                 log::debug!("stream produce: dropped {}", self.0);
1077                 return Poll::Ready(Ok(StreamResult::Dropped));
1078             }
1079         };
1080         match &mut state.kind {
1081             HostStreamProducerStateKind::Idle | HostStreamProducerStateKind::Wrote(_) => {
1082                 if finish {
1083                     log::debug!("stream produce: cancel {}", self.0);
1084                     state.waker = None;
1085                     Poll::Ready(Ok(StreamResult::Cancelled))
1086                 } else {
1087                     log::debug!("stream produce: wait {}", self.0);
1088                     state.waker = Some(cx.waker().clone());
1089                     Poll::Pending
1090                 }
1091             }
1092             HostStreamProducerStateKind::Writing(buf) => {
1093                 // Keep the other side blocked for a zero-length write
1094                 // originated from the host.
1095                 if buf.len() == 0 {
1096                     log::debug!("stream produce: zero-length write {}", self.0);
1097                     state.kind = HostStreamProducerStateKind::Wrote(0);
1098                     state.waker = Some(cx.waker().clone());
1099                     return Poll::Pending;
1100                 }
1101                 log::debug!("stream produce: write {}", self.0);
1102                 match remaining {
1103                     Some(amt) => {
1104                         // If the guest is doing a zero-length read then we've
1105                         // got some data for them. Complete the read but leave
1106                         // ourselves in the same `Writing` state as before.
1107                         if amt == 0 {
1108                             state.waker = None;
1109                             return Poll::Ready(Ok(StreamResult::Completed));
1110                         }
1111 
1112                         // Don't let wasmtime buffer up data for us, so truncate
1113                         // the buffer we're sending over to the amount that the
1114                         // reader is requesting.
1115                         if amt < buf.len() {
1116                             buf.truncate(amt);
1117                         }
1118                     }
1119 
1120                     // At this time host<->host stream reads/writes aren't
1121                     // fuzzed since that brings up a bunch of weird edge cases
1122                     // which aren't fun to deal with and aren't interesting
1123                     // either.
1124                     None => unreachable!(),
1125                 }
1126                 let count = buf.len() as u32;
1127                 dst.set_buffer(mem::take(buf).into());
1128                 state.kind = HostStreamProducerStateKind::Wrote(count);
1129                 state.waker = None;
1130                 Poll::Ready(Ok(StreamResult::Completed))
1131             }
1132         }
1133     }
1134 }
1135 
1136 impl Drop for HostStreamProducer {
1137     fn drop(&mut self) {
1138         log::debug!("stream produce: drop {}", self.0);
1139     }
1140 }
1141 
1142 struct SharedStream(Scope);
1143 
1144 impl SharedStream {
1145     async fn next(&mut self, accessor: &Accessor<Data>) -> Option<Command> {
1146         std::future::poll_fn(|cx| {
1147             accessor.with(|mut store| {
1148                 self.poll(cx, store.as_context_mut(), false)
1149                     .map(|pair| match pair {
1150                         (None, StreamResult::Dropped) => None,
1151                         (Some(item), StreamResult::Completed) => Some(item),
1152                         _ => unreachable!(),
1153                     })
1154             })
1155         })
1156         .await
1157     }
1158 
1159     fn poll(
1160         &mut self,
1161         cx: &mut Context<'_>,
1162         mut store: StoreContextMut<'_, Data>,
1163         finish: bool,
1164     ) -> Poll<(Option<Command>, StreamResult)> {
1165         let data = store.data_mut();
1166 
1167         // If no more commands remain then this is a closed and dropped stream.
1168         let Some((scope, command)) = data.commands.last_mut() else {
1169             log::debug!("Stream closed: {:?}", self.0);
1170             return Poll::Ready((None, StreamResult::Dropped));
1171         };
1172 
1173         // If the next queued up command is for the scope that this stream is
1174         // attached to then send off the command.
1175         if *scope == self.0 {
1176             let ret = Some(*command);
1177 
1178             // All commands are followed up with an "ack", and after the "ack"
1179             // is delivered then the command is popped to move on to the next
1180             // command. The reason for this is to guarantee that a command has
1181             // been processed before moving on to the next command. This helps
1182             // make the fuzzing easier to work with by being able to implicitly
1183             // assume that a command has been processed by the time something
1184             // else is. Otherwise it might be possible that wasmtime has a set
1185             // of commands/callbacks that are all delivered at the same time and
1186             // the component model doesn't specify what order they happen
1187             // within. By forcing an "ack" it ensures a more expected ordering
1188             // of execution to assist with fuzzing without losing really all
1189             // that much coverage.
1190             if matches!(command, Command::Ack) {
1191                 data.commands.pop();
1192             } else {
1193                 *command = Command::Ack;
1194             }
1195 
1196             // After a command was popped other streams may be able to make
1197             // progress so wake them all up.
1198             for (_, waker) in data.wakers.drain() {
1199                 waker.wake();
1200             }
1201             log::debug!("Delivering command {ret:?} for {:?}", self.0);
1202             return Poll::Ready((ret, StreamResult::Completed));
1203         }
1204 
1205         // The command queue is non-empty and the next command isn't meant for
1206         // us, so someone else needs to drain the queue. Enqueue our waker.
1207         if finish {
1208             Poll::Ready((None, StreamResult::Cancelled))
1209         } else {
1210             data.wakers.insert(self.0, cx.waker().clone());
1211             Poll::Pending
1212         }
1213     }
1214 }
1215 
1216 impl StreamProducer<Data> for SharedStream {
1217     type Item = Command;
1218     type Buffer = Option<Command>;
1219 
1220     fn poll_produce<'a>(
1221         mut self: Pin<&mut Self>,
1222         cx: &mut Context<'_>,
1223         store: StoreContextMut<'a, Data>,
1224         mut destination: Destination<'a, Self::Item, Self::Buffer>,
1225         finish: bool,
1226     ) -> Poll<Result<StreamResult>> {
1227         let (item, result) = std::task::ready!(self.poll(cx, store, finish));
1228         destination.set_buffer(item);
1229         Poll::Ready(Ok(result))
1230     }
1231 }
1232 
1233 #[cfg(test)]
1234 mod tests {
1235     use super::{ComponentAsync, Scope, init, run};
1236     use crate::oracles::component_async::types::*;
1237     use crate::test::test_n_times;
1238     use Scope::*;
1239 
1240     #[test]
1241     fn smoke() {
1242         init();
1243 
1244         test_n_times(50, |c, _| {
1245             run(c);
1246             Ok(())
1247         });
1248     }
1249 
1250     // ========================================================================
1251     // A series of fuzz-generated test cases which caused problems during the
1252     // development of this fuzzer. Feel free to delete/edit/etc if the fuzzer
1253     // changes over time.
1254 
1255     #[test]
1256     fn simple() {
1257         init();
1258 
1259         run(ComponentAsync {
1260             commands: vec![
1261                 (GuestCaller, Command::AsyncPendingImportCall(0)),
1262                 (GuestCallee, Command::AsyncPendingImportCall(1)),
1263                 (GuestCallee, Command::AsyncPendingExportComplete(0)),
1264                 (GuestCaller, Command::AsyncPendingImportAssertReady(0)),
1265                 (GuestCaller, Command::AsyncPendingImportCall(2)),
1266             ],
1267         });
1268     }
1269 
1270     #[test]
1271     fn somewhat_larger() {
1272         static COMMANDS: &[(Scope, Command)] = &[
1273             (GuestCallee, Command::FutureNew(0)),
1274             (HostCaller, Command::FutureNew(1)),
1275             (GuestCallee, Command::FutureReadPending(0)),
1276             (GuestCaller, Command::AsyncPendingImportCall(2)),
1277             (GuestCaller, Command::AsyncPendingImportCall(3)),
1278             (GuestCaller, Command::AsyncPendingImportCall(4)),
1279             (GuestCaller, Command::AsyncPendingImportCall(5)),
1280             (GuestCallee, Command::AsyncPendingExportComplete(5)),
1281             (GuestCallee, Command::AsyncPendingExportComplete(3)),
1282             (GuestCallee, Command::AsyncPendingExportComplete(4)),
1283             (GuestCallee, Command::AsyncPendingExportComplete(2)),
1284             (GuestCaller, Command::AsyncPendingImportCall(6)),
1285             (GuestCallee, Command::AsyncPendingExportComplete(6)),
1286             (GuestCaller, Command::AsyncPendingImportCall(7)),
1287             (GuestCallee, Command::AsyncPendingExportComplete(7)),
1288             (GuestCaller, Command::AsyncPendingImportCall(8)),
1289             (GuestCallee, Command::AsyncPendingExportComplete(8)),
1290             (GuestCaller, Command::AsyncPendingImportCall(9)),
1291             (GuestCallee, Command::AsyncPendingExportComplete(9)),
1292             (GuestCaller, Command::AsyncPendingImportCall(10)),
1293             (GuestCallee, Command::AsyncPendingExportComplete(10)),
1294             (GuestCaller, Command::AsyncPendingImportCall(11)),
1295             (GuestCallee, Command::AsyncPendingExportComplete(11)),
1296             (GuestCaller, Command::AsyncPendingImportCall(12)),
1297             (GuestCallee, Command::AsyncPendingExportComplete(12)),
1298             (GuestCaller, Command::AsyncPendingImportCall(13)),
1299             (GuestCallee, Command::AsyncPendingExportComplete(13)),
1300             (GuestCaller, Command::AsyncPendingImportCall(14)),
1301             (GuestCallee, Command::AsyncPendingExportComplete(14)),
1302             (GuestCaller, Command::AsyncPendingImportCall(15)),
1303             (GuestCallee, Command::AsyncPendingExportComplete(15)),
1304             (GuestCaller, Command::AsyncPendingImportCall(16)),
1305             (GuestCallee, Command::AsyncPendingExportComplete(16)),
1306             (GuestCaller, Command::AsyncPendingImportCall(17)),
1307             (GuestCallee, Command::AsyncPendingExportComplete(17)),
1308             (GuestCaller, Command::AsyncPendingImportCall(18)),
1309             (GuestCallee, Command::AsyncPendingExportComplete(18)),
1310             (GuestCaller, Command::AsyncPendingImportCall(19)),
1311             (GuestCallee, Command::AsyncPendingExportComplete(19)),
1312             (GuestCaller, Command::AsyncPendingImportCall(20)),
1313             (GuestCallee, Command::AsyncPendingExportComplete(20)),
1314             (GuestCaller, Command::AsyncPendingImportCall(21)),
1315             (GuestCallee, Command::AsyncPendingExportComplete(21)),
1316             (GuestCaller, Command::AsyncPendingImportCall(22)),
1317             (GuestCallee, Command::AsyncPendingExportComplete(22)),
1318             (GuestCaller, Command::AsyncPendingImportCall(23)),
1319             (GuestCallee, Command::AsyncPendingExportComplete(23)),
1320             (GuestCaller, Command::AsyncPendingImportCall(24)),
1321             (GuestCallee, Command::AsyncPendingExportComplete(24)),
1322             (GuestCaller, Command::AsyncPendingImportCall(25)),
1323             (GuestCallee, Command::AsyncPendingExportComplete(25)),
1324             (GuestCaller, Command::AsyncPendingImportCall(26)),
1325             (GuestCallee, Command::AsyncPendingExportComplete(26)),
1326             (GuestCaller, Command::AsyncPendingImportCall(27)),
1327             (GuestCallee, Command::AsyncPendingExportComplete(27)),
1328             (GuestCaller, Command::AsyncPendingImportCall(28)),
1329             (GuestCallee, Command::AsyncPendingExportComplete(28)),
1330             (GuestCaller, Command::AsyncPendingImportCall(29)),
1331             (GuestCallee, Command::AsyncPendingExportComplete(29)),
1332             (GuestCaller, Command::AsyncPendingImportCall(30)),
1333             (GuestCallee, Command::AsyncPendingExportComplete(30)),
1334             (GuestCaller, Command::AsyncPendingImportCall(31)),
1335             (GuestCallee, Command::AsyncPendingExportComplete(31)),
1336             (GuestCaller, Command::AsyncPendingImportCall(32)),
1337             (GuestCallee, Command::AsyncPendingExportComplete(32)),
1338             (GuestCaller, Command::AsyncPendingImportCall(33)),
1339             (GuestCallee, Command::AsyncPendingExportComplete(33)),
1340             (GuestCaller, Command::AsyncPendingImportCall(34)),
1341             (GuestCallee, Command::AsyncPendingExportComplete(34)),
1342             (GuestCaller, Command::AsyncPendingImportCall(35)),
1343             (GuestCallee, Command::AsyncPendingExportComplete(35)),
1344             (GuestCaller, Command::AsyncPendingImportCall(36)),
1345             (GuestCallee, Command::AsyncPendingExportComplete(36)),
1346             (GuestCaller, Command::AsyncPendingImportCall(37)),
1347             (GuestCallee, Command::AsyncPendingExportComplete(37)),
1348             (GuestCaller, Command::AsyncPendingImportAssertReady(36)),
1349         ];
1350         init();
1351 
1352         run(ComponentAsync {
1353             commands: COMMANDS.to_vec(),
1354         });
1355     }
1356 
1357     #[test]
1358     fn simple_stream1() {
1359         init();
1360 
1361         run(ComponentAsync {
1362             commands: vec![
1363                 (HostCallee, Command::StreamNew(1)),
1364                 (
1365                     HostCallee,
1366                     Command::StreamReadPending(StreamReadPayload {
1367                         stream: 1,
1368                         count: 2,
1369                     }),
1370                 ),
1371                 (HostCallee, Command::StreamCancelRead(1)),
1372                 (GuestCaller, Command::SyncReadyCall),
1373                 (
1374                     HostCallee,
1375                     Command::StreamWritePending(StreamWritePayload {
1376                         stream: 1,
1377                         item: 3,
1378                         count: 2,
1379                     }),
1380                 ),
1381                 (HostCallee, Command::StreamCancelWrite(1)),
1382                 (HostCallee, Command::StreamDropWritable(1)),
1383                 (
1384                     HostCallee,
1385                     Command::StreamReadDropped(StreamReadPayload {
1386                         stream: 1,
1387                         count: 1,
1388                     }),
1389                 ),
1390             ],
1391         });
1392     }
1393 
1394     #[test]
1395     fn simple_stream3() {
1396         init();
1397 
1398         run(ComponentAsync {
1399             commands: vec![
1400                 (GuestCaller, Command::StreamNew(26)),
1401                 (
1402                     GuestCaller,
1403                     Command::StreamReadPending(StreamReadPayload {
1404                         stream: 26,
1405                         count: 10,
1406                     }),
1407                 ),
1408                 (GuestCaller, Command::StreamDropWritable(26)),
1409                 (GuestCaller, Command::StreamReadAssertDropped(26)),
1410             ],
1411         });
1412     }
1413 
1414     #[test]
1415     fn simple_stream4() {
1416         init();
1417 
1418         run(ComponentAsync {
1419             commands: vec![
1420                 (GuestCaller, Command::StreamNew(23)),
1421                 (
1422                     GuestCaller,
1423                     Command::StreamWritePending(StreamWritePayload {
1424                         stream: 23,
1425                         item: 24,
1426                         count: 2,
1427                     }),
1428                 ),
1429                 (GuestCaller, Command::StreamGive(23)),
1430                 (GuestCallee, Command::StreamDropReadable(23)),
1431                 (
1432                     GuestCaller,
1433                     Command::StreamWriteAssertDropped(StreamReadPayload {
1434                         stream: 23,
1435                         count: 0,
1436                     }),
1437                 ),
1438             ],
1439         });
1440     }
1441 
1442     #[test]
1443     fn zero_length_behavior() {
1444         init();
1445 
1446         run(ComponentAsync {
1447             commands: vec![
1448                 (GuestCaller, Command::StreamNew(10)),
1449                 (HostCaller, Command::StreamTake(10)),
1450                 (
1451                     GuestCaller,
1452                     Command::StreamWritePending(StreamWritePayload {
1453                         stream: 10,
1454                         item: 13,
1455                         count: 5,
1456                     }),
1457                 ),
1458                 (
1459                     HostCaller,
1460                     Command::StreamReadReady(StreamReadyPayload {
1461                         stream: 10,
1462                         item: 0,
1463                         op_count: 0,
1464                         ready_count: 0,
1465                     }),
1466                 ),
1467                 (
1468                     HostCaller,
1469                     Command::StreamReadReady(StreamReadyPayload {
1470                         stream: 10,
1471                         item: 0,
1472                         op_count: 0,
1473                         ready_count: 0,
1474                     }),
1475                 ),
1476             ],
1477         });
1478     }
1479 }
1480