1 use anyhow::{Context as _, Result, anyhow}; 2 use core::fmt; 3 use futures::join; 4 5 use crate::p3::wasi::http::{handler, types}; 6 use crate::p3::{wit_future, wit_stream}; 7 8 pub struct Response { 9 pub status: types::StatusCode, 10 pub headers: Vec<(String, Vec<u8>)>, 11 pub body: Vec<u8>, 12 pub trailers: Option<Vec<(String, Vec<u8>)>>, 13 } 14 impl fmt::Debug for Response { 15 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 16 let mut out = f.debug_struct("Response"); 17 out.field("status", &self.status) 18 .field("headers", &self.headers); 19 if let Ok(body) = std::str::from_utf8(&self.body) { 20 out.field("body", &body); 21 } else { 22 out.field("body", &self.body); 23 } 24 out.field("trailers", &self.trailers); 25 out.finish() 26 } 27 } 28 29 impl Response { 30 pub fn header(&self, name: &str) -> Option<&Vec<u8>> { 31 self.headers 32 .iter() 33 .find_map(|(k, v)| if k == name { Some(v) } else { None }) 34 } 35 } 36 37 pub async fn request( 38 method: types::Method, 39 scheme: types::Scheme, 40 authority: &str, 41 path_with_query: &str, 42 body: Option<&[u8]>, 43 additional_headers: Option<&[(String, Vec<u8>)]>, 44 connect_timeout: Option<u64>, 45 first_by_timeout: Option<u64>, 46 between_bytes_timeout: Option<u64>, 47 ) -> Result<Response> { 48 fn header_val(v: &str) -> Vec<u8> { 49 v.to_string().into_bytes() 50 } 51 let headers = types::Headers::from_list( 52 &[ 53 &[ 54 ("User-agent".to_string(), header_val("WASI-HTTP/0.0.1")), 55 ("Content-type".to_string(), header_val("application/json")), 56 ], 57 additional_headers.unwrap_or(&[]), 58 ] 59 .concat(), 60 )?; 61 62 let options = types::RequestOptions::new(); 63 options 64 .set_connect_timeout(connect_timeout) 65 .map_err(|_err| anyhow!("failed to set connect_timeout"))?; 66 options 67 .set_first_byte_timeout(first_by_timeout) 68 .map_err(|_err| anyhow!("failed to set first_byte_timeout"))?; 69 options 70 .set_between_bytes_timeout(between_bytes_timeout) 71 .map_err(|_err| anyhow!("failed to set between_bytes_timeout"))?; 72 73 let (mut contents_tx, contents_rx) = wit_stream::new(); 74 let (trailers_tx, trailers_rx) = wit_future::new(|| Ok(None)); 75 let (request, transmit) = 76 types::Request::new(headers, Some(contents_rx), trailers_rx, Some(options)); 77 78 request 79 .set_method(&method) 80 .map_err(|()| anyhow!("failed to set method"))?; 81 request 82 .set_scheme(Some(&scheme)) 83 .map_err(|()| anyhow!("failed to set scheme"))?; 84 request 85 .set_authority(Some(authority)) 86 .map_err(|()| anyhow!("failed to set authority"))?; 87 request 88 .set_path_with_query(Some(&path_with_query)) 89 .map_err(|()| anyhow!("failed to set path_with_query"))?; 90 91 let (transmit, handle) = join!( 92 async { transmit.await.context("failed to transmit request") }, 93 async { 94 let response = handler::handle(request).await?; 95 let status = response.get_status_code(); 96 let headers = response.get_headers().copy_all(); 97 let (body_rx, trailers_rx) = response 98 .consume_body() 99 .expect("failed to get response body"); 100 let ((), rx) = join!( 101 async { 102 if let Some(buf) = body { 103 let remaining = contents_tx.write_all(buf.into()).await; 104 assert!(remaining.is_empty()); 105 } 106 drop(contents_tx); 107 // This can fail in HTTP/1.1, since the connection might already be closed 108 _ = trailers_tx.write(Ok(None)).await; 109 }, 110 async { 111 let body = body_rx.collect().await; 112 let trailers = trailers_rx.await.context("failed to read body")?; 113 let trailers = trailers.map(|trailers| trailers.copy_all()); 114 anyhow::Ok(Response { 115 status, 116 headers, 117 body, 118 trailers, 119 }) 120 } 121 ); 122 rx 123 }, 124 ); 125 let response = handle?; 126 transmit?; 127 Ok(response) 128 } 129