1 use crate::p2; 2 use std::pin::Pin; 3 use std::sync::Arc; 4 use tokio::io::{AsyncRead, AsyncWrite, empty}; 5 use wasmtime::component::{HasData, ResourceTable}; 6 use wasmtime_wasi_io::streams::{InputStream, OutputStream}; 7 8 mod empty; 9 mod file; 10 mod locked_async; 11 mod mem; 12 mod stdout; 13 mod worker_thread_stdin; 14 15 pub use self::file::{InputFile, OutputFile}; 16 pub use self::locked_async::{AsyncStdinStream, AsyncStdoutStream}; 17 18 // Convenience reexport for stdio types so tokio doesn't have to be imported 19 // itself. 20 #[doc(no_inline)] 21 pub use tokio::io::{Stderr, Stdin, Stdout, stderr, stdin, stdout}; 22 23 /// A helper struct which implements [`HasData`] for the `wasi:cli` APIs. 24 /// 25 /// This can be useful when directly calling `add_to_linker` functions directly, 26 /// such as [`wasmtime_wasi::p2::bindings::cli::environment::add_to_linker`] as 27 /// the `D` type parameter. See [`HasData`] for more information about the type 28 /// parameter's purpose. 29 /// 30 /// When using this type you can skip the [`WasiCliView`] trait, for 31 /// example. 32 /// 33 /// [`wasmtime_wasi::p2::bindings::cli::environment::add_to_linker`]: crate::p2::bindings::cli::environment::add_to_linker 34 /// 35 /// # Examples 36 /// 37 /// ``` 38 /// use wasmtime::component::{Linker, ResourceTable}; 39 /// use wasmtime::{Engine, Result}; 40 /// use wasmtime_wasi::cli::*; 41 /// 42 /// struct MyStoreState { 43 /// table: ResourceTable, 44 /// cli: WasiCliCtx, 45 /// } 46 /// 47 /// fn main() -> Result<()> { 48 /// let engine = Engine::default(); 49 /// let mut linker = Linker::new(&engine); 50 /// 51 /// wasmtime_wasi::p2::bindings::cli::environment::add_to_linker::<MyStoreState, WasiCli>( 52 /// &mut linker, 53 /// |state| WasiCliCtxView { 54 /// table: &mut state.table, 55 /// ctx: &mut state.cli, 56 /// }, 57 /// )?; 58 /// Ok(()) 59 /// } 60 /// ``` 61 pub struct WasiCli; 62 63 impl HasData for WasiCli { 64 type Data<'a> = WasiCliCtxView<'a>; 65 } 66 67 /// Provides a "view" of `wasi:cli`-related context used to implement host 68 /// traits. 69 pub trait WasiCliView: Send { cli(&mut self) -> WasiCliCtxView<'_>70 fn cli(&mut self) -> WasiCliCtxView<'_>; 71 } 72 73 pub struct WasiCliCtxView<'a> { 74 pub ctx: &'a mut WasiCliCtx, 75 pub table: &'a mut ResourceTable, 76 } 77 78 pub struct WasiCliCtx { 79 pub(crate) environment: Vec<(String, String)>, 80 pub(crate) arguments: Vec<String>, 81 pub(crate) initial_cwd: Option<String>, 82 pub(crate) stdin: Box<dyn StdinStream>, 83 pub(crate) stdout: Box<dyn StdoutStream>, 84 pub(crate) stderr: Box<dyn StdoutStream>, 85 } 86 87 impl Default for WasiCliCtx { default() -> WasiCliCtx88 fn default() -> WasiCliCtx { 89 WasiCliCtx { 90 environment: Vec::new(), 91 arguments: Vec::new(), 92 initial_cwd: None, 93 stdin: Box::new(empty()), 94 stdout: Box::new(empty()), 95 stderr: Box::new(empty()), 96 } 97 } 98 } 99 100 pub trait IsTerminal { 101 /// Returns whether this stream is backed by a TTY. is_terminal(&self) -> bool102 fn is_terminal(&self) -> bool; 103 } 104 105 /// A trait used to represent the standard input to a guest program. 106 /// 107 /// Note that there are many built-in implementations of this trait for various 108 /// types such as [`tokio::io::Stdin`], [`tokio::io::Empty`], and 109 /// [`p2::pipe::MemoryInputPipe`]. 110 pub trait StdinStream: IsTerminal + Send { 111 /// Creates a fresh stream which is reading stdin. 112 /// 113 /// Note that the returned stream must share state with all other streams 114 /// previously created. Guests may create multiple handles to the same stdin 115 /// and they should all be synchronized in their progress through the 116 /// program's input. 117 /// 118 /// Note that this means that if one handle becomes ready for reading they 119 /// all become ready for reading. Subsequently if one is read from it may 120 /// mean that all the others are no longer ready for reading. This is 121 /// basically a consequence of the way the WIT APIs are designed today. async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>122 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>; 123 124 /// Same as [`Self::async_stream`] except that a WASIp2 [`InputStream`] is 125 /// returned. 126 /// 127 /// Note that this has a default implementation which uses 128 /// [`p2::pipe::AsyncReadStream`] as an adapter, but this can be overridden 129 /// if there's a more specialized implementation available. p2_stream(&self) -> Box<dyn InputStream>130 fn p2_stream(&self) -> Box<dyn InputStream> { 131 Box::new(p2::pipe::AsyncReadStream::new(Pin::from( 132 self.async_stream(), 133 ))) 134 } 135 } 136 137 /// Similar to [`StdinStream`], except for output. 138 /// 139 /// This is used both for a guest stdin and a guest stdout. 140 /// 141 /// Note that there are many built-in implementations of this trait for various 142 /// types such as [`tokio::io::Stdout`], [`tokio::io::Empty`], and 143 /// [`p2::pipe::MemoryOutputPipe`]. 144 pub trait StdoutStream: IsTerminal + Send { 145 /// Returns a fresh new stream which can write to this output stream. 146 /// 147 /// Note that all output streams should output to the same logical source. 148 /// This means that it's possible for each independent stream to acquire a 149 /// separate "permit" to write and then act on that permit. Note that 150 /// additionally at this time once a permit is "acquired" there's no way to 151 /// release it, for example you can wait for readiness and then never 152 /// actually write in WASI. This means that acquisition of a permit for one 153 /// stream cannot discount the size of a permit another stream could 154 /// obtain. 155 /// 156 /// Implementations must be able to handle this async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>157 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>; 158 159 /// Same as [`Self::async_stream`] except that a WASIp2 [`OutputStream`] is 160 /// returned. 161 /// 162 /// Note that this has a default implementation which uses 163 /// [`p2::pipe::AsyncWriteStream`] as an adapter, but this can be overridden 164 /// if there's a more specialized implementation available. p2_stream(&self) -> Box<dyn OutputStream>165 fn p2_stream(&self) -> Box<dyn OutputStream> { 166 Box::new(p2::pipe::AsyncWriteStream::new( 167 8192, // FIXME: extract this to a constant. 168 Pin::from(self.async_stream()), 169 )) 170 } 171 } 172 173 // Forward `&T => T` 174 impl<T: ?Sized + IsTerminal> IsTerminal for &T { is_terminal(&self) -> bool175 fn is_terminal(&self) -> bool { 176 T::is_terminal(self) 177 } 178 } 179 impl<T: ?Sized + StdinStream + Sync> StdinStream for &T { p2_stream(&self) -> Box<dyn InputStream>180 fn p2_stream(&self) -> Box<dyn InputStream> { 181 T::p2_stream(self) 182 } async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>183 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> { 184 T::async_stream(self) 185 } 186 } 187 impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &T { p2_stream(&self) -> Box<dyn OutputStream>188 fn p2_stream(&self) -> Box<dyn OutputStream> { 189 T::p2_stream(self) 190 } async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>191 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> { 192 T::async_stream(self) 193 } 194 } 195 196 // Forward `&mut T => T` 197 impl<T: ?Sized + IsTerminal> IsTerminal for &mut T { is_terminal(&self) -> bool198 fn is_terminal(&self) -> bool { 199 T::is_terminal(self) 200 } 201 } 202 impl<T: ?Sized + StdinStream + Sync> StdinStream for &mut T { p2_stream(&self) -> Box<dyn InputStream>203 fn p2_stream(&self) -> Box<dyn InputStream> { 204 T::p2_stream(self) 205 } async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>206 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> { 207 T::async_stream(self) 208 } 209 } 210 impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &mut T { p2_stream(&self) -> Box<dyn OutputStream>211 fn p2_stream(&self) -> Box<dyn OutputStream> { 212 T::p2_stream(self) 213 } async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>214 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> { 215 T::async_stream(self) 216 } 217 } 218 219 // Forward `Box<T> => T` 220 impl<T: ?Sized + IsTerminal> IsTerminal for Box<T> { is_terminal(&self) -> bool221 fn is_terminal(&self) -> bool { 222 T::is_terminal(self) 223 } 224 } 225 impl<T: ?Sized + StdinStream + Sync> StdinStream for Box<T> { p2_stream(&self) -> Box<dyn InputStream>226 fn p2_stream(&self) -> Box<dyn InputStream> { 227 T::p2_stream(self) 228 } async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>229 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> { 230 T::async_stream(self) 231 } 232 } 233 impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Box<T> { p2_stream(&self) -> Box<dyn OutputStream>234 fn p2_stream(&self) -> Box<dyn OutputStream> { 235 T::p2_stream(self) 236 } async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>237 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> { 238 T::async_stream(self) 239 } 240 } 241 242 // Forward `Arc<T> => T` 243 impl<T: ?Sized + IsTerminal> IsTerminal for Arc<T> { is_terminal(&self) -> bool244 fn is_terminal(&self) -> bool { 245 T::is_terminal(self) 246 } 247 } 248 impl<T: ?Sized + StdinStream + Sync> StdinStream for Arc<T> { p2_stream(&self) -> Box<dyn InputStream>249 fn p2_stream(&self) -> Box<dyn InputStream> { 250 T::p2_stream(self) 251 } async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>252 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> { 253 T::async_stream(self) 254 } 255 } 256 impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Arc<T> { p2_stream(&self) -> Box<dyn OutputStream>257 fn p2_stream(&self) -> Box<dyn OutputStream> { 258 T::p2_stream(self) 259 } async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>260 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> { 261 T::async_stream(self) 262 } 263 } 264 265 #[cfg(test)] 266 mod test { 267 use crate::cli::{AsyncStdoutStream, StdinStream, StdoutStream}; 268 use crate::p2::{self, OutputStream}; 269 use bytes::Bytes; 270 use tokio::io::AsyncReadExt; 271 use wasmtime::Result; 272 273 #[test] memory_stdin_stream()274 fn memory_stdin_stream() { 275 // A StdinStream has the property that there are multiple 276 // InputStreams created, using the stream() method which are each 277 // views on the same shared state underneath. Consuming input on one 278 // stream results in consuming that input on all streams. 279 // 280 // The simplest way to measure this is to check if the MemoryInputPipe 281 // impl of StdinStream follows this property. 282 283 let pipe = 284 p2::pipe::MemoryInputPipe::new("the quick brown fox jumped over the three lazy dogs"); 285 286 let mut view1 = pipe.p2_stream(); 287 let mut view2 = pipe.p2_stream(); 288 289 let read1 = view1.read(10).expect("read first 10 bytes"); 290 assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes"); 291 let read2 = view2.read(10).expect("read second 10 bytes"); 292 assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes"); 293 let read3 = view1.read(10).expect("read third 10 bytes"); 294 assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes"); 295 let read4 = view2.read(10).expect("read fourth 10 bytes"); 296 assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes"); 297 } 298 299 #[tokio::test] async_stdin_stream()300 async fn async_stdin_stream() { 301 // A StdinStream has the property that there are multiple 302 // InputStreams created, using the stream() method which are each 303 // views on the same shared state underneath. Consuming input on one 304 // stream results in consuming that input on all streams. 305 // 306 // AsyncStdinStream is a slightly more complex impl of StdinStream 307 // than the MemoryInputPipe above. We can create an AsyncReadStream 308 // from a file on the disk, and an AsyncStdinStream from that common 309 // stream, then check that the same property holds as above. 310 311 let dir = tempfile::tempdir().unwrap(); 312 let mut path = std::path::PathBuf::from(dir.path()); 313 path.push("file"); 314 std::fs::write(&path, "the quick brown fox jumped over the three lazy dogs").unwrap(); 315 316 let file = tokio::fs::File::open(&path) 317 .await 318 .expect("open created file"); 319 let stdin_stream = super::AsyncStdinStream::new(file); 320 321 use super::StdinStream; 322 323 let mut view1 = stdin_stream.p2_stream(); 324 let mut view2 = stdin_stream.p2_stream(); 325 326 view1.ready().await; 327 328 let read1 = view1.read(10).expect("read first 10 bytes"); 329 assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes"); 330 let read2 = view2.read(10).expect("read second 10 bytes"); 331 assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes"); 332 let read3 = view1.read(10).expect("read third 10 bytes"); 333 assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes"); 334 let read4 = view2.read(10).expect("read fourth 10 bytes"); 335 assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes"); 336 } 337 338 #[tokio::test] async_stdout_stream_unblocks()339 async fn async_stdout_stream_unblocks() { 340 let (mut read, write) = tokio::io::duplex(32); 341 let stdout = AsyncStdoutStream::new(32, write); 342 343 let task = tokio::task::spawn(async move { 344 let mut stream = stdout.p2_stream(); 345 blocking_write_and_flush(&mut *stream, "x".into()) 346 .await 347 .unwrap(); 348 }); 349 350 let mut buf = [0; 100]; 351 let n = read.read(&mut buf).await.unwrap(); 352 assert_eq!(&buf[..n], b"x"); 353 354 task.await.unwrap(); 355 } 356 blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()>357 async fn blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()> { 358 while !bytes.is_empty() { 359 let permit = s.write_ready().await?; 360 let len = bytes.len().min(permit); 361 let chunk = bytes.split_to(len); 362 s.write(chunk)?; 363 } 364 365 s.flush()?; 366 s.write_ready().await?; 367 Ok(()) 368 } 369 } 370