1 use futures::{Sink, Stream, channel::oneshot};
2 use std::{
3 marker::PhantomData,
4 pin::Pin,
5 task::{Context, Poll},
6 };
7 use wasmtime::Result;
8 use wasmtime::{
9 StoreContextMut,
10 component::{
11 Destination, FutureConsumer, FutureProducer, Lift, Lower, Source, StreamConsumer,
12 StreamProducer, StreamResult,
13 },
14 };
15
yield_times(n: usize)16 pub async fn yield_times(n: usize) {
17 for _ in 0..n {
18 tokio::task::yield_now().await;
19 }
20 }
21
22 pub struct PipeProducer<S>(S);
23
24 impl<S> PipeProducer<S> {
new(rx: S) -> Self25 pub fn new(rx: S) -> Self {
26 Self(rx)
27 }
28 }
29
30 impl<D, T: Send + Sync + Lower + 'static, S: Stream<Item = T> + Send + 'static> StreamProducer<D>
31 for PipeProducer<S>
32 {
33 type Item = T;
34 type Buffer = Option<T>;
35
poll_produce<'a>( self: Pin<&mut Self>, cx: &mut Context<'_>, _: StoreContextMut<D>, mut destination: Destination<'a, Self::Item, Self::Buffer>, finish: bool, ) -> Poll<Result<StreamResult>>36 fn poll_produce<'a>(
37 self: Pin<&mut Self>,
38 cx: &mut Context<'_>,
39 _: StoreContextMut<D>,
40 mut destination: Destination<'a, Self::Item, Self::Buffer>,
41 finish: bool,
42 ) -> Poll<Result<StreamResult>> {
43 // SAFETY: This is a standard pin-projection, and we never move
44 // out of `self`.
45 let stream = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
46
47 match stream.poll_next(cx) {
48 Poll::Pending => {
49 if finish {
50 Poll::Ready(Ok(StreamResult::Cancelled))
51 } else {
52 Poll::Pending
53 }
54 }
55 Poll::Ready(Some(item)) => {
56 destination.set_buffer(Some(item));
57 Poll::Ready(Ok(StreamResult::Completed))
58 }
59 Poll::Ready(None) => Poll::Ready(Ok(StreamResult::Dropped)),
60 }
61 }
62 }
63
64 pub struct PipeConsumer<T, S>(S, PhantomData<fn() -> T>);
65
66 impl<T, S> PipeConsumer<T, S> {
new(tx: S) -> Self67 pub fn new(tx: S) -> Self {
68 Self(tx, PhantomData)
69 }
70 }
71
72 impl<D, T: Lift + 'static, S: Sink<T, Error: std::error::Error + Send + Sync> + Send + 'static>
73 StreamConsumer<D> for PipeConsumer<T, S>
74 {
75 type Item = T;
76
poll_consume( self: Pin<&mut Self>, cx: &mut Context<'_>, store: StoreContextMut<D>, mut source: Source<Self::Item>, finish: bool, ) -> Poll<Result<StreamResult>>77 fn poll_consume(
78 self: Pin<&mut Self>,
79 cx: &mut Context<'_>,
80 store: StoreContextMut<D>,
81 mut source: Source<Self::Item>,
82 finish: bool,
83 ) -> Poll<Result<StreamResult>> {
84 // SAFETY: This is a standard pin-projection, and we never move
85 // out of `self`.
86 let mut sink = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
87
88 let on_pending = || {
89 if finish {
90 Poll::Ready(Ok(StreamResult::Cancelled))
91 } else {
92 Poll::Pending
93 }
94 };
95
96 match sink.as_mut().poll_flush(cx) {
97 Poll::Pending => on_pending(),
98 Poll::Ready(result) => {
99 result?;
100 match sink.as_mut().poll_ready(cx) {
101 Poll::Pending => on_pending(),
102 Poll::Ready(result) => {
103 result?;
104 let item = &mut None;
105 source.read(store, item)?;
106 sink.start_send(item.take().unwrap())?;
107 Poll::Ready(Ok(StreamResult::Completed))
108 }
109 }
110 }
111 }
112 }
113 }
114
115 pub struct OneshotProducer<T>(oneshot::Receiver<T>);
116
117 impl<T> OneshotProducer<T> {
new(rx: oneshot::Receiver<T>) -> Self118 pub fn new(rx: oneshot::Receiver<T>) -> Self {
119 Self(rx)
120 }
121 }
122
123 impl<D, T: Send + 'static> FutureProducer<D> for OneshotProducer<T> {
124 type Item = T;
125
poll_produce( self: Pin<&mut Self>, cx: &mut Context<'_>, _: StoreContextMut<D>, finish: bool, ) -> Poll<Result<Option<T>>>126 fn poll_produce(
127 self: Pin<&mut Self>,
128 cx: &mut Context<'_>,
129 _: StoreContextMut<D>,
130 finish: bool,
131 ) -> Poll<Result<Option<T>>> {
132 match Pin::new(&mut self.get_mut().0).poll(cx) {
133 Poll::Pending if finish => Poll::Ready(Ok(None)),
134 Poll::Pending => Poll::Pending,
135 Poll::Ready(result) => Poll::Ready(Ok(Some(result?))),
136 }
137 }
138 }
139
140 pub struct OneshotConsumer<T>(Option<oneshot::Sender<T>>);
141
142 impl<T> OneshotConsumer<T> {
new(tx: oneshot::Sender<T>) -> Self143 pub fn new(tx: oneshot::Sender<T>) -> Self {
144 Self(Some(tx))
145 }
146 }
147
148 impl<D, T: Lift + Send + 'static> FutureConsumer<D> for OneshotConsumer<T> {
149 type Item = T;
150
poll_consume( self: Pin<&mut Self>, _: &mut Context<'_>, store: StoreContextMut<D>, mut source: Source<'_, T>, _: bool, ) -> Poll<Result<()>>151 fn poll_consume(
152 self: Pin<&mut Self>,
153 _: &mut Context<'_>,
154 store: StoreContextMut<D>,
155 mut source: Source<'_, T>,
156 _: bool,
157 ) -> Poll<Result<()>> {
158 let value = &mut None;
159 source.read(store, value)?;
160 _ = self.get_mut().0.take().unwrap().send(value.take().unwrap());
161 Poll::Ready(Ok(()))
162 }
163 }
164