1 use crate::wasi::http::{outgoing_handler, types as http_types}; 2 use crate::wasi::io::streams; 3 use anyhow::{anyhow, Result}; 4 use std::fmt; 5 6 pub struct Response { 7 pub status: http_types::StatusCode, 8 pub headers: Vec<(String, Vec<u8>)>, 9 pub body: Vec<u8>, 10 } 11 impl fmt::Debug for Response { 12 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 13 let mut out = f.debug_struct("Response"); 14 out.field("status", &self.status) 15 .field("headers", &self.headers); 16 if let Ok(body) = std::str::from_utf8(&self.body) { 17 out.field("body", &body); 18 } else { 19 out.field("body", &self.body); 20 } 21 out.finish() 22 } 23 } 24 25 impl Response { 26 pub fn header(&self, name: &str) -> Option<&Vec<u8>> { 27 self.headers 28 .iter() 29 .find_map(|(k, v)| if k == name { Some(v) } else { None }) 30 } 31 } 32 33 pub fn request( 34 method: http_types::Method, 35 scheme: http_types::Scheme, 36 authority: &str, 37 path_with_query: &str, 38 body: Option<&[u8]>, 39 additional_headers: Option<&[(String, Vec<u8>)]>, 40 connect_timeout: Option<u64>, 41 first_by_timeout: Option<u64>, 42 between_bytes_timeout: Option<u64>, 43 ) -> Result<Response> { 44 fn header_val(v: &str) -> Vec<u8> { 45 v.to_string().into_bytes() 46 } 47 let headers = http_types::Headers::from_list( 48 &[ 49 &[ 50 ("User-agent".to_string(), header_val("WASI-HTTP/0.0.1")), 51 ("Content-type".to_string(), header_val("application/json")), 52 ], 53 additional_headers.unwrap_or(&[]), 54 ] 55 .concat(), 56 )?; 57 58 let request = http_types::OutgoingRequest::new(headers); 59 60 request 61 .set_method(&method) 62 .map_err(|()| anyhow!("failed to set method"))?; 63 request 64 .set_scheme(Some(&scheme)) 65 .map_err(|()| anyhow!("failed to set scheme"))?; 66 request 67 .set_authority(Some(authority)) 68 .map_err(|()| anyhow!("failed to set authority"))?; 69 request 70 .set_path_with_query(Some(&path_with_query)) 71 .map_err(|()| anyhow!("failed to set path_with_query"))?; 72 73 let outgoing_body = request 74 .body() 75 .map_err(|_| anyhow!("outgoing request write failed"))?; 76 77 if let Some(mut buf) = body { 78 let request_body = outgoing_body 79 .write() 80 .map_err(|_| anyhow!("outgoing request write failed"))?; 81 82 let pollable = request_body.subscribe(); 83 while !buf.is_empty() { 84 pollable.block(); 85 86 let permit = match request_body.check_write() { 87 Ok(n) => n, 88 Err(_) => anyhow::bail!("output stream error"), 89 }; 90 91 let len = buf.len().min(permit as usize); 92 let (chunk, rest) = buf.split_at(len); 93 buf = rest; 94 95 match request_body.write(chunk) { 96 Err(_) => anyhow::bail!("output stream error"), 97 _ => {} 98 } 99 } 100 101 match request_body.flush() { 102 Err(_) => anyhow::bail!("output stream error"), 103 _ => {} 104 } 105 106 pollable.block(); 107 108 match request_body.check_write() { 109 Ok(_) => {} 110 Err(_) => anyhow::bail!("output stream error"), 111 }; 112 } 113 114 let options = http_types::RequestOptions::new(); 115 options 116 .set_connect_timeout(connect_timeout) 117 .map_err(|()| anyhow!("failed to set connect_timeout"))?; 118 options 119 .set_first_byte_timeout(first_by_timeout) 120 .map_err(|()| anyhow!("failed to set first_byte_timeout"))?; 121 options 122 .set_between_bytes_timeout(between_bytes_timeout) 123 .map_err(|()| anyhow!("failed to set between_bytes_timeout"))?; 124 let options = Some(options); 125 126 let future_response = outgoing_handler::handle(request, options)?; 127 128 http_types::OutgoingBody::finish(outgoing_body, None)?; 129 130 let incoming_response = match future_response.get() { 131 Some(result) => result.map_err(|()| anyhow!("response already taken"))?, 132 None => { 133 let pollable = future_response.subscribe(); 134 pollable.block(); 135 future_response 136 .get() 137 .expect("incoming response available") 138 .map_err(|()| anyhow!("response already taken"))? 139 } 140 }?; 141 142 drop(future_response); 143 144 let status = incoming_response.status(); 145 146 let headers_handle = incoming_response.headers(); 147 let headers = headers_handle.entries(); 148 drop(headers_handle); 149 150 let incoming_body = incoming_response 151 .consume() 152 .map_err(|()| anyhow!("incoming response has no body stream"))?; 153 154 drop(incoming_response); 155 156 let input_stream = incoming_body.stream().unwrap(); 157 let input_stream_pollable = input_stream.subscribe(); 158 159 let mut body = Vec::new(); 160 loop { 161 input_stream_pollable.block(); 162 163 let mut body_chunk = match input_stream.read(1024 * 1024) { 164 Ok(c) => c, 165 Err(streams::StreamError::Closed) => break, 166 Err(e) => Err(anyhow!("input_stream read failed: {e:?}"))?, 167 }; 168 169 if !body_chunk.is_empty() { 170 body.append(&mut body_chunk); 171 } 172 } 173 174 Ok(Response { 175 status, 176 headers, 177 body, 178 }) 179 } 180