1 //! Implements the base structure that will provide the implementation of the
2 //! wasi-http API.
3
4 use crate::FieldMap;
5 use crate::p2::{
6 WasiHttpCtxView, WasiHttpHooks,
7 bindings::http::types::{self, ErrorCode, Method, Scheme},
8 body::{HostIncomingBody, HyperIncomingBody, HyperOutgoingBody},
9 };
10 use bytes::Bytes;
11 use http_body_util::BodyExt;
12 use hyper::body::Body;
13 use std::time::Duration;
14 use wasmtime::component::Resource;
15 use wasmtime::{Result, bail};
16 use wasmtime_wasi::p2::Pollable;
17 use wasmtime_wasi::runtime::AbortOnDropJoinHandle;
18
19 /// Removes forbidden headers from a [`FieldMap`].
remove_forbidden_headers( hooks: &mut dyn WasiHttpHooks, headers: &mut http::HeaderMap, )20 pub(crate) fn remove_forbidden_headers(
21 hooks: &mut dyn WasiHttpHooks,
22 headers: &mut http::HeaderMap,
23 ) {
24 let forbidden_keys = Vec::from_iter(headers.keys().filter_map(|name| {
25 if hooks.is_forbidden_header(name) {
26 Some(name.clone())
27 } else {
28 None
29 }
30 }));
31
32 for name in forbidden_keys {
33 headers.remove(&name);
34 }
35 }
36
37 /// Configuration for an outgoing request.
38 pub struct OutgoingRequestConfig {
39 /// Whether to use TLS for the request.
40 pub use_tls: bool,
41 /// The timeout for connecting.
42 pub connect_timeout: Duration,
43 /// The timeout until the first byte.
44 pub first_byte_timeout: Duration,
45 /// The timeout between chunks of a streaming body
46 pub between_bytes_timeout: Duration,
47 }
48
49 impl From<http::Method> for types::Method {
from(method: http::Method) -> Self50 fn from(method: http::Method) -> Self {
51 if method == http::Method::GET {
52 types::Method::Get
53 } else if method == hyper::Method::HEAD {
54 types::Method::Head
55 } else if method == hyper::Method::POST {
56 types::Method::Post
57 } else if method == hyper::Method::PUT {
58 types::Method::Put
59 } else if method == hyper::Method::DELETE {
60 types::Method::Delete
61 } else if method == hyper::Method::CONNECT {
62 types::Method::Connect
63 } else if method == hyper::Method::OPTIONS {
64 types::Method::Options
65 } else if method == hyper::Method::TRACE {
66 types::Method::Trace
67 } else if method == hyper::Method::PATCH {
68 types::Method::Patch
69 } else {
70 types::Method::Other(method.to_string())
71 }
72 }
73 }
74
75 impl TryInto<http::Method> for types::Method {
76 type Error = http::method::InvalidMethod;
77
try_into(self) -> Result<http::Method, Self::Error>78 fn try_into(self) -> Result<http::Method, Self::Error> {
79 match self {
80 Method::Get => Ok(http::Method::GET),
81 Method::Head => Ok(http::Method::HEAD),
82 Method::Post => Ok(http::Method::POST),
83 Method::Put => Ok(http::Method::PUT),
84 Method::Delete => Ok(http::Method::DELETE),
85 Method::Connect => Ok(http::Method::CONNECT),
86 Method::Options => Ok(http::Method::OPTIONS),
87 Method::Trace => Ok(http::Method::TRACE),
88 Method::Patch => Ok(http::Method::PATCH),
89 Method::Other(s) => http::Method::from_bytes(s.as_bytes()),
90 }
91 }
92 }
93
94 /// The concrete type behind a `wasi:http/types.incoming-request` resource.
95 #[derive(Debug)]
96 pub struct HostIncomingRequest {
97 pub(crate) method: http::method::Method,
98 pub(crate) uri: http::uri::Uri,
99 pub(crate) headers: FieldMap,
100 pub(crate) scheme: Scheme,
101 pub(crate) authority: String,
102 /// The body of the incoming request.
103 pub body: Option<HostIncomingBody>,
104 }
105
106 impl WasiHttpCtxView<'_> {
107 /// Create a new incoming request resource.
new_incoming_request<B>( &mut self, scheme: Scheme, req: hyper::Request<B>, ) -> wasmtime::Result<Resource<HostIncomingRequest>> where B: Body<Data = Bytes> + Send + 'static, B::Error: Into<ErrorCode>,108 pub fn new_incoming_request<B>(
109 &mut self,
110 scheme: Scheme,
111 req: hyper::Request<B>,
112 ) -> wasmtime::Result<Resource<HostIncomingRequest>>
113 where
114 B: Body<Data = Bytes> + Send + 'static,
115 B::Error: Into<ErrorCode>,
116 {
117 let (mut parts, body) = req.into_parts();
118 let body = body.map_err(Into::into).boxed_unsync();
119 let body = HostIncomingBody::new(
120 body,
121 // TODO: this needs to be plumbed through
122 std::time::Duration::from_millis(600 * 1000),
123 );
124 let authority = match parts.uri.authority() {
125 Some(authority) => authority.to_string(),
126 None => match parts.headers.get(http::header::HOST) {
127 Some(host) => host.to_str()?.to_string(),
128 None => bail!("invalid HTTP request missing authority in URI and host header"),
129 },
130 };
131
132 remove_forbidden_headers(self.hooks, &mut parts.headers);
133 let headers = FieldMap::new_immutable(parts.headers);
134
135 let req = HostIncomingRequest {
136 method: parts.method,
137 uri: parts.uri,
138 headers,
139 authority,
140 scheme,
141 body: Some(body),
142 };
143 Ok(self.table.push(req)?)
144 }
145 }
146
147 /// The concrete type behind a `wasi:http/types.response-outparam` resource.
148 pub struct HostResponseOutparam {
149 /// The sender for sending a response.
150 pub result:
151 tokio::sync::oneshot::Sender<Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>>,
152 }
153
154 impl WasiHttpCtxView<'_> {
155 /// Create a new outgoing response resource.
new_response_outparam( &mut self, result: tokio::sync::oneshot::Sender< Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>, >, ) -> wasmtime::Result<Resource<HostResponseOutparam>>156 pub fn new_response_outparam(
157 &mut self,
158 result: tokio::sync::oneshot::Sender<
159 Result<hyper::Response<HyperOutgoingBody>, types::ErrorCode>,
160 >,
161 ) -> wasmtime::Result<Resource<HostResponseOutparam>> {
162 let id = self.table.push(HostResponseOutparam { result })?;
163 Ok(id)
164 }
165 }
166
167 /// The concrete type behind a `wasi:http/types.outgoing-response` resource.
168 pub struct HostOutgoingResponse {
169 /// The status of the response.
170 pub status: http::StatusCode,
171 /// The headers of the response.
172 pub headers: FieldMap,
173 /// The body of the response.
174 pub body: Option<HyperOutgoingBody>,
175 }
176
177 impl TryFrom<HostOutgoingResponse> for hyper::Response<HyperOutgoingBody> {
178 type Error = http::Error;
179
try_from( resp: HostOutgoingResponse, ) -> Result<hyper::Response<HyperOutgoingBody>, Self::Error>180 fn try_from(
181 resp: HostOutgoingResponse,
182 ) -> Result<hyper::Response<HyperOutgoingBody>, Self::Error> {
183 use http_body_util::Empty;
184
185 let mut builder = hyper::Response::builder().status(resp.status);
186
187 *builder.headers_mut().unwrap() = resp.headers.into();
188
189 match resp.body {
190 Some(body) => builder.body(body),
191 None => builder.body(
192 Empty::<bytes::Bytes>::new()
193 .map_err(|_| unreachable!("Infallible error"))
194 .boxed_unsync(),
195 ),
196 }
197 }
198 }
199
200 /// The concrete type behind a `wasi:http/types.outgoing-request` resource.
201 #[derive(Debug)]
202 pub struct HostOutgoingRequest {
203 /// The method of the request.
204 pub method: Method,
205 /// The scheme of the request.
206 pub scheme: Option<Scheme>,
207 /// The authority of the request.
208 pub authority: Option<String>,
209 /// The path and query of the request.
210 pub path_with_query: Option<String>,
211 /// The request headers.
212 pub headers: FieldMap,
213 /// The request body.
214 pub body: Option<HyperOutgoingBody>,
215 }
216
217 /// The concrete type behind a `wasi:http/types.request-options` resource.
218 #[derive(Debug, Default)]
219 pub struct HostRequestOptions {
220 /// How long to wait for a connection to be established.
221 pub connect_timeout: Option<std::time::Duration>,
222 /// How long to wait for the first byte of the response body.
223 pub first_byte_timeout: Option<std::time::Duration>,
224 /// How long to wait between frames of the response body.
225 pub between_bytes_timeout: Option<std::time::Duration>,
226 }
227
228 /// The concrete type behind a `wasi:http/types.incoming-response` resource.
229 #[derive(Debug)]
230 pub struct HostIncomingResponse {
231 /// The response status
232 pub status: u16,
233 /// The response headers
234 pub headers: FieldMap,
235 /// The response body
236 pub body: Option<HostIncomingBody>,
237 }
238
239 /// A handle to a future incoming response.
240 pub type FutureIncomingResponseHandle =
241 AbortOnDropJoinHandle<wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>>;
242
243 /// A response that is in the process of being received.
244 #[derive(Debug)]
245 pub struct IncomingResponse {
246 /// The response itself.
247 pub resp: hyper::Response<HyperIncomingBody>,
248 /// Optional worker task that continues to process the response.
249 pub worker: Option<AbortOnDropJoinHandle<()>>,
250 /// The timeout between chunks of the response.
251 pub between_bytes_timeout: std::time::Duration,
252 }
253
254 /// The concrete type behind a `wasi:http/types.future-incoming-response` resource.
255 #[derive(Debug)]
256 pub enum HostFutureIncomingResponse {
257 /// A pending response
258 Pending(FutureIncomingResponseHandle),
259 /// The response is ready.
260 ///
261 /// An outer error will trap while the inner error gets returned to the guest.
262 Ready(wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>),
263 /// The response has been consumed.
264 Consumed,
265 }
266
267 impl HostFutureIncomingResponse {
268 /// Create a new `HostFutureIncomingResponse` that is pending on the provided task handle.
pending(handle: FutureIncomingResponseHandle) -> Self269 pub fn pending(handle: FutureIncomingResponseHandle) -> Self {
270 Self::Pending(handle)
271 }
272
273 /// Create a new `HostFutureIncomingResponse` that is ready.
ready(result: wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>) -> Self274 pub fn ready(result: wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>) -> Self {
275 Self::Ready(result)
276 }
277
278 /// Returns `true` if the response is ready.
is_ready(&self) -> bool279 pub fn is_ready(&self) -> bool {
280 matches!(self, Self::Ready(_))
281 }
282
283 /// Unwrap the response, panicking if it is not ready.
unwrap_ready(self) -> wasmtime::Result<Result<IncomingResponse, types::ErrorCode>>284 pub fn unwrap_ready(self) -> wasmtime::Result<Result<IncomingResponse, types::ErrorCode>> {
285 match self {
286 Self::Ready(res) => res,
287 Self::Pending(_) | Self::Consumed => {
288 panic!("unwrap_ready called on a pending HostFutureIncomingResponse")
289 }
290 }
291 }
292 }
293
294 #[async_trait::async_trait]
295 impl Pollable for HostFutureIncomingResponse {
ready(&mut self)296 async fn ready(&mut self) {
297 if let Self::Pending(handle) = self {
298 *self = Self::Ready(handle.await);
299 }
300 }
301 }
302