1 use crate::wasi::http::{outgoing_handler, types as http_types};
2 use crate::wasi::io::poll;
3 use crate::wasi::io::streams;
4 use anyhow::{anyhow, Result};
5 use std::fmt;
6 
7 pub struct Response {
8     pub status: http_types::StatusCode,
9     pub headers: Vec<(String, Vec<u8>)>,
10     pub body: Vec<u8>,
11 }
12 impl fmt::Debug for Response {
13     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
14         let mut out = f.debug_struct("Response");
15         out.field("status", &self.status)
16             .field("headers", &self.headers);
17         if let Ok(body) = std::str::from_utf8(&self.body) {
18             out.field("body", &body);
19         } else {
20             out.field("body", &self.body);
21         }
22         out.finish()
23     }
24 }
25 
26 impl Response {
27     pub fn header(&self, name: &str) -> Option<&Vec<u8>> {
28         self.headers
29             .iter()
30             .find_map(|(k, v)| if k == name { Some(v) } else { None })
31     }
32 }
33 
34 pub fn request(
35     method: http_types::Method,
36     scheme: http_types::Scheme,
37     authority: &str,
38     path_with_query: &str,
39     body: Option<&[u8]>,
40     additional_headers: Option<&[(String, Vec<u8>)]>,
41 ) -> Result<Response> {
42     fn header_val(v: &str) -> Vec<u8> {
43         v.to_string().into_bytes()
44     }
45     let headers = http_types::Headers::from_list(
46         &[
47             &[
48                 ("User-agent".to_string(), header_val("WASI-HTTP/0.0.1")),
49                 ("Content-type".to_string(), header_val("application/json")),
50             ],
51             additional_headers.unwrap_or(&[]),
52         ]
53         .concat(),
54     )?;
55 
56     let request = http_types::OutgoingRequest::new(
57         &method,
58         Some(path_with_query),
59         Some(&scheme),
60         Some(authority),
61         headers,
62     );
63 
64     let outgoing_body = request
65         .body()
66         .map_err(|_| anyhow!("outgoing request write failed"))?;
67 
68     if let Some(mut buf) = body {
69         let request_body = outgoing_body
70             .write()
71             .map_err(|_| anyhow!("outgoing request write failed"))?;
72 
73         let pollable = request_body.subscribe();
74         while !buf.is_empty() {
75             poll::poll_list(&[&pollable]);
76 
77             let permit = match request_body.check_write() {
78                 Ok(n) => n,
79                 Err(_) => anyhow::bail!("output stream error"),
80             };
81 
82             let len = buf.len().min(permit as usize);
83             let (chunk, rest) = buf.split_at(len);
84             buf = rest;
85 
86             match request_body.write(chunk) {
87                 Err(_) => anyhow::bail!("output stream error"),
88                 _ => {}
89             }
90         }
91 
92         match request_body.flush() {
93             Err(_) => anyhow::bail!("output stream error"),
94             _ => {}
95         }
96 
97         poll::poll_list(&[&pollable]);
98 
99         match request_body.check_write() {
100             Ok(_) => {}
101             Err(_) => anyhow::bail!("output stream error"),
102         };
103     }
104 
105     let future_response = outgoing_handler::handle(request, None)?;
106 
107     http_types::OutgoingBody::finish(outgoing_body, None);
108 
109     let incoming_response = match future_response.get() {
110         Some(result) => result.map_err(|_| anyhow!("incoming response errored"))?,
111         None => {
112             let pollable = future_response.subscribe();
113             let _ = poll::poll_list(&[&pollable]);
114             future_response
115                 .get()
116                 .expect("incoming response available")
117                 .map_err(|_| anyhow!("incoming response errored"))?
118         }
119     }
120     // TODO: maybe anything that appears in the Result<_, E> position should impl
121     // Error? anyway, just use its Debug here:
122     .map_err(|e| anyhow!("{e:?}"))?;
123 
124     drop(future_response);
125 
126     let status = incoming_response.status();
127 
128     let headers_handle = incoming_response.headers();
129     let headers = headers_handle.entries();
130     drop(headers_handle);
131 
132     let incoming_body = incoming_response
133         .consume()
134         .map_err(|()| anyhow!("incoming response has no body stream"))?;
135 
136     drop(incoming_response);
137 
138     let input_stream = incoming_body.stream().unwrap();
139     let input_stream_pollable = input_stream.subscribe();
140 
141     let mut body = Vec::new();
142     loop {
143         poll::poll_list(&[&input_stream_pollable]);
144 
145         let mut body_chunk = match input_stream.read(1024 * 1024) {
146             Ok(c) => c,
147             Err(streams::StreamError::Closed) => break,
148             Err(e) => Err(anyhow!("input_stream read failed: {e:?}"))?,
149         };
150 
151         if !body_chunk.is_empty() {
152             body.append(&mut body_chunk);
153         }
154     }
155 
156     Ok(Response {
157         status,
158         headers,
159         body,
160     })
161 }
162