1 mod bindings {
2     wit_bindgen::generate!({
3         path: "../misc/component-async-tests/wit",
4         world: "transmit-caller",
5     });
6 
7     use super::Component;
8     export!(Component);
9 }
10 
11 use {
12     bindings::{
13         exports::local::local::run::Guest,
14         local::local::transmit::{self, Control},
15         wit_future, wit_stream,
16     },
17     futures::{FutureExt, StreamExt, future, stream::FuturesUnordered},
18     std::{
19         future::{Future, IntoFuture},
20         pin::{Pin, pin},
21         task::Poll,
22     },
23     wit_bindgen::{FutureWriteCancel, StreamResult},
24 };
25 
26 struct Component;
27 
28 impl Guest for Component {
run()29     async fn run() {
30         let (mut control_tx, control_rx) = wit_stream::new();
31         let (mut caller_stream_tx, caller_stream_rx) = wit_stream::new();
32         let (mut caller_future_tx1, caller_future_rx1) = wit_future::new(|| todo!());
33         let (caller_future_tx2, caller_future_rx2) = wit_future::new(|| String::new());
34 
35         let (mut callee_stream_rx, mut callee_future_rx1, callee_future_rx2) = transmit::exchange(
36             control_rx,
37             caller_stream_rx,
38             caller_future_rx1,
39             caller_future_rx2,
40         )
41         .await;
42 
43         // Tell peer to read from its end of the stream and assert that the result matches an expected value.
44         assert!(
45             control_tx
46                 .write_one(Control::ReadStream("a".into()))
47                 .await
48                 .is_none()
49         );
50         assert!(caller_stream_tx.write_one("a".into()).await.is_none());
51 
52         // Start writing another value, but cancel the write before telling the peer to read.
53         {
54             let send = Box::pin(caller_stream_tx.write_one("b".into()));
55             assert!(poll(send).await.is_err());
56         }
57 
58         // Tell the peer to read an expected value again, which should _not_ match the value provided in the
59         // canceled write above.
60         assert!(
61             control_tx
62                 .write_one(Control::ReadStream("c".into()))
63                 .await
64                 .is_none()
65         );
66         assert!(caller_stream_tx.write_one("c".into()).await.is_none());
67 
68         // Tell the peer to do a zero-length read, do a zero-length write; assert the latter completes, then do a
69         // non-zero-length write, assert that it does _not_ complete, then tell the peer to do a non-zero-length
70         // read and assert that the write completes.
71         assert!(
72             control_tx
73                 .write_one(Control::ReadStreamZero)
74                 .await
75                 .is_none()
76         );
77         {
78             assert_eq!(
79                 caller_stream_tx.write(Vec::new()).await.0,
80                 StreamResult::Complete(0)
81             );
82 
83             let send = Box::pin(caller_stream_tx.write_one("d".into()));
84             let Err(send) = poll(send).await else {
85                 panic!()
86             };
87 
88             let mut futures = FuturesUnordered::new();
89             futures.push(Box::pin(send.map(|v| {
90                 assert!(v.is_none());
91             })) as Pin<Box<dyn Future<Output = _>>>);
92             futures.push(Box::pin(
93                 control_tx
94                     .write_one(Control::ReadStream("d".into()))
95                     .map(|v| {
96                         assert!(v.is_none());
97                     }),
98             ));
99             while let Some(()) = futures.next().await {}
100         }
101 
102         // Start writing a value to the future, but cancel the write before telling the peer to read.
103         {
104             let send = Box::pin(caller_future_tx1.write("x".into()));
105             match poll(send).await {
106                 Ok(_) => panic!(),
107                 Err(mut send) => {
108                     caller_future_tx1 = match send.as_mut().cancel() {
109                         FutureWriteCancel::AlreadySent => unreachable!(),
110                         FutureWriteCancel::Dropped(_) => unreachable!(),
111                         FutureWriteCancel::Cancelled(_, writer) => writer,
112                     }
113                 }
114             }
115         }
116 
117         // Tell the peer to read an expected value again, which should _not_ match the value provided in the
118         // canceled write above.
119         assert!(
120             control_tx
121                 .write_one(Control::ReadFuture("y".into()))
122                 .await
123                 .is_none()
124         );
125         caller_future_tx1.write("y".into()).await.unwrap();
126 
127         // Tell the peer to write a value to its end of the stream, then read from our end and assert the value
128         // matches.
129         assert!(
130             control_tx
131                 .write_one(Control::WriteStream("a".into()))
132                 .await
133                 .is_none()
134         );
135         assert_eq!(callee_stream_rx.next().await, Some("a".into()));
136 
137         // Start reading a value from the stream, but cancel the read before telling the peer to write.
138         {
139             let next = Box::pin(callee_stream_rx.read(Vec::with_capacity(1)));
140             assert!(poll(next).await.is_err());
141         }
142 
143         // Once again, tell the peer to write a value to its end of the stream, then read from our end and assert
144         // the value matches.
145         assert!(
146             control_tx
147                 .write_one(Control::WriteStream("b".into()))
148                 .await
149                 .is_none()
150         );
151         assert_eq!(callee_stream_rx.next().await, Some("b".into()));
152 
153         // Tell the peer to do a zero-length write, assert that the read does _not_ complete, then tell the peer to
154         // do a non-zero-length write and assert that the read completes.
155         assert!(
156             control_tx
157                 .write_one(Control::WriteStreamZero)
158                 .await
159                 .is_none()
160         );
161         {
162             let next = Box::pin(callee_stream_rx.next());
163             let Err(next) = poll(next).await else {
164                 panic!()
165             };
166 
167             let mut futures = FuturesUnordered::new();
168             futures.push(Box::pin(next.map(|v| {
169                 assert_eq!(v, Some("c".into()));
170             })) as Pin<Box<dyn Future<Output = _>>>);
171             futures.push(Box::pin(
172                 control_tx
173                     .write_one(Control::WriteStream("c".into()))
174                     .map(|v| {
175                         assert!(v.is_none());
176                     }),
177             ));
178             while let Some(()) = futures.next().await {}
179         }
180 
181         // Start reading a value from the future, but cancel the read before telling the peer to write.
182         {
183             let next = Box::pin(callee_future_rx1.into_future());
184             match poll(next).await {
185                 Ok(_) => panic!(),
186                 Err(mut next) => callee_future_rx1 = next.as_mut().cancel().unwrap_err(),
187             }
188         }
189 
190         // Tell the peer to write a value to its end of the future, then read from our end and assert the value
191         // matches.
192         assert!(
193             control_tx
194                 .write_one(Control::WriteFuture("b".into()))
195                 .await
196                 .is_none()
197         );
198         assert_eq!(callee_future_rx1.into_future().await, "b");
199 
200         // Start writing a value to the stream, but drop the stream without telling the peer to read.
201         let send = Box::pin(caller_stream_tx.write_one("d".into()));
202         assert!(poll(send).await.is_err());
203         drop(caller_stream_tx);
204 
205         // Start reading a value from the stream, but drop the stream without telling the peer to write.
206         let next = Box::pin(callee_stream_rx.next());
207         assert!(poll(next).await.is_err());
208         drop(callee_stream_rx);
209 
210         // Start writing a value to the future, but drop the write without telling the peer to read.
211         {
212             let send = pin!(caller_future_tx2.write("x".into()));
213             assert!(poll(send).await.is_err());
214         }
215 
216         // Start reading a value from the future, but drop the read without telling the peer to write.
217         {
218             let next = Box::pin(callee_future_rx2.into_future());
219             assert!(poll(next).await.is_err());
220         }
221     }
222 }
223 
/// Polls `fut` exactly once, using the calling task's context.
///
/// Returns `Ok(output)` if the future completed on that single poll. Otherwise returns `Err(fut)`,
/// handing the still-pending future back to the caller, which may await it later, cancel it, or
/// drop it.
///
/// The returned future itself always resolves the first time it is polled.
async fn poll<T, F: Future<Output = T> + Unpin>(fut: F) -> Result<T, F> {
    // `poll_fn` takes an `FnMut`, so the future is parked in an `Option` to let the closure move
    // it out by value.
    let mut slot = Some(fut);
    future::poll_fn(move |cx| {
        let mut fut = slot
            .take()
            .expect("closure resolves on its first call, so it is never polled twice");
        // `F: Unpin`, so the future can be pinned in place for this single poll.
        let outcome = match Pin::new(&mut fut).poll(cx) {
            Poll::Ready(value) => Ok(value),
            Poll::Pending => Err(fut),
        };
        Poll::Ready(outcome)
    })
    .await
}
235 
// Intentionally empty and unused: this file is built as a `bin`, so an entry point has to exist.
fn main() {}
238