1 use super::Ctx; 2 use futures::future; 3 use std::ops::DerefMut; 4 use std::sync::{Arc, Mutex}; 5 use std::task::{Poll, Waker}; 6 use wasmtime::component::{Accessor, Resource}; 7 8 pub mod bindings { 9 wasmtime::component::bindgen!({ 10 path: "wit", 11 world: "yield-runner", 12 imports: { default: trappable }, 13 with: { 14 "local:local/ready.thing": super::Thing, 15 }, 16 }); 17 } 18 19 #[derive(Default)] 20 pub struct Thing { 21 wakers: Arc<Mutex<Option<Vec<Waker>>>>, 22 } 23 24 impl bindings::local::local::continue_::Host for Ctx { set_continue(&mut self, v: bool) -> wasmtime::Result<()>25 fn set_continue(&mut self, v: bool) -> wasmtime::Result<()> { 26 self.continue_ = v; 27 Ok(()) 28 } 29 get_continue(&mut self) -> wasmtime::Result<bool>30 fn get_continue(&mut self) -> wasmtime::Result<bool> { 31 Ok(self.continue_) 32 } 33 } 34 35 impl bindings::local::local::ready::HostThing for Ctx { new(&mut self) -> wasmtime::Result<Resource<Thing>>36 fn new(&mut self) -> wasmtime::Result<Resource<Thing>> { 37 Ok(self.table.push(Thing::default())?) 38 } 39 set_ready(&mut self, thing: Resource<Thing>, ready: bool) -> wasmtime::Result<()>40 fn set_ready(&mut self, thing: Resource<Thing>, ready: bool) -> wasmtime::Result<()> { 41 let thing = self.table.get(&thing)?; 42 let mut wakers = thing.wakers.lock().unwrap(); 43 if ready { 44 if let Some(wakers) = wakers.take() { 45 for waker in wakers { 46 waker.wake(); 47 } 48 } 49 } else if wakers.is_none() { 50 *wakers = Some(Vec::new()); 51 } 52 Ok(()) 53 } 54 drop(&mut self, thing: Resource<Thing>) -> wasmtime::Result<()>55 fn drop(&mut self, thing: Resource<Thing>) -> wasmtime::Result<()> { 56 self.table.delete(thing)?; 57 Ok(()) 58 } 59 } 60 61 impl bindings::local::local::ready::HostThingWithStore for Ctx { when_ready<T>( accessor: &Accessor<T, Self>, thing: Resource<Thing>, ) -> wasmtime::Result<()>62 async fn when_ready<T>( 63 accessor: &Accessor<T, Self>, 64 thing: Resource<Thing>, 65 ) -> wasmtime::Result<()> { 66 let wakers = accessor.with(|mut view| { 67 Ok::<_, wasmtime::Error>(view.get().table.get(&thing)?.wakers.clone()) 68 })?; 69 70 future::poll_fn(move |cx| { 71 let mut wakers = wakers.lock().unwrap(); 72 if let Some(wakers) = wakers.deref_mut() { 73 wakers.push(cx.waker().clone()); 74 Poll::Pending 75 } else { 76 Poll::Ready(()) 77 } 78 }) 79 .await; 80 81 Ok(()) 82 } 83 } 84 85 impl bindings::local::local::ready::Host for Ctx {} 86