1 use crate::wasi::http::{outgoing_handler, types as http_types};
2 use crate::wasi::io::streams;
3 use anyhow::{anyhow, Result};
4 use std::fmt;
5 
/// A fully buffered HTTP response, as produced by [`request`].
pub struct Response {
    /// Status code reported by the incoming response.
    pub status: http_types::StatusCode,
    /// Header entries as `(name, raw value bytes)` pairs, in the order the
    /// response's header handle listed them.
    pub headers: Vec<(String, Vec<u8>)>,
    /// All body bytes read from the response stream until it closed.
    pub body: Vec<u8>,
}
11 impl fmt::Debug for Response {
12     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
13         let mut out = f.debug_struct("Response");
14         out.field("status", &self.status)
15             .field("headers", &self.headers);
16         if let Ok(body) = std::str::from_utf8(&self.body) {
17             out.field("body", &body);
18         } else {
19             out.field("body", &self.body);
20         }
21         out.finish()
22     }
23 }
24 
25 impl Response {
26     pub fn header(&self, name: &str) -> Option<&Vec<u8>> {
27         self.headers
28             .iter()
29             .find_map(|(k, v)| if k == name { Some(v) } else { None })
30     }
31 }
32 
33 pub fn request(
34     method: http_types::Method,
35     scheme: http_types::Scheme,
36     authority: &str,
37     path_with_query: &str,
38     body: Option<&[u8]>,
39     additional_headers: Option<&[(String, Vec<u8>)]>,
40     connect_timeout: Option<u64>,
41     first_by_timeout: Option<u64>,
42     between_bytes_timeout: Option<u64>,
43 ) -> Result<Response> {
44     fn header_val(v: &str) -> Vec<u8> {
45         v.to_string().into_bytes()
46     }
47     let headers = http_types::Headers::from_list(
48         &[
49             &[
50                 ("User-agent".to_string(), header_val("WASI-HTTP/0.0.1")),
51                 ("Content-type".to_string(), header_val("application/json")),
52             ],
53             additional_headers.unwrap_or(&[]),
54         ]
55         .concat(),
56     )?;
57 
58     let request = http_types::OutgoingRequest::new(headers);
59 
60     request
61         .set_method(&method)
62         .map_err(|()| anyhow!("failed to set method"))?;
63     request
64         .set_scheme(Some(&scheme))
65         .map_err(|()| anyhow!("failed to set scheme"))?;
66     request
67         .set_authority(Some(authority))
68         .map_err(|()| anyhow!("failed to set authority"))?;
69     request
70         .set_path_with_query(Some(&path_with_query))
71         .map_err(|()| anyhow!("failed to set path_with_query"))?;
72 
73     let outgoing_body = request
74         .body()
75         .map_err(|_| anyhow!("outgoing request write failed"))?;
76 
77     if let Some(mut buf) = body {
78         let request_body = outgoing_body
79             .write()
80             .map_err(|_| anyhow!("outgoing request write failed"))?;
81 
82         let pollable = request_body.subscribe();
83         while !buf.is_empty() {
84             pollable.block();
85 
86             let permit = match request_body.check_write() {
87                 Ok(n) => n,
88                 Err(_) => anyhow::bail!("output stream error"),
89             };
90 
91             let len = buf.len().min(permit as usize);
92             let (chunk, rest) = buf.split_at(len);
93             buf = rest;
94 
95             match request_body.write(chunk) {
96                 Err(_) => anyhow::bail!("output stream error"),
97                 _ => {}
98             }
99         }
100 
101         match request_body.flush() {
102             Err(_) => anyhow::bail!("output stream error"),
103             _ => {}
104         }
105 
106         pollable.block();
107 
108         match request_body.check_write() {
109             Ok(_) => {}
110             Err(_) => anyhow::bail!("output stream error"),
111         };
112     }
113 
114     let options = http_types::RequestOptions::new();
115     options
116         .set_connect_timeout(connect_timeout)
117         .map_err(|()| anyhow!("failed to set connect_timeout"))?;
118     options
119         .set_first_byte_timeout(first_by_timeout)
120         .map_err(|()| anyhow!("failed to set first_byte_timeout"))?;
121     options
122         .set_between_bytes_timeout(between_bytes_timeout)
123         .map_err(|()| anyhow!("failed to set between_bytes_timeout"))?;
124     let options = Some(options);
125 
126     let future_response = outgoing_handler::handle(request, options)?;
127 
128     http_types::OutgoingBody::finish(outgoing_body, None)?;
129 
130     let incoming_response = match future_response.get() {
131         Some(result) => result.map_err(|()| anyhow!("response already taken"))?,
132         None => {
133             let pollable = future_response.subscribe();
134             pollable.block();
135             future_response
136                 .get()
137                 .expect("incoming response available")
138                 .map_err(|()| anyhow!("response already taken"))?
139         }
140     }?;
141 
142     drop(future_response);
143 
144     let status = incoming_response.status();
145 
146     let headers_handle = incoming_response.headers();
147     let headers = headers_handle.entries();
148     drop(headers_handle);
149 
150     let incoming_body = incoming_response
151         .consume()
152         .map_err(|()| anyhow!("incoming response has no body stream"))?;
153 
154     drop(incoming_response);
155 
156     let input_stream = incoming_body.stream().unwrap();
157     let input_stream_pollable = input_stream.subscribe();
158 
159     let mut body = Vec::new();
160     loop {
161         input_stream_pollable.block();
162 
163         let mut body_chunk = match input_stream.read(1024 * 1024) {
164             Ok(c) => c,
165             Err(streams::StreamError::Closed) => break,
166             Err(e) => Err(anyhow!("input_stream read failed: {e:?}"))?,
167         };
168 
169         if !body_chunk.is_empty() {
170             body.append(&mut body_chunk);
171         }
172     }
173 
174     Ok(Response {
175         status,
176         headers,
177         body,
178     })
179 }
180