1 mod bindings {
2 wit_bindgen::generate!({
3 path: "../misc/component-async-tests/wit",
4 world: "readiness-guest",
5 });
6 }
7
8 use {
9 std::{mem, ptr},
10 test_programs::async_::{
11 BLOCKED, CALLBACK_CODE_EXIT, CALLBACK_CODE_WAIT, DROPPED, EVENT_NONE, EVENT_STREAM_READ,
12 EVENT_STREAM_WRITE, context_get, context_set, waitable_join, waitable_set_drop,
13 waitable_set_new,
14 },
15 };
16
17 #[cfg(target_arch = "wasm32")]
18 #[link(wasm_import_module = "[export]local:local/readiness")]
19 unsafe extern "C" {
20 #[link_name = "[task-return]start"]
task_return_start(_: u32, _: *const u8, _: usize)21 fn task_return_start(_: u32, _: *const u8, _: usize);
22 }
23 #[cfg(not(target_arch = "wasm32"))]
task_return_start(_: u32, _: *const u8, _: usize)24 unsafe extern "C" fn task_return_start(_: u32, _: *const u8, _: usize) {
25 unreachable!()
26 }
27
28 #[cfg(target_arch = "wasm32")]
29 #[link(wasm_import_module = "[export]local:local/readiness")]
30 unsafe extern "C" {
31 #[link_name = "[stream-new-0]start"]
stream_new() -> u6432 fn stream_new() -> u64;
33 }
34 #[cfg(not(target_arch = "wasm32"))]
stream_new() -> u6435 unsafe extern "C" fn stream_new() -> u64 {
36 unreachable!()
37 }
38
39 #[cfg(target_arch = "wasm32")]
40 #[link(wasm_import_module = "[export]local:local/readiness")]
41 unsafe extern "C" {
42 #[link_name = "[async-lower][stream-write-0]start"]
stream_write(_: u32, _: *const u8, _: usize) -> u3243 fn stream_write(_: u32, _: *const u8, _: usize) -> u32;
44 }
45 #[cfg(not(target_arch = "wasm32"))]
stream_write(_: u32, _: *const u8, _: usize) -> u3246 unsafe extern "C" fn stream_write(_: u32, _: *const u8, _: usize) -> u32 {
47 unreachable!()
48 }
49
50 #[cfg(target_arch = "wasm32")]
51 #[link(wasm_import_module = "[export]local:local/readiness")]
52 unsafe extern "C" {
53 #[link_name = "[async-lower][stream-read-0]start"]
stream_read(_: u32, _: *mut u8, _: usize) -> u3254 fn stream_read(_: u32, _: *mut u8, _: usize) -> u32;
55 }
56 #[cfg(not(target_arch = "wasm32"))]
stream_read(_: u32, _: *mut u8, _: usize) -> u3257 unsafe extern "C" fn stream_read(_: u32, _: *mut u8, _: usize) -> u32 {
58 unreachable!()
59 }
60
61 #[cfg(target_arch = "wasm32")]
62 #[link(wasm_import_module = "[export]local:local/readiness")]
63 unsafe extern "C" {
64 #[link_name = "[stream-drop-readable-0]start"]
stream_drop_readable(_: u32)65 fn stream_drop_readable(_: u32);
66 }
67 #[cfg(not(target_arch = "wasm32"))]
stream_drop_readable(_: u32)68 unsafe extern "C" fn stream_drop_readable(_: u32) {
69 unreachable!()
70 }
71
72 #[cfg(target_arch = "wasm32")]
73 #[link(wasm_import_module = "[export]local:local/readiness")]
74 unsafe extern "C" {
75 #[link_name = "[stream-drop-writable-0]start"]
stream_drop_writable(_: u32)76 fn stream_drop_writable(_: u32);
77 }
78 #[cfg(not(target_arch = "wasm32"))]
stream_drop_writable(_: u32)79 unsafe extern "C" fn stream_drop_writable(_: u32) {
80 unreachable!()
81 }
82
83 static BYTES_TO_WRITE: &[u8] = &[1, 3, 5, 7, 11];
84
85 enum State {
86 S0 {
87 rx: u32,
88 expected: Vec<u8>,
89 },
90 S1 {
91 set: u32,
92 tx: Option<u32>,
93 rx: Option<u32>,
94 expected: Vec<u8>,
95 },
96 }
97
98 #[unsafe(export_name = "[async-lift]local:local/readiness#start")]
export_start(rx: u32, expected: u32, expected_len: u32) -> u3299 unsafe extern "C" fn export_start(rx: u32, expected: u32, expected_len: u32) -> u32 {
100 let expected_len = usize::try_from(expected_len).unwrap();
101
102 unsafe {
103 context_set(
104 u32::try_from(Box::into_raw(Box::new(State::S0 {
105 rx,
106 expected: Vec::from_raw_parts(
107 expected as usize as *mut u8,
108 expected_len,
109 expected_len,
110 ),
111 })) as usize)
112 .unwrap(),
113 );
114
115 callback_start(EVENT_NONE, 0, 0)
116 }
117 }
118
119 #[unsafe(export_name = "[callback][async-lift]local:local/readiness#start")]
callback_start(event0: u32, event1: u32, event2: u32) -> u32120 unsafe extern "C" fn callback_start(event0: u32, event1: u32, event2: u32) -> u32 {
121 unsafe {
122 let state = &mut *(usize::try_from(context_get()).unwrap() as *mut State);
123 match state {
124 State::S0 { rx, expected } => {
125 assert_eq!(event0, EVENT_NONE);
126
127 // Do a zero-length read to wait until the writer is ready.
128 //
129 // Here we assume specific behavior from the writer, namely:
130 //
131 // - It is not immediately ready to send us anything.
132 //
133 // - When it _is_ ready, it will send us all the bytes it told us to
134 // expect at once.
135 let status = stream_read(*rx, ptr::null_mut(), 0);
136 assert_eq!(status, BLOCKED);
137
138 let set = waitable_set_new();
139
140 waitable_join(*rx, set);
141
142 let tx = {
143 let pair = stream_new();
144 let tx = u32::try_from(pair >> 32).unwrap();
145 let rx = u32::try_from(pair & 0xFFFFFFFF_u64).unwrap();
146
147 // Do a zero-length write to wait until the reader is ready.
148 //
149 // Here we assume specific behavior from the reader, namely:
150 //
151 // - It is not immediately ready to receive anything (indeed, it
152 // can't possibly be ready given that we haven't returned the
153 // read handle to it yet).
154 //
155 // - When it _is_ ready, it will accept all the bytes we told it
156 // to expect at once.
157 let status = stream_write(tx, ptr::null(), 0);
158 assert_eq!(status, BLOCKED);
159
160 waitable_join(tx, set);
161
162 task_return_start(rx, BYTES_TO_WRITE.as_ptr(), BYTES_TO_WRITE.len());
163
164 tx
165 };
166
167 *state = State::S1 {
168 set,
169 tx: Some(tx),
170 rx: Some(*rx),
171 expected: mem::take(expected),
172 };
173
174 CALLBACK_CODE_WAIT | (set << 4)
175 }
176
177 State::S1 {
178 set,
179 tx,
180 rx,
181 expected,
182 } => {
183 if event0 == EVENT_STREAM_READ {
184 let rx = rx.take().unwrap();
185 assert_eq!(event1, rx);
186 assert_eq!(event2, 0);
187
188 // The writer is ready now, so this read should not block.
189 //
190 // As noted above, we rely on the writer sending us all the
191 // expected bytes at once.
192 let received = &mut vec![0_u8; expected.len()];
193 let status = stream_read(rx, received.as_mut_ptr(), received.len());
194 assert_eq!(
195 status,
196 DROPPED | u32::try_from(received.len() << 4).unwrap()
197 );
198 assert_eq!(received, expected);
199
200 waitable_join(rx, 0);
201 stream_drop_readable(rx);
202
203 if tx.is_none() {
204 waitable_set_drop(*set);
205
206 CALLBACK_CODE_EXIT
207 } else {
208 CALLBACK_CODE_WAIT | (*set << 4)
209 }
210 } else if event0 == EVENT_STREAM_WRITE {
211 let tx = tx.take().unwrap();
212 assert_eq!(event1, tx);
213 assert_eq!(event2, 0);
214
215 // The reader is ready now, so this write should not block.
216 //
217 // As noted above, we rely on the reader accepting all the
218 // expected bytes at once.
219 let status = stream_write(tx, BYTES_TO_WRITE.as_ptr(), BYTES_TO_WRITE.len());
220 assert_eq!(
221 status,
222 DROPPED | u32::try_from(BYTES_TO_WRITE.len() << 4).unwrap()
223 );
224
225 waitable_join(tx, 0);
226 stream_drop_writable(tx);
227
228 if rx.is_none() {
229 waitable_set_drop(*set);
230
231 CALLBACK_CODE_EXIT
232 } else {
233 CALLBACK_CODE_WAIT | (*set << 4)
234 }
235 } else {
236 unreachable!()
237 }
238 }
239 }
240 }
241 }
242
243 // Unused function; required since this file is built as a `bin`:
main()244 fn main() {}
245