1 use futures::{Sink, Stream, channel::oneshot}; 2 use std::{ 3 marker::PhantomData, 4 pin::Pin, 5 task::{Context, Poll}, 6 }; 7 use wasmtime::Result; 8 use wasmtime::{ 9 StoreContextMut, 10 component::{ 11 Destination, FutureConsumer, FutureProducer, Lift, Lower, Source, StreamConsumer, 12 StreamProducer, StreamResult, 13 }, 14 }; 15 16 pub async fn yield_times(n: usize) { 17 for _ in 0..n { 18 tokio::task::yield_now().await; 19 } 20 } 21 22 pub struct PipeProducer<S>(S); 23 24 impl<S> PipeProducer<S> { 25 pub fn new(rx: S) -> Self { 26 Self(rx) 27 } 28 } 29 30 impl<D, T: Send + Sync + Lower + 'static, S: Stream<Item = T> + Send + 'static> StreamProducer<D> 31 for PipeProducer<S> 32 { 33 type Item = T; 34 type Buffer = Option<T>; 35 36 fn poll_produce<'a>( 37 self: Pin<&mut Self>, 38 cx: &mut Context<'_>, 39 _: StoreContextMut<D>, 40 mut destination: Destination<'a, Self::Item, Self::Buffer>, 41 finish: bool, 42 ) -> Poll<Result<StreamResult>> { 43 // SAFETY: This is a standard pin-projection, and we never move 44 // out of `self`. 45 let stream = unsafe { self.map_unchecked_mut(|v| &mut v.0) }; 46 47 match stream.poll_next(cx) { 48 Poll::Pending => { 49 if finish { 50 Poll::Ready(Ok(StreamResult::Cancelled)) 51 } else { 52 Poll::Pending 53 } 54 } 55 Poll::Ready(Some(item)) => { 56 destination.set_buffer(Some(item)); 57 Poll::Ready(Ok(StreamResult::Completed)) 58 } 59 Poll::Ready(None) => Poll::Ready(Ok(StreamResult::Dropped)), 60 } 61 } 62 } 63 64 pub struct PipeConsumer<T, S>(S, PhantomData<fn() -> T>); 65 66 impl<T, S> PipeConsumer<T, S> { 67 pub fn new(tx: S) -> Self { 68 Self(tx, PhantomData) 69 } 70 } 71 72 impl<D, T: Lift + 'static, S: Sink<T, Error: std::error::Error + Send + Sync> + Send + 'static> 73 StreamConsumer<D> for PipeConsumer<T, S> 74 { 75 type Item = T; 76 77 fn poll_consume( 78 self: Pin<&mut Self>, 79 cx: &mut Context<'_>, 80 store: StoreContextMut<D>, 81 mut source: Source<Self::Item>, 82 finish: bool, 83 ) -> Poll<Result<StreamResult>> { 84 // SAFETY: This is a standard pin-projection, and we never move 85 // out of `self`. 86 let mut sink = unsafe { self.map_unchecked_mut(|v| &mut v.0) }; 87 88 let on_pending = || { 89 if finish { 90 Poll::Ready(Ok(StreamResult::Cancelled)) 91 } else { 92 Poll::Pending 93 } 94 }; 95 96 match sink.as_mut().poll_flush(cx) { 97 Poll::Pending => on_pending(), 98 Poll::Ready(result) => { 99 result?; 100 match sink.as_mut().poll_ready(cx) { 101 Poll::Pending => on_pending(), 102 Poll::Ready(result) => { 103 result?; 104 let item = &mut None; 105 source.read(store, item)?; 106 sink.start_send(item.take().unwrap())?; 107 Poll::Ready(Ok(StreamResult::Completed)) 108 } 109 } 110 } 111 } 112 } 113 } 114 115 pub struct OneshotProducer<T>(oneshot::Receiver<T>); 116 117 impl<T> OneshotProducer<T> { 118 pub fn new(rx: oneshot::Receiver<T>) -> Self { 119 Self(rx) 120 } 121 } 122 123 impl<D, T: Send + 'static> FutureProducer<D> for OneshotProducer<T> { 124 type Item = T; 125 126 fn poll_produce( 127 self: Pin<&mut Self>, 128 cx: &mut Context<'_>, 129 _: StoreContextMut<D>, 130 finish: bool, 131 ) -> Poll<Result<Option<T>>> { 132 match Pin::new(&mut self.get_mut().0).poll(cx) { 133 Poll::Pending if finish => Poll::Ready(Ok(None)), 134 Poll::Pending => Poll::Pending, 135 Poll::Ready(result) => Poll::Ready(Ok(Some(result?))), 136 } 137 } 138 } 139 140 pub struct OneshotConsumer<T>(Option<oneshot::Sender<T>>); 141 142 impl<T> OneshotConsumer<T> { 143 pub fn new(tx: oneshot::Sender<T>) -> Self { 144 Self(Some(tx)) 145 } 146 } 147 148 impl<D, T: Lift + Send + 'static> FutureConsumer<D> for OneshotConsumer<T> { 149 type Item = T; 150 151 fn poll_consume( 152 self: Pin<&mut Self>, 153 _: &mut Context<'_>, 154 store: StoreContextMut<D>, 155 mut source: Source<'_, T>, 156 _: bool, 157 ) -> Poll<Result<()>> { 158 let value = &mut None; 159 source.read(store, value)?; 160 _ = self.get_mut().0.take().unwrap().send(value.take().unwrap()); 161 Poll::Ready(Ok(())) 162 } 163 } 164