use futures::{Sink, Stream, channel::oneshot}; use std::{ marker::PhantomData, pin::Pin, task::{Context, Poll}, }; use wasmtime::Result; use wasmtime::{ StoreContextMut, component::{ Destination, FutureConsumer, FutureProducer, Lift, Lower, Source, StreamConsumer, StreamProducer, StreamResult, }, }; pub async fn yield_times(n: usize) { for _ in 0..n { tokio::task::yield_now().await; } } pub struct PipeProducer(S); impl PipeProducer { pub fn new(rx: S) -> Self { Self(rx) } } impl + Send + 'static> StreamProducer for PipeProducer { type Item = T; type Buffer = Option; fn poll_produce<'a>( self: Pin<&mut Self>, cx: &mut Context<'_>, _: StoreContextMut, mut destination: Destination<'a, Self::Item, Self::Buffer>, finish: bool, ) -> Poll> { // SAFETY: This is a standard pin-projection, and we never move // out of `self`. let stream = unsafe { self.map_unchecked_mut(|v| &mut v.0) }; match stream.poll_next(cx) { Poll::Pending => { if finish { Poll::Ready(Ok(StreamResult::Cancelled)) } else { Poll::Pending } } Poll::Ready(Some(item)) => { destination.set_buffer(Some(item)); Poll::Ready(Ok(StreamResult::Completed)) } Poll::Ready(None) => Poll::Ready(Ok(StreamResult::Dropped)), } } } pub struct PipeConsumer(S, PhantomData T>); impl PipeConsumer { pub fn new(tx: S) -> Self { Self(tx, PhantomData) } } impl + Send + 'static> StreamConsumer for PipeConsumer { type Item = T; fn poll_consume( self: Pin<&mut Self>, cx: &mut Context<'_>, store: StoreContextMut, mut source: Source, finish: bool, ) -> Poll> { // SAFETY: This is a standard pin-projection, and we never move // out of `self`. let mut sink = unsafe { self.map_unchecked_mut(|v| &mut v.0) }; let on_pending = || { if finish { Poll::Ready(Ok(StreamResult::Cancelled)) } else { Poll::Pending } }; match sink.as_mut().poll_flush(cx) { Poll::Pending => on_pending(), Poll::Ready(result) => { result?; match sink.as_mut().poll_ready(cx) { Poll::Pending => on_pending(), Poll::Ready(result) => { result?; let item = &mut None; source.read(store, item)?; sink.start_send(item.take().unwrap())?; Poll::Ready(Ok(StreamResult::Completed)) } } } } } } pub struct OneshotProducer(oneshot::Receiver); impl OneshotProducer { pub fn new(rx: oneshot::Receiver) -> Self { Self(rx) } } impl FutureProducer for OneshotProducer { type Item = T; fn poll_produce( self: Pin<&mut Self>, cx: &mut Context<'_>, _: StoreContextMut, finish: bool, ) -> Poll>> { match Pin::new(&mut self.get_mut().0).poll(cx) { Poll::Pending if finish => Poll::Ready(Ok(None)), Poll::Pending => Poll::Pending, Poll::Ready(result) => Poll::Ready(Ok(Some(result?))), } } } pub struct OneshotConsumer(Option>); impl OneshotConsumer { pub fn new(tx: oneshot::Sender) -> Self { Self(Some(tx)) } } impl FutureConsumer for OneshotConsumer { type Item = T; fn poll_consume( self: Pin<&mut Self>, _: &mut Context<'_>, store: StoreContextMut, mut source: Source<'_, T>, _: bool, ) -> Poll> { let value = &mut None; source.read(store, value)?; _ = self.get_mut().0.take().unwrap().send(value.take().unwrap()); Poll::Ready(Ok(())) } }