1 use anyhow::{Context as _, Result, anyhow};
2 use core::fmt;
3 use futures::join;
4 
5 use crate::p3::wasi::http::{handler, types};
6 use crate::p3::{wit_future, wit_stream};
7 
8 pub struct Response {
9     pub status: types::StatusCode,
10     pub headers: Vec<(String, Vec<u8>)>,
11     pub body: Vec<u8>,
12     pub trailers: Option<Vec<(String, Vec<u8>)>>,
13 }
14 impl fmt::Debug for Response {
15     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16         let mut out = f.debug_struct("Response");
17         out.field("status", &self.status)
18             .field("headers", &self.headers);
19         if let Ok(body) = std::str::from_utf8(&self.body) {
20             out.field("body", &body);
21         } else {
22             out.field("body", &self.body);
23         }
24         out.field("trailers", &self.trailers);
25         out.finish()
26     }
27 }
28 
29 impl Response {
30     pub fn header(&self, name: &str) -> Option<&Vec<u8>> {
31         self.headers
32             .iter()
33             .find_map(|(k, v)| if k == name { Some(v) } else { None })
34     }
35 }
36 
37 pub async fn request(
38     method: types::Method,
39     scheme: types::Scheme,
40     authority: &str,
41     path_with_query: &str,
42     body: Option<&[u8]>,
43     additional_headers: Option<&[(String, Vec<u8>)]>,
44     connect_timeout: Option<u64>,
45     first_by_timeout: Option<u64>,
46     between_bytes_timeout: Option<u64>,
47 ) -> Result<Response> {
48     fn header_val(v: &str) -> Vec<u8> {
49         v.to_string().into_bytes()
50     }
51     let headers = types::Headers::from_list(
52         &[
53             &[
54                 ("User-agent".to_string(), header_val("WASI-HTTP/0.0.1")),
55                 ("Content-type".to_string(), header_val("application/json")),
56             ],
57             additional_headers.unwrap_or(&[]),
58         ]
59         .concat(),
60     )?;
61 
62     let options = types::RequestOptions::new();
63     options
64         .set_connect_timeout(connect_timeout)
65         .map_err(|_err| anyhow!("failed to set connect_timeout"))?;
66     options
67         .set_first_byte_timeout(first_by_timeout)
68         .map_err(|_err| anyhow!("failed to set first_byte_timeout"))?;
69     options
70         .set_between_bytes_timeout(between_bytes_timeout)
71         .map_err(|_err| anyhow!("failed to set between_bytes_timeout"))?;
72 
73     let (mut contents_tx, contents_rx) = wit_stream::new();
74     let (trailers_tx, trailers_rx) = wit_future::new(|| Ok(None));
75     let (request, transmit) =
76         types::Request::new(headers, Some(contents_rx), trailers_rx, Some(options));
77 
78     request
79         .set_method(&method)
80         .map_err(|()| anyhow!("failed to set method"))?;
81     request
82         .set_scheme(Some(&scheme))
83         .map_err(|()| anyhow!("failed to set scheme"))?;
84     request
85         .set_authority(Some(authority))
86         .map_err(|()| anyhow!("failed to set authority"))?;
87     request
88         .set_path_with_query(Some(&path_with_query))
89         .map_err(|()| anyhow!("failed to set path_with_query"))?;
90 
91     let (transmit, handle) = join!(
92         async { transmit.await.context("failed to transmit request") },
93         async {
94             let response = handler::handle(request).await?;
95             let status = response.get_status_code();
96             let headers = response.get_headers().copy_all();
97             let (body_rx, trailers_rx) = response
98                 .consume_body()
99                 .expect("failed to get response body");
100             let ((), rx) = join!(
101                 async {
102                     if let Some(buf) = body {
103                         let remaining = contents_tx.write_all(buf.into()).await;
104                         assert!(remaining.is_empty());
105                     }
106                     drop(contents_tx);
107                     // This can fail in HTTP/1.1, since the connection might already be closed
108                     _ = trailers_tx.write(Ok(None)).await;
109                 },
110                 async {
111                     let body = body_rx.collect().await;
112                     let trailers = trailers_rx.await.context("failed to read body")?;
113                     let trailers = trailers.map(|trailers| trailers.copy_all());
114                     anyhow::Ok(Response {
115                         status,
116                         headers,
117                         body,
118                         trailers,
119                     })
120                 }
121             );
122             rx
123         },
124     );
125     let response = handle?;
126     transmit?;
127     Ok(response)
128 }
129