1 mod bindings { 2 wit_bindgen::generate!({ 3 path: "../misc/component-async-tests/wit", 4 world: "readiness-guest", 5 }); 6 } 7 8 use { 9 std::{mem, ptr}, 10 test_programs::async_::{ 11 BLOCKED, CALLBACK_CODE_EXIT, CALLBACK_CODE_WAIT, DROPPED, EVENT_NONE, EVENT_STREAM_READ, 12 EVENT_STREAM_WRITE, context_get, context_set, waitable_join, waitable_set_drop, 13 waitable_set_new, 14 }, 15 }; 16 17 #[cfg(target_arch = "wasm32")] 18 #[link(wasm_import_module = "[export]local:local/readiness")] 19 unsafe extern "C" { 20 #[link_name = "[task-return]start"] 21 fn task_return_start(_: u32, _: *const u8, _: usize); 22 } 23 #[cfg(not(target_arch = "wasm32"))] 24 unsafe extern "C" fn task_return_start(_: u32, _: *const u8, _: usize) { 25 unreachable!() 26 } 27 28 #[cfg(target_arch = "wasm32")] 29 #[link(wasm_import_module = "[export]local:local/readiness")] 30 unsafe extern "C" { 31 #[link_name = "[stream-new-0]start"] 32 fn stream_new() -> u64; 33 } 34 #[cfg(not(target_arch = "wasm32"))] 35 unsafe extern "C" fn stream_new() -> u64 { 36 unreachable!() 37 } 38 39 #[cfg(target_arch = "wasm32")] 40 #[link(wasm_import_module = "[export]local:local/readiness")] 41 unsafe extern "C" { 42 #[link_name = "[async-lower][stream-write-0]start"] 43 fn stream_write(_: u32, _: *const u8, _: usize) -> u32; 44 } 45 #[cfg(not(target_arch = "wasm32"))] 46 unsafe extern "C" fn stream_write(_: u32, _: *const u8, _: usize) -> u32 { 47 unreachable!() 48 } 49 50 #[cfg(target_arch = "wasm32")] 51 #[link(wasm_import_module = "[export]local:local/readiness")] 52 unsafe extern "C" { 53 #[link_name = "[async-lower][stream-read-0]start"] 54 fn stream_read(_: u32, _: *mut u8, _: usize) -> u32; 55 } 56 #[cfg(not(target_arch = "wasm32"))] 57 unsafe extern "C" fn stream_read(_: u32, _: *mut u8, _: usize) -> u32 { 58 unreachable!() 59 } 60 61 #[cfg(target_arch = "wasm32")] 62 #[link(wasm_import_module = "[export]local:local/readiness")] 63 unsafe extern "C" { 64 #[link_name = "[stream-drop-readable-0]start"] 65 fn stream_drop_readable(_: u32); 66 } 67 #[cfg(not(target_arch = "wasm32"))] 68 unsafe extern "C" fn stream_drop_readable(_: u32) { 69 unreachable!() 70 } 71 72 #[cfg(target_arch = "wasm32")] 73 #[link(wasm_import_module = "[export]local:local/readiness")] 74 unsafe extern "C" { 75 #[link_name = "[stream-drop-writable-0]start"] 76 fn stream_drop_writable(_: u32); 77 } 78 #[cfg(not(target_arch = "wasm32"))] 79 unsafe extern "C" fn stream_drop_writable(_: u32) { 80 unreachable!() 81 } 82 83 static BYTES_TO_WRITE: &[u8] = &[1, 3, 5, 7, 11]; 84 85 enum State { 86 S0 { 87 rx: u32, 88 expected: Vec<u8>, 89 }, 90 S1 { 91 set: u32, 92 tx: Option<u32>, 93 rx: Option<u32>, 94 expected: Vec<u8>, 95 }, 96 } 97 98 #[unsafe(export_name = "[async-lift]local:local/readiness#start")] 99 unsafe extern "C" fn export_start(rx: u32, expected: u32, expected_len: u32) -> u32 { 100 let expected_len = usize::try_from(expected_len).unwrap(); 101 102 unsafe { 103 context_set( 104 u32::try_from(Box::into_raw(Box::new(State::S0 { 105 rx, 106 expected: Vec::from_raw_parts( 107 expected as usize as *mut u8, 108 expected_len, 109 expected_len, 110 ), 111 })) as usize) 112 .unwrap(), 113 ); 114 115 callback_start(EVENT_NONE, 0, 0) 116 } 117 } 118 119 #[unsafe(export_name = "[callback][async-lift]local:local/readiness#start")] 120 unsafe extern "C" fn callback_start(event0: u32, event1: u32, event2: u32) -> u32 { 121 unsafe { 122 let state = &mut *(usize::try_from(context_get()).unwrap() as *mut State); 123 match state { 124 State::S0 { rx, expected } => { 125 assert_eq!(event0, EVENT_NONE); 126 127 // Do a zero-length read to wait until the writer is ready. 128 // 129 // Here we assume specific behavior from the writer, namely: 130 // 131 // - It is not immediately ready to send us anything. 132 // 133 // - When it _is_ ready, it will send us all the bytes it told us to 134 // expect at once. 135 let status = stream_read(*rx, ptr::null_mut(), 0); 136 assert_eq!(status, BLOCKED); 137 138 let set = waitable_set_new(); 139 140 waitable_join(*rx, set); 141 142 let tx = { 143 let pair = stream_new(); 144 let tx = u32::try_from(pair >> 32).unwrap(); 145 let rx = u32::try_from(pair & 0xFFFFFFFF_u64).unwrap(); 146 147 // Do a zero-length write to wait until the reader is ready. 148 // 149 // Here we assume specific behavior from the reader, namely: 150 // 151 // - It is not immediately ready to receive anything (indeed, it 152 // can't possibly be ready given that we haven't returned the 153 // read handle to it yet). 154 // 155 // - When it _is_ ready, it will accept all the bytes we told it 156 // to expect at once. 157 let status = stream_write(tx, ptr::null(), 0); 158 assert_eq!(status, BLOCKED); 159 160 waitable_join(tx, set); 161 162 task_return_start(rx, BYTES_TO_WRITE.as_ptr(), BYTES_TO_WRITE.len()); 163 164 tx 165 }; 166 167 *state = State::S1 { 168 set, 169 tx: Some(tx), 170 rx: Some(*rx), 171 expected: mem::take(expected), 172 }; 173 174 CALLBACK_CODE_WAIT | (set << 4) 175 } 176 177 State::S1 { 178 set, 179 tx, 180 rx, 181 expected, 182 } => { 183 if event0 == EVENT_STREAM_READ { 184 let rx = rx.take().unwrap(); 185 assert_eq!(event1, rx); 186 assert_eq!(event2, 0); 187 188 // The writer is ready now, so this read should not block. 189 // 190 // As noted above, we rely on the writer sending us all the 191 // expected bytes at once. 192 let received = &mut vec![0_u8; expected.len()]; 193 let status = stream_read(rx, received.as_mut_ptr(), received.len()); 194 assert_eq!( 195 status, 196 DROPPED | u32::try_from(received.len() << 4).unwrap() 197 ); 198 assert_eq!(received, expected); 199 200 waitable_join(rx, 0); 201 stream_drop_readable(rx); 202 203 if tx.is_none() { 204 waitable_set_drop(*set); 205 206 CALLBACK_CODE_EXIT 207 } else { 208 CALLBACK_CODE_WAIT | (*set << 4) 209 } 210 } else if event0 == EVENT_STREAM_WRITE { 211 let tx = tx.take().unwrap(); 212 assert_eq!(event1, tx); 213 assert_eq!(event2, 0); 214 215 // The reader is ready now, so this write should not block. 216 // 217 // As noted above, we rely on the reader accepting all the 218 // expected bytes at once. 219 let status = stream_write(tx, BYTES_TO_WRITE.as_ptr(), BYTES_TO_WRITE.len()); 220 assert_eq!( 221 status, 222 DROPPED | u32::try_from(BYTES_TO_WRITE.len() << 4).unwrap() 223 ); 224 225 waitable_join(tx, 0); 226 stream_drop_writable(tx); 227 228 if rx.is_none() { 229 waitable_set_drop(*set); 230 231 CALLBACK_CODE_EXIT 232 } else { 233 CALLBACK_CODE_WAIT | (*set << 4) 234 } 235 } else { 236 unreachable!() 237 } 238 } 239 } 240 } 241 } 242 243 // Unused function; required since this file is built as a `bin`: 244 fn main() {} 245