1 use anyhow::{Result, anyhow, bail};
2 use futures::{Future, SinkExt, StreamExt, TryStreamExt, future, stream};
3 use test_programs::wasi::http::types::{
4 Fields, IncomingRequest, IncomingResponse, Method, OutgoingBody, OutgoingRequest,
5 OutgoingResponse, ResponseOutparam, Scheme,
6 };
7 use url::Url;
8
9 const MAX_CONCURRENCY: usize = 16;
10
11 struct Handler;
12
13 test_programs::proxy::export!(Handler);
14
15 impl test_programs::proxy::exports::wasi::http::incoming_handler::Guest for Handler {
handle(request: IncomingRequest, response_out: ResponseOutparam)16 fn handle(request: IncomingRequest, response_out: ResponseOutparam) {
17 executor::run(async move {
18 handle_request(request, response_out).await;
19 })
20 }
21 }
22
handle_request(request: IncomingRequest, response_out: ResponseOutparam)23 async fn handle_request(request: IncomingRequest, response_out: ResponseOutparam) {
24 let headers = request.headers().entries();
25
26 assert!(request.authority().is_some());
27
28 match (request.method(), request.path_with_query().as_deref()) {
29 (Method::Get, Some("/hash-all")) => {
30 // Send outgoing GET requests to the specified URLs and stream the hashes of the response bodies as
31 // they arrive.
32
33 let urls = headers.iter().filter_map(|(k, v)| {
34 (k == "url")
35 .then_some(v)
36 .and_then(|v| std::str::from_utf8(v).ok())
37 .and_then(|v| Url::parse(v).ok())
38 });
39
40 let results = urls.map(|url| async move {
41 let result = hash(&url).await;
42 (url, result)
43 });
44
45 let mut results = stream::iter(results).buffer_unordered(MAX_CONCURRENCY);
46
47 let response = OutgoingResponse::new(
48 Fields::from_list(&[("content-type".to_string(), b"text/plain".to_vec())]).unwrap(),
49 );
50
51 let mut body =
52 executor::outgoing_body(response.body().expect("response should be writable"));
53
54 ResponseOutparam::set(response_out, Ok(response));
55
56 while let Some((url, result)) = results.next().await {
57 let payload = match result {
58 Ok(hash) => format!("{url}: {hash}\n"),
59 Err(e) => format!("{url}: {e:?}\n"),
60 }
61 .into_bytes();
62
63 if let Err(e) = body.send(payload).await {
64 eprintln!("Error sending payload: {e}");
65 }
66 }
67 }
68
69 (Method::Post, Some("/echo")) => {
70 // Echo the request body without buffering it.
71
72 let response = OutgoingResponse::new(
73 Fields::from_list(
74 &headers
75 .into_iter()
76 .filter_map(|(k, v)| (k == "content-type").then_some((k, v)))
77 .collect::<Vec<_>>(),
78 )
79 .unwrap(),
80 );
81
82 let mut body =
83 executor::outgoing_body(response.body().expect("response should be writable"));
84
85 ResponseOutparam::set(response_out, Ok(response));
86
87 let mut stream =
88 executor::incoming_body(request.consume().expect("request should be readable"));
89
90 while let Some(chunk) = stream.next().await {
91 match chunk {
92 Ok(chunk) => {
93 if let Err(e) = body.send(chunk).await {
94 eprintln!("Error sending body: {e}");
95 break;
96 }
97 }
98 Err(e) => {
99 eprintln!("Error receiving body: {e}");
100 break;
101 }
102 }
103 }
104 }
105
106 (Method::Post, Some("/double-echo")) => {
107 // Pipe the request body to an outgoing request and stream the response back to the client.
108
109 if let Some(url) = headers.iter().find_map(|(k, v)| {
110 (k == "url")
111 .then_some(v)
112 .and_then(|v| std::str::from_utf8(v).ok())
113 .and_then(|v| Url::parse(v).ok())
114 }) {
115 match double_echo(request, &url).await {
116 Ok((request_copy, response)) => {
117 let mut stream = executor::incoming_body(
118 response.consume().expect("response should be consumable"),
119 );
120
121 let response = OutgoingResponse::new(
122 Fields::from_list(
123 &headers
124 .into_iter()
125 .filter_map(|(k, v)| (k == "content-type").then_some((k, v)))
126 .collect::<Vec<_>>(),
127 )
128 .unwrap(),
129 );
130
131 let mut body = executor::outgoing_body(
132 response.body().expect("response should be writable"),
133 );
134
135 ResponseOutparam::set(response_out, Ok(response));
136
137 let response_copy = async move {
138 while let Some(chunk) = stream.next().await {
139 body.send(chunk?).await?;
140 }
141 Ok::<_, anyhow::Error>(())
142 };
143
144 let (request_copy, response_copy) =
145 future::join(request_copy, response_copy).await;
146 if let Err(e) = request_copy.and(response_copy) {
147 eprintln!("error piping to and from {url}: {e}");
148 }
149 }
150
151 Err(e) => {
152 eprintln!("Error sending outgoing request to {url}: {e}");
153 server_error(response_out);
154 }
155 }
156 } else {
157 bad_request(response_out);
158 }
159 }
160
161 _ => method_not_allowed(response_out),
162 }
163 }
164
double_echo( incoming_request: IncomingRequest, url: &Url, ) -> Result<(impl Future<Output = Result<()>> + use<>, IncomingResponse)>165 async fn double_echo(
166 incoming_request: IncomingRequest,
167 url: &Url,
168 ) -> Result<(impl Future<Output = Result<()>> + use<>, IncomingResponse)> {
169 let outgoing_request = OutgoingRequest::new(Fields::new());
170
171 outgoing_request
172 .set_method(&Method::Post)
173 .map_err(|()| anyhow!("failed to set method"))?;
174
175 outgoing_request
176 .set_path_with_query(Some(url.path()))
177 .map_err(|()| anyhow!("failed to set path_with_query"))?;
178
179 outgoing_request
180 .set_scheme(Some(&match url.scheme() {
181 "http" => Scheme::Http,
182 "https" => Scheme::Https,
183 scheme => Scheme::Other(scheme.into()),
184 }))
185 .map_err(|()| anyhow!("failed to set scheme"))?;
186
187 outgoing_request
188 .set_authority(Some(&format!(
189 "{}{}",
190 url.host_str().unwrap_or(""),
191 if let Some(port) = url.port() {
192 format!(":{port}")
193 } else {
194 String::new()
195 }
196 )))
197 .map_err(|()| anyhow!("failed to set authority"))?;
198
199 let mut body = executor::outgoing_body(
200 outgoing_request
201 .body()
202 .expect("request body should be writable"),
203 );
204
205 let response = executor::outgoing_request_send(outgoing_request);
206
207 let mut stream = executor::incoming_body(
208 incoming_request
209 .consume()
210 .expect("request should be consumable"),
211 );
212
213 let copy = async move {
214 while let Some(chunk) = stream.next().await {
215 body.send(chunk?).await?;
216 }
217 Ok::<_, anyhow::Error>(())
218 };
219
220 let response = response.await?;
221
222 let status = response.status();
223
224 if !(200..300).contains(&status) {
225 bail!("unexpected status: {status}");
226 }
227
228 Ok((copy, response))
229 }
230
server_error(response_out: ResponseOutparam)231 fn server_error(response_out: ResponseOutparam) {
232 respond(500, response_out)
233 }
234
bad_request(response_out: ResponseOutparam)235 fn bad_request(response_out: ResponseOutparam) {
236 respond(400, response_out)
237 }
238
method_not_allowed(response_out: ResponseOutparam)239 fn method_not_allowed(response_out: ResponseOutparam) {
240 respond(405, response_out)
241 }
242
respond(status: u16, response_out: ResponseOutparam)243 fn respond(status: u16, response_out: ResponseOutparam) {
244 let response = OutgoingResponse::new(Fields::new());
245 response
246 .set_status_code(status)
247 .expect("setting status code");
248
249 let body = response.body().expect("response should be writable");
250
251 ResponseOutparam::set(response_out, Ok(response));
252
253 OutgoingBody::finish(body, None).expect("outgoing-body.finish");
254 }
255
hash(url: &Url) -> Result<String>256 async fn hash(url: &Url) -> Result<String> {
257 let request = OutgoingRequest::new(Fields::new());
258
259 request
260 .set_path_with_query(Some(url.path()))
261 .map_err(|()| anyhow!("failed to set path_with_query"))?;
262 request
263 .set_scheme(Some(&match url.scheme() {
264 "http" => Scheme::Http,
265 "https" => Scheme::Https,
266 scheme => Scheme::Other(scheme.into()),
267 }))
268 .map_err(|()| anyhow!("failed to set scheme"))?;
269 request
270 .set_authority(Some(&format!(
271 "{}{}",
272 url.host_str().unwrap_or(""),
273 if let Some(port) = url.port() {
274 format!(":{port}")
275 } else {
276 String::new()
277 }
278 )))
279 .map_err(|()| anyhow!("failed to set authority"))?;
280
281 let response = executor::outgoing_request_send(request).await?;
282
283 let status = response.status();
284
285 if !(200..300).contains(&status) {
286 bail!("unexpected status: {status}");
287 }
288
289 let mut body =
290 executor::incoming_body(response.consume().expect("response should be readable"));
291
292 use sha2::Digest;
293 let mut hasher = sha2::Sha256::new();
294 while let Some(chunk) = body.try_next().await? {
295 hasher.update(&chunk);
296 }
297
298 use base64::Engine;
299 Ok(base64::engine::general_purpose::STANDARD_NO_PAD.encode(hasher.finalize()))
300 }
301
302 // Technically this should not be here for a proxy, but given the current
303 // framework for tests it's required since this file is built as a `bin`
main()304 fn main() {}
305
306 mod executor {
307 use anyhow::{Error, Result, anyhow};
308 use futures::{Sink, Stream, future, sink, stream};
309 use std::{
310 cell::RefCell,
311 future::Future,
312 mem,
313 rc::Rc,
314 sync::{Arc, Mutex},
315 task::{Context, Poll, Wake, Waker},
316 };
317 use test_programs::wasi::{
318 http::{
319 outgoing_handler,
320 types::{
321 self, FutureTrailers, IncomingBody, IncomingResponse, InputStream, OutgoingBody,
322 OutgoingRequest, OutputStream,
323 },
324 },
325 io::{self, streams::StreamError},
326 };
327
328 const READ_SIZE: u64 = 16 * 1024;
329
330 static WAKERS: Mutex<Vec<(io::poll::Pollable, Waker)>> = Mutex::new(Vec::new());
331
run<T>(future: impl Future<Output = T>) -> T332 pub fn run<T>(future: impl Future<Output = T>) -> T {
333 futures::pin_mut!(future);
334
335 struct DummyWaker;
336
337 impl Wake for DummyWaker {
338 fn wake(self: Arc<Self>) {}
339 }
340
341 let waker = Arc::new(DummyWaker).into();
342
343 loop {
344 match future.as_mut().poll(&mut Context::from_waker(&waker)) {
345 Poll::Pending => {
346 let mut new_wakers = Vec::new();
347
348 let wakers = mem::take::<Vec<_>>(&mut WAKERS.lock().unwrap());
349
350 assert!(!wakers.is_empty());
351
352 let pollables = wakers
353 .iter()
354 .map(|(pollable, _)| pollable)
355 .collect::<Vec<_>>();
356
357 let mut ready = vec![false; wakers.len()];
358
359 for index in io::poll::poll(&pollables) {
360 ready[usize::try_from(index).unwrap()] = true;
361 }
362
363 for (ready, (pollable, waker)) in ready.into_iter().zip(wakers) {
364 if ready {
365 waker.wake()
366 } else {
367 new_wakers.push((pollable, waker));
368 }
369 }
370
371 *WAKERS.lock().unwrap() = new_wakers;
372 }
373 Poll::Ready(result) => break result,
374 }
375 }
376 }
377
outgoing_body(body: OutgoingBody) -> impl Sink<Vec<u8>, Error = Error>378 pub fn outgoing_body(body: OutgoingBody) -> impl Sink<Vec<u8>, Error = Error> {
379 struct Outgoing(Option<(OutputStream, OutgoingBody)>);
380
381 impl Drop for Outgoing {
382 fn drop(&mut self) {
383 if let Some((stream, body)) = self.0.take() {
384 drop(stream);
385 OutgoingBody::finish(body, None).expect("outgoing-body.finish");
386 }
387 }
388 }
389
390 let stream = body.write().expect("response body should be writable");
391 let pair = Rc::new(RefCell::new(Outgoing(Some((stream, body)))));
392
393 sink::unfold((), {
394 move |(), chunk: Vec<u8>| {
395 future::poll_fn({
396 let mut offset = 0;
397 let mut flushing = false;
398 let pair = pair.clone();
399
400 move |context| {
401 let pair = pair.borrow();
402 let (stream, _) = &pair.0.as_ref().unwrap();
403
404 loop {
405 match stream.check_write() {
406 Ok(0) => {
407 WAKERS
408 .lock()
409 .unwrap()
410 .push((stream.subscribe(), context.waker().clone()));
411
412 break Poll::Pending;
413 }
414 Ok(count) => {
415 if offset == chunk.len() {
416 if flushing {
417 break Poll::Ready(Ok(()));
418 } else {
419 stream.flush().expect("stream should be flushable");
420 flushing = true;
421 }
422 } else {
423 let count = usize::try_from(count)
424 .unwrap()
425 .min(chunk.len() - offset);
426
427 match stream.write(&chunk[offset..][..count]) {
428 Ok(()) => {
429 offset += count;
430 }
431 Err(_) => break Poll::Ready(Err(anyhow!("I/O error"))),
432 }
433 }
434 }
435 Err(_) => break Poll::Ready(Err(anyhow!("I/O error"))),
436 }
437 }
438 }
439 })
440 }
441 })
442 }
443
outgoing_request_send( request: OutgoingRequest, ) -> impl Future<Output = Result<IncomingResponse, types::ErrorCode>>444 pub fn outgoing_request_send(
445 request: OutgoingRequest,
446 ) -> impl Future<Output = Result<IncomingResponse, types::ErrorCode>> {
447 future::poll_fn({
448 let response = outgoing_handler::handle(request, None);
449
450 move |context| match &response {
451 Ok(response) => {
452 if let Some(response) = response.get() {
453 Poll::Ready(response.unwrap())
454 } else {
455 WAKERS
456 .lock()
457 .unwrap()
458 .push((response.subscribe(), context.waker().clone()));
459 Poll::Pending
460 }
461 }
462 Err(error) => Poll::Ready(Err(error.clone())),
463 }
464 })
465 }
466
incoming_body(body: IncomingBody) -> impl Stream<Item = Result<Vec<u8>>>467 pub fn incoming_body(body: IncomingBody) -> impl Stream<Item = Result<Vec<u8>>> {
468 enum Inner {
469 Stream {
470 stream: InputStream,
471 body: IncomingBody,
472 },
473 Trailers(FutureTrailers),
474 Closed,
475 }
476
477 struct Incoming(Inner);
478
479 impl Drop for Incoming {
480 fn drop(&mut self) {
481 match mem::replace(&mut self.0, Inner::Closed) {
482 Inner::Stream { stream, body } => {
483 drop(stream);
484 IncomingBody::finish(body);
485 }
486 Inner::Trailers(_) | Inner::Closed => {}
487 }
488 }
489 }
490
491 stream::poll_fn({
492 let stream = body.stream().expect("response body should be readable");
493 let mut incoming = Incoming(Inner::Stream { stream, body });
494
495 move |context| {
496 loop {
497 match &incoming.0 {
498 Inner::Stream { stream, .. } => match stream.read(READ_SIZE) {
499 Ok(buffer) => {
500 return if buffer.is_empty() {
501 WAKERS
502 .lock()
503 .unwrap()
504 .push((stream.subscribe(), context.waker().clone()));
505 Poll::Pending
506 } else {
507 Poll::Ready(Some(Ok(buffer)))
508 };
509 }
510 Err(StreamError::Closed) => {
511 let Inner::Stream { stream, body } =
512 mem::replace(&mut incoming.0, Inner::Closed)
513 else {
514 unreachable!();
515 };
516 drop(stream);
517 incoming.0 = Inner::Trailers(IncomingBody::finish(body));
518 }
519 Err(StreamError::LastOperationFailed(error)) => {
520 return Poll::Ready(Some(Err(anyhow!(
521 "{}",
522 error.to_debug_string()
523 ))));
524 }
525 },
526
527 Inner::Trailers(trailers) => {
528 match trailers.get() {
529 Some(Ok(trailers)) => {
530 incoming.0 = Inner::Closed;
531 match trailers {
532 Ok(Some(_)) => {
533 // Currently, we just ignore any trailers. TODO: Add a test that
534 // expects trailers and verify they match the expected contents.
535 }
536 Ok(None) => {
537 // No trailers; nothing else to do.
538 }
539 Err(error) => {
540 // Error reading the trailers: pass it on to the application.
541 return Poll::Ready(Some(Err(anyhow!("{error:?}"))));
542 }
543 }
544 }
545 Some(Err(_)) => {
546 // Should only happen if we try to retrieve the trailers twice, i.e. a bug in
547 // this code.
548 unreachable!();
549 }
550 None => {
551 WAKERS
552 .lock()
553 .unwrap()
554 .push((trailers.subscribe(), context.waker().clone()));
555 return Poll::Pending;
556 }
557 }
558 }
559
560 Inner::Closed => {
561 return Poll::Ready(None);
562 }
563 }
564 }
565 }
566 })
567 }
568 }
569