1 use anyhow::{Result, anyhow, bail}; 2 use futures::{Future, SinkExt, StreamExt, TryStreamExt, future, stream}; 3 use test_programs::wasi::http::types::{ 4 Fields, IncomingRequest, IncomingResponse, Method, OutgoingBody, OutgoingRequest, 5 OutgoingResponse, ResponseOutparam, Scheme, 6 }; 7 use url::Url; 8 9 const MAX_CONCURRENCY: usize = 16; 10 11 struct Handler; 12 13 test_programs::proxy::export!(Handler); 14 15 impl test_programs::proxy::exports::wasi::http::incoming_handler::Guest for Handler { 16 fn handle(request: IncomingRequest, response_out: ResponseOutparam) { 17 executor::run(async move { 18 handle_request(request, response_out).await; 19 }) 20 } 21 } 22 23 async fn handle_request(request: IncomingRequest, response_out: ResponseOutparam) { 24 let headers = request.headers().entries(); 25 26 assert!(request.authority().is_some()); 27 28 match (request.method(), request.path_with_query().as_deref()) { 29 (Method::Get, Some("/hash-all")) => { 30 // Send outgoing GET requests to the specified URLs and stream the hashes of the response bodies as 31 // they arrive. 32 33 let urls = headers.iter().filter_map(|(k, v)| { 34 (k == "url") 35 .then_some(v) 36 .and_then(|v| std::str::from_utf8(v).ok()) 37 .and_then(|v| Url::parse(v).ok()) 38 }); 39 40 let results = urls.map(|url| async move { 41 let result = hash(&url).await; 42 (url, result) 43 }); 44 45 let mut results = stream::iter(results).buffer_unordered(MAX_CONCURRENCY); 46 47 let response = OutgoingResponse::new( 48 Fields::from_list(&[("content-type".to_string(), b"text/plain".to_vec())]).unwrap(), 49 ); 50 51 let mut body = 52 executor::outgoing_body(response.body().expect("response should be writable")); 53 54 ResponseOutparam::set(response_out, Ok(response)); 55 56 while let Some((url, result)) = results.next().await { 57 let payload = match result { 58 Ok(hash) => format!("{url}: {hash}\n"), 59 Err(e) => format!("{url}: {e:?}\n"), 60 } 61 .into_bytes(); 62 63 if let Err(e) = body.send(payload).await { 64 eprintln!("Error sending payload: {e}"); 65 } 66 } 67 } 68 69 (Method::Post, Some("/echo")) => { 70 // Echo the request body without buffering it. 71 72 let response = OutgoingResponse::new( 73 Fields::from_list( 74 &headers 75 .into_iter() 76 .filter_map(|(k, v)| (k == "content-type").then_some((k, v))) 77 .collect::<Vec<_>>(), 78 ) 79 .unwrap(), 80 ); 81 82 let mut body = 83 executor::outgoing_body(response.body().expect("response should be writable")); 84 85 ResponseOutparam::set(response_out, Ok(response)); 86 87 let mut stream = 88 executor::incoming_body(request.consume().expect("request should be readable")); 89 90 while let Some(chunk) = stream.next().await { 91 match chunk { 92 Ok(chunk) => { 93 if let Err(e) = body.send(chunk).await { 94 eprintln!("Error sending body: {e}"); 95 break; 96 } 97 } 98 Err(e) => { 99 eprintln!("Error receiving body: {e}"); 100 break; 101 } 102 } 103 } 104 } 105 106 (Method::Post, Some("/double-echo")) => { 107 // Pipe the request body to an outgoing request and stream the response back to the client. 108 109 if let Some(url) = headers.iter().find_map(|(k, v)| { 110 (k == "url") 111 .then_some(v) 112 .and_then(|v| std::str::from_utf8(v).ok()) 113 .and_then(|v| Url::parse(v).ok()) 114 }) { 115 match double_echo(request, &url).await { 116 Ok((request_copy, response)) => { 117 let mut stream = executor::incoming_body( 118 response.consume().expect("response should be consumable"), 119 ); 120 121 let response = OutgoingResponse::new( 122 Fields::from_list( 123 &headers 124 .into_iter() 125 .filter_map(|(k, v)| (k == "content-type").then_some((k, v))) 126 .collect::<Vec<_>>(), 127 ) 128 .unwrap(), 129 ); 130 131 let mut body = executor::outgoing_body( 132 response.body().expect("response should be writable"), 133 ); 134 135 ResponseOutparam::set(response_out, Ok(response)); 136 137 let response_copy = async move { 138 while let Some(chunk) = stream.next().await { 139 body.send(chunk?).await?; 140 } 141 Ok::<_, anyhow::Error>(()) 142 }; 143 144 let (request_copy, response_copy) = 145 future::join(request_copy, response_copy).await; 146 if let Err(e) = request_copy.and(response_copy) { 147 eprintln!("error piping to and from {url}: {e}"); 148 } 149 } 150 151 Err(e) => { 152 eprintln!("Error sending outgoing request to {url}: {e}"); 153 server_error(response_out); 154 } 155 } 156 } else { 157 bad_request(response_out); 158 } 159 } 160 161 _ => method_not_allowed(response_out), 162 } 163 } 164 165 async fn double_echo( 166 incoming_request: IncomingRequest, 167 url: &Url, 168 ) -> Result<(impl Future<Output = Result<()>> + use<>, IncomingResponse)> { 169 let outgoing_request = OutgoingRequest::new(Fields::new()); 170 171 outgoing_request 172 .set_method(&Method::Post) 173 .map_err(|()| anyhow!("failed to set method"))?; 174 175 outgoing_request 176 .set_path_with_query(Some(url.path())) 177 .map_err(|()| anyhow!("failed to set path_with_query"))?; 178 179 outgoing_request 180 .set_scheme(Some(&match url.scheme() { 181 "http" => Scheme::Http, 182 "https" => Scheme::Https, 183 scheme => Scheme::Other(scheme.into()), 184 })) 185 .map_err(|()| anyhow!("failed to set scheme"))?; 186 187 outgoing_request 188 .set_authority(Some(&format!( 189 "{}{}", 190 url.host_str().unwrap_or(""), 191 if let Some(port) = url.port() { 192 format!(":{port}") 193 } else { 194 String::new() 195 } 196 ))) 197 .map_err(|()| anyhow!("failed to set authority"))?; 198 199 let mut body = executor::outgoing_body( 200 outgoing_request 201 .body() 202 .expect("request body should be writable"), 203 ); 204 205 let response = executor::outgoing_request_send(outgoing_request); 206 207 let mut stream = executor::incoming_body( 208 incoming_request 209 .consume() 210 .expect("request should be consumable"), 211 ); 212 213 let copy = async move { 214 while let Some(chunk) = stream.next().await { 215 body.send(chunk?).await?; 216 } 217 Ok::<_, anyhow::Error>(()) 218 }; 219 220 let response = response.await?; 221 222 let status = response.status(); 223 224 if !(200..300).contains(&status) { 225 bail!("unexpected status: {status}"); 226 } 227 228 Ok((copy, response)) 229 } 230 231 fn server_error(response_out: ResponseOutparam) { 232 respond(500, response_out) 233 } 234 235 fn bad_request(response_out: ResponseOutparam) { 236 respond(400, response_out) 237 } 238 239 fn method_not_allowed(response_out: ResponseOutparam) { 240 respond(405, response_out) 241 } 242 243 fn respond(status: u16, response_out: ResponseOutparam) { 244 let response = OutgoingResponse::new(Fields::new()); 245 response 246 .set_status_code(status) 247 .expect("setting status code"); 248 249 let body = response.body().expect("response should be writable"); 250 251 ResponseOutparam::set(response_out, Ok(response)); 252 253 OutgoingBody::finish(body, None).expect("outgoing-body.finish"); 254 } 255 256 async fn hash(url: &Url) -> Result<String> { 257 let request = OutgoingRequest::new(Fields::new()); 258 259 request 260 .set_path_with_query(Some(url.path())) 261 .map_err(|()| anyhow!("failed to set path_with_query"))?; 262 request 263 .set_scheme(Some(&match url.scheme() { 264 "http" => Scheme::Http, 265 "https" => Scheme::Https, 266 scheme => Scheme::Other(scheme.into()), 267 })) 268 .map_err(|()| anyhow!("failed to set scheme"))?; 269 request 270 .set_authority(Some(&format!( 271 "{}{}", 272 url.host_str().unwrap_or(""), 273 if let Some(port) = url.port() { 274 format!(":{port}") 275 } else { 276 String::new() 277 } 278 ))) 279 .map_err(|()| anyhow!("failed to set authority"))?; 280 281 let response = executor::outgoing_request_send(request).await?; 282 283 let status = response.status(); 284 285 if !(200..300).contains(&status) { 286 bail!("unexpected status: {status}"); 287 } 288 289 let mut body = 290 executor::incoming_body(response.consume().expect("response should be readable")); 291 292 use sha2::Digest; 293 let mut hasher = sha2::Sha256::new(); 294 while let Some(chunk) = body.try_next().await? { 295 hasher.update(&chunk); 296 } 297 298 use base64::Engine; 299 Ok(base64::engine::general_purpose::STANDARD_NO_PAD.encode(hasher.finalize())) 300 } 301 302 // Technically this should not be here for a proxy, but given the current 303 // framework for tests it's required since this file is built as a `bin` 304 fn main() {} 305 306 mod executor { 307 use anyhow::{Error, Result, anyhow}; 308 use futures::{Sink, Stream, future, sink, stream}; 309 use std::{ 310 cell::RefCell, 311 future::Future, 312 mem, 313 rc::Rc, 314 sync::{Arc, Mutex}, 315 task::{Context, Poll, Wake, Waker}, 316 }; 317 use test_programs::wasi::{ 318 http::{ 319 outgoing_handler, 320 types::{ 321 self, FutureTrailers, IncomingBody, IncomingResponse, InputStream, OutgoingBody, 322 OutgoingRequest, OutputStream, 323 }, 324 }, 325 io::{self, streams::StreamError}, 326 }; 327 328 const READ_SIZE: u64 = 16 * 1024; 329 330 static WAKERS: Mutex<Vec<(io::poll::Pollable, Waker)>> = Mutex::new(Vec::new()); 331 332 pub fn run<T>(future: impl Future<Output = T>) -> T { 333 futures::pin_mut!(future); 334 335 struct DummyWaker; 336 337 impl Wake for DummyWaker { 338 fn wake(self: Arc<Self>) {} 339 } 340 341 let waker = Arc::new(DummyWaker).into(); 342 343 loop { 344 match future.as_mut().poll(&mut Context::from_waker(&waker)) { 345 Poll::Pending => { 346 let mut new_wakers = Vec::new(); 347 348 let wakers = mem::take::<Vec<_>>(&mut WAKERS.lock().unwrap()); 349 350 assert!(!wakers.is_empty()); 351 352 let pollables = wakers 353 .iter() 354 .map(|(pollable, _)| pollable) 355 .collect::<Vec<_>>(); 356 357 let mut ready = vec![false; wakers.len()]; 358 359 for index in io::poll::poll(&pollables) { 360 ready[usize::try_from(index).unwrap()] = true; 361 } 362 363 for (ready, (pollable, waker)) in ready.into_iter().zip(wakers) { 364 if ready { 365 waker.wake() 366 } else { 367 new_wakers.push((pollable, waker)); 368 } 369 } 370 371 *WAKERS.lock().unwrap() = new_wakers; 372 } 373 Poll::Ready(result) => break result, 374 } 375 } 376 } 377 378 pub fn outgoing_body(body: OutgoingBody) -> impl Sink<Vec<u8>, Error = Error> { 379 struct Outgoing(Option<(OutputStream, OutgoingBody)>); 380 381 impl Drop for Outgoing { 382 fn drop(&mut self) { 383 if let Some((stream, body)) = self.0.take() { 384 drop(stream); 385 OutgoingBody::finish(body, None).expect("outgoing-body.finish"); 386 } 387 } 388 } 389 390 let stream = body.write().expect("response body should be writable"); 391 let pair = Rc::new(RefCell::new(Outgoing(Some((stream, body))))); 392 393 sink::unfold((), { 394 move |(), chunk: Vec<u8>| { 395 future::poll_fn({ 396 let mut offset = 0; 397 let mut flushing = false; 398 let pair = pair.clone(); 399 400 move |context| { 401 let pair = pair.borrow(); 402 let (stream, _) = &pair.0.as_ref().unwrap(); 403 404 loop { 405 match stream.check_write() { 406 Ok(0) => { 407 WAKERS 408 .lock() 409 .unwrap() 410 .push((stream.subscribe(), context.waker().clone())); 411 412 break Poll::Pending; 413 } 414 Ok(count) => { 415 if offset == chunk.len() { 416 if flushing { 417 break Poll::Ready(Ok(())); 418 } else { 419 stream.flush().expect("stream should be flushable"); 420 flushing = true; 421 } 422 } else { 423 let count = usize::try_from(count) 424 .unwrap() 425 .min(chunk.len() - offset); 426 427 match stream.write(&chunk[offset..][..count]) { 428 Ok(()) => { 429 offset += count; 430 } 431 Err(_) => break Poll::Ready(Err(anyhow!("I/O error"))), 432 } 433 } 434 } 435 Err(_) => break Poll::Ready(Err(anyhow!("I/O error"))), 436 } 437 } 438 } 439 }) 440 } 441 }) 442 } 443 444 pub fn outgoing_request_send( 445 request: OutgoingRequest, 446 ) -> impl Future<Output = Result<IncomingResponse, types::ErrorCode>> { 447 future::poll_fn({ 448 let response = outgoing_handler::handle(request, None); 449 450 move |context| match &response { 451 Ok(response) => { 452 if let Some(response) = response.get() { 453 Poll::Ready(response.unwrap()) 454 } else { 455 WAKERS 456 .lock() 457 .unwrap() 458 .push((response.subscribe(), context.waker().clone())); 459 Poll::Pending 460 } 461 } 462 Err(error) => Poll::Ready(Err(error.clone())), 463 } 464 }) 465 } 466 467 pub fn incoming_body(body: IncomingBody) -> impl Stream<Item = Result<Vec<u8>>> { 468 enum Inner { 469 Stream { 470 stream: InputStream, 471 body: IncomingBody, 472 }, 473 Trailers(FutureTrailers), 474 Closed, 475 } 476 477 struct Incoming(Inner); 478 479 impl Drop for Incoming { 480 fn drop(&mut self) { 481 match mem::replace(&mut self.0, Inner::Closed) { 482 Inner::Stream { stream, body } => { 483 drop(stream); 484 IncomingBody::finish(body); 485 } 486 Inner::Trailers(_) | Inner::Closed => {} 487 } 488 } 489 } 490 491 stream::poll_fn({ 492 let stream = body.stream().expect("response body should be readable"); 493 let mut incoming = Incoming(Inner::Stream { stream, body }); 494 495 move |context| { 496 loop { 497 match &incoming.0 { 498 Inner::Stream { stream, .. } => match stream.read(READ_SIZE) { 499 Ok(buffer) => { 500 return if buffer.is_empty() { 501 WAKERS 502 .lock() 503 .unwrap() 504 .push((stream.subscribe(), context.waker().clone())); 505 Poll::Pending 506 } else { 507 Poll::Ready(Some(Ok(buffer))) 508 }; 509 } 510 Err(StreamError::Closed) => { 511 let Inner::Stream { stream, body } = 512 mem::replace(&mut incoming.0, Inner::Closed) 513 else { 514 unreachable!(); 515 }; 516 drop(stream); 517 incoming.0 = Inner::Trailers(IncomingBody::finish(body)); 518 } 519 Err(StreamError::LastOperationFailed(error)) => { 520 return Poll::Ready(Some(Err(anyhow!( 521 "{}", 522 error.to_debug_string() 523 )))); 524 } 525 }, 526 527 Inner::Trailers(trailers) => { 528 match trailers.get() { 529 Some(Ok(trailers)) => { 530 incoming.0 = Inner::Closed; 531 match trailers { 532 Ok(Some(_)) => { 533 // Currently, we just ignore any trailers. TODO: Add a test that 534 // expects trailers and verify they match the expected contents. 535 } 536 Ok(None) => { 537 // No trailers; nothing else to do. 538 } 539 Err(error) => { 540 // Error reading the trailers: pass it on to the application. 541 return Poll::Ready(Some(Err(anyhow!("{error:?}")))); 542 } 543 } 544 } 545 Some(Err(_)) => { 546 // Should only happen if we try to retrieve the trailers twice, i.e. a bug in 547 // this code. 548 unreachable!(); 549 } 550 None => { 551 WAKERS 552 .lock() 553 .unwrap() 554 .push((trailers.subscribe(), context.waker().clone())); 555 return Poll::Pending; 556 } 557 } 558 } 559 560 Inner::Closed => { 561 return Poll::Ready(None); 562 } 563 } 564 } 565 } 566 }) 567 } 568 } 569