// Bindings produced by the `wit_bindgen::generate!` macro for the
// `synchronous-transmit-guest` world, using the WIT files found under
// `../misc/component-async-tests/wit`.
mod bindings {
    wit_bindgen::generate!({
        path: "../misc/component-async-tests/wit",
        world: "synchronous-transmit-guest",
    });
}
7
8 use {
9 std::{
10 mem::{self, ManuallyDrop},
11 slice,
12 },
13 test_programs::async_::{
14 BLOCKED, CALLBACK_CODE_EXIT, CALLBACK_CODE_YIELD, COMPLETED, DROPPED, EVENT_NONE,
15 context_get, context_set,
16 },
17 };
18
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Wasm import `[task-return]start`.
    ///
    /// At its call site in this file the arguments are, in order: the
    /// readable end of a freshly created stream, a pointer/length pair for the
    /// bytes this guest will write to that stream, the readable end of a
    /// freshly created future, and the byte this guest will write to that
    /// future.
    #[link_name = "[task-return]start"]
    fn task_return_start(
        stream_rx: u32,
        bytes: *const u8,
        bytes_len: usize,
        future_rx: u32,
        future_byte: u8,
    );
}

/// Stand-in for the `[task-return]start` import on non-wasm32 targets; hits
/// `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn task_return_start(
    _stream_rx: u32,
    _bytes: *const u8,
    _bytes_len: usize,
    _future_rx: u32,
    _future_byte: u8,
) {
    unreachable!()
}
29
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Wasm import `[stream-new-0]start`.
    ///
    /// The caller in this file treats the upper 32 bits of the result as the
    /// writable handle and the lower 32 bits as the readable handle.
    #[link_name = "[stream-new-0]start"]
    fn stream_new() -> u64;
}

/// Stand-in for the `[stream-new-0]start` import on non-wasm32 targets; hits
/// `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn stream_new() -> u64 {
    unreachable!()
}
40
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Wasm import `[async-lower][stream-write-0]start`.
    ///
    /// Takes the writable stream handle plus a pointer/length pair for the
    /// bytes to send and returns a status code; the caller in this file
    /// expects `BLOCKED`.
    #[link_name = "[async-lower][stream-write-0]start"]
    fn stream_write(stream_tx: u32, bytes: *const u8, bytes_len: usize) -> u32;
}

/// Stand-in for the `[async-lower][stream-write-0]start` import on non-wasm32
/// targets; hits `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn stream_write(_stream_tx: u32, _bytes: *const u8, _bytes_len: usize) -> u32 {
    unreachable!()
}
51
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Wasm import `[async-lower][stream-read-0]start`.
    ///
    /// Takes the readable stream handle plus a pointer/length pair for the
    /// destination buffer and returns a status code; the caller in this file
    /// expects `BLOCKED`.
    #[link_name = "[async-lower][stream-read-0]start"]
    fn stream_read(stream: u32, buffer: *mut u8, buffer_len: usize) -> u32;
}

/// Stand-in for the `[async-lower][stream-read-0]start` import on non-wasm32
/// targets; hits `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn stream_read(_stream: u32, _buffer: *mut u8, _buffer_len: usize) -> u32 {
    unreachable!()
}
62
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Wasm import `[stream-cancel-write-0]start`.
    ///
    /// Takes the writable stream handle and returns a status word; the caller
    /// in this file expects `DROPPED` combined with the written byte count
    /// shifted left by four bits.
    #[link_name = "[stream-cancel-write-0]start"]
    fn stream_cancel_write(stream_tx: u32) -> u32;
}

/// Stand-in for the `[stream-cancel-write-0]start` import on non-wasm32
/// targets; hits `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn stream_cancel_write(_stream_tx: u32) -> u32 {
    unreachable!()
}
73
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Wasm import `[stream-cancel-read-0]start`.
    ///
    /// Takes the readable stream handle and returns a status word; the caller
    /// in this file expects `DROPPED` combined with the received byte count
    /// shifted left by four bits.
    #[link_name = "[stream-cancel-read-0]start"]
    fn stream_cancel_read(stream: u32) -> u32;
}

/// Stand-in for the `[stream-cancel-read-0]start` import on non-wasm32
/// targets; hits `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn stream_cancel_read(_stream: u32) -> u32 {
    unreachable!()
}
84
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Wasm import `[stream-drop-readable-0]start`; takes the readable stream
    /// handle to release.
    #[link_name = "[stream-drop-readable-0]start"]
    fn stream_drop_readable(stream: u32);
}

/// Stand-in for the `[stream-drop-readable-0]start` import on non-wasm32
/// targets; hits `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn stream_drop_readable(_stream: u32) {
    unreachable!()
}
95
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Wasm import `[stream-drop-writable-0]start`; takes the writable stream
    /// handle to release.
    #[link_name = "[stream-drop-writable-0]start"]
    fn stream_drop_writable(stream_tx: u32);
}

/// Stand-in for the `[stream-drop-writable-0]start` import on non-wasm32
/// targets; hits `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn stream_drop_writable(_stream_tx: u32) {
    unreachable!()
}
106
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Wasm import `[future-new-1]start`.
    ///
    /// The caller in this file treats the upper 32 bits of the result as the
    /// writable handle and the lower 32 bits as the readable handle.
    #[link_name = "[future-new-1]start"]
    fn future_new() -> u64;
}

/// Stand-in for the `[future-new-1]start` import on non-wasm32 targets; hits
/// `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn future_new() -> u64 {
    unreachable!()
}
117
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Wasm import `[async-lower][future-write-1]start`.
    ///
    /// Takes the writable future handle plus a pointer to the byte to send
    /// and returns a status code; the caller in this file expects `BLOCKED`.
    #[link_name = "[async-lower][future-write-1]start"]
    fn future_write(future_tx: u32, value: *const u8) -> u32;
}

/// Stand-in for the `[async-lower][future-write-1]start` import on non-wasm32
/// targets; hits `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn future_write(_future_tx: u32, _value: *const u8) -> u32 {
    unreachable!()
}
128
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Wasm import `[async-lower][future-read-1]start`.
    ///
    /// Takes the readable future handle plus a pointer to the destination
    /// byte and returns a status code; the caller in this file expects
    /// `BLOCKED`.
    #[link_name = "[async-lower][future-read-1]start"]
    fn future_read(future: u32, buffer: *mut u8) -> u32;
}

/// Stand-in for the `[async-lower][future-read-1]start` import on non-wasm32
/// targets; hits `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn future_read(_future: u32, _buffer: *mut u8) -> u32 {
    unreachable!()
}
139
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Wasm import `[future-cancel-write-1]start`.
    ///
    /// Takes the writable future handle and returns a status code; the caller
    /// in this file expects `COMPLETED`.
    #[link_name = "[future-cancel-write-1]start"]
    fn future_cancel_write(future_tx: u32) -> u32;
}

/// Stand-in for the `[future-cancel-write-1]start` import on non-wasm32
/// targets; hits `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn future_cancel_write(_future_tx: u32) -> u32 {
    unreachable!()
}
150
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Wasm import `[future-cancel-read-1]start`.
    ///
    /// Takes the readable future handle and returns a status code; the caller
    /// in this file expects `COMPLETED`.
    #[link_name = "[future-cancel-read-1]start"]
    fn future_cancel_read(future: u32) -> u32;
}

/// Stand-in for the `[future-cancel-read-1]start` import on non-wasm32
/// targets; hits `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn future_cancel_read(_future: u32) -> u32 {
    unreachable!()
}
161
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Wasm import `[future-drop-readable-1]start`; takes the readable future
    /// handle to release.
    #[link_name = "[future-drop-readable-1]start"]
    fn future_drop_readable(future: u32);
}

/// Stand-in for the `[future-drop-readable-1]start` import on non-wasm32
/// targets; hits `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn future_drop_readable(_future: u32) {
    unreachable!()
}
172
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Wasm import `[future-drop-writable-1]start`; takes the writable future
    /// handle to release.
    #[link_name = "[future-drop-writable-1]start"]
    fn future_drop_writable(future_tx: u32);
}

/// Stand-in for the `[future-drop-writable-1]start` import on non-wasm32
/// targets; hits `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn future_drop_writable(_future_tx: u32) {
    unreachable!()
}
183
/// Bytes this guest writes to the stream it creates; the same pointer and
/// length are also passed to `task_return_start`.
static STREAM_BYTES_TO_WRITE: &[u8] = &[1_u8, 3_u8, 5_u8, 7_u8, 11_u8];

/// Byte this guest writes to the future it creates; the same value is also
/// passed to `task_return_start`.
static FUTURE_BYTE_TO_WRITE: u8 = 13_u8;
186
// Per-task state for the `start` export. `export_start` boxes an `S0` value
// and stores its address via `context_set`; `callback_start` reads it back
// via `context_get` and advances it from `S0` to `S1`.
enum State {
    // Initial state: nothing has been started yet.
    S0 {
        // Readable stream handle received as an argument to `start`.
        stream: u32,
        // Bytes we expect to receive from `stream`.
        stream_expected: Vec<u8>,
        // Readable future handle received as an argument to `start`.
        future: u32,
        // Byte we expect to receive from `future`.
        future_expected: u8,
    },
    // All four reads/writes have been started and reported `BLOCKED`.
    S1 {
        // Heap buffer (`stream_expected.len()` bytes) the pending stream read
        // fills in; reclaimed once the read has been cancelled.
        stream_read_buffer: *mut u8,
        // Writable end of the stream this guest created.
        stream_tx: u32,
        // Readable stream handle received as an argument to `start`.
        stream: u32,
        // Bytes we expect to receive from `stream`.
        stream_expected: Vec<u8>,
        // Heap-allocated byte the pending future read fills in; reclaimed
        // once the read has been cancelled.
        future_read_buffer: *mut u8,
        // Writable end of the future this guest created.
        future_tx: u32,
        // Readable future handle received as an argument to `start`.
        future: u32,
        // Byte we expect to receive from `future`.
        future_expected: u8,
    },
}
205
206 #[unsafe(export_name = "[async-lift]local:local/synchronous-transmit#start")]
export_start( stream: u32, stream_expected: u32, stream_expected_len: u32, future: u32, future_expected: u8, ) -> u32207 unsafe extern "C" fn export_start(
208 stream: u32,
209 stream_expected: u32,
210 stream_expected_len: u32,
211 future: u32,
212 future_expected: u8,
213 ) -> u32 {
214 let stream_expected_len = usize::try_from(stream_expected_len).unwrap();
215
216 unsafe {
217 context_set(
218 u32::try_from(Box::into_raw(Box::new(State::S0 {
219 stream,
220 stream_expected: Vec::from_raw_parts(
221 stream_expected as usize as *mut u8,
222 stream_expected_len,
223 stream_expected_len,
224 ),
225 future,
226 future_expected,
227 })) as usize)
228 .unwrap(),
229 );
230
231 callback_start(EVENT_NONE, 0, 0)
232 }
233 }
234
235 #[unsafe(export_name = "[callback][async-lift]local:local/synchronous-transmit#start")]
callback_start(event0: u32, _event1: u32, _event2: u32) -> u32236 unsafe extern "C" fn callback_start(event0: u32, _event1: u32, _event2: u32) -> u32 {
237 unsafe {
238 let state = &mut *(usize::try_from(context_get()).unwrap() as *mut State);
239 match state {
240 &mut State::S0 {
241 stream,
242 ref mut stream_expected,
243 future,
244 future_expected,
245 } => {
246 assert_eq!(event0, EVENT_NONE);
247
248 // Here we assume specific behavior from the writers, namely:
249 //
250 // - They will not send us anything until after we cancel the
251 // reads, and even then there will be a delay.
252 //
253 // - When they _do_ send, they will send us all the bytes it
254 // told us to expect at once.
255 let stream_read_buffer =
256 ManuallyDrop::new(vec![0_u8; stream_expected.len()]).as_mut_ptr();
257 let status = stream_read(stream, stream_read_buffer, stream_expected.len());
258 assert_eq!(status, BLOCKED);
259
260 let future_read_buffer = Box::into_raw(Box::new(0_u8));
261 let status = future_read(future, future_read_buffer);
262 assert_eq!(status, BLOCKED);
263
264 let pair = stream_new();
265 let stream_tx = u32::try_from(pair >> 32).unwrap();
266 let stream_rx = u32::try_from(pair & 0xFFFFFFFF_u64).unwrap();
267
268 let pair = future_new();
269 let future_tx = u32::try_from(pair >> 32).unwrap();
270 let future_rx = u32::try_from(pair & 0xFFFFFFFF_u64).unwrap();
271
272 task_return_start(
273 stream_rx,
274 STREAM_BYTES_TO_WRITE.as_ptr(),
275 STREAM_BYTES_TO_WRITE.len(),
276 future_rx,
277 FUTURE_BYTE_TO_WRITE,
278 );
279
280 // Here we assume specific behavior from the readers, namely:
281 //
282 // - They will not read anything until after we cancel the
283 // write, and even then there will be a delay.
284 //
285 // - When they _do_ read, they will accept all the bytes we told
286 // it to expect at once.
287 let status = stream_write(
288 stream_tx,
289 STREAM_BYTES_TO_WRITE.as_ptr(),
290 STREAM_BYTES_TO_WRITE.len(),
291 );
292 assert_eq!(status, BLOCKED);
293
294 let status = future_write(future_tx, &FUTURE_BYTE_TO_WRITE);
295 assert_eq!(status, BLOCKED);
296
297 *state = State::S1 {
298 stream_read_buffer,
299 stream_tx,
300 stream,
301 stream_expected: mem::take(stream_expected),
302 future_read_buffer,
303 future_tx,
304 future,
305 future_expected,
306 };
307
308 CALLBACK_CODE_YIELD
309 }
310
311 &mut State::S1 {
312 stream_read_buffer,
313 stream_tx,
314 stream,
315 ref mut stream_expected,
316 future_read_buffer,
317 future_tx,
318 future,
319 future_expected,
320 } => {
321 // Now we synchronously cancel everything and expect that the
322 // reads and writes complete.
323
324 let status = stream_cancel_read(stream);
325 assert_eq!(
326 status,
327 DROPPED | u32::try_from(stream_expected.len() << 4).unwrap()
328 );
329 let received = Box::from_raw(slice::from_raw_parts_mut(
330 stream_read_buffer,
331 stream_expected.len(),
332 ));
333 assert_eq!(&received[..], stream_expected);
334 stream_drop_readable(stream);
335
336 let status = stream_cancel_write(stream_tx);
337 assert_eq!(
338 status,
339 DROPPED | u32::try_from(STREAM_BYTES_TO_WRITE.len() << 4).unwrap()
340 );
341 stream_drop_writable(stream_tx);
342
343 let status = future_cancel_read(future);
344 assert_eq!(status, COMPLETED);
345 let received = Box::from_raw(future_read_buffer);
346 assert_eq!(*received, future_expected);
347 future_drop_readable(future);
348
349 let status = future_cancel_write(future_tx);
350 assert_eq!(status, COMPLETED);
351 future_drop_writable(future_tx);
352
353 CALLBACK_CODE_EXIT
354 }
355 }
356 }
357 }
358
// Intentionally empty and never called: this file is built as a `bin`, and a
// `bin` target requires a `main` function.
fn main() {}
361