1 //! For a high-level overview of this fuzz target see `fuzz_async.rs`
2 
3 use crate::block_on;
4 use crate::generators::component_async::exports::wasmtime_fuzz::fuzz::async_test::Guest;
5 use crate::generators::component_async::wasmtime_fuzz::fuzz::async_test::{self, Command};
6 use crate::generators::component_async::wasmtime_fuzz::fuzz::types;
7 use crate::generators::component_async::{ComponentAsync, FuzzAsyncPre, Scope};
8 use futures::channel::oneshot;
9 use std::collections::{HashMap, HashSet};
10 use std::mem;
11 use std::pin::Pin;
12 use std::sync::{Arc, OnceLock, Weak};
13 use std::task::{Context, Poll, Waker};
14 use std::time::Instant;
15 use wasmtime::component::{
16     Access, Accessor, AccessorTask, Component, Destination, FutureConsumer, FutureProducer,
17     FutureReader, HasSelf, Linker, ResourceTable, Source, StreamConsumer, StreamProducer,
18     StreamReader, StreamResult, VecBuffer,
19 };
20 use wasmtime::{AsContextMut, Config, Engine, Result, Store, StoreContextMut};
21 use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
22 
/// Engine plus pre-instantiated fuzz component shared by every fuzz run.
///
/// Populated once by `init` (via `get_or_init`) and read by `run`, which
/// panics if `init` has not been called first.
static STATE: OnceLock<(Engine, FuzzAsyncPre<Data>)> = OnceLock::new();
24 
25 /// Initializes state for future fuzz runs.
26 ///
27 /// This will create an `Engine` to run this fuzzer within and it will
28 /// additionally precompile the component that will be used for fuzzing.
29 ///
30 /// There are a few points of note about this:
31 ///
32 /// * The `misc` fuzzer is manually instrumented with this function as the init
33 ///   hook to ensure this runs before any other fuzzing.
34 ///
35 /// * Compilation of the component takes quite some time with
36 ///   fuzzing-instrumented Cranelift. To assist with local development this
37 ///   implements a cache which is serialized/deserialized via an env var.
38 pub fn init() {
39     crate::init_fuzzing();
40 
41     STATE.get_or_init(|| {
42         let mut config = Config::new();
43         config.wasm_component_model_async(true);
44         let engine = Engine::new(&config).unwrap();
45         let component = compile(&engine);
46         let mut linker = Linker::new(&engine);
47         wasmtime_wasi::p2::add_to_linker_async(&mut linker).unwrap();
48         async_test::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap();
49         types::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap();
50 
51         let pre = linker.instantiate_pre(&component).unwrap();
52         let pre = FuzzAsyncPre::new(pre).unwrap();
53 
54         (engine, pre)
55     });
56 
57     fn compile(engine: &Engine) -> Component {
58         let wasm_path = test_programs_artifacts::FUZZ_ASYNC_COMPONENT;
59         let wasm = test_programs_artifacts::fuzz_async_component_bytes!();
60         let wasm = &wasm[..];
61         let cwasm_cache = std::env::var("COMPONENT_ASYNC_CWASM_CACHE").ok();
62         if let Some(path) = &cwasm_cache
63             && let Ok(cwasm_mtime) = std::fs::metadata(&path).and_then(|m| m.modified())
64             && let Ok(wasm_mtime) = std::fs::metadata(wasm_path).and_then(|m| m.modified())
65             && cwasm_mtime > wasm_mtime
66         {
67             log::debug!("Using cached component async cwasm at {path}");
68             unsafe {
69                 return Component::deserialize_file(engine, path).unwrap();
70             }
71         }
72 
73         let composition = {
74             let mut config = wasm_compose::config::Config::default();
75             let tempdir = tempfile::TempDir::new().unwrap();
76             let path = tempdir.path().join("fuzz-async.wasm");
77             std::fs::write(&path, wasm).unwrap();
78             config.definitions.push(path.clone());
79 
80             wasm_compose::composer::ComponentComposer::new(&path, &config)
81                 .compose()
82                 .unwrap()
83         };
84         let start = Instant::now();
85         let component = Component::new(&engine, &composition).unwrap();
86         if let Some(path) = cwasm_cache {
87             log::debug!("Caching component async cwasm to {path}");
88             std::fs::write(path, &component.serialize().unwrap()).unwrap();
89         } else if start.elapsed() > std::time::Duration::from_secs(1) {
90             eprintln!(
91                 "
92 !!!!!!!!!!!!!!!!!!!!!!!!!!
93 
94 Component compilation is slow, try setting `COMPONENT_ASYNC_CWASM_CACHE=path` to
95 cache compilation results
96 
97 !!!!!!!!!!!!!!!!!!!!!!!!!!
98 "
99             );
100         }
101         return component;
102     }
103 }
104 
/// Per-store host state for one fuzz run.
#[derive(Default)]
struct Data {
    /// WASI context handed out through `WasiView::ctx`.
    ctx: WasiCtx,
    /// Resource table handed out alongside `ctx` through `WasiView::ctx`.
    table: ResourceTable,
    /// Wakers keyed by scope. Not used in the visible part of this file;
    /// presumably used by `SharedStream` to park readers — TODO confirm.
    wakers: HashMap<Scope, Waker>,
    /// Remaining commands for this run, stored reversed by `run` so that
    /// they can be popped off the back in their original order.
    commands: Vec<(Scope, Command)>,

    /// Command stream for the guest caller; created in `run` and handed to
    /// the guest (at most once) by `get_commands`.
    guest_caller_stream: Option<StreamReader<Command>>,
    /// Command stream for the guest callee; created in `run` and handed to
    /// the guest (at most once) by `get_commands`.
    guest_callee_stream: Option<StreamReader<Command>>,

    /// Senders that complete in-flight `async_pending` host imports, keyed
    /// by call id. Inserted by `async_pending`, consumed by
    /// `AsyncPendingExportComplete`.
    host_pending_async_calls: HashMap<u32, oneshot::Sender<()>>,
    /// Ids of `async_pending` host imports whose future was dropped before
    /// completing. Checked by `AsyncPendingExportAssertCancelled`.
    host_pending_async_calls_cancelled: HashSet<u32>,
    /// Ids of guest `async_pending` calls that the host started and that
    /// have since returned. Checked by `AsyncPendingImportAssertReady`.
    guest_pending_async_calls_ready: HashSet<u32>,

    // State of futures/streams. Note that while #12091 is unresolved an
    // `Arc`/`Weak` combo is used to detect when wasmtime drops futures/streams
    // and the various halves we're interacting with using traits.
    /// Readable future ends currently held by the host, keyed by id.
    host_futures: HashMap<u32, FutureReader<u32>>,
    /// State of host-side future producers, keyed by id, plus a `Weak` to
    /// the `Arc` given to the producer.
    host_future_producers: HashMap<u32, (HostFutureProducerState, Weak<()>)>,
    /// State of host-side future consumers, keyed by id, plus a `Weak` to
    /// the `Arc` given to the consumer.
    host_future_consumers: HashMap<u32, (HostFutureConsumerState, Weak<()>)>,
    /// Readable stream ends currently held by the host, keyed by id.
    host_streams: HashMap<u32, StreamReader<u32>>,
    /// State of host-side stream producers, keyed by id, plus a `Weak` to
    /// the `Arc` given to the producer.
    host_stream_producers: HashMap<u32, (HostStreamProducerState, Weak<()>)>,
    /// State of host-side stream consumers, keyed by id, plus a `Weak` to
    /// the `Arc` given to the consumer.
    host_stream_consumers: HashMap<u32, (HostStreamConsumerState, Weak<()>)>,
}
129 
130 impl WasiView for Data {
131     fn ctx(&mut self) -> WasiCtxView<'_> {
132         WasiCtxView {
133             ctx: &mut self.ctx,
134             table: &mut self.table,
135         }
136     }
137 }
138 
139 impl async_test::HostWithStore for HasSelf<Data> {
140     async fn async_ready<T>(_store: &Accessor<T, Self>) {}
141 
142     async fn async_pending<T>(store: &Accessor<T, Self>, id: u32) {
143         let (tx, rx) = oneshot::channel();
144         store.with(|mut s| s.get().host_pending_async_calls.insert(id, tx));
145         let record = RecordCancelOnDrop { store, id };
146         rx.await.unwrap();
147         mem::forget(record);
148 
149         struct RecordCancelOnDrop<'a, T: 'static> {
150             store: &'a Accessor<T, HasSelf<Data>>,
151             id: u32,
152         }
153 
154         impl<T> Drop for RecordCancelOnDrop<'_, T> {
155             fn drop(&mut self) {
156                 self.store.with(|mut s| {
157                     s.get().host_pending_async_calls_cancelled.insert(self.id);
158                 });
159             }
160         }
161     }
162 
163     async fn init<T>(_store: &Accessor<T, Self>, _scope: types::Scope) {}
164 }
165 
/// Synchronous host imports of the `async_test` interface.
///
/// The future/stream methods move readable ends between the guest and the
/// `host_futures` / `host_streams` maps, keyed by `id`.
impl async_test::Host for Data {
    /// No-op import.
    fn sync_ready(&mut self) {}

    /// Removes and returns the future stored under `id`.
    ///
    /// Panics if no future is stored under `id`.
    fn future_take(&mut self, id: u32) -> FutureReader<u32> {
        self.host_futures.remove(&id).unwrap()
    }

    /// Stores `future` under `id`.
    ///
    /// Panics if a future is already stored under `id`.
    fn future_receive(&mut self, id: u32, future: FutureReader<u32>) {
        let prev = self.host_futures.insert(id, future);
        assert!(prev.is_none());
    }

    /// Removes and returns the stream stored under `id`.
    ///
    /// Panics if no stream is stored under `id`.
    fn stream_take(&mut self, id: u32) -> StreamReader<u32> {
        self.host_streams.remove(&id).unwrap()
    }

    /// Stores `stream` under `id`.
    ///
    /// Panics if a stream is already stored under `id`.
    fn stream_receive(&mut self, id: u32, stream: StreamReader<u32>) {
        let prev = self.host_streams.insert(id, stream);
        assert!(prev.is_none());
    }
}
187 
188 impl types::HostWithStore for HasSelf<Data> {
189     fn get_commands<T>(
190         mut store: Access<'_, T, Self>,
191         scope: types::Scope,
192     ) -> StreamReader<Command> {
193         let data = store.get();
194         match scope {
195             types::Scope::Caller => data.guest_caller_stream.take().unwrap(),
196             types::Scope::Callee => data.guest_callee_stream.take().unwrap(),
197         }
198     }
199 }
200 
// Nothing to implement here: the only `types` import handled in this file,
// `get_commands`, lives on the `types::HostWithStore` impl for `HasSelf<Data>`.
impl types::Host for Data {}
202 
203 /// Executes the `input` provided, assuming that `init` has been previously
204 /// executed.
205 pub fn run(mut input: ComponentAsync) {
206     log::debug!("Running component async fuzz test with\n{input:?}");
207 
208     // Commands are executed in the order that they're listed in the input, but
209     // to make it easier on the `StreamProducer` implementation below they're
210     // popped off the back. To ensure that they're all delivered in the right
211     // order reverse the list to ensure the correct order is maintained.
212     input.commands.reverse();
213 
214     let (engine, pre) = STATE.get().unwrap();
215     let mut store = Store::new(
216         engine,
217         Data {
218             ctx: WasiCtx::builder().inherit_stdio().inherit_env().build(),
219             commands: input.commands,
220             ..Data::default()
221         },
222     );
223 
224     let guest_caller_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCaller));
225     let guest_callee_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCallee));
226     store.data_mut().guest_caller_stream = Some(guest_caller_stream);
227     store.data_mut().guest_callee_stream = Some(guest_callee_stream);
228     block_on(async {
229         let instance = pre.instantiate_async(&mut store).await.unwrap();
230         let test = instance.wasmtime_fuzz_fuzz_async_test();
231 
232         let mut host_caller = SharedStream(Scope::HostCaller);
233         let mut host_callee = SharedStream(Scope::HostCallee);
234         store
235             .run_concurrent(async |store| {
236                 // Kick off stream reads in the guest. This function will return
237                 // but the tasks in the guest will keep running after they
238                 // return to process stream items.
239                 test.call_init(store, types::Scope::Caller).await.unwrap();
240 
241                 // Simultaneously process commands from both host streams. These
242                 // will return once the entire command queue is exhausted.
243                 futures::join!(
244                     async {
245                         while let Some(cmd) = host_caller.next(store).await {
246                             host_caller_cmd(&test, store, cmd).await;
247                         }
248                     },
249                     async {
250                         while let Some(cmd) = host_callee.next(store).await {
251                             host_callee_cmd(store, cmd).await;
252                         }
253                     },
254                 );
255 
256                 // Note that there may still be pending async work in the guest
257                 // (or host). It's intentional that it's not cleaned up here to
258                 // help test situations where async work is all abruptly
259                 // cancelled by just being dropped in the host.
260             })
261             .await
262             .unwrap();
263     });
264 }
265 
266 /// See documentation in `fuzz_async.rs` for what's going on here.
267 async fn test_property<F>(store: &Accessor<Data>, mut f: F) -> bool
268 where
269     F: FnMut(&mut Data) -> bool,
270 {
271     for _ in 0..1000 {
272         let ready = store.with(|mut s| f(s.get()));
273         if ready {
274             return true;
275         }
276 
277         crate::YieldN(1).await;
278     }
279 
280     return false;
281 }
282 
283 async fn await_property<F>(store: &Accessor<Data>, desc: &str, f: F)
284 where
285     F: FnMut(&mut Data) -> bool,
286 {
287     assert!(
288         test_property(store, f).await,
289         "timed out waiting for {desc}",
290     );
291 }
292 
293 async fn host_caller_cmd(test: &Guest, store: &Accessor<Data>, cmd: Command) {
294     match cmd {
295         Command::Ack => {}
296         Command::SyncReadyCall => test.call_sync_ready(store).await.unwrap(),
297         Command::AsyncReadyCall => test.call_async_ready(store).await.unwrap(),
298         Command::AsyncPendingExportComplete(_i) => todo!(),
299         Command::AsyncPendingExportAssertCancelled(_i) => todo!(),
300         Command::AsyncPendingImportCall(i) => {
301             struct RunPendingImport {
302                 test: Guest,
303                 i: u32,
304             }
305 
306             store.spawn(RunPendingImport {
307                 test: test.clone(),
308                 i,
309             });
310 
311             impl AccessorTask<Data> for RunPendingImport {
312                 async fn run(self, store: &Accessor<Data>) -> Result<()> {
313                     self.test.call_async_pending(store, self.i).await?;
314                     store.with(|mut s| {
315                         s.get().guest_pending_async_calls_ready.insert(self.i);
316                     });
317                     Ok(())
318                 }
319             }
320         }
321         Command::AsyncPendingImportCancel(_i) => todo!(),
322         Command::AsyncPendingImportAssertReady(i) => {
323             assert!(
324                 test_property(store, |s| s.guest_pending_async_calls_ready.remove(&i)).await,
325                 "expected async_pending import {i} to be ready",
326             );
327         }
328 
329         Command::FutureTake(i) => {
330             let future = test.call_future_take(store, i).await.unwrap();
331             store.with(|mut s| {
332                 let prev = s.get().host_futures.insert(i, future);
333                 assert!(prev.is_none());
334             });
335         }
336         Command::FutureGive(i) => {
337             let future = store.with(|mut s| s.get().host_futures.remove(&i).unwrap());
338             test.call_future_receive(store, i, future).await.unwrap();
339         }
340         Command::StreamTake(i) => {
341             let stream = test.call_stream_take(store, i).await.unwrap();
342             store.with(|mut s| {
343                 let prev = s.get().host_streams.insert(i, stream);
344                 assert!(prev.is_none());
345             });
346         }
347         Command::StreamGive(i) => {
348             let stream = store.with(|mut s| s.get().host_streams.remove(&i).unwrap());
349             test.call_stream_receive(store, i, stream).await.unwrap();
350         }
351 
352         other => future_or_stream_cmd(store, other).await,
353     }
354 }
355 
356 async fn host_callee_cmd(store: &Accessor<Data>, cmd: Command) {
357     match cmd {
358         Command::Ack => {}
359         Command::SyncReadyCall => todo!(),
360         Command::AsyncReadyCall => todo!(),
361         Command::AsyncPendingExportComplete(i) => store.with(|mut s| {
362             s.get()
363                 .host_pending_async_calls
364                 .remove(&i)
365                 .unwrap()
366                 .send(())
367                 .unwrap();
368         }),
369         Command::AsyncPendingExportAssertCancelled(i) => {
370             assert!(
371                 test_property(store, |s| s.host_pending_async_calls_cancelled.remove(&i)).await,
372                 "expected async_pending export {i} to be cancelled",
373             );
374         }
375         Command::AsyncPendingImportCall(_i) => todo!(),
376         Command::AsyncPendingImportCancel(_i) => todo!(),
377         Command::AsyncPendingImportAssertReady(_i) => todo!(),
378 
379         other => future_or_stream_cmd(store, other).await,
380     }
381 }
382 
383 async fn future_or_stream_cmd(store: &Accessor<Data>, cmd: Command) {
384     match cmd {
385         // These commands should be handled above
386         Command::Ack
387         | Command::SyncReadyCall
388         | Command::AsyncReadyCall
389         | Command::AsyncPendingExportComplete(_)
390         | Command::AsyncPendingExportAssertCancelled(_)
391         | Command::AsyncPendingImportCall(_)
392         | Command::AsyncPendingImportCancel(_)
393         | Command::FutureTake(_)
394         | Command::FutureGive(_)
395         | Command::StreamTake(_)
396         | Command::StreamGive(_)
397         | Command::AsyncPendingImportAssertReady(_) => unreachable!(),
398 
399         Command::FutureNew(id) => {
400             store.with(|mut s| {
401                 let arc = Arc::new(());
402                 let weak = Arc::downgrade(&arc);
403                 let future = FutureReader::new(&mut s, HostFutureProducer(id, arc));
404                 let data = s.get();
405                 let prev = data.host_futures.insert(id, future);
406                 assert!(prev.is_none());
407                 let prev = data
408                     .host_future_producers
409                     .insert(id, (HostFutureProducerState::Idle, weak));
410                 assert!(prev.is_none());
411             });
412         }
413         Command::FutureDropReadable(id) => {
414             store.with(|mut s| match s.get().host_futures.remove(&id) {
415                 Some(mut future) => future.close(&mut s),
416                 None => {
417                     let (mut state, _weak) = s.get().host_future_consumers.remove(&id).unwrap();
418                     state.wake_by_ref();
419                 }
420             })
421         }
422         Command::FutureWriteReady(payload) => {
423             await_property(store, "future write should be waiting", |s| {
424                 matches!(
425                     s.host_future_producers.get(&payload.future),
426                     Some((HostFutureProducerState::Waiting(_), _))
427                 )
428             })
429             .await;
430             store.with(|mut s| {
431                 let state = s
432                     .get()
433                     .host_future_producers
434                     .get_mut(&payload.future)
435                     .unwrap();
436                 match state {
437                     (HostFutureProducerState::Waiting(waker), _) => {
438                         waker.wake_by_ref();
439                         state.0 = HostFutureProducerState::Writing(payload.item);
440                     }
441                     (state, _) => panic!("future not waiting: {state:?}"),
442                 }
443             })
444         }
445         Command::FutureWritePending(payload) => store.with(|mut s| {
446             let state = s
447                 .get()
448                 .host_future_producers
449                 .get_mut(&payload.future)
450                 .unwrap();
451             match state {
452                 (HostFutureProducerState::Idle, _) => {
453                     state.0 = HostFutureProducerState::Writing(payload.item);
454                 }
455                 _ => panic!("future not idle"),
456             }
457         }),
458         Command::FutureWriteDropped(id) => store.with(|mut s| {
459             let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
460             assert!(matches!(state, HostFutureProducerState::Idle));
461             assert!(weak.upgrade().is_none());
462         }),
463         Command::FutureReadReady(payload) => {
464             let id = payload.future;
465             store.with(|mut s| {
466                 let arc = Arc::new(());
467                 let weak = Arc::downgrade(&arc);
468                 let data = s.get();
469                 let future = data.host_futures.remove(&id).unwrap();
470                 let prev = data
471                     .host_future_consumers
472                     .insert(id, (HostFutureConsumerState::Consuming, weak));
473                 assert!(prev.is_none());
474                 future.pipe(&mut s, HostFutureConsumer(id, arc));
475             });
476 
477             await_property(store, "future should be present", |s| {
478                 matches!(
479                     s.host_future_consumers[&id],
480                     (HostFutureConsumerState::Complete(_), _)
481                 )
482             })
483             .await;
484 
485             store.with(|mut s| {
486                 let (state, _) = s.get().host_future_consumers.remove(&id).unwrap();
487                 match state {
488                     HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item),
489                     _ => panic!("future not complete"),
490                 }
491             });
492         }
493         Command::FutureReadPending(id) => {
494             ensure_future_reading(store, id);
495             store.with(|mut s| {
496                 let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap();
497                 state.wake_by_ref();
498                 assert!(
499                     matches!(state, HostFutureConsumerState::Idle),
500                     "bad state: {state:?}",
501                 );
502                 *state = HostFutureConsumerState::Consuming;
503             })
504         }
505         Command::FutureCancelWrite(id) => store.with(|mut s| {
506             let (state, _) = s.get().host_future_producers.get_mut(&id).unwrap();
507             assert!(matches!(state, HostFutureProducerState::Writing(_)));
508             *state = HostFutureProducerState::Idle;
509         }),
510         Command::FutureCancelRead(id) => store.with(|mut s| {
511             let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap();
512             assert!(matches!(state, HostFutureConsumerState::Consuming));
513             *state = HostFutureConsumerState::Idle;
514         }),
515         Command::FutureReadAssertComplete(payload) => {
516             await_property(store, "future read should be complete", |s| {
517                 matches!(
518                     s.host_future_consumers.get(&payload.future),
519                     Some((HostFutureConsumerState::Complete(_), _))
520                 )
521             })
522             .await;
523             store.with(|mut s| {
524                 let (state, _) = s
525                     .get()
526                     .host_future_consumers
527                     .remove(&payload.future)
528                     .unwrap();
529                 match state {
530                     HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item),
531                     _ => panic!("future not complete"),
532                 }
533             })
534         }
535         Command::FutureWriteAssertComplete(id) => store.with(|mut s| {
536             let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
537             assert!(matches!(state, HostFutureProducerState::Complete));
538             assert!(weak.upgrade().is_none());
539         }),
540         Command::FutureWriteAssertDropped(id) => store.with(|mut s| {
541             let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
542             assert!(matches!(state, HostFutureProducerState::Writing(_)));
543             assert!(weak.upgrade().is_none());
544         }),
545 
546         Command::StreamNew(id) => {
547             store.with(|mut s| {
548                 let arc = Arc::new(());
549                 let weak = Arc::downgrade(&arc);
550                 let stream = StreamReader::new(&mut s, HostStreamProducer(id, arc));
551                 let data = s.get();
552                 let prev = data.host_streams.insert(id, stream);
553                 assert!(prev.is_none());
554                 let prev = data
555                     .host_stream_producers
556                     .insert(id, (HostStreamProducerState::idle(), weak));
557                 assert!(prev.is_none());
558             });
559         }
560         Command::StreamDropReadable(id) => {
561             store.with(|mut s| match s.get().host_streams.remove(&id) {
562                 Some(mut stream) => {
563                     stream.close(&mut s);
564                 }
565                 None => {
566                     let (mut state, _weak) = s.get().host_stream_consumers.remove(&id).unwrap();
567                     state.wake_by_ref();
568                 }
569             })
570         }
571         Command::StreamDropWritable(id) => store.with(|mut s| {
572             let (mut state, _weak) = s.get().host_stream_producers.remove(&id).unwrap();
573             state.wake_by_ref();
574         }),
575         Command::StreamWriteReady(payload) => {
576             let id = payload.stream;
577             store.with(|mut s| {
578                 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
579                 state.wake_by_ref();
580                 match state.kind {
581                     HostStreamProducerStateKind::Idle => {
582                         state.kind = HostStreamProducerStateKind::Writing(stream_payload(
583                             payload.item,
584                             payload.op_count,
585                         ));
586                     }
587                     _ => panic!("stream not idle: {state:?}"),
588                 }
589             });
590             await_property(store, "stream should complete a write", |s| {
591                 matches!(
592                     s.host_stream_producers[&id].0.kind,
593                     HostStreamProducerStateKind::Wrote(_),
594                 )
595             })
596             .await;
597             store.with(|mut s| {
598                 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
599                 match state.kind {
600                     HostStreamProducerStateKind::Wrote(amt) => {
601                         assert_eq!(amt, payload.ready_count);
602                         state.kind = HostStreamProducerStateKind::Idle;
603                     }
604                     _ => panic!("stream not idle: {state:?}"),
605                 }
606             });
607         }
608         Command::StreamReadReady(payload) => {
609             let id = payload.stream;
610             ensure_stream_reading(store, id);
611             store.with(|mut s| {
612                 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
613                 state.wake_by_ref();
614                 state.kind = HostStreamConsumerStateKind::Consuming(payload.op_count);
615             });
616             await_property(store, "stream should complete a read", |s| {
617                 matches!(
618                     s.host_stream_consumers[&id].0.kind,
619                     HostStreamConsumerStateKind::Consumed(_),
620                 )
621             })
622             .await;
623 
624             store.with(|mut s| {
625                 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
626                 match &state.kind {
627                     HostStreamConsumerStateKind::Consumed(last_read) => {
628                         assert_eq!(
629                             *last_read,
630                             stream_payload(payload.item, payload.ready_count)
631                         );
632                         state.kind = HostStreamConsumerStateKind::Idle;
633                     }
634                     _ => panic!("future not complete"),
635                 }
636             });
637         }
638         Command::StreamWritePending(payload) => store.with(|mut s| {
639             let (state, _) = s
640                 .get()
641                 .host_stream_producers
642                 .get_mut(&payload.stream)
643                 .unwrap();
644             state.wake_by_ref();
645             match state.kind {
646                 HostStreamProducerStateKind::Idle => {
647                     state.kind = HostStreamProducerStateKind::Writing(stream_payload(
648                         payload.item,
649                         payload.count,
650                     ));
651                 }
652                 _ => panic!("stream not idle {:?}", state.kind),
653             }
654         }),
655         Command::StreamReadPending(payload) => {
656             ensure_stream_reading(store, payload.stream);
657             store.with(|mut s| {
658                 let (state, _) = s
659                     .get()
660                     .host_stream_consumers
661                     .get_mut(&payload.stream)
662                     .unwrap();
663                 state.wake_by_ref();
664                 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle));
665                 state.kind = HostStreamConsumerStateKind::Consuming(payload.count);
666             })
667         }
668         Command::StreamWriteDropped(payload) => store.with(|mut s| {
669             let (state, weak) = s
670                 .get()
671                 .host_stream_producers
672                 .get_mut(&payload.stream)
673                 .unwrap();
674             assert!(matches!(state.kind, HostStreamProducerStateKind::Idle));
675             assert!(weak.upgrade().is_none());
676         }),
677         Command::StreamReadDropped(payload) => {
678             ensure_stream_reading(store, payload.stream);
679             await_property(store, "stream read should get dropped", |s| {
680                 let weak = &s.host_stream_consumers[&payload.stream].1;
681                 weak.upgrade().is_none()
682             })
683             .await;
684             store.with(|mut s| {
685                 let (state, weak) = s
686                     .get()
687                     .host_stream_consumers
688                     .get_mut(&payload.stream)
689                     .unwrap();
690                 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle));
691                 assert!(weak.upgrade().is_none());
692             })
693         }
694         Command::StreamCancelWrite(id) => store.with(|mut s| {
695             let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
696             assert!(
697                 matches!(state.kind, HostStreamProducerStateKind::Writing(_)),
698                 "invalid state {state:?}",
699             );
700             state.kind = HostStreamProducerStateKind::Idle;
701             state.wake_by_ref();
702         }),
703         Command::StreamCancelRead(id) => store.with(|mut s| {
704             let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
705             assert!(matches!(
706                 state.kind,
707                 HostStreamConsumerStateKind::Consuming(_)
708             ));
709             state.kind = HostStreamConsumerStateKind::Idle;
710         }),
711         Command::StreamReadAssertComplete(payload) => store.with(|mut s| {
712             let (state, _) = s
713                 .get()
714                 .host_stream_consumers
715                 .get_mut(&payload.stream)
716                 .unwrap();
717             match &state.kind {
718                 HostStreamConsumerStateKind::Consumed(last_read) => {
719                     assert_eq!(*last_read, stream_payload(payload.item, payload.count));
720                     state.kind = HostStreamConsumerStateKind::Idle;
721                 }
722                 _ => panic!("stream not complete"),
723             }
724         }),
725         Command::StreamWriteAssertComplete(payload) => store.with(|mut s| {
726             let (state, _) = s
727                 .get()
728                 .host_stream_producers
729                 .get_mut(&payload.stream)
730                 .unwrap();
731             match state.kind {
732                 HostStreamProducerStateKind::Wrote(amt) => {
733                     assert_eq!(amt, payload.count);
734                     state.kind = HostStreamProducerStateKind::Idle;
735                 }
736                 _ => panic!("stream not complete: {:?}", state.kind),
737             }
738         }),
739         Command::StreamWriteAssertDropped(payload) => {
740             await_property(store, "stream write should be dropped", |s| {
741                 let weak = &s.host_stream_producers[&payload.stream].1;
742                 weak.upgrade().is_none()
743             })
744             .await;
745             store.with(|mut s| {
746                 let (state, weak) = s
747                     .get()
748                     .host_stream_producers
749                     .get_mut(&payload.stream)
750                     .unwrap();
751                 assert!(matches!(
752                     state.kind,
753                     HostStreamProducerStateKind::Writing(_)
754                 ));
755                 assert!(weak.upgrade().is_none());
756             })
757         }
758         Command::StreamReadAssertDropped(id) => {
759             await_property(store, "stream read should be dropped", |s| {
760                 let weak = &s.host_stream_consumers[&id].1;
761                 weak.upgrade().is_none()
762             })
763             .await;
764             store.with(|mut s| {
765                 let (state, weak) = s.get().host_stream_consumers.get_mut(&id).unwrap();
766                 assert!(matches!(
767                     state.kind,
768                     HostStreamConsumerStateKind::Consuming(_),
769                 ));
770                 assert!(weak.upgrade().is_none());
771             })
772         }
773     }
774 }
775 
/// Builds the expected contents of a stream transfer: the `count` consecutive
/// values `item, item + 1, ..`.
///
/// The exclusive upper bound `item + count` is computed with plain `u32`
/// addition before anything is allocated.
fn stream_payload(item: u32, count: u32) -> Vec<u32> {
    let end = item + count;
    let mut values = Vec::new();
    values.extend(item..end);
    values
}
779 
780 fn ensure_future_reading(store: &Accessor<Data>, id: u32) {
781     store.with(|mut s| {
782         let data = s.get();
783         if !data.host_futures.contains_key(&id) {
784             return;
785         }
786         log::debug!("future consume: start {id}");
787         let arc = Arc::new(());
788         let weak = Arc::downgrade(&arc);
789         let data = s.get();
790         let future = data.host_futures.remove(&id).unwrap();
791         let prev = data
792             .host_future_consumers
793             .insert(id, (HostFutureConsumerState::Idle, weak));
794         assert!(prev.is_none());
795         future.pipe(&mut s, HostFutureConsumer(id, arc));
796     });
797 }
798 
799 fn ensure_stream_reading(store: &Accessor<Data>, id: u32) {
800     store.with(|mut s| {
801         let data = s.get();
802         if !data.host_streams.contains_key(&id) {
803             return;
804         }
805         log::debug!("stream consume: start {id}");
806         let arc = Arc::new(());
807         let weak = Arc::downgrade(&arc);
808         let prev = data.host_stream_consumers.insert(
809             id,
810             (
811                 HostStreamConsumerState {
812                     kind: HostStreamConsumerStateKind::Idle,
813                     waker: None,
814                 },
815                 weak,
816             ),
817         );
818         assert!(prev.is_none());
819         let stream = data.host_streams.remove(&id).unwrap();
820         stream.pipe(&mut s, HostStreamConsumer(id, arc));
821     });
822 }
823 
/// Host-side consumer that `ensure_future_reading` pipes onto a future.
///
/// Field `0` is the future's id, used to look up this consumer's entry in
/// `host_future_consumers`. Field `1` is never read: it is the strong half of
/// an `Arc`/`Weak` pair whose `Weak` is stored next to the state, presumably
/// so the drop of this consumer can be observed.
struct HostFutureConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
825 
/// State of a host-side read of a future, stored per id in
/// `host_future_consumers`.
///
/// Note that this is only created once a read is actually initiated on a
/// future. It's also not possible to cancel a host-based read on a future,
/// hence why this is simpler than the `HostFutureProducerState` state below.
#[derive(Debug)]
enum HostFutureConsumerState {
    /// No waker is registered and no read has been requested.
    Idle,
    /// The consumer was polled and is parked; the waker resumes it.
    Waiting(Waker),
    /// The next poll of the consumer should read the value from its source.
    Consuming,
    /// The value that was read from the future.
    Complete(u32),
}

impl HostFutureConsumerState {
    /// Wakes a parked consumer, if there is one, and resets the state to
    /// `Idle`. Every state other than `Waiting` is left untouched.
    fn wake_by_ref(&mut self) {
        let Self::Waiting(waker) = &*self else {
            return;
        };
        waker.wake_by_ref();
        *self = Self::Idle;
    }
}
845 
846 impl FutureConsumer<Data> for HostFutureConsumer {
847     type Item = u32;
848 
849     fn poll_consume(
850         self: Pin<&mut Self>,
851         cx: &mut Context<'_>,
852         mut store: StoreContextMut<'_, Data>,
853         mut source: Source<'_, Self::Item>,
854         finish: bool,
855     ) -> Poll<Result<()>> {
856         let state = match store.data_mut().host_future_consumers.get_mut(&self.0) {
857             Some(state) => state,
858             None => {
859                 log::debug!("consume: closed {}", self.0);
860                 return Poll::Ready(Ok(()));
861             }
862         };
863         match state.0 {
864             HostFutureConsumerState::Idle | HostFutureConsumerState::Waiting(_) => {
865                 if finish {
866                     log::debug!("consume: cancel {}", self.0);
867                     state.0 = HostFutureConsumerState::Idle;
868                     Poll::Ready(Ok(()))
869                 } else {
870                     log::debug!("consume: wait {}", self.0);
871                     state.0 = HostFutureConsumerState::Waiting(cx.waker().clone());
872                     Poll::Pending
873                 }
874             }
875             HostFutureConsumerState::Consuming => {
876                 log::debug!("consume: done {}", self.0);
877                 let mut item = None;
878                 source.read(&mut store, &mut item).unwrap();
879                 store
880                     .data_mut()
881                     .host_future_consumers
882                     .get_mut(&self.0)
883                     .unwrap()
884                     .0 = HostFutureConsumerState::Complete(item.unwrap());
885                 Poll::Ready(Ok(()))
886             }
887             HostFutureConsumerState::Complete(_) => unreachable!(),
888         }
889     }
890 }
891 
/// Host-side producer for the writable end of a future.
///
/// Field `0` is the future's id, used to look up this producer's entry in
/// `host_future_producers`. Field `1` is never read; per its `expect` reason
/// it exists only for drop-tracking (the matching `Weak` is presumably stored
/// alongside the state where this producer is created, outside this section).
struct HostFutureProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
893 
/// State of a host-side write to a future, stored per id in
/// `host_future_producers` and inspected by `HostFutureProducer::poll_produce`.
#[derive(Debug)]
enum HostFutureProducerState {
    /// No waker is registered and no value is queued.
    Idle,
    /// The producer was polled without a value queued; holds the waker of
    /// that poll.
    Waiting(Waker),
    /// A value is queued; the next poll of the producer hands it out.
    Writing(u32),
    /// The value has been handed out; the producer must not be polled again.
    Complete,
}
901 
902 impl FutureProducer<Data> for HostFutureProducer {
903     type Item = u32;
904 
905     fn poll_produce(
906         self: Pin<&mut Self>,
907         cx: &mut Context<'_>,
908         mut store: StoreContextMut<'_, Data>,
909         finish: bool,
910     ) -> Poll<Result<Option<Self::Item>>> {
911         let state = store
912             .data_mut()
913             .host_future_producers
914             .get_mut(&self.0)
915             .unwrap();
916         match state.0 {
917             HostFutureProducerState::Idle | HostFutureProducerState::Waiting(_) => {
918                 if finish {
919                     log::debug!("produce: cancel {}", self.0);
920                     state.0 = HostFutureProducerState::Idle;
921                     Poll::Ready(Ok(None))
922                 } else {
923                     log::debug!("produce: wait {}", self.0);
924                     state.0 = HostFutureProducerState::Waiting(cx.waker().clone());
925                     Poll::Pending
926                 }
927             }
928             HostFutureProducerState::Writing(item) => {
929                 log::debug!("produce: done {}", self.0);
930                 state.0 = HostFutureProducerState::Complete;
931                 Poll::Ready(Ok(Some(item)))
932             }
933             HostFutureProducerState::Complete => unreachable!(),
934         }
935     }
936 }
937 
/// Host-side consumer that `ensure_stream_reading` pipes onto a stream.
///
/// Field `0` is the stream's id, used to look up this consumer's entry in
/// `host_stream_consumers`. Field `1` is never read: it is the strong half of
/// an `Arc`/`Weak` pair whose `Weak` is stored next to the state, so that
/// `StreamReadAssertDropped` can detect when this consumer has been dropped.
struct HostStreamConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
939 
/// Bookkeeping for a host-side stream read, stored per id in
/// `host_stream_consumers`.
#[derive(Debug)]
struct HostStreamConsumerState {
    /// Waker of the most recent poll that returned `Poll::Pending`, if any.
    waker: Option<Waker>,
    /// What the host currently wants from the stream.
    kind: HostStreamConsumerStateKind,
}

/// The phases of a host-side stream read.
#[derive(Debug)]
enum HostStreamConsumerStateKind {
    /// No read is in progress.
    Idle,
    /// A read was requested; the payload is the requested item count.
    Consuming(u32),
    /// A read finished; the payload is the items that were read.
    Consumed(Vec<u32>),
}

impl HostStreamConsumerState {
    /// Wakes the parked consumer, if any, consuming the stored waker so it is
    /// woken at most once.
    fn wake_by_ref(&mut self) {
        let Some(pending) = self.waker.take() else {
            return;
        };
        pending.wake();
    }
}
960 
961 impl StreamConsumer<Data> for HostStreamConsumer {
962     type Item = u32;
963 
964     fn poll_consume(
965         self: Pin<&mut Self>,
966         cx: &mut Context<'_>,
967         mut store: StoreContextMut<'_, Data>,
968         mut source: Source<'_, Self::Item>,
969         finish: bool,
970     ) -> Poll<Result<StreamResult>> {
971         let remaining = source.remaining(&mut store);
972         let state = match store.data_mut().host_stream_consumers.get_mut(&self.0) {
973             Some((state, _)) => state,
974             None => {
975                 log::debug!("stream consume: dropped {}", self.0);
976                 return Poll::Ready(Ok(StreamResult::Dropped));
977             }
978         };
979         match state.kind {
980             HostStreamConsumerStateKind::Idle | HostStreamConsumerStateKind::Consumed(_) => {
981                 if finish {
982                     log::debug!("stream consume: cancel {}", self.0);
983                     state.waker = None;
984                     Poll::Ready(Ok(StreamResult::Cancelled))
985                 } else {
986                     log::debug!("stream consume: wait {}", self.0);
987                     state.waker = Some(cx.waker().clone());
988                     Poll::Pending
989                 }
990             }
991             HostStreamConsumerStateKind::Consuming(amt) => {
992                 // The writer is performing a zero-length write. We always
993                 // complete that without updating our own state.
994                 if remaining == 0 {
995                     log::debug!("stream consume: completing zero-length write {}", self.0);
996                     return Poll::Ready(Ok(StreamResult::Completed));
997                 }
998 
999                 // If this is a zero-length read then block the writer but update our own state.
1000                 if amt == 0 {
1001                     log::debug!("stream consume: finishing zero-length read {}", self.0);
1002                     state.kind = HostStreamConsumerStateKind::Consumed(Vec::new());
1003                     state.waker = Some(cx.waker().clone());
1004                     return Poll::Pending;
1005                 }
1006 
1007                 // For non-zero sizes perform the read/copy.
1008                 log::debug!("stream consume: done {}", self.0);
1009                 let mut dst = Vec::with_capacity(amt as usize);
1010                 source.read(&mut store, &mut dst).unwrap();
1011                 let state = &mut store
1012                     .data_mut()
1013                     .host_stream_consumers
1014                     .get_mut(&self.0)
1015                     .unwrap()
1016                     .0;
1017                 state.kind = HostStreamConsumerStateKind::Consumed(dst);
1018                 state.waker = None;
1019                 Poll::Ready(Ok(StreamResult::Completed))
1020             }
1021         }
1022     }
1023 }
1024 
1025 impl Drop for HostStreamConsumer {
1026     fn drop(&mut self) {
1027         log::debug!("stream consume: drop {}", self.0);
1028     }
1029 }
1030 
/// Host-side producer for the writable end of a stream.
///
/// Field `0` is the stream's id, used to look up this producer's entry in
/// `host_stream_producers`. Field `1` is never read; it is the strong half of
/// a drop-tracking pair whose `Weak` is stored next to the state, which
/// `StreamWriteAssertDropped` probes to detect that this producer was dropped.
struct HostStreamProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
1032 
/// Bookkeeping for a host-side stream write, stored per id in
/// `host_stream_producers`.
#[derive(Debug)]
struct HostStreamProducerState {
    /// What the host currently wants to do with the stream.
    kind: HostStreamProducerStateKind,
    /// Waker of the most recent poll that returned `Poll::Pending`, if any.
    waker: Option<Waker>,
}

/// The phases of a host-side stream write.
#[derive(Debug)]
enum HostStreamProducerStateKind {
    /// No write is in progress.
    Idle,
    /// A write was requested; the payload is the items still to hand over.
    Writing(Vec<u32>),
    /// A write finished; the payload is the number of items handed over.
    Wrote(u32),
}

impl HostStreamProducerState {
    /// Creates a state with no write in progress and no waker stored.
    fn idle() -> Self {
        Self {
            waker: None,
            kind: HostStreamProducerStateKind::Idle,
        }
    }

    /// Wakes the parked producer, if any, consuming the stored waker so it is
    /// woken at most once.
    fn wake_by_ref(&mut self) {
        if let Some(pending) = mem::take(&mut self.waker) {
            pending.wake();
        }
    }
}
1060 
1061 impl StreamProducer<Data> for HostStreamProducer {
1062     type Item = u32;
1063     type Buffer = VecBuffer<u32>;
1064 
1065     fn poll_produce(
1066         self: Pin<&mut Self>,
1067         cx: &mut Context<'_>,
1068         mut store: StoreContextMut<'_, Data>,
1069         mut dst: Destination<'_, Self::Item, Self::Buffer>,
1070         finish: bool,
1071     ) -> Poll<Result<StreamResult>> {
1072         let remaining = dst.remaining(&mut store);
1073         let data = store.data_mut();
1074         let state = match data.host_stream_producers.get_mut(&self.0) {
1075             Some((state, _)) => state,
1076             None => {
1077                 log::debug!("stream produce: dropped {}", self.0);
1078                 return Poll::Ready(Ok(StreamResult::Dropped));
1079             }
1080         };
1081         match &mut state.kind {
1082             HostStreamProducerStateKind::Idle | HostStreamProducerStateKind::Wrote(_) => {
1083                 if finish {
1084                     log::debug!("stream produce: cancel {}", self.0);
1085                     state.waker = None;
1086                     Poll::Ready(Ok(StreamResult::Cancelled))
1087                 } else {
1088                     log::debug!("stream produce: wait {}", self.0);
1089                     state.waker = Some(cx.waker().clone());
1090                     Poll::Pending
1091                 }
1092             }
1093             HostStreamProducerStateKind::Writing(buf) => {
1094                 // Keep the other side blocked for a zero-length write
1095                 // originated from the host.
1096                 if buf.len() == 0 {
1097                     log::debug!("stream produce: zero-length write {}", self.0);
1098                     state.kind = HostStreamProducerStateKind::Wrote(0);
1099                     state.waker = Some(cx.waker().clone());
1100                     return Poll::Pending;
1101                 }
1102                 log::debug!("stream produce: write {}", self.0);
1103                 match remaining {
1104                     Some(amt) => {
1105                         // If the guest is doing a zero-length read then we've
1106                         // got some data for them. Complete the read but leave
1107                         // ourselves in the same `Writing` state as before.
1108                         if amt == 0 {
1109                             state.waker = None;
1110                             return Poll::Ready(Ok(StreamResult::Completed));
1111                         }
1112 
1113                         // Don't let wasmtime buffer up data for us, so truncate
1114                         // the buffer we're sending over to the amount that the
1115                         // reader is requesting.
1116                         if amt < buf.len() {
1117                             buf.truncate(amt);
1118                         }
1119                     }
1120 
1121                     // At this time host<->host stream reads/writes aren't
1122                     // fuzzed since that brings up a bunch of weird edge cases
1123                     // which aren't fun to deal with and aren't interesting
1124                     // either.
1125                     None => unreachable!(),
1126                 }
1127                 let count = buf.len() as u32;
1128                 dst.set_buffer(mem::take(buf).into());
1129                 state.kind = HostStreamProducerStateKind::Wrote(count);
1130                 state.waker = None;
1131                 Poll::Ready(Ok(StreamResult::Completed))
1132             }
1133         }
1134     }
1135 }
1136 
1137 impl Drop for HostStreamProducer {
1138     fn drop(&mut self) {
1139         log::debug!("stream produce: drop {}", self.0);
1140     }
1141 }
1142 
/// Per-scope view onto the shared command queue stored in `Data::commands`.
///
/// The wrapped `Scope` identifies which queued commands this stream is
/// allowed to deliver; commands queued for other scopes are left in place.
struct SharedStream(Scope);
1144 
1145 impl SharedStream {
1146     async fn next(&mut self, accessor: &Accessor<Data>) -> Option<Command> {
1147         std::future::poll_fn(|cx| {
1148             accessor.with(|mut store| {
1149                 self.poll(cx, store.as_context_mut(), false)
1150                     .map(|pair| match pair {
1151                         (None, StreamResult::Dropped) => None,
1152                         (Some(item), StreamResult::Completed) => Some(item),
1153                         _ => unreachable!(),
1154                     })
1155             })
1156         })
1157         .await
1158     }
1159 
1160     fn poll(
1161         &mut self,
1162         cx: &mut Context<'_>,
1163         mut store: StoreContextMut<'_, Data>,
1164         finish: bool,
1165     ) -> Poll<(Option<Command>, StreamResult)> {
1166         let data = store.data_mut();
1167 
1168         // If no more commands remain then this is a closed and dropped stream.
1169         let Some((scope, command)) = data.commands.last_mut() else {
1170             log::debug!("Stream closed: {:?}", self.0);
1171             return Poll::Ready((None, StreamResult::Dropped));
1172         };
1173 
1174         // If the next queued up command is for the scope that this stream is
1175         // attached to then send off the command.
1176         if *scope == self.0 {
1177             let ret = Some(*command);
1178 
1179             // All commands are followed up with an "ack", and after the "ack"
1180             // is delivered then the command is popped to move on to the next
1181             // command. The reason for this is to guarantee that a command has
1182             // been processed before moving on to the next command. This helps
1183             // make the fuzzing easier to work with by being able to implicitly
1184             // assume that a command has been processed by the time something
1185             // else is. Otherwise it might be possible that wasmtime has a set
1186             // of commands/callbacks that are all delivered at the same time and
1187             // the component model doesn't specify what order they happen
1188             // within. By forcing an "ack" it ensures a more expected ordering
1189             // of execution to assist with fuzzing without losing really all
1190             // that much coverage.
1191             if matches!(command, Command::Ack) {
1192                 data.commands.pop();
1193             } else {
1194                 *command = Command::Ack;
1195             }
1196 
1197             // After a command was popped other streams may be able to make
1198             // progress so wake them all up.
1199             for (_, waker) in data.wakers.drain() {
1200                 waker.wake();
1201             }
1202             log::debug!("Delivering command {ret:?} for {:?}", self.0);
1203             return Poll::Ready((ret, StreamResult::Completed));
1204         }
1205 
1206         // The command queue is non-empty and the next command isn't meant for
1207         // us, so someone else needs to drain the queue. Enqueue our waker.
1208         if finish {
1209             Poll::Ready((None, StreamResult::Cancelled))
1210         } else {
1211             data.wakers.insert(self.0, cx.waker().clone());
1212             Poll::Pending
1213         }
1214     }
1215 }
1216 
1217 impl StreamProducer<Data> for SharedStream {
1218     type Item = Command;
1219     type Buffer = Option<Command>;
1220 
1221     fn poll_produce<'a>(
1222         mut self: Pin<&mut Self>,
1223         cx: &mut Context<'_>,
1224         store: StoreContextMut<'a, Data>,
1225         mut destination: Destination<'a, Self::Item, Self::Buffer>,
1226         finish: bool,
1227     ) -> Poll<Result<StreamResult>> {
1228         let (item, result) = std::task::ready!(self.poll(cx, store, finish));
1229         destination.set_buffer(item);
1230         Poll::Ready(Ok(result))
1231     }
1232 }
1233 
1234 #[cfg(test)]
1235 mod tests {
1236     use super::{ComponentAsync, Scope, init, run};
1237     use crate::oracles::component_async::types::*;
1238     use crate::test::test_n_times;
1239     use Scope::*;
1240 
    /// Smoke test: initializes the fuzzer state and feeds every input that
    /// `test_n_times(50, ..)` hands to the closure through `run`.
    #[test]
    fn smoke() {
        init();

        test_n_times(50, |c, _| {
            run(c);
            Ok(())
        });
    }
1250 
1251     // ========================================================================
1252     // A series of fuzz-generated test cases which caused problems during the
1253     // development of this fuzzer. Feel free to delete/edit/etc if the fuzzer
1254     // changes over time.
1255 
    /// Regression case: pending async import calls issued from both guest
    /// scopes, interleaved with an export completion and a readiness
    /// assertion for call 0.
    #[test]
    fn simple() {
        init();

        run(ComponentAsync {
            commands: vec![
                (GuestCaller, Command::AsyncPendingImportCall(0)),
                (GuestCallee, Command::AsyncPendingImportCall(1)),
                (GuestCallee, Command::AsyncPendingExportComplete(0)),
                (GuestCaller, Command::AsyncPendingImportAssertReady(0)),
                (GuestCaller, Command::AsyncPendingImportCall(2)),
            ],
        });
    }
1270 
1271     #[test]
1272     fn somewhat_larger() {
1273         static COMMANDS: &[(Scope, Command)] = &[
1274             (GuestCallee, Command::FutureNew(0)),
1275             (HostCaller, Command::FutureNew(1)),
1276             (GuestCallee, Command::FutureReadPending(0)),
1277             (GuestCaller, Command::AsyncPendingImportCall(2)),
1278             (GuestCaller, Command::AsyncPendingImportCall(3)),
1279             (GuestCaller, Command::AsyncPendingImportCall(4)),
1280             (GuestCaller, Command::AsyncPendingImportCall(5)),
1281             (GuestCallee, Command::AsyncPendingExportComplete(5)),
1282             (GuestCallee, Command::AsyncPendingExportComplete(3)),
1283             (GuestCallee, Command::AsyncPendingExportComplete(4)),
1284             (GuestCallee, Command::AsyncPendingExportComplete(2)),
1285             (GuestCaller, Command::AsyncPendingImportCall(6)),
1286             (GuestCallee, Command::AsyncPendingExportComplete(6)),
1287             (GuestCaller, Command::AsyncPendingImportCall(7)),
1288             (GuestCallee, Command::AsyncPendingExportComplete(7)),
1289             (GuestCaller, Command::AsyncPendingImportCall(8)),
1290             (GuestCallee, Command::AsyncPendingExportComplete(8)),
1291             (GuestCaller, Command::AsyncPendingImportCall(9)),
1292             (GuestCallee, Command::AsyncPendingExportComplete(9)),
1293             (GuestCaller, Command::AsyncPendingImportCall(10)),
1294             (GuestCallee, Command::AsyncPendingExportComplete(10)),
1295             (GuestCaller, Command::AsyncPendingImportCall(11)),
1296             (GuestCallee, Command::AsyncPendingExportComplete(11)),
1297             (GuestCaller, Command::AsyncPendingImportCall(12)),
1298             (GuestCallee, Command::AsyncPendingExportComplete(12)),
1299             (GuestCaller, Command::AsyncPendingImportCall(13)),
1300             (GuestCallee, Command::AsyncPendingExportComplete(13)),
1301             (GuestCaller, Command::AsyncPendingImportCall(14)),
1302             (GuestCallee, Command::AsyncPendingExportComplete(14)),
1303             (GuestCaller, Command::AsyncPendingImportCall(15)),
1304             (GuestCallee, Command::AsyncPendingExportComplete(15)),
1305             (GuestCaller, Command::AsyncPendingImportCall(16)),
1306             (GuestCallee, Command::AsyncPendingExportComplete(16)),
1307             (GuestCaller, Command::AsyncPendingImportCall(17)),
1308             (GuestCallee, Command::AsyncPendingExportComplete(17)),
1309             (GuestCaller, Command::AsyncPendingImportCall(18)),
1310             (GuestCallee, Command::AsyncPendingExportComplete(18)),
1311             (GuestCaller, Command::AsyncPendingImportCall(19)),
1312             (GuestCallee, Command::AsyncPendingExportComplete(19)),
1313             (GuestCaller, Command::AsyncPendingImportCall(20)),
1314             (GuestCallee, Command::AsyncPendingExportComplete(20)),
1315             (GuestCaller, Command::AsyncPendingImportCall(21)),
1316             (GuestCallee, Command::AsyncPendingExportComplete(21)),
1317             (GuestCaller, Command::AsyncPendingImportCall(22)),
1318             (GuestCallee, Command::AsyncPendingExportComplete(22)),
1319             (GuestCaller, Command::AsyncPendingImportCall(23)),
1320             (GuestCallee, Command::AsyncPendingExportComplete(23)),
1321             (GuestCaller, Command::AsyncPendingImportCall(24)),
1322             (GuestCallee, Command::AsyncPendingExportComplete(24)),
1323             (GuestCaller, Command::AsyncPendingImportCall(25)),
1324             (GuestCallee, Command::AsyncPendingExportComplete(25)),
1325             (GuestCaller, Command::AsyncPendingImportCall(26)),
1326             (GuestCallee, Command::AsyncPendingExportComplete(26)),
1327             (GuestCaller, Command::AsyncPendingImportCall(27)),
1328             (GuestCallee, Command::AsyncPendingExportComplete(27)),
1329             (GuestCaller, Command::AsyncPendingImportCall(28)),
1330             (GuestCallee, Command::AsyncPendingExportComplete(28)),
1331             (GuestCaller, Command::AsyncPendingImportCall(29)),
1332             (GuestCallee, Command::AsyncPendingExportComplete(29)),
1333             (GuestCaller, Command::AsyncPendingImportCall(30)),
1334             (GuestCallee, Command::AsyncPendingExportComplete(30)),
1335             (GuestCaller, Command::AsyncPendingImportCall(31)),
1336             (GuestCallee, Command::AsyncPendingExportComplete(31)),
1337             (GuestCaller, Command::AsyncPendingImportCall(32)),
1338             (GuestCallee, Command::AsyncPendingExportComplete(32)),
1339             (GuestCaller, Command::AsyncPendingImportCall(33)),
1340             (GuestCallee, Command::AsyncPendingExportComplete(33)),
1341             (GuestCaller, Command::AsyncPendingImportCall(34)),
1342             (GuestCallee, Command::AsyncPendingExportComplete(34)),
1343             (GuestCaller, Command::AsyncPendingImportCall(35)),
1344             (GuestCallee, Command::AsyncPendingExportComplete(35)),
1345             (GuestCaller, Command::AsyncPendingImportCall(36)),
1346             (GuestCallee, Command::AsyncPendingExportComplete(36)),
1347             (GuestCaller, Command::AsyncPendingImportCall(37)),
1348             (GuestCallee, Command::AsyncPendingExportComplete(37)),
1349             (GuestCaller, Command::AsyncPendingImportAssertReady(36)),
1350         ];
1351         init();
1352 
1353         run(ComponentAsync {
1354             commands: COMMANDS.to_vec(),
1355         });
1356     }
1357 
    /// Regression case on a single host-callee stream: a pending read that is
    /// cancelled, a pending write that is cancelled, then the writable end is
    /// dropped and a subsequent read is expected to observe the drop.
    #[test]
    fn simple_stream1() {
        init();

        run(ComponentAsync {
            commands: vec![
                (HostCallee, Command::StreamNew(1)),
                (
                    HostCallee,
                    Command::StreamReadPending(StreamReadPayload {
                        stream: 1,
                        count: 2,
                    }),
                ),
                (HostCallee, Command::StreamCancelRead(1)),
                (GuestCaller, Command::SyncReadyCall),
                (
                    HostCallee,
                    Command::StreamWritePending(StreamWritePayload {
                        stream: 1,
                        item: 3,
                        count: 2,
                    }),
                ),
                (HostCallee, Command::StreamCancelWrite(1)),
                (HostCallee, Command::StreamDropWritable(1)),
                (
                    HostCallee,
                    Command::StreamReadDropped(StreamReadPayload {
                        stream: 1,
                        count: 1,
                    }),
                ),
            ],
        });
    }
1394 
    /// Regression case: the guest caller starts a pending read on its own
    /// stream, drops the writable end, and then asserts that the read was
    /// dropped.
    #[test]
    fn simple_stream3() {
        init();

        run(ComponentAsync {
            commands: vec![
                (GuestCaller, Command::StreamNew(26)),
                (
                    GuestCaller,
                    Command::StreamReadPending(StreamReadPayload {
                        stream: 26,
                        count: 10,
                    }),
                ),
                (GuestCaller, Command::StreamDropWritable(26)),
                (GuestCaller, Command::StreamReadAssertDropped(26)),
            ],
        });
    }
1414 
    /// Regression case: the guest caller starts a pending write, gives the
    /// stream away, the guest callee drops the readable end, and the caller
    /// then asserts that its write was dropped.
    #[test]
    fn simple_stream4() {
        init();

        run(ComponentAsync {
            commands: vec![
                (GuestCaller, Command::StreamNew(23)),
                (
                    GuestCaller,
                    Command::StreamWritePending(StreamWritePayload {
                        stream: 23,
                        item: 24,
                        count: 2,
                    }),
                ),
                (GuestCaller, Command::StreamGive(23)),
                (GuestCallee, Command::StreamDropReadable(23)),
                (
                    GuestCaller,
                    Command::StreamWriteAssertDropped(StreamReadPayload {
                        stream: 23,
                        count: 0,
                    }),
                ),
            ],
        });
    }
1442 
    /// Regression case for zero-length reads: while the guest caller has a
    /// five-item write pending, the host caller issues two ready reads with
    /// `op_count: 0` and `ready_count: 0` back to back.
    #[test]
    fn zero_length_behavior() {
        init();

        run(ComponentAsync {
            commands: vec![
                (GuestCaller, Command::StreamNew(10)),
                (HostCaller, Command::StreamTake(10)),
                (
                    GuestCaller,
                    Command::StreamWritePending(StreamWritePayload {
                        stream: 10,
                        item: 13,
                        count: 5,
                    }),
                ),
                (
                    HostCaller,
                    Command::StreamReadReady(StreamReadyPayload {
                        stream: 10,
                        item: 0,
                        op_count: 0,
                        ready_count: 0,
                    }),
                ),
                (
                    HostCaller,
                    Command::StreamReadReady(StreamReadyPayload {
                        stream: 10,
                        item: 0,
                        op_count: 0,
                        ready_count: 0,
                    }),
                ),
            ],
        });
    }
1480 }
1481