1 use crate::bindings::wasi::io::{error, poll, streams}; 2 use crate::poll::{DynFuture, DynPollable, MakeFuture, subscribe}; 3 use crate::streams::{DynInputStream, DynOutputStream, StreamError, StreamResult}; 4 use alloc::collections::BTreeMap; 5 use alloc::string::String; 6 use alloc::vec::Vec; 7 use bytes::Bytes; 8 use core::future::Future; 9 use core::pin::Pin; 10 use core::task::{Context, Poll}; 11 use wasmtime::component::{Resource, ResourceTable}; 12 use wasmtime::{Result, format_err}; 13 14 impl poll::Host for ResourceTable { poll(&mut self, pollables: Vec<Resource<DynPollable>>) -> Result<Vec<u32>>15 async fn poll(&mut self, pollables: Vec<Resource<DynPollable>>) -> Result<Vec<u32>> { 16 type ReadylistIndex = u32; 17 18 if pollables.is_empty() { 19 return Err(format_err!("empty poll list")); 20 } 21 22 let mut table_futures: BTreeMap<u32, (MakeFuture, Vec<ReadylistIndex>)> = BTreeMap::new(); 23 24 for (ix, p) in pollables.iter().enumerate() { 25 let ix: u32 = ix.try_into()?; 26 27 let pollable = self.get(p)?; 28 let (_, list) = table_futures 29 .entry(pollable.index) 30 .or_insert((pollable.make_future, Vec::new())); 31 list.push(ix); 32 } 33 34 let mut futures: Vec<(DynFuture<'_>, Vec<ReadylistIndex>)> = Vec::new(); 35 for (entry, (make_future, readylist_indices)) in self.iter_entries(table_futures) { 36 let entry = entry?; 37 futures.push((make_future(entry), readylist_indices)); 38 } 39 40 struct PollList<'a> { 41 futures: Vec<(DynFuture<'a>, Vec<ReadylistIndex>)>, 42 } 43 impl<'a> Future for PollList<'a> { 44 type Output = Vec<u32>; 45 46 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 47 let mut any_ready = false; 48 let mut results = Vec::new(); 49 for (fut, readylist_indices) in self.futures.iter_mut() { 50 match fut.as_mut().poll(cx) { 51 Poll::Ready(()) => { 52 results.extend_from_slice(readylist_indices); 53 any_ready = true; 54 } 55 Poll::Pending => {} 56 } 57 } 58 if any_ready { 59 Poll::Ready(results) 60 } else { 61 Poll::Pending 62 } 63 } 64 } 65 66 Ok(PollList { futures }.await) 67 } 68 } 69 70 impl crate::bindings::wasi::io::poll::HostPollable for ResourceTable { block(&mut self, pollable: Resource<DynPollable>) -> Result<()>71 async fn block(&mut self, pollable: Resource<DynPollable>) -> Result<()> { 72 let pollable = self.get(&pollable)?; 73 let ready = (pollable.make_future)(self.get_any_mut(pollable.index)?); 74 ready.await; 75 Ok(()) 76 } ready(&mut self, pollable: Resource<DynPollable>) -> Result<bool>77 async fn ready(&mut self, pollable: Resource<DynPollable>) -> Result<bool> { 78 let pollable = self.get(&pollable)?; 79 let ready = (pollable.make_future)(self.get_any_mut(pollable.index)?); 80 futures::pin_mut!(ready); 81 Ok(matches!( 82 futures::future::poll_immediate(ready).await, 83 Some(()) 84 )) 85 } drop(&mut self, pollable: Resource<DynPollable>) -> Result<()>86 fn drop(&mut self, pollable: Resource<DynPollable>) -> Result<()> { 87 let pollable = self.delete(pollable)?; 88 if let Some(delete) = pollable.remove_index_on_delete { 89 delete(self, pollable.index)?; 90 } 91 Ok(()) 92 } 93 } 94 95 impl error::Host for ResourceTable {} 96 97 impl streams::Host for ResourceTable { convert_stream_error(&mut self, err: StreamError) -> Result<streams::StreamError>98 fn convert_stream_error(&mut self, err: StreamError) -> Result<streams::StreamError> { 99 match err { 100 StreamError::Closed => Ok(streams::StreamError::Closed), 101 StreamError::LastOperationFailed(e) => { 102 Ok(streams::StreamError::LastOperationFailed(self.push(e)?)) 103 } 104 StreamError::Trap(e) => Err(e), 105 } 106 } 107 } 108 109 impl error::HostError for ResourceTable { drop(&mut self, err: Resource<streams::Error>) -> Result<()>110 fn drop(&mut self, err: Resource<streams::Error>) -> Result<()> { 111 self.delete(err)?; 112 Ok(()) 113 } 114 to_debug_string(&mut self, err: Resource<streams::Error>) -> Result<String>115 fn to_debug_string(&mut self, err: Resource<streams::Error>) -> Result<String> { 116 Ok(alloc::format!("{:?}", self.get(&err)?)) 117 } 118 } 119 120 impl streams::HostOutputStream for ResourceTable { drop(&mut self, stream: Resource<DynOutputStream>) -> Result<()>121 async fn drop(&mut self, stream: Resource<DynOutputStream>) -> Result<()> { 122 self.delete(stream)?.cancel().await; 123 Ok(()) 124 } 125 check_write(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<u64>126 fn check_write(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<u64> { 127 let bytes = self.get_mut(&stream)?.check_write()?; 128 Ok(bytes as u64) 129 } 130 write(&mut self, stream: Resource<DynOutputStream>, bytes: Vec<u8>) -> StreamResult<()>131 fn write(&mut self, stream: Resource<DynOutputStream>, bytes: Vec<u8>) -> StreamResult<()> { 132 self.get_mut(&stream)?.write(bytes.into())?; 133 Ok(()) 134 } 135 subscribe(&mut self, stream: Resource<DynOutputStream>) -> Result<Resource<DynPollable>>136 fn subscribe(&mut self, stream: Resource<DynOutputStream>) -> Result<Resource<DynPollable>> { 137 subscribe(self, stream) 138 } 139 blocking_write_and_flush( &mut self, stream: Resource<DynOutputStream>, bytes: Vec<u8>, ) -> StreamResult<()>140 async fn blocking_write_and_flush( 141 &mut self, 142 stream: Resource<DynOutputStream>, 143 bytes: Vec<u8>, 144 ) -> StreamResult<()> { 145 if bytes.len() > 4096 { 146 return Err(StreamError::trap( 147 "Buffer too large for blocking-write-and-flush (expected at most 4096)", 148 )); 149 } 150 151 self.get_mut(&stream)? 152 .blocking_write_and_flush(bytes.into()) 153 .await 154 } 155 blocking_write_zeroes_and_flush( &mut self, stream: Resource<DynOutputStream>, len: u64, ) -> StreamResult<()>156 async fn blocking_write_zeroes_and_flush( 157 &mut self, 158 stream: Resource<DynOutputStream>, 159 len: u64, 160 ) -> StreamResult<()> { 161 if len > 4096 { 162 return Err(StreamError::trap( 163 "Buffer too large for blocking-write-zeroes-and-flush (expected at most 4096)", 164 )); 165 } 166 167 // TODO: We could optimize this to not allocate one big zeroed buffer, and instead write 168 // repeatedly from a 'static buffer of zeros. 169 let bs = Bytes::from_iter(core::iter::repeat(0).take(len as usize)); 170 self.get_mut(&stream)?.blocking_write_and_flush(bs).await 171 } 172 write_zeroes(&mut self, stream: Resource<DynOutputStream>, len: u64) -> StreamResult<()>173 fn write_zeroes(&mut self, stream: Resource<DynOutputStream>, len: u64) -> StreamResult<()> { 174 self.get_mut(&stream)?.write_zeroes(len as usize)?; 175 Ok(()) 176 } 177 flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()>178 fn flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> { 179 self.get_mut(&stream)?.flush()?; 180 Ok(()) 181 } 182 blocking_flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()>183 async fn blocking_flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> { 184 let s = self.get_mut(&stream)?; 185 s.flush()?; 186 s.write_ready().await?; 187 Ok(()) 188 } 189 splice( &mut self, dest: Resource<DynOutputStream>, src: Resource<DynInputStream>, len: u64, ) -> StreamResult<u64>190 fn splice( 191 &mut self, 192 dest: Resource<DynOutputStream>, 193 src: Resource<DynInputStream>, 194 len: u64, 195 ) -> StreamResult<u64> { 196 let len = len.try_into().unwrap_or(usize::MAX); 197 198 let permit = { 199 let output = self.get_mut(&dest)?; 200 output.check_write()? 201 }; 202 let len = len.min(permit); 203 if len == 0 { 204 return Ok(0); 205 } 206 207 let contents = self.get_mut(&src)?.read(len)?; 208 209 let len = contents.len(); 210 if len == 0 { 211 return Ok(0); 212 } 213 214 let output = self.get_mut(&dest)?; 215 output.write(contents)?; 216 Ok(len.try_into().expect("usize can fit in u64")) 217 } 218 blocking_splice( &mut self, dest: Resource<DynOutputStream>, src: Resource<DynInputStream>, len: u64, ) -> StreamResult<u64>219 async fn blocking_splice( 220 &mut self, 221 dest: Resource<DynOutputStream>, 222 src: Resource<DynInputStream>, 223 len: u64, 224 ) -> StreamResult<u64> { 225 let len = len.try_into().unwrap_or(usize::MAX); 226 227 let permit = { 228 let output = self.get_mut(&dest)?; 229 output.write_ready().await? 230 }; 231 let len = len.min(permit); 232 if len == 0 { 233 return Ok(0); 234 } 235 236 let contents = self.get_mut(&src)?.blocking_read(len).await?; 237 238 let len = contents.len(); 239 if len == 0 { 240 return Ok(0); 241 } 242 243 let output = self.get_mut(&dest)?; 244 output.blocking_write_and_flush(contents).await?; 245 Ok(len.try_into().expect("usize can fit in u64")) 246 } 247 } 248 249 impl streams::HostInputStream for ResourceTable { drop(&mut self, stream: Resource<DynInputStream>) -> Result<()>250 async fn drop(&mut self, stream: Resource<DynInputStream>) -> Result<()> { 251 self.delete(stream)?.cancel().await; 252 Ok(()) 253 } 254 read(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<Vec<u8>>255 fn read(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<Vec<u8>> { 256 let len = len.try_into().unwrap_or(usize::MAX); 257 let bytes = self.get_mut(&stream)?.read(len)?; 258 debug_assert!(bytes.len() <= len); 259 Ok(bytes.into()) 260 } 261 blocking_read( &mut self, stream: Resource<DynInputStream>, len: u64, ) -> StreamResult<Vec<u8>>262 async fn blocking_read( 263 &mut self, 264 stream: Resource<DynInputStream>, 265 len: u64, 266 ) -> StreamResult<Vec<u8>> { 267 let len = len.try_into().unwrap_or(usize::MAX); 268 let bytes = self.get_mut(&stream)?.blocking_read(len).await?; 269 debug_assert!(bytes.len() <= len); 270 Ok(bytes.into()) 271 } 272 skip(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<u64>273 fn skip(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<u64> { 274 let len = len.try_into().unwrap_or(usize::MAX); 275 let written = self.get_mut(&stream)?.skip(len)?; 276 Ok(written.try_into().expect("usize always fits in u64")) 277 } 278 blocking_skip( &mut self, stream: Resource<DynInputStream>, len: u64, ) -> StreamResult<u64>279 async fn blocking_skip( 280 &mut self, 281 stream: Resource<DynInputStream>, 282 len: u64, 283 ) -> StreamResult<u64> { 284 let len = len.try_into().unwrap_or(usize::MAX); 285 let written = self.get_mut(&stream)?.blocking_skip(len).await?; 286 Ok(written.try_into().expect("usize always fits in u64")) 287 } 288 subscribe(&mut self, stream: Resource<DynInputStream>) -> Result<Resource<DynPollable>>289 fn subscribe(&mut self, stream: Resource<DynInputStream>) -> Result<Resource<DynPollable>> { 290 crate::poll::subscribe(self, stream) 291 } 292 } 293