1 //! Test case used with the `component_async` fuzzer which is part of the `misc`
2 //! fuzz target of Wasmtime.
3 //!
4 //! This test case is a binary that's suited for just that one fuzzer and has an
5 //! associated WIT world that it works with. This test case is composed with
6 //! itself and then run within the host as well. The exact semantics of this
7 //! program and all the exports/imports are defined within the context of the
8 //! fuzzer.
9 //!
//! The general idea is that this program creates an "async soup" and makes sure
//! that everything works as expected, notably also not leading to any panics
12 //! anywhere within the runtime. An example of what this fuzzer intermingles
13 //! are:
14 //!
15 //! * Synchronous calls
16 //! * Async calls that are immediately ready
17 //! * Async calls that are not immediately ready and left pending
18 //! * Creation of futures/streams
19 //! * Moving futures/streams between components
20 //! * Reading/writing futures/streams
21 //! * Cancelling reads/writes of futures/streams
22 //! * Seeing futures/streams get dropped and the effect on active reads/writes
23 //! * Mixing host<->guest, guest<->guest, guest<->host, and host<->host
24 //! calls/primitives
25 //!
26 //! The purpose of this fuzzer is to stress the management of async stacks, the
27 //! async runtime, and in theory suss out various edge cases in the handling of
28 //! async events. This fuzzer does NOT stress lifting/lowering at all because
29 //! there is a static WIT signature that this fuzzer works with.
30 //!
31 //! Much of the code in this file is semi-duplicated in the host except written
32 //! with host `wasmtime` APIs instead of `wit-bindgen` APIs. The overall
33 //! structure is roughly the same.
34 //!
35 //! # Overall architecture
36 //!
37 //! The general structure of this fuzzer is that there's a "host sandwich" which
38 //! looks like:
39 //!
40 //! ```text
41 //! ╔══════╦══════════════════════════════════════════════════════════╗
42 //! ║ Host ║ ║
43 //! ╠══════╝ ║
44 //! ║ ║
45 //! ║ ┍┯┯┯━━━ wasmtime:fuzz/types ║
46 //! ║ ││││ ║
47 //! ║ ││││ ║
48 //! ║ ││││ ╔════════════════════╦════════════════════╗ ║
49 //! ║ ││││ ║ component_async.rs ║ ║ ║
50 //! ║ ││││ ╠════════════════════╝ ║ ║
51 //! ║ ││││ ║ ║ ║
52 //! ║ ││││ ║ HostCaller ║ ║
53 //! ║ ││││ ║ ║ ║
54 //! ║ │││└────────╫─→ stream<command> ║ ║
55 //! ║ │││ ╚═══════════════════╤═════════════════════╝ ║
56 //! ║ │││ │ ║
57 //! ║ │││ ┝ wasmtime-fuzz:fuzz/async-test ║
58 //! ║ │││ │ ║
59 //! ║ │││ ╔═══════════╦════════════╪═════════════════════════════╗ ║
60 //! ║ │││ ║ Component ║ │ ║ ║
61 //! ║ │││ ╠═══════════╝ │ ║ ║
62 //! ║ │││ ║ ↓ ║ ║
63 //! ║ │││ ║ ╔═════════════════╦═══════════════════════╗ ║ ║
64 //! ║ │││ ║ ║ fuzz-async.wasm ║ ║ ║ ║
65 //! ║ │││ ║ ╠═════════════════╝ ║ ║ ║
66 //! ║ │││ ║ ║ ║ ║ ║
67 //! ║ │││ ║ ║ GuestCaller ║ ║ ║
68 //! ║ │││ ║ ║ ║ ║ ║
69 //! ║ ││└────╫────╫─→ stream<command> ║ ║ ║
70 //! ║ ││ ║ ╚═══════╤═════════════════════════════════╝ ║ ║
71 //! ║ ││ ║ │ ║ ║
72 //! ║ ││ ║ ┝ wasmtime-fuzz:fuzz/async-test ║ ║
73 //! ║ ││ ║ │ ║ ║
74 //! ║ ││ ║ ↓ ║ ║
75 //! ║ ││ ║ ╔═════════════════╦═══════════════════════╗ ║ ║
76 //! ║ ││ ║ ║ fuzz-async.wasm ║ ║ ║ ║
77 //! ║ ││ ║ ╠═════════════════╝ ║ ║ ║
78 //! ║ ││ ║ ║ ║ ║ ║
79 //! ║ ││ ║ ║ GuestCallee ║ ║ ║
80 //! ║ ││ ║ ║ ║ ║ ║
81 //! ║ │└─────╫────╫─→ stream<command> ║ ║ ║
82 //! ║ │ ║ ╚═══════════════════╤═════════════════════╝ ║ ║
83 //! ║ │ ║ │ ║ ║
84 //! ║ │ ║ │ ║ ║
85 //! ║ │ ╚════════════════════════╪═════════════════════════════╝ ║
86 //! ║ │ │ ║
87 //! ║ │ ┝ wasmtime-fuzz:fuzz/async-test ║
88 //! ║ │ │ ║
89 //! ║ │ ↓ ║
90 //! ║ │ ╔════════════════════╦════════════════════╗ ║
91 //! ║ │ ║ component_async.rs ║ ║ ║
92 //! ║ │ ╠════════════════════╝ ║ ║
93 //! ║ │ ║ ║ ║
94 //! ║ │ ║ HostCallee ║ ║
95 //! ║ │ ║ ║ ║
96 //! ║ └───────────╫─→ stream<command> ║ ║
97 //! ║ ╚═════════════════════════════════════════╝ ║
98 //! ║ ║
99 //! ╚═════════════════════════════════════════════════════════════════╝
100 //! ```
101 //!
102 //! Here `fuzz-async.wasm` appears twice to model all the various types of
103 //! the host/guest interaction matrix. Everything is driven by a
104 //! `stream<command>` provided to each component part of the system which
105 //! serves as a means of forcing one particular component to take action.
106 //! Commands are then the test case itself where a series of commands are
107 //! executed for each fuzz iteration.
108 //!
109 //! # Yield-loops
110 //!
//! This program has a function `test_property` which is an analog of the one
//! on the host side. The general idea is that while component model
113 //! async is generally deterministic it does not specify what should happen when
114 //! multiple events are ready at the same time. This can pretty easily happen in
115 //! this fuzzer meaning that it's not precise which event happens first. To
116 //! assist in managing this there are two primary mitigations:
117 //!
//! * The first is that whenever a command is dispatched to a component it's
//!   followed up with an "ack" which is a noop. Delivery of the "ack" can't
//!   happen until the previous command is completely finished being processed
//!   meaning it's a kludgy way of synchronizing the receipt of a message.
122 //!
//! * The second is that there can still be small races where an async event
//!   hasn't quite happened yet but it's queued up to happen. To handle these
//!   events, calls to `test_property`, which has an internally-bounded
//!   yield-loop, are sprinkled around. It's expected that while yielding other
//!   code can run which resolves the property being tested at-hand, and then
//!   this yield loop will panic if it turns too many times as it's probably a
//!   bug.
130 //!
131 //! It's a bit of a hack but it's so far the most effective way of handling this
132 //! that's (a) not timing-dependent e.g. adding sleeps, (b) is
133 //! reliable/deterministic, and (c) is flexible where the constant number of
//! yields can be bumped without much concern. The number of yields specifically
//! is arbitrarily chosen and while it can't be said exactly how many yields
//! should be necessary it should be possible to say "less than N should always
//! work".
138
139 wit_bindgen::generate!("fuzz-async" in "../fuzzing/wit");
140
141 use crate::exports::wasmtime_fuzz::fuzz::async_test as e;
142 use crate::wasmtime_fuzz::fuzz::async_test as i;
143 use crate::wasmtime_fuzz::fuzz::types::{self, Command, Scope};
144 use futures::FutureExt;
145 use futures::channel::oneshot;
146 use pin_project_lite::pin_project;
147 use std::collections::{HashMap, HashSet};
148 use std::mem;
149 use std::pin::{Pin, pin};
150 use std::sync::Mutex;
151 use std::sync::atomic::{AtomicBool, Ordering};
152 use std::task::{Context, Poll, Waker};
153 use wit_bindgen::{FutureReader, FutureWriter, StreamReader, StreamResult, StreamWriter};
154
155 struct Component;
156
157 export!(Component);
158
// Thin wrapper around `log::debug!` which fills in the log "target" from
// `log_target()`, so messages emitted by the `caller` and `callee` scopes of
// this component can be told apart in the output.
macro_rules! debug {
    ($($tokens:tt)*) => {
        log::debug!(target: log_target(), $($tokens)*);
    }
}
167
/// Flag selecting which log target `log_target` reports; starts out `false`
/// (the callee target) and is overwritten by the `init` export.
static IS_CALLER: AtomicBool = AtomicBool::new(false);

/// Returns the `log` target string for this component instance, chosen by the
/// current value of `IS_CALLER`.
fn log_target() -> &'static str {
    match IS_CALLER.load(Ordering::Relaxed) {
        true => "wasmtime_fuzzing::fuzz_async::caller",
        false => "wasmtime_fuzzing::fuzz_async::callee",
    }
}
177
178 impl e::Guest for Component {
sync_ready()179 fn sync_ready() {}
180
async_ready()181 async fn async_ready() {}
182
async_pending(id: u32)183 async fn async_pending(id: u32) {
184 let (tx, rx) = oneshot::channel();
185 State::with(|s| s.async_pending_exports_ready.insert(id, tx));
186 let record = RecordCancelOnDrop(id);
187 rx.await.unwrap();
188 mem::forget(record);
189 debug!("export {id} is complete");
190
191 struct RecordCancelOnDrop(u32);
192
193 impl Drop for RecordCancelOnDrop {
194 fn drop(&mut self) {
195 debug!("export {} was cancelled", self.0);
196 State::with(|s| {
197 s.async_pending_exports_cancelled.insert(self.0);
198 });
199 }
200 }
201 }
202
init(scope: Scope)203 async fn init(scope: Scope) {
204 IS_CALLER.store(scope == Scope::Caller, Ordering::Relaxed);
205 env_logger::init();
206 i::init(Scope::Callee).await;
207 let commands = types::get_commands(scope);
208 wit_bindgen::spawn(run(commands));
209 }
210
future_take(id: u32) -> FutureReader<u32>211 fn future_take(id: u32) -> FutureReader<u32> {
212 State::with(|s| s.future_readers.remove(&id).unwrap())
213 }
214
future_receive(id: u32, f: FutureReader<u32>)215 fn future_receive(id: u32, f: FutureReader<u32>) {
216 let prev = State::with(|s| s.future_readers.insert(id, f));
217 assert!(prev.is_none());
218 }
219
stream_take(id: u32) -> StreamReader<u32>220 fn stream_take(id: u32) -> StreamReader<u32> {
221 State::with(|s| s.stream_readers.remove(&id).unwrap())
222 }
223
stream_receive(id: u32, f: StreamReader<u32>)224 fn stream_receive(id: u32, f: StreamReader<u32>) {
225 let prev = State::with(|s| s.stream_readers.insert(id, f));
226 assert!(prev.is_none());
227 }
228 }
229
/// Bookkeeping shared between the exported functions, the command loop in
/// `run`, and the tasks it spawns. All maps and sets are keyed by the `u32`
/// ids carried in commands. Access goes through `State::with`.
#[derive(Default)]
struct State {
    /// Ids of `async_pending` import calls whose future ran to completion;
    /// consumed by `AsyncPendingImportAssertReady`.
    async_pending_imports_ready: HashSet<u32>,
    /// Cancellation senders for `async_pending` import calls that were
    /// started; `AsyncPendingImportCancel` removes and fires them.
    async_pending_imports_in_progress: HashMap<u32, oneshot::Sender<()>>,
    /// Senders which, when fired, let the `async_pending` export with that id
    /// finish.
    async_pending_exports_ready: HashMap<u32, oneshot::Sender<()>>,
    /// Ids of `async_pending` exports whose future was dropped before it
    /// completed.
    async_pending_exports_cancelled: HashSet<u32>,

    /// Readable ends of futures currently held by this component.
    future_readers: HashMap<u32, FutureReader<u32>>,
    /// Writable ends of futures currently held by this component.
    future_writers: HashMap<u32, FutureWriter<u32>>,
    /// Senders used to cancel a pending future write.
    future_write_cancel_signals: HashMap<u32, oneshot::Sender<()>>,
    /// Senders used to cancel a pending future read.
    future_read_cancel_signals: HashMap<u32, oneshot::Sender<()>>,
    /// Outcome of finished pending writes: `true` if the value was sent,
    /// `false` if the write reported the other end as dropped.
    future_writes_completed: HashMap<u32, bool>,
    /// Values produced by finished pending reads.
    future_reads_completed: HashMap<u32, u32>,

    /// Readable ends of streams currently held by this component.
    stream_readers: HashMap<u32, StreamReader<u32>>,
    /// Writable ends of streams currently held by this component.
    stream_writers: HashMap<u32, StreamWriter<u32>>,
    /// Senders used to cancel a pending stream write.
    stream_write_cancel_signals: HashMap<u32, oneshot::Sender<()>>,
    /// Senders used to cancel a pending stream read.
    stream_read_cancel_signals: HashMap<u32, oneshot::Sender<()>>,
    /// Outcome of finished pending writes: `Ok((n, remaining))` for a write
    /// which completed with `n` items, `Err((0, remaining))` for a write which
    /// reported the stream as dropped. `remaining` is the unwritten buffer.
    stream_writes_completed: HashMap<u32, Result<(usize, Vec<u32>), (usize, Vec<u32>)>>,
    /// Outcome of finished pending reads: `Some(items)` for a completed read,
    /// `None` for a read which reported the stream as dropped.
    stream_reads_completed: HashMap<u32, Option<Vec<u32>>>,
}
251
252 impl State {
with<R>(f: impl FnOnce(&mut State) -> R) -> R253 pub fn with<R>(f: impl FnOnce(&mut State) -> R) -> R {
254 static STATE: Mutex<Option<State>> = Mutex::new(None);
255 let mut state = STATE.lock().unwrap();
256 let state = state.get_or_insert_with(|| State::default());
257 f(state)
258 }
259
test_property(mut f: impl FnMut(&mut State) -> bool) -> bool260 pub async fn test_property(mut f: impl FnMut(&mut State) -> bool) -> bool {
261 // Test if the property is ready, but it might require a sibling future
262 // task to run, so if it's not true yet then pump the executor a
263 // few times to let it finish.
264 for _ in 0..1000 {
265 if State::with(&mut f) {
266 return true;
267 }
268 wit_bindgen::yield_async().await;
269 }
270 return false;
271 }
272 }
273
run(mut commands: StreamReader<Command>)274 async fn run(mut commands: StreamReader<Command>) {
275 while let Some(command) = commands.next().await {
276 match command {
277 Command::SyncReadyCall => i::sync_ready(),
278
279 Command::AsyncReadyCall => assert_ready(pin!(i::async_ready())),
280
281 Command::AsyncPendingExportComplete(i) => {
282 assert!(
283 State::test_property(|s| s.async_pending_exports_ready.contains_key(&i)).await,
284 "expected async_pending export {i} should be pending",
285 );
286 debug!("finishing export {i}");
287 State::with(|s| {
288 s.async_pending_exports_ready
289 .remove(&i)
290 .unwrap()
291 .send(())
292 .unwrap();
293 });
294 }
295 Command::AsyncPendingExportAssertCancelled(i) => {
296 assert!(
297 State::test_property(|s| s.async_pending_exports_cancelled.remove(&i)).await,
298 "expected async_pending export {i} to be cancelled",
299 );
300 }
301 Command::AsyncPendingImportCall(i) => {
302 let mut future = Box::pin(i::async_pending(i));
303 debug!("starting export {i}");
304 assert_not_ready(future.as_mut());
305 let (cancel_tx, mut cancel_rx) = oneshot::channel();
306 State::with(|s| {
307 s.async_pending_imports_in_progress.insert(i, cancel_tx);
308 });
309 wit_bindgen::spawn(async move {
310 futures::select! {
311 _ = cancel_rx => {}
312 _ = future.fuse() => {
313 State::with(|s| s.async_pending_imports_ready.insert(i));
314 }
315 }
316 });
317 }
318 Command::AsyncPendingImportCancel(i) => {
319 debug!("cancelling import {i}");
320 State::with(|s| {
321 s.async_pending_imports_in_progress
322 .remove(&i)
323 .unwrap()
324 .send(())
325 .unwrap();
326 });
327 }
328 Command::AsyncPendingImportAssertReady(i) => {
329 assert!(
330 State::test_property(|s| s.async_pending_imports_ready.remove(&i)).await,
331 "expected async_pending import {i} to be ready",
332 );
333 }
334
335 Command::FutureNew(id) => {
336 let (writer, reader) = wit_future::new(|| unreachable!());
337 State::with(|s| {
338 let prev = s.future_writers.insert(id, writer);
339 assert!(prev.is_none());
340 let prev = s.future_readers.insert(id, reader);
341 assert!(prev.is_none());
342 });
343 }
344 Command::FutureTake(id) => {
345 let reader = i::future_take(id);
346 State::with(|s| {
347 let prev = s.future_readers.insert(id, reader);
348 assert!(prev.is_none());
349 });
350 }
351 Command::FutureGive(id) => {
352 let reader = State::with(|s| s.future_readers.remove(&id).unwrap());
353 i::future_receive(id, reader);
354 }
355 Command::FutureDropReadable(id) => {
356 let _ = State::with(|s| s.future_readers.remove(&id).unwrap());
357 }
358 Command::FutureWriteReady(payload) => {
359 let writer = State::with(|s| s.future_writers.remove(&payload.future).unwrap());
360 assert_ready(pin!(writer.write(payload.item))).unwrap();
361 }
362 Command::FutureReadReady(payload) => {
363 let reader = State::with(|s| s.future_readers.remove(&payload.future).unwrap());
364 assert_eq!(assert_ready(pin!(reader.into_future())), payload.item);
365 }
366 Command::FutureWriteDropped(id) => {
367 let writer = State::with(|s| s.future_writers.remove(&id).unwrap());
368 match assert_ready(pin!(writer.write(0))) {
369 Ok(_) => panic!("should be dropped"),
370 Err(_) => {}
371 }
372 }
373 Command::FutureWritePending(payload) => {
374 use wit_bindgen::FutureWriteCancel;
375
376 let writer = State::with(|s| s.future_writers.remove(&payload.future).unwrap());
377 let (tx, rx) = oneshot::channel();
378 let mut future = Box::pin(CancellableFutureWrite {
379 cancel: rx,
380 write: writer.write(payload.item),
381 });
382 assert_not_ready(future.as_mut());
383 wit_bindgen::spawn(async move {
384 let result = future.await;
385 debug!("future write {} completed: {result:?}", payload.future);
386 State::with(|s| match result {
387 FutureWriteCancel::AlreadySent => {
388 s.future_writes_completed.insert(payload.future, true);
389 }
390 FutureWriteCancel::Dropped(_) => {
391 s.future_writes_completed.insert(payload.future, false);
392 }
393 FutureWriteCancel::Cancelled(_, writer) => {
394 let prev = s.future_writers.insert(payload.future, writer);
395 assert!(prev.is_none());
396 }
397 });
398 });
399 State::with(|s| {
400 let prev = s.future_write_cancel_signals.insert(payload.future, tx);
401 assert!(prev.is_none());
402 });
403
404 pin_project! {
405 struct CancellableFutureWrite {
406 #[pin]
407 cancel: oneshot::Receiver<()>,
408 #[pin]
409 write: wit_bindgen::FutureWrite<u32>,
410 }
411 }
412
413 impl Future for CancellableFutureWrite {
414 type Output = FutureWriteCancel<u32>;
415
416 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
417 let this = self.project();
418 match this.cancel.poll(cx) {
419 Poll::Ready(_) => return Poll::Ready(this.write.cancel()),
420 Poll::Pending => {}
421 }
422 match this.write.poll(cx) {
423 Poll::Ready(Ok(())) => Poll::Ready(FutureWriteCancel::AlreadySent),
424 Poll::Ready(Err(val)) => {
425 Poll::Ready(FutureWriteCancel::Dropped(val.value))
426 }
427 Poll::Pending => Poll::Pending,
428 }
429 }
430 }
431 }
432 Command::FutureReadPending(id) => {
433 let reader = State::with(|s| s.future_readers.remove(&id).unwrap());
434 let (tx, rx) = oneshot::channel();
435 let mut future = Box::pin(CancellableFutureRead {
436 cancel: rx,
437 read: reader.into_future(),
438 });
439 assert_not_ready(future.as_mut());
440 wit_bindgen::spawn(async move {
441 let result = future.await;
442 State::with(|s| match result {
443 Ok(result) => {
444 let prev = s.future_reads_completed.insert(id, result);
445 assert!(prev.is_none());
446 }
447 Err(reader) => {
448 let prev = s.future_readers.insert(id, reader);
449 assert!(prev.is_none());
450 }
451 });
452 });
453 State::with(|s| {
454 let prev = s.future_read_cancel_signals.insert(id, tx);
455 assert!(prev.is_none());
456 });
457
458 pin_project! {
459 struct CancellableFutureRead {
460 #[pin]
461 cancel: oneshot::Receiver<()>,
462 #[pin]
463 read: wit_bindgen::FutureRead<u32>,
464 }
465 }
466
467 impl Future for CancellableFutureRead {
468 type Output = Result<u32, FutureReader<u32>>;
469
470 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
471 let this = self.project();
472 match this.cancel.poll(cx) {
473 Poll::Ready(_) => return Poll::Ready(this.read.cancel()),
474 Poll::Pending => {}
475 }
476 match this.read.poll(cx) {
477 Poll::Ready(i) => Poll::Ready(Ok(i)),
478 Poll::Pending => Poll::Pending,
479 }
480 }
481 }
482 }
483 Command::FutureCancelWrite(id) => {
484 State::with(|s| {
485 s.future_write_cancel_signals
486 .remove(&id)
487 .unwrap()
488 .send(())
489 .unwrap();
490 });
491 assert!(
492 State::test_property(|s| s.future_writers.contains_key(&id)).await,
493 "expected future write {id} to be cancelled",
494 );
495 }
496 Command::FutureCancelRead(id) => {
497 State::with(|s| {
498 s.future_read_cancel_signals
499 .remove(&id)
500 .unwrap()
501 .send(())
502 .unwrap();
503 });
504 assert!(
505 State::test_property(|s| s.future_readers.contains_key(&id)).await,
506 "expected future read {id} to be cancelled",
507 );
508 }
509 Command::FutureWriteAssertComplete(id) => {
510 assert!(
511 State::test_property(|s| match s.future_writes_completed.remove(&id) {
512 Some(true) => true,
513 Some(false) => panic!("future was dropped"),
514 None => false,
515 })
516 .await,
517 "expected future write {id} to be complete",
518 );
519 }
520 Command::FutureWriteAssertDropped(id) => {
521 assert!(
522 State::test_property(|s| match s.future_writes_completed.remove(&id) {
523 Some(true) => panic!("future write completed"),
524 Some(false) => true,
525 None => false,
526 })
527 .await,
528 "expected future write {id} to be complete",
529 );
530 }
531 Command::FutureReadAssertComplete(payload) => {
532 assert!(
533 State::test_property(|s| {
534 match s.future_reads_completed.remove(&payload.future) {
535 Some(i) => {
536 assert_eq!(i, payload.item);
537 true
538 }
539 None => false,
540 }
541 })
542 .await,
543 "expected future read {} to be complete",
544 payload.future,
545 );
546 }
547
548 Command::StreamNew(id) => {
549 let (writer, reader) = wit_stream::new();
550 State::with(|s| {
551 let prev = s.stream_writers.insert(id, writer);
552 assert!(prev.is_none());
553 let prev = s.stream_readers.insert(id, reader);
554 assert!(prev.is_none());
555 });
556 }
557 Command::StreamTake(id) => {
558 let reader = i::stream_take(id);
559 State::with(|s| {
560 let prev = s.stream_readers.insert(id, reader);
561 assert!(prev.is_none());
562 });
563 }
564 Command::StreamGive(id) => {
565 let reader = State::with(|s| s.stream_readers.remove(&id).unwrap());
566 i::stream_receive(id, reader);
567 }
568 Command::StreamDropReadable(id) => {
569 let _ = State::with(|s| s.stream_readers.remove(&id).unwrap());
570 }
571 Command::StreamDropWritable(id) => {
572 let _ = State::with(|s| s.stream_writers.remove(&id).unwrap());
573 }
574 Command::StreamWriteReady(payload) => {
575 State::with(|s| {
576 let writer = s.stream_writers.get_mut(&payload.stream).unwrap();
577 let (status, buffer) = assert_ready(pin!(
578 writer.write(stream_payload(payload.item, payload.op_count))
579 ));
580 assert_eq!(status, StreamResult::Complete(payload.ready_count as usize));
581 assert_eq!(
582 buffer.remaining() as u32,
583 payload.op_count - payload.ready_count
584 );
585 });
586 }
587 Command::StreamWriteDropped(payload) => {
588 State::with(|s| {
589 let writer = s.stream_writers.get_mut(&payload.stream).unwrap();
590 let (status, buffer) = assert_ready(pin!(
591 writer.write(stream_payload(payload.item, payload.count))
592 ));
593 assert_eq!(status, StreamResult::Dropped);
594 assert_eq!(buffer.remaining() as u32, payload.count);
595 });
596 }
597 Command::StreamReadReady(payload) => {
598 State::with(|s| {
599 let reader = s.stream_readers.get_mut(&payload.stream).unwrap();
600 let (status, buffer) = assert_ready(pin!(
601 reader.read(Vec::with_capacity(payload.op_count as usize))
602 ));
603 assert_eq!(status, StreamResult::Complete(payload.ready_count as usize));
604 assert_eq!(buffer, stream_payload(payload.item, payload.ready_count));
605 });
606 }
607 Command::StreamReadDropped(payload) => {
608 State::with(|s| {
609 let reader = s.stream_readers.get_mut(&payload.stream).unwrap();
610 let (status, buffer) = assert_ready(pin!(
611 reader.read(Vec::with_capacity(payload.count as usize))
612 ));
613 assert_eq!(status, StreamResult::Dropped);
614 assert!(buffer.is_empty());
615 });
616 }
617 Command::StreamWritePending(payload) => {
618 debug!("write pending: {}", payload.stream);
619 let mut writer = State::with(|s| s.stream_writers.remove(&payload.stream).unwrap());
620 let (tx, rx) = oneshot::channel();
621 State::with(|s| {
622 let prev = s.stream_write_cancel_signals.insert(payload.stream, tx);
623 assert!(prev.is_none());
624 });
625 let mut future = Box::pin(async move {
626 debug!("write pending start: {}", payload.stream);
627 let (result, remaining) = CancellableStreamWrite {
628 cancel: rx,
629 write: writer.write(stream_payload(payload.item, payload.count)),
630 }
631 .await;
632 debug!("write pending done: {} {result:?}", payload.stream);
633 State::with(|s| {
634 let _ = s.stream_write_cancel_signals.remove(&payload.stream);
635 match result {
636 StreamResult::Complete(n) => {
637 s.stream_writes_completed
638 .insert(payload.stream, Ok((n, remaining)));
639 }
640 StreamResult::Dropped => {
641 s.stream_writes_completed
642 .insert(payload.stream, Err((0, remaining)));
643 }
644 StreamResult::Cancelled => {}
645 }
646 let prev = s.stream_writers.insert(payload.stream, writer);
647 assert!(prev.is_none());
648 });
649 });
650 assert_not_ready(future.as_mut());
651 wit_bindgen::spawn(future);
652
653 pin_project! {
654 struct CancellableStreamWrite<'a> {
655 #[pin]
656 cancel: oneshot::Receiver<()>,
657 #[pin]
658 write: wit_bindgen::StreamWrite<'a, u32>,
659 }
660 }
661
662 impl Future for CancellableStreamWrite<'_> {
663 type Output = (StreamResult, Vec<u32>);
664
665 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
666 let this = self.project();
667 let (result, buffer) = match this.cancel.poll(cx) {
668 Poll::Ready(_) => this.write.cancel(),
669 Poll::Pending => match this.write.poll(cx) {
670 Poll::Ready(result) => result,
671 Poll::Pending => return Poll::Pending,
672 },
673 };
674 Poll::Ready((result, buffer.into_vec()))
675 }
676 }
677 }
678 Command::StreamReadPending(payload) => {
679 let mut reader = State::with(|s| s.stream_readers.remove(&payload.stream).unwrap());
680 let (tx, rx) = oneshot::channel();
681 State::with(|s| {
682 let prev = s.stream_read_cancel_signals.insert(payload.stream, tx);
683 assert!(prev.is_none());
684 });
685 let mut future = Box::pin(async move {
686 let (result, buf) = CancellableStreamRead {
687 cancel: rx,
688 read: reader.read(Vec::with_capacity(payload.count as usize)),
689 }
690 .await;
691 State::with(|s| {
692 let _ = s.stream_read_cancel_signals.remove(&payload.stream);
693 match result {
694 StreamResult::Complete(_) => {
695 s.stream_reads_completed.insert(payload.stream, Some(buf));
696 }
697 StreamResult::Dropped => {
698 assert!(buf.is_empty(), "dropped but got {}", buf.len());
699 s.stream_reads_completed.insert(payload.stream, None);
700 }
701 StreamResult::Cancelled => {}
702 }
703 let prev = s.stream_readers.insert(payload.stream, reader);
704 assert!(prev.is_none());
705 });
706 });
707 assert_not_ready(future.as_mut());
708 wit_bindgen::spawn(future);
709
710 pin_project! {
711 struct CancellableStreamRead<'a> {
712 #[pin]
713 cancel: oneshot::Receiver<()>,
714 #[pin]
715 read: wit_bindgen::StreamRead<'a, u32>,
716 }
717 }
718
719 impl Future for CancellableStreamRead<'_> {
720 type Output = (StreamResult, Vec<u32>);
721
722 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
723 let this = self.project();
724 let (result, buffer) = match this.cancel.poll(cx) {
725 Poll::Ready(_) => this.read.cancel(),
726 Poll::Pending => match this.read.poll(cx) {
727 Poll::Ready(result) => result,
728 Poll::Pending => return Poll::Pending,
729 },
730 };
731 Poll::Ready((result, buffer))
732 }
733 }
734 }
735 Command::StreamCancelWrite(id) => {
736 State::with(|s| {
737 s.stream_write_cancel_signals
738 .remove(&id)
739 .unwrap()
740 .send(())
741 .unwrap();
742 });
743 assert!(
744 State::test_property(|s| s.stream_writers.contains_key(&id)).await,
745 "expected cancel write {id} to be cancelled",
746 );
747 }
748 Command::StreamCancelRead(id) => {
749 State::with(|s| {
750 s.stream_read_cancel_signals
751 .remove(&id)
752 .unwrap()
753 .send(())
754 .unwrap();
755 });
756 assert!(
757 State::test_property(|s| s.stream_readers.contains_key(&id)).await,
758 "expected future read {id} to be cancelled",
759 );
760 }
761 Command::StreamWriteAssertComplete(payload) => {
762 assert!(
763 State::test_property(|s| {
764 match s.stream_writes_completed.remove(&payload.stream) {
765 Some(Ok((size, _buf))) => {
766 assert_eq!(size, payload.count as usize);
767 true
768 }
769 Some(Err(_)) => panic!("stream was dropped"),
770 None => false,
771 }
772 })
773 .await,
774 "expected stream write {} to be complete",
775 payload.stream,
776 );
777 }
778 Command::StreamWriteAssertDropped(payload) => {
779 assert!(
780 State::test_property(|s| {
781 match s.stream_writes_completed.remove(&payload.stream) {
782 Some(Err((size, _buf))) => {
783 assert_eq!(size, payload.count as usize);
784 true
785 }
786 Some(Ok(_)) => panic!("stream was not dropped"),
787 None => false,
788 }
789 })
790 .await,
791 "expected stream write {} to be complete",
792 payload.stream,
793 );
794 }
795 Command::StreamReadAssertComplete(payload) => {
796 assert!(
797 State::test_property(|s| {
798 match s.stream_reads_completed.remove(&payload.stream) {
799 Some(Some(i)) => {
800 assert_eq!(i, stream_payload(payload.item, payload.count));
801 true
802 }
803 Some(None) => panic!("stream was dropped"),
804 None => false,
805 }
806 })
807 .await,
808 "expected stream read {} to be complete",
809 payload.stream,
810 );
811 }
812 Command::StreamReadAssertDropped(id) => {
813 assert!(
814 State::test_property(|s| {
815 match s.stream_reads_completed.remove(&id) {
816 Some(None) => true,
817 Some(Some(_)) => panic!("stream was not dropped"),
818 None => false,
819 }
820 })
821 .await,
822 "expected stream read {id} to be complete",
823 );
824 }
825
826 Command::Ack => {}
827 }
828 }
829 }
830
/// Builds the `count` consecutive values starting at `init`, i.e.
/// `init, init + 1, ..., init + count - 1`.
fn stream_payload(init: u32, count: u32) -> Vec<u32> {
    let end = init + count;
    let mut items = Vec::new();
    items.extend(init..end);
    items
}
834
/// Polls `f` exactly once with a no-op waker and returns its output.
///
/// # Panics
///
/// Panics if that single poll returns `Poll::Pending`.
fn assert_ready<F: Future>(f: Pin<&mut F>) -> F::Output {
    let waker = Waker::noop();
    let mut cx = Context::from_waker(waker);
    if let Poll::Ready(output) = f.poll(&mut cx) {
        return output;
    }
    panic!("future was pending")
}
842
/// Polls `f` exactly once with a no-op waker and requires it to be pending.
///
/// # Panics
///
/// Panics if that single poll returns `Poll::Ready`.
fn assert_not_ready<F: Future>(f: Pin<&mut F>) {
    let waker = Waker::noop();
    let mut cx = Context::from_waker(waker);
    if f.poll(&mut cx).is_ready() {
        panic!("future is ready");
    }
}
850
/// Placeholder entry point: the body is `unreachable!()`, so calling it
/// always panics.
fn main() {
    unreachable!()
}
854