1*b315a0a8SYosh use anyhow::{Result, anyhow, bail};
2*b315a0a8SYosh use futures::{Future, SinkExt, StreamExt, TryStreamExt, future, stream};
3*b315a0a8SYosh use test_programs::wasi::http::types::{
4*b315a0a8SYosh     Fields, IncomingRequest, IncomingResponse, Method, OutgoingBody, OutgoingRequest,
5*b315a0a8SYosh     OutgoingResponse, ResponseOutparam, Scheme,
6*b315a0a8SYosh };
7*b315a0a8SYosh use url::Url;
8*b315a0a8SYosh 
9*b315a0a8SYosh const MAX_CONCURRENCY: usize = 16;
10*b315a0a8SYosh 
11*b315a0a8SYosh struct Handler;
12*b315a0a8SYosh 
13*b315a0a8SYosh test_programs::proxy::export!(Handler);
14*b315a0a8SYosh 
15*b315a0a8SYosh impl test_programs::proxy::exports::wasi::http::incoming_handler::Guest for Handler {
handle(request: IncomingRequest, response_out: ResponseOutparam)16*b315a0a8SYosh     fn handle(request: IncomingRequest, response_out: ResponseOutparam) {
17*b315a0a8SYosh         executor::run(async move {
18*b315a0a8SYosh             handle_request(request, response_out).await;
19*b315a0a8SYosh         })
20*b315a0a8SYosh     }
21*b315a0a8SYosh }
22*b315a0a8SYosh 
handle_request(request: IncomingRequest, response_out: ResponseOutparam)23*b315a0a8SYosh async fn handle_request(request: IncomingRequest, response_out: ResponseOutparam) {
24*b315a0a8SYosh     let headers = request.headers().entries();
25*b315a0a8SYosh 
26*b315a0a8SYosh     assert!(request.authority().is_some());
27*b315a0a8SYosh 
28*b315a0a8SYosh     match (request.method(), request.path_with_query().as_deref()) {
29*b315a0a8SYosh         (Method::Get, Some("/hash-all")) => {
30*b315a0a8SYosh             // Send outgoing GET requests to the specified URLs and stream the hashes of the response bodies as
31*b315a0a8SYosh             // they arrive.
32*b315a0a8SYosh 
33*b315a0a8SYosh             let urls = headers.iter().filter_map(|(k, v)| {
34*b315a0a8SYosh                 (k == "url")
35*b315a0a8SYosh                     .then_some(v)
36*b315a0a8SYosh                     .and_then(|v| std::str::from_utf8(v).ok())
37*b315a0a8SYosh                     .and_then(|v| Url::parse(v).ok())
38*b315a0a8SYosh             });
39*b315a0a8SYosh 
40*b315a0a8SYosh             let results = urls.map(|url| async move {
41*b315a0a8SYosh                 let result = hash(&url).await;
42*b315a0a8SYosh                 (url, result)
43*b315a0a8SYosh             });
44*b315a0a8SYosh 
45*b315a0a8SYosh             let mut results = stream::iter(results).buffer_unordered(MAX_CONCURRENCY);
46*b315a0a8SYosh 
47*b315a0a8SYosh             let response = OutgoingResponse::new(
48*b315a0a8SYosh                 Fields::from_list(&[("content-type".to_string(), b"text/plain".to_vec())]).unwrap(),
49*b315a0a8SYosh             );
50*b315a0a8SYosh 
51*b315a0a8SYosh             let mut body =
52*b315a0a8SYosh                 executor::outgoing_body(response.body().expect("response should be writable"));
53*b315a0a8SYosh 
54*b315a0a8SYosh             ResponseOutparam::set(response_out, Ok(response));
55*b315a0a8SYosh 
56*b315a0a8SYosh             while let Some((url, result)) = results.next().await {
57*b315a0a8SYosh                 let payload = match result {
58*b315a0a8SYosh                     Ok(hash) => format!("{url}: {hash}\n"),
59*b315a0a8SYosh                     Err(e) => format!("{url}: {e:?}\n"),
60*b315a0a8SYosh                 }
61*b315a0a8SYosh                 .into_bytes();
62*b315a0a8SYosh 
63*b315a0a8SYosh                 if let Err(e) = body.send(payload).await {
64*b315a0a8SYosh                     eprintln!("Error sending payload: {e}");
65*b315a0a8SYosh                 }
66*b315a0a8SYosh             }
67*b315a0a8SYosh         }
68*b315a0a8SYosh 
69*b315a0a8SYosh         (Method::Post, Some("/echo")) => {
70*b315a0a8SYosh             // Echo the request body without buffering it.
71*b315a0a8SYosh 
72*b315a0a8SYosh             let response = OutgoingResponse::new(
73*b315a0a8SYosh                 Fields::from_list(
74*b315a0a8SYosh                     &headers
75*b315a0a8SYosh                         .into_iter()
76*b315a0a8SYosh                         .filter_map(|(k, v)| (k == "content-type").then_some((k, v)))
77*b315a0a8SYosh                         .collect::<Vec<_>>(),
78*b315a0a8SYosh                 )
79*b315a0a8SYosh                 .unwrap(),
80*b315a0a8SYosh             );
81*b315a0a8SYosh 
82*b315a0a8SYosh             let mut body =
83*b315a0a8SYosh                 executor::outgoing_body(response.body().expect("response should be writable"));
84*b315a0a8SYosh 
85*b315a0a8SYosh             ResponseOutparam::set(response_out, Ok(response));
86*b315a0a8SYosh 
87*b315a0a8SYosh             let mut stream =
88*b315a0a8SYosh                 executor::incoming_body(request.consume().expect("request should be readable"));
89*b315a0a8SYosh 
90*b315a0a8SYosh             while let Some(chunk) = stream.next().await {
91*b315a0a8SYosh                 match chunk {
92*b315a0a8SYosh                     Ok(chunk) => {
93*b315a0a8SYosh                         if let Err(e) = body.send(chunk).await {
94*b315a0a8SYosh                             eprintln!("Error sending body: {e}");
95*b315a0a8SYosh                             break;
96*b315a0a8SYosh                         }
97*b315a0a8SYosh                     }
98*b315a0a8SYosh                     Err(e) => {
99*b315a0a8SYosh                         eprintln!("Error receiving body: {e}");
100*b315a0a8SYosh                         break;
101*b315a0a8SYosh                     }
102*b315a0a8SYosh                 }
103*b315a0a8SYosh             }
104*b315a0a8SYosh         }
105*b315a0a8SYosh 
106*b315a0a8SYosh         (Method::Post, Some("/double-echo")) => {
107*b315a0a8SYosh             // Pipe the request body to an outgoing request and stream the response back to the client.
108*b315a0a8SYosh 
109*b315a0a8SYosh             if let Some(url) = headers.iter().find_map(|(k, v)| {
110*b315a0a8SYosh                 (k == "url")
111*b315a0a8SYosh                     .then_some(v)
112*b315a0a8SYosh                     .and_then(|v| std::str::from_utf8(v).ok())
113*b315a0a8SYosh                     .and_then(|v| Url::parse(v).ok())
114*b315a0a8SYosh             }) {
115*b315a0a8SYosh                 match double_echo(request, &url).await {
116*b315a0a8SYosh                     Ok((request_copy, response)) => {
117*b315a0a8SYosh                         let mut stream = executor::incoming_body(
118*b315a0a8SYosh                             response.consume().expect("response should be consumable"),
119*b315a0a8SYosh                         );
120*b315a0a8SYosh 
121*b315a0a8SYosh                         let response = OutgoingResponse::new(
122*b315a0a8SYosh                             Fields::from_list(
123*b315a0a8SYosh                                 &headers
124*b315a0a8SYosh                                     .into_iter()
125*b315a0a8SYosh                                     .filter_map(|(k, v)| (k == "content-type").then_some((k, v)))
126*b315a0a8SYosh                                     .collect::<Vec<_>>(),
127*b315a0a8SYosh                             )
128*b315a0a8SYosh                             .unwrap(),
129*b315a0a8SYosh                         );
130*b315a0a8SYosh 
131*b315a0a8SYosh                         let mut body = executor::outgoing_body(
132*b315a0a8SYosh                             response.body().expect("response should be writable"),
133*b315a0a8SYosh                         );
134*b315a0a8SYosh 
135*b315a0a8SYosh                         ResponseOutparam::set(response_out, Ok(response));
136*b315a0a8SYosh 
137*b315a0a8SYosh                         let response_copy = async move {
138*b315a0a8SYosh                             while let Some(chunk) = stream.next().await {
139*b315a0a8SYosh                                 body.send(chunk?).await?;
140*b315a0a8SYosh                             }
141*b315a0a8SYosh                             Ok::<_, anyhow::Error>(())
142*b315a0a8SYosh                         };
143*b315a0a8SYosh 
144*b315a0a8SYosh                         let (request_copy, response_copy) =
145*b315a0a8SYosh                             future::join(request_copy, response_copy).await;
146*b315a0a8SYosh                         if let Err(e) = request_copy.and(response_copy) {
147*b315a0a8SYosh                             eprintln!("error piping to and from {url}: {e}");
148*b315a0a8SYosh                         }
149*b315a0a8SYosh                     }
150*b315a0a8SYosh 
151*b315a0a8SYosh                     Err(e) => {
152*b315a0a8SYosh                         eprintln!("Error sending outgoing request to {url}: {e}");
153*b315a0a8SYosh                         server_error(response_out);
154*b315a0a8SYosh                     }
155*b315a0a8SYosh                 }
156*b315a0a8SYosh             } else {
157*b315a0a8SYosh                 bad_request(response_out);
158*b315a0a8SYosh             }
159*b315a0a8SYosh         }
160*b315a0a8SYosh 
161*b315a0a8SYosh         _ => method_not_allowed(response_out),
162*b315a0a8SYosh     }
163*b315a0a8SYosh }
164*b315a0a8SYosh 
double_echo( incoming_request: IncomingRequest, url: &Url, ) -> Result<(impl Future<Output = Result<()>> + use<>, IncomingResponse)>165*b315a0a8SYosh async fn double_echo(
166*b315a0a8SYosh     incoming_request: IncomingRequest,
167*b315a0a8SYosh     url: &Url,
168*b315a0a8SYosh ) -> Result<(impl Future<Output = Result<()>> + use<>, IncomingResponse)> {
169*b315a0a8SYosh     let outgoing_request = OutgoingRequest::new(Fields::new());
170*b315a0a8SYosh 
171*b315a0a8SYosh     outgoing_request
172*b315a0a8SYosh         .set_method(&Method::Post)
173*b315a0a8SYosh         .map_err(|()| anyhow!("failed to set method"))?;
174*b315a0a8SYosh 
175*b315a0a8SYosh     outgoing_request
176*b315a0a8SYosh         .set_path_with_query(Some(url.path()))
177*b315a0a8SYosh         .map_err(|()| anyhow!("failed to set path_with_query"))?;
178*b315a0a8SYosh 
179*b315a0a8SYosh     outgoing_request
180*b315a0a8SYosh         .set_scheme(Some(&match url.scheme() {
181*b315a0a8SYosh             "http" => Scheme::Http,
182*b315a0a8SYosh             "https" => Scheme::Https,
183*b315a0a8SYosh             scheme => Scheme::Other(scheme.into()),
184*b315a0a8SYosh         }))
185*b315a0a8SYosh         .map_err(|()| anyhow!("failed to set scheme"))?;
186*b315a0a8SYosh 
187*b315a0a8SYosh     outgoing_request
188*b315a0a8SYosh         .set_authority(Some(&format!(
189*b315a0a8SYosh             "{}{}",
190*b315a0a8SYosh             url.host_str().unwrap_or(""),
191*b315a0a8SYosh             if let Some(port) = url.port() {
192*b315a0a8SYosh                 format!(":{port}")
193*b315a0a8SYosh             } else {
194*b315a0a8SYosh                 String::new()
195*b315a0a8SYosh             }
196*b315a0a8SYosh         )))
197*b315a0a8SYosh         .map_err(|()| anyhow!("failed to set authority"))?;
198*b315a0a8SYosh 
199*b315a0a8SYosh     let mut body = executor::outgoing_body(
200*b315a0a8SYosh         outgoing_request
201*b315a0a8SYosh             .body()
202*b315a0a8SYosh             .expect("request body should be writable"),
203*b315a0a8SYosh     );
204*b315a0a8SYosh 
205*b315a0a8SYosh     let response = executor::outgoing_request_send(outgoing_request);
206*b315a0a8SYosh 
207*b315a0a8SYosh     let mut stream = executor::incoming_body(
208*b315a0a8SYosh         incoming_request
209*b315a0a8SYosh             .consume()
210*b315a0a8SYosh             .expect("request should be consumable"),
211*b315a0a8SYosh     );
212*b315a0a8SYosh 
213*b315a0a8SYosh     let copy = async move {
214*b315a0a8SYosh         while let Some(chunk) = stream.next().await {
215*b315a0a8SYosh             body.send(chunk?).await?;
216*b315a0a8SYosh         }
217*b315a0a8SYosh         Ok::<_, anyhow::Error>(())
218*b315a0a8SYosh     };
219*b315a0a8SYosh 
220*b315a0a8SYosh     let response = response.await?;
221*b315a0a8SYosh 
222*b315a0a8SYosh     let status = response.status();
223*b315a0a8SYosh 
224*b315a0a8SYosh     if !(200..300).contains(&status) {
225*b315a0a8SYosh         bail!("unexpected status: {status}");
226*b315a0a8SYosh     }
227*b315a0a8SYosh 
228*b315a0a8SYosh     Ok((copy, response))
229*b315a0a8SYosh }
230*b315a0a8SYosh 
server_error(response_out: ResponseOutparam)231*b315a0a8SYosh fn server_error(response_out: ResponseOutparam) {
232*b315a0a8SYosh     respond(500, response_out)
233*b315a0a8SYosh }
234*b315a0a8SYosh 
bad_request(response_out: ResponseOutparam)235*b315a0a8SYosh fn bad_request(response_out: ResponseOutparam) {
236*b315a0a8SYosh     respond(400, response_out)
237*b315a0a8SYosh }
238*b315a0a8SYosh 
method_not_allowed(response_out: ResponseOutparam)239*b315a0a8SYosh fn method_not_allowed(response_out: ResponseOutparam) {
240*b315a0a8SYosh     respond(405, response_out)
241*b315a0a8SYosh }
242*b315a0a8SYosh 
respond(status: u16, response_out: ResponseOutparam)243*b315a0a8SYosh fn respond(status: u16, response_out: ResponseOutparam) {
244*b315a0a8SYosh     let response = OutgoingResponse::new(Fields::new());
245*b315a0a8SYosh     response
246*b315a0a8SYosh         .set_status_code(status)
247*b315a0a8SYosh         .expect("setting status code");
248*b315a0a8SYosh 
249*b315a0a8SYosh     let body = response.body().expect("response should be writable");
250*b315a0a8SYosh 
251*b315a0a8SYosh     ResponseOutparam::set(response_out, Ok(response));
252*b315a0a8SYosh 
253*b315a0a8SYosh     OutgoingBody::finish(body, None).expect("outgoing-body.finish");
254*b315a0a8SYosh }
255*b315a0a8SYosh 
hash(url: &Url) -> Result<String>256*b315a0a8SYosh async fn hash(url: &Url) -> Result<String> {
257*b315a0a8SYosh     let request = OutgoingRequest::new(Fields::new());
258*b315a0a8SYosh 
259*b315a0a8SYosh     request
260*b315a0a8SYosh         .set_path_with_query(Some(url.path()))
261*b315a0a8SYosh         .map_err(|()| anyhow!("failed to set path_with_query"))?;
262*b315a0a8SYosh     request
263*b315a0a8SYosh         .set_scheme(Some(&match url.scheme() {
264*b315a0a8SYosh             "http" => Scheme::Http,
265*b315a0a8SYosh             "https" => Scheme::Https,
266*b315a0a8SYosh             scheme => Scheme::Other(scheme.into()),
267*b315a0a8SYosh         }))
268*b315a0a8SYosh         .map_err(|()| anyhow!("failed to set scheme"))?;
269*b315a0a8SYosh     request
270*b315a0a8SYosh         .set_authority(Some(&format!(
271*b315a0a8SYosh             "{}{}",
272*b315a0a8SYosh             url.host_str().unwrap_or(""),
273*b315a0a8SYosh             if let Some(port) = url.port() {
274*b315a0a8SYosh                 format!(":{port}")
275*b315a0a8SYosh             } else {
276*b315a0a8SYosh                 String::new()
277*b315a0a8SYosh             }
278*b315a0a8SYosh         )))
279*b315a0a8SYosh         .map_err(|()| anyhow!("failed to set authority"))?;
280*b315a0a8SYosh 
281*b315a0a8SYosh     let response = executor::outgoing_request_send(request).await?;
282*b315a0a8SYosh 
283*b315a0a8SYosh     let status = response.status();
284*b315a0a8SYosh 
285*b315a0a8SYosh     if !(200..300).contains(&status) {
286*b315a0a8SYosh         bail!("unexpected status: {status}");
287*b315a0a8SYosh     }
288*b315a0a8SYosh 
289*b315a0a8SYosh     let mut body =
290*b315a0a8SYosh         executor::incoming_body(response.consume().expect("response should be readable"));
291*b315a0a8SYosh 
292*b315a0a8SYosh     use sha2::Digest;
293*b315a0a8SYosh     let mut hasher = sha2::Sha256::new();
294*b315a0a8SYosh     while let Some(chunk) = body.try_next().await? {
295*b315a0a8SYosh         hasher.update(&chunk);
296*b315a0a8SYosh     }
297*b315a0a8SYosh 
298*b315a0a8SYosh     use base64::Engine;
299*b315a0a8SYosh     Ok(base64::engine::general_purpose::STANDARD_NO_PAD.encode(hasher.finalize()))
300*b315a0a8SYosh }
301*b315a0a8SYosh 
302*b315a0a8SYosh // Technically this should not be here for a proxy, but given the current
303*b315a0a8SYosh // framework for tests it's required since this file is built as a `bin`
main()304*b315a0a8SYosh fn main() {}
305*b315a0a8SYosh 
306*b315a0a8SYosh mod executor {
307*b315a0a8SYosh     use anyhow::{Error, Result, anyhow};
308*b315a0a8SYosh     use futures::{Sink, Stream, future, sink, stream};
309*b315a0a8SYosh     use std::{
310*b315a0a8SYosh         cell::RefCell,
311*b315a0a8SYosh         future::Future,
312*b315a0a8SYosh         mem,
313*b315a0a8SYosh         rc::Rc,
314*b315a0a8SYosh         sync::{Arc, Mutex},
315*b315a0a8SYosh         task::{Context, Poll, Wake, Waker},
316*b315a0a8SYosh     };
317*b315a0a8SYosh     use test_programs::wasi::{
318*b315a0a8SYosh         http::{
319*b315a0a8SYosh             outgoing_handler,
320*b315a0a8SYosh             types::{
321*b315a0a8SYosh                 self, FutureTrailers, IncomingBody, IncomingResponse, InputStream, OutgoingBody,
322*b315a0a8SYosh                 OutgoingRequest, OutputStream,
323*b315a0a8SYosh             },
324*b315a0a8SYosh         },
325*b315a0a8SYosh         io::{self, streams::StreamError},
326*b315a0a8SYosh     };
327*b315a0a8SYosh 
328*b315a0a8SYosh     const READ_SIZE: u64 = 16 * 1024;
329*b315a0a8SYosh 
330*b315a0a8SYosh     static WAKERS: Mutex<Vec<(io::poll::Pollable, Waker)>> = Mutex::new(Vec::new());
331*b315a0a8SYosh 
run<T>(future: impl Future<Output = T>) -> T332*b315a0a8SYosh     pub fn run<T>(future: impl Future<Output = T>) -> T {
333*b315a0a8SYosh         futures::pin_mut!(future);
334*b315a0a8SYosh 
335*b315a0a8SYosh         struct DummyWaker;
336*b315a0a8SYosh 
337*b315a0a8SYosh         impl Wake for DummyWaker {
338*b315a0a8SYosh             fn wake(self: Arc<Self>) {}
339*b315a0a8SYosh         }
340*b315a0a8SYosh 
341*b315a0a8SYosh         let waker = Arc::new(DummyWaker).into();
342*b315a0a8SYosh 
343*b315a0a8SYosh         loop {
344*b315a0a8SYosh             match future.as_mut().poll(&mut Context::from_waker(&waker)) {
345*b315a0a8SYosh                 Poll::Pending => {
346*b315a0a8SYosh                     let mut new_wakers = Vec::new();
347*b315a0a8SYosh 
348*b315a0a8SYosh                     let wakers = mem::take::<Vec<_>>(&mut WAKERS.lock().unwrap());
349*b315a0a8SYosh 
350*b315a0a8SYosh                     assert!(!wakers.is_empty());
351*b315a0a8SYosh 
352*b315a0a8SYosh                     let pollables = wakers
353*b315a0a8SYosh                         .iter()
354*b315a0a8SYosh                         .map(|(pollable, _)| pollable)
355*b315a0a8SYosh                         .collect::<Vec<_>>();
356*b315a0a8SYosh 
357*b315a0a8SYosh                     let mut ready = vec![false; wakers.len()];
358*b315a0a8SYosh 
359*b315a0a8SYosh                     for index in io::poll::poll(&pollables) {
360*b315a0a8SYosh                         ready[usize::try_from(index).unwrap()] = true;
361*b315a0a8SYosh                     }
362*b315a0a8SYosh 
363*b315a0a8SYosh                     for (ready, (pollable, waker)) in ready.into_iter().zip(wakers) {
364*b315a0a8SYosh                         if ready {
365*b315a0a8SYosh                             waker.wake()
366*b315a0a8SYosh                         } else {
367*b315a0a8SYosh                             new_wakers.push((pollable, waker));
368*b315a0a8SYosh                         }
369*b315a0a8SYosh                     }
370*b315a0a8SYosh 
371*b315a0a8SYosh                     *WAKERS.lock().unwrap() = new_wakers;
372*b315a0a8SYosh                 }
373*b315a0a8SYosh                 Poll::Ready(result) => break result,
374*b315a0a8SYosh             }
375*b315a0a8SYosh         }
376*b315a0a8SYosh     }
377*b315a0a8SYosh 
outgoing_body(body: OutgoingBody) -> impl Sink<Vec<u8>, Error = Error>378*b315a0a8SYosh     pub fn outgoing_body(body: OutgoingBody) -> impl Sink<Vec<u8>, Error = Error> {
379*b315a0a8SYosh         struct Outgoing(Option<(OutputStream, OutgoingBody)>);
380*b315a0a8SYosh 
381*b315a0a8SYosh         impl Drop for Outgoing {
382*b315a0a8SYosh             fn drop(&mut self) {
383*b315a0a8SYosh                 if let Some((stream, body)) = self.0.take() {
384*b315a0a8SYosh                     drop(stream);
385*b315a0a8SYosh                     OutgoingBody::finish(body, None).expect("outgoing-body.finish");
386*b315a0a8SYosh                 }
387*b315a0a8SYosh             }
388*b315a0a8SYosh         }
389*b315a0a8SYosh 
390*b315a0a8SYosh         let stream = body.write().expect("response body should be writable");
391*b315a0a8SYosh         let pair = Rc::new(RefCell::new(Outgoing(Some((stream, body)))));
392*b315a0a8SYosh 
393*b315a0a8SYosh         sink::unfold((), {
394*b315a0a8SYosh             move |(), chunk: Vec<u8>| {
395*b315a0a8SYosh                 future::poll_fn({
396*b315a0a8SYosh                     let mut offset = 0;
397*b315a0a8SYosh                     let mut flushing = false;
398*b315a0a8SYosh                     let pair = pair.clone();
399*b315a0a8SYosh 
400*b315a0a8SYosh                     move |context| {
401*b315a0a8SYosh                         let pair = pair.borrow();
402*b315a0a8SYosh                         let (stream, _) = &pair.0.as_ref().unwrap();
403*b315a0a8SYosh 
404*b315a0a8SYosh                         loop {
405*b315a0a8SYosh                             match stream.check_write() {
406*b315a0a8SYosh                                 Ok(0) => {
407*b315a0a8SYosh                                     WAKERS
408*b315a0a8SYosh                                         .lock()
409*b315a0a8SYosh                                         .unwrap()
410*b315a0a8SYosh                                         .push((stream.subscribe(), context.waker().clone()));
411*b315a0a8SYosh 
412*b315a0a8SYosh                                     break Poll::Pending;
413*b315a0a8SYosh                                 }
414*b315a0a8SYosh                                 Ok(count) => {
415*b315a0a8SYosh                                     if offset == chunk.len() {
416*b315a0a8SYosh                                         if flushing {
417*b315a0a8SYosh                                             break Poll::Ready(Ok(()));
418*b315a0a8SYosh                                         } else {
419*b315a0a8SYosh                                             stream.flush().expect("stream should be flushable");
420*b315a0a8SYosh                                             flushing = true;
421*b315a0a8SYosh                                         }
422*b315a0a8SYosh                                     } else {
423*b315a0a8SYosh                                         let count = usize::try_from(count)
424*b315a0a8SYosh                                             .unwrap()
425*b315a0a8SYosh                                             .min(chunk.len() - offset);
426*b315a0a8SYosh 
427*b315a0a8SYosh                                         match stream.write(&chunk[offset..][..count]) {
428*b315a0a8SYosh                                             Ok(()) => {
429*b315a0a8SYosh                                                 offset += count;
430*b315a0a8SYosh                                             }
431*b315a0a8SYosh                                             Err(_) => break Poll::Ready(Err(anyhow!("I/O error"))),
432*b315a0a8SYosh                                         }
433*b315a0a8SYosh                                     }
434*b315a0a8SYosh                                 }
435*b315a0a8SYosh                                 Err(_) => break Poll::Ready(Err(anyhow!("I/O error"))),
436*b315a0a8SYosh                             }
437*b315a0a8SYosh                         }
438*b315a0a8SYosh                     }
439*b315a0a8SYosh                 })
440*b315a0a8SYosh             }
441*b315a0a8SYosh         })
442*b315a0a8SYosh     }
443*b315a0a8SYosh 
outgoing_request_send( request: OutgoingRequest, ) -> impl Future<Output = Result<IncomingResponse, types::ErrorCode>>444*b315a0a8SYosh     pub fn outgoing_request_send(
445*b315a0a8SYosh         request: OutgoingRequest,
446*b315a0a8SYosh     ) -> impl Future<Output = Result<IncomingResponse, types::ErrorCode>> {
447*b315a0a8SYosh         future::poll_fn({
448*b315a0a8SYosh             let response = outgoing_handler::handle(request, None);
449*b315a0a8SYosh 
450*b315a0a8SYosh             move |context| match &response {
451*b315a0a8SYosh                 Ok(response) => {
452*b315a0a8SYosh                     if let Some(response) = response.get() {
453*b315a0a8SYosh                         Poll::Ready(response.unwrap())
454*b315a0a8SYosh                     } else {
455*b315a0a8SYosh                         WAKERS
456*b315a0a8SYosh                             .lock()
457*b315a0a8SYosh                             .unwrap()
458*b315a0a8SYosh                             .push((response.subscribe(), context.waker().clone()));
459*b315a0a8SYosh                         Poll::Pending
460*b315a0a8SYosh                     }
461*b315a0a8SYosh                 }
462*b315a0a8SYosh                 Err(error) => Poll::Ready(Err(error.clone())),
463*b315a0a8SYosh             }
464*b315a0a8SYosh         })
465*b315a0a8SYosh     }
466*b315a0a8SYosh 
incoming_body(body: IncomingBody) -> impl Stream<Item = Result<Vec<u8>>>467*b315a0a8SYosh     pub fn incoming_body(body: IncomingBody) -> impl Stream<Item = Result<Vec<u8>>> {
468*b315a0a8SYosh         enum Inner {
469*b315a0a8SYosh             Stream {
470*b315a0a8SYosh                 stream: InputStream,
471*b315a0a8SYosh                 body: IncomingBody,
472*b315a0a8SYosh             },
473*b315a0a8SYosh             Trailers(FutureTrailers),
474*b315a0a8SYosh             Closed,
475*b315a0a8SYosh         }
476*b315a0a8SYosh 
477*b315a0a8SYosh         struct Incoming(Inner);
478*b315a0a8SYosh 
479*b315a0a8SYosh         impl Drop for Incoming {
480*b315a0a8SYosh             fn drop(&mut self) {
481*b315a0a8SYosh                 match mem::replace(&mut self.0, Inner::Closed) {
482*b315a0a8SYosh                     Inner::Stream { stream, body } => {
483*b315a0a8SYosh                         drop(stream);
484*b315a0a8SYosh                         IncomingBody::finish(body);
485*b315a0a8SYosh                     }
486*b315a0a8SYosh                     Inner::Trailers(_) | Inner::Closed => {}
487*b315a0a8SYosh                 }
488*b315a0a8SYosh             }
489*b315a0a8SYosh         }
490*b315a0a8SYosh 
491*b315a0a8SYosh         stream::poll_fn({
492*b315a0a8SYosh             let stream = body.stream().expect("response body should be readable");
493*b315a0a8SYosh             let mut incoming = Incoming(Inner::Stream { stream, body });
494*b315a0a8SYosh 
495*b315a0a8SYosh             move |context| {
496*b315a0a8SYosh                 loop {
497*b315a0a8SYosh                     match &incoming.0 {
498*b315a0a8SYosh                         Inner::Stream { stream, .. } => match stream.read(READ_SIZE) {
499*b315a0a8SYosh                             Ok(buffer) => {
500*b315a0a8SYosh                                 return if buffer.is_empty() {
501*b315a0a8SYosh                                     WAKERS
502*b315a0a8SYosh                                         .lock()
503*b315a0a8SYosh                                         .unwrap()
504*b315a0a8SYosh                                         .push((stream.subscribe(), context.waker().clone()));
505*b315a0a8SYosh                                     Poll::Pending
506*b315a0a8SYosh                                 } else {
507*b315a0a8SYosh                                     Poll::Ready(Some(Ok(buffer)))
508*b315a0a8SYosh                                 };
509*b315a0a8SYosh                             }
510*b315a0a8SYosh                             Err(StreamError::Closed) => {
511*b315a0a8SYosh                                 let Inner::Stream { stream, body } =
512*b315a0a8SYosh                                     mem::replace(&mut incoming.0, Inner::Closed)
513*b315a0a8SYosh                                 else {
514*b315a0a8SYosh                                     unreachable!();
515*b315a0a8SYosh                                 };
516*b315a0a8SYosh                                 drop(stream);
517*b315a0a8SYosh                                 incoming.0 = Inner::Trailers(IncomingBody::finish(body));
518*b315a0a8SYosh                             }
519*b315a0a8SYosh                             Err(StreamError::LastOperationFailed(error)) => {
520*b315a0a8SYosh                                 return Poll::Ready(Some(Err(anyhow!(
521*b315a0a8SYosh                                     "{}",
522*b315a0a8SYosh                                     error.to_debug_string()
523*b315a0a8SYosh                                 ))));
524*b315a0a8SYosh                             }
525*b315a0a8SYosh                         },
526*b315a0a8SYosh 
527*b315a0a8SYosh                         Inner::Trailers(trailers) => {
528*b315a0a8SYosh                             match trailers.get() {
529*b315a0a8SYosh                                 Some(Ok(trailers)) => {
530*b315a0a8SYosh                                     incoming.0 = Inner::Closed;
531*b315a0a8SYosh                                     match trailers {
532*b315a0a8SYosh                                         Ok(Some(_)) => {
533*b315a0a8SYosh                                             // Currently, we just ignore any trailers.  TODO: Add a test that
534*b315a0a8SYosh                                             // expects trailers and verify they match the expected contents.
535*b315a0a8SYosh                                         }
536*b315a0a8SYosh                                         Ok(None) => {
537*b315a0a8SYosh                                             // No trailers; nothing else to do.
538*b315a0a8SYosh                                         }
539*b315a0a8SYosh                                         Err(error) => {
540*b315a0a8SYosh                                             // Error reading the trailers: pass it on to the application.
541*b315a0a8SYosh                                             return Poll::Ready(Some(Err(anyhow!("{error:?}"))));
542*b315a0a8SYosh                                         }
543*b315a0a8SYosh                                     }
544*b315a0a8SYosh                                 }
545*b315a0a8SYosh                                 Some(Err(_)) => {
546*b315a0a8SYosh                                     // Should only happen if we try to retrieve the trailers twice, i.e. a bug in
547*b315a0a8SYosh                                     // this code.
548*b315a0a8SYosh                                     unreachable!();
549*b315a0a8SYosh                                 }
550*b315a0a8SYosh                                 None => {
551*b315a0a8SYosh                                     WAKERS
552*b315a0a8SYosh                                         .lock()
553*b315a0a8SYosh                                         .unwrap()
554*b315a0a8SYosh                                         .push((trailers.subscribe(), context.waker().clone()));
555*b315a0a8SYosh                                     return Poll::Pending;
556*b315a0a8SYosh                                 }
557*b315a0a8SYosh                             }
558*b315a0a8SYosh                         }
559*b315a0a8SYosh 
560*b315a0a8SYosh                         Inner::Closed => {
561*b315a0a8SYosh                             return Poll::Ready(None);
562*b315a0a8SYosh                         }
563*b315a0a8SYosh                     }
564*b315a0a8SYosh                 }
565*b315a0a8SYosh             }
566*b315a0a8SYosh         })
567*b315a0a8SYosh     }
568*b315a0a8SYosh }
569