1 use crate::wasi::http::{outgoing_handler, types as http_types}; 2 use crate::wasi::io::streams; 3 use anyhow::{anyhow, Result}; 4 use std::fmt; 5 6 pub struct Response { 7 pub status: http_types::StatusCode, 8 pub headers: Vec<(String, Vec<u8>)>, 9 pub body: Vec<u8>, 10 } 11 impl fmt::Debug for Response { 12 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 13 let mut out = f.debug_struct("Response"); 14 out.field("status", &self.status) 15 .field("headers", &self.headers); 16 if let Ok(body) = std::str::from_utf8(&self.body) { 17 out.field("body", &body); 18 } else { 19 out.field("body", &self.body); 20 } 21 out.finish() 22 } 23 } 24 25 impl Response { 26 pub fn header(&self, name: &str) -> Option<&Vec<u8>> { 27 self.headers 28 .iter() 29 .find_map(|(k, v)| if k == name { Some(v) } else { None }) 30 } 31 } 32 33 pub fn request( 34 method: http_types::Method, 35 scheme: http_types::Scheme, 36 authority: &str, 37 path_with_query: &str, 38 body: Option<&[u8]>, 39 additional_headers: Option<&[(String, Vec<u8>)]>, 40 connect_timeout: Option<u64>, 41 first_by_timeout: Option<u64>, 42 between_bytes_timeout: Option<u64>, 43 ) -> Result<Response> { 44 fn header_val(v: &str) -> Vec<u8> { 45 v.to_string().into_bytes() 46 } 47 let headers = http_types::Headers::from_list( 48 &[ 49 &[ 50 ("User-agent".to_string(), header_val("WASI-HTTP/0.0.1")), 51 ("Content-type".to_string(), header_val("application/json")), 52 ], 53 additional_headers.unwrap_or(&[]), 54 ] 55 .concat(), 56 )?; 57 58 let request = http_types::OutgoingRequest::new(headers); 59 60 request 61 .set_method(&method) 62 .map_err(|()| anyhow!("failed to set method"))?; 63 request 64 .set_scheme(Some(&scheme)) 65 .map_err(|()| anyhow!("failed to set scheme"))?; 66 request 67 .set_authority(Some(authority)) 68 .map_err(|()| anyhow!("failed to set authority"))?; 69 request 70 .set_path_with_query(Some(&path_with_query)) 71 .map_err(|()| anyhow!("failed to set path_with_query"))?; 72 73 let outgoing_body = request 74 .body() 75 .map_err(|_| anyhow!("outgoing request write failed"))?; 76 77 let options = http_types::RequestOptions::new(); 78 options 79 .set_connect_timeout(connect_timeout) 80 .map_err(|()| anyhow!("failed to set connect_timeout"))?; 81 options 82 .set_first_byte_timeout(first_by_timeout) 83 .map_err(|()| anyhow!("failed to set first_byte_timeout"))?; 84 options 85 .set_between_bytes_timeout(between_bytes_timeout) 86 .map_err(|()| anyhow!("failed to set between_bytes_timeout"))?; 87 let options = Some(options); 88 89 let future_response = outgoing_handler::handle(request, options)?; 90 91 if let Some(mut buf) = body { 92 let request_body = outgoing_body 93 .write() 94 .map_err(|_| anyhow!("outgoing request write failed"))?; 95 96 let pollable = request_body.subscribe(); 97 while !buf.is_empty() { 98 pollable.block(); 99 100 let permit = match request_body.check_write() { 101 Ok(n) => n, 102 Err(_) => anyhow::bail!("output stream error"), 103 }; 104 105 let len = buf.len().min(permit as usize); 106 let (chunk, rest) = buf.split_at(len); 107 buf = rest; 108 109 match request_body.write(chunk) { 110 Err(_) => anyhow::bail!("output stream error"), 111 _ => {} 112 } 113 } 114 115 match request_body.flush() { 116 Err(_) => anyhow::bail!("output stream error"), 117 _ => {} 118 } 119 120 pollable.block(); 121 122 match request_body.check_write() { 123 Ok(_) => {} 124 Err(_) => anyhow::bail!("output stream error"), 125 }; 126 } 127 http_types::OutgoingBody::finish(outgoing_body, None)?; 128 129 let incoming_response = match future_response.get() { 130 Some(result) => result.map_err(|()| anyhow!("response already taken"))?, 131 None => { 132 let pollable = future_response.subscribe(); 133 pollable.block(); 134 future_response 135 .get() 136 .expect("incoming response available") 137 .map_err(|()| anyhow!("response already taken"))? 138 } 139 }?; 140 141 drop(future_response); 142 143 let status = incoming_response.status(); 144 145 let headers_handle = incoming_response.headers(); 146 let headers = headers_handle.entries(); 147 drop(headers_handle); 148 149 let incoming_body = incoming_response 150 .consume() 151 .map_err(|()| anyhow!("incoming response has no body stream"))?; 152 153 drop(incoming_response); 154 155 let input_stream = incoming_body.stream().unwrap(); 156 let input_stream_pollable = input_stream.subscribe(); 157 158 let mut body = Vec::new(); 159 loop { 160 input_stream_pollable.block(); 161 162 let mut body_chunk = match input_stream.read(1024 * 1024) { 163 Ok(c) => c, 164 Err(streams::StreamError::Closed) => break, 165 Err(e) => Err(anyhow!("input_stream read failed: {e:?}"))?, 166 }; 167 168 if !body_chunk.is_empty() { 169 body.append(&mut body_chunk); 170 } 171 } 172 173 Ok(Response { 174 status, 175 headers, 176 body, 177 }) 178 } 179