1 use crate::wasi::http::{outgoing_handler, types as http_types}; 2 use crate::wasi::io::streams; 3 use anyhow::{anyhow, Result}; 4 use std::fmt; 5 6 pub struct Response { 7 pub status: http_types::StatusCode, 8 pub headers: Vec<(String, Vec<u8>)>, 9 pub body: Vec<u8>, 10 } 11 impl fmt::Debug for Response { 12 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 13 let mut out = f.debug_struct("Response"); 14 out.field("status", &self.status) 15 .field("headers", &self.headers); 16 if let Ok(body) = std::str::from_utf8(&self.body) { 17 out.field("body", &body); 18 } else { 19 out.field("body", &self.body); 20 } 21 out.finish() 22 } 23 } 24 25 impl Response { 26 pub fn header(&self, name: &str) -> Option<&Vec<u8>> { 27 self.headers 28 .iter() 29 .find_map(|(k, v)| if k == name { Some(v) } else { None }) 30 } 31 } 32 33 pub fn request( 34 method: http_types::Method, 35 scheme: http_types::Scheme, 36 authority: &str, 37 path_with_query: &str, 38 body: Option<&[u8]>, 39 additional_headers: Option<&[(String, Vec<u8>)]>, 40 ) -> Result<Response> { 41 fn header_val(v: &str) -> Vec<u8> { 42 v.to_string().into_bytes() 43 } 44 let headers = http_types::Headers::from_list( 45 &[ 46 &[ 47 ("User-agent".to_string(), header_val("WASI-HTTP/0.0.1")), 48 ("Content-type".to_string(), header_val("application/json")), 49 ], 50 additional_headers.unwrap_or(&[]), 51 ] 52 .concat(), 53 )?; 54 55 let request = http_types::OutgoingRequest::new(headers); 56 57 request 58 .set_method(&method) 59 .map_err(|()| anyhow!("failed to set method"))?; 60 request 61 .set_scheme(Some(&scheme)) 62 .map_err(|()| anyhow!("failed to set scheme"))?; 63 request 64 .set_authority(Some(authority)) 65 .map_err(|()| anyhow!("failed to set authority"))?; 66 request 67 .set_path_with_query(Some(&path_with_query)) 68 .map_err(|()| anyhow!("failed to set path_with_query"))?; 69 70 let outgoing_body = request 71 .body() 72 .map_err(|_| anyhow!("outgoing request write failed"))?; 73 74 if let Some(mut buf) = body { 75 let request_body = outgoing_body 76 .write() 77 .map_err(|_| anyhow!("outgoing request write failed"))?; 78 79 let pollable = request_body.subscribe(); 80 while !buf.is_empty() { 81 pollable.block(); 82 83 let permit = match request_body.check_write() { 84 Ok(n) => n, 85 Err(_) => anyhow::bail!("output stream error"), 86 }; 87 88 let len = buf.len().min(permit as usize); 89 let (chunk, rest) = buf.split_at(len); 90 buf = rest; 91 92 match request_body.write(chunk) { 93 Err(_) => anyhow::bail!("output stream error"), 94 _ => {} 95 } 96 } 97 98 match request_body.flush() { 99 Err(_) => anyhow::bail!("output stream error"), 100 _ => {} 101 } 102 103 pollable.block(); 104 105 match request_body.check_write() { 106 Ok(_) => {} 107 Err(_) => anyhow::bail!("output stream error"), 108 }; 109 } 110 111 let future_response = outgoing_handler::handle(request, None)?; 112 113 http_types::OutgoingBody::finish(outgoing_body, None); 114 115 let incoming_response = match future_response.get() { 116 Some(result) => result.map_err(|_| anyhow!("incoming response errored"))?, 117 None => { 118 let pollable = future_response.subscribe(); 119 pollable.block(); 120 future_response 121 .get() 122 .expect("incoming response available") 123 .map_err(|_| anyhow!("incoming response errored"))? 124 } 125 } 126 // TODO: maybe anything that appears in the Result<_, E> position should impl 127 // Error? anyway, just use its Debug here: 128 .map_err(|e| anyhow!("{e:?}"))?; 129 130 drop(future_response); 131 132 let status = incoming_response.status(); 133 134 let headers_handle = incoming_response.headers(); 135 let headers = headers_handle.entries(); 136 drop(headers_handle); 137 138 let incoming_body = incoming_response 139 .consume() 140 .map_err(|()| anyhow!("incoming response has no body stream"))?; 141 142 drop(incoming_response); 143 144 let input_stream = incoming_body.stream().unwrap(); 145 let input_stream_pollable = input_stream.subscribe(); 146 147 let mut body = Vec::new(); 148 loop { 149 input_stream_pollable.block(); 150 151 let mut body_chunk = match input_stream.read(1024 * 1024) { 152 Ok(c) => c, 153 Err(streams::StreamError::Closed) => break, 154 Err(e) => Err(anyhow!("input_stream read failed: {e:?}"))?, 155 }; 156 157 if !body_chunk.is_empty() { 158 body.append(&mut body_chunk); 159 } 160 } 161 162 Ok(Response { 163 status, 164 headers, 165 body, 166 }) 167 } 168