1 use { 2 flate2::{ 3 Compression, 4 write::{DeflateDecoder, DeflateEncoder}, 5 }, 6 std::{io::Write, mem}, 7 test_programs::p3::{ 8 proxy::exports::wasi::http::handler::Guest as Handler, 9 wasi::http::{ 10 handler, 11 types::{ErrorCode, Headers, Request, Response}, 12 }, 13 wit_future, wit_stream, 14 }, 15 wit_bindgen::StreamResult, 16 }; 17 18 struct Component; 19 20 test_programs::p3::proxy::export!(Component); 21 22 impl Handler for Component { 23 /// Forward the specified request to the imported `wasi:http/handler`, transparently decoding the request body 24 /// if it is `deflate`d and then encoding the response body if the client has provided an `accept-encoding: 25 /// deflate` header. 26 async fn handle(request: Request) -> Result<Response, ErrorCode> { 27 // First, extract the parts of the request and check for (and remove) headers pertaining to body encodings. 28 let method = request.get_method(); 29 let scheme = request.get_scheme(); 30 let path_with_query = request.get_path_with_query(); 31 let authority = request.get_authority(); 32 let mut accept_deflated = false; 33 let mut content_deflated = false; 34 let headers = request.get_headers(); 35 let mut headers = headers.copy_all(); 36 headers.retain(|(k, v)| match (k.as_str(), v.as_slice()) { 37 ("accept-encoding", value) 38 if std::str::from_utf8(value) 39 .map(|v| v.contains("deflate")) 40 .unwrap_or(false) => 41 { 42 accept_deflated = true; 43 false 44 } 45 ("content-encoding", b"deflate") => { 46 content_deflated = true; 47 false 48 } 49 _ => true, 50 }); 51 let (_, result_rx) = wit_future::new(|| Ok(())); 52 let (mut body, trailers) = Request::consume_body(request, result_rx); 53 54 let (body, trailers) = if content_deflated { 55 // Next, spawn a task to pipe and decode the original request body and trailers into a new request 56 // we'll create below. This will run concurrently with any code in the imported `wasi:http/handler`. 57 let (trailers_tx, trailers_rx) = wit_future::new(|| todo!()); 58 let (mut pipe_tx, pipe_rx) = wit_stream::new(); 59 60 wit_bindgen::spawn(async move { 61 { 62 let mut decoder = DeflateDecoder::new(Vec::new()); 63 let mut status = StreamResult::Complete(0); 64 let mut chunk = Vec::with_capacity(64 * 1024); 65 66 while let StreamResult::Complete(_) = status { 67 (status, chunk) = body.read(chunk).await; 68 decoder.write_all(&chunk).unwrap(); 69 let remaining = pipe_tx.write_all(mem::take(decoder.get_mut())).await; 70 assert!(remaining.is_empty()); 71 *decoder.get_mut() = remaining; 72 chunk.clear(); 73 } 74 75 let remaining = pipe_tx.write_all(decoder.finish().unwrap()).await; 76 assert!(remaining.is_empty()); 77 78 drop(pipe_tx); 79 } 80 81 trailers_tx.write(trailers.await).await.unwrap(); 82 }); 83 84 (pipe_rx, trailers_rx) 85 } else { 86 (body, trailers) 87 }; 88 89 // While the above task (if any) is running, synthesize a request from the parts collected above and pass 90 // it to the imported `wasi:http/handler`. 91 let (my_request, _request_complete) = Request::new( 92 Headers::from_list(&headers).unwrap(), 93 Some(body), 94 trailers, 95 None, 96 ); 97 my_request.set_method(&method).unwrap(); 98 my_request.set_scheme(scheme.as_ref()).unwrap(); 99 my_request 100 .set_path_with_query(path_with_query.as_deref()) 101 .unwrap(); 102 my_request.set_authority(authority.as_deref()).unwrap(); 103 104 let response = handler::handle(my_request).await?; 105 106 // Now that we have the response, extract the parts, adding an extra header if we'll be encoding the body. 107 let status_code = response.get_status_code(); 108 let mut headers = response.get_headers().copy_all(); 109 if accept_deflated { 110 headers.push(("content-encoding".into(), b"deflate".into())); 111 } 112 113 let (_, result_rx) = wit_future::new(|| Ok(())); 114 let (mut body, trailers) = Response::consume_body(response, result_rx); 115 let (body, trailers) = if accept_deflated { 116 headers.retain(|(name, _value)| name != "content-length"); 117 118 // Spawn another task; this one is to pipe and encode the original response body and trailers into a 119 // new response we'll create below. This will run concurrently with the caller's code (i.e. it won't 120 // necessarily complete before we return a value). 121 let (trailers_tx, trailers_rx) = wit_future::new(|| todo!()); 122 let (mut pipe_tx, pipe_rx) = wit_stream::new(); 123 124 wit_bindgen::spawn(async move { 125 { 126 let mut encoder = DeflateEncoder::new(Vec::new(), Compression::fast()); 127 let mut status = StreamResult::Complete(0); 128 let mut chunk = Vec::with_capacity(64 * 1024); 129 130 while let StreamResult::Complete(_) = status { 131 (status, chunk) = body.read(chunk).await; 132 encoder.write_all(&chunk).unwrap(); 133 let remaining = pipe_tx.write_all(mem::take(encoder.get_mut())).await; 134 assert!(remaining.is_empty()); 135 *encoder.get_mut() = remaining; 136 chunk.clear(); 137 } 138 139 let remaining = pipe_tx.write_all(encoder.finish().unwrap()).await; 140 assert!(remaining.is_empty()); 141 142 drop(pipe_tx); 143 } 144 145 trailers_tx.write(trailers.await).await.unwrap(); 146 }); 147 148 (pipe_rx, trailers_rx) 149 } else { 150 (body, trailers) 151 }; 152 153 // While the above tasks (if any) are running, synthesize a response from the parts collected above and 154 // return it. 155 let (my_response, _response_complete) = 156 Response::new(Headers::from_list(&headers).unwrap(), Some(body), trailers); 157 my_response.set_status_code(status_code).unwrap(); 158 159 Ok(my_response) 160 } 161 } 162 163 // Unused function; required since this file is built as a `bin`: 164 fn main() {} 165