1 //! For a high-level overview of this fuzz target see `fuzz_async.rs` 2 3 #![expect(missing_docs, reason = "macro-generated code")] 4 5 use arbitrary::{Arbitrary, Unstructured}; 6 use indexmap::{IndexMap, IndexSet}; 7 8 wasmtime::component::bindgen!({ 9 world: "fuzz-async", 10 imports: { 11 "wasmtime-fuzz:fuzz/types.get-commands": store, 12 }, 13 exports: { default: async | store }, 14 }); 15 16 use wasmtime_fuzz::fuzz::types::{ 17 Command, FuturePayload, StreamReadPayload, StreamReadyPayload, StreamWritePayload, 18 }; 19 20 const SOFT_MAX_COMMANDS: usize = 100; 21 const MAX_STREAM_COUNT: u32 = 10; 22 23 /// Structure used for the "component async" fuzzer. 24 /// 25 /// This encapsulates a list of commands for the fuzzer to run. Note that the 26 /// commands are not 100% arbitrary but instead they're generated similar to 27 /// wasm instructions where only some sequences of instructions are valid. The 28 /// rest of this module is dedicated to the generation of these commands. 29 #[derive(Debug)] 30 pub struct ComponentAsync { 31 /// A sequence of commands to run, tagged with a scope that they're run 32 /// within. 33 pub commands: Vec<(Scope, Command)>, 34 } 35 36 /// The possible "scopes" that async commands run within. 37 #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] 38 pub enum Scope { 39 /// The outermost layer of the host, which controls invocations of the 40 /// guests. 41 HostCaller, 42 43 /// The first layer of the guest, or the raw exports from the root of the 44 /// component. 45 /// 46 /// This imports functions from the `GuestCallee`. 47 GuestCaller, 48 49 /// The second layer of the guest which imports the host functions directly. 50 /// 51 /// This is then in turn imported by the `GuestCaller`. 52 GuestCallee, 53 54 /// The innermost layer of the host which provides imported functions to the 55 /// `GuestCallee`. 56 HostCallee, 57 } 58 59 impl Scope { 60 const ALL: &[Scope; 4] = &[ 61 Scope::HostCaller, 62 Scope::GuestCaller, 63 Scope::GuestCallee, 64 Scope::HostCallee, 65 ]; 66 const CALLERS: &[Scope; 3] = &[Scope::HostCaller, Scope::GuestCaller, Scope::GuestCallee]; 67 68 fn callee(&self) -> Option<Scope> { 69 match self { 70 Scope::HostCaller => Some(Scope::GuestCaller), 71 Scope::GuestCaller => Some(Scope::GuestCallee), 72 Scope::GuestCallee => Some(Scope::HostCallee), 73 Scope::HostCallee => None, 74 } 75 } 76 77 fn caller(&self) -> Option<Scope> { 78 match self { 79 Scope::HostCaller => None, 80 Scope::GuestCaller => Some(Scope::HostCaller), 81 Scope::GuestCallee => Some(Scope::GuestCaller), 82 Scope::HostCallee => Some(Scope::GuestCallee), 83 } 84 } 85 86 fn is_host(&self) -> bool { 87 match self { 88 Scope::HostCaller | Scope::HostCallee => true, 89 Scope::GuestCaller | Scope::GuestCallee => false, 90 } 91 } 92 } 93 94 impl Arbitrary<'_> for ComponentAsync { 95 fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self> { 96 let mut state = State::default(); 97 let mut ret = Vec::new(); 98 99 // While there's more unstructured data, and our list of commands isn't 100 // too long, generate some new commands per-component. 101 while !u.is_empty() && ret.len() < SOFT_MAX_COMMANDS { 102 state.generate(u, false, &mut ret)?; 103 } 104 105 // Optionally, if specified, finish up all async operations. 106 if u.arbitrary()? { 107 while !state.is_empty() { 108 state.generate(u, true, &mut ret)?; 109 } 110 } 111 112 Ok(ComponentAsync { commands: ret }) 113 } 114 } 115 116 #[derive(Default)] 117 struct State { 118 next_id: u32, 119 120 /// List of scopes that have an active and pending call to the 121 /// `async-pending` function. 122 async_pending: Vec<(Scope, u32)>, 123 124 /// Deferred work that can happen at any time, for example asserting the 125 /// result of some previous operation. 126 deferred: Vec<(Scope, Command)>, 127 128 /// State associated with futures/streams and their handles within. 129 futures: HandleStates<(), u32>, 130 streams: HandleStates<StreamRead, StreamWrite>, 131 } 132 133 #[derive(Default)] 134 struct HandleStates<R, W> { 135 readers: HalfStates<R>, 136 writers: HalfStates<W>, 137 } 138 139 impl<R, W> HandleStates<R, W> { 140 fn is_empty(&self) -> bool { 141 self.readers.is_empty() && self.writers.is_empty() 142 } 143 } 144 145 /// State management for "half" of a future/stream read/write pair. 146 /// 147 /// This tracks all the various states of all handles in the system to be able 148 /// to select amongst them an arbitrary operation to perform. This structure's 149 /// sets are primarily manipulated through helper methods to ensure that the set 150 /// metadata all stays in sync. 151 #[derive(Default)] 152 struct HalfStates<T> { 153 /// All known handles of this type, where they're located, etc. 154 handles: IndexMap<u32, (Scope, HalfState, Transferrable)>, 155 156 /// All handles which can currently be dropped. Handles can't be dropped if 157 /// they're in use, for example. 158 droppable: IndexSet<u32>, 159 160 /// All handles which can be read/written from (depending on handle type). 161 /// Handles where both pairs are in the same component can't be 162 /// read/written to for example. 163 ready: IndexSet<u32>, 164 165 /// All handles which can be transferred somewhere else. 166 /// 167 /// Some examples of non-transferrable handles are: 168 /// 169 /// * writers 170 /// * handles with an outstanding read 171 /// * host-based handles that have been used at least once (FIXME #12090) 172 transferrable: IndexSet<u32>, 173 174 /// Handles currently being read/written to. 175 /// 176 /// Also includes state about the operation, such as whether it's been 177 /// dropped on the other side. 178 in_use: IndexMap<u32, (T, OpState)>, 179 180 /// Handles with a pending operation which can be cancelled. 181 cancellable: IndexSet<u32>, 182 } 183 184 enum HalfState { 185 Idle, 186 InUse, 187 } 188 189 #[derive(Copy, Clone, PartialEq, Debug)] 190 enum Transferrable { 191 Yes, 192 No, 193 } 194 195 #[derive(Copy, Clone, PartialEq, Debug)] 196 enum Cancellable { 197 Yes, 198 No, 199 } 200 201 #[derive(Copy, Clone, PartialEq, Debug)] 202 enum OpState { 203 Pending, 204 Dropped, 205 } 206 207 #[derive(Default, Copy, Clone)] 208 struct StreamRead { 209 count: u32, 210 } 211 212 #[derive(Default, Copy, Clone)] 213 struct StreamWrite { 214 item: u32, 215 count: u32, 216 } 217 218 impl<T> HalfStates<T> { 219 fn is_empty(&self) -> bool { 220 self.handles.is_empty() 221 } 222 223 /// Adds a new handle `id` to this set. 224 fn insert(&mut self, id: u32, scope: Scope, transferrable: Transferrable) { 225 let prev = self 226 .handles 227 .insert(id, (scope, HalfState::Idle, transferrable)); 228 assert!(prev.is_none()); 229 assert!(self.droppable.insert(id)); 230 if transferrable == Transferrable::Yes { 231 self.transferrable.insert(id); 232 } 233 } 234 235 /// Removes the handle `id` for closing. 236 fn remove(&mut self, id: u32) -> Scope { 237 let (scope, state, transferrable) = self.handles.swap_remove(&id).unwrap(); 238 assert!(matches!(state, HalfState::Idle)); 239 self.droppable.swap_remove(&id); 240 self.ready.swap_remove(&id); 241 if transferrable == Transferrable::Yes { 242 assert!(self.transferrable.swap_remove(&id)); 243 } 244 scope 245 } 246 247 /// Locks `id` in whatever scope it's currently in for the rest of its 248 /// lifetime, preventing its transfer. This is used as a workaround for 249 /// #12090. 250 fn lock_in_place(&mut self, id: u32) { 251 let (_scope, state, transferrable) = self.handles.get_mut(&id).unwrap(); 252 assert!(matches!(state, HalfState::Idle)); 253 if matches!(transferrable, Transferrable::Yes) { 254 assert!(self.transferrable.swap_remove(&id)); 255 *transferrable = Transferrable::No; 256 } 257 } 258 259 /// Starts an operation on the handle `id`. 260 fn start(&mut self, id: u32, cancellable: Cancellable, payload: T) { 261 let (_scope, state, transferrable) = self.handles.get_mut(&id).unwrap(); 262 assert!(matches!(state, HalfState::Idle)); 263 assert!(self.ready.swap_remove(&id)); 264 self.droppable.swap_remove(&id); 265 *state = HalfState::InUse; 266 let prev = self.in_use.insert(id, (payload, OpState::Pending)); 267 assert!(prev.is_none()); 268 if *transferrable == Transferrable::Yes { 269 assert!(self.transferrable.swap_remove(&id)); 270 } 271 if cancellable == Cancellable::Yes { 272 assert!(self.cancellable.insert(id)); 273 } 274 } 275 276 /// Completes an operation on `id`, returning the state it was started with 277 /// along with whether it was dropped. 278 fn stop(&mut self, id: u32) -> (T, OpState) { 279 let (_scope, state, transferrable) = self.handles.get_mut(&id).unwrap(); 280 assert!(matches!(state, HalfState::InUse)); 281 *state = HalfState::Idle; 282 let dropped = self.in_use.swap_remove(&id).unwrap(); 283 self.cancellable.swap_remove(&id); 284 if *transferrable == Transferrable::Yes { 285 assert!(self.transferrable.insert(id)); 286 } 287 assert!(self.droppable.insert(id)); 288 if dropped.1 != OpState::Dropped { 289 assert!(self.ready.insert(id)); 290 } else { 291 self.lock_in_place(id); 292 } 293 dropped 294 } 295 296 /// Updates to `OpState::Dropped` for an operation-in-progress. 297 fn set_in_use_state_dropped(&mut self, id: u32) { 298 let (_, prev) = self.in_use.get_mut(&id).unwrap(); 299 assert_eq!(*prev, OpState::Pending); 300 *prev = OpState::Dropped; 301 302 // This operation is now "cancellable" meaning that at any point in the 303 // future it can be resolved since the other end was dropped. 304 self.cancellable.insert(id); 305 } 306 } 307 308 impl State { 309 fn is_empty(&self) -> bool { 310 let State { 311 next_id: _, 312 async_pending, 313 deferred, 314 futures, 315 streams, 316 } = self; 317 async_pending.is_empty() && deferred.is_empty() && futures.is_empty() && streams.is_empty() 318 } 319 320 fn generate( 321 &mut self, 322 u: &mut Unstructured<'_>, 323 finish: bool, 324 commands: &mut Vec<(Scope, Command)>, 325 ) -> arbitrary::Result<()> { 326 let mut choices = Vec::new(); 327 328 // If we're not finishing up then have the possibility of 329 // immediately-ready sync/async calls and such sort of miscellaneous 330 // work. 331 if !finish { 332 choices.push(Choice::SyncReadyCall); 333 choices.push(Choice::AsyncReadyCall); 334 choices.push(Choice::FutureNew); 335 choices.push(Choice::StreamNew); 336 } 337 338 // If we're not finishing up, and if we don't have too much pending 339 // work, then possibly make some more pending work. 340 if !finish && self.async_pending.len() < 20 { 341 choices.push(Choice::AsyncPendingCall); 342 } 343 344 // If there's pending work, possibly resolve something. 345 if self.async_pending.len() > 0 { 346 choices.push(Choice::AsyncPendingResolve); 347 } 348 349 // If something has been deferred to later, possibly add that command 350 // into the stream. 351 if self.deferred.len() > 0 { 352 choices.push(Choice::Deferred); 353 } 354 355 // Wrap up work with futures by dropping handles, writing, cancelling, 356 // etc. 357 if self.futures.readers.droppable.len() > 0 { 358 choices.push(Choice::FutureDropReadable); 359 } 360 if self.futures.writers.droppable.len() > 0 { 361 choices.push(Choice::FutureDropWritable); 362 } 363 if self.futures.writers.cancellable.len() > 0 { 364 choices.push(Choice::FutureCancelWrite); 365 } 366 if self.futures.readers.cancellable.len() > 0 { 367 choices.push(Choice::FutureCancelRead); 368 } 369 // If more work is allowed kick of reads/transfers. 370 if !finish { 371 if self.futures.writers.ready.len() > 0 { 372 choices.push(Choice::FutureWrite); 373 } 374 if self.futures.readers.ready.len() > 0 { 375 choices.push(Choice::FutureRead); 376 } 377 if self.futures.readers.transferrable.len() > 0 { 378 choices.push(Choice::FutureReaderTransfer); 379 } 380 } 381 382 // Streams can be dropped at any time and their pending operations can 383 // be ceased at any time. 384 if self.streams.readers.droppable.len() > 0 { 385 choices.push(Choice::StreamDropReadable); 386 } 387 if self.streams.writers.droppable.len() > 0 { 388 choices.push(Choice::StreamDropWritable); 389 } 390 if self.streams.readers.cancellable.len() > 0 { 391 choices.push(Choice::StreamEndRead); 392 } 393 if self.streams.writers.cancellable.len() > 0 { 394 choices.push(Choice::StreamEndWrite); 395 } 396 // If more work is allowed then streams can be moved around and new 397 // reads/writes may be started. 398 if !finish { 399 if self.streams.readers.transferrable.len() > 0 { 400 choices.push(Choice::StreamReaderTransfer); 401 } 402 if self.streams.readers.ready.len() > 0 { 403 choices.push(Choice::StreamRead); 404 } 405 if self.streams.writers.ready.len() > 0 { 406 choices.push(Choice::StreamWrite); 407 } 408 } 409 410 #[derive(Debug)] 411 enum Choice { 412 SyncReadyCall, 413 AsyncReadyCall, 414 AsyncPendingCall, 415 AsyncPendingResolve, 416 Deferred, 417 418 FutureNew, 419 FutureReaderTransfer, 420 FutureRead, 421 FutureWrite, 422 FutureCancelRead, 423 FutureCancelWrite, 424 FutureDropReadable, 425 FutureDropWritable, 426 427 StreamNew, 428 StreamReaderTransfer, 429 StreamDropReadable, 430 StreamDropWritable, 431 StreamRead, 432 StreamWrite, 433 StreamEndRead, 434 StreamEndWrite, 435 } 436 437 match u.choose(&choices)? { 438 Choice::SyncReadyCall => { 439 let caller = *u.choose(Scope::CALLERS)?; 440 commands.push((caller, Command::SyncReadyCall)); 441 } 442 Choice::AsyncReadyCall => { 443 let caller = *u.choose(Scope::CALLERS)?; 444 commands.push((caller, Command::AsyncReadyCall)); 445 } 446 447 Choice::AsyncPendingCall => { 448 let caller = *u.choose(Scope::CALLERS)?; 449 let id = self.next_id(); 450 self.async_pending.push((caller, id)); 451 commands.push((caller, Command::AsyncPendingImportCall(id))); 452 } 453 454 Choice::AsyncPendingResolve => { 455 let index = u.int_in_range(0..=self.async_pending.len() - 1)?; 456 let (caller, id) = self.async_pending.swap_remove(index); 457 let callee = caller.callee().unwrap(); 458 459 // FIXME(#11833) the host can't cancel calls at this time, so 460 // they can only be completed. Everything else though is 461 // guest-initiated which means that the call can be either 462 // completed or cancelled. 463 let complete = caller == Scope::HostCaller || u.arbitrary()?; 464 465 if complete { 466 commands.push((callee, Command::AsyncPendingExportComplete(id))); 467 self.deferred 468 .push((caller, Command::AsyncPendingImportAssertReady(id))); 469 } else { 470 commands.push((caller, Command::AsyncPendingImportCancel(id))); 471 self.deferred 472 .push((callee, Command::AsyncPendingExportAssertCancelled(id))); 473 } 474 } 475 476 Choice::Deferred => { 477 let index = u.int_in_range(0..=self.deferred.len() - 1)?; 478 let (scope, cmd) = self.deferred.swap_remove(index); 479 commands.push((scope, cmd)); 480 } 481 482 Choice::FutureNew => { 483 let scope = *u.choose(Scope::ALL)?; 484 let id = self.next_id(); 485 commands.push((scope, Command::FutureNew(id))); 486 self.futures.readers.insert(id, scope, Transferrable::Yes); 487 self.futures.writers.insert(id, scope, Transferrable::No); 488 489 // Future writers cannot be dropped without writing. 490 assert!(self.futures.writers.droppable.swap_remove(&id)); 491 } 492 Choice::FutureReaderTransfer => { 493 let set = &mut self.futures.readers.transferrable; 494 let i = u.int_in_range(0..=set.len() - 1)?; 495 let id = *set.get_index(i).unwrap(); 496 let scope = &mut self.futures.readers.handles[&id].0; 497 498 enum Action { 499 CallerTake(Scope), 500 GiveCallee(Scope), 501 } 502 503 let action = match (scope.caller(), scope.callee()) { 504 (Some(caller), None) => Action::CallerTake(caller), 505 (None, Some(callee)) => Action::GiveCallee(callee), 506 (Some(caller), Some(callee)) => { 507 if u.arbitrary()? { 508 Action::CallerTake(caller) 509 } else { 510 Action::GiveCallee(callee) 511 } 512 } 513 (None, None) => unreachable!(), 514 }; 515 match action { 516 Action::CallerTake(caller) => { 517 commands.push((caller, Command::FutureTake(id))); 518 *scope = caller; 519 } 520 Action::GiveCallee(callee) => { 521 commands.push((*scope, Command::FutureGive(id))); 522 *scope = callee; 523 } 524 } 525 526 // See what scope the reader/writer half are in. Allow 527 // operations if they're in different scopes, but disallow 528 // operations if they're in the same scope. 529 let reader_scope = Some(*scope); 530 let writer_scope = self.futures.writers.handles.get(&id).map(|p| p.0); 531 if reader_scope == writer_scope { 532 self.futures.readers.ready.swap_remove(&id); 533 self.futures.writers.ready.swap_remove(&id); 534 } else { 535 self.futures.readers.ready.insert(id); 536 if writer_scope.is_some() && !self.futures.writers.in_use.contains_key(&id) { 537 self.futures.writers.ready.insert(id); 538 } 539 } 540 } 541 Choice::FutureRead => { 542 let set = &self.futures.readers.ready; 543 let i = u.int_in_range(0..=set.len() - 1)?; 544 let id = *set.get_index(i).unwrap(); 545 let scope = self.futures.readers.handles[&id].0; 546 547 if let Some((item, _)) = self.futures.writers.in_use.get(&id) { 548 // If the future has an active write, then this should 549 // complete with that write. The write is then resolved and 550 // the future reader/writer are both gone. 551 let item = *item; 552 commands.push(( 553 scope, 554 Command::FutureReadReady(FuturePayload { future: id, item }), 555 )); 556 let write_scope = self.futures.writers.handles[&id].0; 557 commands.push((write_scope, Command::FutureWriteAssertComplete(id))); 558 559 self.futures.writers.stop(id); 560 self.futures.readers.remove(id); 561 self.futures.writers.remove(id); 562 } else { 563 // If the write-end is idle, then this should be a pending 564 // future read. 565 // 566 // FIXME(#12090) host reads cannot be cancelled 567 let cancellable = if scope.is_host() { 568 Cancellable::No 569 } else { 570 Cancellable::Yes 571 }; 572 self.futures.readers.start(id, cancellable, ()); 573 commands.push((scope, Command::FutureReadPending(id))); 574 } 575 } 576 Choice::FutureWrite => { 577 let set = &self.futures.writers.ready; 578 let i = u.int_in_range(0..=set.len() - 1)?; 579 let id = *set.get_index(i).unwrap(); 580 let scope = self.futures.writers.handles[&id].0; 581 let item = self.next_id(); 582 let payload = FuturePayload { future: id, item }; 583 584 if !self.futures.readers.handles.contains_key(&id) { 585 // If the reader is gone then this write should complete 586 // immediately with "dropped" and furthermore the writer 587 // should now be removed. 588 commands.push((scope, Command::FutureWriteDropped(id))); 589 self.futures.writers.remove(id); 590 } else if self.futures.readers.in_use.contains_key(&id) { 591 // If the reader is in-progress then this should complete 592 // the read/write pair. The reader/writer are both removed 593 // as a result. 594 commands.push((scope, Command::FutureWriteReady(payload))); 595 let read_scope = self.futures.readers.handles[&id].0; 596 commands.push((read_scope, Command::FutureReadAssertComplete(payload))); 597 self.futures.readers.stop(id); 598 self.futures.readers.remove(id); 599 self.futures.writers.remove(id); 600 } else { 601 // If the read-end is idle, then this should be a pending 602 // future read. 603 self.futures.writers.start(id, Cancellable::Yes, item); 604 commands.push((scope, Command::FutureWritePending(payload))); 605 } 606 } 607 Choice::FutureCancelWrite => { 608 let set = &self.futures.writers.cancellable; 609 let i = u.int_in_range(0..=set.len() - 1)?; 610 let id = *set.get_index(i).unwrap(); 611 let scope = self.futures.writers.handles[&id].0; 612 613 let (_write, state) = self.futures.writers.stop(id); 614 match state { 615 OpState::Pending => { 616 commands.push((scope, Command::FutureCancelWrite(id))); 617 assert!(self.futures.writers.droppable.swap_remove(&id)); 618 } 619 OpState::Dropped => { 620 commands.push((scope, Command::FutureWriteAssertDropped(id))); 621 self.futures.writers.remove(id); 622 } 623 } 624 } 625 Choice::FutureCancelRead => { 626 let set = &self.futures.readers.cancellable; 627 let i = u.int_in_range(0..=set.len() - 1)?; 628 let id = *set.get_index(i).unwrap(); 629 let scope = self.futures.readers.handles[&id].0; 630 631 let (_read, state) = self.futures.readers.stop(id); 632 match state { 633 OpState::Pending => { 634 commands.push((scope, Command::FutureCancelRead(id))); 635 } 636 // Writers cannot be dropped with futures, so this is not 637 // reachable. 638 OpState::Dropped => unreachable!(), 639 } 640 } 641 Choice::FutureDropReadable => { 642 let set = &self.futures.readers.droppable; 643 let i = u.int_in_range(0..=set.len() - 1)?; 644 let id = *set.get_index(i).unwrap(); 645 let scope = self.futures.readers.remove(id); 646 commands.push((scope, Command::FutureDropReadable(id))); 647 648 // If the writer is active then its write is now destined to 649 // finish with "dropped", and otherwise the writer is also now 650 // droppable since the reader handle is gone. 651 if self.futures.writers.in_use.contains_key(&id) { 652 self.futures.writers.set_in_use_state_dropped(id); 653 } else { 654 assert!(self.futures.writers.droppable.insert(id)); 655 } 656 } 657 Choice::FutureDropWritable => { 658 let set = &self.futures.writers.droppable; 659 let i = u.int_in_range(0..=set.len() - 1)?; 660 let id = *set.get_index(i).unwrap(); 661 let scope = self.futures.writers.remove(id); 662 663 // Writers can't actually be dropped prior to writing so fake 664 // a write by writing a value and asserting that the result is 665 // "dropped". 666 commands.push((scope, Command::FutureWriteDropped(id))); 667 668 assert!(!self.futures.readers.handles.contains_key(&id)); 669 } 670 671 Choice::StreamNew => { 672 let scope = *u.choose(Scope::ALL)?; 673 let id = self.next_id(); 674 commands.push((scope, Command::StreamNew(id))); 675 self.streams.readers.insert(id, scope, Transferrable::Yes); 676 self.streams.writers.insert(id, scope, Transferrable::No); 677 } 678 Choice::StreamReaderTransfer => { 679 let set = &mut self.streams.readers.transferrable; 680 let i = u.int_in_range(0..=set.len() - 1)?; 681 let id = *set.get_index(i).unwrap(); 682 let scope = &mut self.streams.readers.handles[&id].0; 683 684 enum Action { 685 CallerTake(Scope), 686 GiveCallee(Scope), 687 } 688 689 let action = match (scope.caller(), scope.callee()) { 690 (Some(caller), None) => Action::CallerTake(caller), 691 (None, Some(callee)) => Action::GiveCallee(callee), 692 (Some(caller), Some(callee)) => { 693 if u.arbitrary()? { 694 Action::CallerTake(caller) 695 } else { 696 Action::GiveCallee(callee) 697 } 698 } 699 (None, None) => unreachable!(), 700 }; 701 match action { 702 Action::CallerTake(caller) => { 703 commands.push((caller, Command::StreamTake(id))); 704 *scope = caller; 705 } 706 Action::GiveCallee(callee) => { 707 commands.push((*scope, Command::StreamGive(id))); 708 *scope = callee; 709 } 710 } 711 712 // See what scope the reader/writer half are in. Allow 713 // operations if they're in different scopes, but disallow 714 // operations if they're in the same scope. 715 // 716 // Note that host<->host reads/writes for streams aren't fuzzed 717 // at this time so that's also explicitly disallowed. 718 let reader_scope = Some(*scope); 719 let writer_scope = self.streams.writers.handles.get(&id).map(|p| p.0); 720 if reader_scope == writer_scope 721 || reader_scope.is_some_and(|s| s.is_host()) 722 == writer_scope.is_some_and(|s| s.is_host()) 723 { 724 self.streams.readers.ready.swap_remove(&id); 725 self.streams.writers.ready.swap_remove(&id); 726 } else { 727 self.streams.readers.ready.insert(id); 728 if writer_scope.is_some() && !self.streams.writers.in_use.contains_key(&id) { 729 self.streams.writers.ready.insert(id); 730 } 731 } 732 } 733 Choice::StreamDropReadable => { 734 let set = &self.streams.readers.droppable; 735 let i = u.int_in_range(0..=set.len() - 1)?; 736 let id = *set.get_index(i).unwrap(); 737 let scope = self.streams.readers.remove(id); 738 commands.push((scope, Command::StreamDropReadable(id))); 739 740 if self.streams.writers.in_use.contains_key(&id) { 741 self.streams.writers.set_in_use_state_dropped(id); 742 } 743 } 744 Choice::StreamDropWritable => { 745 let set = &self.streams.writers.droppable; 746 let i = u.int_in_range(0..=set.len() - 1)?; 747 let id = *set.get_index(i).unwrap(); 748 let scope = self.streams.writers.remove(id); 749 commands.push((scope, Command::StreamDropWritable(id))); 750 751 if self.streams.readers.in_use.contains_key(&id) { 752 self.streams.readers.set_in_use_state_dropped(id); 753 } 754 } 755 Choice::StreamRead => { 756 let set = &self.streams.readers.ready; 757 let i = u.int_in_range(0..=set.len() - 1)?; 758 let id = *set.get_index(i).unwrap(); 759 let scope = self.streams.readers.handles[&id].0; 760 let count = u.int_in_range(0..=MAX_STREAM_COUNT)?; 761 762 // FIXME(#12090) 763 if scope.is_host() { 764 self.streams.readers.lock_in_place(id); 765 } 766 767 if !self.streams.writers.handles.contains_key(&id) { 768 // If the write handle is dropped, then this should 769 // immediately report as such. 770 commands.push(( 771 scope, 772 Command::StreamReadDropped(StreamReadPayload { stream: id, count }), 773 )); 774 // Can't read from this stream again, so it's not ready, 775 // and then we also can't lift/lower it any more so it's 776 // locked in place. 777 assert!(self.streams.readers.ready.swap_remove(&id)); 778 self.streams.readers.lock_in_place(id); 779 } else if self.streams.writers.in_use.contains_key(&id) { 780 // If the write handle is active then this read should 781 // complete immediately. 782 let write_count = self.streams.writers.in_use[&id].0.count; 783 let write_scope = self.streams.writers.handles[&id].0; 784 let min = count.min(write_count); 785 786 match (count, write_count) { 787 // Two zero-length operations rendezvousing will leave 788 // the reader blocked but the writer should wake up. A 789 // nonzero-length read and a 0-length write performs 790 // the same way too. 791 (0, 0) | (1.., 0) => { 792 self.streams 793 .readers 794 .start(id, Cancellable::Yes, StreamRead { count }); 795 commands.push(( 796 scope, 797 Command::StreamReadPending(StreamReadPayload { stream: id, count }), 798 )); 799 self.streams.writers.stop(id); 800 commands.push(( 801 write_scope, 802 Command::StreamWriteAssertComplete(StreamReadPayload { 803 stream: id, 804 count: min, 805 }), 806 )); 807 } 808 809 // A zero-length read with a nonzero-length-write 810 // should wake up just the reader and do nothing to the 811 // writer. 812 (0, 1..) => { 813 commands.push(( 814 scope, 815 Command::StreamReadReady(StreamReadyPayload { 816 stream: id, 817 item: 0, 818 ready_count: min, 819 op_count: count, 820 }), 821 )); 822 } 823 824 // With two nonzero lengths both operations should complete. 825 (1.., 1..) => { 826 let (write, _) = self.streams.writers.stop(id); 827 commands.push(( 828 scope, 829 Command::StreamReadReady(StreamReadyPayload { 830 stream: id, 831 item: write.item, 832 ready_count: min, 833 op_count: count, 834 }), 835 )); 836 commands.push(( 837 write_scope, 838 Command::StreamWriteAssertComplete(StreamReadPayload { 839 stream: id, 840 count: min, 841 }), 842 )); 843 } 844 } 845 } else { 846 // If the write handle is not active then this should be in 847 // a pending state now. 848 self.streams 849 .readers 850 .start(id, Cancellable::Yes, StreamRead { count }); 851 commands.push(( 852 scope, 853 Command::StreamReadPending(StreamReadPayload { stream: id, count }), 854 )); 855 } 856 } 857 Choice::StreamWrite => { 858 let set = &self.streams.writers.ready; 859 let i = u.int_in_range(0..=set.len() - 1)?; 860 let id = *set.get_index(i).unwrap(); 861 let scope = self.streams.writers.handles[&id].0; 862 let item = self.next_id(); 863 let count = u.int_in_range(0..=MAX_STREAM_COUNT)?; 864 865 // FIXME(#12090) 866 if scope.is_host() { 867 self.streams.writers.lock_in_place(id); 868 } 869 870 if !self.streams.readers.handles.contains_key(&id) { 871 // If the read handle is dropped, then this should 872 // immediately report as such. 873 commands.push(( 874 scope, 875 Command::StreamWriteDropped(StreamWritePayload { 876 stream: id, 877 item, 878 count, 879 }), 880 )); 881 // Cannot write ever again to this handle so remove it from 882 // the writable set. 883 assert!(self.streams.writers.ready.swap_remove(&id)); 884 } else if self.streams.readers.in_use.contains_key(&id) { 885 // If the read handle is active then this write should 886 // complete immediately. 887 let read_count = self.streams.readers.in_use[&id].0.count; 888 let read_scope = self.streams.readers.handles[&id].0; 889 let min = count.min(read_count); 890 891 match (read_count, count) { 892 // A zero-length write, no matter what the read half is 893 // pending as, is always ready and doesn't affect the 894 // reader. 895 (_, 0) => { 896 commands.push(( 897 scope, 898 Command::StreamWriteReady(StreamReadyPayload { 899 stream: id, 900 item, 901 op_count: count, 902 ready_count: min, 903 }), 904 )); 905 } 906 907 // With a zero-length read and a nonzero-length write 908 // the writer is blocked but the reader is unblocked. 909 (0, 1..) => { 910 self.streams.writers.start( 911 id, 912 Cancellable::Yes, 913 StreamWrite { item, count }, 914 ); 915 commands.push(( 916 scope, 917 Command::StreamWritePending(StreamWritePayload { 918 stream: id, 919 item, 920 count, 921 }), 922 )); 923 self.streams.readers.stop(id); 924 commands.push(( 925 read_scope, 926 Command::StreamReadAssertComplete(StreamWritePayload { 927 stream: id, 928 item, 929 count: min, 930 }), 931 )); 932 } 933 934 // Nonzero sizes means that the write immediately 935 // finishes and the read is also now ready to complete. 936 (1.., 1..) => { 937 commands.push(( 938 scope, 939 Command::StreamWriteReady(StreamReadyPayload { 940 stream: id, 941 item, 942 op_count: count, 943 ready_count: min, 944 }), 945 )); 946 self.streams.readers.stop(id); 947 commands.push(( 948 read_scope, 949 Command::StreamReadAssertComplete(StreamWritePayload { 950 stream: id, 951 item, 952 count: min, 953 }), 954 )); 955 } 956 } 957 } else { 958 // If the read handle is not active then this should be in 959 // a pending state now. 960 self.streams 961 .writers 962 .start(id, Cancellable::Yes, StreamWrite { item, count }); 963 commands.push(( 964 scope, 965 Command::StreamWritePending(StreamWritePayload { 966 stream: id, 967 item, 968 count, 969 }), 970 )); 971 } 972 } 973 Choice::StreamEndRead => { 974 let set = &self.streams.readers.cancellable; 975 let i = u.int_in_range(0..=set.len() - 1)?; 976 let id = *set.get_index(i).unwrap(); 977 let scope = self.streams.readers.handles[&id].0; 978 979 let (_read, state) = self.streams.readers.stop(id); 980 match state { 981 OpState::Pending => { 982 commands.push((scope, Command::StreamCancelRead(id))); 983 } 984 OpState::Dropped => { 985 commands.push((scope, Command::StreamReadAssertDropped(id))); 986 } 987 } 988 } 989 Choice::StreamEndWrite => { 990 let set = &self.streams.writers.cancellable; 991 let i = u.int_in_range(0..=set.len() - 1)?; 992 let id = *set.get_index(i).unwrap(); 993 let scope = self.streams.writers.handles[&id].0; 994 995 let (_write, state) = self.streams.writers.stop(id); 996 match state { 997 OpState::Pending => { 998 commands.push((scope, Command::StreamCancelWrite(id))); 999 } 1000 OpState::Dropped => { 1001 commands.push(( 1002 scope, 1003 Command::StreamWriteAssertDropped(StreamReadPayload { 1004 stream: id, 1005 count: 0, 1006 }), 1007 )); 1008 } 1009 } 1010 } 1011 } 1012 Ok(()) 1013 } 1014 1015 fn next_id(&mut self) -> u32 { 1016 let id = self.next_id; 1017 self.next_id += 1; 1018 id 1019 } 1020 } 1021