1 use crate::I32Exit;
2 use crate::cli::{IsTerminal, WasiCli, WasiCliCtxView};
3 use crate::p3::DEFAULT_BUFFER_CAPACITY;
4 use crate::p3::bindings::cli::types::ErrorCode;
5 use crate::p3::bindings::cli::{
6 environment, exit, stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr,
7 terminal_stdin, terminal_stdout,
8 };
9 use crate::p3::cli::{TerminalInput, TerminalOutput};
10 use bytes::BytesMut;
11 use core::pin::Pin;
12 use core::task::{Context, Poll};
13 use std::io::{self, Cursor};
14 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
15 use tokio::sync::oneshot;
16 use wasmtime::component::{
17 Access, Destination, FutureReader, Resource, Source, StreamConsumer, StreamProducer,
18 StreamReader, StreamResult,
19 };
20 use wasmtime::{AsContextMut as _, StoreContextMut, error::Context as _, format_err};
21
/// Stream producer backed by an asynchronous reader.
///
/// Used by `stdin::HostWithStore::read_via_stream` to expose the host's stdin
/// to the guest as a `StreamReader<u8>`.
struct InputStreamProducer {
    /// Source of bytes; polled with `poll_read` each time the stream is asked
    /// to produce data.
    rx: Pin<Box<dyn AsyncRead + Send + Sync>>,
    /// One-shot channel used to report a read failure. It is taken (leaving
    /// `None`) when an error code is sent.
    result_tx: Option<oneshot::Sender<ErrorCode>>,
}
26
io_error_to_error_code(err: io::Error) -> ErrorCode27 fn io_error_to_error_code(err: io::Error) -> ErrorCode {
28 match err.kind() {
29 io::ErrorKind::BrokenPipe => ErrorCode::Pipe,
30 other => {
31 tracing::warn!("stdio error: {other}");
32 ErrorCode::Io
33 }
34 }
35 }
36
37 impl<D> StreamProducer<D> for InputStreamProducer {
38 type Item = u8;
39 type Buffer = Cursor<BytesMut>;
40
poll_produce<'a>( mut self: Pin<&mut Self>, cx: &mut Context<'_>, mut store: StoreContextMut<'a, D>, dst: Destination<'a, Self::Item, Self::Buffer>, finish: bool, ) -> Poll<wasmtime::Result<StreamResult>>41 fn poll_produce<'a>(
42 mut self: Pin<&mut Self>,
43 cx: &mut Context<'_>,
44 mut store: StoreContextMut<'a, D>,
45 dst: Destination<'a, Self::Item, Self::Buffer>,
46 finish: bool,
47 ) -> Poll<wasmtime::Result<StreamResult>> {
48 // If the destination buffer is empty then this is a request on
49 // behalf of the guest to wait for this input stream to be readable.
50 // The `AsyncRead` trait abstraction does not provide the ability to
51 // await this event so we're forced to basically just lie here and
52 // say we're ready read data later.
53 //
54 // See WebAssembly/component-model#561 for some more information.
55 if dst.remaining(store.as_context_mut()) == Some(0) {
56 return Poll::Ready(Ok(StreamResult::Completed));
57 }
58
59 let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
60 let mut buf = ReadBuf::new(dst.remaining());
61 match self.rx.as_mut().poll_read(cx, &mut buf) {
62 Poll::Ready(Ok(())) if buf.filled().is_empty() => {
63 Poll::Ready(Ok(StreamResult::Dropped))
64 }
65 Poll::Ready(Ok(())) => {
66 let n = buf.filled().len();
67 dst.mark_written(n);
68 Poll::Ready(Ok(StreamResult::Completed))
69 }
70 Poll::Ready(Err(e)) => {
71 let _ = self
72 .result_tx
73 .take()
74 .unwrap()
75 .send(io_error_to_error_code(e));
76 Poll::Ready(Ok(StreamResult::Dropped))
77 }
78 Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
79 Poll::Pending => Poll::Pending,
80 }
81 }
82 }
83
/// Stream consumer backed by an asynchronous writer.
///
/// Used by the stdout and stderr `write_via_stream` implementations to forward
/// bytes written by the guest to the host's output stream.
struct OutputStreamConsumer {
    /// Sink for bytes; written with `poll_write` each time the stream has data
    /// to consume.
    tx: Pin<Box<dyn AsyncWrite + Send + Sync>>,
    /// One-shot channel used to report a write failure. It is taken (leaving
    /// `None`) when an error code is sent.
    result_tx: Option<oneshot::Sender<ErrorCode>>,
}
88
89 impl<D> StreamConsumer<D> for OutputStreamConsumer {
90 type Item = u8;
91
poll_consume( mut self: Pin<&mut Self>, cx: &mut Context<'_>, store: StoreContextMut<D>, src: Source<Self::Item>, finish: bool, ) -> Poll<wasmtime::Result<StreamResult>>92 fn poll_consume(
93 mut self: Pin<&mut Self>,
94 cx: &mut Context<'_>,
95 store: StoreContextMut<D>,
96 src: Source<Self::Item>,
97 finish: bool,
98 ) -> Poll<wasmtime::Result<StreamResult>> {
99 let mut src = src.as_direct(store);
100 let buf = src.remaining();
101
102 // If the source buffer is empty then this is a request on behalf of
103 // the guest to wait for this output stream to be writable. The
104 // `AsyncWrite` trait abstraction does not provide the ability to await
105 // this event so we're forced to basically just lie here and say we're
106 // ready write data later.
107 //
108 // See WebAssembly/component-model#561 for some more information.
109 if buf.len() == 0 {
110 return Poll::Ready(Ok(StreamResult::Completed));
111 }
112 match self.tx.as_mut().poll_write(cx, buf) {
113 Poll::Ready(Ok(n)) => {
114 src.mark_read(n);
115 Poll::Ready(Ok(StreamResult::Completed))
116 }
117 Poll::Ready(Err(e)) => {
118 let _ = self
119 .result_tx
120 .take()
121 .unwrap()
122 .send(io_error_to_error_code(e));
123 Poll::Ready(Ok(StreamResult::Dropped))
124 }
125 Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
126 Poll::Pending => Poll::Pending,
127 }
128 }
129 }
130
// Empty impls: no methods are provided for the `terminal_input` and
// `terminal_output` `Host` traits here. The resource-level behaviour lives in
// the `HostTerminalInput` / `HostTerminalOutput` impls below.
impl terminal_input::Host for WasiCliCtxView<'_> {}
impl terminal_output::Host for WasiCliCtxView<'_> {}
133
134 impl terminal_input::HostTerminalInput for WasiCliCtxView<'_> {
drop(&mut self, rep: Resource<TerminalInput>) -> wasmtime::Result<()>135 fn drop(&mut self, rep: Resource<TerminalInput>) -> wasmtime::Result<()> {
136 self.table
137 .delete(rep)
138 .context("failed to delete terminal input resource from table")?;
139 Ok(())
140 }
141 }
142
143 impl terminal_output::HostTerminalOutput for WasiCliCtxView<'_> {
drop(&mut self, rep: Resource<TerminalOutput>) -> wasmtime::Result<()>144 fn drop(&mut self, rep: Resource<TerminalOutput>) -> wasmtime::Result<()> {
145 self.table
146 .delete(rep)
147 .context("failed to delete terminal output resource from table")?;
148 Ok(())
149 }
150 }
151
152 impl terminal_stdin::Host for WasiCliCtxView<'_> {
get_terminal_stdin(&mut self) -> wasmtime::Result<Option<Resource<TerminalInput>>>153 fn get_terminal_stdin(&mut self) -> wasmtime::Result<Option<Resource<TerminalInput>>> {
154 if self.ctx.stdin.is_terminal() {
155 let fd = self
156 .table
157 .push(TerminalInput)
158 .context("failed to push terminal stdin resource to table")?;
159 Ok(Some(fd))
160 } else {
161 Ok(None)
162 }
163 }
164 }
165
166 impl terminal_stdout::Host for WasiCliCtxView<'_> {
get_terminal_stdout(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>>167 fn get_terminal_stdout(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
168 if self.ctx.stdout.is_terminal() {
169 let fd = self
170 .table
171 .push(TerminalOutput)
172 .context("failed to push terminal stdout resource to table")?;
173 Ok(Some(fd))
174 } else {
175 Ok(None)
176 }
177 }
178 }
179
180 impl terminal_stderr::Host for WasiCliCtxView<'_> {
get_terminal_stderr(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>>181 fn get_terminal_stderr(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
182 if self.ctx.stderr.is_terminal() {
183 let fd = self
184 .table
185 .push(TerminalOutput)
186 .context("failed to push terminal stderr resource to table")?;
187 Ok(Some(fd))
188 } else {
189 Ok(None)
190 }
191 }
192 }
193
194 impl stdin::HostWithStore for WasiCli {
read_via_stream<U>( mut store: Access<U, Self>, ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)>195 fn read_via_stream<U>(
196 mut store: Access<U, Self>,
197 ) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
198 let rx = store.get().ctx.stdin.async_stream();
199 let (result_tx, result_rx) = oneshot::channel();
200 let stream = StreamReader::new(
201 &mut store,
202 InputStreamProducer {
203 rx: Box::into_pin(rx),
204 result_tx: Some(result_tx),
205 },
206 )?;
207 let future = FutureReader::new(&mut store, async {
208 wasmtime::error::Ok(match result_rx.await {
209 Ok(err) => Err(err),
210 Err(_) => Ok(()),
211 })
212 })?;
213 Ok((stream, future))
214 }
215 }
216
// Empty impl: no `stdin::Host` methods are provided here; the stream entry
// point is `stdin::HostWithStore::read_via_stream` on `WasiCli`.
impl stdin::Host for WasiCliCtxView<'_> {}
218
219 impl stdout::HostWithStore for WasiCli {
write_via_stream<U>( mut store: Access<'_, U, Self>, data: StreamReader<u8>, ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>>220 fn write_via_stream<U>(
221 mut store: Access<'_, U, Self>,
222 data: StreamReader<u8>,
223 ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
224 let (result_tx, result_rx) = oneshot::channel();
225 let tx = store.get().ctx.stdout.async_stream();
226 data.pipe(
227 &mut store,
228 OutputStreamConsumer {
229 tx: Box::into_pin(tx),
230 result_tx: Some(result_tx),
231 },
232 )?;
233 FutureReader::new(&mut store, async {
234 wasmtime::error::Ok(match result_rx.await {
235 Ok(err) => Err(err),
236 Err(_) => Ok(()),
237 })
238 })
239 }
240 }
241
// Empty impl: no `stdout::Host` methods are provided here; the stream entry
// point is `stdout::HostWithStore::write_via_stream` on `WasiCli`.
impl stdout::Host for WasiCliCtxView<'_> {}
243
244 impl stderr::HostWithStore for WasiCli {
write_via_stream<U>( mut store: Access<'_, U, Self>, data: StreamReader<u8>, ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>>245 fn write_via_stream<U>(
246 mut store: Access<'_, U, Self>,
247 data: StreamReader<u8>,
248 ) -> wasmtime::Result<FutureReader<Result<(), ErrorCode>>> {
249 let (result_tx, result_rx) = oneshot::channel();
250 let tx = store.get().ctx.stderr.async_stream();
251 data.pipe(
252 &mut store,
253 OutputStreamConsumer {
254 tx: Box::into_pin(tx),
255 result_tx: Some(result_tx),
256 },
257 )?;
258 FutureReader::new(&mut store, async {
259 wasmtime::error::Ok(match result_rx.await {
260 Ok(err) => Err(err),
261 Err(_) => Ok(()),
262 })
263 })
264 }
265 }
266
// Empty impl: no `stderr::Host` methods are provided here; the stream entry
// point is `stderr::HostWithStore::write_via_stream` on `WasiCli`.
impl stderr::Host for WasiCliCtxView<'_> {}
268
269 impl environment::Host for WasiCliCtxView<'_> {
get_environment(&mut self) -> wasmtime::Result<Vec<(String, String)>>270 fn get_environment(&mut self) -> wasmtime::Result<Vec<(String, String)>> {
271 Ok(self.ctx.environment.clone())
272 }
273
get_arguments(&mut self) -> wasmtime::Result<Vec<String>>274 fn get_arguments(&mut self) -> wasmtime::Result<Vec<String>> {
275 Ok(self.ctx.arguments.clone())
276 }
277
get_initial_cwd(&mut self) -> wasmtime::Result<Option<String>>278 fn get_initial_cwd(&mut self) -> wasmtime::Result<Option<String>> {
279 Ok(self.ctx.initial_cwd.clone())
280 }
281 }
282
283 impl exit::Host for WasiCliCtxView<'_> {
exit(&mut self, status: Result<(), ()>) -> wasmtime::Result<()>284 fn exit(&mut self, status: Result<(), ()>) -> wasmtime::Result<()> {
285 let status = match status {
286 Ok(()) => 0,
287 Err(()) => 1,
288 };
289 Err(format_err!(I32Exit(status)))
290 }
291
exit_with_code(&mut self, status_code: u8) -> wasmtime::Result<()>292 fn exit_with_code(&mut self, status_code: u8) -> wasmtime::Result<()> {
293 Err(format_err!(I32Exit(status_code.into())))
294 }
295 }
296