1 //! For a high-level overview of this fuzz target see `fuzz_async.rs`
2 
3 use crate::block_on;
4 use crate::generators::component_async::exports::wasmtime_fuzz::fuzz::async_test::Guest;
5 use crate::generators::component_async::wasmtime_fuzz::fuzz::async_test::{self, Command};
6 use crate::generators::component_async::wasmtime_fuzz::fuzz::types;
7 use crate::generators::component_async::{ComponentAsync, FuzzAsyncPre, Scope};
8 use futures::channel::oneshot;
9 use std::collections::{HashMap, HashSet};
10 use std::mem;
11 use std::pin::Pin;
12 use std::sync::{Arc, OnceLock, Weak};
13 use std::task::{Context, Poll, Waker};
14 use std::time::Instant;
15 use wasmtime::component::{
16     Access, Accessor, AccessorTask, Component, Destination, FutureConsumer, FutureProducer,
17     FutureReader, HasSelf, Linker, ResourceTable, Source, StreamConsumer, StreamProducer,
18     StreamReader, StreamResult, VecBuffer,
19 };
20 use wasmtime::{AsContextMut, Config, Engine, Result, Store, StoreContextMut};
21 use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
22 
/// Process-wide fuzzing state: the `Engine` together with the pre-instantiated
/// fuzz component. Populated once by `init` and read by `run`.
static STATE: OnceLock<(Engine, FuzzAsyncPre<Data>)> = OnceLock::new();
24 
25 /// Initializes state for future fuzz runs.
26 ///
27 /// This will create an `Engine` to run this fuzzer within and it will
28 /// additionally precompile the component that will be used for fuzzing.
29 ///
30 /// There are a few points of note about this:
31 ///
32 /// * The `misc` fuzzer is manually instrumented with this function as the init
33 ///   hook to ensure this runs before any other fuzzing.
34 ///
35 /// * Compilation of the component takes quite some time with
36 ///   fuzzing-instrumented Cranelift. To assist with local development this
37 ///   implements a cache which is serialized/deserialized via an env var.
38 pub fn init() {
39     crate::init_fuzzing();
40 
41     STATE.get_or_init(|| {
42         let mut config = Config::new();
43         config.wasm_component_model_async(true);
44         let engine = Engine::new(&config).unwrap();
45         let component = compile(&engine);
46         let mut linker = Linker::new(&engine);
47         wasmtime_wasi::p2::add_to_linker_async(&mut linker).unwrap();
48         async_test::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap();
49         types::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap();
50 
51         let pre = linker.instantiate_pre(&component).unwrap();
52         let pre = FuzzAsyncPre::new(pre).unwrap();
53 
54         (engine, pre)
55     });
56 
57     fn compile(engine: &Engine) -> Component {
58         let wasm = test_programs_artifacts::FUZZ_ASYNC_COMPONENT;
59         let cwasm_cache = std::env::var("COMPONENT_ASYNC_CWASM_CACHE").ok();
60         if let Some(path) = &cwasm_cache
61             && let Ok(cwasm_mtime) = std::fs::metadata(&path).and_then(|m| m.modified())
62             && let Ok(wasm_mtime) = std::fs::metadata(wasm).and_then(|m| m.modified())
63             && cwasm_mtime > wasm_mtime
64         {
65             log::debug!("Using cached component async cwasm at {path}");
66             unsafe {
67                 return Component::deserialize_file(engine, path).unwrap();
68             }
69         }
70 
71         let composition = {
72             let mut config = wasm_compose::config::Config::default();
73             let tempdir = tempfile::TempDir::new().unwrap();
74             let path = tempdir.path().join("fuzz-async.wasm");
75             std::fs::copy(wasm, &path).unwrap();
76             config.definitions.push(path.clone());
77 
78             wasm_compose::composer::ComponentComposer::new(&path, &config)
79                 .compose()
80                 .unwrap()
81         };
82         let start = Instant::now();
83         let component = Component::new(&engine, &composition).unwrap();
84         if let Some(path) = cwasm_cache {
85             log::debug!("Caching component async cwasm to {path}");
86             std::fs::write(path, &component.serialize().unwrap()).unwrap();
87         } else if start.elapsed() > std::time::Duration::from_secs(1) {
88             eprintln!(
89                 "
90 !!!!!!!!!!!!!!!!!!!!!!!!!!
91 
92 Component compilation is slow, try setting `COMPONENT_ASYNC_CWASM_CACHE=path` to
93 cache compilation results
94 
95 !!!!!!!!!!!!!!!!!!!!!!!!!!
96 "
97             );
98         }
99         return component;
100     }
101 }
102 
/// Per-`Store` state for a single fuzz run.
///
/// Holds the WASI plumbing, the queue of commands to execute and the
/// bookkeeping used to check what the guest and Wasmtime do with async calls,
/// futures and streams.
#[derive(Default)]
struct Data {
    /// WASI context handed out through `WasiView::ctx`.
    ctx: WasiCtx,
    /// Resource table handed out alongside `ctx`.
    table: ResourceTable,
    /// NOTE(review): not touched in this part of the file; presumably the
    /// wakers of tasks waiting for the next command of a given scope —
    /// confirm against the `SharedStream` implementation.
    wakers: HashMap<Scope, Waker>,
    /// Commands still to be executed, each tagged with the scope that runs
    /// it. `run` stores the input list reversed.
    commands: Vec<(Scope, Command)>,

    /// Command stream for the guest caller; created in `run` and moved out by
    /// `get_commands`.
    guest_caller_stream: Option<StreamReader<Command>>,
    /// Command stream for the guest callee; created in `run` and moved out by
    /// `get_commands`.
    guest_callee_stream: Option<StreamReader<Command>>,

    /// Completion senders for in-flight `async_pending` host calls, keyed by
    /// call id.
    host_pending_async_calls: HashMap<u32, oneshot::Sender<()>>,
    /// Ids of `async_pending` host calls whose future was dropped before it
    /// completed.
    host_pending_async_calls_cancelled: HashSet<u32>,
    /// Ids of `async_pending` calls into the guest that have returned.
    guest_pending_async_calls_ready: HashSet<u32>,

    // State of futures/streams. Note that while #12091 is unresolved an
    // `Arc`/`Weak` combo is used to detect when wasmtime drops futures/streams
    // and the various halves we're interacting with using traits.
    /// Readable ends of futures currently held by the host, keyed by id.
    host_futures: HashMap<u32, FutureReader<u32>>,
    /// State of host-implemented future write ends, plus a `Weak` that stops
    /// upgrading once the producer object has been dropped.
    host_future_producers: HashMap<u32, (HostFutureProducerState, Weak<()>)>,
    /// State of host-implemented future read ends, plus a `Weak` that stops
    /// upgrading once the consumer object has been dropped.
    host_future_consumers: HashMap<u32, (HostFutureConsumerState, Weak<()>)>,
    /// Readable ends of streams currently held by the host, keyed by id.
    host_streams: HashMap<u32, StreamReader<u32>>,
    /// State of host-implemented stream write ends, plus a `Weak` that stops
    /// upgrading once the producer object has been dropped.
    host_stream_producers: HashMap<u32, (HostStreamProducerState, Weak<()>)>,
    /// State of host-implemented stream read ends, plus a `Weak` that stops
    /// upgrading once the consumer object has been dropped.
    host_stream_consumers: HashMap<u32, (HostStreamConsumerState, Weak<()>)>,
}
127 
128 impl WasiView for Data {
129     fn ctx(&mut self) -> WasiCtxView<'_> {
130         WasiCtxView {
131             ctx: &mut self.ctx,
132             table: &mut self.table,
133         }
134     }
135 }
136 
/// Store-aware (async) host imports of the `async-test` interface.
impl async_test::HostWithStore for HasSelf<Data> {
    /// Host import that completes immediately without touching any state.
    async fn async_ready<T>(_store: &Accessor<T, Self>) {}

    /// Host import that stays pending until the sender registered under `id`
    /// in `host_pending_async_calls` is signalled.
    ///
    /// If this future is dropped before that happens, `id` is recorded in
    /// `host_pending_async_calls_cancelled` instead.
    async fn async_pending<T>(store: &Accessor<T, Self>, id: u32) {
        let (tx, rx) = oneshot::channel();
        // Publish the sender so a later command can complete this call.
        store.with(|mut s| s.get().host_pending_async_calls.insert(id, tx));
        // Armed for the duration of the wait: if this future is dropped while
        // suspended at the `await` below, the guard's `Drop` records the
        // cancellation.
        let record = RecordCancelOnDrop { store, id };
        rx.await.unwrap();
        // Completed normally, so disarm the guard without running its `Drop`.
        mem::forget(record);

        /// Guard that marks call `id` as cancelled when dropped.
        struct RecordCancelOnDrop<'a, T: 'static> {
            store: &'a Accessor<T, HasSelf<Data>>,
            id: u32,
        }

        impl<T> Drop for RecordCancelOnDrop<'_, T> {
            fn drop(&mut self) {
                self.store.with(|mut s| {
                    s.get().host_pending_async_calls_cancelled.insert(self.id);
                });
            }
        }
    }

    /// Host import that does nothing and ignores its scope argument.
    async fn init<T>(_store: &Accessor<T, Self>, _scope: types::Scope) {}
}
163 
164 impl async_test::Host for Data {
165     fn sync_ready(&mut self) {}
166 
167     fn future_take(&mut self, id: u32) -> FutureReader<u32> {
168         self.host_futures.remove(&id).unwrap()
169     }
170 
171     fn future_receive(&mut self, id: u32, future: FutureReader<u32>) {
172         let prev = self.host_futures.insert(id, future);
173         assert!(prev.is_none());
174     }
175 
176     fn stream_take(&mut self, id: u32) -> StreamReader<u32> {
177         self.host_streams.remove(&id).unwrap()
178     }
179 
180     fn stream_receive(&mut self, id: u32, stream: StreamReader<u32>) {
181         let prev = self.host_streams.insert(id, stream);
182         assert!(prev.is_none());
183     }
184 }
185 
186 impl types::HostWithStore for HasSelf<Data> {
187     fn get_commands<T>(
188         mut store: Access<'_, T, Self>,
189         scope: types::Scope,
190     ) -> StreamReader<Command> {
191         let data = store.get();
192         match scope {
193             types::Scope::Caller => data.guest_caller_stream.take().unwrap(),
194             types::Scope::Callee => data.guest_callee_stream.take().unwrap(),
195         }
196     }
197 }
198 
// Empty impl: no methods are defined here. The `types` functionality this
// file provides lives in the `HostWithStore` impl for `HasSelf<Data>`.
impl types::Host for Data {}
200 
201 /// Executes the `input` provided, assuming that `init` has been previously
202 /// executed.
203 pub fn run(mut input: ComponentAsync) {
204     log::debug!("Running component async fuzz test with\n{input:?}");
205 
206     // Commands are executed in the order that they're listed in the input, but
207     // to make it easier on the `StreamProducer` implementation below they're
208     // popped off the back. To ensure that they're all delivered in the right
209     // order reverse the list to ensure the correct order is maintained.
210     input.commands.reverse();
211 
212     let (engine, pre) = STATE.get().unwrap();
213     let mut store = Store::new(
214         engine,
215         Data {
216             ctx: WasiCtx::builder().inherit_stdio().inherit_env().build(),
217             commands: input.commands,
218             ..Data::default()
219         },
220     );
221 
222     let guest_caller_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCaller));
223     let guest_callee_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCallee));
224     store.data_mut().guest_caller_stream = Some(guest_caller_stream);
225     store.data_mut().guest_callee_stream = Some(guest_callee_stream);
226     block_on(async {
227         let instance = pre.instantiate_async(&mut store).await.unwrap();
228         let test = instance.wasmtime_fuzz_fuzz_async_test();
229 
230         let mut host_caller = SharedStream(Scope::HostCaller);
231         let mut host_callee = SharedStream(Scope::HostCallee);
232         store
233             .run_concurrent(async |store| {
234                 // Kick off stream reads in the guest. This function will return
235                 // but the tasks in the guest will keep running after they
236                 // return to process stream items.
237                 test.call_init(store, types::Scope::Caller).await.unwrap();
238 
239                 // Simultaneously process commands from both host streams. These
240                 // will return once the entire command queue is exhausted.
241                 futures::join!(
242                     async {
243                         while let Some(cmd) = host_caller.next(store).await {
244                             host_caller_cmd(&test, store, cmd).await;
245                         }
246                     },
247                     async {
248                         while let Some(cmd) = host_callee.next(store).await {
249                             host_callee_cmd(store, cmd).await;
250                         }
251                     },
252                 );
253 
254                 // Note that there may still be pending async work in the guest
255                 // (or host). It's intentional that it's not cleaned up here to
256                 // help test situations where async work is all abruptly
257                 // cancelled by just being dropped in the host.
258             })
259             .await
260             .unwrap();
261     });
262 }
263 
264 /// See documentation in `fuzz_async.rs` for what's going on here.
265 async fn test_property<F>(store: &Accessor<Data>, mut f: F) -> bool
266 where
267     F: FnMut(&mut Data) -> bool,
268 {
269     for _ in 0..1000 {
270         let ready = store.with(|mut s| f(s.get()));
271         if ready {
272             return true;
273         }
274 
275         crate::YieldN(1).await;
276     }
277 
278     return false;
279 }
280 
281 async fn await_property<F>(store: &Accessor<Data>, desc: &str, f: F)
282 where
283     F: FnMut(&mut Data) -> bool,
284 {
285     assert!(
286         test_property(store, f).await,
287         "timed out waiting for {desc}",
288     );
289 }
290 
291 async fn host_caller_cmd(test: &Guest, store: &Accessor<Data>, cmd: Command) {
292     match cmd {
293         Command::Ack => {}
294         Command::SyncReadyCall => test.call_sync_ready(store).await.unwrap(),
295         Command::AsyncReadyCall => test.call_async_ready(store).await.unwrap(),
296         Command::AsyncPendingExportComplete(_i) => todo!(),
297         Command::AsyncPendingExportAssertCancelled(_i) => todo!(),
298         Command::AsyncPendingImportCall(i) => {
299             struct RunPendingImport {
300                 test: Guest,
301                 i: u32,
302             }
303 
304             store.spawn(RunPendingImport {
305                 test: test.clone(),
306                 i,
307             });
308 
309             impl AccessorTask<Data> for RunPendingImport {
310                 async fn run(self, store: &Accessor<Data>) -> Result<()> {
311                     self.test.call_async_pending(store, self.i).await?;
312                     store.with(|mut s| {
313                         s.get().guest_pending_async_calls_ready.insert(self.i);
314                     });
315                     Ok(())
316                 }
317             }
318         }
319         Command::AsyncPendingImportCancel(_i) => todo!(),
320         Command::AsyncPendingImportAssertReady(i) => {
321             assert!(
322                 test_property(store, |s| s.guest_pending_async_calls_ready.remove(&i)).await,
323                 "expected async_pending import {i} to be ready",
324             );
325         }
326 
327         Command::FutureTake(i) => {
328             let future = test.call_future_take(store, i).await.unwrap();
329             store.with(|mut s| {
330                 let prev = s.get().host_futures.insert(i, future);
331                 assert!(prev.is_none());
332             });
333         }
334         Command::FutureGive(i) => {
335             let future = store.with(|mut s| s.get().host_futures.remove(&i).unwrap());
336             test.call_future_receive(store, i, future).await.unwrap();
337         }
338         Command::StreamTake(i) => {
339             let stream = test.call_stream_take(store, i).await.unwrap();
340             store.with(|mut s| {
341                 let prev = s.get().host_streams.insert(i, stream);
342                 assert!(prev.is_none());
343             });
344         }
345         Command::StreamGive(i) => {
346             let stream = store.with(|mut s| s.get().host_streams.remove(&i).unwrap());
347             test.call_stream_receive(store, i, stream).await.unwrap();
348         }
349 
350         other => future_or_stream_cmd(store, other).await,
351     }
352 }
353 
354 async fn host_callee_cmd(store: &Accessor<Data>, cmd: Command) {
355     match cmd {
356         Command::Ack => {}
357         Command::SyncReadyCall => todo!(),
358         Command::AsyncReadyCall => todo!(),
359         Command::AsyncPendingExportComplete(i) => store.with(|mut s| {
360             s.get()
361                 .host_pending_async_calls
362                 .remove(&i)
363                 .unwrap()
364                 .send(())
365                 .unwrap();
366         }),
367         Command::AsyncPendingExportAssertCancelled(i) => {
368             assert!(
369                 test_property(store, |s| s.host_pending_async_calls_cancelled.remove(&i)).await,
370                 "expected async_pending export {i} to be cancelled",
371             );
372         }
373         Command::AsyncPendingImportCall(_i) => todo!(),
374         Command::AsyncPendingImportCancel(_i) => todo!(),
375         Command::AsyncPendingImportAssertReady(_i) => todo!(),
376 
377         other => future_or_stream_cmd(store, other).await,
378     }
379 }
380 
381 async fn future_or_stream_cmd(store: &Accessor<Data>, cmd: Command) {
382     match cmd {
383         // These commands should be handled above
384         Command::Ack
385         | Command::SyncReadyCall
386         | Command::AsyncReadyCall
387         | Command::AsyncPendingExportComplete(_)
388         | Command::AsyncPendingExportAssertCancelled(_)
389         | Command::AsyncPendingImportCall(_)
390         | Command::AsyncPendingImportCancel(_)
391         | Command::FutureTake(_)
392         | Command::FutureGive(_)
393         | Command::StreamTake(_)
394         | Command::StreamGive(_)
395         | Command::AsyncPendingImportAssertReady(_) => unreachable!(),
396 
397         Command::FutureNew(id) => {
398             store.with(|mut s| {
399                 let arc = Arc::new(());
400                 let weak = Arc::downgrade(&arc);
401                 let future = FutureReader::new(&mut s, HostFutureProducer(id, arc));
402                 let data = s.get();
403                 let prev = data.host_futures.insert(id, future);
404                 assert!(prev.is_none());
405                 let prev = data
406                     .host_future_producers
407                     .insert(id, (HostFutureProducerState::Idle, weak));
408                 assert!(prev.is_none());
409             });
410         }
411         Command::FutureDropReadable(id) => {
412             store.with(|mut s| match s.get().host_futures.remove(&id) {
413                 Some(mut future) => future.close(&mut s),
414                 None => {
415                     let (mut state, _weak) = s.get().host_future_consumers.remove(&id).unwrap();
416                     state.wake_by_ref();
417                 }
418             })
419         }
420         Command::FutureWriteReady(payload) => {
421             await_property(store, "future write should be waiting", |s| {
422                 matches!(
423                     s.host_future_producers.get(&payload.future),
424                     Some((HostFutureProducerState::Waiting(_), _))
425                 )
426             })
427             .await;
428             store.with(|mut s| {
429                 let state = s
430                     .get()
431                     .host_future_producers
432                     .get_mut(&payload.future)
433                     .unwrap();
434                 match state {
435                     (HostFutureProducerState::Waiting(waker), _) => {
436                         waker.wake_by_ref();
437                         state.0 = HostFutureProducerState::Writing(payload.item);
438                     }
439                     (state, _) => panic!("future not waiting: {state:?}"),
440                 }
441             })
442         }
443         Command::FutureWritePending(payload) => store.with(|mut s| {
444             let state = s
445                 .get()
446                 .host_future_producers
447                 .get_mut(&payload.future)
448                 .unwrap();
449             match state {
450                 (HostFutureProducerState::Idle, _) => {
451                     state.0 = HostFutureProducerState::Writing(payload.item);
452                 }
453                 _ => panic!("future not idle"),
454             }
455         }),
456         Command::FutureWriteDropped(id) => store.with(|mut s| {
457             let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
458             assert!(matches!(state, HostFutureProducerState::Idle));
459             assert!(weak.upgrade().is_none());
460         }),
461         Command::FutureReadReady(payload) => {
462             let id = payload.future;
463             store.with(|mut s| {
464                 let arc = Arc::new(());
465                 let weak = Arc::downgrade(&arc);
466                 let data = s.get();
467                 let future = data.host_futures.remove(&id).unwrap();
468                 let prev = data
469                     .host_future_consumers
470                     .insert(id, (HostFutureConsumerState::Consuming, weak));
471                 assert!(prev.is_none());
472                 future.pipe(&mut s, HostFutureConsumer(id, arc));
473             });
474 
475             await_property(store, "future should be present", |s| {
476                 matches!(
477                     s.host_future_consumers[&id],
478                     (HostFutureConsumerState::Complete(_), _)
479                 )
480             })
481             .await;
482 
483             store.with(|mut s| {
484                 let (state, _) = s.get().host_future_consumers.remove(&id).unwrap();
485                 match state {
486                     HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item),
487                     _ => panic!("future not complete"),
488                 }
489             });
490         }
491         Command::FutureReadPending(id) => {
492             ensure_future_reading(store, id);
493             store.with(|mut s| {
494                 let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap();
495                 state.wake_by_ref();
496                 assert!(
497                     matches!(state, HostFutureConsumerState::Idle),
498                     "bad state: {state:?}",
499                 );
500                 *state = HostFutureConsumerState::Consuming;
501             })
502         }
503         Command::FutureCancelWrite(id) => store.with(|mut s| {
504             let (state, _) = s.get().host_future_producers.get_mut(&id).unwrap();
505             assert!(matches!(state, HostFutureProducerState::Writing(_)));
506             *state = HostFutureProducerState::Idle;
507         }),
508         Command::FutureCancelRead(id) => store.with(|mut s| {
509             let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap();
510             assert!(matches!(state, HostFutureConsumerState::Consuming));
511             *state = HostFutureConsumerState::Idle;
512         }),
513         Command::FutureReadAssertComplete(payload) => {
514             await_property(store, "future read should be complete", |s| {
515                 matches!(
516                     s.host_future_consumers.get(&payload.future),
517                     Some((HostFutureConsumerState::Complete(_), _))
518                 )
519             })
520             .await;
521             store.with(|mut s| {
522                 let (state, _) = s
523                     .get()
524                     .host_future_consumers
525                     .remove(&payload.future)
526                     .unwrap();
527                 match state {
528                     HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item),
529                     _ => panic!("future not complete"),
530                 }
531             })
532         }
533         Command::FutureWriteAssertComplete(id) => store.with(|mut s| {
534             let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
535             assert!(matches!(state, HostFutureProducerState::Complete));
536             assert!(weak.upgrade().is_none());
537         }),
538         Command::FutureWriteAssertDropped(id) => store.with(|mut s| {
539             let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
540             assert!(matches!(state, HostFutureProducerState::Writing(_)));
541             assert!(weak.upgrade().is_none());
542         }),
543 
544         Command::StreamNew(id) => {
545             store.with(|mut s| {
546                 let arc = Arc::new(());
547                 let weak = Arc::downgrade(&arc);
548                 let stream = StreamReader::new(&mut s, HostStreamProducer(id, arc));
549                 let data = s.get();
550                 let prev = data.host_streams.insert(id, stream);
551                 assert!(prev.is_none());
552                 let prev = data
553                     .host_stream_producers
554                     .insert(id, (HostStreamProducerState::idle(), weak));
555                 assert!(prev.is_none());
556             });
557         }
558         Command::StreamDropReadable(id) => {
559             store.with(|mut s| match s.get().host_streams.remove(&id) {
560                 Some(mut stream) => {
561                     stream.close(&mut s);
562                 }
563                 None => {
564                     let (mut state, _weak) = s.get().host_stream_consumers.remove(&id).unwrap();
565                     state.wake_by_ref();
566                 }
567             })
568         }
569         Command::StreamDropWritable(id) => store.with(|mut s| {
570             let (mut state, _weak) = s.get().host_stream_producers.remove(&id).unwrap();
571             state.wake_by_ref();
572         }),
573         Command::StreamWriteReady(payload) => {
574             let id = payload.stream;
575             store.with(|mut s| {
576                 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
577                 state.wake_by_ref();
578                 match state.kind {
579                     HostStreamProducerStateKind::Idle => {
580                         state.kind = HostStreamProducerStateKind::Writing(stream_payload(
581                             payload.item,
582                             payload.op_count,
583                         ));
584                     }
585                     _ => panic!("stream not idle: {state:?}"),
586                 }
587             });
588             await_property(store, "stream should complete a write", |s| {
589                 matches!(
590                     s.host_stream_producers[&id].0.kind,
591                     HostStreamProducerStateKind::Wrote(_),
592                 )
593             })
594             .await;
595             store.with(|mut s| {
596                 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
597                 match state.kind {
598                     HostStreamProducerStateKind::Wrote(amt) => {
599                         assert_eq!(amt, payload.ready_count);
600                         state.kind = HostStreamProducerStateKind::Idle;
601                     }
602                     _ => panic!("stream not idle: {state:?}"),
603                 }
604             });
605         }
606         Command::StreamReadReady(payload) => {
607             let id = payload.stream;
608             ensure_stream_reading(store, id);
609             store.with(|mut s| {
610                 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
611                 state.wake_by_ref();
612                 state.kind = HostStreamConsumerStateKind::Consuming(payload.op_count);
613             });
614             await_property(store, "stream should complete a read", |s| {
615                 matches!(
616                     s.host_stream_consumers[&id].0.kind,
617                     HostStreamConsumerStateKind::Consumed(_),
618                 )
619             })
620             .await;
621 
622             store.with(|mut s| {
623                 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
624                 match &state.kind {
625                     HostStreamConsumerStateKind::Consumed(last_read) => {
626                         assert_eq!(
627                             *last_read,
628                             stream_payload(payload.item, payload.ready_count)
629                         );
630                         state.kind = HostStreamConsumerStateKind::Idle;
631                     }
632                     _ => panic!("future not complete"),
633                 }
634             });
635         }
636         Command::StreamWritePending(payload) => store.with(|mut s| {
637             let (state, _) = s
638                 .get()
639                 .host_stream_producers
640                 .get_mut(&payload.stream)
641                 .unwrap();
642             state.wake_by_ref();
643             match state.kind {
644                 HostStreamProducerStateKind::Idle => {
645                     state.kind = HostStreamProducerStateKind::Writing(stream_payload(
646                         payload.item,
647                         payload.count,
648                     ));
649                 }
650                 _ => panic!("stream not idle {:?}", state.kind),
651             }
652         }),
653         Command::StreamReadPending(payload) => {
654             ensure_stream_reading(store, payload.stream);
655             store.with(|mut s| {
656                 let (state, _) = s
657                     .get()
658                     .host_stream_consumers
659                     .get_mut(&payload.stream)
660                     .unwrap();
661                 state.wake_by_ref();
662                 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle));
663                 state.kind = HostStreamConsumerStateKind::Consuming(payload.count);
664             })
665         }
666         Command::StreamWriteDropped(payload) => store.with(|mut s| {
667             let (state, weak) = s
668                 .get()
669                 .host_stream_producers
670                 .get_mut(&payload.stream)
671                 .unwrap();
672             assert!(matches!(state.kind, HostStreamProducerStateKind::Idle));
673             assert!(weak.upgrade().is_none());
674         }),
675         Command::StreamReadDropped(payload) => {
676             ensure_stream_reading(store, payload.stream);
677             await_property(store, "stream read should get dropped", |s| {
678                 let weak = &s.host_stream_consumers[&payload.stream].1;
679                 weak.upgrade().is_none()
680             })
681             .await;
682             store.with(|mut s| {
683                 let (state, weak) = s
684                     .get()
685                     .host_stream_consumers
686                     .get_mut(&payload.stream)
687                     .unwrap();
688                 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle));
689                 assert!(weak.upgrade().is_none());
690             })
691         }
692         Command::StreamCancelWrite(id) => store.with(|mut s| {
693             let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
694             assert!(
695                 matches!(state.kind, HostStreamProducerStateKind::Writing(_)),
696                 "invalid state {state:?}",
697             );
698             state.kind = HostStreamProducerStateKind::Idle;
699             state.wake_by_ref();
700         }),
701         Command::StreamCancelRead(id) => store.with(|mut s| {
702             let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
703             assert!(matches!(
704                 state.kind,
705                 HostStreamConsumerStateKind::Consuming(_)
706             ));
707             state.kind = HostStreamConsumerStateKind::Idle;
708         }),
709         Command::StreamReadAssertComplete(payload) => store.with(|mut s| {
710             let (state, _) = s
711                 .get()
712                 .host_stream_consumers
713                 .get_mut(&payload.stream)
714                 .unwrap();
715             match &state.kind {
716                 HostStreamConsumerStateKind::Consumed(last_read) => {
717                     assert_eq!(*last_read, stream_payload(payload.item, payload.count));
718                     state.kind = HostStreamConsumerStateKind::Idle;
719                 }
720                 _ => panic!("stream not complete"),
721             }
722         }),
723         Command::StreamWriteAssertComplete(payload) => store.with(|mut s| {
724             let (state, _) = s
725                 .get()
726                 .host_stream_producers
727                 .get_mut(&payload.stream)
728                 .unwrap();
729             match state.kind {
730                 HostStreamProducerStateKind::Wrote(amt) => {
731                     assert_eq!(amt, payload.count);
732                     state.kind = HostStreamProducerStateKind::Idle;
733                 }
734                 _ => panic!("stream not complete: {:?}", state.kind),
735             }
736         }),
737         Command::StreamWriteAssertDropped(payload) => {
738             await_property(store, "stream write should be dropped", |s| {
739                 let weak = &s.host_stream_producers[&payload.stream].1;
740                 weak.upgrade().is_none()
741             })
742             .await;
743             store.with(|mut s| {
744                 let (state, weak) = s
745                     .get()
746                     .host_stream_producers
747                     .get_mut(&payload.stream)
748                     .unwrap();
749                 assert!(matches!(
750                     state.kind,
751                     HostStreamProducerStateKind::Writing(_)
752                 ));
753                 assert!(weak.upgrade().is_none());
754             })
755         }
756         Command::StreamReadAssertDropped(id) => {
757             await_property(store, "stream read should be dropped", |s| {
758                 let weak = &s.host_stream_consumers[&id].1;
759                 weak.upgrade().is_none()
760             })
761             .await;
762             store.with(|mut s| {
763                 let (state, weak) = s.get().host_stream_consumers.get_mut(&id).unwrap();
764                 assert!(matches!(
765                     state.kind,
766                     HostStreamConsumerStateKind::Consuming(_),
767                 ));
768                 assert!(weak.upgrade().is_none());
769             })
770         }
771     }
772 }
773 
/// Builds the item sequence associated with a stream transfer: the `count`
/// consecutive integers starting at `item`.
///
/// A `count` of zero yields an empty vector.
fn stream_payload(item: u32, count: u32) -> Vec<u32> {
    let end = item + count;
    let mut payload = Vec::new();
    payload.extend(item..end);
    payload
}
777 
778 fn ensure_future_reading(store: &Accessor<Data>, id: u32) {
779     store.with(|mut s| {
780         let data = s.get();
781         if !data.host_futures.contains_key(&id) {
782             return;
783         }
784         log::debug!("future consume: start {id}");
785         let arc = Arc::new(());
786         let weak = Arc::downgrade(&arc);
787         let data = s.get();
788         let future = data.host_futures.remove(&id).unwrap();
789         let prev = data
790             .host_future_consumers
791             .insert(id, (HostFutureConsumerState::Idle, weak));
792         assert!(prev.is_none());
793         future.pipe(&mut s, HostFutureConsumer(id, arc));
794     });
795 }
796 
797 fn ensure_stream_reading(store: &Accessor<Data>, id: u32) {
798     store.with(|mut s| {
799         let data = s.get();
800         if !data.host_streams.contains_key(&id) {
801             return;
802         }
803         log::debug!("stream consume: start {id}");
804         let arc = Arc::new(());
805         let weak = Arc::downgrade(&arc);
806         let prev = data.host_stream_consumers.insert(
807             id,
808             (
809                 HostStreamConsumerState {
810                     kind: HostStreamConsumerStateKind::Idle,
811                     waker: None,
812                 },
813                 weak,
814             ),
815         );
816         assert!(prev.is_none());
817         let stream = data.host_streams.remove(&id).unwrap();
818         stream.pipe(&mut s, HostStreamConsumer(id, arc));
819     });
820 }
821 
/// Host-side consumer that `ensure_future_reading` pipes a future into.
///
/// Field 0 is the future's id, used to look up the matching entry in
/// `host_future_consumers`. Field 1 is an `Arc<()>` that is never read; its
/// `Weak` counterpart is stored alongside the state when the consumer is
/// created.
struct HostFutureConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
823 
/// State of a host-side future read, stored in `host_future_consumers`.
///
/// Note that this is only created once a read is actually initiated on a
/// future. It's also not possible to cancel a host-based read on a future,
/// hence why this is simpler than the `HostFutureProducerState` state below.
#[derive(Debug)]
enum HostFutureConsumerState {
    /// Initial state; also restored when a poll is finished early or after
    /// `wake_by_ref` wakes a parked task.
    Idle,
    /// `poll_consume` returned `Pending` and saved the task's waker here.
    Waiting(Waker),
    /// The next `poll_consume` reads the value out of its source.
    // NOTE(review): the transition into this state is not in this part of
    // the file.
    Consuming,
    /// The value that `poll_consume` read from the source.
    Complete(u32),
}
834 
835 impl HostFutureConsumerState {
836     fn wake_by_ref(&mut self) {
837         if let HostFutureConsumerState::Waiting(waker) = &self {
838             waker.wake_by_ref();
839             *self = HostFutureConsumerState::Idle;
840         }
841     }
842 }
843 
844 impl FutureConsumer<Data> for HostFutureConsumer {
845     type Item = u32;
846 
847     fn poll_consume(
848         self: Pin<&mut Self>,
849         cx: &mut Context<'_>,
850         mut store: StoreContextMut<'_, Data>,
851         mut source: Source<'_, Self::Item>,
852         finish: bool,
853     ) -> Poll<Result<()>> {
854         let state = match store.data_mut().host_future_consumers.get_mut(&self.0) {
855             Some(state) => state,
856             None => {
857                 log::debug!("consume: closed {}", self.0);
858                 return Poll::Ready(Ok(()));
859             }
860         };
861         match state.0 {
862             HostFutureConsumerState::Idle | HostFutureConsumerState::Waiting(_) => {
863                 if finish {
864                     log::debug!("consume: cancel {}", self.0);
865                     state.0 = HostFutureConsumerState::Idle;
866                     Poll::Ready(Ok(()))
867                 } else {
868                     log::debug!("consume: wait {}", self.0);
869                     state.0 = HostFutureConsumerState::Waiting(cx.waker().clone());
870                     Poll::Pending
871                 }
872             }
873             HostFutureConsumerState::Consuming => {
874                 log::debug!("consume: done {}", self.0);
875                 let mut item = None;
876                 source.read(&mut store, &mut item).unwrap();
877                 store
878                     .data_mut()
879                     .host_future_consumers
880                     .get_mut(&self.0)
881                     .unwrap()
882                     .0 = HostFutureConsumerState::Complete(item.unwrap());
883                 Poll::Ready(Ok(()))
884             }
885             HostFutureConsumerState::Complete(_) => unreachable!(),
886         }
887     }
888 }
889 
/// Host-side producer for a future.
///
/// Field 0 is the future's id, used to look up the matching entry in
/// `host_future_producers`. Field 1 is an `Arc<()>` that is never read.
// NOTE(review): presumably paired with a `Weak` stored next to the state, as
// for the consumers — the construction site is not in this part of the file.
struct HostFutureProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
891 
/// State of a host-side future write, stored in `host_future_producers`.
#[derive(Debug)]
enum HostFutureProducerState {
    /// Nothing to write and no task parked; also restored when a poll is
    /// finished early.
    Idle,
    /// `poll_produce` returned `Pending` and saved the task's waker here.
    Waiting(Waker),
    /// Value that the next `poll_produce` hands over.
    Writing(u32),
    /// The value has been handed over; polling again is treated as
    /// unreachable.
    Complete,
}
899 
900 impl FutureProducer<Data> for HostFutureProducer {
901     type Item = u32;
902 
903     fn poll_produce(
904         self: Pin<&mut Self>,
905         cx: &mut Context<'_>,
906         mut store: StoreContextMut<'_, Data>,
907         finish: bool,
908     ) -> Poll<Result<Option<Self::Item>>> {
909         let state = store
910             .data_mut()
911             .host_future_producers
912             .get_mut(&self.0)
913             .unwrap();
914         match state.0 {
915             HostFutureProducerState::Idle | HostFutureProducerState::Waiting(_) => {
916                 if finish {
917                     log::debug!("produce: cancel {}", self.0);
918                     state.0 = HostFutureProducerState::Idle;
919                     Poll::Ready(Ok(None))
920                 } else {
921                     log::debug!("produce: wait {}", self.0);
922                     state.0 = HostFutureProducerState::Waiting(cx.waker().clone());
923                     Poll::Pending
924                 }
925             }
926             HostFutureProducerState::Writing(item) => {
927                 log::debug!("produce: done {}", self.0);
928                 state.0 = HostFutureProducerState::Complete;
929                 Poll::Ready(Ok(Some(item)))
930             }
931             HostFutureProducerState::Complete => unreachable!(),
932         }
933     }
934 }
935 
/// Host-side consumer that `ensure_stream_reading` pipes a stream into.
///
/// Field 0 is the stream's id, used to look up the matching entry in
/// `host_stream_consumers`. Field 1 is an `Arc<()>` that is never read; its
/// `Weak` counterpart is stored alongside the state when the consumer is
/// created.
struct HostStreamConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
937 
/// Per-stream bookkeeping for a host-side read, stored in
/// `host_stream_consumers`.
#[derive(Debug)]
struct HostStreamConsumerState {
    /// Waker saved by the most recent `poll_consume` that returned
    /// `Pending`; cleared when a poll is cancelled or a read completes.
    waker: Option<Waker>,
    /// What the consumer should do the next time it is polled.
    kind: HostStreamConsumerStateKind,
}
943 
/// The phases of a host-side stream read.
#[derive(Debug)]
enum HostStreamConsumerStateKind {
    /// No read is requested; polls park (or report cancellation when asked
    /// to finish).
    Idle,
    /// A read is requested. The value is used as the capacity of the
    /// destination buffer, and zero is handled as a zero-length read.
    Consuming(u32),
    /// Items obtained by the last read (empty for a zero-length read).
    Consumed(Vec<u32>),
}
950 
951 impl HostStreamConsumerState {
952     fn wake_by_ref(&mut self) {
953         if let Some(waker) = self.waker.take() {
954             waker.wake();
955         }
956     }
957 }
958 
959 impl StreamConsumer<Data> for HostStreamConsumer {
960     type Item = u32;
961 
962     fn poll_consume(
963         self: Pin<&mut Self>,
964         cx: &mut Context<'_>,
965         mut store: StoreContextMut<'_, Data>,
966         mut source: Source<'_, Self::Item>,
967         finish: bool,
968     ) -> Poll<Result<StreamResult>> {
969         let remaining = source.remaining(&mut store);
970         let state = match store.data_mut().host_stream_consumers.get_mut(&self.0) {
971             Some((state, _)) => state,
972             None => {
973                 log::debug!("stream consume: dropped {}", self.0);
974                 return Poll::Ready(Ok(StreamResult::Dropped));
975             }
976         };
977         match state.kind {
978             HostStreamConsumerStateKind::Idle | HostStreamConsumerStateKind::Consumed(_) => {
979                 if finish {
980                     log::debug!("stream consume: cancel {}", self.0);
981                     state.waker = None;
982                     Poll::Ready(Ok(StreamResult::Cancelled))
983                 } else {
984                     log::debug!("stream consume: wait {}", self.0);
985                     state.waker = Some(cx.waker().clone());
986                     Poll::Pending
987                 }
988             }
989             HostStreamConsumerStateKind::Consuming(amt) => {
990                 // The writer is performing a zero-length write. We always
991                 // complete that without updating our own state.
992                 if remaining == 0 {
993                     log::debug!("stream consume: completing zero-length write {}", self.0);
994                     return Poll::Ready(Ok(StreamResult::Completed));
995                 }
996 
997                 // If this is a zero-length read then block the writer but update our own state.
998                 if amt == 0 {
999                     log::debug!("stream consume: finishing zero-length read {}", self.0);
1000                     state.kind = HostStreamConsumerStateKind::Consumed(Vec::new());
1001                     state.waker = Some(cx.waker().clone());
1002                     return Poll::Pending;
1003                 }
1004 
1005                 // For non-zero sizes perform the read/copy.
1006                 log::debug!("stream consume: done {}", self.0);
1007                 let mut dst = Vec::with_capacity(amt as usize);
1008                 source.read(&mut store, &mut dst).unwrap();
1009                 let state = &mut store
1010                     .data_mut()
1011                     .host_stream_consumers
1012                     .get_mut(&self.0)
1013                     .unwrap()
1014                     .0;
1015                 state.kind = HostStreamConsumerStateKind::Consumed(dst);
1016                 state.waker = None;
1017                 Poll::Ready(Ok(StreamResult::Completed))
1018             }
1019         }
1020     }
1021 }
1022 
1023 impl Drop for HostStreamConsumer {
1024     fn drop(&mut self) {
1025         log::debug!("stream consume: drop {}", self.0);
1026     }
1027 }
1028 
/// Host-side producer for a stream.
///
/// Field 0 is the stream's id, used to look up the matching entry in
/// `host_stream_producers`. Field 1 is an `Arc<()>` that is never read.
// NOTE(review): presumably paired with the `Weak` stored next to the state
// in `host_stream_producers` — the construction site is not in this part of
// the file.
struct HostStreamProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
1030 
/// Per-stream bookkeeping for a host-side write, stored in
/// `host_stream_producers`.
#[derive(Debug)]
struct HostStreamProducerState {
    /// What the producer should do the next time it is polled.
    kind: HostStreamProducerStateKind,
    /// Waker saved by the most recent `poll_produce` that returned
    /// `Pending`; cleared when a poll is cancelled or a write completes.
    waker: Option<Waker>,
}
1036 
/// The phases of a host-side stream write.
#[derive(Debug)]
enum HostStreamProducerStateKind {
    /// No write is requested; polls park (or report cancellation when asked
    /// to finish).
    Idle,
    /// Items waiting to be handed to the reader. An empty vector is handled
    /// as a zero-length write.
    Writing(Vec<u32>),
    /// Number of items handed over by the last write (zero for a zero-length
    /// write).
    Wrote(u32),
}
1043 
1044 impl HostStreamProducerState {
1045     fn idle() -> Self {
1046         HostStreamProducerState {
1047             kind: HostStreamProducerStateKind::Idle,
1048             waker: None,
1049         }
1050     }
1051 
1052     fn wake_by_ref(&mut self) {
1053         if let Some(waker) = self.waker.take() {
1054             waker.wake();
1055         }
1056     }
1057 }
1058 
1059 impl StreamProducer<Data> for HostStreamProducer {
1060     type Item = u32;
1061     type Buffer = VecBuffer<u32>;
1062 
1063     fn poll_produce(
1064         self: Pin<&mut Self>,
1065         cx: &mut Context<'_>,
1066         mut store: StoreContextMut<'_, Data>,
1067         mut dst: Destination<'_, Self::Item, Self::Buffer>,
1068         finish: bool,
1069     ) -> Poll<Result<StreamResult>> {
1070         let remaining = dst.remaining(&mut store);
1071         let data = store.data_mut();
1072         let state = match data.host_stream_producers.get_mut(&self.0) {
1073             Some((state, _)) => state,
1074             None => {
1075                 log::debug!("stream produce: dropped {}", self.0);
1076                 return Poll::Ready(Ok(StreamResult::Dropped));
1077             }
1078         };
1079         match &mut state.kind {
1080             HostStreamProducerStateKind::Idle | HostStreamProducerStateKind::Wrote(_) => {
1081                 if finish {
1082                     log::debug!("stream produce: cancel {}", self.0);
1083                     state.waker = None;
1084                     Poll::Ready(Ok(StreamResult::Cancelled))
1085                 } else {
1086                     log::debug!("stream produce: wait {}", self.0);
1087                     state.waker = Some(cx.waker().clone());
1088                     Poll::Pending
1089                 }
1090             }
1091             HostStreamProducerStateKind::Writing(buf) => {
1092                 // Keep the other side blocked for a zero-length write
1093                 // originated from the host.
1094                 if buf.len() == 0 {
1095                     log::debug!("stream produce: zero-length write {}", self.0);
1096                     state.kind = HostStreamProducerStateKind::Wrote(0);
1097                     state.waker = Some(cx.waker().clone());
1098                     return Poll::Pending;
1099                 }
1100                 log::debug!("stream produce: write {}", self.0);
1101                 match remaining {
1102                     Some(amt) => {
1103                         // If the guest is doing a zero-length read then we've
1104                         // got some data for them. Complete the read but leave
1105                         // ourselves in the same `Writing` state as before.
1106                         if amt == 0 {
1107                             state.waker = None;
1108                             return Poll::Ready(Ok(StreamResult::Completed));
1109                         }
1110 
1111                         // Don't let wasmtime buffer up data for us, so truncate
1112                         // the buffer we're sending over to the amount that the
1113                         // reader is requesting.
1114                         if amt < buf.len() {
1115                             buf.truncate(amt);
1116                         }
1117                     }
1118 
1119                     // At this time host<->host stream reads/writes aren't
1120                     // fuzzed since that brings up a bunch of weird edge cases
1121                     // which aren't fun to deal with and aren't interesting
1122                     // either.
1123                     None => unreachable!(),
1124                 }
1125                 let count = buf.len() as u32;
1126                 dst.set_buffer(mem::take(buf).into());
1127                 state.kind = HostStreamProducerStateKind::Wrote(count);
1128                 state.waker = None;
1129                 Poll::Ready(Ok(StreamResult::Completed))
1130             }
1131         }
1132     }
1133 }
1134 
1135 impl Drop for HostStreamProducer {
1136     fn drop(&mut self) {
1137         log::debug!("stream produce: drop {}", self.0);
1138     }
1139 }
1140 
/// Command source for a single `Scope`.
///
/// Polling delivers the command at the end of the store's `commands` list
/// when that entry is tagged with this stream's scope; otherwise the stream
/// parks its waker in the store's `wakers` map.
struct SharedStream(Scope);
1142 
1143 impl SharedStream {
1144     async fn next(&mut self, accessor: &Accessor<Data>) -> Option<Command> {
1145         std::future::poll_fn(|cx| {
1146             accessor.with(|mut store| {
1147                 self.poll(cx, store.as_context_mut(), false)
1148                     .map(|pair| match pair {
1149                         (None, StreamResult::Dropped) => None,
1150                         (Some(item), StreamResult::Completed) => Some(item),
1151                         _ => unreachable!(),
1152                     })
1153             })
1154         })
1155         .await
1156     }
1157 
1158     fn poll(
1159         &mut self,
1160         cx: &mut Context<'_>,
1161         mut store: StoreContextMut<'_, Data>,
1162         finish: bool,
1163     ) -> Poll<(Option<Command>, StreamResult)> {
1164         let data = store.data_mut();
1165 
1166         // If no more commands remain then this is a closed and dropped stream.
1167         let Some((scope, command)) = data.commands.last_mut() else {
1168             log::debug!("Stream closed: {:?}", self.0);
1169             return Poll::Ready((None, StreamResult::Dropped));
1170         };
1171 
1172         // If the next queued up command is for the scope that this stream is
1173         // attached to then send off the command.
1174         if *scope == self.0 {
1175             let ret = Some(*command);
1176 
1177             // All commands are followed up with an "ack", and after the "ack"
1178             // is delivered then the command is popped to move on to the next
1179             // command. The reason for this is to guarantee that a command has
1180             // been processed before moving on to the next command. This helps
1181             // make the fuzzing easier to work with by being able to implicitly
1182             // assume that a command has been processed by the time something
1183             // else is. Otherwise it might be possible that wasmtime has a set
1184             // of commands/callbacks that are all delivered at the same time and
1185             // the component model doesn't specify what order they happen
1186             // within. By forcing an "ack" it ensures a more expected ordering
1187             // of execution to assist with fuzzing without losing really all
1188             // that much coverage.
1189             if matches!(command, Command::Ack) {
1190                 data.commands.pop();
1191             } else {
1192                 *command = Command::Ack;
1193             }
1194 
1195             // After a command was popped other streams may be able to make
1196             // progress so wake them all up.
1197             for (_, waker) in data.wakers.drain() {
1198                 waker.wake();
1199             }
1200             log::debug!("Delivering command {ret:?} for {:?}", self.0);
1201             return Poll::Ready((ret, StreamResult::Completed));
1202         }
1203 
1204         // The command queue is non-empty and the next command isn't meant for
1205         // us, so someone else needs to drain the queue. Enqueue our waker.
1206         if finish {
1207             Poll::Ready((None, StreamResult::Cancelled))
1208         } else {
1209             data.wakers.insert(self.0, cx.waker().clone());
1210             Poll::Pending
1211         }
1212     }
1213 }
1214 
1215 impl StreamProducer<Data> for SharedStream {
1216     type Item = Command;
1217     type Buffer = Option<Command>;
1218 
1219     fn poll_produce<'a>(
1220         mut self: Pin<&mut Self>,
1221         cx: &mut Context<'_>,
1222         store: StoreContextMut<'a, Data>,
1223         mut destination: Destination<'a, Self::Item, Self::Buffer>,
1224         finish: bool,
1225     ) -> Poll<Result<StreamResult>> {
1226         let (item, result) = std::task::ready!(self.poll(cx, store, finish));
1227         destination.set_buffer(item);
1228         Poll::Ready(Ok(result))
1229     }
1230 }
1231 
#[cfg(test)]
mod tests {
    use super::{ComponentAsync, Scope, init, run};
    use crate::oracles::component_async::types::*;
    use crate::test::test_n_times;
    use Scope::*;

    /// Feeds 50 inputs supplied by `test_n_times` through `run`.
    #[test]
    fn smoke() {
        init();

        test_n_times(50, |c, _| {
            run(c);
            Ok(())
        });
    }

    // ========================================================================
    // A series of fuzz-generated test cases which caused problems during the
    // development of this fuzzer. Feel free to delete/edit/etc if the fuzzer
    // changes over time.

    /// Regression case: a short mix of `AsyncPendingImportCall`,
    /// `AsyncPendingExportComplete` and `AsyncPendingImportAssertReady`
    /// commands split between `GuestCaller` and `GuestCallee`.
    #[test]
    fn simple() {
        init();

        run(ComponentAsync {
            commands: vec![
                (GuestCaller, Command::AsyncPendingImportCall(0)),
                (GuestCallee, Command::AsyncPendingImportCall(1)),
                (GuestCallee, Command::AsyncPendingExportComplete(0)),
                (GuestCaller, Command::AsyncPendingImportAssertReady(0)),
                (GuestCaller, Command::AsyncPendingImportCall(2)),
            ],
        });
    }

    /// Regression case: two `FutureNew` commands and a `FutureReadPending`,
    /// then a long run of `AsyncPendingImportCall` /
    /// `AsyncPendingExportComplete` commands (the first four calls are
    /// completed out of order), ending with an
    /// `AsyncPendingImportAssertReady`.
    #[test]
    fn somewhat_larger() {
        static COMMANDS: &[(Scope, Command)] = &[
            (GuestCallee, Command::FutureNew(0)),
            (HostCaller, Command::FutureNew(1)),
            (GuestCallee, Command::FutureReadPending(0)),
            (GuestCaller, Command::AsyncPendingImportCall(2)),
            (GuestCaller, Command::AsyncPendingImportCall(3)),
            (GuestCaller, Command::AsyncPendingImportCall(4)),
            (GuestCaller, Command::AsyncPendingImportCall(5)),
            (GuestCallee, Command::AsyncPendingExportComplete(5)),
            (GuestCallee, Command::AsyncPendingExportComplete(3)),
            (GuestCallee, Command::AsyncPendingExportComplete(4)),
            (GuestCallee, Command::AsyncPendingExportComplete(2)),
            (GuestCaller, Command::AsyncPendingImportCall(6)),
            (GuestCallee, Command::AsyncPendingExportComplete(6)),
            (GuestCaller, Command::AsyncPendingImportCall(7)),
            (GuestCallee, Command::AsyncPendingExportComplete(7)),
            (GuestCaller, Command::AsyncPendingImportCall(8)),
            (GuestCallee, Command::AsyncPendingExportComplete(8)),
            (GuestCaller, Command::AsyncPendingImportCall(9)),
            (GuestCallee, Command::AsyncPendingExportComplete(9)),
            (GuestCaller, Command::AsyncPendingImportCall(10)),
            (GuestCallee, Command::AsyncPendingExportComplete(10)),
            (GuestCaller, Command::AsyncPendingImportCall(11)),
            (GuestCallee, Command::AsyncPendingExportComplete(11)),
            (GuestCaller, Command::AsyncPendingImportCall(12)),
            (GuestCallee, Command::AsyncPendingExportComplete(12)),
            (GuestCaller, Command::AsyncPendingImportCall(13)),
            (GuestCallee, Command::AsyncPendingExportComplete(13)),
            (GuestCaller, Command::AsyncPendingImportCall(14)),
            (GuestCallee, Command::AsyncPendingExportComplete(14)),
            (GuestCaller, Command::AsyncPendingImportCall(15)),
            (GuestCallee, Command::AsyncPendingExportComplete(15)),
            (GuestCaller, Command::AsyncPendingImportCall(16)),
            (GuestCallee, Command::AsyncPendingExportComplete(16)),
            (GuestCaller, Command::AsyncPendingImportCall(17)),
            (GuestCallee, Command::AsyncPendingExportComplete(17)),
            (GuestCaller, Command::AsyncPendingImportCall(18)),
            (GuestCallee, Command::AsyncPendingExportComplete(18)),
            (GuestCaller, Command::AsyncPendingImportCall(19)),
            (GuestCallee, Command::AsyncPendingExportComplete(19)),
            (GuestCaller, Command::AsyncPendingImportCall(20)),
            (GuestCallee, Command::AsyncPendingExportComplete(20)),
            (GuestCaller, Command::AsyncPendingImportCall(21)),
            (GuestCallee, Command::AsyncPendingExportComplete(21)),
            (GuestCaller, Command::AsyncPendingImportCall(22)),
            (GuestCallee, Command::AsyncPendingExportComplete(22)),
            (GuestCaller, Command::AsyncPendingImportCall(23)),
            (GuestCallee, Command::AsyncPendingExportComplete(23)),
            (GuestCaller, Command::AsyncPendingImportCall(24)),
            (GuestCallee, Command::AsyncPendingExportComplete(24)),
            (GuestCaller, Command::AsyncPendingImportCall(25)),
            (GuestCallee, Command::AsyncPendingExportComplete(25)),
            (GuestCaller, Command::AsyncPendingImportCall(26)),
            (GuestCallee, Command::AsyncPendingExportComplete(26)),
            (GuestCaller, Command::AsyncPendingImportCall(27)),
            (GuestCallee, Command::AsyncPendingExportComplete(27)),
            (GuestCaller, Command::AsyncPendingImportCall(28)),
            (GuestCallee, Command::AsyncPendingExportComplete(28)),
            (GuestCaller, Command::AsyncPendingImportCall(29)),
            (GuestCallee, Command::AsyncPendingExportComplete(29)),
            (GuestCaller, Command::AsyncPendingImportCall(30)),
            (GuestCallee, Command::AsyncPendingExportComplete(30)),
            (GuestCaller, Command::AsyncPendingImportCall(31)),
            (GuestCallee, Command::AsyncPendingExportComplete(31)),
            (GuestCaller, Command::AsyncPendingImportCall(32)),
            (GuestCallee, Command::AsyncPendingExportComplete(32)),
            (GuestCaller, Command::AsyncPendingImportCall(33)),
            (GuestCallee, Command::AsyncPendingExportComplete(33)),
            (GuestCaller, Command::AsyncPendingImportCall(34)),
            (GuestCallee, Command::AsyncPendingExportComplete(34)),
            (GuestCaller, Command::AsyncPendingImportCall(35)),
            (GuestCallee, Command::AsyncPendingExportComplete(35)),
            (GuestCaller, Command::AsyncPendingImportCall(36)),
            (GuestCallee, Command::AsyncPendingExportComplete(36)),
            (GuestCaller, Command::AsyncPendingImportCall(37)),
            (GuestCallee, Command::AsyncPendingExportComplete(37)),
            (GuestCaller, Command::AsyncPendingImportAssertReady(36)),
        ];
        init();

        run(ComponentAsync {
            commands: COMMANDS.to_vec(),
        });
    }

    /// Regression case on stream 1, driven almost entirely from
    /// `HostCallee`: `StreamNew`, `StreamReadPending` + `StreamCancelRead`,
    /// a `SyncReadyCall` from `GuestCaller`, `StreamWritePending` +
    /// `StreamCancelWrite`, `StreamDropWritable`, then `StreamReadDropped`.
    #[test]
    fn simple_stream1() {
        init();

        run(ComponentAsync {
            commands: vec![
                (HostCallee, Command::StreamNew(1)),
                (
                    HostCallee,
                    Command::StreamReadPending(StreamReadPayload {
                        stream: 1,
                        count: 2,
                    }),
                ),
                (HostCallee, Command::StreamCancelRead(1)),
                (GuestCaller, Command::SyncReadyCall),
                (
                    HostCallee,
                    Command::StreamWritePending(StreamWritePayload {
                        stream: 1,
                        item: 3,
                        count: 2,
                    }),
                ),
                (HostCallee, Command::StreamCancelWrite(1)),
                (HostCallee, Command::StreamDropWritable(1)),
                (
                    HostCallee,
                    Command::StreamReadDropped(StreamReadPayload {
                        stream: 1,
                        count: 1,
                    }),
                ),
            ],
        });
    }

    /// Regression case on stream 26, all from `GuestCaller`: `StreamNew`,
    /// `StreamReadPending`, `StreamDropWritable`, then
    /// `StreamReadAssertDropped`.
    #[test]
    fn simple_stream3() {
        init();

        run(ComponentAsync {
            commands: vec![
                (GuestCaller, Command::StreamNew(26)),
                (
                    GuestCaller,
                    Command::StreamReadPending(StreamReadPayload {
                        stream: 26,
                        count: 10,
                    }),
                ),
                (GuestCaller, Command::StreamDropWritable(26)),
                (GuestCaller, Command::StreamReadAssertDropped(26)),
            ],
        });
    }

    /// Regression case on stream 23: `GuestCaller` issues `StreamNew`,
    /// `StreamWritePending` and `StreamGive`, `GuestCallee` issues
    /// `StreamDropReadable`, then `GuestCaller` issues
    /// `StreamWriteAssertDropped`.
    #[test]
    fn simple_stream4() {
        init();

        run(ComponentAsync {
            commands: vec![
                (GuestCaller, Command::StreamNew(23)),
                (
                    GuestCaller,
                    Command::StreamWritePending(StreamWritePayload {
                        stream: 23,
                        item: 24,
                        count: 2,
                    }),
                ),
                (GuestCaller, Command::StreamGive(23)),
                (GuestCallee, Command::StreamDropReadable(23)),
                (
                    GuestCaller,
                    Command::StreamWriteAssertDropped(StreamReadPayload {
                        stream: 23,
                        count: 0,
                    }),
                ),
            ],
        });
    }

    /// Regression case on stream 10: `StreamNew` from `GuestCaller`,
    /// `StreamTake` from `HostCaller`, a five-item `StreamWritePending` from
    /// `GuestCaller`, then two `StreamReadReady` commands from `HostCaller`
    /// whose `op_count` and `ready_count` are both zero.
    #[test]
    fn zero_length_behavior() {
        init();

        run(ComponentAsync {
            commands: vec![
                (GuestCaller, Command::StreamNew(10)),
                (HostCaller, Command::StreamTake(10)),
                (
                    GuestCaller,
                    Command::StreamWritePending(StreamWritePayload {
                        stream: 10,
                        item: 13,
                        count: 5,
                    }),
                ),
                (
                    HostCaller,
                    Command::StreamReadReady(StreamReadyPayload {
                        stream: 10,
                        item: 0,
                        op_count: 0,
                        ready_count: 0,
                    }),
                ),
                (
                    HostCaller,
                    Command::StreamReadReady(StreamReadyPayload {
                        stream: 10,
                        item: 0,
                        op_count: 0,
                        ready_count: 0,
                    }),
                ),
            ],
        });
    }
}
1479