xref: /wasmtime-44.0.1/crates/wasi-io/src/impls.rs (revision ab78bd82)
1 use crate::bindings::wasi::io::{error, poll, streams};
2 use crate::poll::{DynFuture, DynPollable, MakeFuture, subscribe};
3 use crate::streams::{DynInputStream, DynOutputStream, StreamError, StreamResult};
4 use alloc::collections::BTreeMap;
5 use alloc::string::String;
6 use alloc::vec::Vec;
7 use bytes::Bytes;
8 use core::future::Future;
9 use core::pin::Pin;
10 use core::task::{Context, Poll};
11 use wasmtime::component::{Resource, ResourceTable};
12 use wasmtime::{Result, format_err};
13 
14 impl poll::Host for ResourceTable {
poll(&mut self, pollables: Vec<Resource<DynPollable>>) -> Result<Vec<u32>>15     async fn poll(&mut self, pollables: Vec<Resource<DynPollable>>) -> Result<Vec<u32>> {
16         type ReadylistIndex = u32;
17 
18         if pollables.is_empty() {
19             return Err(format_err!("empty poll list"));
20         }
21 
22         let mut table_futures: BTreeMap<u32, (MakeFuture, Vec<ReadylistIndex>)> = BTreeMap::new();
23 
24         for (ix, p) in pollables.iter().enumerate() {
25             let ix: u32 = ix.try_into()?;
26 
27             let pollable = self.get(p)?;
28             let (_, list) = table_futures
29                 .entry(pollable.index)
30                 .or_insert((pollable.make_future, Vec::new()));
31             list.push(ix);
32         }
33 
34         let mut futures: Vec<(DynFuture<'_>, Vec<ReadylistIndex>)> = Vec::new();
35         for (entry, (make_future, readylist_indices)) in self.iter_entries(table_futures) {
36             let entry = entry?;
37             futures.push((make_future(entry), readylist_indices));
38         }
39 
40         struct PollList<'a> {
41             futures: Vec<(DynFuture<'a>, Vec<ReadylistIndex>)>,
42         }
43         impl<'a> Future for PollList<'a> {
44             type Output = Vec<u32>;
45 
46             fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47                 let mut any_ready = false;
48                 let mut results = Vec::new();
49                 for (fut, readylist_indices) in self.futures.iter_mut() {
50                     match fut.as_mut().poll(cx) {
51                         Poll::Ready(()) => {
52                             results.extend_from_slice(readylist_indices);
53                             any_ready = true;
54                         }
55                         Poll::Pending => {}
56                     }
57                 }
58                 if any_ready {
59                     Poll::Ready(results)
60                 } else {
61                     Poll::Pending
62                 }
63             }
64         }
65 
66         Ok(PollList { futures }.await)
67     }
68 }
69 
70 impl crate::bindings::wasi::io::poll::HostPollable for ResourceTable {
block(&mut self, pollable: Resource<DynPollable>) -> Result<()>71     async fn block(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
72         let pollable = self.get(&pollable)?;
73         let ready = (pollable.make_future)(self.get_any_mut(pollable.index)?);
74         ready.await;
75         Ok(())
76     }
ready(&mut self, pollable: Resource<DynPollable>) -> Result<bool>77     async fn ready(&mut self, pollable: Resource<DynPollable>) -> Result<bool> {
78         let pollable = self.get(&pollable)?;
79         let ready = (pollable.make_future)(self.get_any_mut(pollable.index)?);
80         futures::pin_mut!(ready);
81         Ok(matches!(
82             futures::future::poll_immediate(ready).await,
83             Some(())
84         ))
85     }
drop(&mut self, pollable: Resource<DynPollable>) -> Result<()>86     fn drop(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
87         let pollable = self.delete(pollable)?;
88         if let Some(delete) = pollable.remove_index_on_delete {
89             delete(self, pollable.index)?;
90         }
91         Ok(())
92     }
93 }
94 
95 impl error::Host for ResourceTable {}
96 
97 impl streams::Host for ResourceTable {
convert_stream_error(&mut self, err: StreamError) -> Result<streams::StreamError>98     fn convert_stream_error(&mut self, err: StreamError) -> Result<streams::StreamError> {
99         match err {
100             StreamError::Closed => Ok(streams::StreamError::Closed),
101             StreamError::LastOperationFailed(e) => {
102                 Ok(streams::StreamError::LastOperationFailed(self.push(e)?))
103             }
104             StreamError::Trap(e) => Err(e),
105         }
106     }
107 }
108 
109 impl error::HostError for ResourceTable {
drop(&mut self, err: Resource<streams::Error>) -> Result<()>110     fn drop(&mut self, err: Resource<streams::Error>) -> Result<()> {
111         self.delete(err)?;
112         Ok(())
113     }
114 
to_debug_string(&mut self, err: Resource<streams::Error>) -> Result<String>115     fn to_debug_string(&mut self, err: Resource<streams::Error>) -> Result<String> {
116         Ok(alloc::format!("{:?}", self.get(&err)?))
117     }
118 }
119 
120 impl streams::HostOutputStream for ResourceTable {
drop(&mut self, stream: Resource<DynOutputStream>) -> Result<()>121     async fn drop(&mut self, stream: Resource<DynOutputStream>) -> Result<()> {
122         self.delete(stream)?.cancel().await;
123         Ok(())
124     }
125 
check_write(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<u64>126     fn check_write(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<u64> {
127         let bytes = self.get_mut(&stream)?.check_write()?;
128         Ok(bytes as u64)
129     }
130 
write(&mut self, stream: Resource<DynOutputStream>, bytes: Vec<u8>) -> StreamResult<()>131     fn write(&mut self, stream: Resource<DynOutputStream>, bytes: Vec<u8>) -> StreamResult<()> {
132         self.get_mut(&stream)?.write(bytes.into())?;
133         Ok(())
134     }
135 
subscribe(&mut self, stream: Resource<DynOutputStream>) -> Result<Resource<DynPollable>>136     fn subscribe(&mut self, stream: Resource<DynOutputStream>) -> Result<Resource<DynPollable>> {
137         subscribe(self, stream)
138     }
139 
blocking_write_and_flush( &mut self, stream: Resource<DynOutputStream>, bytes: Vec<u8>, ) -> StreamResult<()>140     async fn blocking_write_and_flush(
141         &mut self,
142         stream: Resource<DynOutputStream>,
143         bytes: Vec<u8>,
144     ) -> StreamResult<()> {
145         if bytes.len() > 4096 {
146             return Err(StreamError::trap(
147                 "Buffer too large for blocking-write-and-flush (expected at most 4096)",
148             ));
149         }
150 
151         self.get_mut(&stream)?
152             .blocking_write_and_flush(bytes.into())
153             .await
154     }
155 
blocking_write_zeroes_and_flush( &mut self, stream: Resource<DynOutputStream>, len: u64, ) -> StreamResult<()>156     async fn blocking_write_zeroes_and_flush(
157         &mut self,
158         stream: Resource<DynOutputStream>,
159         len: u64,
160     ) -> StreamResult<()> {
161         if len > 4096 {
162             return Err(StreamError::trap(
163                 "Buffer too large for blocking-write-zeroes-and-flush (expected at most 4096)",
164             ));
165         }
166 
167         // TODO: We could optimize this to not allocate one big zeroed buffer, and instead write
168         // repeatedly from a 'static buffer of zeros.
169         let bs = Bytes::from_iter(core::iter::repeat(0).take(len as usize));
170         self.get_mut(&stream)?.blocking_write_and_flush(bs).await
171     }
172 
write_zeroes(&mut self, stream: Resource<DynOutputStream>, len: u64) -> StreamResult<()>173     fn write_zeroes(&mut self, stream: Resource<DynOutputStream>, len: u64) -> StreamResult<()> {
174         self.get_mut(&stream)?.write_zeroes(len as usize)?;
175         Ok(())
176     }
177 
flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()>178     fn flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
179         self.get_mut(&stream)?.flush()?;
180         Ok(())
181     }
182 
blocking_flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()>183     async fn blocking_flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
184         let s = self.get_mut(&stream)?;
185         s.flush()?;
186         s.write_ready().await?;
187         Ok(())
188     }
189 
splice( &mut self, dest: Resource<DynOutputStream>, src: Resource<DynInputStream>, len: u64, ) -> StreamResult<u64>190     fn splice(
191         &mut self,
192         dest: Resource<DynOutputStream>,
193         src: Resource<DynInputStream>,
194         len: u64,
195     ) -> StreamResult<u64> {
196         let len = len.try_into().unwrap_or(usize::MAX);
197 
198         let permit = {
199             let output = self.get_mut(&dest)?;
200             output.check_write()?
201         };
202         let len = len.min(permit);
203         if len == 0 {
204             return Ok(0);
205         }
206 
207         let contents = self.get_mut(&src)?.read(len)?;
208 
209         let len = contents.len();
210         if len == 0 {
211             return Ok(0);
212         }
213 
214         let output = self.get_mut(&dest)?;
215         output.write(contents)?;
216         Ok(len.try_into().expect("usize can fit in u64"))
217     }
218 
blocking_splice( &mut self, dest: Resource<DynOutputStream>, src: Resource<DynInputStream>, len: u64, ) -> StreamResult<u64>219     async fn blocking_splice(
220         &mut self,
221         dest: Resource<DynOutputStream>,
222         src: Resource<DynInputStream>,
223         len: u64,
224     ) -> StreamResult<u64> {
225         let len = len.try_into().unwrap_or(usize::MAX);
226 
227         let permit = {
228             let output = self.get_mut(&dest)?;
229             output.write_ready().await?
230         };
231         let len = len.min(permit);
232         if len == 0 {
233             return Ok(0);
234         }
235 
236         let contents = self.get_mut(&src)?.blocking_read(len).await?;
237 
238         let len = contents.len();
239         if len == 0 {
240             return Ok(0);
241         }
242 
243         let output = self.get_mut(&dest)?;
244         output.blocking_write_and_flush(contents).await?;
245         Ok(len.try_into().expect("usize can fit in u64"))
246     }
247 }
248 
249 impl streams::HostInputStream for ResourceTable {
drop(&mut self, stream: Resource<DynInputStream>) -> Result<()>250     async fn drop(&mut self, stream: Resource<DynInputStream>) -> Result<()> {
251         self.delete(stream)?.cancel().await;
252         Ok(())
253     }
254 
read(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<Vec<u8>>255     fn read(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<Vec<u8>> {
256         let len = len.try_into().unwrap_or(usize::MAX);
257         let bytes = self.get_mut(&stream)?.read(len)?;
258         debug_assert!(bytes.len() <= len);
259         Ok(bytes.into())
260     }
261 
blocking_read( &mut self, stream: Resource<DynInputStream>, len: u64, ) -> StreamResult<Vec<u8>>262     async fn blocking_read(
263         &mut self,
264         stream: Resource<DynInputStream>,
265         len: u64,
266     ) -> StreamResult<Vec<u8>> {
267         let len = len.try_into().unwrap_or(usize::MAX);
268         let bytes = self.get_mut(&stream)?.blocking_read(len).await?;
269         debug_assert!(bytes.len() <= len);
270         Ok(bytes.into())
271     }
272 
skip(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<u64>273     fn skip(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<u64> {
274         let len = len.try_into().unwrap_or(usize::MAX);
275         let written = self.get_mut(&stream)?.skip(len)?;
276         Ok(written.try_into().expect("usize always fits in u64"))
277     }
278 
blocking_skip( &mut self, stream: Resource<DynInputStream>, len: u64, ) -> StreamResult<u64>279     async fn blocking_skip(
280         &mut self,
281         stream: Resource<DynInputStream>,
282         len: u64,
283     ) -> StreamResult<u64> {
284         let len = len.try_into().unwrap_or(usize::MAX);
285         let written = self.get_mut(&stream)?.blocking_skip(len).await?;
286         Ok(written.try_into().expect("usize always fits in u64"))
287     }
288 
subscribe(&mut self, stream: Resource<DynInputStream>) -> Result<Resource<DynPollable>>289     fn subscribe(&mut self, stream: Resource<DynInputStream>) -> Result<Resource<DynPollable>> {
290         crate::poll::subscribe(self, stream)
291     }
292 }
293