1 use crate::wasi::http::{outgoing_handler, types as http_types};
2 use crate::wasi::io::streams;
3 use anyhow::{Result, anyhow};
4 use std::fmt;
5
/// A fully collected HTTP response, as produced by [`request`].
pub struct Response {
    /// Status code reported by the incoming response.
    pub status: http_types::StatusCode,
    /// Header entries as `(name, raw value bytes)` pairs.
    pub headers: Vec<(String, Vec<u8>)>,
    /// The response body bytes, read until the body stream reported closure.
    pub body: Vec<u8>,
}
11 impl fmt::Debug for Response {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result12 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
13 let mut out = f.debug_struct("Response");
14 out.field("status", &self.status)
15 .field("headers", &self.headers);
16 if let Ok(body) = std::str::from_utf8(&self.body) {
17 out.field("body", &body);
18 } else {
19 out.field("body", &self.body);
20 }
21 out.finish()
22 }
23 }
24
25 impl Response {
header(&self, name: &str) -> Option<&Vec<u8>>26 pub fn header(&self, name: &str) -> Option<&Vec<u8>> {
27 self.headers
28 .iter()
29 .find_map(|(k, v)| if k == name { Some(v) } else { None })
30 }
31 }
32
request( method: http_types::Method, scheme: http_types::Scheme, authority: &str, path_with_query: &str, body: Option<&[u8]>, additional_headers: Option<&[(String, Vec<u8>)]>, connect_timeout: Option<u64>, first_by_timeout: Option<u64>, between_bytes_timeout: Option<u64>, ) -> Result<Response>33 pub fn request(
34 method: http_types::Method,
35 scheme: http_types::Scheme,
36 authority: &str,
37 path_with_query: &str,
38 body: Option<&[u8]>,
39 additional_headers: Option<&[(String, Vec<u8>)]>,
40 connect_timeout: Option<u64>,
41 first_by_timeout: Option<u64>,
42 between_bytes_timeout: Option<u64>,
43 ) -> Result<Response> {
44 fn header_val(v: &str) -> Vec<u8> {
45 v.to_string().into_bytes()
46 }
47 let headers = http_types::Headers::from_list(
48 &[
49 &[
50 ("User-agent".to_string(), header_val("WASI-HTTP/0.0.1")),
51 ("Content-type".to_string(), header_val("application/json")),
52 ],
53 additional_headers.unwrap_or(&[]),
54 ]
55 .concat(),
56 )?;
57
58 let request = http_types::OutgoingRequest::new(headers);
59
60 request
61 .set_method(&method)
62 .map_err(|()| anyhow!("failed to set method"))?;
63 request
64 .set_scheme(Some(&scheme))
65 .map_err(|()| anyhow!("failed to set scheme"))?;
66 request
67 .set_authority(Some(authority))
68 .map_err(|()| anyhow!("failed to set authority"))?;
69 request
70 .set_path_with_query(Some(&path_with_query))
71 .map_err(|()| anyhow!("failed to set path_with_query"))?;
72
73 let outgoing_body = request
74 .body()
75 .map_err(|_| anyhow!("outgoing request write failed"))?;
76
77 let options = http_types::RequestOptions::new();
78 options
79 .set_connect_timeout(connect_timeout)
80 .map_err(|()| anyhow!("failed to set connect_timeout"))?;
81 options
82 .set_first_byte_timeout(first_by_timeout)
83 .map_err(|()| anyhow!("failed to set first_byte_timeout"))?;
84 options
85 .set_between_bytes_timeout(between_bytes_timeout)
86 .map_err(|()| anyhow!("failed to set between_bytes_timeout"))?;
87 let options = Some(options);
88
89 let future_response = outgoing_handler::handle(request, options)?;
90
91 if let Some(mut buf) = body {
92 let request_body = outgoing_body
93 .write()
94 .map_err(|_| anyhow!("outgoing request write failed"))?;
95
96 let pollable = request_body.subscribe();
97 while !buf.is_empty() {
98 pollable.block();
99
100 let permit = match request_body.check_write() {
101 Ok(n) => n,
102 Err(_) => anyhow::bail!("output stream error"),
103 };
104
105 let len = buf.len().min(permit as usize);
106 let (chunk, rest) = buf.split_at(len);
107 buf = rest;
108
109 match request_body.write(chunk) {
110 Err(_) => anyhow::bail!("output stream error"),
111 _ => {}
112 }
113 }
114
115 match request_body.flush() {
116 Err(_) => anyhow::bail!("output stream error"),
117 _ => {}
118 }
119
120 pollable.block();
121
122 match request_body.check_write() {
123 Ok(_) => {}
124 Err(_) => anyhow::bail!("output stream error"),
125 };
126 }
127 http_types::OutgoingBody::finish(outgoing_body, None)?;
128
129 let incoming_response = match future_response.get() {
130 Some(result) => result.map_err(|()| anyhow!("response already taken"))?,
131 None => {
132 let pollable = future_response.subscribe();
133 pollable.block();
134 future_response
135 .get()
136 .expect("incoming response available")
137 .map_err(|()| anyhow!("response already taken"))?
138 }
139 }?;
140
141 drop(future_response);
142
143 let status = incoming_response.status();
144
145 let headers_handle = incoming_response.headers();
146 let headers = headers_handle.entries();
147 drop(headers_handle);
148
149 let incoming_body = incoming_response
150 .consume()
151 .map_err(|()| anyhow!("incoming response has no body stream"))?;
152
153 drop(incoming_response);
154
155 let input_stream = incoming_body.stream().unwrap();
156 let input_stream_pollable = input_stream.subscribe();
157
158 let mut body = Vec::new();
159 loop {
160 input_stream_pollable.block();
161
162 let mut body_chunk = match input_stream.read(1024 * 1024) {
163 Ok(c) => c,
164 Err(streams::StreamError::Closed) => break,
165 Err(e) => Err(anyhow!("input_stream read failed: {e:?}"))?,
166 };
167
168 if !body_chunk.is_empty() {
169 body.append(&mut body_chunk);
170 }
171 }
172
173 Ok(Response {
174 status,
175 headers,
176 body,
177 })
178 }
179