1 //! Test case used with the `component_async` fuzzer which is part of the `misc`
2 //! fuzz target of Wasmtime.
3 //!
4 //! This test case is a binary that's suited for just that one fuzzer and has an
5 //! associated WIT world that it works with. This test case is composed with
6 //! itself and then run within the host as well. The exact semantics of this
7 //! program and all the exports/imports are defined within the context of the
8 //! fuzzer.
9 //!
10 //! The general idea is that this program creates an "async soup" and make sure
11 //! that everything works as expected, notably also not leading to any panics
12 //! anywhere within the runtime. An example of what this fuzzer intermingles
13 //! are:
14 //!
15 //! * Synchronous calls
16 //! * Async calls that are immediately ready
17 //! * Async calls that are not immediately ready and left pending
18 //! * Creation of futures/streams
19 //! * Moving futures/streams between components
20 //! * Reading/writing futures/streams
21 //! * Cancelling reads/writes of futures/streams
22 //! * Seeing futures/streams get dropped and the effect on active reads/writes
23 //! * Mixing host<->guest, guest<->guest, guest<->host, and host<->host
24 //!   calls/primitives
25 //!
26 //! The purpose of this fuzzer is to stress the management of async stacks, the
27 //! async runtime, and in theory suss out various edge cases in the handling of
28 //! async events. This fuzzer does NOT stress lifting/lowering at all because
29 //! there is a static WIT signature that this fuzzer works with.
30 //!
31 //! Much of the code in this file is semi-duplicated in the host except written
32 //! with host `wasmtime` APIs instead of `wit-bindgen` APIs. The overall
33 //! structure is roughly the same.
34 //!
35 //! # Overall architecture
36 //!
37 //! The general structure of this fuzzer is that there's a "host sandwich" which
38 //! looks like:
39 //!
40 //! ```text
41 //! ╔══════╦══════════════════════════════════════════════════════════╗
42 //! ║ Host ║                                                          ║
43 //! ╠══════╝                                                          ║
44 //! ║                                                                 ║
45 //! ║ ┍┯┯┯━━━ wasmtime:fuzz/types
46 //! ║ ││││                                                            ║
47 //! ║ ││││                                                            ║
48 //! ║ ││││        ╔════════════════════╦════════════════════╗         ║
49 //! ║ ││││        ║ component_async.rs ║                    ║         ║
50 //! ║ ││││        ╠════════════════════╝                    ║         ║
51 //! ║ ││││        ║                                         ║         ║
52 //! ║ ││││        ║            HostCaller                   ║         ║
53 //! ║ ││││        ║                                         ║         ║
54 //! ║ │││└────────╫─→ stream<command>                       ║         ║
55 //! ║ │││         ╚═══════════════════╤═════════════════════╝         ║
56 //! ║ │││                             │                               ║
57 //! ║ │││                             ┝ wasmtime-fuzz:fuzz/async-test
58 //! ║ │││                             │                               ║
59 //! ║ │││    ╔═══════════╦════════════╪═════════════════════════════╗ ║
60 //! ║ │││    ║ Component ║            │                             ║ ║
61 //! ║ │││    ╠═══════════╝            │                             ║ ║
62 //! ║ │││    ║                        ↓                             ║ ║
63 //! ║ │││    ║    ╔═════════════════╦═══════════════════════╗       ║ ║
64 //! ║ │││    ║    ║ fuzz-async.wasm ║                       ║       ║ ║
65 //! ║ │││    ║    ╠═════════════════╝                       ║       ║ ║
66 //! ║ │││    ║    ║                                         ║       ║ ║
67 //! ║ │││    ║    ║            GuestCaller                  ║       ║ ║
68 //! ║ │││    ║    ║                                         ║       ║ ║
69 //! ║ ││└────╫────╫─→ stream<command>                       ║       ║ ║
70 //! ║ ││     ║    ╚═══════╤═════════════════════════════════╝       ║ ║
71 //! ║ ││     ║            │                                         ║ ║
72 //! ║ ││     ║            ┝ wasmtime-fuzz:fuzz/async-test           ║ ║
73 //! ║ ││     ║            │                                         ║ ║
74 //! ║ ││     ║            ↓                                         ║ ║
75 //! ║ ││     ║    ╔═════════════════╦═══════════════════════╗       ║ ║
76 //! ║ ││     ║    ║ fuzz-async.wasm ║                       ║       ║ ║
77 //! ║ ││     ║    ╠═════════════════╝                       ║       ║ ║
78 //! ║ ││     ║    ║                                         ║       ║ ║
79 //! ║ ││     ║    ║            GuestCallee                  ║       ║ ║
80 //! ║ ││     ║    ║                                         ║       ║ ║
81 //! ║ │└─────╫────╫─→ stream<command>                       ║       ║ ║
82 //! ║ │      ║    ╚═══════════════════╤═════════════════════╝       ║ ║
83 //! ║ │      ║                        │                             ║ ║
84 //! ║ │      ║                        │                             ║ ║
85 //! ║ │      ╚════════════════════════╪═════════════════════════════╝ ║
86 //! ║ │                               │                               ║
87 //! ║ │                               ┝ wasmtime-fuzz:fuzz/async-test
88 //! ║ │                               │                               ║
89 //! ║ │                               ↓                               ║
90 //! ║ │           ╔════════════════════╦════════════════════╗         ║
91 //! ║ │           ║ component_async.rs ║                    ║         ║
92 //! ║ │           ╠════════════════════╝                    ║         ║
93 //! ║ │           ║                                         ║         ║
94 //! ║ │           ║            HostCallee                   ║         ║
95 //! ║ │           ║                                         ║         ║
96 //! ║ └───────────╫─→ stream<command>                       ║         ║
97 //! ║             ╚═════════════════════════════════════════╝         ║
98 //! ║                                                                 ║
99 //! ╚═════════════════════════════════════════════════════════════════╝
100 //! ```
101 //!
102 //! Here `fuzz-async.wasm` appears twice to model all the various types of
103 //! the host/guest interaction matrix. Everything is driven by a
104 //! `stream<command>` provided to each component part of the system which
105 //! serves as a means of forcing one particular component to take action.
106 //! Commands are then the test case itself where a series of commands are
107 //! executed for each fuzz iteration.
108 //!
109 //! # Yield-loops
110 //!
111 //! This program has a function `test_property` which is a similar analog to the
112 //! one in the host-side as well. The general idea is that while component model
113 //! async is generally deterministic it does not specify what should happen when
114 //! multiple events are ready at the same time. This can pretty easily happen in
115 //! this fuzzer meaning that it's not precise which event happens first. To
116 //! assist in managing this there are two primary mitigations:
117 //!
118 //! * The first is that whenever a command is dispatched to a component it's
119 //!   followed up with an "ack" which is a noop. Delivery of the "ack" can't
120 //!   happen until the previous command is completely finished being processed
121 //!   meaning it's a kludge way of synchronizing the receipt of a message.
122 //!
123 //! * The second is that there can still be small races where an async event
124 //!   hasn't quite happened yet but it's queued up to happen. To handle these
125 //!   events calls to `test_property` are sprinkled around which has an
126 //!   internally-bounded yield-loop. It's expected that while yielding other
127 //!   code can run which resolves the property being tested at-hand, and then
128 //!   this yield loop will panic if it turns too many times as it's probably a
129 //!   bug.
130 //!
131 //! It's a bit of a hack but it's so far the most effective way of handling this
132 //! that's (a) not timing-dependent e.g. adding sleeps, (b) is
133 //! reliable/deterministic, and (c) is flexible where the constant number of
134 //! yields can be bumped without much concern. The number of yields specifically
135 //! is arbitrarily chosen and while it can't be said exactly how many yields
136 //! should be necessary it should be able to say "less than N should always
137 //! work".
138 
139 wit_bindgen::generate!("fuzz-async" in "../fuzzing/wit");
140 
141 use crate::exports::wasmtime_fuzz::fuzz::async_test as e;
142 use crate::wasmtime_fuzz::fuzz::async_test as i;
143 use crate::wasmtime_fuzz::fuzz::types::{self, Command, Scope};
144 use futures::FutureExt;
145 use futures::channel::oneshot;
146 use pin_project_lite::pin_project;
147 use std::collections::{HashMap, HashSet};
148 use std::mem;
149 use std::pin::{Pin, pin};
150 use std::sync::Mutex;
151 use std::sync::atomic::{AtomicBool, Ordering};
152 use std::task::{Context, Poll, Waker};
153 use wit_bindgen::{FutureReader, FutureWriter, StreamReader, StreamResult, StreamWriter};
154 
155 struct Component;
156 
157 export!(Component);
158 
159 // Convenience macro to change the "target" of `log::debug!` based on whether
160 // this component is a `caller` or `callee` scope to distinguish logs in the
161 // output.
162 macro_rules! debug {
163     ($($arg:tt)*) => {
164         log::debug!(target: log_target(), $($arg)*);
165     }
166 }
167 
168 static IS_CALLER: AtomicBool = AtomicBool::new(false);
169 
170 fn log_target() -> &'static str {
171     if IS_CALLER.load(Ordering::Relaxed) {
172         "wasmtime_fuzzing::fuzz_async::caller"
173     } else {
174         "wasmtime_fuzzing::fuzz_async::callee"
175     }
176 }
177 
178 impl e::Guest for Component {
179     fn sync_ready() {}
180 
181     async fn async_ready() {}
182 
183     async fn async_pending(id: u32) {
184         let (tx, rx) = oneshot::channel();
185         State::with(|s| s.async_pending_exports_ready.insert(id, tx));
186         let record = RecordCancelOnDrop(id);
187         rx.await.unwrap();
188         mem::forget(record);
189         debug!("export {id} is complete");
190 
191         struct RecordCancelOnDrop(u32);
192 
193         impl Drop for RecordCancelOnDrop {
194             fn drop(&mut self) {
195                 debug!("export {} was cancelled", self.0);
196                 State::with(|s| {
197                     s.async_pending_exports_cancelled.insert(self.0);
198                 });
199             }
200         }
201     }
202 
203     async fn init(scope: Scope) {
204         IS_CALLER.store(scope == Scope::Caller, Ordering::Relaxed);
205         env_logger::init();
206         i::init(Scope::Callee).await;
207         let commands = types::get_commands(scope);
208         wit_bindgen::spawn(run(commands));
209     }
210 
211     fn future_take(id: u32) -> FutureReader<u32> {
212         State::with(|s| s.future_readers.remove(&id).unwrap())
213     }
214 
215     fn future_receive(id: u32, f: FutureReader<u32>) {
216         let prev = State::with(|s| s.future_readers.insert(id, f));
217         assert!(prev.is_none());
218     }
219 
220     fn stream_take(id: u32) -> StreamReader<u32> {
221         State::with(|s| s.stream_readers.remove(&id).unwrap())
222     }
223 
224     fn stream_receive(id: u32, f: StreamReader<u32>) {
225         let prev = State::with(|s| s.stream_readers.insert(id, f));
226         assert!(prev.is_none());
227     }
228 }
229 
230 #[derive(Default)]
231 struct State {
232     async_pending_imports_ready: HashSet<u32>,
233     async_pending_imports_in_progress: HashMap<u32, oneshot::Sender<()>>,
234     async_pending_exports_ready: HashMap<u32, oneshot::Sender<()>>,
235     async_pending_exports_cancelled: HashSet<u32>,
236 
237     future_readers: HashMap<u32, FutureReader<u32>>,
238     future_writers: HashMap<u32, FutureWriter<u32>>,
239     future_write_cancel_signals: HashMap<u32, oneshot::Sender<()>>,
240     future_read_cancel_signals: HashMap<u32, oneshot::Sender<()>>,
241     future_writes_completed: HashMap<u32, bool>,
242     future_reads_completed: HashMap<u32, u32>,
243 
244     stream_readers: HashMap<u32, StreamReader<u32>>,
245     stream_writers: HashMap<u32, StreamWriter<u32>>,
246     stream_write_cancel_signals: HashMap<u32, oneshot::Sender<()>>,
247     stream_read_cancel_signals: HashMap<u32, oneshot::Sender<()>>,
248     stream_writes_completed: HashMap<u32, Result<(usize, Vec<u32>), (usize, Vec<u32>)>>,
249     stream_reads_completed: HashMap<u32, Option<Vec<u32>>>,
250 }
251 
252 impl State {
253     pub fn with<R>(f: impl FnOnce(&mut State) -> R) -> R {
254         static STATE: Mutex<Option<State>> = Mutex::new(None);
255         let mut state = STATE.lock().unwrap();
256         let state = state.get_or_insert_with(|| State::default());
257         f(state)
258     }
259 
260     pub async fn test_property(mut f: impl FnMut(&mut State) -> bool) -> bool {
261         // Test if the property is ready, but it might require a sibling future
262         // task to run, so if it's not true yet then pump the executor a
263         // few times to let it finish.
264         for _ in 0..1000 {
265             if State::with(&mut f) {
266                 return true;
267             }
268             wit_bindgen::yield_async().await;
269         }
270         return false;
271     }
272 }
273 
274 async fn run(mut commands: StreamReader<Command>) {
275     while let Some(command) = commands.next().await {
276         match command {
277             Command::SyncReadyCall => i::sync_ready(),
278 
279             Command::AsyncReadyCall => assert_ready(pin!(i::async_ready())),
280 
281             Command::AsyncPendingExportComplete(i) => {
282                 assert!(
283                     State::test_property(|s| s.async_pending_exports_ready.contains_key(&i)).await,
284                     "expected async_pending export {i} should be pending",
285                 );
286                 debug!("finishing export {i}");
287                 State::with(|s| {
288                     s.async_pending_exports_ready
289                         .remove(&i)
290                         .unwrap()
291                         .send(())
292                         .unwrap();
293                 });
294             }
295             Command::AsyncPendingExportAssertCancelled(i) => {
296                 assert!(
297                     State::test_property(|s| s.async_pending_exports_cancelled.remove(&i)).await,
298                     "expected async_pending export {i} to be cancelled",
299                 );
300             }
301             Command::AsyncPendingImportCall(i) => {
302                 let mut future = Box::pin(i::async_pending(i));
303                 debug!("starting export {i}");
304                 assert_not_ready(future.as_mut());
305                 let (cancel_tx, mut cancel_rx) = oneshot::channel();
306                 State::with(|s| {
307                     s.async_pending_imports_in_progress.insert(i, cancel_tx);
308                 });
309                 wit_bindgen::spawn(async move {
310                     futures::select! {
311                         _ = cancel_rx => {}
312                         _ = future.fuse() => {
313                             State::with(|s| s.async_pending_imports_ready.insert(i));
314                         }
315                     }
316                 });
317             }
318             Command::AsyncPendingImportCancel(i) => {
319                 debug!("cancelling import {i}");
320                 State::with(|s| {
321                     s.async_pending_imports_in_progress
322                         .remove(&i)
323                         .unwrap()
324                         .send(())
325                         .unwrap();
326                 });
327             }
328             Command::AsyncPendingImportAssertReady(i) => {
329                 assert!(
330                     State::test_property(|s| s.async_pending_imports_ready.remove(&i)).await,
331                     "expected async_pending import {i} to be ready",
332                 );
333             }
334 
335             Command::FutureNew(id) => {
336                 let (writer, reader) = wit_future::new(|| unreachable!());
337                 State::with(|s| {
338                     let prev = s.future_writers.insert(id, writer);
339                     assert!(prev.is_none());
340                     let prev = s.future_readers.insert(id, reader);
341                     assert!(prev.is_none());
342                 });
343             }
344             Command::FutureTake(id) => {
345                 let reader = i::future_take(id);
346                 State::with(|s| {
347                     let prev = s.future_readers.insert(id, reader);
348                     assert!(prev.is_none());
349                 });
350             }
351             Command::FutureGive(id) => {
352                 let reader = State::with(|s| s.future_readers.remove(&id).unwrap());
353                 i::future_receive(id, reader);
354             }
355             Command::FutureDropReadable(id) => {
356                 let _ = State::with(|s| s.future_readers.remove(&id).unwrap());
357             }
358             Command::FutureWriteReady(payload) => {
359                 let writer = State::with(|s| s.future_writers.remove(&payload.future).unwrap());
360                 assert_ready(pin!(writer.write(payload.item))).unwrap();
361             }
362             Command::FutureReadReady(payload) => {
363                 let reader = State::with(|s| s.future_readers.remove(&payload.future).unwrap());
364                 assert_eq!(assert_ready(pin!(reader.into_future())), payload.item);
365             }
366             Command::FutureWriteDropped(id) => {
367                 let writer = State::with(|s| s.future_writers.remove(&id).unwrap());
368                 match assert_ready(pin!(writer.write(0))) {
369                     Ok(_) => panic!("should be dropped"),
370                     Err(_) => {}
371                 }
372             }
373             Command::FutureWritePending(payload) => {
374                 use wit_bindgen::FutureWriteCancel;
375 
376                 let writer = State::with(|s| s.future_writers.remove(&payload.future).unwrap());
377                 let (tx, rx) = oneshot::channel();
378                 let mut future = Box::pin(CancellableFutureWrite {
379                     cancel: rx,
380                     write: writer.write(payload.item),
381                 });
382                 assert_not_ready(future.as_mut());
383                 wit_bindgen::spawn(async move {
384                     let result = future.await;
385                     debug!("future write {} completed: {result:?}", payload.future);
386                     State::with(|s| match result {
387                         FutureWriteCancel::AlreadySent => {
388                             s.future_writes_completed.insert(payload.future, true);
389                         }
390                         FutureWriteCancel::Dropped(_) => {
391                             s.future_writes_completed.insert(payload.future, false);
392                         }
393                         FutureWriteCancel::Cancelled(_, writer) => {
394                             let prev = s.future_writers.insert(payload.future, writer);
395                             assert!(prev.is_none());
396                         }
397                     });
398                 });
399                 State::with(|s| {
400                     let prev = s.future_write_cancel_signals.insert(payload.future, tx);
401                     assert!(prev.is_none());
402                 });
403 
404                 pin_project! {
405                     struct CancellableFutureWrite {
406                         #[pin]
407                         cancel: oneshot::Receiver<()>,
408                         #[pin]
409                         write: wit_bindgen::FutureWrite<u32>,
410                     }
411                 }
412 
413                 impl Future for CancellableFutureWrite {
414                     type Output = FutureWriteCancel<u32>;
415 
416                     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
417                         let this = self.project();
418                         match this.cancel.poll(cx) {
419                             Poll::Ready(_) => return Poll::Ready(this.write.cancel()),
420                             Poll::Pending => {}
421                         }
422                         match this.write.poll(cx) {
423                             Poll::Ready(Ok(())) => Poll::Ready(FutureWriteCancel::AlreadySent),
424                             Poll::Ready(Err(val)) => {
425                                 Poll::Ready(FutureWriteCancel::Dropped(val.value))
426                             }
427                             Poll::Pending => Poll::Pending,
428                         }
429                     }
430                 }
431             }
432             Command::FutureReadPending(id) => {
433                 let reader = State::with(|s| s.future_readers.remove(&id).unwrap());
434                 let (tx, rx) = oneshot::channel();
435                 let mut future = Box::pin(CancellableFutureRead {
436                     cancel: rx,
437                     read: reader.into_future(),
438                 });
439                 assert_not_ready(future.as_mut());
440                 wit_bindgen::spawn(async move {
441                     let result = future.await;
442                     State::with(|s| match result {
443                         Ok(result) => {
444                             let prev = s.future_reads_completed.insert(id, result);
445                             assert!(prev.is_none());
446                         }
447                         Err(reader) => {
448                             let prev = s.future_readers.insert(id, reader);
449                             assert!(prev.is_none());
450                         }
451                     });
452                 });
453                 State::with(|s| {
454                     let prev = s.future_read_cancel_signals.insert(id, tx);
455                     assert!(prev.is_none());
456                 });
457 
458                 pin_project! {
459                     struct CancellableFutureRead {
460                         #[pin]
461                         cancel: oneshot::Receiver<()>,
462                         #[pin]
463                         read: wit_bindgen::FutureRead<u32>,
464                     }
465                 }
466 
467                 impl Future for CancellableFutureRead {
468                     type Output = Result<u32, FutureReader<u32>>;
469 
470                     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
471                         let this = self.project();
472                         match this.cancel.poll(cx) {
473                             Poll::Ready(_) => return Poll::Ready(this.read.cancel()),
474                             Poll::Pending => {}
475                         }
476                         match this.read.poll(cx) {
477                             Poll::Ready(i) => Poll::Ready(Ok(i)),
478                             Poll::Pending => Poll::Pending,
479                         }
480                     }
481                 }
482             }
483             Command::FutureCancelWrite(id) => {
484                 State::with(|s| {
485                     s.future_write_cancel_signals
486                         .remove(&id)
487                         .unwrap()
488                         .send(())
489                         .unwrap();
490                 });
491                 assert!(
492                     State::test_property(|s| s.future_writers.contains_key(&id)).await,
493                     "expected future write {id} to be cancelled",
494                 );
495             }
496             Command::FutureCancelRead(id) => {
497                 State::with(|s| {
498                     s.future_read_cancel_signals
499                         .remove(&id)
500                         .unwrap()
501                         .send(())
502                         .unwrap();
503                 });
504                 assert!(
505                     State::test_property(|s| s.future_readers.contains_key(&id)).await,
506                     "expected future read {id} to be cancelled",
507                 );
508             }
509             Command::FutureWriteAssertComplete(id) => {
510                 assert!(
511                     State::test_property(|s| match s.future_writes_completed.remove(&id) {
512                         Some(true) => true,
513                         Some(false) => panic!("future was dropped"),
514                         None => false,
515                     })
516                     .await,
517                     "expected future write {id} to be complete",
518                 );
519             }
520             Command::FutureWriteAssertDropped(id) => {
521                 assert!(
522                     State::test_property(|s| match s.future_writes_completed.remove(&id) {
523                         Some(true) => panic!("future write completed"),
524                         Some(false) => true,
525                         None => false,
526                     })
527                     .await,
528                     "expected future write {id} to be complete",
529                 );
530             }
531             Command::FutureReadAssertComplete(payload) => {
532                 assert!(
533                     State::test_property(|s| {
534                         match s.future_reads_completed.remove(&payload.future) {
535                             Some(i) => {
536                                 assert_eq!(i, payload.item);
537                                 true
538                             }
539                             None => false,
540                         }
541                     })
542                     .await,
543                     "expected future read {} to be complete",
544                     payload.future,
545                 );
546             }
547 
548             Command::StreamNew(id) => {
549                 let (writer, reader) = wit_stream::new();
550                 State::with(|s| {
551                     let prev = s.stream_writers.insert(id, writer);
552                     assert!(prev.is_none());
553                     let prev = s.stream_readers.insert(id, reader);
554                     assert!(prev.is_none());
555                 });
556             }
557             Command::StreamTake(id) => {
558                 let reader = i::stream_take(id);
559                 State::with(|s| {
560                     let prev = s.stream_readers.insert(id, reader);
561                     assert!(prev.is_none());
562                 });
563             }
564             Command::StreamGive(id) => {
565                 let reader = State::with(|s| s.stream_readers.remove(&id).unwrap());
566                 i::stream_receive(id, reader);
567             }
568             Command::StreamDropReadable(id) => {
569                 let _ = State::with(|s| s.stream_readers.remove(&id).unwrap());
570             }
571             Command::StreamDropWritable(id) => {
572                 let _ = State::with(|s| s.stream_writers.remove(&id).unwrap());
573             }
574             Command::StreamWriteReady(payload) => {
575                 State::with(|s| {
576                     let writer = s.stream_writers.get_mut(&payload.stream).unwrap();
577                     let (status, buffer) = assert_ready(pin!(
578                         writer.write(stream_payload(payload.item, payload.op_count))
579                     ));
580                     assert_eq!(status, StreamResult::Complete(payload.ready_count as usize));
581                     assert_eq!(
582                         buffer.remaining() as u32,
583                         payload.op_count - payload.ready_count
584                     );
585                 });
586             }
587             Command::StreamWriteDropped(payload) => {
588                 State::with(|s| {
589                     let writer = s.stream_writers.get_mut(&payload.stream).unwrap();
590                     let (status, buffer) = assert_ready(pin!(
591                         writer.write(stream_payload(payload.item, payload.count))
592                     ));
593                     assert_eq!(status, StreamResult::Dropped);
594                     assert_eq!(buffer.remaining() as u32, payload.count);
595                 });
596             }
597             Command::StreamReadReady(payload) => {
598                 State::with(|s| {
599                     let reader = s.stream_readers.get_mut(&payload.stream).unwrap();
600                     let (status, buffer) = assert_ready(pin!(
601                         reader.read(Vec::with_capacity(payload.op_count as usize))
602                     ));
603                     assert_eq!(status, StreamResult::Complete(payload.ready_count as usize));
604                     assert_eq!(buffer, stream_payload(payload.item, payload.ready_count));
605                 });
606             }
607             Command::StreamReadDropped(payload) => {
608                 State::with(|s| {
609                     let reader = s.stream_readers.get_mut(&payload.stream).unwrap();
610                     let (status, buffer) = assert_ready(pin!(
611                         reader.read(Vec::with_capacity(payload.count as usize))
612                     ));
613                     assert_eq!(status, StreamResult::Dropped);
614                     assert!(buffer.is_empty());
615                 });
616             }
617             Command::StreamWritePending(payload) => {
618                 debug!("write pending: {}", payload.stream);
619                 let mut writer = State::with(|s| s.stream_writers.remove(&payload.stream).unwrap());
620                 let (tx, rx) = oneshot::channel();
621                 State::with(|s| {
622                     let prev = s.stream_write_cancel_signals.insert(payload.stream, tx);
623                     assert!(prev.is_none());
624                 });
625                 let mut future = Box::pin(async move {
626                     debug!("write pending start: {}", payload.stream);
627                     let (result, remaining) = CancellableStreamWrite {
628                         cancel: rx,
629                         write: writer.write(stream_payload(payload.item, payload.count)),
630                     }
631                     .await;
632                     debug!("write pending done: {} {result:?}", payload.stream);
633                     State::with(|s| {
634                         let _ = s.stream_write_cancel_signals.remove(&payload.stream);
635                         match result {
636                             StreamResult::Complete(n) => {
637                                 s.stream_writes_completed
638                                     .insert(payload.stream, Ok((n, remaining)));
639                             }
640                             StreamResult::Dropped => {
641                                 s.stream_writes_completed
642                                     .insert(payload.stream, Err((0, remaining)));
643                             }
644                             StreamResult::Cancelled => {}
645                         }
646                         let prev = s.stream_writers.insert(payload.stream, writer);
647                         assert!(prev.is_none());
648                     });
649                 });
650                 assert_not_ready(future.as_mut());
651                 wit_bindgen::spawn(future);
652 
653                 pin_project! {
654                     struct CancellableStreamWrite<'a> {
655                         #[pin]
656                         cancel: oneshot::Receiver<()>,
657                         #[pin]
658                         write: wit_bindgen::StreamWrite<'a, u32>,
659                     }
660                 }
661 
662                 impl Future for CancellableStreamWrite<'_> {
663                     type Output = (StreamResult, Vec<u32>);
664 
665                     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
666                         let this = self.project();
667                         let (result, buffer) = match this.cancel.poll(cx) {
668                             Poll::Ready(_) => this.write.cancel(),
669                             Poll::Pending => match this.write.poll(cx) {
670                                 Poll::Ready(result) => result,
671                                 Poll::Pending => return Poll::Pending,
672                             },
673                         };
674                         Poll::Ready((result, buffer.into_vec()))
675                     }
676                 }
677             }
678             Command::StreamReadPending(payload) => {
679                 let mut reader = State::with(|s| s.stream_readers.remove(&payload.stream).unwrap());
680                 let (tx, rx) = oneshot::channel();
681                 State::with(|s| {
682                     let prev = s.stream_read_cancel_signals.insert(payload.stream, tx);
683                     assert!(prev.is_none());
684                 });
685                 let mut future = Box::pin(async move {
686                     let (result, buf) = CancellableStreamRead {
687                         cancel: rx,
688                         read: reader.read(Vec::with_capacity(payload.count as usize)),
689                     }
690                     .await;
691                     State::with(|s| {
692                         let _ = s.stream_read_cancel_signals.remove(&payload.stream);
693                         match result {
694                             StreamResult::Complete(_) => {
695                                 s.stream_reads_completed.insert(payload.stream, Some(buf));
696                             }
697                             StreamResult::Dropped => {
698                                 assert!(buf.is_empty(), "dropped but got {}", buf.len());
699                                 s.stream_reads_completed.insert(payload.stream, None);
700                             }
701                             StreamResult::Cancelled => {}
702                         }
703                         let prev = s.stream_readers.insert(payload.stream, reader);
704                         assert!(prev.is_none());
705                     });
706                 });
707                 assert_not_ready(future.as_mut());
708                 wit_bindgen::spawn(future);
709 
710                 pin_project! {
711                     struct CancellableStreamRead<'a> {
712                         #[pin]
713                         cancel: oneshot::Receiver<()>,
714                         #[pin]
715                         read: wit_bindgen::StreamRead<'a, u32>,
716                     }
717                 }
718 
719                 impl Future for CancellableStreamRead<'_> {
720                     type Output = (StreamResult, Vec<u32>);
721 
722                     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
723                         let this = self.project();
724                         let (result, buffer) = match this.cancel.poll(cx) {
725                             Poll::Ready(_) => this.read.cancel(),
726                             Poll::Pending => match this.read.poll(cx) {
727                                 Poll::Ready(result) => result,
728                                 Poll::Pending => return Poll::Pending,
729                             },
730                         };
731                         Poll::Ready((result, buffer))
732                     }
733                 }
734             }
735             Command::StreamCancelWrite(id) => {
736                 State::with(|s| {
737                     s.stream_write_cancel_signals
738                         .remove(&id)
739                         .unwrap()
740                         .send(())
741                         .unwrap();
742                 });
743                 assert!(
744                     State::test_property(|s| s.stream_writers.contains_key(&id)).await,
745                     "expected cancel write {id} to be cancelled",
746                 );
747             }
748             Command::StreamCancelRead(id) => {
749                 State::with(|s| {
750                     s.stream_read_cancel_signals
751                         .remove(&id)
752                         .unwrap()
753                         .send(())
754                         .unwrap();
755                 });
756                 assert!(
757                     State::test_property(|s| s.stream_readers.contains_key(&id)).await,
758                     "expected future read {id} to be cancelled",
759                 );
760             }
761             Command::StreamWriteAssertComplete(payload) => {
762                 assert!(
763                     State::test_property(|s| {
764                         match s.stream_writes_completed.remove(&payload.stream) {
765                             Some(Ok((size, _buf))) => {
766                                 assert_eq!(size, payload.count as usize);
767                                 true
768                             }
769                             Some(Err(_)) => panic!("stream was dropped"),
770                             None => false,
771                         }
772                     })
773                     .await,
774                     "expected stream write {} to be complete",
775                     payload.stream,
776                 );
777             }
778             Command::StreamWriteAssertDropped(payload) => {
779                 assert!(
780                     State::test_property(|s| {
781                         match s.stream_writes_completed.remove(&payload.stream) {
782                             Some(Err((size, _buf))) => {
783                                 assert_eq!(size, payload.count as usize);
784                                 true
785                             }
786                             Some(Ok(_)) => panic!("stream was not dropped"),
787                             None => false,
788                         }
789                     })
790                     .await,
791                     "expected stream write {} to be complete",
792                     payload.stream,
793                 );
794             }
795             Command::StreamReadAssertComplete(payload) => {
796                 assert!(
797                     State::test_property(|s| {
798                         match s.stream_reads_completed.remove(&payload.stream) {
799                             Some(Some(i)) => {
800                                 assert_eq!(i, stream_payload(payload.item, payload.count));
801                                 true
802                             }
803                             Some(None) => panic!("stream was dropped"),
804                             None => false,
805                         }
806                     })
807                     .await,
808                     "expected stream read {} to be complete",
809                     payload.stream,
810                 );
811             }
812             Command::StreamReadAssertDropped(id) => {
813                 assert!(
814                     State::test_property(|s| {
815                         match s.stream_reads_completed.remove(&id) {
816                             Some(None) => true,
817                             Some(Some(_)) => panic!("stream was not dropped"),
818                             None => false,
819                         }
820                     })
821                     .await,
822                     "expected stream read {id} to be complete",
823                 );
824             }
825 
826             Command::Ack => {}
827         }
828     }
829 }
830 
831 fn stream_payload(init: u32, count: u32) -> Vec<u32> {
832     (init..init + count).collect()
833 }
834 
835 fn assert_ready<F: Future>(f: Pin<&mut F>) -> F::Output {
836     let mut cx = Context::from_waker(Waker::noop());
837     match f.poll(&mut cx) {
838         Poll::Ready(i) => i,
839         Poll::Pending => panic!("future was pending"),
840     }
841 }
842 
843 fn assert_not_ready<F: Future>(f: Pin<&mut F>) {
844     let mut cx = Context::from_waker(Waker::noop());
845     match f.poll(&mut cx) {
846         Poll::Ready(_) => panic!("future is ready"),
847         Poll::Pending => {}
848     }
849 }
850 
851 fn main() {
852     unreachable!();
853 }
854