1*b315a0a8SYosh use anyhow::{Result, anyhow, bail};
2*b315a0a8SYosh use futures::{Future, SinkExt, StreamExt, TryStreamExt, future, stream};
3*b315a0a8SYosh use test_programs::wasi::http::types::{
4*b315a0a8SYosh Fields, IncomingRequest, IncomingResponse, Method, OutgoingBody, OutgoingRequest,
5*b315a0a8SYosh OutgoingResponse, ResponseOutparam, Scheme,
6*b315a0a8SYosh };
7*b315a0a8SYosh use url::Url;
8*b315a0a8SYosh
/// Upper bound on the number of outgoing `/hash-all` requests kept in flight at once
/// (passed to `buffer_unordered` in `handle_request`).
const MAX_CONCURRENCY: usize = 16;
10*b315a0a8SYosh
11*b315a0a8SYosh struct Handler;
12*b315a0a8SYosh
13*b315a0a8SYosh test_programs::proxy::export!(Handler);
14*b315a0a8SYosh
15*b315a0a8SYosh impl test_programs::proxy::exports::wasi::http::incoming_handler::Guest for Handler {
handle(request: IncomingRequest, response_out: ResponseOutparam)16*b315a0a8SYosh fn handle(request: IncomingRequest, response_out: ResponseOutparam) {
17*b315a0a8SYosh executor::run(async move {
18*b315a0a8SYosh handle_request(request, response_out).await;
19*b315a0a8SYosh })
20*b315a0a8SYosh }
21*b315a0a8SYosh }
22*b315a0a8SYosh
handle_request(request: IncomingRequest, response_out: ResponseOutparam)23*b315a0a8SYosh async fn handle_request(request: IncomingRequest, response_out: ResponseOutparam) {
24*b315a0a8SYosh let headers = request.headers().entries();
25*b315a0a8SYosh
26*b315a0a8SYosh assert!(request.authority().is_some());
27*b315a0a8SYosh
28*b315a0a8SYosh match (request.method(), request.path_with_query().as_deref()) {
29*b315a0a8SYosh (Method::Get, Some("/hash-all")) => {
30*b315a0a8SYosh // Send outgoing GET requests to the specified URLs and stream the hashes of the response bodies as
31*b315a0a8SYosh // they arrive.
32*b315a0a8SYosh
33*b315a0a8SYosh let urls = headers.iter().filter_map(|(k, v)| {
34*b315a0a8SYosh (k == "url")
35*b315a0a8SYosh .then_some(v)
36*b315a0a8SYosh .and_then(|v| std::str::from_utf8(v).ok())
37*b315a0a8SYosh .and_then(|v| Url::parse(v).ok())
38*b315a0a8SYosh });
39*b315a0a8SYosh
40*b315a0a8SYosh let results = urls.map(|url| async move {
41*b315a0a8SYosh let result = hash(&url).await;
42*b315a0a8SYosh (url, result)
43*b315a0a8SYosh });
44*b315a0a8SYosh
45*b315a0a8SYosh let mut results = stream::iter(results).buffer_unordered(MAX_CONCURRENCY);
46*b315a0a8SYosh
47*b315a0a8SYosh let response = OutgoingResponse::new(
48*b315a0a8SYosh Fields::from_list(&[("content-type".to_string(), b"text/plain".to_vec())]).unwrap(),
49*b315a0a8SYosh );
50*b315a0a8SYosh
51*b315a0a8SYosh let mut body =
52*b315a0a8SYosh executor::outgoing_body(response.body().expect("response should be writable"));
53*b315a0a8SYosh
54*b315a0a8SYosh ResponseOutparam::set(response_out, Ok(response));
55*b315a0a8SYosh
56*b315a0a8SYosh while let Some((url, result)) = results.next().await {
57*b315a0a8SYosh let payload = match result {
58*b315a0a8SYosh Ok(hash) => format!("{url}: {hash}\n"),
59*b315a0a8SYosh Err(e) => format!("{url}: {e:?}\n"),
60*b315a0a8SYosh }
61*b315a0a8SYosh .into_bytes();
62*b315a0a8SYosh
63*b315a0a8SYosh if let Err(e) = body.send(payload).await {
64*b315a0a8SYosh eprintln!("Error sending payload: {e}");
65*b315a0a8SYosh }
66*b315a0a8SYosh }
67*b315a0a8SYosh }
68*b315a0a8SYosh
69*b315a0a8SYosh (Method::Post, Some("/echo")) => {
70*b315a0a8SYosh // Echo the request body without buffering it.
71*b315a0a8SYosh
72*b315a0a8SYosh let response = OutgoingResponse::new(
73*b315a0a8SYosh Fields::from_list(
74*b315a0a8SYosh &headers
75*b315a0a8SYosh .into_iter()
76*b315a0a8SYosh .filter_map(|(k, v)| (k == "content-type").then_some((k, v)))
77*b315a0a8SYosh .collect::<Vec<_>>(),
78*b315a0a8SYosh )
79*b315a0a8SYosh .unwrap(),
80*b315a0a8SYosh );
81*b315a0a8SYosh
82*b315a0a8SYosh let mut body =
83*b315a0a8SYosh executor::outgoing_body(response.body().expect("response should be writable"));
84*b315a0a8SYosh
85*b315a0a8SYosh ResponseOutparam::set(response_out, Ok(response));
86*b315a0a8SYosh
87*b315a0a8SYosh let mut stream =
88*b315a0a8SYosh executor::incoming_body(request.consume().expect("request should be readable"));
89*b315a0a8SYosh
90*b315a0a8SYosh while let Some(chunk) = stream.next().await {
91*b315a0a8SYosh match chunk {
92*b315a0a8SYosh Ok(chunk) => {
93*b315a0a8SYosh if let Err(e) = body.send(chunk).await {
94*b315a0a8SYosh eprintln!("Error sending body: {e}");
95*b315a0a8SYosh break;
96*b315a0a8SYosh }
97*b315a0a8SYosh }
98*b315a0a8SYosh Err(e) => {
99*b315a0a8SYosh eprintln!("Error receiving body: {e}");
100*b315a0a8SYosh break;
101*b315a0a8SYosh }
102*b315a0a8SYosh }
103*b315a0a8SYosh }
104*b315a0a8SYosh }
105*b315a0a8SYosh
106*b315a0a8SYosh (Method::Post, Some("/double-echo")) => {
107*b315a0a8SYosh // Pipe the request body to an outgoing request and stream the response back to the client.
108*b315a0a8SYosh
109*b315a0a8SYosh if let Some(url) = headers.iter().find_map(|(k, v)| {
110*b315a0a8SYosh (k == "url")
111*b315a0a8SYosh .then_some(v)
112*b315a0a8SYosh .and_then(|v| std::str::from_utf8(v).ok())
113*b315a0a8SYosh .and_then(|v| Url::parse(v).ok())
114*b315a0a8SYosh }) {
115*b315a0a8SYosh match double_echo(request, &url).await {
116*b315a0a8SYosh Ok((request_copy, response)) => {
117*b315a0a8SYosh let mut stream = executor::incoming_body(
118*b315a0a8SYosh response.consume().expect("response should be consumable"),
119*b315a0a8SYosh );
120*b315a0a8SYosh
121*b315a0a8SYosh let response = OutgoingResponse::new(
122*b315a0a8SYosh Fields::from_list(
123*b315a0a8SYosh &headers
124*b315a0a8SYosh .into_iter()
125*b315a0a8SYosh .filter_map(|(k, v)| (k == "content-type").then_some((k, v)))
126*b315a0a8SYosh .collect::<Vec<_>>(),
127*b315a0a8SYosh )
128*b315a0a8SYosh .unwrap(),
129*b315a0a8SYosh );
130*b315a0a8SYosh
131*b315a0a8SYosh let mut body = executor::outgoing_body(
132*b315a0a8SYosh response.body().expect("response should be writable"),
133*b315a0a8SYosh );
134*b315a0a8SYosh
135*b315a0a8SYosh ResponseOutparam::set(response_out, Ok(response));
136*b315a0a8SYosh
137*b315a0a8SYosh let response_copy = async move {
138*b315a0a8SYosh while let Some(chunk) = stream.next().await {
139*b315a0a8SYosh body.send(chunk?).await?;
140*b315a0a8SYosh }
141*b315a0a8SYosh Ok::<_, anyhow::Error>(())
142*b315a0a8SYosh };
143*b315a0a8SYosh
144*b315a0a8SYosh let (request_copy, response_copy) =
145*b315a0a8SYosh future::join(request_copy, response_copy).await;
146*b315a0a8SYosh if let Err(e) = request_copy.and(response_copy) {
147*b315a0a8SYosh eprintln!("error piping to and from {url}: {e}");
148*b315a0a8SYosh }
149*b315a0a8SYosh }
150*b315a0a8SYosh
151*b315a0a8SYosh Err(e) => {
152*b315a0a8SYosh eprintln!("Error sending outgoing request to {url}: {e}");
153*b315a0a8SYosh server_error(response_out);
154*b315a0a8SYosh }
155*b315a0a8SYosh }
156*b315a0a8SYosh } else {
157*b315a0a8SYosh bad_request(response_out);
158*b315a0a8SYosh }
159*b315a0a8SYosh }
160*b315a0a8SYosh
161*b315a0a8SYosh _ => method_not_allowed(response_out),
162*b315a0a8SYosh }
163*b315a0a8SYosh }
164*b315a0a8SYosh
double_echo( incoming_request: IncomingRequest, url: &Url, ) -> Result<(impl Future<Output = Result<()>> + use<>, IncomingResponse)>165*b315a0a8SYosh async fn double_echo(
166*b315a0a8SYosh incoming_request: IncomingRequest,
167*b315a0a8SYosh url: &Url,
168*b315a0a8SYosh ) -> Result<(impl Future<Output = Result<()>> + use<>, IncomingResponse)> {
169*b315a0a8SYosh let outgoing_request = OutgoingRequest::new(Fields::new());
170*b315a0a8SYosh
171*b315a0a8SYosh outgoing_request
172*b315a0a8SYosh .set_method(&Method::Post)
173*b315a0a8SYosh .map_err(|()| anyhow!("failed to set method"))?;
174*b315a0a8SYosh
175*b315a0a8SYosh outgoing_request
176*b315a0a8SYosh .set_path_with_query(Some(url.path()))
177*b315a0a8SYosh .map_err(|()| anyhow!("failed to set path_with_query"))?;
178*b315a0a8SYosh
179*b315a0a8SYosh outgoing_request
180*b315a0a8SYosh .set_scheme(Some(&match url.scheme() {
181*b315a0a8SYosh "http" => Scheme::Http,
182*b315a0a8SYosh "https" => Scheme::Https,
183*b315a0a8SYosh scheme => Scheme::Other(scheme.into()),
184*b315a0a8SYosh }))
185*b315a0a8SYosh .map_err(|()| anyhow!("failed to set scheme"))?;
186*b315a0a8SYosh
187*b315a0a8SYosh outgoing_request
188*b315a0a8SYosh .set_authority(Some(&format!(
189*b315a0a8SYosh "{}{}",
190*b315a0a8SYosh url.host_str().unwrap_or(""),
191*b315a0a8SYosh if let Some(port) = url.port() {
192*b315a0a8SYosh format!(":{port}")
193*b315a0a8SYosh } else {
194*b315a0a8SYosh String::new()
195*b315a0a8SYosh }
196*b315a0a8SYosh )))
197*b315a0a8SYosh .map_err(|()| anyhow!("failed to set authority"))?;
198*b315a0a8SYosh
199*b315a0a8SYosh let mut body = executor::outgoing_body(
200*b315a0a8SYosh outgoing_request
201*b315a0a8SYosh .body()
202*b315a0a8SYosh .expect("request body should be writable"),
203*b315a0a8SYosh );
204*b315a0a8SYosh
205*b315a0a8SYosh let response = executor::outgoing_request_send(outgoing_request);
206*b315a0a8SYosh
207*b315a0a8SYosh let mut stream = executor::incoming_body(
208*b315a0a8SYosh incoming_request
209*b315a0a8SYosh .consume()
210*b315a0a8SYosh .expect("request should be consumable"),
211*b315a0a8SYosh );
212*b315a0a8SYosh
213*b315a0a8SYosh let copy = async move {
214*b315a0a8SYosh while let Some(chunk) = stream.next().await {
215*b315a0a8SYosh body.send(chunk?).await?;
216*b315a0a8SYosh }
217*b315a0a8SYosh Ok::<_, anyhow::Error>(())
218*b315a0a8SYosh };
219*b315a0a8SYosh
220*b315a0a8SYosh let response = response.await?;
221*b315a0a8SYosh
222*b315a0a8SYosh let status = response.status();
223*b315a0a8SYosh
224*b315a0a8SYosh if !(200..300).contains(&status) {
225*b315a0a8SYosh bail!("unexpected status: {status}");
226*b315a0a8SYosh }
227*b315a0a8SYosh
228*b315a0a8SYosh Ok((copy, response))
229*b315a0a8SYosh }
230*b315a0a8SYosh
server_error(response_out: ResponseOutparam)231*b315a0a8SYosh fn server_error(response_out: ResponseOutparam) {
232*b315a0a8SYosh respond(500, response_out)
233*b315a0a8SYosh }
234*b315a0a8SYosh
bad_request(response_out: ResponseOutparam)235*b315a0a8SYosh fn bad_request(response_out: ResponseOutparam) {
236*b315a0a8SYosh respond(400, response_out)
237*b315a0a8SYosh }
238*b315a0a8SYosh
method_not_allowed(response_out: ResponseOutparam)239*b315a0a8SYosh fn method_not_allowed(response_out: ResponseOutparam) {
240*b315a0a8SYosh respond(405, response_out)
241*b315a0a8SYosh }
242*b315a0a8SYosh
respond(status: u16, response_out: ResponseOutparam)243*b315a0a8SYosh fn respond(status: u16, response_out: ResponseOutparam) {
244*b315a0a8SYosh let response = OutgoingResponse::new(Fields::new());
245*b315a0a8SYosh response
246*b315a0a8SYosh .set_status_code(status)
247*b315a0a8SYosh .expect("setting status code");
248*b315a0a8SYosh
249*b315a0a8SYosh let body = response.body().expect("response should be writable");
250*b315a0a8SYosh
251*b315a0a8SYosh ResponseOutparam::set(response_out, Ok(response));
252*b315a0a8SYosh
253*b315a0a8SYosh OutgoingBody::finish(body, None).expect("outgoing-body.finish");
254*b315a0a8SYosh }
255*b315a0a8SYosh
hash(url: &Url) -> Result<String>256*b315a0a8SYosh async fn hash(url: &Url) -> Result<String> {
257*b315a0a8SYosh let request = OutgoingRequest::new(Fields::new());
258*b315a0a8SYosh
259*b315a0a8SYosh request
260*b315a0a8SYosh .set_path_with_query(Some(url.path()))
261*b315a0a8SYosh .map_err(|()| anyhow!("failed to set path_with_query"))?;
262*b315a0a8SYosh request
263*b315a0a8SYosh .set_scheme(Some(&match url.scheme() {
264*b315a0a8SYosh "http" => Scheme::Http,
265*b315a0a8SYosh "https" => Scheme::Https,
266*b315a0a8SYosh scheme => Scheme::Other(scheme.into()),
267*b315a0a8SYosh }))
268*b315a0a8SYosh .map_err(|()| anyhow!("failed to set scheme"))?;
269*b315a0a8SYosh request
270*b315a0a8SYosh .set_authority(Some(&format!(
271*b315a0a8SYosh "{}{}",
272*b315a0a8SYosh url.host_str().unwrap_or(""),
273*b315a0a8SYosh if let Some(port) = url.port() {
274*b315a0a8SYosh format!(":{port}")
275*b315a0a8SYosh } else {
276*b315a0a8SYosh String::new()
277*b315a0a8SYosh }
278*b315a0a8SYosh )))
279*b315a0a8SYosh .map_err(|()| anyhow!("failed to set authority"))?;
280*b315a0a8SYosh
281*b315a0a8SYosh let response = executor::outgoing_request_send(request).await?;
282*b315a0a8SYosh
283*b315a0a8SYosh let status = response.status();
284*b315a0a8SYosh
285*b315a0a8SYosh if !(200..300).contains(&status) {
286*b315a0a8SYosh bail!("unexpected status: {status}");
287*b315a0a8SYosh }
288*b315a0a8SYosh
289*b315a0a8SYosh let mut body =
290*b315a0a8SYosh executor::incoming_body(response.consume().expect("response should be readable"));
291*b315a0a8SYosh
292*b315a0a8SYosh use sha2::Digest;
293*b315a0a8SYosh let mut hasher = sha2::Sha256::new();
294*b315a0a8SYosh while let Some(chunk) = body.try_next().await? {
295*b315a0a8SYosh hasher.update(&chunk);
296*b315a0a8SYosh }
297*b315a0a8SYosh
298*b315a0a8SYosh use base64::Engine;
299*b315a0a8SYosh Ok(base64::engine::general_purpose::STANDARD_NO_PAD.encode(hasher.finalize()))
300*b315a0a8SYosh }
301*b315a0a8SYosh
// Technically this should not be here for a proxy, but given the current
// framework for tests it's required since this file is built as a `bin`.
// It is intentionally empty.
fn main() {}
305*b315a0a8SYosh
306*b315a0a8SYosh mod executor {
307*b315a0a8SYosh use anyhow::{Error, Result, anyhow};
308*b315a0a8SYosh use futures::{Sink, Stream, future, sink, stream};
309*b315a0a8SYosh use std::{
310*b315a0a8SYosh cell::RefCell,
311*b315a0a8SYosh future::Future,
312*b315a0a8SYosh mem,
313*b315a0a8SYosh rc::Rc,
314*b315a0a8SYosh sync::{Arc, Mutex},
315*b315a0a8SYosh task::{Context, Poll, Wake, Waker},
316*b315a0a8SYosh };
317*b315a0a8SYosh use test_programs::wasi::{
318*b315a0a8SYosh http::{
319*b315a0a8SYosh outgoing_handler,
320*b315a0a8SYosh types::{
321*b315a0a8SYosh self, FutureTrailers, IncomingBody, IncomingResponse, InputStream, OutgoingBody,
322*b315a0a8SYosh OutgoingRequest, OutputStream,
323*b315a0a8SYosh },
324*b315a0a8SYosh },
325*b315a0a8SYosh io::{self, streams::StreamError},
326*b315a0a8SYosh };
327*b315a0a8SYosh
/// Maximum number of bytes requested from an input stream per `read` call.
const READ_SIZE: u64 = 16 * 1024;

/// Pollables that pending futures are waiting on, each paired with the waker to invoke once
/// that pollable is ready. Entries are pushed by the adapters in this module and drained by
/// `run` whenever the root future returns `Pending`.
static WAKERS: Mutex<Vec<(io::poll::Pollable, Waker)>> = Mutex::new(Vec::new());
331*b315a0a8SYosh
run<T>(future: impl Future<Output = T>) -> T332*b315a0a8SYosh pub fn run<T>(future: impl Future<Output = T>) -> T {
333*b315a0a8SYosh futures::pin_mut!(future);
334*b315a0a8SYosh
335*b315a0a8SYosh struct DummyWaker;
336*b315a0a8SYosh
337*b315a0a8SYosh impl Wake for DummyWaker {
338*b315a0a8SYosh fn wake(self: Arc<Self>) {}
339*b315a0a8SYosh }
340*b315a0a8SYosh
341*b315a0a8SYosh let waker = Arc::new(DummyWaker).into();
342*b315a0a8SYosh
343*b315a0a8SYosh loop {
344*b315a0a8SYosh match future.as_mut().poll(&mut Context::from_waker(&waker)) {
345*b315a0a8SYosh Poll::Pending => {
346*b315a0a8SYosh let mut new_wakers = Vec::new();
347*b315a0a8SYosh
348*b315a0a8SYosh let wakers = mem::take::<Vec<_>>(&mut WAKERS.lock().unwrap());
349*b315a0a8SYosh
350*b315a0a8SYosh assert!(!wakers.is_empty());
351*b315a0a8SYosh
352*b315a0a8SYosh let pollables = wakers
353*b315a0a8SYosh .iter()
354*b315a0a8SYosh .map(|(pollable, _)| pollable)
355*b315a0a8SYosh .collect::<Vec<_>>();
356*b315a0a8SYosh
357*b315a0a8SYosh let mut ready = vec![false; wakers.len()];
358*b315a0a8SYosh
359*b315a0a8SYosh for index in io::poll::poll(&pollables) {
360*b315a0a8SYosh ready[usize::try_from(index).unwrap()] = true;
361*b315a0a8SYosh }
362*b315a0a8SYosh
363*b315a0a8SYosh for (ready, (pollable, waker)) in ready.into_iter().zip(wakers) {
364*b315a0a8SYosh if ready {
365*b315a0a8SYosh waker.wake()
366*b315a0a8SYosh } else {
367*b315a0a8SYosh new_wakers.push((pollable, waker));
368*b315a0a8SYosh }
369*b315a0a8SYosh }
370*b315a0a8SYosh
371*b315a0a8SYosh *WAKERS.lock().unwrap() = new_wakers;
372*b315a0a8SYosh }
373*b315a0a8SYosh Poll::Ready(result) => break result,
374*b315a0a8SYosh }
375*b315a0a8SYosh }
376*b315a0a8SYosh }
377*b315a0a8SYosh
outgoing_body(body: OutgoingBody) -> impl Sink<Vec<u8>, Error = Error>378*b315a0a8SYosh pub fn outgoing_body(body: OutgoingBody) -> impl Sink<Vec<u8>, Error = Error> {
379*b315a0a8SYosh struct Outgoing(Option<(OutputStream, OutgoingBody)>);
380*b315a0a8SYosh
381*b315a0a8SYosh impl Drop for Outgoing {
382*b315a0a8SYosh fn drop(&mut self) {
383*b315a0a8SYosh if let Some((stream, body)) = self.0.take() {
384*b315a0a8SYosh drop(stream);
385*b315a0a8SYosh OutgoingBody::finish(body, None).expect("outgoing-body.finish");
386*b315a0a8SYosh }
387*b315a0a8SYosh }
388*b315a0a8SYosh }
389*b315a0a8SYosh
390*b315a0a8SYosh let stream = body.write().expect("response body should be writable");
391*b315a0a8SYosh let pair = Rc::new(RefCell::new(Outgoing(Some((stream, body)))));
392*b315a0a8SYosh
393*b315a0a8SYosh sink::unfold((), {
394*b315a0a8SYosh move |(), chunk: Vec<u8>| {
395*b315a0a8SYosh future::poll_fn({
396*b315a0a8SYosh let mut offset = 0;
397*b315a0a8SYosh let mut flushing = false;
398*b315a0a8SYosh let pair = pair.clone();
399*b315a0a8SYosh
400*b315a0a8SYosh move |context| {
401*b315a0a8SYosh let pair = pair.borrow();
402*b315a0a8SYosh let (stream, _) = &pair.0.as_ref().unwrap();
403*b315a0a8SYosh
404*b315a0a8SYosh loop {
405*b315a0a8SYosh match stream.check_write() {
406*b315a0a8SYosh Ok(0) => {
407*b315a0a8SYosh WAKERS
408*b315a0a8SYosh .lock()
409*b315a0a8SYosh .unwrap()
410*b315a0a8SYosh .push((stream.subscribe(), context.waker().clone()));
411*b315a0a8SYosh
412*b315a0a8SYosh break Poll::Pending;
413*b315a0a8SYosh }
414*b315a0a8SYosh Ok(count) => {
415*b315a0a8SYosh if offset == chunk.len() {
416*b315a0a8SYosh if flushing {
417*b315a0a8SYosh break Poll::Ready(Ok(()));
418*b315a0a8SYosh } else {
419*b315a0a8SYosh stream.flush().expect("stream should be flushable");
420*b315a0a8SYosh flushing = true;
421*b315a0a8SYosh }
422*b315a0a8SYosh } else {
423*b315a0a8SYosh let count = usize::try_from(count)
424*b315a0a8SYosh .unwrap()
425*b315a0a8SYosh .min(chunk.len() - offset);
426*b315a0a8SYosh
427*b315a0a8SYosh match stream.write(&chunk[offset..][..count]) {
428*b315a0a8SYosh Ok(()) => {
429*b315a0a8SYosh offset += count;
430*b315a0a8SYosh }
431*b315a0a8SYosh Err(_) => break Poll::Ready(Err(anyhow!("I/O error"))),
432*b315a0a8SYosh }
433*b315a0a8SYosh }
434*b315a0a8SYosh }
435*b315a0a8SYosh Err(_) => break Poll::Ready(Err(anyhow!("I/O error"))),
436*b315a0a8SYosh }
437*b315a0a8SYosh }
438*b315a0a8SYosh }
439*b315a0a8SYosh })
440*b315a0a8SYosh }
441*b315a0a8SYosh })
442*b315a0a8SYosh }
443*b315a0a8SYosh
outgoing_request_send( request: OutgoingRequest, ) -> impl Future<Output = Result<IncomingResponse, types::ErrorCode>>444*b315a0a8SYosh pub fn outgoing_request_send(
445*b315a0a8SYosh request: OutgoingRequest,
446*b315a0a8SYosh ) -> impl Future<Output = Result<IncomingResponse, types::ErrorCode>> {
447*b315a0a8SYosh future::poll_fn({
448*b315a0a8SYosh let response = outgoing_handler::handle(request, None);
449*b315a0a8SYosh
450*b315a0a8SYosh move |context| match &response {
451*b315a0a8SYosh Ok(response) => {
452*b315a0a8SYosh if let Some(response) = response.get() {
453*b315a0a8SYosh Poll::Ready(response.unwrap())
454*b315a0a8SYosh } else {
455*b315a0a8SYosh WAKERS
456*b315a0a8SYosh .lock()
457*b315a0a8SYosh .unwrap()
458*b315a0a8SYosh .push((response.subscribe(), context.waker().clone()));
459*b315a0a8SYosh Poll::Pending
460*b315a0a8SYosh }
461*b315a0a8SYosh }
462*b315a0a8SYosh Err(error) => Poll::Ready(Err(error.clone())),
463*b315a0a8SYosh }
464*b315a0a8SYosh })
465*b315a0a8SYosh }
466*b315a0a8SYosh
/// Wraps an `IncomingBody` in a [`Stream`] of byte chunks.
///
/// The stream yields non-empty buffers of at most `READ_SIZE` bytes. When the input stream
/// reports `Closed`, the body is finished and the stream waits for the trailers before
/// ending; trailers themselves are currently ignored, but an error while reading them is
/// yielded as a final `Err` item. While no data (or no trailers) is available, the relevant
/// pollable is registered in `WAKERS` and the stream stays pending.
///
/// # Panics
/// Panics if the body's input stream cannot be obtained.
pub fn incoming_body(body: IncomingBody) -> impl Stream<Item = Result<Vec<u8>>> {
    // State machine: reading the body -> waiting for trailers -> done.
    enum Inner {
        Stream {
            stream: InputStream,
            body: IncomingBody,
        },
        Trailers(FutureTrailers),
        Closed,
    }

    struct Incoming(Inner);

    impl Drop for Incoming {
        fn drop(&mut self) {
            match mem::replace(&mut self.0, Inner::Closed) {
                Inner::Stream { stream, body } => {
                    // Dropped mid-body: release the stream first, then finish the body and
                    // discard the resulting trailers future.
                    drop(stream);
                    IncomingBody::finish(body);
                }
                Inner::Trailers(_) | Inner::Closed => {}
            }
        }
    }

    stream::poll_fn({
        let stream = body.stream().expect("response body should be readable");
        let mut incoming = Incoming(Inner::Stream { stream, body });

        move |context| {
            // Loops only to re-dispatch after a state transition; every other path returns.
            loop {
                match &incoming.0 {
                    Inner::Stream { stream, .. } => match stream.read(READ_SIZE) {
                        Ok(buffer) => {
                            // An empty read means "nothing available yet", not end of stream.
                            return if buffer.is_empty() {
                                WAKERS
                                    .lock()
                                    .unwrap()
                                    .push((stream.subscribe(), context.waker().clone()));
                                Poll::Pending
                            } else {
                                Poll::Ready(Some(Ok(buffer)))
                            };
                        }
                        Err(StreamError::Closed) => {
                            // End of body: move on to the trailers, dropping the stream
                            // before the body is finished.
                            let Inner::Stream { stream, body } =
                                mem::replace(&mut incoming.0, Inner::Closed)
                            else {
                                unreachable!();
                            };
                            drop(stream);
                            incoming.0 = Inner::Trailers(IncomingBody::finish(body));
                        }
                        Err(StreamError::LastOperationFailed(error)) => {
                            // The state is left unchanged, so a later poll reads again.
                            return Poll::Ready(Some(Err(anyhow!(
                                "{}",
                                error.to_debug_string()
                            ))));
                        }
                    },

                    Inner::Trailers(trailers) => {
                        match trailers.get() {
                            Some(Ok(trailers)) => {
                                incoming.0 = Inner::Closed;
                                match trailers {
                                    Ok(Some(_)) => {
                                        // Currently, we just ignore any trailers.  TODO: Add a test that
                                        // expects trailers and verify they match the expected contents.
                                    }
                                    Ok(None) => {
                                        // No trailers; nothing else to do.
                                    }
                                    Err(error) => {
                                        // Error reading the trailers: pass it on to the application.
                                        return Poll::Ready(Some(Err(anyhow!("{error:?}"))));
                                    }
                                }
                            }
                            Some(Err(_)) => {
                                // Should only happen if we try to retrieve the trailers twice, i.e. a bug in
                                // this code.
                                unreachable!();
                            }
                            None => {
                                // Trailers not available yet: wait for them.
                                WAKERS
                                    .lock()
                                    .unwrap()
                                    .push((trailers.subscribe(), context.waker().clone()));
                                return Poll::Pending;
                            }
                        }
                    }

                    Inner::Closed => {
                        return Poll::Ready(None);
                    }
                }
            }
        }
    })
}
568*b315a0a8SYosh }
569