1 mod bindings {
2     wit_bindgen::generate!({
3         path: "../misc/component-async-tests/wit",
4         world: "readiness-guest",
5     });
6 }
7 
8 use {
9     std::{mem, ptr},
10     test_programs::async_::{
11         BLOCKED, CALLBACK_CODE_EXIT, CALLBACK_CODE_WAIT, DROPPED, EVENT_NONE, EVENT_STREAM_READ,
12         EVENT_STREAM_WRITE, context_get, context_set, waitable_join, waitable_set_drop,
13         waitable_set_new,
14     },
15 };
16 
17 #[cfg(target_arch = "wasm32")]
18 #[link(wasm_import_module = "[export]local:local/readiness")]
19 unsafe extern "C" {
20     #[link_name = "[task-return]start"]
task_return_start(_: u32, _: *const u8, _: usize)21     fn task_return_start(_: u32, _: *const u8, _: usize);
22 }
23 #[cfg(not(target_arch = "wasm32"))]
task_return_start(_: u32, _: *const u8, _: usize)24 unsafe extern "C" fn task_return_start(_: u32, _: *const u8, _: usize) {
25     unreachable!()
26 }
27 
28 #[cfg(target_arch = "wasm32")]
29 #[link(wasm_import_module = "[export]local:local/readiness")]
30 unsafe extern "C" {
31     #[link_name = "[stream-new-0]start"]
stream_new() -> u6432     fn stream_new() -> u64;
33 }
34 #[cfg(not(target_arch = "wasm32"))]
stream_new() -> u6435 unsafe extern "C" fn stream_new() -> u64 {
36     unreachable!()
37 }
38 
39 #[cfg(target_arch = "wasm32")]
40 #[link(wasm_import_module = "[export]local:local/readiness")]
41 unsafe extern "C" {
42     #[link_name = "[async-lower][stream-write-0]start"]
stream_write(_: u32, _: *const u8, _: usize) -> u3243     fn stream_write(_: u32, _: *const u8, _: usize) -> u32;
44 }
45 #[cfg(not(target_arch = "wasm32"))]
stream_write(_: u32, _: *const u8, _: usize) -> u3246 unsafe extern "C" fn stream_write(_: u32, _: *const u8, _: usize) -> u32 {
47     unreachable!()
48 }
49 
50 #[cfg(target_arch = "wasm32")]
51 #[link(wasm_import_module = "[export]local:local/readiness")]
52 unsafe extern "C" {
53     #[link_name = "[async-lower][stream-read-0]start"]
stream_read(_: u32, _: *mut u8, _: usize) -> u3254     fn stream_read(_: u32, _: *mut u8, _: usize) -> u32;
55 }
56 #[cfg(not(target_arch = "wasm32"))]
stream_read(_: u32, _: *mut u8, _: usize) -> u3257 unsafe extern "C" fn stream_read(_: u32, _: *mut u8, _: usize) -> u32 {
58     unreachable!()
59 }
60 
61 #[cfg(target_arch = "wasm32")]
62 #[link(wasm_import_module = "[export]local:local/readiness")]
63 unsafe extern "C" {
64     #[link_name = "[stream-drop-readable-0]start"]
stream_drop_readable(_: u32)65     fn stream_drop_readable(_: u32);
66 }
67 #[cfg(not(target_arch = "wasm32"))]
stream_drop_readable(_: u32)68 unsafe extern "C" fn stream_drop_readable(_: u32) {
69     unreachable!()
70 }
71 
72 #[cfg(target_arch = "wasm32")]
73 #[link(wasm_import_module = "[export]local:local/readiness")]
74 unsafe extern "C" {
75     #[link_name = "[stream-drop-writable-0]start"]
stream_drop_writable(_: u32)76     fn stream_drop_writable(_: u32);
77 }
78 #[cfg(not(target_arch = "wasm32"))]
stream_drop_writable(_: u32)79 unsafe extern "C" fn stream_drop_writable(_: u32) {
80     unreachable!()
81 }
82 
83 static BYTES_TO_WRITE: &[u8] = &[1, 3, 5, 7, 11];
84 
85 enum State {
86     S0 {
87         rx: u32,
88         expected: Vec<u8>,
89     },
90     S1 {
91         set: u32,
92         tx: Option<u32>,
93         rx: Option<u32>,
94         expected: Vec<u8>,
95     },
96 }
97 
98 #[unsafe(export_name = "[async-lift]local:local/readiness#start")]
export_start(rx: u32, expected: u32, expected_len: u32) -> u3299 unsafe extern "C" fn export_start(rx: u32, expected: u32, expected_len: u32) -> u32 {
100     let expected_len = usize::try_from(expected_len).unwrap();
101 
102     unsafe {
103         context_set(
104             u32::try_from(Box::into_raw(Box::new(State::S0 {
105                 rx,
106                 expected: Vec::from_raw_parts(
107                     expected as usize as *mut u8,
108                     expected_len,
109                     expected_len,
110                 ),
111             })) as usize)
112             .unwrap(),
113         );
114 
115         callback_start(EVENT_NONE, 0, 0)
116     }
117 }
118 
119 #[unsafe(export_name = "[callback][async-lift]local:local/readiness#start")]
callback_start(event0: u32, event1: u32, event2: u32) -> u32120 unsafe extern "C" fn callback_start(event0: u32, event1: u32, event2: u32) -> u32 {
121     unsafe {
122         let state = &mut *(usize::try_from(context_get()).unwrap() as *mut State);
123         match state {
124             State::S0 { rx, expected } => {
125                 assert_eq!(event0, EVENT_NONE);
126 
127                 // Do a zero-length read to wait until the writer is ready.
128                 //
129                 // Here we assume specific behavior from the writer, namely:
130                 //
131                 // - It is not immediately ready to send us anything.
132                 //
133                 // - When it _is_ ready, it will send us all the bytes it told us to
134                 // expect at once.
135                 let status = stream_read(*rx, ptr::null_mut(), 0);
136                 assert_eq!(status, BLOCKED);
137 
138                 let set = waitable_set_new();
139 
140                 waitable_join(*rx, set);
141 
142                 let tx = {
143                     let pair = stream_new();
144                     let tx = u32::try_from(pair >> 32).unwrap();
145                     let rx = u32::try_from(pair & 0xFFFFFFFF_u64).unwrap();
146 
147                     // Do a zero-length write to wait until the reader is ready.
148                     //
149                     // Here we assume specific behavior from the reader, namely:
150                     //
151                     // - It is not immediately ready to receive anything (indeed, it
152                     // can't possibly be ready given that we haven't returned the
153                     // read handle to it yet).
154                     //
155                     // - When it _is_ ready, it will accept all the bytes we told it
156                     // to expect at once.
157                     let status = stream_write(tx, ptr::null(), 0);
158                     assert_eq!(status, BLOCKED);
159 
160                     waitable_join(tx, set);
161 
162                     task_return_start(rx, BYTES_TO_WRITE.as_ptr(), BYTES_TO_WRITE.len());
163 
164                     tx
165                 };
166 
167                 *state = State::S1 {
168                     set,
169                     tx: Some(tx),
170                     rx: Some(*rx),
171                     expected: mem::take(expected),
172                 };
173 
174                 CALLBACK_CODE_WAIT | (set << 4)
175             }
176 
177             State::S1 {
178                 set,
179                 tx,
180                 rx,
181                 expected,
182             } => {
183                 if event0 == EVENT_STREAM_READ {
184                     let rx = rx.take().unwrap();
185                     assert_eq!(event1, rx);
186                     assert_eq!(event2, 0);
187 
188                     // The writer is ready now, so this read should not block.
189                     //
190                     // As noted above, we rely on the writer sending us all the
191                     // expected bytes at once.
192                     let received = &mut vec![0_u8; expected.len()];
193                     let status = stream_read(rx, received.as_mut_ptr(), received.len());
194                     assert_eq!(
195                         status,
196                         DROPPED | u32::try_from(received.len() << 4).unwrap()
197                     );
198                     assert_eq!(received, expected);
199 
200                     waitable_join(rx, 0);
201                     stream_drop_readable(rx);
202 
203                     if tx.is_none() {
204                         waitable_set_drop(*set);
205 
206                         CALLBACK_CODE_EXIT
207                     } else {
208                         CALLBACK_CODE_WAIT | (*set << 4)
209                     }
210                 } else if event0 == EVENT_STREAM_WRITE {
211                     let tx = tx.take().unwrap();
212                     assert_eq!(event1, tx);
213                     assert_eq!(event2, 0);
214 
215                     // The reader is ready now, so this write should not block.
216                     //
217                     // As noted above, we rely on the reader accepting all the
218                     // expected bytes at once.
219                     let status = stream_write(tx, BYTES_TO_WRITE.as_ptr(), BYTES_TO_WRITE.len());
220                     assert_eq!(
221                         status,
222                         DROPPED | u32::try_from(BYTES_TO_WRITE.len() << 4).unwrap()
223                     );
224 
225                     waitable_join(tx, 0);
226                     stream_drop_writable(tx);
227 
228                     if rx.is_none() {
229                         waitable_set_drop(*set);
230 
231                         CALLBACK_CODE_EXIT
232                     } else {
233                         CALLBACK_CODE_WAIT | (*set << 4)
234                     }
235                 } else {
236                     unreachable!()
237                 }
238             }
239         }
240     }
241 }
242 
243 // Unused function; required since this file is built as a `bin`:
main()244 fn main() {}
245