1 use crate::cli::{IsTerminal, StdinStream, StdoutStream}; 2 use crate::p2::{InputStream, OutputStream, Pollable, StreamError, StreamResult}; 3 use bytes::Bytes; 4 use std::io::{Read, Write}; 5 use std::pin::Pin; 6 use std::sync::Arc; 7 use std::task::{Context, Poll}; 8 use tokio::io::{self, AsyncRead, AsyncWrite}; 9 10 /// This implementation will yield output streams that block on writes, and 11 /// output directly to a file. If truly async output is required, 12 /// [`AsyncStdoutStream`] should be used instead. 13 /// 14 /// [`AsyncStdoutStream`]: crate::cli::AsyncStdoutStream 15 #[derive(Clone)] 16 pub struct OutputFile { 17 file: Arc<std::fs::File>, 18 } 19 20 impl OutputFile { new(file: std::fs::File) -> Self21 pub fn new(file: std::fs::File) -> Self { 22 Self { 23 file: Arc::new(file), 24 } 25 } 26 } 27 28 impl IsTerminal for OutputFile { is_terminal(&self) -> bool29 fn is_terminal(&self) -> bool { 30 false 31 } 32 } 33 34 impl StdoutStream for OutputFile { p2_stream(&self) -> Box<dyn OutputStream>35 fn p2_stream(&self) -> Box<dyn OutputStream> { 36 Box::new(self.clone()) 37 } 38 async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>39 fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> { 40 Box::new(self.clone()) 41 } 42 } 43 44 #[async_trait::async_trait] 45 impl Pollable for OutputFile { ready(&mut self)46 async fn ready(&mut self) {} 47 } 48 49 impl OutputStream for OutputFile { write(&mut self, bytes: Bytes) -> StreamResult<()>50 fn write(&mut self, bytes: Bytes) -> StreamResult<()> { 51 (&*self.file) 52 .write_all(&bytes) 53 .map_err(|e| StreamError::LastOperationFailed(wasmtime::format_err!(e))) 54 } 55 flush(&mut self) -> StreamResult<()>56 fn flush(&mut self) -> StreamResult<()> { 57 use std::io::Write; 58 self.file 59 .flush() 60 .map_err(|e| StreamError::LastOperationFailed(wasmtime::format_err!(e))) 61 } 62 check_write(&mut self) -> StreamResult<usize>63 fn check_write(&mut self) -> StreamResult<usize> { 64 Ok(1024 * 1024) 65 } 66 } 67 68 impl AsyncWrite for OutputFile { poll_write( self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>69 fn poll_write( 70 self: Pin<&mut Self>, 71 _cx: &mut Context<'_>, 72 buf: &[u8], 73 ) -> Poll<io::Result<usize>> { 74 match (&*self.file).write_all(buf) { 75 Ok(()) => Poll::Ready(Ok(buf.len())), 76 Err(e) => Poll::Ready(Err(e)), 77 } 78 } poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>79 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { 80 Poll::Ready((&*self.file).flush()) 81 } poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>82 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { 83 Poll::Ready(Ok(())) 84 } 85 } 86 87 /// This implementation will yield input streams that block on reads, and 88 /// reads directly from a file. If truly async input is required, 89 /// [`AsyncStdinStream`] should be used instead. 90 /// 91 /// [`AsyncStdinStream`]: crate::cli::AsyncStdinStream 92 #[derive(Clone)] 93 pub struct InputFile { 94 file: Arc<std::fs::File>, 95 } 96 97 impl InputFile { new(file: std::fs::File) -> Self98 pub fn new(file: std::fs::File) -> Self { 99 Self { 100 file: Arc::new(file), 101 } 102 } 103 } 104 105 impl StdinStream for InputFile { p2_stream(&self) -> Box<dyn InputStream>106 fn p2_stream(&self) -> Box<dyn InputStream> { 107 Box::new(self.clone()) 108 } async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>109 fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> { 110 Box::new(self.clone()) 111 } 112 } 113 114 impl IsTerminal for InputFile { is_terminal(&self) -> bool115 fn is_terminal(&self) -> bool { 116 false 117 } 118 } 119 120 #[async_trait::async_trait] 121 impl Pollable for InputFile { ready(&mut self)122 async fn ready(&mut self) {} 123 } 124 125 impl InputStream for InputFile { read(&mut self, size: usize) -> StreamResult<Bytes>126 fn read(&mut self, size: usize) -> StreamResult<Bytes> { 127 let mut buf = bytes::BytesMut::zeroed(size.min(crate::MAX_READ_SIZE_ALLOC)); 128 let bytes_read = self 129 .file 130 .read(&mut buf) 131 .map_err(|e| StreamError::LastOperationFailed(wasmtime::format_err!(e)))?; 132 if bytes_read == 0 { 133 return Err(StreamError::Closed); 134 } 135 buf.truncate(bytes_read); 136 StreamResult::Ok(buf.into()) 137 } 138 } 139 140 impl AsyncRead for InputFile { poll_read( self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &mut io::ReadBuf<'_>, ) -> Poll<io::Result<()>>141 fn poll_read( 142 self: Pin<&mut Self>, 143 _cx: &mut Context<'_>, 144 buf: &mut io::ReadBuf<'_>, 145 ) -> Poll<io::Result<()>> { 146 match (&*self.file).read(buf.initialize_unfilled()) { 147 Ok(n) => { 148 buf.advance(n); 149 Poll::Ready(Ok(())) 150 } 151 Err(e) => Poll::Ready(Err(e)), 152 } 153 } 154 } 155