1 //! For a high-level overview of this fuzz target see `fuzz_async.rs`
2
3 use crate::block_on;
4 use crate::generators::component_async::exports::wasmtime_fuzz::fuzz::async_test::Guest;
5 use crate::generators::component_async::wasmtime_fuzz::fuzz::async_test::{self, Command};
6 use crate::generators::component_async::wasmtime_fuzz::fuzz::types;
7 use crate::generators::component_async::{ComponentAsync, FuzzAsyncPre, Scope};
8 use futures::channel::oneshot;
9 use std::collections::{HashMap, HashSet};
10 use std::mem;
11 use std::pin::Pin;
12 use std::sync::{Arc, OnceLock, Weak};
13 use std::task::{Context, Poll, Waker};
14 use std::time::Instant;
15 use wasmtime::component::{
16 Access, Accessor, AccessorTask, Component, Destination, FutureConsumer, FutureProducer,
17 FutureReader, HasSelf, Linker, ResourceTable, Source, StreamConsumer, StreamProducer,
18 StreamReader, StreamResult, VecBuffer,
19 };
20 use wasmtime::{AsContextMut, Config, Engine, Result, Store, StoreContextMut};
21 use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
22
23 static STATE: OnceLock<(Engine, FuzzAsyncPre<Data>)> = OnceLock::new();
24
25 /// Initializes state for future fuzz runs.
26 ///
27 /// This will create an `Engine` to run this fuzzer within and it will
28 /// additionally precompile the component that will be used for fuzzing.
29 ///
30 /// There are a few points of note about this:
31 ///
32 /// * The `misc` fuzzer is manually instrumented with this function as the init
33 /// hook to ensure this runs before any other fuzzing.
34 ///
35 /// * Compilation of the component takes quite some time with
36 /// fuzzing-instrumented Cranelift. To assist with local development this
37 /// implements a cache which is serialized/deserialized via an env var.
init()38 pub fn init() {
39 crate::init_fuzzing();
40
41 STATE.get_or_init(|| {
42 let mut config = Config::new();
43 config.wasm_component_model_async(true);
44 let engine = Engine::new(&config).unwrap();
45 let component = compile(&engine);
46 let mut linker = Linker::new(&engine);
47 wasmtime_wasi::p2::add_to_linker_async(&mut linker).unwrap();
48 async_test::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap();
49 types::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap();
50
51 let pre = linker.instantiate_pre(&component).unwrap();
52 let pre = FuzzAsyncPre::new(pre).unwrap();
53
54 (engine, pre)
55 });
56
57 fn compile(engine: &Engine) -> Component {
58 let wasm_path = test_programs_artifacts::FUZZ_ASYNC_COMPONENT;
59 let wasm = test_programs_artifacts::fuzz_async_component_bytes!();
60 let wasm = &wasm[..];
61 let cwasm_cache = std::env::var("COMPONENT_ASYNC_CWASM_CACHE").ok();
62 if let Some(path) = &cwasm_cache
63 && let Ok(cwasm_mtime) = std::fs::metadata(&path).and_then(|m| m.modified())
64 && let Ok(wasm_mtime) = std::fs::metadata(wasm_path).and_then(|m| m.modified())
65 && cwasm_mtime > wasm_mtime
66 {
67 log::debug!("Using cached component async cwasm at {path}");
68 unsafe {
69 return Component::deserialize_file(engine, path).unwrap();
70 }
71 }
72
73 let composition = {
74 let mut config = wasm_compose::config::Config::default();
75 let tempdir = tempfile::TempDir::new().unwrap();
76 let path = tempdir.path().join("fuzz-async.wasm");
77 std::fs::write(&path, wasm).unwrap();
78 config.definitions.push(path.clone());
79
80 wasm_compose::composer::ComponentComposer::new(&path, &config)
81 .compose()
82 .unwrap()
83 };
84 let start = Instant::now();
85 let component = Component::new(&engine, &composition).unwrap();
86 if let Some(path) = cwasm_cache {
87 log::debug!("Caching component async cwasm to {path}");
88 std::fs::write(path, &component.serialize().unwrap()).unwrap();
89 } else if start.elapsed() > std::time::Duration::from_secs(1) {
90 eprintln!(
91 "
92 !!!!!!!!!!!!!!!!!!!!!!!!!!
93
94 Component compilation is slow, try setting `COMPONENT_ASYNC_CWASM_CACHE=path` to
95 cache compilation results
96
97 !!!!!!!!!!!!!!!!!!!!!!!!!!
98 "
99 );
100 }
101 return component;
102 }
103 }
104
105 #[derive(Default)]
106 struct Data {
107 ctx: WasiCtx,
108 table: ResourceTable,
109 wakers: HashMap<Scope, Waker>,
110 commands: Vec<(Scope, Command)>,
111
112 guest_caller_stream: Option<StreamReader<Command>>,
113 guest_callee_stream: Option<StreamReader<Command>>,
114
115 host_pending_async_calls: HashMap<u32, oneshot::Sender<()>>,
116 host_pending_async_calls_cancelled: HashSet<u32>,
117 guest_pending_async_calls_ready: HashSet<u32>,
118
119 // State of futures/streams. Note that while #12091 is unresolved an
120 // `Arc`/`Weak` combo is used to detect when wasmtime drops futures/streams
121 // and the various halves we're interacting with using traits.
122 host_futures: HashMap<u32, FutureReader<u32>>,
123 host_future_producers: HashMap<u32, (HostFutureProducerState, Weak<()>)>,
124 host_future_consumers: HashMap<u32, (HostFutureConsumerState, Weak<()>)>,
125 host_streams: HashMap<u32, StreamReader<u32>>,
126 host_stream_producers: HashMap<u32, (HostStreamProducerState, Weak<()>)>,
127 host_stream_consumers: HashMap<u32, (HostStreamConsumerState, Weak<()>)>,
128 }
129
130 impl WasiView for Data {
ctx(&mut self) -> WasiCtxView<'_>131 fn ctx(&mut self) -> WasiCtxView<'_> {
132 WasiCtxView {
133 ctx: &mut self.ctx,
134 table: &mut self.table,
135 }
136 }
137 }
138
139 impl async_test::HostWithStore for HasSelf<Data> {
async_ready<T>(_store: &Accessor<T, Self>)140 async fn async_ready<T>(_store: &Accessor<T, Self>) {}
141
async_pending<T>(store: &Accessor<T, Self>, id: u32)142 async fn async_pending<T>(store: &Accessor<T, Self>, id: u32) {
143 let (tx, rx) = oneshot::channel();
144 store.with(|mut s| s.get().host_pending_async_calls.insert(id, tx));
145 let record = RecordCancelOnDrop { store, id };
146 rx.await.unwrap();
147 mem::forget(record);
148
149 struct RecordCancelOnDrop<'a, T: 'static> {
150 store: &'a Accessor<T, HasSelf<Data>>,
151 id: u32,
152 }
153
154 impl<T> Drop for RecordCancelOnDrop<'_, T> {
155 fn drop(&mut self) {
156 self.store.with(|mut s| {
157 s.get().host_pending_async_calls_cancelled.insert(self.id);
158 });
159 }
160 }
161 }
162
init<T>(_store: &Accessor<T, Self>, _scope: types::Scope)163 async fn init<T>(_store: &Accessor<T, Self>, _scope: types::Scope) {}
164 }
165
166 impl async_test::Host for Data {
sync_ready(&mut self)167 fn sync_ready(&mut self) {}
168
future_take(&mut self, id: u32) -> FutureReader<u32>169 fn future_take(&mut self, id: u32) -> FutureReader<u32> {
170 self.host_futures.remove(&id).unwrap()
171 }
172
future_receive(&mut self, id: u32, future: FutureReader<u32>)173 fn future_receive(&mut self, id: u32, future: FutureReader<u32>) {
174 let prev = self.host_futures.insert(id, future);
175 assert!(prev.is_none());
176 }
177
stream_take(&mut self, id: u32) -> StreamReader<u32>178 fn stream_take(&mut self, id: u32) -> StreamReader<u32> {
179 self.host_streams.remove(&id).unwrap()
180 }
181
stream_receive(&mut self, id: u32, stream: StreamReader<u32>)182 fn stream_receive(&mut self, id: u32, stream: StreamReader<u32>) {
183 let prev = self.host_streams.insert(id, stream);
184 assert!(prev.is_none());
185 }
186 }
187
188 impl types::HostWithStore for HasSelf<Data> {
get_commands<T>( mut store: Access<'_, T, Self>, scope: types::Scope, ) -> StreamReader<Command>189 fn get_commands<T>(
190 mut store: Access<'_, T, Self>,
191 scope: types::Scope,
192 ) -> StreamReader<Command> {
193 let data = store.get();
194 match scope {
195 types::Scope::Caller => data.guest_caller_stream.take().unwrap(),
196 types::Scope::Callee => data.guest_callee_stream.take().unwrap(),
197 }
198 }
199 }
200
201 impl types::Host for Data {}
202
203 /// Executes the `input` provided, assuming that `init` has been previously
204 /// executed.
run(mut input: ComponentAsync)205 pub fn run(mut input: ComponentAsync) {
206 log::debug!("Running component async fuzz test with\n{input:?}");
207
208 // Commands are executed in the order that they're listed in the input, but
209 // to make it easier on the `StreamProducer` implementation below they're
210 // popped off the back. To ensure that they're all delivered in the right
211 // order reverse the list to ensure the correct order is maintained.
212 input.commands.reverse();
213
214 let (engine, pre) = STATE.get().unwrap();
215 let mut store = Store::new(
216 engine,
217 Data {
218 ctx: WasiCtx::builder().inherit_stdio().inherit_env().build(),
219 commands: input.commands,
220 ..Data::default()
221 },
222 );
223
224 let guest_caller_stream =
225 StreamReader::new(&mut store, SharedStream(Scope::GuestCaller)).unwrap();
226 let guest_callee_stream =
227 StreamReader::new(&mut store, SharedStream(Scope::GuestCallee)).unwrap();
228 store.data_mut().guest_caller_stream = Some(guest_caller_stream);
229 store.data_mut().guest_callee_stream = Some(guest_callee_stream);
230 block_on(async {
231 let instance = pre.instantiate_async(&mut store).await.unwrap();
232 let test = instance.wasmtime_fuzz_fuzz_async_test();
233
234 let mut host_caller = SharedStream(Scope::HostCaller);
235 let mut host_callee = SharedStream(Scope::HostCallee);
236 store
237 .run_concurrent(async |store| {
238 // Kick off stream reads in the guest. This function will return
239 // but the tasks in the guest will keep running after they
240 // return to process stream items.
241 test.call_init(store, types::Scope::Caller).await.unwrap();
242
243 // Simultaneously process commands from both host streams. These
244 // will return once the entire command queue is exhausted.
245 futures::join!(
246 async {
247 while let Some(cmd) = host_caller.next(store).await {
248 host_caller_cmd(&test, store, cmd).await;
249 }
250 },
251 async {
252 while let Some(cmd) = host_callee.next(store).await {
253 host_callee_cmd(store, cmd).await;
254 }
255 },
256 );
257
258 // Note that there may still be pending async work in the guest
259 // (or host). It's intentional that it's not cleaned up here to
260 // help test situations where async work is all abruptly
261 // cancelled by just being dropped in the host.
262 })
263 .await
264 .unwrap();
265 });
266 }
267
268 /// See documentation in `fuzz_async.rs` for what's going on here.
test_property<F>(store: &Accessor<Data>, mut f: F) -> bool where F: FnMut(&mut Data) -> bool,269 async fn test_property<F>(store: &Accessor<Data>, mut f: F) -> bool
270 where
271 F: FnMut(&mut Data) -> bool,
272 {
273 for _ in 0..1000 {
274 let ready = store.with(|mut s| f(s.get()));
275 if ready {
276 return true;
277 }
278
279 crate::YieldN(1).await;
280 }
281
282 return false;
283 }
284
await_property<F>(store: &Accessor<Data>, desc: &str, f: F) where F: FnMut(&mut Data) -> bool,285 async fn await_property<F>(store: &Accessor<Data>, desc: &str, f: F)
286 where
287 F: FnMut(&mut Data) -> bool,
288 {
289 assert!(
290 test_property(store, f).await,
291 "timed out waiting for {desc}",
292 );
293 }
294
host_caller_cmd(test: &Guest, store: &Accessor<Data>, cmd: Command)295 async fn host_caller_cmd(test: &Guest, store: &Accessor<Data>, cmd: Command) {
296 match cmd {
297 Command::Ack => {}
298 Command::SyncReadyCall => test.call_sync_ready(store).await.unwrap(),
299 Command::AsyncReadyCall => test.call_async_ready(store).await.unwrap(),
300 Command::AsyncPendingExportComplete(_i) => todo!(),
301 Command::AsyncPendingExportAssertCancelled(_i) => todo!(),
302 Command::AsyncPendingImportCall(i) => {
303 struct RunPendingImport {
304 test: Guest,
305 i: u32,
306 }
307
308 store.spawn(RunPendingImport {
309 test: test.clone(),
310 i,
311 });
312
313 impl AccessorTask<Data> for RunPendingImport {
314 async fn run(self, store: &Accessor<Data>) -> Result<()> {
315 self.test.call_async_pending(store, self.i).await?;
316 store.with(|mut s| {
317 s.get().guest_pending_async_calls_ready.insert(self.i);
318 });
319 Ok(())
320 }
321 }
322 }
323 Command::AsyncPendingImportCancel(_i) => todo!(),
324 Command::AsyncPendingImportAssertReady(i) => {
325 assert!(
326 test_property(store, |s| s.guest_pending_async_calls_ready.remove(&i)).await,
327 "expected async_pending import {i} to be ready",
328 );
329 }
330
331 Command::FutureTake(i) => {
332 let future = test.call_future_take(store, i).await.unwrap();
333 store.with(|mut s| {
334 let prev = s.get().host_futures.insert(i, future);
335 assert!(prev.is_none());
336 });
337 }
338 Command::FutureGive(i) => {
339 let future = store.with(|mut s| s.get().host_futures.remove(&i).unwrap());
340 test.call_future_receive(store, i, future).await.unwrap();
341 }
342 Command::StreamTake(i) => {
343 let stream = test.call_stream_take(store, i).await.unwrap();
344 store.with(|mut s| {
345 let prev = s.get().host_streams.insert(i, stream);
346 assert!(prev.is_none());
347 });
348 }
349 Command::StreamGive(i) => {
350 let stream = store.with(|mut s| s.get().host_streams.remove(&i).unwrap());
351 test.call_stream_receive(store, i, stream).await.unwrap();
352 }
353
354 other => future_or_stream_cmd(store, other).await,
355 }
356 }
357
host_callee_cmd(store: &Accessor<Data>, cmd: Command)358 async fn host_callee_cmd(store: &Accessor<Data>, cmd: Command) {
359 match cmd {
360 Command::Ack => {}
361 Command::SyncReadyCall => todo!(),
362 Command::AsyncReadyCall => todo!(),
363 Command::AsyncPendingExportComplete(i) => store.with(|mut s| {
364 s.get()
365 .host_pending_async_calls
366 .remove(&i)
367 .unwrap()
368 .send(())
369 .unwrap();
370 }),
371 Command::AsyncPendingExportAssertCancelled(i) => {
372 assert!(
373 test_property(store, |s| s.host_pending_async_calls_cancelled.remove(&i)).await,
374 "expected async_pending export {i} to be cancelled",
375 );
376 }
377 Command::AsyncPendingImportCall(_i) => todo!(),
378 Command::AsyncPendingImportCancel(_i) => todo!(),
379 Command::AsyncPendingImportAssertReady(_i) => todo!(),
380
381 other => future_or_stream_cmd(store, other).await,
382 }
383 }
384
future_or_stream_cmd(store: &Accessor<Data>, cmd: Command)385 async fn future_or_stream_cmd(store: &Accessor<Data>, cmd: Command) {
386 match cmd {
387 // These commands should be handled above
388 Command::Ack
389 | Command::SyncReadyCall
390 | Command::AsyncReadyCall
391 | Command::AsyncPendingExportComplete(_)
392 | Command::AsyncPendingExportAssertCancelled(_)
393 | Command::AsyncPendingImportCall(_)
394 | Command::AsyncPendingImportCancel(_)
395 | Command::FutureTake(_)
396 | Command::FutureGive(_)
397 | Command::StreamTake(_)
398 | Command::StreamGive(_)
399 | Command::AsyncPendingImportAssertReady(_) => unreachable!(),
400
401 Command::FutureNew(id) => {
402 store.with(|mut s| {
403 let arc = Arc::new(());
404 let weak = Arc::downgrade(&arc);
405 let future = FutureReader::new(&mut s, HostFutureProducer(id, arc)).unwrap();
406 let data = s.get();
407 let prev = data.host_futures.insert(id, future);
408 assert!(prev.is_none());
409 let prev = data
410 .host_future_producers
411 .insert(id, (HostFutureProducerState::Idle, weak));
412 assert!(prev.is_none());
413 });
414 }
415 Command::FutureDropReadable(id) => {
416 store.with(|mut s| match s.get().host_futures.remove(&id) {
417 Some(mut future) => future.close(&mut s).unwrap(),
418 None => {
419 let (mut state, _weak) = s.get().host_future_consumers.remove(&id).unwrap();
420 state.wake_by_ref();
421 }
422 })
423 }
424 Command::FutureWriteReady(payload) => {
425 await_property(store, "future write should be waiting", |s| {
426 matches!(
427 s.host_future_producers.get(&payload.future),
428 Some((HostFutureProducerState::Waiting(_), _))
429 )
430 })
431 .await;
432 store.with(|mut s| {
433 let state = s
434 .get()
435 .host_future_producers
436 .get_mut(&payload.future)
437 .unwrap();
438 match state {
439 (HostFutureProducerState::Waiting(waker), _) => {
440 waker.wake_by_ref();
441 state.0 = HostFutureProducerState::Writing(payload.item);
442 }
443 (state, _) => panic!("future not waiting: {state:?}"),
444 }
445 })
446 }
447 Command::FutureWritePending(payload) => store.with(|mut s| {
448 let state = s
449 .get()
450 .host_future_producers
451 .get_mut(&payload.future)
452 .unwrap();
453 match state {
454 (HostFutureProducerState::Idle, _) => {
455 state.0 = HostFutureProducerState::Writing(payload.item);
456 }
457 _ => panic!("future not idle"),
458 }
459 }),
460 Command::FutureWriteDropped(id) => store.with(|mut s| {
461 let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
462 assert!(matches!(state, HostFutureProducerState::Idle));
463 assert!(weak.upgrade().is_none());
464 }),
465 Command::FutureReadReady(payload) => {
466 let id = payload.future;
467 store.with(|mut s| {
468 let arc = Arc::new(());
469 let weak = Arc::downgrade(&arc);
470 let data = s.get();
471 let future = data.host_futures.remove(&id).unwrap();
472 let prev = data
473 .host_future_consumers
474 .insert(id, (HostFutureConsumerState::Consuming, weak));
475 assert!(prev.is_none());
476 future.pipe(&mut s, HostFutureConsumer(id, arc)).unwrap();
477 });
478
479 await_property(store, "future should be present", |s| {
480 matches!(
481 s.host_future_consumers[&id],
482 (HostFutureConsumerState::Complete(_), _)
483 )
484 })
485 .await;
486
487 store.with(|mut s| {
488 let (state, _) = s.get().host_future_consumers.remove(&id).unwrap();
489 match state {
490 HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item),
491 _ => panic!("future not complete"),
492 }
493 });
494 }
495 Command::FutureReadPending(id) => {
496 ensure_future_reading(store, id);
497 store.with(|mut s| {
498 let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap();
499 state.wake_by_ref();
500 assert!(
501 matches!(state, HostFutureConsumerState::Idle),
502 "bad state: {state:?}",
503 );
504 *state = HostFutureConsumerState::Consuming;
505 })
506 }
507 Command::FutureCancelWrite(id) => store.with(|mut s| {
508 let (state, _) = s.get().host_future_producers.get_mut(&id).unwrap();
509 assert!(matches!(state, HostFutureProducerState::Writing(_)));
510 *state = HostFutureProducerState::Idle;
511 }),
512 Command::FutureCancelRead(id) => store.with(|mut s| {
513 let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap();
514 assert!(matches!(state, HostFutureConsumerState::Consuming));
515 *state = HostFutureConsumerState::Idle;
516 }),
517 Command::FutureReadAssertComplete(payload) => {
518 await_property(store, "future read should be complete", |s| {
519 matches!(
520 s.host_future_consumers.get(&payload.future),
521 Some((HostFutureConsumerState::Complete(_), _))
522 )
523 })
524 .await;
525 store.with(|mut s| {
526 let (state, _) = s
527 .get()
528 .host_future_consumers
529 .remove(&payload.future)
530 .unwrap();
531 match state {
532 HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item),
533 _ => panic!("future not complete"),
534 }
535 })
536 }
537 Command::FutureWriteAssertComplete(id) => store.with(|mut s| {
538 let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
539 assert!(matches!(state, HostFutureProducerState::Complete));
540 assert!(weak.upgrade().is_none());
541 }),
542 Command::FutureWriteAssertDropped(id) => store.with(|mut s| {
543 let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
544 assert!(matches!(state, HostFutureProducerState::Writing(_)));
545 assert!(weak.upgrade().is_none());
546 }),
547
548 Command::StreamNew(id) => {
549 store.with(|mut s| {
550 let arc = Arc::new(());
551 let weak = Arc::downgrade(&arc);
552 let stream = StreamReader::new(&mut s, HostStreamProducer(id, arc)).unwrap();
553 let data = s.get();
554 let prev = data.host_streams.insert(id, stream);
555 assert!(prev.is_none());
556 let prev = data
557 .host_stream_producers
558 .insert(id, (HostStreamProducerState::idle(), weak));
559 assert!(prev.is_none());
560 });
561 }
562 Command::StreamDropReadable(id) => {
563 store.with(|mut s| match s.get().host_streams.remove(&id) {
564 Some(mut stream) => stream.close(&mut s).unwrap(),
565 None => {
566 let (mut state, _weak) = s.get().host_stream_consumers.remove(&id).unwrap();
567 state.wake_by_ref();
568 }
569 })
570 }
571 Command::StreamDropWritable(id) => store.with(|mut s| {
572 let (mut state, _weak) = s.get().host_stream_producers.remove(&id).unwrap();
573 state.wake_by_ref();
574 }),
575 Command::StreamWriteReady(payload) => {
576 let id = payload.stream;
577 store.with(|mut s| {
578 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
579 state.wake_by_ref();
580 match state.kind {
581 HostStreamProducerStateKind::Idle => {
582 state.kind = HostStreamProducerStateKind::Writing(stream_payload(
583 payload.item,
584 payload.op_count,
585 ));
586 }
587 _ => panic!("stream not idle: {state:?}"),
588 }
589 });
590 await_property(store, "stream should complete a write", |s| {
591 matches!(
592 s.host_stream_producers[&id].0.kind,
593 HostStreamProducerStateKind::Wrote(_),
594 )
595 })
596 .await;
597 store.with(|mut s| {
598 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
599 match state.kind {
600 HostStreamProducerStateKind::Wrote(amt) => {
601 assert_eq!(amt, payload.ready_count);
602 state.kind = HostStreamProducerStateKind::Idle;
603 }
604 _ => panic!("stream not idle: {state:?}"),
605 }
606 });
607 }
608 Command::StreamReadReady(payload) => {
609 let id = payload.stream;
610 ensure_stream_reading(store, id);
611 store.with(|mut s| {
612 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
613 state.wake_by_ref();
614 state.kind = HostStreamConsumerStateKind::Consuming(payload.op_count);
615 });
616 await_property(store, "stream should complete a read", |s| {
617 matches!(
618 s.host_stream_consumers[&id].0.kind,
619 HostStreamConsumerStateKind::Consumed(_),
620 )
621 })
622 .await;
623
624 store.with(|mut s| {
625 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
626 match &state.kind {
627 HostStreamConsumerStateKind::Consumed(last_read) => {
628 assert_eq!(
629 *last_read,
630 stream_payload(payload.item, payload.ready_count)
631 );
632 state.kind = HostStreamConsumerStateKind::Idle;
633 }
634 _ => panic!("future not complete"),
635 }
636 });
637 }
638 Command::StreamWritePending(payload) => store.with(|mut s| {
639 let (state, _) = s
640 .get()
641 .host_stream_producers
642 .get_mut(&payload.stream)
643 .unwrap();
644 state.wake_by_ref();
645 match state.kind {
646 HostStreamProducerStateKind::Idle => {
647 state.kind = HostStreamProducerStateKind::Writing(stream_payload(
648 payload.item,
649 payload.count,
650 ));
651 }
652 _ => panic!("stream not idle {:?}", state.kind),
653 }
654 }),
655 Command::StreamReadPending(payload) => {
656 ensure_stream_reading(store, payload.stream);
657 store.with(|mut s| {
658 let (state, _) = s
659 .get()
660 .host_stream_consumers
661 .get_mut(&payload.stream)
662 .unwrap();
663 state.wake_by_ref();
664 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle));
665 state.kind = HostStreamConsumerStateKind::Consuming(payload.count);
666 })
667 }
668 Command::StreamWriteDropped(payload) => store.with(|mut s| {
669 let (state, weak) = s
670 .get()
671 .host_stream_producers
672 .get_mut(&payload.stream)
673 .unwrap();
674 assert!(matches!(state.kind, HostStreamProducerStateKind::Idle));
675 assert!(weak.upgrade().is_none());
676 }),
677 Command::StreamReadDropped(payload) => {
678 ensure_stream_reading(store, payload.stream);
679 await_property(store, "stream read should get dropped", |s| {
680 let weak = &s.host_stream_consumers[&payload.stream].1;
681 weak.upgrade().is_none()
682 })
683 .await;
684 store.with(|mut s| {
685 let (state, weak) = s
686 .get()
687 .host_stream_consumers
688 .get_mut(&payload.stream)
689 .unwrap();
690 assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle));
691 assert!(weak.upgrade().is_none());
692 })
693 }
694 Command::StreamCancelWrite(id) => store.with(|mut s| {
695 let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
696 assert!(
697 matches!(state.kind, HostStreamProducerStateKind::Writing(_)),
698 "invalid state {state:?}",
699 );
700 state.kind = HostStreamProducerStateKind::Idle;
701 state.wake_by_ref();
702 }),
703 Command::StreamCancelRead(id) => store.with(|mut s| {
704 let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
705 assert!(matches!(
706 state.kind,
707 HostStreamConsumerStateKind::Consuming(_)
708 ));
709 state.kind = HostStreamConsumerStateKind::Idle;
710 }),
711 Command::StreamReadAssertComplete(payload) => store.with(|mut s| {
712 let (state, _) = s
713 .get()
714 .host_stream_consumers
715 .get_mut(&payload.stream)
716 .unwrap();
717 match &state.kind {
718 HostStreamConsumerStateKind::Consumed(last_read) => {
719 assert_eq!(*last_read, stream_payload(payload.item, payload.count));
720 state.kind = HostStreamConsumerStateKind::Idle;
721 }
722 _ => panic!("stream not complete"),
723 }
724 }),
725 Command::StreamWriteAssertComplete(payload) => store.with(|mut s| {
726 let (state, _) = s
727 .get()
728 .host_stream_producers
729 .get_mut(&payload.stream)
730 .unwrap();
731 match state.kind {
732 HostStreamProducerStateKind::Wrote(amt) => {
733 assert_eq!(amt, payload.count);
734 state.kind = HostStreamProducerStateKind::Idle;
735 }
736 _ => panic!("stream not complete: {:?}", state.kind),
737 }
738 }),
739 Command::StreamWriteAssertDropped(payload) => {
740 await_property(store, "stream write should be dropped", |s| {
741 let weak = &s.host_stream_producers[&payload.stream].1;
742 weak.upgrade().is_none()
743 })
744 .await;
745 store.with(|mut s| {
746 let (state, weak) = s
747 .get()
748 .host_stream_producers
749 .get_mut(&payload.stream)
750 .unwrap();
751 assert!(matches!(
752 state.kind,
753 HostStreamProducerStateKind::Writing(_)
754 ));
755 assert!(weak.upgrade().is_none());
756 })
757 }
758 Command::StreamReadAssertDropped(id) => {
759 await_property(store, "stream read should be dropped", |s| {
760 let weak = &s.host_stream_consumers[&id].1;
761 weak.upgrade().is_none()
762 })
763 .await;
764 store.with(|mut s| {
765 let (state, weak) = s.get().host_stream_consumers.get_mut(&id).unwrap();
766 assert!(matches!(
767 state.kind,
768 HostStreamConsumerStateKind::Consuming(_),
769 ));
770 assert!(weak.upgrade().is_none());
771 })
772 }
773 }
774 }
775
stream_payload(item: u32, count: u32) -> Vec<u32>776 fn stream_payload(item: u32, count: u32) -> Vec<u32> {
777 (item..item + count).collect()
778 }
779
ensure_future_reading(store: &Accessor<Data>, id: u32)780 fn ensure_future_reading(store: &Accessor<Data>, id: u32) {
781 store.with(|mut s| {
782 let data = s.get();
783 if !data.host_futures.contains_key(&id) {
784 return;
785 }
786 log::debug!("future consume: start {id}");
787 let arc = Arc::new(());
788 let weak = Arc::downgrade(&arc);
789 let data = s.get();
790 let future = data.host_futures.remove(&id).unwrap();
791 let prev = data
792 .host_future_consumers
793 .insert(id, (HostFutureConsumerState::Idle, weak));
794 assert!(prev.is_none());
795 future.pipe(&mut s, HostFutureConsumer(id, arc)).unwrap();
796 });
797 }
798
ensure_stream_reading(store: &Accessor<Data>, id: u32)799 fn ensure_stream_reading(store: &Accessor<Data>, id: u32) {
800 store.with(|mut s| {
801 let data = s.get();
802 if !data.host_streams.contains_key(&id) {
803 return;
804 }
805 log::debug!("stream consume: start {id}");
806 let arc = Arc::new(());
807 let weak = Arc::downgrade(&arc);
808 let prev = data.host_stream_consumers.insert(
809 id,
810 (
811 HostStreamConsumerState {
812 kind: HostStreamConsumerStateKind::Idle,
813 waker: None,
814 },
815 weak,
816 ),
817 );
818 assert!(prev.is_none());
819 let stream = data.host_streams.remove(&id).unwrap();
820 stream.pipe(&mut s, HostStreamConsumer(id, arc)).unwrap();
821 });
822 }
823
824 struct HostFutureConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
825
826 /// Note that this is only created once a read is actually initiated on a
827 /// future. It's also not possible to cancel a host-based read on a future,
828 /// hence why this is simpler than the `HostFutureProducerState` state below.
829 #[derive(Debug)]
830 enum HostFutureConsumerState {
831 Idle,
832 Waiting(Waker),
833 Consuming,
834 Complete(u32),
835 }
836
837 impl HostFutureConsumerState {
wake_by_ref(&mut self)838 fn wake_by_ref(&mut self) {
839 if let HostFutureConsumerState::Waiting(waker) = &self {
840 waker.wake_by_ref();
841 *self = HostFutureConsumerState::Idle;
842 }
843 }
844 }
845
846 impl FutureConsumer<Data> for HostFutureConsumer {
847 type Item = u32;
848
poll_consume( self: Pin<&mut Self>, cx: &mut Context<'_>, mut store: StoreContextMut<'_, Data>, mut source: Source<'_, Self::Item>, finish: bool, ) -> Poll<Result<()>>849 fn poll_consume(
850 self: Pin<&mut Self>,
851 cx: &mut Context<'_>,
852 mut store: StoreContextMut<'_, Data>,
853 mut source: Source<'_, Self::Item>,
854 finish: bool,
855 ) -> Poll<Result<()>> {
856 let state = match store.data_mut().host_future_consumers.get_mut(&self.0) {
857 Some(state) => state,
858 None => {
859 log::debug!("consume: closed {}", self.0);
860 return Poll::Ready(Ok(()));
861 }
862 };
863 match state.0 {
864 HostFutureConsumerState::Idle | HostFutureConsumerState::Waiting(_) => {
865 if finish {
866 log::debug!("consume: cancel {}", self.0);
867 state.0 = HostFutureConsumerState::Idle;
868 Poll::Ready(Ok(()))
869 } else {
870 log::debug!("consume: wait {}", self.0);
871 state.0 = HostFutureConsumerState::Waiting(cx.waker().clone());
872 Poll::Pending
873 }
874 }
875 HostFutureConsumerState::Consuming => {
876 log::debug!("consume: done {}", self.0);
877 let mut item = None;
878 source.read(&mut store, &mut item).unwrap();
879 store
880 .data_mut()
881 .host_future_consumers
882 .get_mut(&self.0)
883 .unwrap()
884 .0 = HostFutureConsumerState::Complete(item.unwrap());
885 Poll::Ready(Ok(()))
886 }
887 HostFutureConsumerState::Complete(_) => unreachable!(),
888 }
889 }
890 }
891
892 struct HostFutureProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
893
894 #[derive(Debug)]
895 enum HostFutureProducerState {
896 Idle,
897 Waiting(Waker),
898 Writing(u32),
899 Complete,
900 }
901
902 impl FutureProducer<Data> for HostFutureProducer {
903 type Item = u32;
904
poll_produce( self: Pin<&mut Self>, cx: &mut Context<'_>, mut store: StoreContextMut<'_, Data>, finish: bool, ) -> Poll<Result<Option<Self::Item>>>905 fn poll_produce(
906 self: Pin<&mut Self>,
907 cx: &mut Context<'_>,
908 mut store: StoreContextMut<'_, Data>,
909 finish: bool,
910 ) -> Poll<Result<Option<Self::Item>>> {
911 let state = store
912 .data_mut()
913 .host_future_producers
914 .get_mut(&self.0)
915 .unwrap();
916 match state.0 {
917 HostFutureProducerState::Idle | HostFutureProducerState::Waiting(_) => {
918 if finish {
919 log::debug!("produce: cancel {}", self.0);
920 state.0 = HostFutureProducerState::Idle;
921 Poll::Ready(Ok(None))
922 } else {
923 log::debug!("produce: wait {}", self.0);
924 state.0 = HostFutureProducerState::Waiting(cx.waker().clone());
925 Poll::Pending
926 }
927 }
928 HostFutureProducerState::Writing(item) => {
929 log::debug!("produce: done {}", self.0);
930 state.0 = HostFutureProducerState::Complete;
931 Poll::Ready(Ok(Some(item)))
932 }
933 HostFutureProducerState::Complete => unreachable!(),
934 }
935 }
936 }
937
938 struct HostStreamConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
939
940 #[derive(Debug)]
941 struct HostStreamConsumerState {
942 waker: Option<Waker>,
943 kind: HostStreamConsumerStateKind,
944 }
945
946 #[derive(Debug)]
947 enum HostStreamConsumerStateKind {
948 Idle,
949 Consuming(u32),
950 Consumed(Vec<u32>),
951 }
952
953 impl HostStreamConsumerState {
wake_by_ref(&mut self)954 fn wake_by_ref(&mut self) {
955 if let Some(waker) = self.waker.take() {
956 waker.wake();
957 }
958 }
959 }
960
961 impl StreamConsumer<Data> for HostStreamConsumer {
962 type Item = u32;
963
poll_consume( self: Pin<&mut Self>, cx: &mut Context<'_>, mut store: StoreContextMut<'_, Data>, mut source: Source<'_, Self::Item>, finish: bool, ) -> Poll<Result<StreamResult>>964 fn poll_consume(
965 self: Pin<&mut Self>,
966 cx: &mut Context<'_>,
967 mut store: StoreContextMut<'_, Data>,
968 mut source: Source<'_, Self::Item>,
969 finish: bool,
970 ) -> Poll<Result<StreamResult>> {
971 let remaining = source.remaining(&mut store);
972 let state = match store.data_mut().host_stream_consumers.get_mut(&self.0) {
973 Some((state, _)) => state,
974 None => {
975 log::debug!("stream consume: dropped {}", self.0);
976 return Poll::Ready(Ok(StreamResult::Dropped));
977 }
978 };
979 match state.kind {
980 HostStreamConsumerStateKind::Idle | HostStreamConsumerStateKind::Consumed(_) => {
981 if finish {
982 log::debug!("stream consume: cancel {}", self.0);
983 state.waker = None;
984 Poll::Ready(Ok(StreamResult::Cancelled))
985 } else {
986 log::debug!("stream consume: wait {}", self.0);
987 state.waker = Some(cx.waker().clone());
988 Poll::Pending
989 }
990 }
991 HostStreamConsumerStateKind::Consuming(amt) => {
992 // The writer is performing a zero-length write. We always
993 // complete that without updating our own state.
994 if remaining == 0 {
995 log::debug!("stream consume: completing zero-length write {}", self.0);
996 return Poll::Ready(Ok(StreamResult::Completed));
997 }
998
999 // If this is a zero-length read then block the writer but update our own state.
1000 if amt == 0 {
1001 log::debug!("stream consume: finishing zero-length read {}", self.0);
1002 state.kind = HostStreamConsumerStateKind::Consumed(Vec::new());
1003 state.waker = Some(cx.waker().clone());
1004 return Poll::Pending;
1005 }
1006
1007 // For non-zero sizes perform the read/copy.
1008 log::debug!("stream consume: done {}", self.0);
1009 let mut dst = Vec::with_capacity(amt as usize);
1010 source.read(&mut store, &mut dst).unwrap();
1011 let state = &mut store
1012 .data_mut()
1013 .host_stream_consumers
1014 .get_mut(&self.0)
1015 .unwrap()
1016 .0;
1017 state.kind = HostStreamConsumerStateKind::Consumed(dst);
1018 state.waker = None;
1019 Poll::Ready(Ok(StreamResult::Completed))
1020 }
1021 }
1022 }
1023 }
1024
1025 impl Drop for HostStreamConsumer {
drop(&mut self)1026 fn drop(&mut self) {
1027 log::debug!("stream consume: drop {}", self.0);
1028 }
1029 }
1030
1031 struct HostStreamProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
1032
1033 #[derive(Debug)]
1034 struct HostStreamProducerState {
1035 kind: HostStreamProducerStateKind,
1036 waker: Option<Waker>,
1037 }
1038
1039 #[derive(Debug)]
1040 enum HostStreamProducerStateKind {
1041 Idle,
1042 Writing(Vec<u32>),
1043 Wrote(u32),
1044 }
1045
1046 impl HostStreamProducerState {
idle() -> Self1047 fn idle() -> Self {
1048 HostStreamProducerState {
1049 kind: HostStreamProducerStateKind::Idle,
1050 waker: None,
1051 }
1052 }
1053
wake_by_ref(&mut self)1054 fn wake_by_ref(&mut self) {
1055 if let Some(waker) = self.waker.take() {
1056 waker.wake();
1057 }
1058 }
1059 }
1060
1061 impl StreamProducer<Data> for HostStreamProducer {
1062 type Item = u32;
1063 type Buffer = VecBuffer<u32>;
1064
poll_produce( self: Pin<&mut Self>, cx: &mut Context<'_>, mut store: StoreContextMut<'_, Data>, mut dst: Destination<'_, Self::Item, Self::Buffer>, finish: bool, ) -> Poll<Result<StreamResult>>1065 fn poll_produce(
1066 self: Pin<&mut Self>,
1067 cx: &mut Context<'_>,
1068 mut store: StoreContextMut<'_, Data>,
1069 mut dst: Destination<'_, Self::Item, Self::Buffer>,
1070 finish: bool,
1071 ) -> Poll<Result<StreamResult>> {
1072 let remaining = dst.remaining(&mut store);
1073 let data = store.data_mut();
1074 let state = match data.host_stream_producers.get_mut(&self.0) {
1075 Some((state, _)) => state,
1076 None => {
1077 log::debug!("stream produce: dropped {}", self.0);
1078 return Poll::Ready(Ok(StreamResult::Dropped));
1079 }
1080 };
1081 match &mut state.kind {
1082 HostStreamProducerStateKind::Idle | HostStreamProducerStateKind::Wrote(_) => {
1083 if finish {
1084 log::debug!("stream produce: cancel {}", self.0);
1085 state.waker = None;
1086 Poll::Ready(Ok(StreamResult::Cancelled))
1087 } else {
1088 log::debug!("stream produce: wait {}", self.0);
1089 state.waker = Some(cx.waker().clone());
1090 Poll::Pending
1091 }
1092 }
1093 HostStreamProducerStateKind::Writing(buf) => {
1094 // Keep the other side blocked for a zero-length write
1095 // originated from the host.
1096 if buf.len() == 0 {
1097 log::debug!("stream produce: zero-length write {}", self.0);
1098 state.kind = HostStreamProducerStateKind::Wrote(0);
1099 state.waker = Some(cx.waker().clone());
1100 return Poll::Pending;
1101 }
1102 log::debug!("stream produce: write {}", self.0);
1103 match remaining {
1104 Some(amt) => {
1105 // If the guest is doing a zero-length read then we've
1106 // got some data for them. Complete the read but leave
1107 // ourselves in the same `Writing` state as before.
1108 if amt == 0 {
1109 state.waker = None;
1110 return Poll::Ready(Ok(StreamResult::Completed));
1111 }
1112
1113 // Don't let wasmtime buffer up data for us, so truncate
1114 // the buffer we're sending over to the amount that the
1115 // reader is requesting.
1116 if amt < buf.len() {
1117 buf.truncate(amt);
1118 }
1119 }
1120
1121 // At this time host<->host stream reads/writes aren't
1122 // fuzzed since that brings up a bunch of weird edge cases
1123 // which aren't fun to deal with and aren't interesting
1124 // either.
1125 None => unreachable!(),
1126 }
1127 let count = buf.len() as u32;
1128 dst.set_buffer(mem::take(buf).into());
1129 state.kind = HostStreamProducerStateKind::Wrote(count);
1130 state.waker = None;
1131 Poll::Ready(Ok(StreamResult::Completed))
1132 }
1133 }
1134 }
1135 }
1136
1137 impl Drop for HostStreamProducer {
drop(&mut self)1138 fn drop(&mut self) {
1139 log::debug!("stream produce: drop {}", self.0);
1140 }
1141 }
1142
1143 struct SharedStream(Scope);
1144
1145 impl SharedStream {
next(&mut self, accessor: &Accessor<Data>) -> Option<Command>1146 async fn next(&mut self, accessor: &Accessor<Data>) -> Option<Command> {
1147 std::future::poll_fn(|cx| {
1148 accessor.with(|mut store| {
1149 self.poll(cx, store.as_context_mut(), false)
1150 .map(|pair| match pair {
1151 (None, StreamResult::Dropped) => None,
1152 (Some(item), StreamResult::Completed) => Some(item),
1153 _ => unreachable!(),
1154 })
1155 })
1156 })
1157 .await
1158 }
1159
poll( &mut self, cx: &mut Context<'_>, mut store: StoreContextMut<'_, Data>, finish: bool, ) -> Poll<(Option<Command>, StreamResult)>1160 fn poll(
1161 &mut self,
1162 cx: &mut Context<'_>,
1163 mut store: StoreContextMut<'_, Data>,
1164 finish: bool,
1165 ) -> Poll<(Option<Command>, StreamResult)> {
1166 let data = store.data_mut();
1167
1168 // If no more commands remain then this is a closed and dropped stream.
1169 let Some((scope, command)) = data.commands.last_mut() else {
1170 log::debug!("Stream closed: {:?}", self.0);
1171 return Poll::Ready((None, StreamResult::Dropped));
1172 };
1173
1174 // If the next queued up command is for the scope that this stream is
1175 // attached to then send off the command.
1176 if *scope == self.0 {
1177 let ret = Some(*command);
1178
1179 // All commands are followed up with an "ack", and after the "ack"
1180 // is delivered then the command is popped to move on to the next
1181 // command. The reason for this is to guarantee that a command has
1182 // been processed before moving on to the next command. This helps
1183 // make the fuzzing easier to work with by being able to implicitly
1184 // assume that a command has been processed by the time something
1185 // else is. Otherwise it might be possible that wasmtime has a set
1186 // of commands/callbacks that are all delivered at the same time and
1187 // the component model doesn't specify what order they happen
1188 // within. By forcing an "ack" it ensures a more expected ordering
1189 // of execution to assist with fuzzing without losing really all
1190 // that much coverage.
1191 if matches!(command, Command::Ack) {
1192 data.commands.pop();
1193 } else {
1194 *command = Command::Ack;
1195 }
1196
1197 // After a command was popped other streams may be able to make
1198 // progress so wake them all up.
1199 for (_, waker) in data.wakers.drain() {
1200 waker.wake();
1201 }
1202 log::debug!("Delivering command {ret:?} for {:?}", self.0);
1203 return Poll::Ready((ret, StreamResult::Completed));
1204 }
1205
1206 // The command queue is non-empty and the next command isn't meant for
1207 // us, so someone else needs to drain the queue. Enqueue our waker.
1208 if finish {
1209 Poll::Ready((None, StreamResult::Cancelled))
1210 } else {
1211 data.wakers.insert(self.0, cx.waker().clone());
1212 Poll::Pending
1213 }
1214 }
1215 }
1216
1217 impl StreamProducer<Data> for SharedStream {
1218 type Item = Command;
1219 type Buffer = Option<Command>;
1220
poll_produce<'a>( mut self: Pin<&mut Self>, cx: &mut Context<'_>, store: StoreContextMut<'a, Data>, mut destination: Destination<'a, Self::Item, Self::Buffer>, finish: bool, ) -> Poll<Result<StreamResult>>1221 fn poll_produce<'a>(
1222 mut self: Pin<&mut Self>,
1223 cx: &mut Context<'_>,
1224 store: StoreContextMut<'a, Data>,
1225 mut destination: Destination<'a, Self::Item, Self::Buffer>,
1226 finish: bool,
1227 ) -> Poll<Result<StreamResult>> {
1228 let (item, result) = std::task::ready!(self.poll(cx, store, finish));
1229 destination.set_buffer(item);
1230 Poll::Ready(Ok(result))
1231 }
1232 }
1233
1234 #[cfg(test)]
1235 mod tests {
1236 use super::{ComponentAsync, Scope, init, run};
1237 use crate::oracles::component_async::types::*;
1238 use crate::test::test_n_times;
1239 use Scope::*;
1240
1241 #[test]
smoke()1242 fn smoke() {
1243 init();
1244
1245 test_n_times(50, |c, _| {
1246 run(c);
1247 Ok(())
1248 });
1249 }
1250
1251 // ========================================================================
1252 // A series of fuzz-generated test cases which caused problems during the
1253 // development of this fuzzer. Feel free to delete/edit/etc if the fuzzer
1254 // changes over time.
1255
1256 #[test]
simple()1257 fn simple() {
1258 init();
1259
1260 run(ComponentAsync {
1261 commands: vec![
1262 (GuestCaller, Command::AsyncPendingImportCall(0)),
1263 (GuestCallee, Command::AsyncPendingImportCall(1)),
1264 (GuestCallee, Command::AsyncPendingExportComplete(0)),
1265 (GuestCaller, Command::AsyncPendingImportAssertReady(0)),
1266 (GuestCaller, Command::AsyncPendingImportCall(2)),
1267 ],
1268 });
1269 }
1270
1271 #[test]
somewhat_larger()1272 fn somewhat_larger() {
1273 static COMMANDS: &[(Scope, Command)] = &[
1274 (GuestCallee, Command::FutureNew(0)),
1275 (HostCaller, Command::FutureNew(1)),
1276 (GuestCallee, Command::FutureReadPending(0)),
1277 (GuestCaller, Command::AsyncPendingImportCall(2)),
1278 (GuestCaller, Command::AsyncPendingImportCall(3)),
1279 (GuestCaller, Command::AsyncPendingImportCall(4)),
1280 (GuestCaller, Command::AsyncPendingImportCall(5)),
1281 (GuestCallee, Command::AsyncPendingExportComplete(5)),
1282 (GuestCallee, Command::AsyncPendingExportComplete(3)),
1283 (GuestCallee, Command::AsyncPendingExportComplete(4)),
1284 (GuestCallee, Command::AsyncPendingExportComplete(2)),
1285 (GuestCaller, Command::AsyncPendingImportCall(6)),
1286 (GuestCallee, Command::AsyncPendingExportComplete(6)),
1287 (GuestCaller, Command::AsyncPendingImportCall(7)),
1288 (GuestCallee, Command::AsyncPendingExportComplete(7)),
1289 (GuestCaller, Command::AsyncPendingImportCall(8)),
1290 (GuestCallee, Command::AsyncPendingExportComplete(8)),
1291 (GuestCaller, Command::AsyncPendingImportCall(9)),
1292 (GuestCallee, Command::AsyncPendingExportComplete(9)),
1293 (GuestCaller, Command::AsyncPendingImportCall(10)),
1294 (GuestCallee, Command::AsyncPendingExportComplete(10)),
1295 (GuestCaller, Command::AsyncPendingImportCall(11)),
1296 (GuestCallee, Command::AsyncPendingExportComplete(11)),
1297 (GuestCaller, Command::AsyncPendingImportCall(12)),
1298 (GuestCallee, Command::AsyncPendingExportComplete(12)),
1299 (GuestCaller, Command::AsyncPendingImportCall(13)),
1300 (GuestCallee, Command::AsyncPendingExportComplete(13)),
1301 (GuestCaller, Command::AsyncPendingImportCall(14)),
1302 (GuestCallee, Command::AsyncPendingExportComplete(14)),
1303 (GuestCaller, Command::AsyncPendingImportCall(15)),
1304 (GuestCallee, Command::AsyncPendingExportComplete(15)),
1305 (GuestCaller, Command::AsyncPendingImportCall(16)),
1306 (GuestCallee, Command::AsyncPendingExportComplete(16)),
1307 (GuestCaller, Command::AsyncPendingImportCall(17)),
1308 (GuestCallee, Command::AsyncPendingExportComplete(17)),
1309 (GuestCaller, Command::AsyncPendingImportCall(18)),
1310 (GuestCallee, Command::AsyncPendingExportComplete(18)),
1311 (GuestCaller, Command::AsyncPendingImportCall(19)),
1312 (GuestCallee, Command::AsyncPendingExportComplete(19)),
1313 (GuestCaller, Command::AsyncPendingImportCall(20)),
1314 (GuestCallee, Command::AsyncPendingExportComplete(20)),
1315 (GuestCaller, Command::AsyncPendingImportCall(21)),
1316 (GuestCallee, Command::AsyncPendingExportComplete(21)),
1317 (GuestCaller, Command::AsyncPendingImportCall(22)),
1318 (GuestCallee, Command::AsyncPendingExportComplete(22)),
1319 (GuestCaller, Command::AsyncPendingImportCall(23)),
1320 (GuestCallee, Command::AsyncPendingExportComplete(23)),
1321 (GuestCaller, Command::AsyncPendingImportCall(24)),
1322 (GuestCallee, Command::AsyncPendingExportComplete(24)),
1323 (GuestCaller, Command::AsyncPendingImportCall(25)),
1324 (GuestCallee, Command::AsyncPendingExportComplete(25)),
1325 (GuestCaller, Command::AsyncPendingImportCall(26)),
1326 (GuestCallee, Command::AsyncPendingExportComplete(26)),
1327 (GuestCaller, Command::AsyncPendingImportCall(27)),
1328 (GuestCallee, Command::AsyncPendingExportComplete(27)),
1329 (GuestCaller, Command::AsyncPendingImportCall(28)),
1330 (GuestCallee, Command::AsyncPendingExportComplete(28)),
1331 (GuestCaller, Command::AsyncPendingImportCall(29)),
1332 (GuestCallee, Command::AsyncPendingExportComplete(29)),
1333 (GuestCaller, Command::AsyncPendingImportCall(30)),
1334 (GuestCallee, Command::AsyncPendingExportComplete(30)),
1335 (GuestCaller, Command::AsyncPendingImportCall(31)),
1336 (GuestCallee, Command::AsyncPendingExportComplete(31)),
1337 (GuestCaller, Command::AsyncPendingImportCall(32)),
1338 (GuestCallee, Command::AsyncPendingExportComplete(32)),
1339 (GuestCaller, Command::AsyncPendingImportCall(33)),
1340 (GuestCallee, Command::AsyncPendingExportComplete(33)),
1341 (GuestCaller, Command::AsyncPendingImportCall(34)),
1342 (GuestCallee, Command::AsyncPendingExportComplete(34)),
1343 (GuestCaller, Command::AsyncPendingImportCall(35)),
1344 (GuestCallee, Command::AsyncPendingExportComplete(35)),
1345 (GuestCaller, Command::AsyncPendingImportCall(36)),
1346 (GuestCallee, Command::AsyncPendingExportComplete(36)),
1347 (GuestCaller, Command::AsyncPendingImportCall(37)),
1348 (GuestCallee, Command::AsyncPendingExportComplete(37)),
1349 (GuestCaller, Command::AsyncPendingImportAssertReady(36)),
1350 ];
1351 init();
1352
1353 run(ComponentAsync {
1354 commands: COMMANDS.to_vec(),
1355 });
1356 }
1357
1358 #[test]
simple_stream1()1359 fn simple_stream1() {
1360 init();
1361
1362 run(ComponentAsync {
1363 commands: vec![
1364 (HostCallee, Command::StreamNew(1)),
1365 (
1366 HostCallee,
1367 Command::StreamReadPending(StreamReadPayload {
1368 stream: 1,
1369 count: 2,
1370 }),
1371 ),
1372 (HostCallee, Command::StreamCancelRead(1)),
1373 (GuestCaller, Command::SyncReadyCall),
1374 (
1375 HostCallee,
1376 Command::StreamWritePending(StreamWritePayload {
1377 stream: 1,
1378 item: 3,
1379 count: 2,
1380 }),
1381 ),
1382 (HostCallee, Command::StreamCancelWrite(1)),
1383 (HostCallee, Command::StreamDropWritable(1)),
1384 (
1385 HostCallee,
1386 Command::StreamReadDropped(StreamReadPayload {
1387 stream: 1,
1388 count: 1,
1389 }),
1390 ),
1391 ],
1392 });
1393 }
1394
1395 #[test]
simple_stream3()1396 fn simple_stream3() {
1397 init();
1398
1399 run(ComponentAsync {
1400 commands: vec![
1401 (GuestCaller, Command::StreamNew(26)),
1402 (
1403 GuestCaller,
1404 Command::StreamReadPending(StreamReadPayload {
1405 stream: 26,
1406 count: 10,
1407 }),
1408 ),
1409 (GuestCaller, Command::StreamDropWritable(26)),
1410 (GuestCaller, Command::StreamReadAssertDropped(26)),
1411 ],
1412 });
1413 }
1414
1415 #[test]
simple_stream4()1416 fn simple_stream4() {
1417 init();
1418
1419 run(ComponentAsync {
1420 commands: vec![
1421 (GuestCaller, Command::StreamNew(23)),
1422 (
1423 GuestCaller,
1424 Command::StreamWritePending(StreamWritePayload {
1425 stream: 23,
1426 item: 24,
1427 count: 2,
1428 }),
1429 ),
1430 (GuestCaller, Command::StreamGive(23)),
1431 (GuestCallee, Command::StreamDropReadable(23)),
1432 (
1433 GuestCaller,
1434 Command::StreamWriteAssertDropped(StreamReadPayload {
1435 stream: 23,
1436 count: 0,
1437 }),
1438 ),
1439 ],
1440 });
1441 }
1442
1443 #[test]
zero_length_behavior()1444 fn zero_length_behavior() {
1445 init();
1446
1447 run(ComponentAsync {
1448 commands: vec![
1449 (GuestCaller, Command::StreamNew(10)),
1450 (HostCaller, Command::StreamTake(10)),
1451 (
1452 GuestCaller,
1453 Command::StreamWritePending(StreamWritePayload {
1454 stream: 10,
1455 item: 13,
1456 count: 5,
1457 }),
1458 ),
1459 (
1460 HostCaller,
1461 Command::StreamReadReady(StreamReadyPayload {
1462 stream: 10,
1463 item: 0,
1464 op_count: 0,
1465 ready_count: 0,
1466 }),
1467 ),
1468 (
1469 HostCaller,
1470 Command::StreamReadReady(StreamReadyPayload {
1471 stream: 10,
1472 item: 0,
1473 op_count: 0,
1474 ready_count: 0,
1475 }),
1476 ),
1477 ],
1478 });
1479 }
1480 }
1481