xref: /wasmtime-44.0.1/crates/wasi/src/cli.rs (revision bc4582c3)
1 use crate::p2;
2 use std::pin::Pin;
3 use std::sync::Arc;
4 use tokio::io::{AsyncRead, AsyncWrite, empty};
5 use wasmtime::component::{HasData, ResourceTable};
6 use wasmtime_wasi_io::streams::{InputStream, OutputStream};
7 
8 mod empty;
9 mod file;
10 mod locked_async;
11 mod mem;
12 mod stdout;
13 mod worker_thread_stdin;
14 
15 pub use self::file::{InputFile, OutputFile};
16 pub use self::locked_async::{AsyncStdinStream, AsyncStdoutStream};
17 
18 // Convenience reexport for stdio types so tokio doesn't have to be imported
19 // itself.
20 #[doc(no_inline)]
21 pub use tokio::io::{Stderr, Stdin, Stdout, stderr, stdin, stdout};
22 
23 /// A helper struct which implements [`HasData`] for the `wasi:cli` APIs.
24 ///
25 /// This can be useful when directly calling `add_to_linker` functions directly,
26 /// such as [`wasmtime_wasi::p2::bindings::cli::environment::add_to_linker`] as
27 /// the `D` type parameter. See [`HasData`] for more information about the type
28 /// parameter's purpose.
29 ///
30 /// When using this type you can skip the [`WasiCliView`] trait, for
31 /// example.
32 ///
33 /// [`wasmtime_wasi::p2::bindings::cli::environment::add_to_linker`]: crate::p2::bindings::cli::environment::add_to_linker
34 ///
35 /// # Examples
36 ///
37 /// ```
38 /// use wasmtime::component::{Linker, ResourceTable};
39 /// use wasmtime::{Engine, Result};
40 /// use wasmtime_wasi::cli::*;
41 ///
42 /// struct MyStoreState {
43 ///     table: ResourceTable,
44 ///     cli: WasiCliCtx,
45 /// }
46 ///
47 /// fn main() -> Result<()> {
48 ///     let engine = Engine::default();
49 ///     let mut linker = Linker::new(&engine);
50 ///
51 ///     wasmtime_wasi::p2::bindings::cli::environment::add_to_linker::<MyStoreState, WasiCli>(
52 ///         &mut linker,
53 ///         |state| WasiCliCtxView {
54 ///             table: &mut state.table,
55 ///             ctx: &mut state.cli,
56 ///         },
57 ///     )?;
58 ///     Ok(())
59 /// }
60 /// ```
61 pub struct WasiCli;
62 
63 impl HasData for WasiCli {
64     type Data<'a> = WasiCliCtxView<'a>;
65 }
66 
67 /// Provides a "view" of `wasi:cli`-related context used to implement host
68 /// traits.
69 pub trait WasiCliView: Send {
cli(&mut self) -> WasiCliCtxView<'_>70     fn cli(&mut self) -> WasiCliCtxView<'_>;
71 }
72 
73 pub struct WasiCliCtxView<'a> {
74     pub ctx: &'a mut WasiCliCtx,
75     pub table: &'a mut ResourceTable,
76 }
77 
78 pub struct WasiCliCtx {
79     pub(crate) environment: Vec<(String, String)>,
80     pub(crate) arguments: Vec<String>,
81     pub(crate) initial_cwd: Option<String>,
82     pub(crate) stdin: Box<dyn StdinStream>,
83     pub(crate) stdout: Box<dyn StdoutStream>,
84     pub(crate) stderr: Box<dyn StdoutStream>,
85 }
86 
87 impl Default for WasiCliCtx {
default() -> WasiCliCtx88     fn default() -> WasiCliCtx {
89         WasiCliCtx {
90             environment: Vec::new(),
91             arguments: Vec::new(),
92             initial_cwd: None,
93             stdin: Box::new(empty()),
94             stdout: Box::new(empty()),
95             stderr: Box::new(empty()),
96         }
97     }
98 }
99 
100 pub trait IsTerminal {
101     /// Returns whether this stream is backed by a TTY.
is_terminal(&self) -> bool102     fn is_terminal(&self) -> bool;
103 }
104 
105 /// A trait used to represent the standard input to a guest program.
106 ///
107 /// Note that there are many built-in implementations of this trait for various
108 /// types such as [`tokio::io::Stdin`], [`tokio::io::Empty`], and
109 /// [`p2::pipe::MemoryInputPipe`].
110 pub trait StdinStream: IsTerminal + Send {
111     /// Creates a fresh stream which is reading stdin.
112     ///
113     /// Note that the returned stream must share state with all other streams
114     /// previously created. Guests may create multiple handles to the same stdin
115     /// and they should all be synchronized in their progress through the
116     /// program's input.
117     ///
118     /// Note that this means that if one handle becomes ready for reading they
119     /// all become ready for reading. Subsequently if one is read from it may
120     /// mean that all the others are no longer ready for reading. This is
121     /// basically a consequence of the way the WIT APIs are designed today.
async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>122     fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>;
123 
124     /// Same as [`Self::async_stream`] except that a WASIp2 [`InputStream`] is
125     /// returned.
126     ///
127     /// Note that this has a default implementation which uses
128     /// [`p2::pipe::AsyncReadStream`] as an adapter, but this can be overridden
129     /// if there's a more specialized implementation available.
p2_stream(&self) -> Box<dyn InputStream>130     fn p2_stream(&self) -> Box<dyn InputStream> {
131         Box::new(p2::pipe::AsyncReadStream::new(Pin::from(
132             self.async_stream(),
133         )))
134     }
135 }
136 
137 /// Similar to [`StdinStream`], except for output.
138 ///
139 /// This is used both for a guest stdin and a guest stdout.
140 ///
141 /// Note that there are many built-in implementations of this trait for various
142 /// types such as [`tokio::io::Stdout`], [`tokio::io::Empty`], and
143 /// [`p2::pipe::MemoryOutputPipe`].
144 pub trait StdoutStream: IsTerminal + Send {
145     /// Returns a fresh new stream which can write to this output stream.
146     ///
147     /// Note that all output streams should output to the same logical source.
148     /// This means that it's possible for each independent stream to acquire a
149     /// separate "permit" to write and then act on that permit. Note that
150     /// additionally at this time once a permit is "acquired" there's no way to
151     /// release it, for example you can wait for readiness and then never
152     /// actually write in WASI. This means that acquisition of a permit for one
153     /// stream cannot discount the size of a permit another stream could
154     /// obtain.
155     ///
156     /// Implementations must be able to handle this
async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>157     fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>;
158 
159     /// Same as [`Self::async_stream`] except that a WASIp2 [`OutputStream`] is
160     /// returned.
161     ///
162     /// Note that this has a default implementation which uses
163     /// [`p2::pipe::AsyncWriteStream`] as an adapter, but this can be overridden
164     /// if there's a more specialized implementation available.
p2_stream(&self) -> Box<dyn OutputStream>165     fn p2_stream(&self) -> Box<dyn OutputStream> {
166         Box::new(p2::pipe::AsyncWriteStream::new(
167             8192, // FIXME: extract this to a constant.
168             Pin::from(self.async_stream()),
169         ))
170     }
171 }
172 
173 // Forward `&T => T`
174 impl<T: ?Sized + IsTerminal> IsTerminal for &T {
is_terminal(&self) -> bool175     fn is_terminal(&self) -> bool {
176         T::is_terminal(self)
177     }
178 }
179 impl<T: ?Sized + StdinStream + Sync> StdinStream for &T {
p2_stream(&self) -> Box<dyn InputStream>180     fn p2_stream(&self) -> Box<dyn InputStream> {
181         T::p2_stream(self)
182     }
async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>183     fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
184         T::async_stream(self)
185     }
186 }
187 impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &T {
p2_stream(&self) -> Box<dyn OutputStream>188     fn p2_stream(&self) -> Box<dyn OutputStream> {
189         T::p2_stream(self)
190     }
async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>191     fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
192         T::async_stream(self)
193     }
194 }
195 
196 // Forward `&mut T => T`
197 impl<T: ?Sized + IsTerminal> IsTerminal for &mut T {
is_terminal(&self) -> bool198     fn is_terminal(&self) -> bool {
199         T::is_terminal(self)
200     }
201 }
202 impl<T: ?Sized + StdinStream + Sync> StdinStream for &mut T {
p2_stream(&self) -> Box<dyn InputStream>203     fn p2_stream(&self) -> Box<dyn InputStream> {
204         T::p2_stream(self)
205     }
async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>206     fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
207         T::async_stream(self)
208     }
209 }
210 impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &mut T {
p2_stream(&self) -> Box<dyn OutputStream>211     fn p2_stream(&self) -> Box<dyn OutputStream> {
212         T::p2_stream(self)
213     }
async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>214     fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
215         T::async_stream(self)
216     }
217 }
218 
219 // Forward `Box<T> => T`
220 impl<T: ?Sized + IsTerminal> IsTerminal for Box<T> {
is_terminal(&self) -> bool221     fn is_terminal(&self) -> bool {
222         T::is_terminal(self)
223     }
224 }
225 impl<T: ?Sized + StdinStream + Sync> StdinStream for Box<T> {
p2_stream(&self) -> Box<dyn InputStream>226     fn p2_stream(&self) -> Box<dyn InputStream> {
227         T::p2_stream(self)
228     }
async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>229     fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
230         T::async_stream(self)
231     }
232 }
233 impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Box<T> {
p2_stream(&self) -> Box<dyn OutputStream>234     fn p2_stream(&self) -> Box<dyn OutputStream> {
235         T::p2_stream(self)
236     }
async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>237     fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
238         T::async_stream(self)
239     }
240 }
241 
242 // Forward `Arc<T> => T`
243 impl<T: ?Sized + IsTerminal> IsTerminal for Arc<T> {
is_terminal(&self) -> bool244     fn is_terminal(&self) -> bool {
245         T::is_terminal(self)
246     }
247 }
248 impl<T: ?Sized + StdinStream + Sync> StdinStream for Arc<T> {
p2_stream(&self) -> Box<dyn InputStream>249     fn p2_stream(&self) -> Box<dyn InputStream> {
250         T::p2_stream(self)
251     }
async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>252     fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
253         T::async_stream(self)
254     }
255 }
256 impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Arc<T> {
p2_stream(&self) -> Box<dyn OutputStream>257     fn p2_stream(&self) -> Box<dyn OutputStream> {
258         T::p2_stream(self)
259     }
async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>260     fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
261         T::async_stream(self)
262     }
263 }
264 
265 #[cfg(test)]
266 mod test {
267     use crate::cli::{AsyncStdoutStream, StdinStream, StdoutStream};
268     use crate::p2::{self, OutputStream};
269     use bytes::Bytes;
270     use tokio::io::AsyncReadExt;
271     use wasmtime::Result;
272 
273     #[test]
memory_stdin_stream()274     fn memory_stdin_stream() {
275         // A StdinStream has the property that there are multiple
276         // InputStreams created, using the stream() method which are each
277         // views on the same shared state underneath. Consuming input on one
278         // stream results in consuming that input on all streams.
279         //
280         // The simplest way to measure this is to check if the MemoryInputPipe
281         // impl of StdinStream follows this property.
282 
283         let pipe =
284             p2::pipe::MemoryInputPipe::new("the quick brown fox jumped over the three lazy dogs");
285 
286         let mut view1 = pipe.p2_stream();
287         let mut view2 = pipe.p2_stream();
288 
289         let read1 = view1.read(10).expect("read first 10 bytes");
290         assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
291         let read2 = view2.read(10).expect("read second 10 bytes");
292         assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
293         let read3 = view1.read(10).expect("read third 10 bytes");
294         assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
295         let read4 = view2.read(10).expect("read fourth 10 bytes");
296         assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
297     }
298 
299     #[tokio::test]
async_stdin_stream()300     async fn async_stdin_stream() {
301         // A StdinStream has the property that there are multiple
302         // InputStreams created, using the stream() method which are each
303         // views on the same shared state underneath. Consuming input on one
304         // stream results in consuming that input on all streams.
305         //
306         // AsyncStdinStream is a slightly more complex impl of StdinStream
307         // than the MemoryInputPipe above. We can create an AsyncReadStream
308         // from a file on the disk, and an AsyncStdinStream from that common
309         // stream, then check that the same property holds as above.
310 
311         let dir = tempfile::tempdir().unwrap();
312         let mut path = std::path::PathBuf::from(dir.path());
313         path.push("file");
314         std::fs::write(&path, "the quick brown fox jumped over the three lazy dogs").unwrap();
315 
316         let file = tokio::fs::File::open(&path)
317             .await
318             .expect("open created file");
319         let stdin_stream = super::AsyncStdinStream::new(file);
320 
321         use super::StdinStream;
322 
323         let mut view1 = stdin_stream.p2_stream();
324         let mut view2 = stdin_stream.p2_stream();
325 
326         view1.ready().await;
327 
328         let read1 = view1.read(10).expect("read first 10 bytes");
329         assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
330         let read2 = view2.read(10).expect("read second 10 bytes");
331         assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
332         let read3 = view1.read(10).expect("read third 10 bytes");
333         assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
334         let read4 = view2.read(10).expect("read fourth 10 bytes");
335         assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
336     }
337 
338     #[tokio::test]
async_stdout_stream_unblocks()339     async fn async_stdout_stream_unblocks() {
340         let (mut read, write) = tokio::io::duplex(32);
341         let stdout = AsyncStdoutStream::new(32, write);
342 
343         let task = tokio::task::spawn(async move {
344             let mut stream = stdout.p2_stream();
345             blocking_write_and_flush(&mut *stream, "x".into())
346                 .await
347                 .unwrap();
348         });
349 
350         let mut buf = [0; 100];
351         let n = read.read(&mut buf).await.unwrap();
352         assert_eq!(&buf[..n], b"x");
353 
354         task.await.unwrap();
355     }
356 
blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()>357     async fn blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()> {
358         while !bytes.is_empty() {
359             let permit = s.write_ready().await?;
360             let len = bytes.len().min(permit);
361             let chunk = bytes.split_to(len);
362             s.write(chunk)?;
363         }
364 
365         s.flush()?;
366         s.write_ready().await?;
367         Ok(())
368     }
369 }
370