1 //! For a high-level overview of this fuzz target see `fuzz_async.rs`
2 
3 #![expect(missing_docs, reason = "macro-generated code")]
4 
5 use arbitrary::{Arbitrary, Unstructured};
6 use indexmap::{IndexMap, IndexSet};
7 
// Expand the component-model bindings for the `fuzz-async` world. The
// `wasmtime_fuzz::fuzz::types` items imported right after this invocation
// (`Command` and the payload structs) appear to come from this expansion.
//
// NOTE(review): `store` on `get-commands` and `async | store` on the default
// exports presumably request store access and async signatures for the
// generated functions -- confirm against the `bindgen!` documentation.
wasmtime::component::bindgen!({
    world: "fuzz-async",
    imports: {
        "wasmtime-fuzz:fuzz/types.get-commands": store,
    },
    exports: { default: async | store },
});
15 
16 use wasmtime_fuzz::fuzz::types::{
17     Command, FuturePayload, StreamReadPayload, StreamReadyPayload, StreamWritePayload,
18 };
19 
/// Soft cap on the number of commands produced by the main generation loop.
///
/// The cap is only checked between calls to `State::generate`, and the
/// optional "finish" phase does not consult it, so the final command list may
/// end up longer than this.
const SOFT_MAX_COMMANDS: usize = 100;
/// Inclusive upper bound used when picking the `count` of a stream operation
/// (for example the number of items requested by a stream read).
const MAX_STREAM_COUNT: u32 = 10;
22 
/// Structure used for the "component async" fuzzer.
///
/// This encapsulates a list of commands for the fuzzer to run. Note that the
/// commands are not 100% arbitrary but instead they're generated similar to
/// wasm instructions where only some sequences of instructions are valid. The
/// rest of this module is dedicated to the generation of these commands.
///
/// Values of this type are built through its `Arbitrary` implementation, which
/// drives a `State` to pick one valid command after another.
#[derive(Debug)]
pub struct ComponentAsync {
    /// A sequence of commands to run, tagged with a scope that they're run
    /// within.
    ///
    /// Entries are stored in the order in which they were generated.
    pub commands: Vec<(Scope, Command)>,
}
35 
/// Identifies the layer of the host/guest call stack in which an async
/// command executes.
///
/// The layers form a chain, listed here from the outermost caller to the
/// innermost callee: `HostCaller`, `GuestCaller`, `GuestCallee`, `HostCallee`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum Scope {
    /// Outermost host layer, the one in control of invoking the guests.
    HostCaller,

    /// First guest layer, i.e. the raw exports at the root of the component.
    ///
    /// Its imported functions are provided by `GuestCallee`.
    GuestCaller,

    /// Second guest layer, which imports the host functions directly.
    ///
    /// `GuestCaller` in turn imports this layer.
    GuestCallee,

    /// Innermost host layer, supplying the functions that `GuestCallee`
    /// imports.
    HostCallee,
}

impl Scope {
    /// Every scope, ordered from the outermost caller to the innermost callee.
    const ALL: &[Scope; 4] = &[
        Scope::HostCaller,
        Scope::GuestCaller,
        Scope::GuestCallee,
        Scope::HostCallee,
    ];
    /// The scopes that have a callee, i.e. all of them except `HostCallee`.
    const CALLERS: &[Scope; 3] = &[Scope::HostCaller, Scope::GuestCaller, Scope::GuestCallee];

    /// Returns the scope one step further in, or `None` for the innermost
    /// scope (`HostCallee`).
    fn callee(&self) -> Option<Scope> {
        let inner = match *self {
            Self::HostCallee => return None,
            Self::HostCaller => Self::GuestCaller,
            Self::GuestCaller => Self::GuestCallee,
            Self::GuestCallee => Self::HostCallee,
        };
        Some(inner)
    }

    /// Returns the scope one step further out, or `None` for the outermost
    /// scope (`HostCaller`).
    fn caller(&self) -> Option<Scope> {
        let outer = match *self {
            Self::HostCaller => return None,
            Self::GuestCaller => Self::HostCaller,
            Self::GuestCallee => Self::GuestCaller,
            Self::HostCallee => Self::GuestCallee,
        };
        Some(outer)
    }

    /// Returns `true` for the two host-side scopes and `false` for the two
    /// guest-side scopes.
    fn is_host(&self) -> bool {
        match *self {
            Self::GuestCaller | Self::GuestCallee => false,
            Self::HostCaller | Self::HostCallee => true,
        }
    }
}
93 
94 impl Arbitrary<'_> for ComponentAsync {
arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self>95     fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self> {
96         let mut state = State::default();
97         let mut ret = Vec::new();
98 
99         // While there's more unstructured data, and our list of commands isn't
100         // too long, generate some new commands per-component.
101         while !u.is_empty() && ret.len() < SOFT_MAX_COMMANDS {
102             state.generate(u, false, &mut ret)?;
103         }
104 
105         // Optionally, if specified, finish up all async operations.
106         if u.arbitrary()? {
107             while !state.is_empty() {
108                 state.generate(u, true, &mut ret)?;
109             }
110         }
111 
112         Ok(ComponentAsync { commands: ret })
113     }
114 }
115 
/// Generator-side bookkeeping used while building a `ComponentAsync`.
///
/// It records which operations are currently outstanding so that `generate`
/// only offers choices that are valid in the current state.
#[derive(Default)]
struct State {
    /// NOTE(review): presumably the counter behind `next_id()`, which is
    /// called to obtain ids for pending calls, futures, streams and written
    /// items -- confirm against that method.
    next_id: u32,

    /// List of scopes that have an active and pending call to the
    /// `async-pending` function.
    ///
    /// Each entry is the calling scope paired with the id of the call.
    async_pending: Vec<(Scope, u32)>,

    /// Deferred work that can happen at any time, for example asserting the
    /// result of some previous operation.
    deferred: Vec<(Scope, Command)>,

    /// State associated with futures and their handles within.
    ///
    /// A future read carries no payload (`()`); a future write remembers the
    /// `u32` item that was written.
    futures: HandleStates<(), u32>,
    /// State associated with streams and their handles within.
    streams: HandleStates<StreamRead, StreamWrite>,
}
132 
/// The reader and writer halves for one kind of handle (futures or streams).
///
/// `R` and `W` are the payload types remembered while a read or a write,
/// respectively, is in progress on a handle.
#[derive(Default)]
struct HandleStates<R, W> {
    /// Bookkeeping for the readable ends.
    readers: HalfStates<R>,
    /// Bookkeeping for the writable ends.
    writers: HalfStates<W>,
}
138 
139 impl<R, W> HandleStates<R, W> {
is_empty(&self) -> bool140     fn is_empty(&self) -> bool {
141         self.readers.is_empty() && self.writers.is_empty()
142     }
143 }
144 
/// State management for "half" of a future/stream read/write pair.
///
/// This tracks all the various states of all handles in the system to be able
/// to select amongst them an arbitrary operation to perform. This structure's
/// sets are primarily manipulated through helper methods to ensure that the set
/// metadata all stays in sync.
///
/// `T` is the payload remembered for an operation while it is in progress
/// (see `in_use`).
#[derive(Default)]
struct HalfStates<T> {
    /// All known handles of this type, where they're located, etc.
    ///
    /// Keyed by handle id; the value is the scope currently holding the
    /// handle, whether an operation is in flight on it, and whether it may be
    /// transferred when idle.
    handles: IndexMap<u32, (Scope, HalfState, Transferable)>,

    /// All handles which can currently be dropped. Handles can't be dropped if
    /// they're in use, for example.
    droppable: IndexSet<u32>,

    /// All handles which can be read/written from (depending on handle type).
    /// Handles where both pairs are in the same component can't be
    /// read/written to for example.
    ready: IndexSet<u32>,

    /// All handles which can be transferred somewhere else.
    ///
    /// Some examples of non-transferable handles are:
    ///
    /// * writers
    /// * handles with an outstanding read
    /// * host-based handles that have been used at least once (FIXME #12090)
    transferable: IndexSet<u32>,

    /// Handles currently being read/written to.
    ///
    /// Also includes state about the operation, such as whether it's been
    /// dropped on the other side.
    in_use: IndexMap<u32, (T, OpState)>,

    /// Handles with a pending operation which can be cancelled.
    ///
    /// An operation whose other end has been dropped is also listed here so
    /// that it can be resolved later.
    cancellable: IndexSet<u32>,
}
183 
/// Whether a handle currently has an operation in flight.
enum HalfState {
    /// No operation is outstanding on the handle.
    Idle,
    /// An operation was begun with `HalfStates::start` and has not yet been
    /// finished with `HalfStates::stop`.
    InUse,
}
188 
/// Whether a handle is allowed to move to another scope.
///
/// Handles marked `Yes` are listed in `HalfStates::transferable` while they
/// are idle; `HalfStates::lock_in_place` downgrades a handle to `No`.
#[derive(Copy, Clone, PartialEq, Debug)]
enum Transferable {
    /// The handle may be transferred whenever it is idle.
    Yes,
    /// The handle stays in its current scope.
    No,
}
194 
/// Whether an operation being started may later be cancelled.
///
/// This is passed to `HalfStates::start`, which lists the handle in the
/// `cancellable` set right away when the answer is `Yes`.
#[derive(Copy, Clone, PartialEq, Debug)]
enum Cancellable {
    /// The operation can be cancelled while it is pending.
    Yes,
    /// The operation cannot be cancelled when it is started.
    No,
}
200 
/// Progress of an in-flight read or write, as recorded in `HalfStates::in_use`.
#[derive(Copy, Clone, PartialEq, Debug)]
enum OpState {
    /// The operation is outstanding; this is the state every operation starts
    /// in.
    Pending,
    /// The other end of the pair was dropped while the operation was in
    /// flight (set through `HalfStates::set_in_use_state_dropped`).
    Dropped,
}
206 
/// Payload remembered for an in-progress stream read.
#[derive(Default, Copy, Clone)]
struct StreamRead {
    // NOTE(review): presumably the number of items the read asked for --
    // confirm against the code that starts stream reads.
    count: u32,
}
211 
/// Payload remembered for an in-progress stream write.
#[derive(Default, Copy, Clone)]
struct StreamWrite {
    // NOTE(review): presumably the (first) item value being written --
    // confirm against the code that starts stream writes.
    item: u32,
    // NOTE(review): presumably how many items the write offers -- confirm
    // against the code that starts stream writes.
    count: u32,
}
217 
218 impl<T> HalfStates<T> {
is_empty(&self) -> bool219     fn is_empty(&self) -> bool {
220         self.handles.is_empty()
221     }
222 
223     /// Adds a new handle `id` to this set.
insert(&mut self, id: u32, scope: Scope, transferable: Transferable)224     fn insert(&mut self, id: u32, scope: Scope, transferable: Transferable) {
225         let prev = self
226             .handles
227             .insert(id, (scope, HalfState::Idle, transferable));
228         assert!(prev.is_none());
229         assert!(self.droppable.insert(id));
230         if transferable == Transferable::Yes {
231             self.transferable.insert(id);
232         }
233     }
234 
235     /// Removes the handle `id` for closing.
remove(&mut self, id: u32) -> Scope236     fn remove(&mut self, id: u32) -> Scope {
237         let (scope, state, transferable) = self.handles.swap_remove(&id).unwrap();
238         assert!(matches!(state, HalfState::Idle));
239         self.droppable.swap_remove(&id);
240         self.ready.swap_remove(&id);
241         if transferable == Transferable::Yes {
242             assert!(self.transferable.swap_remove(&id));
243         }
244         scope
245     }
246 
247     /// Locks `id` in whatever scope it's currently in for the rest of its
248     /// lifetime, preventing its transfer. This is used as a workaround for
249     /// #12090.
lock_in_place(&mut self, id: u32)250     fn lock_in_place(&mut self, id: u32) {
251         let (_scope, state, transferable) = self.handles.get_mut(&id).unwrap();
252         assert!(matches!(state, HalfState::Idle));
253         if matches!(transferable, Transferable::Yes) {
254             assert!(self.transferable.swap_remove(&id));
255             *transferable = Transferable::No;
256         }
257     }
258 
259     /// Starts an operation on the handle `id`.
start(&mut self, id: u32, cancellable: Cancellable, payload: T)260     fn start(&mut self, id: u32, cancellable: Cancellable, payload: T) {
261         let (_scope, state, transferable) = self.handles.get_mut(&id).unwrap();
262         assert!(matches!(state, HalfState::Idle));
263         assert!(self.ready.swap_remove(&id));
264         self.droppable.swap_remove(&id);
265         *state = HalfState::InUse;
266         let prev = self.in_use.insert(id, (payload, OpState::Pending));
267         assert!(prev.is_none());
268         if *transferable == Transferable::Yes {
269             assert!(self.transferable.swap_remove(&id));
270         }
271         if cancellable == Cancellable::Yes {
272             assert!(self.cancellable.insert(id));
273         }
274     }
275 
276     /// Completes an operation on `id`, returning the state it was started with
277     /// along with whether it was dropped.
stop(&mut self, id: u32) -> (T, OpState)278     fn stop(&mut self, id: u32) -> (T, OpState) {
279         let (_scope, state, transferable) = self.handles.get_mut(&id).unwrap();
280         assert!(matches!(state, HalfState::InUse));
281         *state = HalfState::Idle;
282         let dropped = self.in_use.swap_remove(&id).unwrap();
283         self.cancellable.swap_remove(&id);
284         if *transferable == Transferable::Yes {
285             assert!(self.transferable.insert(id));
286         }
287         assert!(self.droppable.insert(id));
288         if dropped.1 != OpState::Dropped {
289             assert!(self.ready.insert(id));
290         } else {
291             self.lock_in_place(id);
292         }
293         dropped
294     }
295 
296     /// Updates to `OpState::Dropped` for an operation-in-progress.
set_in_use_state_dropped(&mut self, id: u32)297     fn set_in_use_state_dropped(&mut self, id: u32) {
298         let (_, prev) = self.in_use.get_mut(&id).unwrap();
299         assert_eq!(*prev, OpState::Pending);
300         *prev = OpState::Dropped;
301 
302         // This operation is now "cancellable" meaning that at any point in the
303         // future it can be resolved since the other end was dropped.
304         self.cancellable.insert(id);
305     }
306 }
307 
308 impl State {
is_empty(&self) -> bool309     fn is_empty(&self) -> bool {
310         let State {
311             next_id: _,
312             async_pending,
313             deferred,
314             futures,
315             streams,
316         } = self;
317         async_pending.is_empty() && deferred.is_empty() && futures.is_empty() && streams.is_empty()
318     }
319 
generate( &mut self, u: &mut Unstructured<'_>, finish: bool, commands: &mut Vec<(Scope, Command)>, ) -> arbitrary::Result<()>320     fn generate(
321         &mut self,
322         u: &mut Unstructured<'_>,
323         finish: bool,
324         commands: &mut Vec<(Scope, Command)>,
325     ) -> arbitrary::Result<()> {
326         let mut choices = Vec::new();
327 
328         // If we're not finishing up then have the possibility of
329         // immediately-ready sync/async calls and such sort of miscellaneous
330         // work.
331         if !finish {
332             choices.push(Choice::SyncReadyCall);
333             choices.push(Choice::AsyncReadyCall);
334             choices.push(Choice::FutureNew);
335             choices.push(Choice::StreamNew);
336         }
337 
338         // If we're not finishing up, and if we don't have too much pending
339         // work, then possibly make some more pending work.
340         if !finish && self.async_pending.len() < 20 {
341             choices.push(Choice::AsyncPendingCall);
342         }
343 
344         // If there's pending work, possibly resolve something.
345         if self.async_pending.len() > 0 {
346             choices.push(Choice::AsyncPendingResolve);
347         }
348 
349         // If something has been deferred to later, possibly add that command
350         // into the stream.
351         if self.deferred.len() > 0 {
352             choices.push(Choice::Deferred);
353         }
354 
355         // Wrap up work with futures by dropping handles, writing, cancelling,
356         // etc.
357         if self.futures.readers.droppable.len() > 0 {
358             choices.push(Choice::FutureDropReadable);
359         }
360         if self.futures.writers.droppable.len() > 0 {
361             choices.push(Choice::FutureDropWritable);
362         }
363         if self.futures.writers.cancellable.len() > 0 {
364             choices.push(Choice::FutureCancelWrite);
365         }
366         if self.futures.readers.cancellable.len() > 0 {
367             choices.push(Choice::FutureCancelRead);
368         }
        // If more work is allowed kick off reads/writes/transfers.
370         if !finish {
371             if self.futures.writers.ready.len() > 0 {
372                 choices.push(Choice::FutureWrite);
373             }
374             if self.futures.readers.ready.len() > 0 {
375                 choices.push(Choice::FutureRead);
376             }
377             if self.futures.readers.transferable.len() > 0 {
378                 choices.push(Choice::FutureReaderTransfer);
379             }
380         }
381 
382         // Streams can be dropped at any time and their pending operations can
383         // be ceased at any time.
384         if self.streams.readers.droppable.len() > 0 {
385             choices.push(Choice::StreamDropReadable);
386         }
387         if self.streams.writers.droppable.len() > 0 {
388             choices.push(Choice::StreamDropWritable);
389         }
390         if self.streams.readers.cancellable.len() > 0 {
391             choices.push(Choice::StreamEndRead);
392         }
393         if self.streams.writers.cancellable.len() > 0 {
394             choices.push(Choice::StreamEndWrite);
395         }
396         // If more work is allowed then streams can be moved around and new
397         // reads/writes may be started.
398         if !finish {
399             if self.streams.readers.transferable.len() > 0 {
400                 choices.push(Choice::StreamReaderTransfer);
401             }
402             if self.streams.readers.ready.len() > 0 {
403                 choices.push(Choice::StreamRead);
404             }
405             if self.streams.writers.ready.len() > 0 {
406                 choices.push(Choice::StreamWrite);
407             }
408         }
409 
410         #[derive(Debug)]
411         enum Choice {
412             SyncReadyCall,
413             AsyncReadyCall,
414             AsyncPendingCall,
415             AsyncPendingResolve,
416             Deferred,
417 
418             FutureNew,
419             FutureReaderTransfer,
420             FutureRead,
421             FutureWrite,
422             FutureCancelRead,
423             FutureCancelWrite,
424             FutureDropReadable,
425             FutureDropWritable,
426 
427             StreamNew,
428             StreamReaderTransfer,
429             StreamDropReadable,
430             StreamDropWritable,
431             StreamRead,
432             StreamWrite,
433             StreamEndRead,
434             StreamEndWrite,
435         }
436 
437         match u.choose(&choices)? {
438             Choice::SyncReadyCall => {
439                 let caller = *u.choose(Scope::CALLERS)?;
440                 commands.push((caller, Command::SyncReadyCall));
441             }
442             Choice::AsyncReadyCall => {
443                 let caller = *u.choose(Scope::CALLERS)?;
444                 commands.push((caller, Command::AsyncReadyCall));
445             }
446 
447             Choice::AsyncPendingCall => {
448                 let caller = *u.choose(Scope::CALLERS)?;
449                 let id = self.next_id();
450                 self.async_pending.push((caller, id));
451                 commands.push((caller, Command::AsyncPendingImportCall(id)));
452             }
453 
454             Choice::AsyncPendingResolve => {
455                 let index = u.int_in_range(0..=self.async_pending.len() - 1)?;
456                 let (caller, id) = self.async_pending.swap_remove(index);
457                 let callee = caller.callee().unwrap();
458 
459                 // FIXME(#11833) the host can't cancel calls at this time, so
460                 // they can only be completed. Everything else though is
461                 // guest-initiated which means that the call can be either
462                 // completed or cancelled.
463                 let complete = caller == Scope::HostCaller || u.arbitrary()?;
464 
465                 if complete {
466                     commands.push((callee, Command::AsyncPendingExportComplete(id)));
467                     self.deferred
468                         .push((caller, Command::AsyncPendingImportAssertReady(id)));
469                 } else {
470                     commands.push((caller, Command::AsyncPendingImportCancel(id)));
471                     self.deferred
472                         .push((callee, Command::AsyncPendingExportAssertCancelled(id)));
473                 }
474             }
475 
476             Choice::Deferred => {
477                 let index = u.int_in_range(0..=self.deferred.len() - 1)?;
478                 let (scope, cmd) = self.deferred.swap_remove(index);
479                 commands.push((scope, cmd));
480             }
481 
482             Choice::FutureNew => {
483                 let scope = *u.choose(Scope::ALL)?;
484                 let id = self.next_id();
485                 commands.push((scope, Command::FutureNew(id)));
486                 self.futures.readers.insert(id, scope, Transferable::Yes);
487                 self.futures.writers.insert(id, scope, Transferable::No);
488 
489                 // Future writers cannot be dropped without writing.
490                 assert!(self.futures.writers.droppable.swap_remove(&id));
491             }
492             Choice::FutureReaderTransfer => {
493                 let set = &mut self.futures.readers.transferable;
494                 let i = u.int_in_range(0..=set.len() - 1)?;
495                 let id = *set.get_index(i).unwrap();
496                 let scope = &mut self.futures.readers.handles[&id].0;
497 
498                 enum Action {
499                     CallerTake(Scope),
500                     GiveCallee(Scope),
501                 }
502 
503                 let action = match (scope.caller(), scope.callee()) {
504                     (Some(caller), None) => Action::CallerTake(caller),
505                     (None, Some(callee)) => Action::GiveCallee(callee),
506                     (Some(caller), Some(callee)) => {
507                         if u.arbitrary()? {
508                             Action::CallerTake(caller)
509                         } else {
510                             Action::GiveCallee(callee)
511                         }
512                     }
513                     (None, None) => unreachable!(),
514                 };
515                 match action {
516                     Action::CallerTake(caller) => {
517                         commands.push((caller, Command::FutureTake(id)));
518                         *scope = caller;
519                     }
520                     Action::GiveCallee(callee) => {
521                         commands.push((*scope, Command::FutureGive(id)));
522                         *scope = callee;
523                     }
524                 }
525 
526                 // See what scope the reader/writer half are in. Allow
527                 // operations if they're in different scopes, but disallow
528                 // operations if they're in the same scope.
529                 let reader_scope = Some(*scope);
530                 let writer_scope = self.futures.writers.handles.get(&id).map(|p| p.0);
531                 if reader_scope == writer_scope {
532                     self.futures.readers.ready.swap_remove(&id);
533                     self.futures.writers.ready.swap_remove(&id);
534                 } else {
535                     self.futures.readers.ready.insert(id);
536                     if writer_scope.is_some() && !self.futures.writers.in_use.contains_key(&id) {
537                         self.futures.writers.ready.insert(id);
538                     }
539                 }
540             }
541             Choice::FutureRead => {
542                 let set = &self.futures.readers.ready;
543                 let i = u.int_in_range(0..=set.len() - 1)?;
544                 let id = *set.get_index(i).unwrap();
545                 let scope = self.futures.readers.handles[&id].0;
546 
547                 if let Some((item, _)) = self.futures.writers.in_use.get(&id) {
548                     // If the future has an active write, then this should
549                     // complete with that write. The write is then resolved and
550                     // the future reader/writer are both gone.
551                     let item = *item;
552                     commands.push((
553                         scope,
554                         Command::FutureReadReady(FuturePayload { future: id, item }),
555                     ));
556                     let write_scope = self.futures.writers.handles[&id].0;
557                     commands.push((write_scope, Command::FutureWriteAssertComplete(id)));
558 
559                     self.futures.writers.stop(id);
560                     self.futures.readers.remove(id);
561                     self.futures.writers.remove(id);
562                 } else {
563                     // If the write-end is idle, then this should be a pending
564                     // future read.
565                     //
566                     // FIXME(#12090) host reads cannot be cancelled
567                     let cancellable = if scope.is_host() {
568                         Cancellable::No
569                     } else {
570                         Cancellable::Yes
571                     };
572                     self.futures.readers.start(id, cancellable, ());
573                     commands.push((scope, Command::FutureReadPending(id)));
574                 }
575             }
576             Choice::FutureWrite => {
577                 let set = &self.futures.writers.ready;
578                 let i = u.int_in_range(0..=set.len() - 1)?;
579                 let id = *set.get_index(i).unwrap();
580                 let scope = self.futures.writers.handles[&id].0;
581                 let item = self.next_id();
582                 let payload = FuturePayload { future: id, item };
583 
584                 if !self.futures.readers.handles.contains_key(&id) {
585                     // If the reader is gone then this write should complete
586                     // immediately with "dropped" and furthermore the writer
587                     // should now be removed.
588                     commands.push((scope, Command::FutureWriteDropped(id)));
589                     self.futures.writers.remove(id);
590                 } else if self.futures.readers.in_use.contains_key(&id) {
591                     // If the reader is in-progress then this should complete
592                     // the read/write pair. The reader/writer are both removed
593                     // as a result.
594                     commands.push((scope, Command::FutureWriteReady(payload)));
595                     let read_scope = self.futures.readers.handles[&id].0;
596                     commands.push((read_scope, Command::FutureReadAssertComplete(payload)));
597                     self.futures.readers.stop(id);
598                     self.futures.readers.remove(id);
599                     self.futures.writers.remove(id);
600                 } else {
                    // If the read-end is idle, then this should be a pending
                    // future write.
603                     self.futures.writers.start(id, Cancellable::Yes, item);
604                     commands.push((scope, Command::FutureWritePending(payload)));
605                 }
606             }
607             Choice::FutureCancelWrite => {
608                 let set = &self.futures.writers.cancellable;
609                 let i = u.int_in_range(0..=set.len() - 1)?;
610                 let id = *set.get_index(i).unwrap();
611                 let scope = self.futures.writers.handles[&id].0;
612 
613                 let (_write, state) = self.futures.writers.stop(id);
614                 match state {
615                     OpState::Pending => {
616                         commands.push((scope, Command::FutureCancelWrite(id)));
617                         assert!(self.futures.writers.droppable.swap_remove(&id));
618                     }
619                     OpState::Dropped => {
620                         commands.push((scope, Command::FutureWriteAssertDropped(id)));
621                         self.futures.writers.remove(id);
622                     }
623                 }
624             }
625             Choice::FutureCancelRead => {
626                 let set = &self.futures.readers.cancellable;
627                 let i = u.int_in_range(0..=set.len() - 1)?;
628                 let id = *set.get_index(i).unwrap();
629                 let scope = self.futures.readers.handles[&id].0;
630 
631                 let (_read, state) = self.futures.readers.stop(id);
632                 match state {
633                     OpState::Pending => {
634                         commands.push((scope, Command::FutureCancelRead(id)));
635                     }
636                     // Writers cannot be dropped with futures, so this is not
637                     // reachable.
638                     OpState::Dropped => unreachable!(),
639                 }
640             }
641             Choice::FutureDropReadable => {
642                 let set = &self.futures.readers.droppable;
643                 let i = u.int_in_range(0..=set.len() - 1)?;
644                 let id = *set.get_index(i).unwrap();
645                 let scope = self.futures.readers.remove(id);
646                 commands.push((scope, Command::FutureDropReadable(id)));
647 
648                 // If the writer is active then its write is now destined to
649                 // finish with "dropped", and otherwise the writer is also now
650                 // droppable since the reader handle is gone.
651                 if self.futures.writers.in_use.contains_key(&id) {
652                     self.futures.writers.set_in_use_state_dropped(id);
653                 } else {
654                     assert!(self.futures.writers.droppable.insert(id));
655                 }
656             }
657             Choice::FutureDropWritable => {
658                 let set = &self.futures.writers.droppable;
659                 let i = u.int_in_range(0..=set.len() - 1)?;
660                 let id = *set.get_index(i).unwrap();
661                 let scope = self.futures.writers.remove(id);
662 
663                 // Writers can't actually be dropped prior to writing so fake
664                 // a write by writing a value and asserting that the result is
665                 // "dropped".
666                 commands.push((scope, Command::FutureWriteDropped(id)));
667 
668                 assert!(!self.futures.readers.handles.contains_key(&id));
669             }
670 
671             Choice::StreamNew => {
672                 let scope = *u.choose(Scope::ALL)?;
673                 let id = self.next_id();
674                 commands.push((scope, Command::StreamNew(id)));
675                 self.streams.readers.insert(id, scope, Transferable::Yes);
676                 self.streams.writers.insert(id, scope, Transferable::No);
677             }
678             Choice::StreamReaderTransfer => {
679                 let set = &mut self.streams.readers.transferable;
680                 let i = u.int_in_range(0..=set.len() - 1)?;
681                 let id = *set.get_index(i).unwrap();
682                 let scope = &mut self.streams.readers.handles[&id].0;
683 
684                 enum Action {
685                     CallerTake(Scope),
686                     GiveCallee(Scope),
687                 }
688 
689                 let action = match (scope.caller(), scope.callee()) {
690                     (Some(caller), None) => Action::CallerTake(caller),
691                     (None, Some(callee)) => Action::GiveCallee(callee),
692                     (Some(caller), Some(callee)) => {
693                         if u.arbitrary()? {
694                             Action::CallerTake(caller)
695                         } else {
696                             Action::GiveCallee(callee)
697                         }
698                     }
699                     (None, None) => unreachable!(),
700                 };
701                 match action {
702                     Action::CallerTake(caller) => {
703                         commands.push((caller, Command::StreamTake(id)));
704                         *scope = caller;
705                     }
706                     Action::GiveCallee(callee) => {
707                         commands.push((*scope, Command::StreamGive(id)));
708                         *scope = callee;
709                     }
710                 }
711 
712                 // See what scope the reader/writer half are in. Allow
713                 // operations if they're in different scopes, but disallow
714                 // operations if they're in the same scope.
715                 //
716                 // Note that host<->host reads/writes for streams aren't fuzzed
717                 // at this time so that's also explicitly disallowed.
718                 let reader_scope = Some(*scope);
719                 let writer_scope = self.streams.writers.handles.get(&id).map(|p| p.0);
720                 if reader_scope == writer_scope
721                     || reader_scope.is_some_and(|s| s.is_host())
722                         == writer_scope.is_some_and(|s| s.is_host())
723                 {
724                     self.streams.readers.ready.swap_remove(&id);
725                     self.streams.writers.ready.swap_remove(&id);
726                 } else {
727                     self.streams.readers.ready.insert(id);
728                     if writer_scope.is_some() && !self.streams.writers.in_use.contains_key(&id) {
729                         self.streams.writers.ready.insert(id);
730                     }
731                 }
732             }
733             Choice::StreamDropReadable => {
734                 let set = &self.streams.readers.droppable;
735                 let i = u.int_in_range(0..=set.len() - 1)?;
736                 let id = *set.get_index(i).unwrap();
737                 let scope = self.streams.readers.remove(id);
738                 commands.push((scope, Command::StreamDropReadable(id)));
739 
740                 if self.streams.writers.in_use.contains_key(&id) {
741                     self.streams.writers.set_in_use_state_dropped(id);
742                 }
743             }
744             Choice::StreamDropWritable => {
745                 let set = &self.streams.writers.droppable;
746                 let i = u.int_in_range(0..=set.len() - 1)?;
747                 let id = *set.get_index(i).unwrap();
748                 let scope = self.streams.writers.remove(id);
749                 commands.push((scope, Command::StreamDropWritable(id)));
750 
751                 if self.streams.readers.in_use.contains_key(&id) {
752                     self.streams.readers.set_in_use_state_dropped(id);
753                 }
754             }
755             Choice::StreamRead => {
756                 let set = &self.streams.readers.ready;
757                 let i = u.int_in_range(0..=set.len() - 1)?;
758                 let id = *set.get_index(i).unwrap();
759                 let scope = self.streams.readers.handles[&id].0;
760                 let count = u.int_in_range(0..=MAX_STREAM_COUNT)?;
761 
762                 // FIXME(#12090)
763                 if scope.is_host() {
764                     self.streams.readers.lock_in_place(id);
765                 }
766 
767                 if !self.streams.writers.handles.contains_key(&id) {
768                     // If the write handle is dropped, then this should
769                     // immediately report as such.
770                     commands.push((
771                         scope,
772                         Command::StreamReadDropped(StreamReadPayload { stream: id, count }),
773                     ));
774                     // Can't read from this stream again, so it's not ready,
775                     // and then we also can't lift/lower it any more so it's
776                     // locked in place.
777                     assert!(self.streams.readers.ready.swap_remove(&id));
778                     self.streams.readers.lock_in_place(id);
779                 } else if self.streams.writers.in_use.contains_key(&id) {
780                     // If the write handle is active then this read should
781                     // complete immediately.
782                     let write_count = self.streams.writers.in_use[&id].0.count;
783                     let write_scope = self.streams.writers.handles[&id].0;
784                     let min = count.min(write_count);
785 
786                     match (count, write_count) {
787                         // Two zero-length operations rendezvousing will leave
788                         // the reader blocked but the writer should wake up. A
789                         // nonzero-length read and a 0-length write performs
790                         // the same way too.
791                         (0, 0) | (1.., 0) => {
792                             self.streams
793                                 .readers
794                                 .start(id, Cancellable::Yes, StreamRead { count });
795                             commands.push((
796                                 scope,
797                                 Command::StreamReadPending(StreamReadPayload { stream: id, count }),
798                             ));
799                             self.streams.writers.stop(id);
800                             commands.push((
801                                 write_scope,
802                                 Command::StreamWriteAssertComplete(StreamReadPayload {
803                                     stream: id,
804                                     count: min,
805                                 }),
806                             ));
807                         }
808 
809                         // A zero-length read with a nonzero-length-write
810                         // should wake up just the reader and do nothing to the
811                         // writer.
812                         (0, 1..) => {
813                             commands.push((
814                                 scope,
815                                 Command::StreamReadReady(StreamReadyPayload {
816                                     stream: id,
817                                     item: 0,
818                                     ready_count: min,
819                                     op_count: count,
820                                 }),
821                             ));
822                         }
823 
824                         // With two nonzero lengths both operations should complete.
825                         (1.., 1..) => {
826                             let (write, _) = self.streams.writers.stop(id);
827                             commands.push((
828                                 scope,
829                                 Command::StreamReadReady(StreamReadyPayload {
830                                     stream: id,
831                                     item: write.item,
832                                     ready_count: min,
833                                     op_count: count,
834                                 }),
835                             ));
836                             commands.push((
837                                 write_scope,
838                                 Command::StreamWriteAssertComplete(StreamReadPayload {
839                                     stream: id,
840                                     count: min,
841                                 }),
842                             ));
843                         }
844                     }
845                 } else {
846                     // If the write handle is not active then this should be in
847                     // a pending state now.
848                     self.streams
849                         .readers
850                         .start(id, Cancellable::Yes, StreamRead { count });
851                     commands.push((
852                         scope,
853                         Command::StreamReadPending(StreamReadPayload { stream: id, count }),
854                     ));
855                 }
856             }
857             Choice::StreamWrite => {
858                 let set = &self.streams.writers.ready;
859                 let i = u.int_in_range(0..=set.len() - 1)?;
860                 let id = *set.get_index(i).unwrap();
861                 let scope = self.streams.writers.handles[&id].0;
862                 let item = self.next_id();
863                 let count = u.int_in_range(0..=MAX_STREAM_COUNT)?;
864 
865                 // FIXME(#12090)
866                 if scope.is_host() {
867                     self.streams.writers.lock_in_place(id);
868                 }
869 
870                 if !self.streams.readers.handles.contains_key(&id) {
871                     // If the read handle is dropped, then this should
872                     // immediately report as such.
873                     commands.push((
874                         scope,
875                         Command::StreamWriteDropped(StreamWritePayload {
876                             stream: id,
877                             item,
878                             count,
879                         }),
880                     ));
881                     // Cannot write ever again to this handle so remove it from
882                     // the writable set.
883                     assert!(self.streams.writers.ready.swap_remove(&id));
884                 } else if self.streams.readers.in_use.contains_key(&id) {
885                     // If the read handle is active then this write should
886                     // complete immediately.
887                     let read_count = self.streams.readers.in_use[&id].0.count;
888                     let read_scope = self.streams.readers.handles[&id].0;
889                     let min = count.min(read_count);
890 
891                     match (read_count, count) {
892                         // A zero-length write, no matter what the read half is
893                         // pending as, is always ready and doesn't affect the
894                         // reader.
895                         (_, 0) => {
896                             commands.push((
897                                 scope,
898                                 Command::StreamWriteReady(StreamReadyPayload {
899                                     stream: id,
900                                     item,
901                                     op_count: count,
902                                     ready_count: min,
903                                 }),
904                             ));
905                         }
906 
907                         // With a zero-length read and a nonzero-length write
908                         // the writer is blocked but the reader is unblocked.
909                         (0, 1..) => {
910                             self.streams.writers.start(
911                                 id,
912                                 Cancellable::Yes,
913                                 StreamWrite { item, count },
914                             );
915                             commands.push((
916                                 scope,
917                                 Command::StreamWritePending(StreamWritePayload {
918                                     stream: id,
919                                     item,
920                                     count,
921                                 }),
922                             ));
923                             self.streams.readers.stop(id);
924                             commands.push((
925                                 read_scope,
926                                 Command::StreamReadAssertComplete(StreamWritePayload {
927                                     stream: id,
928                                     item,
929                                     count: min,
930                                 }),
931                             ));
932                         }
933 
934                         // Nonzero sizes means that the write immediately
935                         // finishes and the read is also now ready to complete.
936                         (1.., 1..) => {
937                             commands.push((
938                                 scope,
939                                 Command::StreamWriteReady(StreamReadyPayload {
940                                     stream: id,
941                                     item,
942                                     op_count: count,
943                                     ready_count: min,
944                                 }),
945                             ));
946                             self.streams.readers.stop(id);
947                             commands.push((
948                                 read_scope,
949                                 Command::StreamReadAssertComplete(StreamWritePayload {
950                                     stream: id,
951                                     item,
952                                     count: min,
953                                 }),
954                             ));
955                         }
956                     }
957                 } else {
958                     // If the read handle is not active then this should be in
959                     // a pending state now.
960                     self.streams
961                         .writers
962                         .start(id, Cancellable::Yes, StreamWrite { item, count });
963                     commands.push((
964                         scope,
965                         Command::StreamWritePending(StreamWritePayload {
966                             stream: id,
967                             item,
968                             count,
969                         }),
970                     ));
971                 }
972             }
973             Choice::StreamEndRead => {
974                 let set = &self.streams.readers.cancellable;
975                 let i = u.int_in_range(0..=set.len() - 1)?;
976                 let id = *set.get_index(i).unwrap();
977                 let scope = self.streams.readers.handles[&id].0;
978 
979                 let (_read, state) = self.streams.readers.stop(id);
980                 match state {
981                     OpState::Pending => {
982                         commands.push((scope, Command::StreamCancelRead(id)));
983                     }
984                     OpState::Dropped => {
985                         commands.push((scope, Command::StreamReadAssertDropped(id)));
986                     }
987                 }
988             }
989             Choice::StreamEndWrite => {
990                 let set = &self.streams.writers.cancellable;
991                 let i = u.int_in_range(0..=set.len() - 1)?;
992                 let id = *set.get_index(i).unwrap();
993                 let scope = self.streams.writers.handles[&id].0;
994 
995                 let (_write, state) = self.streams.writers.stop(id);
996                 match state {
997                     OpState::Pending => {
998                         commands.push((scope, Command::StreamCancelWrite(id)));
999                     }
1000                     OpState::Dropped => {
1001                         commands.push((
1002                             scope,
1003                             Command::StreamWriteAssertDropped(StreamReadPayload {
1004                                 stream: id,
1005                                 count: 0,
1006                             }),
1007                         ));
1008                     }
1009                 }
1010             }
1011         }
1012         Ok(())
1013     }
1014 
next_id(&mut self) -> u321015     fn next_id(&mut self) -> u32 {
1016         let id = self.next_id;
1017         self.next_id += 1;
1018         id
1019     }
1020 }
1021