1 //! For a high-level overview of this fuzz target see `fuzz_async.rs`
2 
3 use crate::block_on;
4 use crate::generators::component_async::exports::wasmtime_fuzz::fuzz::async_test::Guest;
5 use crate::generators::component_async::wasmtime_fuzz::fuzz::async_test::{self, Command};
6 use crate::generators::component_async::wasmtime_fuzz::fuzz::types;
7 use crate::generators::component_async::{ComponentAsync, FuzzAsyncPre, Scope};
8 use futures::channel::oneshot;
9 use std::collections::{HashMap, HashSet};
10 use std::mem;
11 use std::pin::Pin;
12 use std::sync::{Arc, OnceLock, Weak};
13 use std::task::{Context, Poll, Waker};
14 use std::time::Instant;
15 use wasmtime::component::{
16     Access, Accessor, AccessorTask, Component, Destination, FutureConsumer, FutureProducer,
17     FutureReader, HasSelf, Linker, ResourceTable, Source, StreamConsumer, StreamProducer,
18     StreamReader, StreamResult, VecBuffer,
19 };
20 use wasmtime::{AsContextMut, Config, Engine, Result, Store, StoreContextMut};
21 use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
22 
23 static STATE: OnceLock<(Engine, FuzzAsyncPre<Data>)> = OnceLock::new();
24 
25 /// Initializes state for future fuzz runs.
26 ///
27 /// This will create an `Engine` to run this fuzzer within and it will
28 /// additionally precompile the component that will be used for fuzzing.
29 ///
30 /// There are a few points of note about this:
31 ///
32 /// * The `misc` fuzzer is manually instrumented with this function as the init
33 ///   hook to ensure this runs before any other fuzzing.
34 ///
35 /// * Compilation of the component takes quite some time with
36 ///   fuzzing-instrumented Cranelift. To assist with local development this
37 ///   implements a cache which is serialized/deserialized via an env var.
init()38 pub fn init() {
39     crate::init_fuzzing();
40 
41     STATE.get_or_init(|| {
42         let mut config = Config::new();
43         config.wasm_component_model_async(true);
44         let engine = Engine::new(&config).unwrap();
45         let component = compile(&engine);
46         let mut linker = Linker::new(&engine);
47         wasmtime_wasi::p2::add_to_linker_async(&mut linker).unwrap();
48         async_test::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap();
49         types::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap();
50 
51         let pre = linker.instantiate_pre(&component).unwrap();
52         let pre = FuzzAsyncPre::new(pre).unwrap();
53 
54         (engine, pre)
55     });
56 
57     fn compile(engine: &Engine) -> Component {
58         let wasm_path = test_programs_artifacts::FUZZ_ASYNC_COMPONENT;
59         let wasm = test_programs_artifacts::fuzz_async_component_bytes!();
60         let wasm = &wasm[..];
61         let cwasm_cache = std::env::var("COMPONENT_ASYNC_CWASM_CACHE").ok();
62         if let Some(path) = &cwasm_cache
63             && let Ok(cwasm_mtime) = std::fs::metadata(&path).and_then(|m| m.modified())
64             && let Ok(wasm_mtime) = std::fs::metadata(wasm_path).and_then(|m| m.modified())
65             && cwasm_mtime > wasm_mtime
66         {
67             log::debug!("Using cached component async cwasm at {path}");
68             unsafe {
69                 return Component::deserialize_file(engine, path).unwrap();
70             }
71         }
72 
73         let composition = {
74             let mut config = wasm_compose::config::Config::default();
75             let tempdir = tempfile::TempDir::new().unwrap();
76             let path = tempdir.path().join("fuzz-async.wasm");
77             std::fs::write(&path, wasm).unwrap();
78             config.definitions.push(path.clone());
79 
80             wasm_compose::composer::ComponentComposer::new(&path, &config)
81                 .compose()
82                 .unwrap()
83         };
84         let start = Instant::now();
85         let component = Component::new(&engine, &composition).unwrap();
86         if let Some(path) = cwasm_cache {
87             log::debug!("Caching component async cwasm to {path}");
88             std::fs::write(path, &component.serialize().unwrap()).unwrap();
89         } else if start.elapsed() > std::time::Duration::from_secs(1) {
90             eprintln!(
91                 "
92 !!!!!!!!!!!!!!!!!!!!!!!!!!
93 
94 Component compilation is slow, try setting `COMPONENT_ASYNC_CWASM_CACHE=path` to
95 cache compilation results
96 
97 !!!!!!!!!!!!!!!!!!!!!!!!!!
98 "
99             );
100         }
101         return component;
102     }
103 }
104 
105 #[derive(Default)]
106 struct Data {
107     ctx: WasiCtx,
108     table: ResourceTable,
109     wakers: HashMap<Scope, Waker>,
110     commands: Vec<(Scope, Command)>,
111 
112     guest_caller_stream: Option<StreamReader<Command>>,
113     guest_callee_stream: Option<StreamReader<Command>>,
114 
115     host_pending_async_calls: HashMap<u32, oneshot::Sender<()>>,
116     host_pending_async_calls_cancelled: HashSet<u32>,
117     guest_pending_async_calls_ready: HashSet<u32>,
118 
119     // State of futures/streams. Note that while #12091 is unresolved an
120     // `Arc`/`Weak` combo is used to detect when wasmtime drops futures/streams
121     // and the various halves we're interacting with using traits.
122     host_futures: HashMap<u32, FutureReader<u32>>,
123     host_future_producers: HashMap<u32, (HostFutureProducerState, Weak<()>)>,
124     host_future_consumers: HashMap<u32, (HostFutureConsumerState, Weak<()>)>,
125     host_streams: HashMap<u32, StreamReader<u32>>,
126     host_stream_producers: HashMap<u32, (HostStreamProducerState, Weak<()>)>,
127     host_stream_consumers: HashMap<u32, (HostStreamConsumerState, Weak<()>)>,
128 }
129 
130 impl WasiView for Data {
ctx(&mut self) -> WasiCtxView<'_>131     fn ctx(&mut self) -> WasiCtxView<'_> {
132         WasiCtxView {
133             ctx: &mut self.ctx,
134             table: &mut self.table,
135         }
136     }
137 }
138 
139 impl async_test::HostWithStore for HasSelf<Data> {
async_ready<T>(_store: &Accessor<T, Self>)140     async fn async_ready<T>(_store: &Accessor<T, Self>) {}
141 
async_pending<T>(store: &Accessor<T, Self>, id: u32)142     async fn async_pending<T>(store: &Accessor<T, Self>, id: u32) {
143         let (tx, rx) = oneshot::channel();
144         store.with(|mut s| s.get().host_pending_async_calls.insert(id, tx));
145         let record = RecordCancelOnDrop { store, id };
146         rx.await.unwrap();
147         mem::forget(record);
148 
149         struct RecordCancelOnDrop<'a, T: 'static> {
150             store: &'a Accessor<T, HasSelf<Data>>,
151             id: u32,
152         }
153 
154         impl<T> Drop for RecordCancelOnDrop<'_, T> {
155             fn drop(&mut self) {
156                 self.store.with(|mut s| {
157                     s.get().host_pending_async_calls_cancelled.insert(self.id);
158                 });
159             }
160         }
161     }
162 
init<T>(_store: &Accessor<T, Self>, _scope: types::Scope)163     async fn init<T>(_store: &Accessor<T, Self>, _scope: types::Scope) {}
164 }
165 
166 impl async_test::Host for Data {
sync_ready(&mut self)167     fn sync_ready(&mut self) {}
168 
future_take(&mut self, id: u32) -> FutureReader<u32>169     fn future_take(&mut self, id: u32) -> FutureReader<u32> {
170         self.host_futures.remove(&id).unwrap()
171     }
172 
future_receive(&mut self, id: u32, future: FutureReader<u32>)173     fn future_receive(&mut self, id: u32, future: FutureReader<u32>) {
174         let prev = self.host_futures.insert(id, future);
175         assert!(prev.is_none());
176     }
177 
stream_take(&mut self, id: u32) -> StreamReader<u32>178     fn stream_take(&mut self, id: u32) -> StreamReader<u32> {
179         self.host_streams.remove(&id).unwrap()
180     }
181 
stream_receive(&mut self, id: u32, stream: StreamReader<u32>)182     fn stream_receive(&mut self, id: u32, stream: StreamReader<u32>) {
183         let prev = self.host_streams.insert(id, stream);
184         assert!(prev.is_none());
185     }
186 }
187 
188 impl types::HostWithStore for HasSelf<Data> {
get_commands<T>( mut store: Access<'_, T, Self>, scope: types::Scope, ) -> StreamReader<Command>189     fn get_commands<T>(
190         mut store: Access<'_, T, Self>,
191         scope: types::Scope,
192     ) -> StreamReader<Command> {
193         let data = store.get();
194         match scope {
195             types::Scope::Caller => data.guest_caller_stream.take().unwrap(),
196             types::Scope::Callee => data.guest_callee_stream.take().unwrap(),
197         }
198     }
199 }
200 
201 impl types::Host for Data {}
202 
203 /// Executes the `input` provided, assuming that `init` has been previously
204 /// executed.
run(mut input: ComponentAsync)205 pub fn run(mut input: ComponentAsync) {
206     log::debug!("Running component async fuzz test with\n{input:?}");
207 
208     // Commands are executed in the order that they're listed in the input, but
209     // to make it easier on the `StreamProducer` implementation below they're
210     // popped off the back. To ensure that they're all delivered in the right
211     // order reverse the list to ensure the correct order is maintained.
212     input.commands.reverse();
213 
214     let (engine, pre) = STATE.get().unwrap();
215     let mut store = Store::new(
216         engine,
217         Data {
218             ctx: WasiCtx::builder().inherit_stdio().inherit_env().build(),
219             commands: input.commands,
220             ..Data::default()
221         },
222     );
223 
224     let guest_caller_stream =
225         StreamReader::new(&mut store, SharedStream(Scope::GuestCaller)).unwrap();
226     let guest_callee_stream =
227         StreamReader::new(&mut store, SharedStream(Scope::GuestCallee)).unwrap();
228     store.data_mut().guest_caller_stream = Some(guest_caller_stream);
229     store.data_mut().guest_callee_stream = Some(guest_callee_stream);
230     block_on(async {
231         let instance = pre.instantiate_async(&mut store).await.unwrap();
232         let test = instance.wasmtime_fuzz_fuzz_async_test();
233 
234         let mut host_caller = SharedStream(Scope::HostCaller);
235         let mut host_callee = SharedStream(Scope::HostCallee);
236         store
237             .run_concurrent(async |store| {
238                 // Kick off stream reads in the guest. This function will return
239                 // but the tasks in the guest will keep running after they
240                 // return to process stream items.
241                 test.call_init(store, types::Scope::Caller).await.unwrap();
242 
243                 // Simultaneously process commands from both host streams. These
244                 // will return once the entire command queue is exhausted.
245                 futures::join!(
246                     async {
247                         while let Some(cmd) = host_caller.next(store).await {
248                             host_caller_cmd(&test, store, cmd).await;
249                         }
250                     },
251                     async {
252                         while let Some(cmd) = host_callee.next(store).await {
253                             host_callee_cmd(store, cmd).await;
254                         }
255                     },
256                 );
257 
258                 // Note that there may still be pending async work in the guest
259                 // (or host). It's intentional that it's not cleaned up here to
260                 // help test situations where async work is all abruptly
261                 // cancelled by just being dropped in the host.
262             })
263             .await
264             .unwrap();
265     });
266 }
267 
268 /// See documentation in `fuzz_async.rs` for what's going on here.
test_property<F>(store: &Accessor<Data>, mut f: F) -> bool where F: FnMut(&mut Data) -> bool,269 async fn test_property<F>(store: &Accessor<Data>, mut f: F) -> bool
270 where
271     F: FnMut(&mut Data) -> bool,
272 {
273     for _ in 0..1000 {
274         let ready = store.with(|mut s| f(s.get()));
275         if ready {
276             return true;
277         }
278 
279         crate::YieldN(1).await;
280     }
281 
282     return false;
283 }
284 
await_property<F>(store: &Accessor<Data>, desc: &str, f: F) where F: FnMut(&mut Data) -> bool,285 async fn await_property<F>(store: &Accessor<Data>, desc: &str, f: F)
286 where
287     F: FnMut(&mut Data) -> bool,
288 {
289     assert!(
290         test_property(store, f).await,
291         "timed out waiting for {desc}",
292     );
293 }
294 
host_caller_cmd(test: &Guest, store: &Accessor<Data>, cmd: Command)295 async fn host_caller_cmd(test: &Guest, store: &Accessor<Data>, cmd: Command) {
296     match cmd {
297         Command::Ack => {}
298         Command::SyncReadyCall => test.call_sync_ready(store).await.unwrap(),
299         Command::AsyncReadyCall => test.call_async_ready(store).await.unwrap(),
300         Command::AsyncPendingExportComplete(_i) => todo!(),
301         Command::AsyncPendingExportAssertCancelled(_i) => todo!(),
302         Command::AsyncPendingImportCall(i) => {
303             struct RunPendingImport {
304                 test: Guest,
305                 i: u32,
306             }
307 
308             store.spawn(RunPendingImport {
309                 test: test.clone(),
310                 i,
311             });
312 
313             impl AccessorTask<Data> for RunPendingImport {
314                 async fn run(self, store: &Accessor<Data>) -> Result<()> {
315                     self.test.call_async_pending(store, self.i).await?;
316                     store.with(|mut s| {
317                         s.get().guest_pending_async_calls_ready.insert(self.i);
318                     });
319                     Ok(())
320                 }
321             }
322         }
323         Command::AsyncPendingImportCancel(_i) => todo!(),
324         Command::AsyncPendingImportAssertReady(i) => {
325             assert!(
326                 test_property(store, |s| s.guest_pending_async_calls_ready.remove(&i)).await,
327                 "expected async_pending import {i} to be ready",
328             );
329         }
330 
331         Command::FutureTake(i) => {
332             let future = test.call_future_take(store, i).await.unwrap();
333             store.with(|mut s| {
334                 let prev = s.get().host_futures.insert(i, future);
335                 assert!(prev.is_none());
336             });
337         }
338         Command::FutureGive(i) => {
339             let future = store.with(|mut s| s.get().host_futures.remove(&i).unwrap());
340             test.call_future_receive(store, i, future).await.unwrap();
341         }
342         Command::StreamTake(i) => {
343             let stream = test.call_stream_take(store, i).await.unwrap();
344             store.with(|mut s| {
345                 let prev = s.get().host_streams.insert(i, stream);
346                 assert!(prev.is_none());
347             });
348         }
349         Command::StreamGive(i) => {
350             let stream = store.with(|mut s| s.get().host_streams.remove(&i).unwrap());
351             test.call_stream_receive(store, i, stream).await.unwrap();
352         }
353 
354         other => future_or_stream_cmd(store, other).await,
355     }
356 }
357 
host_callee_cmd(store: &Accessor<Data>, cmd: Command)358 async fn host_callee_cmd(store: &Accessor<Data>, cmd: Command) {
359     match cmd {
360         Command::Ack => {}
361         Command::SyncReadyCall => todo!(),
362         Command::AsyncReadyCall => todo!(),
363         Command::AsyncPendingExportComplete(i) => store.with(|mut s| {
364             s.get()
365                 .host_pending_async_calls
366                 .remove(&i)
367                 .unwrap()
368                 .send(())
369                 .unwrap();
370         }),
371         Command::AsyncPendingExportAssertCancelled(i) => {
372             assert!(
373                 test_property(store, |s| s.host_pending_async_calls_cancelled.remove(&i)).await,
374                 "expected async_pending export {i} to be cancelled",
375             );
376         }
377         Command::AsyncPendingImportCall(_i) => todo!(),
378         Command::AsyncPendingImportCancel(_i) => todo!(),
379         Command::AsyncPendingImportAssertReady(_i) => todo!(),
380 
381         other => future_or_stream_cmd(store, other).await,
382     }
383 }
384 
future_or_stream_cmd(store: &Accessor<Data>, cmd: Command)385 async fn future_or_stream_cmd(store: &Accessor<Data>, cmd: Command) {
386     match cmd {
387         // These commands should be handled above
388         Command::Ack
389         | Command::SyncReadyCall
390         | Command::AsyncReadyCall
391         | Command::AsyncPendingExportComplete(_)
392         | Command::AsyncPendingExportAssertCancelled(_)
393         | Command::AsyncPendingImportCall(_)
394         | Command::AsyncPendingImportCancel(_)
395         | Command::FutureTake(_)
396         | Command::FutureGive(_)
397         | Command::StreamTake(_)
398         | Command::StreamGive(_)
399         | Command::AsyncPendingImportAssertReady(_) => unreachable!(),
400 
401         Command::FutureNew(id) => {
402             store.with(|mut s| {
403                 let arc = Arc::new(());
404                 let weak = Arc::downgrade(&arc);
405                 let future = FutureReader::new(&mut s, HostFutureProducer(id, arc)).unwrap();
406                 let data = s.get();
407                 let prev = data.host_futures.insert(id, future);
408                 assert!(prev.is_none());
409                 let prev = data
410                     .host_future_producers
411                     .insert(id, (HostFutureProducerState::Idle, weak));
412                 assert!(prev.is_none());
413             });
414         }
415         Command::FutureDropReadable(id) => {
416             store.with(|mut s| match s.get().host_futures.remove(&id) {
417                 Some(mut future) => future.close(&mut s).unwrap(),
418                 None => {
419                     let (mut state, _weak) = s.get().host_future_consumers.remove(&id).unwrap();
420                     state.wake_by_ref();
421                 }
422             })
423         }
424         Command::FutureWriteReady(payload) => {
425             await_property(store, "future write should be waiting", |s| {
426                 matches!(
427                     s.host_future_producers.get(&payload.future),
428                     Some((HostFutureProducerState::Waiting(_), _))
429                 )
430             })
431             .await;
432             store.with(|mut s| {
433                 let state = s
434                     .get()
435                     .host_future_producers
436                     .get_mut(&payload.future)
437                     .unwrap();
438                 match state {
439                     (HostFutureProducerState::Waiting(waker), _) => {
440                         waker.wake_by_ref();
441                         state.0 = HostFutureProducerState::Writing(payload.item);
442                     }
443                     (state, _) => panic!("future not waiting: {state:?}"),
444                 }
445             })
446         }
447         Command::FutureWritePending(payload) => store.with(|mut s| {
448             let state = s
449                 .get()
450                 .host_future_producers
451                 .get_mut(&payload.future)
452                 .unwrap();
453             match state {
454                 (HostFutureProducerState::Idle, _) => {
455                     state.0 = HostFutureProducerState::Writing(payload.item);
456                 }
457                 _ => panic!("future not idle"),
458             }
459         }),
460         Command::FutureWriteDropped(id) => store.with(|mut s| {
461             let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
462             assert!(matches!(state, HostFutureProducerState::Idle));
463             assert!(weak.upgrade().is_none());
464         }),
465         Command::FutureReadReady(payload) => {
466             let id = payload.future;
467             store.with(|mut s| {
468                 let arc = Arc::new(());
469                 let weak = Arc::downgrade(&arc);
470                 let data = s.get();
471                 let future = data.host_futures.remove(&id).unwrap();
472                 let prev = data
473                     .host_future_consumers
474                     .insert(id, (HostFutureConsumerState::Consuming, weak));
475                 assert!(prev.is_none());
476                 future.pipe(&mut s, HostFutureConsumer(id, arc)).unwrap();
477             });
478 
479             await_property(store, "future should be present", |s| {
480                 matches!(
481                     s.host_future_consumers[&id],
482                     (HostFutureConsumerState::Complete(_), _)
483                 )
484             })
485             .await;
486 
487             store.with(|mut s| {
488                 let (state, _) = s.get().host_future_consumers.remove(&id).unwrap();
489                 match state {
490                     HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item),
491                     _ => panic!("future not complete"),
492                 }
493             });
494         }
495         Command::FutureReadPending(id) => {
496             ensure_future_reading(store, id);
497             store.with(|mut s| {
498                 let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap();
499                 state.wake_by_ref();
500                 assert!(
501                     matches!(state, HostFutureConsumerState::Idle),
502                     "bad state: {state:?}",
503                 );
504                 *state = HostFutureConsumerState::Consuming;
505             })
506         }
507         Command::FutureCancelWrite(id) => store.with(|mut s| {
508             let (state, _) = s.get().host_future_producers.get_mut(&id).unwrap();
509             assert!(matches!(state, HostFutureProducerState::Writing(_)));
510             *state = HostFutureProducerState::Idle;
511         }),
512         Command::FutureCancelRead(id) => store.with(|mut s| {
513             let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap();
514             assert!(matches!(state, HostFutureConsumerState::Consuming));
515             *state = HostFutureConsumerState::Idle;
516         }),
517         Command::FutureReadAssertComplete(payload) => {
518             await_property(store, "future read should be complete", |s| {
519                 matches!(
520                     s.host_future_consumers.get(&payload.future),
521                     Some((HostFutureConsumerState::Complete(_), _))
522                 )
523             })
524             .await;
525             store.with(|mut s| {
526                 let (state, _) = s
527                     .get()
528                     .host_future_consumers
529                     .remove(&payload.future)
530                     .unwrap();
531                 match state {
532                     HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item),
533                     _ => panic!("future not complete"),
534                 }
535             })
536         }
537         Command::FutureWriteAssertComplete(id) => store.with(|mut s| {
538             let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
539             assert!(matches!(state, HostFutureProducerState::Complete));
540             assert!(weak.upgrade().is_none());
541         }),
542         Command::FutureWriteAssertDropped(id) => store.with(|mut s| {
543             let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
544             assert!(matches!(state, HostFutureProducerState::Writing(_)));
545             assert!(weak.upgrade().is_none());
546         }),
547 
548         Command::StreamNew(id) => {
549             store.with(|mut s| {
550                 let arc = Arc::new(());
551                 let weak = Arc::downgrade(&arc);
552                 let stream = StreamReader::new(&mut s, HostStreamProducer(id, arc)).unwrap();
553                 let data = s.get();
554                 let prev = data.host_streams.insert(id, stream);
555                 assert!(prev.is_none());
556                 let prev = data
557                     .host_stream_producers
558                     .insert(id, (HostStreamProducerState::idle(), weak));
559                 assert!(prev.is_none());
560             });
561         }
562         Command::StreamDropReadable(id) => {
563             store.with(|mut s| match s.get().host_streams.remove(&id) {
564                 Some(mut stream) => stream.close(&mut s).unwrap(),
565                 None => {
566                     let (mut state, _weak) = s.get().host_stream_consumers.remove(&id).unwrap();
567                     state.wake_by_ref();
568                 }
569             })
570         }
571         Command::StreamDropWritable(id) => store.with(|mut s| {
572             let (mut state, _weak) = s.get().host_stream_producers.remove(&id).unwrap();
573             state.wake_by_ref();
574         }),
575         Command::StreamWriteReady(payload) => {
576             let id = payload.stream;
577             store.with(|mut s| {
578                 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
579                 state.wake_by_ref();
580                 match state.kind {
581                     HostStreamProducerStateKind::Idle => {
582                         state.kind = HostStreamProducerStateKind::Writing(stream_payload(
583                             payload.item,
584                             payload.op_count,
585                         ));
586                     }
587                     _ => panic!("stream not idle: {state:?}"),
588                 }
589             });
590             await_property(store, "stream should complete a write", |s| {
591                 matches!(
592                     s.host_stream_producers[&id].0.kind,
593                     HostStreamProducerStateKind::Wrote(_),
594                 )
595             })
596             .await;
597             store.with(|mut s| {
598                 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
599                 match state.kind {
600                     HostStreamProducerStateKind::Wrote(amt) => {
601                         assert_eq!(amt, payload.ready_count);
602                         state.kind = HostStreamProducerStateKind::Idle;
603                     }
604                     _ => panic!("stream not idle: {state:?}"),
605                 }
606             });
607         }
608         Command::StreamReadReady(payload) => {
609             let id = payload.stream;
610             ensure_stream_reading(store, id);
611             store.with(|mut s| {
612                 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
613                 state.wake_by_ref();
614                 state.kind = HostStreamConsumerStateKind::Consuming(payload.op_count);
615             });
616             await_property(store, "stream should complete a read", |s| {
617                 matches!(
618                     s.host_stream_consumers[&id].0.kind,
619                     HostStreamConsumerStateKind::Consumed(_),
620                 )
621             })
622             .await;
623 
624             store.with(|mut s| {
625                 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
626                 match &state.kind {
627                     HostStreamConsumerStateKind::Consumed(last_read) => {
628                         assert_eq!(
629                             *last_read,
630                             stream_payload(payload.item, payload.ready_count)
631                         );
632                         state.kind = HostStreamConsumerStateKind::Idle;
633                     }
634                     _ => panic!("future not complete"),
635                 }
636             });
637         }
638         Command::StreamWritePending(payload) => store.with(|mut s| {
639             let (state, _) = s
640                 .get()
641                 .host_stream_producers
642                 .get_mut(&payload.stream)
643                 .unwrap();
644             state.wake_by_ref();
645             match state.kind {
646                 HostStreamProducerStateKind::Idle => {
647                     state.kind = HostStreamProducerStateKind::Writing(stream_payload(
648                         payload.item,
649                         payload.count,
650                     ));
651                 }
652                 _ => panic!("stream not idle {:?}", state.kind),
653             }
654         }),
655         Command::StreamReadPending(payload) => {
656             ensure_stream_reading(store, payload.stream);
657             store.with(|mut s| {
658                 let (state, _) = s
659                     .get()
660                     .host_stream_consumers
661                     .get_mut(&payload.stream)
662                     .unwrap();
663                 state.wake_by_ref();
664                 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle));
665                 state.kind = HostStreamConsumerStateKind::Consuming(payload.count);
666             })
667         }
668         Command::StreamWriteDropped(payload) => store.with(|mut s| {
669             let (state, weak) = s
670                 .get()
671                 .host_stream_producers
672                 .get_mut(&payload.stream)
673                 .unwrap();
674             assert!(matches!(state.kind, HostStreamProducerStateKind::Idle));
675             assert!(weak.upgrade().is_none());
676         }),
677         Command::StreamReadDropped(payload) => {
678             ensure_stream_reading(store, payload.stream);
679             await_property(store, "stream read should get dropped", |s| {
680                 let weak = &s.host_stream_consumers[&payload.stream].1;
681                 weak.upgrade().is_none()
682             })
683             .await;
684             store.with(|mut s| {
685                 let (state, weak) = s
686                     .get()
687                     .host_stream_consumers
688                     .get_mut(&payload.stream)
689                     .unwrap();
690                 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle));
691                 assert!(weak.upgrade().is_none());
692             })
693         }
694         Command::StreamCancelWrite(id) => store.with(|mut s| {
695             let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
696             assert!(
697                 matches!(state.kind, HostStreamProducerStateKind::Writing(_)),
698                 "invalid state {state:?}",
699             );
700             state.kind = HostStreamProducerStateKind::Idle;
701             state.wake_by_ref();
702         }),
703         Command::StreamCancelRead(id) => store.with(|mut s| {
704             let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
705             assert!(matches!(
706                 state.kind,
707                 HostStreamConsumerStateKind::Consuming(_)
708             ));
709             state.kind = HostStreamConsumerStateKind::Idle;
710         }),
711         Command::StreamReadAssertComplete(payload) => store.with(|mut s| {
712             let (state, _) = s
713                 .get()
714                 .host_stream_consumers
715                 .get_mut(&payload.stream)
716                 .unwrap();
717             match &state.kind {
718                 HostStreamConsumerStateKind::Consumed(last_read) => {
719                     assert_eq!(*last_read, stream_payload(payload.item, payload.count));
720                     state.kind = HostStreamConsumerStateKind::Idle;
721                 }
722                 _ => panic!("stream not complete"),
723             }
724         }),
725         Command::StreamWriteAssertComplete(payload) => store.with(|mut s| {
726             let (state, _) = s
727                 .get()
728                 .host_stream_producers
729                 .get_mut(&payload.stream)
730                 .unwrap();
731             match state.kind {
732                 HostStreamProducerStateKind::Wrote(amt) => {
733                     assert_eq!(amt, payload.count);
734                     state.kind = HostStreamProducerStateKind::Idle;
735                 }
736                 _ => panic!("stream not complete: {:?}", state.kind),
737             }
738         }),
739         Command::StreamWriteAssertDropped(payload) => {
740             await_property(store, "stream write should be dropped", |s| {
741                 let weak = &s.host_stream_producers[&payload.stream].1;
742                 weak.upgrade().is_none()
743             })
744             .await;
745             store.with(|mut s| {
746                 let (state, weak) = s
747                     .get()
748                     .host_stream_producers
749                     .get_mut(&payload.stream)
750                     .unwrap();
751                 assert!(matches!(
752                     state.kind,
753                     HostStreamProducerStateKind::Writing(_)
754                 ));
755                 assert!(weak.upgrade().is_none());
756             })
757         }
758         Command::StreamReadAssertDropped(id) => {
759             await_property(store, "stream read should be dropped", |s| {
760                 let weak = &s.host_stream_consumers[&id].1;
761                 weak.upgrade().is_none()
762             })
763             .await;
764             store.with(|mut s| {
765                 let (state, weak) = s.get().host_stream_consumers.get_mut(&id).unwrap();
766                 assert!(matches!(
767                     state.kind,
768                     HostStreamConsumerStateKind::Consuming(_),
769                 ));
770                 assert!(weak.upgrade().is_none());
771             })
772         }
773     }
774 }
775 
stream_payload(item: u32, count: u32) -> Vec<u32>776 fn stream_payload(item: u32, count: u32) -> Vec<u32> {
777     (item..item + count).collect()
778 }
779 
ensure_future_reading(store: &Accessor<Data>, id: u32)780 fn ensure_future_reading(store: &Accessor<Data>, id: u32) {
781     store.with(|mut s| {
782         let data = s.get();
783         if !data.host_futures.contains_key(&id) {
784             return;
785         }
786         log::debug!("future consume: start {id}");
787         let arc = Arc::new(());
788         let weak = Arc::downgrade(&arc);
789         let data = s.get();
790         let future = data.host_futures.remove(&id).unwrap();
791         let prev = data
792             .host_future_consumers
793             .insert(id, (HostFutureConsumerState::Idle, weak));
794         assert!(prev.is_none());
795         future.pipe(&mut s, HostFutureConsumer(id, arc)).unwrap();
796     });
797 }
798 
ensure_stream_reading(store: &Accessor<Data>, id: u32)799 fn ensure_stream_reading(store: &Accessor<Data>, id: u32) {
800     store.with(|mut s| {
801         let data = s.get();
802         if !data.host_streams.contains_key(&id) {
803             return;
804         }
805         log::debug!("stream consume: start {id}");
806         let arc = Arc::new(());
807         let weak = Arc::downgrade(&arc);
808         let prev = data.host_stream_consumers.insert(
809             id,
810             (
811                 HostStreamConsumerState {
812                     kind: HostStreamConsumerStateKind::Idle,
813                     waker: None,
814                 },
815                 weak,
816             ),
817         );
818         assert!(prev.is_none());
819         let stream = data.host_streams.remove(&id).unwrap();
820         stream.pipe(&mut s, HostStreamConsumer(id, arc)).unwrap();
821     });
822 }
823 
824 struct HostFutureConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
825 
826 /// Note that this is only created once a read is actually initiated on a
827 /// future. It's also not possible to cancel a host-based read on a future,
828 /// hence why this is simpler than the `HostFutureProducerState` state below.
829 #[derive(Debug)]
830 enum HostFutureConsumerState {
831     Idle,
832     Waiting(Waker),
833     Consuming,
834     Complete(u32),
835 }
836 
837 impl HostFutureConsumerState {
wake_by_ref(&mut self)838     fn wake_by_ref(&mut self) {
839         if let HostFutureConsumerState::Waiting(waker) = &self {
840             waker.wake_by_ref();
841             *self = HostFutureConsumerState::Idle;
842         }
843     }
844 }
845 
846 impl FutureConsumer<Data> for HostFutureConsumer {
847     type Item = u32;
848 
poll_consume( self: Pin<&mut Self>, cx: &mut Context<'_>, mut store: StoreContextMut<'_, Data>, mut source: Source<'_, Self::Item>, finish: bool, ) -> Poll<Result<()>>849     fn poll_consume(
850         self: Pin<&mut Self>,
851         cx: &mut Context<'_>,
852         mut store: StoreContextMut<'_, Data>,
853         mut source: Source<'_, Self::Item>,
854         finish: bool,
855     ) -> Poll<Result<()>> {
856         let state = match store.data_mut().host_future_consumers.get_mut(&self.0) {
857             Some(state) => state,
858             None => {
859                 log::debug!("consume: closed {}", self.0);
860                 return Poll::Ready(Ok(()));
861             }
862         };
863         match state.0 {
864             HostFutureConsumerState::Idle | HostFutureConsumerState::Waiting(_) => {
865                 if finish {
866                     log::debug!("consume: cancel {}", self.0);
867                     state.0 = HostFutureConsumerState::Idle;
868                     Poll::Ready(Ok(()))
869                 } else {
870                     log::debug!("consume: wait {}", self.0);
871                     state.0 = HostFutureConsumerState::Waiting(cx.waker().clone());
872                     Poll::Pending
873                 }
874             }
875             HostFutureConsumerState::Consuming => {
876                 log::debug!("consume: done {}", self.0);
877                 let mut item = None;
878                 source.read(&mut store, &mut item).unwrap();
879                 store
880                     .data_mut()
881                     .host_future_consumers
882                     .get_mut(&self.0)
883                     .unwrap()
884                     .0 = HostFutureConsumerState::Complete(item.unwrap());
885                 Poll::Ready(Ok(()))
886             }
887             HostFutureConsumerState::Complete(_) => unreachable!(),
888         }
889     }
890 }
891 
892 struct HostFutureProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
893 
894 #[derive(Debug)]
895 enum HostFutureProducerState {
896     Idle,
897     Waiting(Waker),
898     Writing(u32),
899     Complete,
900 }
901 
902 impl FutureProducer<Data> for HostFutureProducer {
903     type Item = u32;
904 
poll_produce( self: Pin<&mut Self>, cx: &mut Context<'_>, mut store: StoreContextMut<'_, Data>, finish: bool, ) -> Poll<Result<Option<Self::Item>>>905     fn poll_produce(
906         self: Pin<&mut Self>,
907         cx: &mut Context<'_>,
908         mut store: StoreContextMut<'_, Data>,
909         finish: bool,
910     ) -> Poll<Result<Option<Self::Item>>> {
911         let state = store
912             .data_mut()
913             .host_future_producers
914             .get_mut(&self.0)
915             .unwrap();
916         match state.0 {
917             HostFutureProducerState::Idle | HostFutureProducerState::Waiting(_) => {
918                 if finish {
919                     log::debug!("produce: cancel {}", self.0);
920                     state.0 = HostFutureProducerState::Idle;
921                     Poll::Ready(Ok(None))
922                 } else {
923                     log::debug!("produce: wait {}", self.0);
924                     state.0 = HostFutureProducerState::Waiting(cx.waker().clone());
925                     Poll::Pending
926                 }
927             }
928             HostFutureProducerState::Writing(item) => {
929                 log::debug!("produce: done {}", self.0);
930                 state.0 = HostFutureProducerState::Complete;
931                 Poll::Ready(Ok(Some(item)))
932             }
933             HostFutureProducerState::Complete => unreachable!(),
934         }
935     }
936 }
937 
938 struct HostStreamConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
939 
940 #[derive(Debug)]
941 struct HostStreamConsumerState {
942     waker: Option<Waker>,
943     kind: HostStreamConsumerStateKind,
944 }
945 
946 #[derive(Debug)]
947 enum HostStreamConsumerStateKind {
948     Idle,
949     Consuming(u32),
950     Consumed(Vec<u32>),
951 }
952 
953 impl HostStreamConsumerState {
wake_by_ref(&mut self)954     fn wake_by_ref(&mut self) {
955         if let Some(waker) = self.waker.take() {
956             waker.wake();
957         }
958     }
959 }
960 
961 impl StreamConsumer<Data> for HostStreamConsumer {
962     type Item = u32;
963 
poll_consume( self: Pin<&mut Self>, cx: &mut Context<'_>, mut store: StoreContextMut<'_, Data>, mut source: Source<'_, Self::Item>, finish: bool, ) -> Poll<Result<StreamResult>>964     fn poll_consume(
965         self: Pin<&mut Self>,
966         cx: &mut Context<'_>,
967         mut store: StoreContextMut<'_, Data>,
968         mut source: Source<'_, Self::Item>,
969         finish: bool,
970     ) -> Poll<Result<StreamResult>> {
971         let remaining = source.remaining(&mut store);
972         let state = match store.data_mut().host_stream_consumers.get_mut(&self.0) {
973             Some((state, _)) => state,
974             None => {
975                 log::debug!("stream consume: dropped {}", self.0);
976                 return Poll::Ready(Ok(StreamResult::Dropped));
977             }
978         };
979         match state.kind {
980             HostStreamConsumerStateKind::Idle | HostStreamConsumerStateKind::Consumed(_) => {
981                 if finish {
982                     log::debug!("stream consume: cancel {}", self.0);
983                     state.waker = None;
984                     Poll::Ready(Ok(StreamResult::Cancelled))
985                 } else {
986                     log::debug!("stream consume: wait {}", self.0);
987                     state.waker = Some(cx.waker().clone());
988                     Poll::Pending
989                 }
990             }
991             HostStreamConsumerStateKind::Consuming(amt) => {
992                 // The writer is performing a zero-length write. We always
993                 // complete that without updating our own state.
994                 if remaining == 0 {
995                     log::debug!("stream consume: completing zero-length write {}", self.0);
996                     return Poll::Ready(Ok(StreamResult::Completed));
997                 }
998 
999                 // If this is a zero-length read then block the writer but update our own state.
1000                 if amt == 0 {
1001                     log::debug!("stream consume: finishing zero-length read {}", self.0);
1002                     state.kind = HostStreamConsumerStateKind::Consumed(Vec::new());
1003                     state.waker = Some(cx.waker().clone());
1004                     return Poll::Pending;
1005                 }
1006 
1007                 // For non-zero sizes perform the read/copy.
1008                 log::debug!("stream consume: done {}", self.0);
1009                 let mut dst = Vec::with_capacity(amt as usize);
1010                 source.read(&mut store, &mut dst).unwrap();
1011                 let state = &mut store
1012                     .data_mut()
1013                     .host_stream_consumers
1014                     .get_mut(&self.0)
1015                     .unwrap()
1016                     .0;
1017                 state.kind = HostStreamConsumerStateKind::Consumed(dst);
1018                 state.waker = None;
1019                 Poll::Ready(Ok(StreamResult::Completed))
1020             }
1021         }
1022     }
1023 }
1024 
1025 impl Drop for HostStreamConsumer {
drop(&mut self)1026     fn drop(&mut self) {
1027         log::debug!("stream consume: drop {}", self.0);
1028     }
1029 }
1030 
1031 struct HostStreamProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
1032 
1033 #[derive(Debug)]
1034 struct HostStreamProducerState {
1035     kind: HostStreamProducerStateKind,
1036     waker: Option<Waker>,
1037 }
1038 
1039 #[derive(Debug)]
1040 enum HostStreamProducerStateKind {
1041     Idle,
1042     Writing(Vec<u32>),
1043     Wrote(u32),
1044 }
1045 
1046 impl HostStreamProducerState {
idle() -> Self1047     fn idle() -> Self {
1048         HostStreamProducerState {
1049             kind: HostStreamProducerStateKind::Idle,
1050             waker: None,
1051         }
1052     }
1053 
wake_by_ref(&mut self)1054     fn wake_by_ref(&mut self) {
1055         if let Some(waker) = self.waker.take() {
1056             waker.wake();
1057         }
1058     }
1059 }
1060 
1061 impl StreamProducer<Data> for HostStreamProducer {
1062     type Item = u32;
1063     type Buffer = VecBuffer<u32>;
1064 
poll_produce( self: Pin<&mut Self>, cx: &mut Context<'_>, mut store: StoreContextMut<'_, Data>, mut dst: Destination<'_, Self::Item, Self::Buffer>, finish: bool, ) -> Poll<Result<StreamResult>>1065     fn poll_produce(
1066         self: Pin<&mut Self>,
1067         cx: &mut Context<'_>,
1068         mut store: StoreContextMut<'_, Data>,
1069         mut dst: Destination<'_, Self::Item, Self::Buffer>,
1070         finish: bool,
1071     ) -> Poll<Result<StreamResult>> {
1072         let remaining = dst.remaining(&mut store);
1073         let data = store.data_mut();
1074         let state = match data.host_stream_producers.get_mut(&self.0) {
1075             Some((state, _)) => state,
1076             None => {
1077                 log::debug!("stream produce: dropped {}", self.0);
1078                 return Poll::Ready(Ok(StreamResult::Dropped));
1079             }
1080         };
1081         match &mut state.kind {
1082             HostStreamProducerStateKind::Idle | HostStreamProducerStateKind::Wrote(_) => {
1083                 if finish {
1084                     log::debug!("stream produce: cancel {}", self.0);
1085                     state.waker = None;
1086                     Poll::Ready(Ok(StreamResult::Cancelled))
1087                 } else {
1088                     log::debug!("stream produce: wait {}", self.0);
1089                     state.waker = Some(cx.waker().clone());
1090                     Poll::Pending
1091                 }
1092             }
1093             HostStreamProducerStateKind::Writing(buf) => {
1094                 // Keep the other side blocked for a zero-length write
1095                 // originated from the host.
1096                 if buf.len() == 0 {
1097                     log::debug!("stream produce: zero-length write {}", self.0);
1098                     state.kind = HostStreamProducerStateKind::Wrote(0);
1099                     state.waker = Some(cx.waker().clone());
1100                     return Poll::Pending;
1101                 }
1102                 log::debug!("stream produce: write {}", self.0);
1103                 match remaining {
1104                     Some(amt) => {
1105                         // If the guest is doing a zero-length read then we've
1106                         // got some data for them. Complete the read but leave
1107                         // ourselves in the same `Writing` state as before.
1108                         if amt == 0 {
1109                             state.waker = None;
1110                             return Poll::Ready(Ok(StreamResult::Completed));
1111                         }
1112 
1113                         // Don't let wasmtime buffer up data for us, so truncate
1114                         // the buffer we're sending over to the amount that the
1115                         // reader is requesting.
1116                         if amt < buf.len() {
1117                             buf.truncate(amt);
1118                         }
1119                     }
1120 
1121                     // At this time host<->host stream reads/writes aren't
1122                     // fuzzed since that brings up a bunch of weird edge cases
1123                     // which aren't fun to deal with and aren't interesting
1124                     // either.
1125                     None => unreachable!(),
1126                 }
1127                 let count = buf.len() as u32;
1128                 dst.set_buffer(mem::take(buf).into());
1129                 state.kind = HostStreamProducerStateKind::Wrote(count);
1130                 state.waker = None;
1131                 Poll::Ready(Ok(StreamResult::Completed))
1132             }
1133         }
1134     }
1135 }
1136 
1137 impl Drop for HostStreamProducer {
drop(&mut self)1138     fn drop(&mut self) {
1139         log::debug!("stream produce: drop {}", self.0);
1140     }
1141 }
1142 
1143 struct SharedStream(Scope);
1144 
1145 impl SharedStream {
next(&mut self, accessor: &Accessor<Data>) -> Option<Command>1146     async fn next(&mut self, accessor: &Accessor<Data>) -> Option<Command> {
1147         std::future::poll_fn(|cx| {
1148             accessor.with(|mut store| {
1149                 self.poll(cx, store.as_context_mut(), false)
1150                     .map(|pair| match pair {
1151                         (None, StreamResult::Dropped) => None,
1152                         (Some(item), StreamResult::Completed) => Some(item),
1153                         _ => unreachable!(),
1154                     })
1155             })
1156         })
1157         .await
1158     }
1159 
poll( &mut self, cx: &mut Context<'_>, mut store: StoreContextMut<'_, Data>, finish: bool, ) -> Poll<(Option<Command>, StreamResult)>1160     fn poll(
1161         &mut self,
1162         cx: &mut Context<'_>,
1163         mut store: StoreContextMut<'_, Data>,
1164         finish: bool,
1165     ) -> Poll<(Option<Command>, StreamResult)> {
1166         let data = store.data_mut();
1167 
1168         // If no more commands remain then this is a closed and dropped stream.
1169         let Some((scope, command)) = data.commands.last_mut() else {
1170             log::debug!("Stream closed: {:?}", self.0);
1171             return Poll::Ready((None, StreamResult::Dropped));
1172         };
1173 
1174         // If the next queued up command is for the scope that this stream is
1175         // attached to then send off the command.
1176         if *scope == self.0 {
1177             let ret = Some(*command);
1178 
1179             // All commands are followed up with an "ack", and after the "ack"
1180             // is delivered then the command is popped to move on to the next
1181             // command. The reason for this is to guarantee that a command has
1182             // been processed before moving on to the next command. This helps
1183             // make the fuzzing easier to work with by being able to implicitly
1184             // assume that a command has been processed by the time something
1185             // else is. Otherwise it might be possible that wasmtime has a set
1186             // of commands/callbacks that are all delivered at the same time and
1187             // the component model doesn't specify what order they happen
1188             // within. By forcing an "ack" it ensures a more expected ordering
1189             // of execution to assist with fuzzing without losing really all
1190             // that much coverage.
1191             if matches!(command, Command::Ack) {
1192                 data.commands.pop();
1193             } else {
1194                 *command = Command::Ack;
1195             }
1196 
1197             // After a command was popped other streams may be able to make
1198             // progress so wake them all up.
1199             for (_, waker) in data.wakers.drain() {
1200                 waker.wake();
1201             }
1202             log::debug!("Delivering command {ret:?} for {:?}", self.0);
1203             return Poll::Ready((ret, StreamResult::Completed));
1204         }
1205 
1206         // The command queue is non-empty and the next command isn't meant for
1207         // us, so someone else needs to drain the queue. Enqueue our waker.
1208         if finish {
1209             Poll::Ready((None, StreamResult::Cancelled))
1210         } else {
1211             data.wakers.insert(self.0, cx.waker().clone());
1212             Poll::Pending
1213         }
1214     }
1215 }
1216 
1217 impl StreamProducer<Data> for SharedStream {
1218     type Item = Command;
1219     type Buffer = Option<Command>;
1220 
poll_produce<'a>( mut self: Pin<&mut Self>, cx: &mut Context<'_>, store: StoreContextMut<'a, Data>, mut destination: Destination<'a, Self::Item, Self::Buffer>, finish: bool, ) -> Poll<Result<StreamResult>>1221     fn poll_produce<'a>(
1222         mut self: Pin<&mut Self>,
1223         cx: &mut Context<'_>,
1224         store: StoreContextMut<'a, Data>,
1225         mut destination: Destination<'a, Self::Item, Self::Buffer>,
1226         finish: bool,
1227     ) -> Poll<Result<StreamResult>> {
1228         let (item, result) = std::task::ready!(self.poll(cx, store, finish));
1229         destination.set_buffer(item);
1230         Poll::Ready(Ok(result))
1231     }
1232 }
1233 
1234 #[cfg(test)]
1235 mod tests {
1236     use super::{ComponentAsync, Scope, init, run};
1237     use crate::oracles::component_async::types::*;
1238     use crate::test::test_n_times;
1239     use Scope::*;
1240 
1241     #[test]
smoke()1242     fn smoke() {
1243         init();
1244 
1245         test_n_times(50, |c, _| {
1246             run(c);
1247             Ok(())
1248         });
1249     }
1250 
1251     // ========================================================================
1252     // A series of fuzz-generated test cases which caused problems during the
1253     // development of this fuzzer. Feel free to delete/edit/etc if the fuzzer
1254     // changes over time.
1255 
1256     #[test]
simple()1257     fn simple() {
1258         init();
1259 
1260         run(ComponentAsync {
1261             commands: vec![
1262                 (GuestCaller, Command::AsyncPendingImportCall(0)),
1263                 (GuestCallee, Command::AsyncPendingImportCall(1)),
1264                 (GuestCallee, Command::AsyncPendingExportComplete(0)),
1265                 (GuestCaller, Command::AsyncPendingImportAssertReady(0)),
1266                 (GuestCaller, Command::AsyncPendingImportCall(2)),
1267             ],
1268         });
1269     }
1270 
1271     #[test]
somewhat_larger()1272     fn somewhat_larger() {
1273         static COMMANDS: &[(Scope, Command)] = &[
1274             (GuestCallee, Command::FutureNew(0)),
1275             (HostCaller, Command::FutureNew(1)),
1276             (GuestCallee, Command::FutureReadPending(0)),
1277             (GuestCaller, Command::AsyncPendingImportCall(2)),
1278             (GuestCaller, Command::AsyncPendingImportCall(3)),
1279             (GuestCaller, Command::AsyncPendingImportCall(4)),
1280             (GuestCaller, Command::AsyncPendingImportCall(5)),
1281             (GuestCallee, Command::AsyncPendingExportComplete(5)),
1282             (GuestCallee, Command::AsyncPendingExportComplete(3)),
1283             (GuestCallee, Command::AsyncPendingExportComplete(4)),
1284             (GuestCallee, Command::AsyncPendingExportComplete(2)),
1285             (GuestCaller, Command::AsyncPendingImportCall(6)),
1286             (GuestCallee, Command::AsyncPendingExportComplete(6)),
1287             (GuestCaller, Command::AsyncPendingImportCall(7)),
1288             (GuestCallee, Command::AsyncPendingExportComplete(7)),
1289             (GuestCaller, Command::AsyncPendingImportCall(8)),
1290             (GuestCallee, Command::AsyncPendingExportComplete(8)),
1291             (GuestCaller, Command::AsyncPendingImportCall(9)),
1292             (GuestCallee, Command::AsyncPendingExportComplete(9)),
1293             (GuestCaller, Command::AsyncPendingImportCall(10)),
1294             (GuestCallee, Command::AsyncPendingExportComplete(10)),
1295             (GuestCaller, Command::AsyncPendingImportCall(11)),
1296             (GuestCallee, Command::AsyncPendingExportComplete(11)),
1297             (GuestCaller, Command::AsyncPendingImportCall(12)),
1298             (GuestCallee, Command::AsyncPendingExportComplete(12)),
1299             (GuestCaller, Command::AsyncPendingImportCall(13)),
1300             (GuestCallee, Command::AsyncPendingExportComplete(13)),
1301             (GuestCaller, Command::AsyncPendingImportCall(14)),
1302             (GuestCallee, Command::AsyncPendingExportComplete(14)),
1303             (GuestCaller, Command::AsyncPendingImportCall(15)),
1304             (GuestCallee, Command::AsyncPendingExportComplete(15)),
1305             (GuestCaller, Command::AsyncPendingImportCall(16)),
1306             (GuestCallee, Command::AsyncPendingExportComplete(16)),
1307             (GuestCaller, Command::AsyncPendingImportCall(17)),
1308             (GuestCallee, Command::AsyncPendingExportComplete(17)),
1309             (GuestCaller, Command::AsyncPendingImportCall(18)),
1310             (GuestCallee, Command::AsyncPendingExportComplete(18)),
1311             (GuestCaller, Command::AsyncPendingImportCall(19)),
1312             (GuestCallee, Command::AsyncPendingExportComplete(19)),
1313             (GuestCaller, Command::AsyncPendingImportCall(20)),
1314             (GuestCallee, Command::AsyncPendingExportComplete(20)),
1315             (GuestCaller, Command::AsyncPendingImportCall(21)),
1316             (GuestCallee, Command::AsyncPendingExportComplete(21)),
1317             (GuestCaller, Command::AsyncPendingImportCall(22)),
1318             (GuestCallee, Command::AsyncPendingExportComplete(22)),
1319             (GuestCaller, Command::AsyncPendingImportCall(23)),
1320             (GuestCallee, Command::AsyncPendingExportComplete(23)),
1321             (GuestCaller, Command::AsyncPendingImportCall(24)),
1322             (GuestCallee, Command::AsyncPendingExportComplete(24)),
1323             (GuestCaller, Command::AsyncPendingImportCall(25)),
1324             (GuestCallee, Command::AsyncPendingExportComplete(25)),
1325             (GuestCaller, Command::AsyncPendingImportCall(26)),
1326             (GuestCallee, Command::AsyncPendingExportComplete(26)),
1327             (GuestCaller, Command::AsyncPendingImportCall(27)),
1328             (GuestCallee, Command::AsyncPendingExportComplete(27)),
1329             (GuestCaller, Command::AsyncPendingImportCall(28)),
1330             (GuestCallee, Command::AsyncPendingExportComplete(28)),
1331             (GuestCaller, Command::AsyncPendingImportCall(29)),
1332             (GuestCallee, Command::AsyncPendingExportComplete(29)),
1333             (GuestCaller, Command::AsyncPendingImportCall(30)),
1334             (GuestCallee, Command::AsyncPendingExportComplete(30)),
1335             (GuestCaller, Command::AsyncPendingImportCall(31)),
1336             (GuestCallee, Command::AsyncPendingExportComplete(31)),
1337             (GuestCaller, Command::AsyncPendingImportCall(32)),
1338             (GuestCallee, Command::AsyncPendingExportComplete(32)),
1339             (GuestCaller, Command::AsyncPendingImportCall(33)),
1340             (GuestCallee, Command::AsyncPendingExportComplete(33)),
1341             (GuestCaller, Command::AsyncPendingImportCall(34)),
1342             (GuestCallee, Command::AsyncPendingExportComplete(34)),
1343             (GuestCaller, Command::AsyncPendingImportCall(35)),
1344             (GuestCallee, Command::AsyncPendingExportComplete(35)),
1345             (GuestCaller, Command::AsyncPendingImportCall(36)),
1346             (GuestCallee, Command::AsyncPendingExportComplete(36)),
1347             (GuestCaller, Command::AsyncPendingImportCall(37)),
1348             (GuestCallee, Command::AsyncPendingExportComplete(37)),
1349             (GuestCaller, Command::AsyncPendingImportAssertReady(36)),
1350         ];
1351         init();
1352 
1353         run(ComponentAsync {
1354             commands: COMMANDS.to_vec(),
1355         });
1356     }
1357 
1358     #[test]
simple_stream1()1359     fn simple_stream1() {
1360         init();
1361 
1362         run(ComponentAsync {
1363             commands: vec![
1364                 (HostCallee, Command::StreamNew(1)),
1365                 (
1366                     HostCallee,
1367                     Command::StreamReadPending(StreamReadPayload {
1368                         stream: 1,
1369                         count: 2,
1370                     }),
1371                 ),
1372                 (HostCallee, Command::StreamCancelRead(1)),
1373                 (GuestCaller, Command::SyncReadyCall),
1374                 (
1375                     HostCallee,
1376                     Command::StreamWritePending(StreamWritePayload {
1377                         stream: 1,
1378                         item: 3,
1379                         count: 2,
1380                     }),
1381                 ),
1382                 (HostCallee, Command::StreamCancelWrite(1)),
1383                 (HostCallee, Command::StreamDropWritable(1)),
1384                 (
1385                     HostCallee,
1386                     Command::StreamReadDropped(StreamReadPayload {
1387                         stream: 1,
1388                         count: 1,
1389                     }),
1390                 ),
1391             ],
1392         });
1393     }
1394 
1395     #[test]
simple_stream3()1396     fn simple_stream3() {
1397         init();
1398 
1399         run(ComponentAsync {
1400             commands: vec![
1401                 (GuestCaller, Command::StreamNew(26)),
1402                 (
1403                     GuestCaller,
1404                     Command::StreamReadPending(StreamReadPayload {
1405                         stream: 26,
1406                         count: 10,
1407                     }),
1408                 ),
1409                 (GuestCaller, Command::StreamDropWritable(26)),
1410                 (GuestCaller, Command::StreamReadAssertDropped(26)),
1411             ],
1412         });
1413     }
1414 
1415     #[test]
simple_stream4()1416     fn simple_stream4() {
1417         init();
1418 
1419         run(ComponentAsync {
1420             commands: vec![
1421                 (GuestCaller, Command::StreamNew(23)),
1422                 (
1423                     GuestCaller,
1424                     Command::StreamWritePending(StreamWritePayload {
1425                         stream: 23,
1426                         item: 24,
1427                         count: 2,
1428                     }),
1429                 ),
1430                 (GuestCaller, Command::StreamGive(23)),
1431                 (GuestCallee, Command::StreamDropReadable(23)),
1432                 (
1433                     GuestCaller,
1434                     Command::StreamWriteAssertDropped(StreamReadPayload {
1435                         stream: 23,
1436                         count: 0,
1437                     }),
1438                 ),
1439             ],
1440         });
1441     }
1442 
1443     #[test]
zero_length_behavior()1444     fn zero_length_behavior() {
1445         init();
1446 
1447         run(ComponentAsync {
1448             commands: vec![
1449                 (GuestCaller, Command::StreamNew(10)),
1450                 (HostCaller, Command::StreamTake(10)),
1451                 (
1452                     GuestCaller,
1453                     Command::StreamWritePending(StreamWritePayload {
1454                         stream: 10,
1455                         item: 13,
1456                         count: 5,
1457                     }),
1458                 ),
1459                 (
1460                     HostCaller,
1461                     Command::StreamReadReady(StreamReadyPayload {
1462                         stream: 10,
1463                         item: 0,
1464                         op_count: 0,
1465                         ready_count: 0,
1466                     }),
1467                 ),
1468                 (
1469                     HostCaller,
1470                     Command::StreamReadReady(StreamReadyPayload {
1471                         stream: 10,
1472                         item: 0,
1473                         op_count: 0,
1474                         ready_count: 0,
1475                     }),
1476                 ),
1477             ],
1478         });
1479     }
1480 }
1481