xref: /wasmtime-44.0.1/crates/wasi/src/p3/cli/host.rs (revision da093747)
1 use crate::I32Exit;
2 use crate::cli::{IsTerminal, WasiCli, WasiCliCtxView};
3 use crate::p3::DEFAULT_BUFFER_CAPACITY;
4 use crate::p3::bindings::cli::types::ErrorCode;
5 use crate::p3::bindings::cli::{
6     environment, exit, stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr,
7     terminal_stdin, terminal_stdout,
8 };
9 use crate::p3::cli::{TerminalInput, TerminalOutput};
10 use bytes::BytesMut;
11 use core::pin::Pin;
12 use core::task::{Context, Poll};
13 use std::io::{self, Cursor};
14 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
15 use tokio::sync::oneshot;
16 use wasmtime::component::{
17     Access, Destination, FutureReader, Resource, Source, StreamConsumer, StreamProducer,
18     StreamReader, StreamResult,
19 };
20 use wasmtime::{AsContextMut as _, StoreContextMut, error::Context as _, format_err};
21 
22 struct InputStreamProducer {
23     rx: Pin<Box<dyn AsyncRead + Send + Sync>>,
24     result_tx: Option<oneshot::Sender<ErrorCode>>,
25 }
26 
io_error_to_error_code(err: io::Error) -> ErrorCode27 fn io_error_to_error_code(err: io::Error) -> ErrorCode {
28     match err.kind() {
29         io::ErrorKind::BrokenPipe => ErrorCode::Pipe,
30         other => {
31             tracing::warn!("stdio error: {other}");
32             ErrorCode::Io
33         }
34     }
35 }
36 
37 impl<D> StreamProducer<D> for InputStreamProducer {
38     type Item = u8;
39     type Buffer = Cursor<BytesMut>;
40 
poll_produce<'a>( mut self: Pin<&mut Self>, cx: &mut Context<'_>, mut store: StoreContextMut<'a, D>, dst: Destination<'a, Self::Item, Self::Buffer>, finish: bool, ) -> Poll<wasmtime::Result<StreamResult>>41     fn poll_produce<'a>(
42         mut self: Pin<&mut Self>,
43         cx: &mut Context<'_>,
44         mut store: StoreContextMut<'a, D>,
45         dst: Destination<'a, Self::Item, Self::Buffer>,
46         finish: bool,
47     ) -> Poll<wasmtime::Result<StreamResult>> {
48         // If the destination buffer is empty then this is a request on
49         // behalf of the guest to wait for this input stream to be readable.
50         // The `AsyncRead` trait abstraction does not provide the ability to
51         // await this event so we're forced to basically just lie here and
52         // say we're ready read data later.
53         //
54         // See WebAssembly/component-model#561 for some more information.
55         if dst.remaining(store.as_context_mut()) == Some(0) {
56             return Poll::Ready(Ok(StreamResult::Completed));
57         }
58 
59         let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
60         let mut buf = ReadBuf::new(dst.remaining());
61         match self.rx.as_mut().poll_read(cx, &mut buf) {
62             Poll::Ready(Ok(())) if buf.filled().is_empty() => {
63                 Poll::Ready(Ok(StreamResult::Dropped))
64             }
65             Poll::Ready(Ok(())) => {
66                 let n = buf.filled().len();
67                 dst.mark_written(n);
68                 Poll::Ready(Ok(StreamResult::Completed))
69             }
70             Poll::Ready(Err(e)) => {
71                 let _ = self
72                     .result_tx
73                     .take()
74                     .unwrap()
75                     .send(io_error_to_error_code(e));
76                 Poll::Ready(Ok(StreamResult::Dropped))
77             }
78             Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
79             Poll::Pending => Poll::Pending,
80         }
81     }
82 }
83 
84 struct OutputStreamConsumer {
85     tx: Pin<Box<dyn AsyncWrite + Send + Sync>>,
86     result_tx: Option<oneshot::Sender<ErrorCode>>,
87 }
88 
89 impl<D> StreamConsumer<D> for OutputStreamConsumer {
90     type Item = u8;
91 
poll_consume( mut self: Pin<&mut Self>, cx: &mut Context<'_>, store: StoreContextMut<D>, src: Source<Self::Item>, finish: bool, ) -> Poll<wasmtime::Result<StreamResult>>92     fn poll_consume(
93         mut self: Pin<&mut Self>,
94         cx: &mut Context<'_>,
95         store: StoreContextMut<D>,
96         src: Source<Self::Item>,
97         finish: bool,
98     ) -> Poll<wasmtime::Result<StreamResult>> {
99         let mut src = src.as_direct(store);
100         let buf = src.remaining();
101 
102         // If the source buffer is empty then this is a request on behalf of
103         // the guest to wait for this output stream to be writable. The
104         // `AsyncWrite` trait abstraction does not provide the ability to await
105         // this event so we're forced to basically just lie here and say we're
106         // ready write data later.
107         //
108         // See WebAssembly/component-model#561 for some more information.
109         if buf.len() == 0 {
110             return Poll::Ready(Ok(StreamResult::Completed));
111         }
112         match self.tx.as_mut().poll_write(cx, buf) {
113             Poll::Ready(Ok(n)) => {
114                 src.mark_read(n);
115                 Poll::Ready(Ok(StreamResult::Completed))
116             }
117             Poll::Ready(Err(e)) => {
118                 let _ = self
119                     .result_tx
120                     .take()
121                     .unwrap()
122                     .send(io_error_to_error_code(e));
123                 Poll::Ready(Ok(StreamResult::Dropped))
124             }
125             Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
126             Poll::Pending => Poll::Pending,
127         }
128     }
129 }
130 
131 impl terminal_input::Host for WasiCliCtxView<'_> {}
132 impl terminal_output::Host for WasiCliCtxView<'_> {}
133 
134 impl terminal_input::HostTerminalInput for WasiCliCtxView<'_> {
drop(&mut self, rep: Resource<TerminalInput>) -> wasmtime::Result<()>135     fn drop(&mut self, rep: Resource<TerminalInput>) -> wasmtime::Result<()> {
136         self.table
137             .delete(rep)
138             .context("failed to delete terminal input resource from table")?;
139         Ok(())
140     }
141 }
142 
143 impl terminal_output::HostTerminalOutput for WasiCliCtxView<'_> {
drop(&mut self, rep: Resource<TerminalOutput>) -> wasmtime::Result<()>144     fn drop(&mut self, rep: Resource<TerminalOutput>) -> wasmtime::Result<()> {
145         self.table
146             .delete(rep)
147             .context("failed to delete terminal output resource from table")?;
148         Ok(())
149     }
150 }
151 
152 impl terminal_stdin::Host for WasiCliCtxView<'_> {
get_terminal_stdin(&mut self) -> wasmtime::Result<Option<Resource<TerminalInput>>>153     fn get_terminal_stdin(&mut self) -> wasmtime::Result<Option<Resource<TerminalInput>>> {
154         if self.ctx.stdin.is_terminal() {
155             let fd = self
156                 .table
157                 .push(TerminalInput)
158                 .context("failed to push terminal stdin resource to table")?;
159             Ok(Some(fd))
160         } else {
161             Ok(None)
162         }
163     }
164 }
165 
166 impl terminal_stdout::Host for WasiCliCtxView<'_> {
get_terminal_stdout(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>>167     fn get_terminal_stdout(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
168         if self.ctx.stdout.is_terminal() {
169             let fd = self
170                 .table
171                 .push(TerminalOutput)
172                 .context("failed to push terminal stdout resource to table")?;
173             Ok(Some(fd))
174         } else {
175             Ok(None)
176         }
177     }
178 }
179 
180 impl terminal_stderr::Host for WasiCliCtxView<'_> {
get_terminal_stderr(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>>181     fn get_terminal_stderr(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
182         if self.ctx.stderr.is_terminal() {
183             let fd = self
184                 .table
185                 .push(TerminalOutput)
186                 .context("failed to push terminal stderr resource to table")?;
187             Ok(Some(fd))
188         } else {
189             Ok(None)
190         }
191     }
192 }
193 
194 impl stdin::HostWithStore for WasiCli {
read_via_stream<U>( mut store: Access<U, Self>, ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)>195     fn read_via_stream<U>(
196         mut store: Access<U, Self>,
197     ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
198         let rx = store.get().ctx.stdin.async_stream();
199         let (result_tx, result_rx) = oneshot::channel();
200         let stream = StreamReader::new(
201             &mut store,
202             InputStreamProducer {
203                 rx: Box::into_pin(rx),
204                 result_tx: Some(result_tx),
205             },
206         )?;
207         let future = FutureReader::new(&mut store, async {
208             wasmtime::error::Ok(match result_rx.await {
209                 Ok(err) => Err(err),
210                 Err(_) => Ok(()),
211             })
212         })?;
213         Ok((stream, future))
214     }
215 }
216 
217 impl stdin::Host for WasiCliCtxView<'_> {}
218 
219 impl stdout::HostWithStore for WasiCli {
write_via_stream<U>( mut store: Access<'_, U, Self>, data: StreamReader<u8>, ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>>220     fn write_via_stream<U>(
221         mut store: Access<'_, U, Self>,
222         data: StreamReader<u8>,
223     ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
224         let (result_tx, result_rx) = oneshot::channel();
225         let tx = store.get().ctx.stdout.async_stream();
226         data.pipe(
227             &mut store,
228             OutputStreamConsumer {
229                 tx: Box::into_pin(tx),
230                 result_tx: Some(result_tx),
231             },
232         )?;
233         FutureReader::new(&mut store, async {
234             wasmtime::error::Ok(match result_rx.await {
235                 Ok(err) => Err(err),
236                 Err(_) => Ok(()),
237             })
238         })
239     }
240 }
241 
242 impl stdout::Host for WasiCliCtxView<'_> {}
243 
244 impl stderr::HostWithStore for WasiCli {
write_via_stream<U>( mut store: Access<'_, U, Self>, data: StreamReader<u8>, ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>>245     fn write_via_stream<U>(
246         mut store: Access<'_, U, Self>,
247         data: StreamReader<u8>,
248     ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
249         let (result_tx, result_rx) = oneshot::channel();
250         let tx = store.get().ctx.stderr.async_stream();
251         data.pipe(
252             &mut store,
253             OutputStreamConsumer {
254                 tx: Box::into_pin(tx),
255                 result_tx: Some(result_tx),
256             },
257         )?;
258         FutureReader::new(&mut store, async {
259             wasmtime::error::Ok(match result_rx.await {
260                 Ok(err) => Err(err),
261                 Err(_) => Ok(()),
262             })
263         })
264     }
265 }
266 
267 impl stderr::Host for WasiCliCtxView<'_> {}
268 
269 impl environment::Host for WasiCliCtxView<'_> {
get_environment(&mut self) -> wasmtime::Result<Vec<(String, String)>>270     fn get_environment(&mut self) -> wasmtime::Result<Vec<(String, String)>> {
271         Ok(self.ctx.environment.clone())
272     }
273 
get_arguments(&mut self) -> wasmtime::Result<Vec<String>>274     fn get_arguments(&mut self) -> wasmtime::Result<Vec<String>> {
275         Ok(self.ctx.arguments.clone())
276     }
277 
get_initial_cwd(&mut self) -> wasmtime::Result<Option<String>>278     fn get_initial_cwd(&mut self) -> wasmtime::Result<Option<String>> {
279         Ok(self.ctx.initial_cwd.clone())
280     }
281 }
282 
283 impl exit::Host for WasiCliCtxView<'_> {
exit(&mut self, status: Result<(), ()>) -> wasmtime::Result<()>284     fn exit(&mut self, status: Result<(), ()>) -> wasmtime::Result<()> {
285         let status = match status {
286             Ok(()) => 0,
287             Err(()) => 1,
288         };
289         Err(format_err!(I32Exit(status)))
290     }
291 
exit_with_code(&mut self, status_code: u8) -> wasmtime::Result<()>292     fn exit_with_code(&mut self, status_code: u8) -> wasmtime::Result<()> {
293         Err(format_err!(I32Exit(status_code.into())))
294     }
295 }
296