1 //! Test case used with the `component_async` fuzzer which is part of the `misc` 2 //! fuzz target of Wasmtime. 3 //! 4 //! This test case is a binary that's suited for just that one fuzzer and has an 5 //! associated WIT world that it works with. This test case is composed with 6 //! itself and then run within the host as well. The exact semantics of this 7 //! program and all the exports/imports are defined within the context of the 8 //! fuzzer. 9 //! 10 //! The general idea is that this program creates an "async soup" and make sure 11 //! that everything works as expected, notably also not leading to any panics 12 //! anywhere within the runtime. An example of what this fuzzer intermingles 13 //! are: 14 //! 15 //! * Synchronous calls 16 //! * Async calls that are immediately ready 17 //! * Async calls that are not immediately ready and left pending 18 //! * Creation of futures/streams 19 //! * Moving futures/streams between components 20 //! * Reading/writing futures/streams 21 //! * Cancelling reads/writes of futures/streams 22 //! * Seeing futures/streams get dropped and the effect on active reads/writes 23 //! * Mixing host<->guest, guest<->guest, guest<->host, and host<->host 24 //! calls/primitives 25 //! 26 //! The purpose of this fuzzer is to stress the management of async stacks, the 27 //! async runtime, and in theory suss out various edge cases in the handling of 28 //! async events. This fuzzer does NOT stress lifting/lowering at all because 29 //! there is a static WIT signature that this fuzzer works with. 30 //! 31 //! Much of the code in this file is semi-duplicated in the host except written 32 //! with host `wasmtime` APIs instead of `wit-bindgen` APIs. The overall 33 //! structure is roughly the same. 34 //! 35 //! # Overall architecture 36 //! 37 //! The general structure of this fuzzer is that there's a "host sandwich" which 38 //! looks like: 39 //! 40 //! ```text 41 //! ╔══════╦══════════════════════════════════════════════════════════╗ 42 //! ║ Host ║ ║ 43 //! ╠══════╝ ║ 44 //! ║ ║ 45 //! ║ ┍┯┯┯━━━ wasmtime:fuzz/types ║ 46 //! ║ ││││ ║ 47 //! ║ ││││ ║ 48 //! ║ ││││ ╔════════════════════╦════════════════════╗ ║ 49 //! ║ ││││ ║ component_async.rs ║ ║ ║ 50 //! ║ ││││ ╠════════════════════╝ ║ ║ 51 //! ║ ││││ ║ ║ ║ 52 //! ║ ││││ ║ HostCaller ║ ║ 53 //! ║ ││││ ║ ║ ║ 54 //! ║ │││└────────╫─→ stream<command> ║ ║ 55 //! ║ │││ ╚═══════════════════╤═════════════════════╝ ║ 56 //! ║ │││ │ ║ 57 //! ║ │││ ┝ wasmtime-fuzz:fuzz/async-test ║ 58 //! ║ │││ │ ║ 59 //! ║ │││ ╔═══════════╦════════════╪═════════════════════════════╗ ║ 60 //! ║ │││ ║ Component ║ │ ║ ║ 61 //! ║ │││ ╠═══════════╝ │ ║ ║ 62 //! ║ │││ ║ ↓ ║ ║ 63 //! ║ │││ ║ ╔═════════════════╦═══════════════════════╗ ║ ║ 64 //! ║ │││ ║ ║ fuzz-async.wasm ║ ║ ║ ║ 65 //! ║ │││ ║ ╠═════════════════╝ ║ ║ ║ 66 //! ║ │││ ║ ║ ║ ║ ║ 67 //! ║ │││ ║ ║ GuestCaller ║ ║ ║ 68 //! ║ │││ ║ ║ ║ ║ ║ 69 //! ║ ││└────╫────╫─→ stream<command> ║ ║ ║ 70 //! ║ ││ ║ ╚═══════╤═════════════════════════════════╝ ║ ║ 71 //! ║ ││ ║ │ ║ ║ 72 //! ║ ││ ║ ┝ wasmtime-fuzz:fuzz/async-test ║ ║ 73 //! ║ ││ ║ │ ║ ║ 74 //! ║ ││ ║ ↓ ║ ║ 75 //! ║ ││ ║ ╔═════════════════╦═══════════════════════╗ ║ ║ 76 //! ║ ││ ║ ║ fuzz-async.wasm ║ ║ ║ ║ 77 //! ║ ││ ║ ╠═════════════════╝ ║ ║ ║ 78 //! ║ ││ ║ ║ ║ ║ ║ 79 //! ║ ││ ║ ║ GuestCallee ║ ║ ║ 80 //! ║ ││ ║ ║ ║ ║ ║ 81 //! ║ │└─────╫────╫─→ stream<command> ║ ║ ║ 82 //! ║ │ ║ ╚═══════════════════╤═════════════════════╝ ║ ║ 83 //! ║ │ ║ │ ║ ║ 84 //! ║ │ ║ │ ║ ║ 85 //! ║ │ ╚════════════════════════╪═════════════════════════════╝ ║ 86 //! ║ │ │ ║ 87 //! ║ │ ┝ wasmtime-fuzz:fuzz/async-test ║ 88 //! ║ │ │ ║ 89 //! ║ │ ↓ ║ 90 //! ║ │ ╔════════════════════╦════════════════════╗ ║ 91 //! ║ │ ║ component_async.rs ║ ║ ║ 92 //! ║ │ ╠════════════════════╝ ║ ║ 93 //! ║ │ ║ ║ ║ 94 //! ║ │ ║ HostCallee ║ ║ 95 //! ║ │ ║ ║ ║ 96 //! ║ └───────────╫─→ stream<command> ║ ║ 97 //! ║ ╚═════════════════════════════════════════╝ ║ 98 //! ║ ║ 99 //! ╚═════════════════════════════════════════════════════════════════╝ 100 //! ``` 101 //! 102 //! Here `fuzz-async.wasm` appears twice to model all the various types of 103 //! the host/guest interaction matrix. Everything is driven by a 104 //! `stream<command>` provided to each component part of the system which 105 //! serves as a means of forcing one particular component to take action. 106 //! Commands are then the test case itself where a series of commands are 107 //! executed for each fuzz iteration. 108 //! 109 //! # Yield-loops 110 //! 111 //! This program has a function `test_property` which is a similar analog to the 112 //! one in the host-side as well. The general idea is that while component model 113 //! async is generally deterministic it does not specify what should happen when 114 //! multiple events are ready at the same time. This can pretty easily happen in 115 //! this fuzzer meaning that it's not precise which event happens first. To 116 //! assist in managing this there are two primary mitigations: 117 //! 118 //! * The first is that whenever a command is dispatched to a component it's 119 //! followed up with an "ack" which is a noop. Delivery of the "ack" can't 120 //! happen until the previous command is completely finished being processed 121 //! meaning it's a kludge way of synchronizing the receipt of a message. 122 //! 123 //! * The second is that there can still be small races where an async event 124 //! hasn't quite happened yet but it's queued up to happen. To handle these 125 //! events calls to `test_property` are sprinkled around which has an 126 //! internally-bounded yield-loop. It's expected that while yielding other 127 //! code can run which resolves the property being tested at-hand, and then 128 //! this yield loop will panic if it turns too many times as it's probably a 129 //! bug. 130 //! 131 //! It's a bit of a hack but it's so far the most effective way of handling this 132 //! that's (a) not timing-dependent e.g. adding sleeps, (b) is 133 //! reliable/deterministic, and (c) is flexible where the constant number of 134 //! yields can be bumped without much concern. The number of yields specifically 135 //! is arbitrarily chosen and while it can't be said exactly how many yields 136 //! should be necessary it should be able to say "less than N should always 137 //! work". 138 139 wit_bindgen::generate!("fuzz-async" in "../fuzzing/wit"); 140 141 use crate::exports::wasmtime_fuzz::fuzz::async_test as e; 142 use crate::wasmtime_fuzz::fuzz::async_test as i; 143 use crate::wasmtime_fuzz::fuzz::types::{self, Command, Scope}; 144 use futures::FutureExt; 145 use futures::channel::oneshot; 146 use pin_project_lite::pin_project; 147 use std::collections::{HashMap, HashSet}; 148 use std::mem; 149 use std::pin::{Pin, pin}; 150 use std::sync::Mutex; 151 use std::sync::atomic::{AtomicBool, Ordering}; 152 use std::task::{Context, Poll, Waker}; 153 use wit_bindgen::{FutureReader, FutureWriter, StreamReader, StreamResult, StreamWriter}; 154 155 struct Component; 156 157 export!(Component); 158 159 // Convenience macro to change the "target" of `log::debug!` based on whether 160 // this component is a `caller` or `callee` scope to distinguish logs in the 161 // output. 162 macro_rules! debug { 163 ($($arg:tt)*) => { 164 log::debug!(target: log_target(), $($arg)*); 165 } 166 } 167 168 static IS_CALLER: AtomicBool = AtomicBool::new(false); 169 170 fn log_target() -> &'static str { 171 if IS_CALLER.load(Ordering::Relaxed) { 172 "wasmtime_fuzzing::fuzz_async::caller" 173 } else { 174 "wasmtime_fuzzing::fuzz_async::callee" 175 } 176 } 177 178 impl e::Guest for Component { 179 fn sync_ready() {} 180 181 async fn async_ready() {} 182 183 async fn async_pending(id: u32) { 184 let (tx, rx) = oneshot::channel(); 185 State::with(|s| s.async_pending_exports_ready.insert(id, tx)); 186 let record = RecordCancelOnDrop(id); 187 rx.await.unwrap(); 188 mem::forget(record); 189 debug!("export {id} is complete"); 190 191 struct RecordCancelOnDrop(u32); 192 193 impl Drop for RecordCancelOnDrop { 194 fn drop(&mut self) { 195 debug!("export {} was cancelled", self.0); 196 State::with(|s| { 197 s.async_pending_exports_cancelled.insert(self.0); 198 }); 199 } 200 } 201 } 202 203 async fn init(scope: Scope) { 204 IS_CALLER.store(scope == Scope::Caller, Ordering::Relaxed); 205 env_logger::init(); 206 i::init(Scope::Callee).await; 207 let commands = types::get_commands(scope); 208 wit_bindgen::spawn(run(commands)); 209 } 210 211 fn future_take(id: u32) -> FutureReader<u32> { 212 State::with(|s| s.future_readers.remove(&id).unwrap()) 213 } 214 215 fn future_receive(id: u32, f: FutureReader<u32>) { 216 let prev = State::with(|s| s.future_readers.insert(id, f)); 217 assert!(prev.is_none()); 218 } 219 220 fn stream_take(id: u32) -> StreamReader<u32> { 221 State::with(|s| s.stream_readers.remove(&id).unwrap()) 222 } 223 224 fn stream_receive(id: u32, f: StreamReader<u32>) { 225 let prev = State::with(|s| s.stream_readers.insert(id, f)); 226 assert!(prev.is_none()); 227 } 228 } 229 230 #[derive(Default)] 231 struct State { 232 async_pending_imports_ready: HashSet<u32>, 233 async_pending_imports_in_progress: HashMap<u32, oneshot::Sender<()>>, 234 async_pending_exports_ready: HashMap<u32, oneshot::Sender<()>>, 235 async_pending_exports_cancelled: HashSet<u32>, 236 237 future_readers: HashMap<u32, FutureReader<u32>>, 238 future_writers: HashMap<u32, FutureWriter<u32>>, 239 future_write_cancel_signals: HashMap<u32, oneshot::Sender<()>>, 240 future_read_cancel_signals: HashMap<u32, oneshot::Sender<()>>, 241 future_writes_completed: HashMap<u32, bool>, 242 future_reads_completed: HashMap<u32, u32>, 243 244 stream_readers: HashMap<u32, StreamReader<u32>>, 245 stream_writers: HashMap<u32, StreamWriter<u32>>, 246 stream_write_cancel_signals: HashMap<u32, oneshot::Sender<()>>, 247 stream_read_cancel_signals: HashMap<u32, oneshot::Sender<()>>, 248 stream_writes_completed: HashMap<u32, Result<(usize, Vec<u32>), (usize, Vec<u32>)>>, 249 stream_reads_completed: HashMap<u32, Option<Vec<u32>>>, 250 } 251 252 impl State { 253 pub fn with<R>(f: impl FnOnce(&mut State) -> R) -> R { 254 static STATE: Mutex<Option<State>> = Mutex::new(None); 255 let mut state = STATE.lock().unwrap(); 256 let state = state.get_or_insert_with(|| State::default()); 257 f(state) 258 } 259 260 pub async fn test_property(mut f: impl FnMut(&mut State) -> bool) -> bool { 261 // Test if the property is ready, but it might require a sibling future 262 // task to run, so if it's not true yet then pump the executor a 263 // few times to let it finish. 264 for _ in 0..1000 { 265 if State::with(&mut f) { 266 return true; 267 } 268 wit_bindgen::yield_async().await; 269 } 270 return false; 271 } 272 } 273 274 async fn run(mut commands: StreamReader<Command>) { 275 while let Some(command) = commands.next().await { 276 match command { 277 Command::SyncReadyCall => i::sync_ready(), 278 279 Command::AsyncReadyCall => assert_ready(pin!(i::async_ready())), 280 281 Command::AsyncPendingExportComplete(i) => { 282 assert!( 283 State::test_property(|s| s.async_pending_exports_ready.contains_key(&i)).await, 284 "expected async_pending export {i} should be pending", 285 ); 286 debug!("finishing export {i}"); 287 State::with(|s| { 288 s.async_pending_exports_ready 289 .remove(&i) 290 .unwrap() 291 .send(()) 292 .unwrap(); 293 }); 294 } 295 Command::AsyncPendingExportAssertCancelled(i) => { 296 assert!( 297 State::test_property(|s| s.async_pending_exports_cancelled.remove(&i)).await, 298 "expected async_pending export {i} to be cancelled", 299 ); 300 } 301 Command::AsyncPendingImportCall(i) => { 302 let mut future = Box::pin(i::async_pending(i)); 303 debug!("starting export {i}"); 304 assert_not_ready(future.as_mut()); 305 let (cancel_tx, mut cancel_rx) = oneshot::channel(); 306 State::with(|s| { 307 s.async_pending_imports_in_progress.insert(i, cancel_tx); 308 }); 309 wit_bindgen::spawn(async move { 310 futures::select! { 311 _ = cancel_rx => {} 312 _ = future.fuse() => { 313 State::with(|s| s.async_pending_imports_ready.insert(i)); 314 } 315 } 316 }); 317 } 318 Command::AsyncPendingImportCancel(i) => { 319 debug!("cancelling import {i}"); 320 State::with(|s| { 321 s.async_pending_imports_in_progress 322 .remove(&i) 323 .unwrap() 324 .send(()) 325 .unwrap(); 326 }); 327 } 328 Command::AsyncPendingImportAssertReady(i) => { 329 assert!( 330 State::test_property(|s| s.async_pending_imports_ready.remove(&i)).await, 331 "expected async_pending import {i} to be ready", 332 ); 333 } 334 335 Command::FutureNew(id) => { 336 let (writer, reader) = wit_future::new(|| unreachable!()); 337 State::with(|s| { 338 let prev = s.future_writers.insert(id, writer); 339 assert!(prev.is_none()); 340 let prev = s.future_readers.insert(id, reader); 341 assert!(prev.is_none()); 342 }); 343 } 344 Command::FutureTake(id) => { 345 let reader = i::future_take(id); 346 State::with(|s| { 347 let prev = s.future_readers.insert(id, reader); 348 assert!(prev.is_none()); 349 }); 350 } 351 Command::FutureGive(id) => { 352 let reader = State::with(|s| s.future_readers.remove(&id).unwrap()); 353 i::future_receive(id, reader); 354 } 355 Command::FutureDropReadable(id) => { 356 let _ = State::with(|s| s.future_readers.remove(&id).unwrap()); 357 } 358 Command::FutureWriteReady(payload) => { 359 let writer = State::with(|s| s.future_writers.remove(&payload.future).unwrap()); 360 assert_ready(pin!(writer.write(payload.item))).unwrap(); 361 } 362 Command::FutureReadReady(payload) => { 363 let reader = State::with(|s| s.future_readers.remove(&payload.future).unwrap()); 364 assert_eq!(assert_ready(pin!(reader.into_future())), payload.item); 365 } 366 Command::FutureWriteDropped(id) => { 367 let writer = State::with(|s| s.future_writers.remove(&id).unwrap()); 368 match assert_ready(pin!(writer.write(0))) { 369 Ok(_) => panic!("should be dropped"), 370 Err(_) => {} 371 } 372 } 373 Command::FutureWritePending(payload) => { 374 use wit_bindgen::FutureWriteCancel; 375 376 let writer = State::with(|s| s.future_writers.remove(&payload.future).unwrap()); 377 let (tx, rx) = oneshot::channel(); 378 let mut future = Box::pin(CancellableFutureWrite { 379 cancel: rx, 380 write: writer.write(payload.item), 381 }); 382 assert_not_ready(future.as_mut()); 383 wit_bindgen::spawn(async move { 384 let result = future.await; 385 debug!("future write {} completed: {result:?}", payload.future); 386 State::with(|s| match result { 387 FutureWriteCancel::AlreadySent => { 388 s.future_writes_completed.insert(payload.future, true); 389 } 390 FutureWriteCancel::Dropped(_) => { 391 s.future_writes_completed.insert(payload.future, false); 392 } 393 FutureWriteCancel::Cancelled(_, writer) => { 394 let prev = s.future_writers.insert(payload.future, writer); 395 assert!(prev.is_none()); 396 } 397 }); 398 }); 399 State::with(|s| { 400 let prev = s.future_write_cancel_signals.insert(payload.future, tx); 401 assert!(prev.is_none()); 402 }); 403 404 pin_project! { 405 struct CancellableFutureWrite { 406 #[pin] 407 cancel: oneshot::Receiver<()>, 408 #[pin] 409 write: wit_bindgen::FutureWrite<u32>, 410 } 411 } 412 413 impl Future for CancellableFutureWrite { 414 type Output = FutureWriteCancel<u32>; 415 416 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 417 let this = self.project(); 418 match this.cancel.poll(cx) { 419 Poll::Ready(_) => return Poll::Ready(this.write.cancel()), 420 Poll::Pending => {} 421 } 422 match this.write.poll(cx) { 423 Poll::Ready(Ok(())) => Poll::Ready(FutureWriteCancel::AlreadySent), 424 Poll::Ready(Err(val)) => { 425 Poll::Ready(FutureWriteCancel::Dropped(val.value)) 426 } 427 Poll::Pending => Poll::Pending, 428 } 429 } 430 } 431 } 432 Command::FutureReadPending(id) => { 433 let reader = State::with(|s| s.future_readers.remove(&id).unwrap()); 434 let (tx, rx) = oneshot::channel(); 435 let mut future = Box::pin(CancellableFutureRead { 436 cancel: rx, 437 read: reader.into_future(), 438 }); 439 assert_not_ready(future.as_mut()); 440 wit_bindgen::spawn(async move { 441 let result = future.await; 442 State::with(|s| match result { 443 Ok(result) => { 444 let prev = s.future_reads_completed.insert(id, result); 445 assert!(prev.is_none()); 446 } 447 Err(reader) => { 448 let prev = s.future_readers.insert(id, reader); 449 assert!(prev.is_none()); 450 } 451 }); 452 }); 453 State::with(|s| { 454 let prev = s.future_read_cancel_signals.insert(id, tx); 455 assert!(prev.is_none()); 456 }); 457 458 pin_project! { 459 struct CancellableFutureRead { 460 #[pin] 461 cancel: oneshot::Receiver<()>, 462 #[pin] 463 read: wit_bindgen::FutureRead<u32>, 464 } 465 } 466 467 impl Future for CancellableFutureRead { 468 type Output = Result<u32, FutureReader<u32>>; 469 470 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 471 let this = self.project(); 472 match this.cancel.poll(cx) { 473 Poll::Ready(_) => return Poll::Ready(this.read.cancel()), 474 Poll::Pending => {} 475 } 476 match this.read.poll(cx) { 477 Poll::Ready(i) => Poll::Ready(Ok(i)), 478 Poll::Pending => Poll::Pending, 479 } 480 } 481 } 482 } 483 Command::FutureCancelWrite(id) => { 484 State::with(|s| { 485 s.future_write_cancel_signals 486 .remove(&id) 487 .unwrap() 488 .send(()) 489 .unwrap(); 490 }); 491 assert!( 492 State::test_property(|s| s.future_writers.contains_key(&id)).await, 493 "expected future write {id} to be cancelled", 494 ); 495 } 496 Command::FutureCancelRead(id) => { 497 State::with(|s| { 498 s.future_read_cancel_signals 499 .remove(&id) 500 .unwrap() 501 .send(()) 502 .unwrap(); 503 }); 504 assert!( 505 State::test_property(|s| s.future_readers.contains_key(&id)).await, 506 "expected future read {id} to be cancelled", 507 ); 508 } 509 Command::FutureWriteAssertComplete(id) => { 510 assert!( 511 State::test_property(|s| match s.future_writes_completed.remove(&id) { 512 Some(true) => true, 513 Some(false) => panic!("future was dropped"), 514 None => false, 515 }) 516 .await, 517 "expected future write {id} to be complete", 518 ); 519 } 520 Command::FutureWriteAssertDropped(id) => { 521 assert!( 522 State::test_property(|s| match s.future_writes_completed.remove(&id) { 523 Some(true) => panic!("future write completed"), 524 Some(false) => true, 525 None => false, 526 }) 527 .await, 528 "expected future write {id} to be complete", 529 ); 530 } 531 Command::FutureReadAssertComplete(payload) => { 532 assert!( 533 State::test_property(|s| { 534 match s.future_reads_completed.remove(&payload.future) { 535 Some(i) => { 536 assert_eq!(i, payload.item); 537 true 538 } 539 None => false, 540 } 541 }) 542 .await, 543 "expected future read {} to be complete", 544 payload.future, 545 ); 546 } 547 548 Command::StreamNew(id) => { 549 let (writer, reader) = wit_stream::new(); 550 State::with(|s| { 551 let prev = s.stream_writers.insert(id, writer); 552 assert!(prev.is_none()); 553 let prev = s.stream_readers.insert(id, reader); 554 assert!(prev.is_none()); 555 }); 556 } 557 Command::StreamTake(id) => { 558 let reader = i::stream_take(id); 559 State::with(|s| { 560 let prev = s.stream_readers.insert(id, reader); 561 assert!(prev.is_none()); 562 }); 563 } 564 Command::StreamGive(id) => { 565 let reader = State::with(|s| s.stream_readers.remove(&id).unwrap()); 566 i::stream_receive(id, reader); 567 } 568 Command::StreamDropReadable(id) => { 569 let _ = State::with(|s| s.stream_readers.remove(&id).unwrap()); 570 } 571 Command::StreamDropWritable(id) => { 572 let _ = State::with(|s| s.stream_writers.remove(&id).unwrap()); 573 } 574 Command::StreamWriteReady(payload) => { 575 State::with(|s| { 576 let writer = s.stream_writers.get_mut(&payload.stream).unwrap(); 577 let (status, buffer) = assert_ready(pin!( 578 writer.write(stream_payload(payload.item, payload.op_count)) 579 )); 580 assert_eq!(status, StreamResult::Complete(payload.ready_count as usize)); 581 assert_eq!( 582 buffer.remaining() as u32, 583 payload.op_count - payload.ready_count 584 ); 585 }); 586 } 587 Command::StreamWriteDropped(payload) => { 588 State::with(|s| { 589 let writer = s.stream_writers.get_mut(&payload.stream).unwrap(); 590 let (status, buffer) = assert_ready(pin!( 591 writer.write(stream_payload(payload.item, payload.count)) 592 )); 593 assert_eq!(status, StreamResult::Dropped); 594 assert_eq!(buffer.remaining() as u32, payload.count); 595 }); 596 } 597 Command::StreamReadReady(payload) => { 598 State::with(|s| { 599 let reader = s.stream_readers.get_mut(&payload.stream).unwrap(); 600 let (status, buffer) = assert_ready(pin!( 601 reader.read(Vec::with_capacity(payload.op_count as usize)) 602 )); 603 assert_eq!(status, StreamResult::Complete(payload.ready_count as usize)); 604 assert_eq!(buffer, stream_payload(payload.item, payload.ready_count)); 605 }); 606 } 607 Command::StreamReadDropped(payload) => { 608 State::with(|s| { 609 let reader = s.stream_readers.get_mut(&payload.stream).unwrap(); 610 let (status, buffer) = assert_ready(pin!( 611 reader.read(Vec::with_capacity(payload.count as usize)) 612 )); 613 assert_eq!(status, StreamResult::Dropped); 614 assert!(buffer.is_empty()); 615 }); 616 } 617 Command::StreamWritePending(payload) => { 618 debug!("write pending: {}", payload.stream); 619 let mut writer = State::with(|s| s.stream_writers.remove(&payload.stream).unwrap()); 620 let (tx, rx) = oneshot::channel(); 621 State::with(|s| { 622 let prev = s.stream_write_cancel_signals.insert(payload.stream, tx); 623 assert!(prev.is_none()); 624 }); 625 let mut future = Box::pin(async move { 626 debug!("write pending start: {}", payload.stream); 627 let (result, remaining) = CancellableStreamWrite { 628 cancel: rx, 629 write: writer.write(stream_payload(payload.item, payload.count)), 630 } 631 .await; 632 debug!("write pending done: {} {result:?}", payload.stream); 633 State::with(|s| { 634 let _ = s.stream_write_cancel_signals.remove(&payload.stream); 635 match result { 636 StreamResult::Complete(n) => { 637 s.stream_writes_completed 638 .insert(payload.stream, Ok((n, remaining))); 639 } 640 StreamResult::Dropped => { 641 s.stream_writes_completed 642 .insert(payload.stream, Err((0, remaining))); 643 } 644 StreamResult::Cancelled => {} 645 } 646 let prev = s.stream_writers.insert(payload.stream, writer); 647 assert!(prev.is_none()); 648 }); 649 }); 650 assert_not_ready(future.as_mut()); 651 wit_bindgen::spawn(future); 652 653 pin_project! { 654 struct CancellableStreamWrite<'a> { 655 #[pin] 656 cancel: oneshot::Receiver<()>, 657 #[pin] 658 write: wit_bindgen::StreamWrite<'a, u32>, 659 } 660 } 661 662 impl Future for CancellableStreamWrite<'_> { 663 type Output = (StreamResult, Vec<u32>); 664 665 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 666 let this = self.project(); 667 let (result, buffer) = match this.cancel.poll(cx) { 668 Poll::Ready(_) => this.write.cancel(), 669 Poll::Pending => match this.write.poll(cx) { 670 Poll::Ready(result) => result, 671 Poll::Pending => return Poll::Pending, 672 }, 673 }; 674 Poll::Ready((result, buffer.into_vec())) 675 } 676 } 677 } 678 Command::StreamReadPending(payload) => { 679 let mut reader = State::with(|s| s.stream_readers.remove(&payload.stream).unwrap()); 680 let (tx, rx) = oneshot::channel(); 681 State::with(|s| { 682 let prev = s.stream_read_cancel_signals.insert(payload.stream, tx); 683 assert!(prev.is_none()); 684 }); 685 let mut future = Box::pin(async move { 686 let (result, buf) = CancellableStreamRead { 687 cancel: rx, 688 read: reader.read(Vec::with_capacity(payload.count as usize)), 689 } 690 .await; 691 State::with(|s| { 692 let _ = s.stream_read_cancel_signals.remove(&payload.stream); 693 match result { 694 StreamResult::Complete(_) => { 695 s.stream_reads_completed.insert(payload.stream, Some(buf)); 696 } 697 StreamResult::Dropped => { 698 assert!(buf.is_empty(), "dropped but got {}", buf.len()); 699 s.stream_reads_completed.insert(payload.stream, None); 700 } 701 StreamResult::Cancelled => {} 702 } 703 let prev = s.stream_readers.insert(payload.stream, reader); 704 assert!(prev.is_none()); 705 }); 706 }); 707 assert_not_ready(future.as_mut()); 708 wit_bindgen::spawn(future); 709 710 pin_project! { 711 struct CancellableStreamRead<'a> { 712 #[pin] 713 cancel: oneshot::Receiver<()>, 714 #[pin] 715 read: wit_bindgen::StreamRead<'a, u32>, 716 } 717 } 718 719 impl Future for CancellableStreamRead<'_> { 720 type Output = (StreamResult, Vec<u32>); 721 722 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 723 let this = self.project(); 724 let (result, buffer) = match this.cancel.poll(cx) { 725 Poll::Ready(_) => this.read.cancel(), 726 Poll::Pending => match this.read.poll(cx) { 727 Poll::Ready(result) => result, 728 Poll::Pending => return Poll::Pending, 729 }, 730 }; 731 Poll::Ready((result, buffer)) 732 } 733 } 734 } 735 Command::StreamCancelWrite(id) => { 736 State::with(|s| { 737 s.stream_write_cancel_signals 738 .remove(&id) 739 .unwrap() 740 .send(()) 741 .unwrap(); 742 }); 743 assert!( 744 State::test_property(|s| s.stream_writers.contains_key(&id)).await, 745 "expected cancel write {id} to be cancelled", 746 ); 747 } 748 Command::StreamCancelRead(id) => { 749 State::with(|s| { 750 s.stream_read_cancel_signals 751 .remove(&id) 752 .unwrap() 753 .send(()) 754 .unwrap(); 755 }); 756 assert!( 757 State::test_property(|s| s.stream_readers.contains_key(&id)).await, 758 "expected future read {id} to be cancelled", 759 ); 760 } 761 Command::StreamWriteAssertComplete(payload) => { 762 assert!( 763 State::test_property(|s| { 764 match s.stream_writes_completed.remove(&payload.stream) { 765 Some(Ok((size, _buf))) => { 766 assert_eq!(size, payload.count as usize); 767 true 768 } 769 Some(Err(_)) => panic!("stream was dropped"), 770 None => false, 771 } 772 }) 773 .await, 774 "expected stream write {} to be complete", 775 payload.stream, 776 ); 777 } 778 Command::StreamWriteAssertDropped(payload) => { 779 assert!( 780 State::test_property(|s| { 781 match s.stream_writes_completed.remove(&payload.stream) { 782 Some(Err((size, _buf))) => { 783 assert_eq!(size, payload.count as usize); 784 true 785 } 786 Some(Ok(_)) => panic!("stream was not dropped"), 787 None => false, 788 } 789 }) 790 .await, 791 "expected stream write {} to be complete", 792 payload.stream, 793 ); 794 } 795 Command::StreamReadAssertComplete(payload) => { 796 assert!( 797 State::test_property(|s| { 798 match s.stream_reads_completed.remove(&payload.stream) { 799 Some(Some(i)) => { 800 assert_eq!(i, stream_payload(payload.item, payload.count)); 801 true 802 } 803 Some(None) => panic!("stream was dropped"), 804 None => false, 805 } 806 }) 807 .await, 808 "expected stream read {} to be complete", 809 payload.stream, 810 ); 811 } 812 Command::StreamReadAssertDropped(id) => { 813 assert!( 814 State::test_property(|s| { 815 match s.stream_reads_completed.remove(&id) { 816 Some(None) => true, 817 Some(Some(_)) => panic!("stream was not dropped"), 818 None => false, 819 } 820 }) 821 .await, 822 "expected stream read {id} to be complete", 823 ); 824 } 825 826 Command::Ack => {} 827 } 828 } 829 } 830 831 fn stream_payload(init: u32, count: u32) -> Vec<u32> { 832 (init..init + count).collect() 833 } 834 835 fn assert_ready<F: Future>(f: Pin<&mut F>) -> F::Output { 836 let mut cx = Context::from_waker(Waker::noop()); 837 match f.poll(&mut cx) { 838 Poll::Ready(i) => i, 839 Poll::Pending => panic!("future was pending"), 840 } 841 } 842 843 fn assert_not_ready<F: Future>(f: Pin<&mut F>) { 844 let mut cx = Context::from_waker(Waker::noop()); 845 match f.poll(&mut cx) { 846 Poll::Ready(_) => panic!("future is ready"), 847 Poll::Pending => {} 848 } 849 } 850 851 fn main() { 852 unreachable!(); 853 } 854