1 use futures::{Sink, Stream, channel::oneshot};
2 use std::{
3     marker::PhantomData,
4     pin::Pin,
5     task::{Context, Poll},
6 };
7 use wasmtime::Result;
8 use wasmtime::{
9     StoreContextMut,
10     component::{
11         Destination, FutureConsumer, FutureProducer, Lift, Lower, Source, StreamConsumer,
12         StreamProducer, StreamResult,
13     },
14 };
15 
yield_times(n: usize)16 pub async fn yield_times(n: usize) {
17     for _ in 0..n {
18         tokio::task::yield_now().await;
19     }
20 }
21 
/// Newtype around a value `S`.
///
/// When `S` is a `Stream`, this wrapper implements `StreamProducer`, handing
/// the stream's items to the destination one at a time.
pub struct PipeProducer<S>(S);

impl<S> PipeProducer<S> {
    /// Wraps `rx` in a new `PipeProducer`.
    pub fn new(rx: S) -> Self {
        PipeProducer(rx)
    }
}
29 
30 impl<D, T: Send + Sync + Lower + 'static, S: Stream<Item = T> + Send + 'static> StreamProducer<D>
31     for PipeProducer<S>
32 {
33     type Item = T;
34     type Buffer = Option<T>;
35 
poll_produce<'a>( self: Pin<&mut Self>, cx: &mut Context<'_>, _: StoreContextMut<D>, mut destination: Destination<'a, Self::Item, Self::Buffer>, finish: bool, ) -> Poll<Result<StreamResult>>36     fn poll_produce<'a>(
37         self: Pin<&mut Self>,
38         cx: &mut Context<'_>,
39         _: StoreContextMut<D>,
40         mut destination: Destination<'a, Self::Item, Self::Buffer>,
41         finish: bool,
42     ) -> Poll<Result<StreamResult>> {
43         // SAFETY: This is a standard pin-projection, and we never move
44         // out of `self`.
45         let stream = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
46 
47         match stream.poll_next(cx) {
48             Poll::Pending => {
49                 if finish {
50                     Poll::Ready(Ok(StreamResult::Cancelled))
51                 } else {
52                     Poll::Pending
53                 }
54             }
55             Poll::Ready(Some(item)) => {
56                 destination.set_buffer(Some(item));
57                 Poll::Ready(Ok(StreamResult::Completed))
58             }
59             Poll::Ready(None) => Poll::Ready(Ok(StreamResult::Dropped)),
60         }
61     }
62 }
63 
/// Newtype around a value `S`, tagged with the item type `T`.
///
/// `T` only appears inside `PhantomData<fn() -> T>`, so no `T` value is
/// stored. When `S` is a `Sink<T>`, this wrapper implements `StreamConsumer`.
pub struct PipeConsumer<T, S>(S, PhantomData<fn() -> T>);

impl<T, S> PipeConsumer<T, S> {
    /// Wraps `tx` in a new `PipeConsumer`.
    pub fn new(tx: S) -> Self {
        PipeConsumer(tx, PhantomData)
    }
}
71 
72 impl<D, T: Lift + 'static, S: Sink<T, Error: std::error::Error + Send + Sync> + Send + 'static>
73     StreamConsumer<D> for PipeConsumer<T, S>
74 {
75     type Item = T;
76 
poll_consume( self: Pin<&mut Self>, cx: &mut Context<'_>, store: StoreContextMut<D>, mut source: Source<Self::Item>, finish: bool, ) -> Poll<Result<StreamResult>>77     fn poll_consume(
78         self: Pin<&mut Self>,
79         cx: &mut Context<'_>,
80         store: StoreContextMut<D>,
81         mut source: Source<Self::Item>,
82         finish: bool,
83     ) -> Poll<Result<StreamResult>> {
84         // SAFETY: This is a standard pin-projection, and we never move
85         // out of `self`.
86         let mut sink = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
87 
88         let on_pending = || {
89             if finish {
90                 Poll::Ready(Ok(StreamResult::Cancelled))
91             } else {
92                 Poll::Pending
93             }
94         };
95 
96         match sink.as_mut().poll_flush(cx) {
97             Poll::Pending => on_pending(),
98             Poll::Ready(result) => {
99                 result?;
100                 match sink.as_mut().poll_ready(cx) {
101                     Poll::Pending => on_pending(),
102                     Poll::Ready(result) => {
103                         result?;
104                         let item = &mut None;
105                         source.read(store, item)?;
106                         sink.start_send(item.take().unwrap())?;
107                         Poll::Ready(Ok(StreamResult::Completed))
108                     }
109                 }
110             }
111         }
112     }
113 }
114 
115 pub struct OneshotProducer<T>(oneshot::Receiver<T>);
116 
117 impl<T> OneshotProducer<T> {
new(rx: oneshot::Receiver<T>) -> Self118     pub fn new(rx: oneshot::Receiver<T>) -> Self {
119         Self(rx)
120     }
121 }
122 
123 impl<D, T: Send + 'static> FutureProducer<D> for OneshotProducer<T> {
124     type Item = T;
125 
poll_produce( self: Pin<&mut Self>, cx: &mut Context<'_>, _: StoreContextMut<D>, finish: bool, ) -> Poll<Result<Option<T>>>126     fn poll_produce(
127         self: Pin<&mut Self>,
128         cx: &mut Context<'_>,
129         _: StoreContextMut<D>,
130         finish: bool,
131     ) -> Poll<Result<Option<T>>> {
132         match Pin::new(&mut self.get_mut().0).poll(cx) {
133             Poll::Pending if finish => Poll::Ready(Ok(None)),
134             Poll::Pending => Poll::Pending,
135             Poll::Ready(result) => Poll::Ready(Ok(Some(result?))),
136         }
137     }
138 }
139 
140 pub struct OneshotConsumer<T>(Option<oneshot::Sender<T>>);
141 
142 impl<T> OneshotConsumer<T> {
new(tx: oneshot::Sender<T>) -> Self143     pub fn new(tx: oneshot::Sender<T>) -> Self {
144         Self(Some(tx))
145     }
146 }
147 
148 impl<D, T: Lift + Send + 'static> FutureConsumer<D> for OneshotConsumer<T> {
149     type Item = T;
150 
poll_consume( self: Pin<&mut Self>, _: &mut Context<'_>, store: StoreContextMut<D>, mut source: Source<'_, T>, _: bool, ) -> Poll<Result<()>>151     fn poll_consume(
152         self: Pin<&mut Self>,
153         _: &mut Context<'_>,
154         store: StoreContextMut<D>,
155         mut source: Source<'_, T>,
156         _: bool,
157     ) -> Poll<Result<()>> {
158         let value = &mut None;
159         source.read(store, value)?;
160         _ = self.get_mut().0.take().unwrap().send(value.take().unwrap());
161         Poll::Ready(Ok(()))
162     }
163 }
164