1 //! Implements the base structure that will provide the implementation of the
2 //! wasi-http API.
3 
4 use crate::FieldMap;
5 use crate::p2::{
6     WasiHttpCtxView, WasiHttpHooks,
7     bindings::http::types::{self, ErrorCode, Method, Scheme},
8     body::{HostIncomingBody, HyperIncomingBody, HyperOutgoingBody},
9 };
10 use bytes::Bytes;
11 use http_body_util::BodyExt;
12 use hyper::body::Body;
13 use std::time::Duration;
14 use wasmtime::component::Resource;
15 use wasmtime::{Result, bail};
16 use wasmtime_wasi::p2::Pollable;
17 use wasmtime_wasi::runtime::AbortOnDropJoinHandle;
18 
19 /// Removes forbidden headers from a [`FieldMap`].
remove_forbidden_headers( hooks: &mut dyn WasiHttpHooks, headers: &mut http::HeaderMap, )20 pub(crate) fn remove_forbidden_headers(
21     hooks: &mut dyn WasiHttpHooks,
22     headers: &mut http::HeaderMap,
23 ) {
24     let forbidden_keys = Vec::from_iter(headers.keys().filter_map(|name| {
25         if hooks.is_forbidden_header(name) {
26             Some(name.clone())
27         } else {
28             None
29         }
30     }));
31 
32     for name in forbidden_keys {
33         headers.remove(&name);
34     }
35 }
36 
/// Transport settings that accompany a single outgoing request.
pub struct OutgoingRequestConfig {
    /// `true` when the request should be sent over TLS.
    pub use_tls: bool,
    /// Upper bound on the time spent establishing the connection.
    pub connect_timeout: Duration,
    /// Upper bound on the wait for the first byte.
    pub first_byte_timeout: Duration,
    /// Upper bound on the gap between consecutive chunks of a streaming body.
    pub between_bytes_timeout: Duration,
}
48 
49 impl From<http::Method> for types::Method {
from(method: http::Method) -> Self50     fn from(method: http::Method) -> Self {
51         if method == http::Method::GET {
52             types::Method::Get
53         } else if method == hyper::Method::HEAD {
54             types::Method::Head
55         } else if method == hyper::Method::POST {
56             types::Method::Post
57         } else if method == hyper::Method::PUT {
58             types::Method::Put
59         } else if method == hyper::Method::DELETE {
60             types::Method::Delete
61         } else if method == hyper::Method::CONNECT {
62             types::Method::Connect
63         } else if method == hyper::Method::OPTIONS {
64             types::Method::Options
65         } else if method == hyper::Method::TRACE {
66             types::Method::Trace
67         } else if method == hyper::Method::PATCH {
68             types::Method::Patch
69         } else {
70             types::Method::Other(method.to_string())
71         }
72     }
73 }
74 
75 impl TryInto<http::Method> for types::Method {
76     type Error = http::method::InvalidMethod;
77 
try_into(self) -> Result<http::Method, Self::Error>78     fn try_into(self) -> Result<http::Method, Self::Error> {
79         match self {
80             Method::Get => Ok(http::Method::GET),
81             Method::Head => Ok(http::Method::HEAD),
82             Method::Post => Ok(http::Method::POST),
83             Method::Put => Ok(http::Method::PUT),
84             Method::Delete => Ok(http::Method::DELETE),
85             Method::Connect => Ok(http::Method::CONNECT),
86             Method::Options => Ok(http::Method::OPTIONS),
87             Method::Trace => Ok(http::Method::TRACE),
88             Method::Patch => Ok(http::Method::PATCH),
89             Method::Other(s) => http::Method::from_bytes(s.as_bytes()),
90         }
91     }
92 }
93 
94 /// The concrete type behind a `wasi:http/types.incoming-request` resource.
95 #[derive(Debug)]
96 pub struct HostIncomingRequest {
97     pub(crate) method: http::method::Method,
98     pub(crate) uri: http::uri::Uri,
99     pub(crate) headers: FieldMap,
100     pub(crate) scheme: Scheme,
101     pub(crate) authority: String,
102     /// The body of the incoming request.
103     pub body: Option<HostIncomingBody>,
104 }
105 
106 impl WasiHttpCtxView<'_> {
107     /// Create a new incoming request resource.
new_incoming_request<B>( &mut self, scheme: Scheme, req: hyper::Request<B>, ) -> wasmtime::Result<Resource<HostIncomingRequest>> where B: Body<Data = Bytes> + Send + 'static, B::Error: Into<ErrorCode>,108     pub fn new_incoming_request<B>(
109         &mut self,
110         scheme: Scheme,
111         req: hyper::Request<B>,
112     ) -> wasmtime::Result<Resource<HostIncomingRequest>>
113     where
114         B: Body<Data = Bytes> + Send + 'static,
115         B::Error: Into<ErrorCode>,
116     {
117         let (mut parts, body) = req.into_parts();
118         let body = body.map_err(Into::into).boxed_unsync();
119         let body = HostIncomingBody::new(
120             body,
121             // TODO: this needs to be plumbed through
122             std::time::Duration::from_millis(600 * 1000),
123         );
124         let authority = match parts.uri.authority() {
125             Some(authority) => authority.to_string(),
126             None => match parts.headers.get(http::header::HOST) {
127                 Some(host) => host.to_str()?.to_string(),
128                 None => bail!("invalid HTTP request missing authority in URI and host header"),
129             },
130         };
131 
132         remove_forbidden_headers(self.hooks, &mut parts.headers);
133         let headers = FieldMap::new_immutable(parts.headers);
134 
135         let req = HostIncomingRequest {
136             method: parts.method,
137             uri: parts.uri,
138             headers,
139             authority,
140             scheme,
141             body: Some(body),
142         };
143         Ok(self.table.push(req)?)
144     }
145 }
146 
147 /// The concrete type behind a `wasi:http/types.response-outparam` resource.
148 pub struct HostResponseOutparam {
149     /// The sender for sending a response.
150     pub result:
151         tokio::sync::oneshot::Sender<Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>>,
152 }
153 
154 impl WasiHttpCtxView<'_> {
155     /// Create a new outgoing response resource.
new_response_outparam( &mut self, result: tokio::sync::oneshot::Sender< Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>, >, ) -> wasmtime::Result<Resource<HostResponseOutparam>>156     pub fn new_response_outparam(
157         &mut self,
158         result: tokio::sync::oneshot::Sender<
159             Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>,
160         >,
161     ) -> wasmtime::Result<Resource<HostResponseOutparam>> {
162         let id = self.table.push(HostResponseOutparam { result })?;
163         Ok(id)
164     }
165 }
166 
167 /// The concrete type behind a `wasi:http/types.outgoing-response` resource.
168 pub struct HostOutgoingResponse {
169     /// The status of the response.
170     pub status: http::StatusCode,
171     /// The headers of the response.
172     pub headers: FieldMap,
173     /// The body of the response.
174     pub body: Option<HyperOutgoingBody>,
175 }
176 
177 impl TryFrom<HostOutgoingResponse> for hyper::Response<HyperOutgoingBody> {
178     type Error = http::Error;
179 
try_from( resp: HostOutgoingResponse, ) -> Result<hyper::Response<HyperOutgoingBody>, Self::Error>180     fn try_from(
181         resp: HostOutgoingResponse,
182     ) -> Result<hyper::Response<HyperOutgoingBody>, Self::Error> {
183         use http_body_util::Empty;
184 
185         let mut builder = hyper::Response::builder().status(resp.status);
186 
187         *builder.headers_mut().unwrap() = resp.headers.into();
188 
189         match resp.body {
190             Some(body) => builder.body(body),
191             None => builder.body(
192                 Empty::<bytes::Bytes>::new()
193                     .map_err(|_| unreachable!("Infallible error"))
194                     .boxed_unsync(),
195             ),
196         }
197     }
198 }
199 
200 /// The concrete type behind a `wasi:http/types.outgoing-request` resource.
201 #[derive(Debug)]
202 pub struct HostOutgoingRequest {
203     /// The method of the request.
204     pub method: Method,
205     /// The scheme of the request.
206     pub scheme: Option<Scheme>,
207     /// The authority of the request.
208     pub authority: Option<String>,
209     /// The path and query of the request.
210     pub path_with_query: Option<String>,
211     /// The request headers.
212     pub headers: FieldMap,
213     /// The request body.
214     pub body: Option<HyperOutgoingBody>,
215 }
216 
/// Host-side state backing a `wasi:http/types.request-options` resource.
///
/// Every timeout is optional; the derived `Default` leaves all of them unset.
#[derive(Debug, Default)]
pub struct HostRequestOptions {
    /// Maximum time to wait for the connection to be established.
    pub connect_timeout: Option<Duration>,
    /// Maximum time to wait for the first byte of the response body.
    pub first_byte_timeout: Option<Duration>,
    /// Maximum time to wait between frames of the response body.
    pub between_bytes_timeout: Option<Duration>,
}
227 
228 /// The concrete type behind a `wasi:http/types.incoming-response` resource.
229 #[derive(Debug)]
230 pub struct HostIncomingResponse {
231     /// The response status
232     pub status: u16,
233     /// The response headers
234     pub headers: FieldMap,
235     /// The response body
236     pub body: Option<HostIncomingBody>,
237 }
238 
239 /// A handle to a future incoming response.
240 pub type FutureIncomingResponseHandle =
241     AbortOnDropJoinHandle<wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>>;
242 
243 /// A response that is in the process of being received.
244 #[derive(Debug)]
245 pub struct IncomingResponse {
246     /// The response itself.
247     pub resp: hyper::Response<HyperIncomingBody>,
248     /// Optional worker task that continues to process the response.
249     pub worker: Option<AbortOnDropJoinHandle<()>>,
250     /// The timeout between chunks of the response.
251     pub between_bytes_timeout: std::time::Duration,
252 }
253 
254 /// The concrete type behind a `wasi:http/types.future-incoming-response` resource.
255 #[derive(Debug)]
256 pub enum HostFutureIncomingResponse {
257     /// A pending response
258     Pending(FutureIncomingResponseHandle),
259     /// The response is ready.
260     ///
261     /// An outer error will trap while the inner error gets returned to the guest.
262     Ready(wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>),
263     /// The response has been consumed.
264     Consumed,
265 }
266 
267 impl HostFutureIncomingResponse {
268     /// Create a new `HostFutureIncomingResponse` that is pending on the provided task handle.
pending(handle: FutureIncomingResponseHandle) -> Self269     pub fn pending(handle: FutureIncomingResponseHandle) -> Self {
270         Self::Pending(handle)
271     }
272 
273     /// Create a new `HostFutureIncomingResponse` that is ready.
ready(result: wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>) -> Self274     pub fn ready(result: wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>) -> Self {
275         Self::Ready(result)
276     }
277 
278     /// Returns `true` if the response is ready.
is_ready(&self) -> bool279     pub fn is_ready(&self) -> bool {
280         matches!(self, Self::Ready(_))
281     }
282 
283     /// Unwrap the response, panicking if it is not ready.
unwrap_ready(self) -> wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>284     pub fn unwrap_ready(self) -> wasmtime::Result<Result<IncomingResponse, types::ErrorCode>> {
285         match self {
286             Self::Ready(res) => res,
287             Self::Pending(_) | Self::Consumed => {
288                 panic!("unwrap_ready called on a pending HostFutureIncomingResponse")
289             }
290         }
291     }
292 }
293 
294 #[async_trait::async_trait]
295 impl Pollable for HostFutureIncomingResponse {
ready(&mut self)296     async fn ready(&mut self) {
297         if let Self::Pending(handle) = self {
298             *self = Self::Ready(handle.await);
299         }
300     }
301 }
302