1 mod bindings {
2 wit_bindgen::generate!({
3 path: "../misc/component-async-tests/wit",
4 world: "transmit-caller",
5 });
6
7 use super::Component;
8 export!(Component);
9 }
10
11 use {
12 bindings::{
13 exports::local::local::run::Guest,
14 local::local::transmit::{self, Control},
15 wit_future, wit_stream,
16 },
17 futures::{FutureExt, StreamExt, future, stream::FuturesUnordered},
18 std::{
19 future::{Future, IntoFuture},
20 pin::{Pin, pin},
21 task::Poll,
22 },
23 wit_bindgen::{FutureWriteCancel, StreamResult},
24 };
25
26 struct Component;
27
28 impl Guest for Component {
run()29 async fn run() {
30 let (mut control_tx, control_rx) = wit_stream::new();
31 let (mut caller_stream_tx, caller_stream_rx) = wit_stream::new();
32 let (mut caller_future_tx1, caller_future_rx1) = wit_future::new(|| todo!());
33 let (caller_future_tx2, caller_future_rx2) = wit_future::new(|| String::new());
34
35 let (mut callee_stream_rx, mut callee_future_rx1, callee_future_rx2) = transmit::exchange(
36 control_rx,
37 caller_stream_rx,
38 caller_future_rx1,
39 caller_future_rx2,
40 )
41 .await;
42
43 // Tell peer to read from its end of the stream and assert that the result matches an expected value.
44 assert!(
45 control_tx
46 .write_one(Control::ReadStream("a".into()))
47 .await
48 .is_none()
49 );
50 assert!(caller_stream_tx.write_one("a".into()).await.is_none());
51
52 // Start writing another value, but cancel the write before telling the peer to read.
53 {
54 let send = Box::pin(caller_stream_tx.write_one("b".into()));
55 assert!(poll(send).await.is_err());
56 }
57
58 // Tell the peer to read an expected value again, which should _not_ match the value provided in the
59 // canceled write above.
60 assert!(
61 control_tx
62 .write_one(Control::ReadStream("c".into()))
63 .await
64 .is_none()
65 );
66 assert!(caller_stream_tx.write_one("c".into()).await.is_none());
67
68 // Tell the peer to do a zero-length read, do a zero-length write; assert the latter completes, then do a
69 // non-zero-length write, assert that it does _not_ complete, then tell the peer to do a non-zero-length
70 // read and assert that the write completes.
71 assert!(
72 control_tx
73 .write_one(Control::ReadStreamZero)
74 .await
75 .is_none()
76 );
77 {
78 assert_eq!(
79 caller_stream_tx.write(Vec::new()).await.0,
80 StreamResult::Complete(0)
81 );
82
83 let send = Box::pin(caller_stream_tx.write_one("d".into()));
84 let Err(send) = poll(send).await else {
85 panic!()
86 };
87
88 let mut futures = FuturesUnordered::new();
89 futures.push(Box::pin(send.map(|v| {
90 assert!(v.is_none());
91 })) as Pin<Box<dyn Future<Output = _>>>);
92 futures.push(Box::pin(
93 control_tx
94 .write_one(Control::ReadStream("d".into()))
95 .map(|v| {
96 assert!(v.is_none());
97 }),
98 ));
99 while let Some(()) = futures.next().await {}
100 }
101
102 // Start writing a value to the future, but cancel the write before telling the peer to read.
103 {
104 let send = Box::pin(caller_future_tx1.write("x".into()));
105 match poll(send).await {
106 Ok(_) => panic!(),
107 Err(mut send) => {
108 caller_future_tx1 = match send.as_mut().cancel() {
109 FutureWriteCancel::AlreadySent => unreachable!(),
110 FutureWriteCancel::Dropped(_) => unreachable!(),
111 FutureWriteCancel::Cancelled(_, writer) => writer,
112 }
113 }
114 }
115 }
116
117 // Tell the peer to read an expected value again, which should _not_ match the value provided in the
118 // canceled write above.
119 assert!(
120 control_tx
121 .write_one(Control::ReadFuture("y".into()))
122 .await
123 .is_none()
124 );
125 caller_future_tx1.write("y".into()).await.unwrap();
126
127 // Tell the peer to write a value to its end of the stream, then read from our end and assert the value
128 // matches.
129 assert!(
130 control_tx
131 .write_one(Control::WriteStream("a".into()))
132 .await
133 .is_none()
134 );
135 assert_eq!(callee_stream_rx.next().await, Some("a".into()));
136
137 // Start reading a value from the stream, but cancel the read before telling the peer to write.
138 {
139 let next = Box::pin(callee_stream_rx.read(Vec::with_capacity(1)));
140 assert!(poll(next).await.is_err());
141 }
142
143 // Once again, tell the peer to write a value to its end of the stream, then read from our end and assert
144 // the value matches.
145 assert!(
146 control_tx
147 .write_one(Control::WriteStream("b".into()))
148 .await
149 .is_none()
150 );
151 assert_eq!(callee_stream_rx.next().await, Some("b".into()));
152
153 // Tell the peer to do a zero-length write, assert that the read does _not_ complete, then tell the peer to
154 // do a non-zero-length write and assert that the read completes.
155 assert!(
156 control_tx
157 .write_one(Control::WriteStreamZero)
158 .await
159 .is_none()
160 );
161 {
162 let next = Box::pin(callee_stream_rx.next());
163 let Err(next) = poll(next).await else {
164 panic!()
165 };
166
167 let mut futures = FuturesUnordered::new();
168 futures.push(Box::pin(next.map(|v| {
169 assert_eq!(v, Some("c".into()));
170 })) as Pin<Box<dyn Future<Output = _>>>);
171 futures.push(Box::pin(
172 control_tx
173 .write_one(Control::WriteStream("c".into()))
174 .map(|v| {
175 assert!(v.is_none());
176 }),
177 ));
178 while let Some(()) = futures.next().await {}
179 }
180
181 // Start reading a value from the future, but cancel the read before telling the peer to write.
182 {
183 let next = Box::pin(callee_future_rx1.into_future());
184 match poll(next).await {
185 Ok(_) => panic!(),
186 Err(mut next) => callee_future_rx1 = next.as_mut().cancel().unwrap_err(),
187 }
188 }
189
190 // Tell the peer to write a value to its end of the future, then read from our end and assert the value
191 // matches.
192 assert!(
193 control_tx
194 .write_one(Control::WriteFuture("b".into()))
195 .await
196 .is_none()
197 );
198 assert_eq!(callee_future_rx1.into_future().await, "b");
199
200 // Start writing a value to the stream, but drop the stream without telling the peer to read.
201 let send = Box::pin(caller_stream_tx.write_one("d".into()));
202 assert!(poll(send).await.is_err());
203 drop(caller_stream_tx);
204
205 // Start reading a value from the stream, but drop the stream without telling the peer to write.
206 let next = Box::pin(callee_stream_rx.next());
207 assert!(poll(next).await.is_err());
208 drop(callee_stream_rx);
209
210 // Start writing a value to the future, but drop the write without telling the peer to read.
211 {
212 let send = pin!(caller_future_tx2.write("x".into()));
213 assert!(poll(send).await.is_err());
214 }
215
216 // Start reading a value from the future, but drop the read without telling the peer to write.
217 {
218 let next = Box::pin(callee_future_rx2.into_future());
219 assert!(poll(next).await.is_err());
220 }
221 }
222 }
223
poll<T, F: Future<Output = T> + Unpin>(fut: F) -> Result<T, F>224 async fn poll<T, F: Future<Output = T> + Unpin>(fut: F) -> Result<T, F> {
225 let mut fut = Some(fut);
226 future::poll_fn(move |cx| {
227 let mut fut = fut.take().unwrap();
228 Poll::Ready(match fut.poll_unpin(cx) {
229 Poll::Ready(v) => Ok(v),
230 Poll::Pending => Err(fut),
231 })
232 })
233 .await
234 }
235
236 // Unused function; required since this file is built as a `bin`:
main()237 fn main() {}
238