1 //! Provides utilities useful for dispatching incoming HTTP requests 2 //! `wasi:http/handler` guest instances. 3 4 #[cfg(feature = "p3")] 5 use crate::p3; 6 use anyhow::{Result, anyhow}; 7 use futures::stream::{FuturesUnordered, StreamExt}; 8 use std::collections::VecDeque; 9 use std::collections::btree_map::{BTreeMap, Entry}; 10 use std::future; 11 use std::pin::{Pin, pin}; 12 use std::sync::{ 13 Arc, Mutex, 14 atomic::{ 15 AtomicBool, AtomicU64, AtomicUsize, 16 Ordering::{Relaxed, SeqCst}, 17 }, 18 }; 19 use std::task::Poll; 20 use std::time::{Duration, Instant}; 21 use tokio::sync::Notify; 22 use wasmtime::AsContextMut; 23 use wasmtime::component::Accessor; 24 use wasmtime::{Store, StoreContextMut}; 25 26 /// Alternative p2 bindings generated with `exports: { default: async | store }` 27 /// so we can use `TypedFunc::call_concurrent` with both p2 and p3 instances. 28 pub mod p2 { 29 #[expect(missing_docs, reason = "bindgen-generated code")] 30 pub mod bindings { 31 wasmtime::component::bindgen!({ 32 path: "wit", 33 world: "wasi:http/proxy", 34 imports: { default: tracing }, 35 exports: { default: async | store }, 36 require_store_data_send: true, 37 with: { 38 // http is in this crate 39 "wasi:http": crate::bindings::http, 40 // Upstream package dependencies 41 "wasi:io": wasmtime_wasi::p2::bindings::io, 42 } 43 }); 44 45 pub use wasi::*; 46 } 47 } 48 49 /// Represents either a `wasi:http/incoming-handler@0.2.x` or 50 /// `wasi:http/handler@0.3.x` pre-instance. 51 pub enum ProxyPre<T: 'static> { 52 /// A `wasi:http/incoming-handler@0.2.x` pre-instance. 53 P2(p2::bindings::ProxyPre<T>), 54 /// A `wasi:http/handler@0.3.x` pre-instance. 55 #[cfg(feature = "p3")] 56 P3(p3::bindings::ProxyPre<T>), 57 } 58 59 impl<T: 'static> ProxyPre<T> { 60 async fn instantiate_async(&self, store: impl AsContextMut<Data = T>) -> Result<Proxy> 61 where 62 T: Send, 63 { 64 Ok(match self { 65 Self::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?), 66 #[cfg(feature = "p3")] 67 Self::P3(pre) => Proxy::P3(pre.instantiate_async(store).await?), 68 }) 69 } 70 } 71 72 /// Represents either a `wasi:http/incoming-handler@0.2.x` or 73 /// `wasi:http/handler@0.3.x` instance. 74 pub enum Proxy { 75 /// A `wasi:http/incoming-handler@0.2.x` instance. 76 P2(p2::bindings::Proxy), 77 /// A `wasi:http/handler@0.3.x` instance. 78 #[cfg(feature = "p3")] 79 P3(p3::bindings::Proxy), 80 } 81 82 /// Represents a task to run using a `wasi:http/incoming-handler@0.2.x` or 83 /// `wasi:http/handler@0.3.x` instance. 84 pub type TaskFn<T> = Box< 85 dyn for<'a> FnOnce(&'a Accessor<T>, &'a Proxy) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> 86 + Send, 87 >; 88 89 /// Async MPMC channel where each item is delivered to at most one consumer. 90 struct Queue<T> { 91 queue: Mutex<VecDeque<T>>, 92 notify: Notify, 93 } 94 95 impl<T> Default for Queue<T> { 96 fn default() -> Self { 97 Self { 98 queue: Default::default(), 99 notify: Default::default(), 100 } 101 } 102 } 103 104 impl<T> Queue<T> { 105 fn is_empty(&self) -> bool { 106 self.queue.lock().unwrap().is_empty() 107 } 108 109 fn push(&self, item: T) { 110 self.queue.lock().unwrap().push_back(item); 111 self.notify.notify_one(); 112 } 113 114 fn try_pop(&self) -> Option<T> { 115 self.queue.lock().unwrap().pop_front() 116 } 117 118 async fn pop(&self) -> T { 119 // This code comes from the Unbound MPMC Channel example in [the 120 // `tokio::sync::Notify` 121 // docs](https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html). 122 123 let mut notified = pin!(self.notify.notified()); 124 125 loop { 126 notified.as_mut().enable(); 127 if let Some(item) = self.try_pop() { 128 return item; 129 } 130 notified.as_mut().await; 131 notified.set(self.notify.notified()); 132 } 133 } 134 } 135 136 /// Bundles a [`Store`] with a callback to write a profile (if configured). 137 pub struct StoreBundle<T: 'static> { 138 /// The [`Store`] to use to handle requests. 139 pub store: Store<T>, 140 /// Callback to write a profile (if enabled) once all requests have been 141 /// handled. 142 pub write_profile: Box<dyn FnOnce(StoreContextMut<T>) + Send>, 143 } 144 145 /// Represents the application-specific state of a web server. 146 pub trait HandlerState: 'static + Sync + Send { 147 /// The type of the associated data for [`Store`]s created using 148 /// [`new_store`]. 149 type StoreData: Send; 150 151 /// Create a new [`Store`] for handling one or more requests. 152 /// 153 /// The `req_id` parameter is the value passed in the call to 154 /// [`ProxyHandler::spawn`] that created the worker to which the new `Store` 155 /// will belong. See that function's documentation for details. 156 fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Self::StoreData>>; 157 158 /// Maximum time allowed to handle a request. 159 /// 160 /// In practice, a guest may be allowed to run up to 2x this time in the 161 /// case of instance reuse to avoid penalizing concurrent requests being 162 /// handled by the same instance. 163 fn request_timeout(&self) -> Duration; 164 165 /// Maximum time to keep an idle instance around before dropping it. 166 fn idle_instance_timeout(&self) -> Duration; 167 168 /// Maximum number of requests to handle using a single instance before 169 /// dropping it. 170 fn max_instance_reuse_count(&self) -> usize; 171 172 /// Maximum number of requests to handle concurrently using a single 173 /// instance. 174 fn max_instance_concurrent_reuse_count(&self) -> usize; 175 176 /// Called when a worker exits with an error. 177 fn handle_worker_error(&self, error: anyhow::Error); 178 } 179 180 struct ProxyHandlerInner<S: HandlerState> { 181 state: S, 182 instance_pre: ProxyPre<S::StoreData>, 183 next_id: AtomicU64, 184 task_queue: Queue<TaskFn<S::StoreData>>, 185 worker_count: AtomicUsize, 186 } 187 188 /// Helper utility to track the start times of tasks accepted by a worker. 189 /// 190 /// This is used to ensure that timeouts are enforced even when the 191 /// `StoreContextMut::run_concurrent` event loop is unable to make progress due 192 /// to the guest either busy looping or being blocked on a synchronous call to a 193 /// host function which has exclusive access to the `Store`. 194 #[derive(Default)] 195 struct StartTimes(BTreeMap<Instant, usize>); 196 197 impl StartTimes { 198 fn add(&mut self, time: Instant) { 199 *self.0.entry(time).or_insert(0) += 1; 200 } 201 202 fn remove(&mut self, time: Instant) { 203 let Entry::Occupied(mut entry) = self.0.entry(time) else { 204 unreachable!() 205 }; 206 match *entry.get() { 207 0 => unreachable!(), 208 1 => { 209 entry.remove(); 210 } 211 _ => { 212 *entry.get_mut() -= 1; 213 } 214 } 215 } 216 217 fn earliest(&self) -> Option<Instant> { 218 self.0.first_key_value().map(|(&k, _)| k) 219 } 220 } 221 222 struct Worker<S> 223 where 224 S: HandlerState, 225 { 226 handler: ProxyHandler<S>, 227 available: bool, 228 } 229 230 impl<S> Worker<S> 231 where 232 S: HandlerState, 233 { 234 fn set_available(&mut self, available: bool) { 235 if available != self.available { 236 self.available = available; 237 if available { 238 self.handler.0.worker_count.fetch_add(1, Relaxed); 239 } else { 240 // Here we use `SeqCst` to ensure the load/store is ordered 241 // correctly with respect to the `Queue::is_empty` check we do 242 // below. 243 let count = self.handler.0.worker_count.fetch_sub(1, SeqCst); 244 // This addresses what would otherwise be a race condition in 245 // `ProxyHandler::spawn` where it only starts a worker if the 246 // available worker count is zero. If we decrement the count to 247 // zero right after `ProxyHandler::spawn` checks it, then no 248 // worker will be started; thus it becomes our responsibility to 249 // start a worker here instead. 250 if count == 1 && !self.handler.0.task_queue.is_empty() { 251 self.handler.start_worker(None, None); 252 } 253 } 254 } 255 } 256 257 async fn run(mut self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) { 258 if let Err(error) = self.run_(task, req_id).await { 259 self.handler.0.state.handle_worker_error(error); 260 } 261 } 262 263 async fn run_( 264 &mut self, 265 task: Option<TaskFn<S::StoreData>>, 266 req_id: Option<u64>, 267 ) -> Result<()> { 268 // NB: The code the follows is rather subtle in that it is structured 269 // carefully to provide a few key invariants related to how instance 270 // reuse and request timeouts interact: 271 // 272 // - A task must never be allowed to run for more than 2x the request 273 // timeout, if any. 274 // 275 // - Every task we accept here must be allowed to run for at least 1x 276 // the request timeout, if any. 277 // 278 // - When more than one task is run concurrently in the same instance, 279 // we must stop accepting new tasks as soon as any existing task reaches 280 // the request timeout. This serves to cap the amount of time we need 281 // to keep the instance alive before _all_ tasks have either completed 282 // or timed out. 283 // 284 // As of this writing, there's an additional wrinkle that makes 285 // guaranteeing those invariants particularly tricky: per #11869 and 286 // #11870, busy guest loops, epoch interruption, and host functions 287 // registered using `Linker::func_{wrap,new}_async` all require 288 // blocking, exclusive access to the `Store`, which effectively prevents 289 // the `StoreContextMut::run_concurrent` event loop from making 290 // progress. That, in turn, prevents any concurrent tasks from 291 // executing, and also prevents the `AsyncFnOnce` passed to 292 // `run_concurrent` from being polled. Consequently, we must rely on a 293 // "second line of defense" to ensure tasks are timed out promptly, 294 // which is to check for timeouts _outside_ the `run_concurrent` future. 295 // Once the aforementioned issues have been addressed, we'll be able to 296 // remove that check and its associated baggage. 297 298 let handler = &self.handler.0; 299 300 let StoreBundle { 301 mut store, 302 write_profile, 303 } = handler.state.new_store(req_id)?; 304 305 let request_timeout = handler.state.request_timeout(); 306 let idle_instance_timeout = handler.state.idle_instance_timeout(); 307 let max_instance_reuse_count = handler.state.max_instance_reuse_count(); 308 let max_instance_concurrent_reuse_count = 309 handler.state.max_instance_concurrent_reuse_count(); 310 311 let proxy = &handler.instance_pre.instantiate_async(&mut store).await?; 312 let accept_concurrent = AtomicBool::new(true); 313 let task_start_times = Mutex::new(StartTimes::default()); 314 315 let mut future = pin!(store.run_concurrent(async |accessor| { 316 let mut reuse_count = 0; 317 let mut timed_out = false; 318 let mut futures = FuturesUnordered::new(); 319 320 let accept_task = |task: TaskFn<S::StoreData>, 321 futures: &mut FuturesUnordered<_>, 322 reuse_count: &mut usize| { 323 // Set `accept_concurrent` to false, conservatively assuming 324 // that the new task will be CPU-bound, at least to begin with. 325 // Only once the `StoreContextMut::run_concurrent` event loop 326 // returns `Pending` will we set `accept_concurrent` back to 327 // true and consider accepting more tasks. 328 // 329 // This approach avoids taking on more than one CPU-bound task 330 // at a time, which would hurt throughput vs. leaving the 331 // additional tasks for other workers to handle. 332 accept_concurrent.store(false, Relaxed); 333 *reuse_count += 1; 334 335 let start_time = Instant::now().checked_add(request_timeout); 336 if let Some(start_time) = start_time { 337 task_start_times.lock().unwrap().add(start_time); 338 } 339 340 futures.push(tokio::time::timeout(request_timeout, async move { 341 (task)(accessor, proxy).await; 342 start_time 343 })); 344 }; 345 346 if let Some(task) = task { 347 accept_task(task, &mut futures, &mut reuse_count); 348 } 349 350 let handler = self.handler.clone(); 351 while !(futures.is_empty() && reuse_count >= max_instance_reuse_count) { 352 let new_task = { 353 let future_count = futures.len(); 354 let mut next_future = pin!(async { 355 if futures.is_empty() { 356 future::pending().await 357 } else { 358 futures.next().await.unwrap() 359 } 360 }); 361 let mut next_task = pin!(tokio::time::timeout( 362 if future_count == 0 { 363 idle_instance_timeout 364 } else { 365 Duration::MAX 366 }, 367 handler.0.task_queue.pop() 368 )); 369 // Poll any existing tasks, and if they're all `Pending` 370 // _and_ we haven't reached any reuse limits yet, poll for a 371 // new task from the queue. 372 // 373 // Note the the order of operations here is important. By 374 // polling `next_future` first, we'll disover any tasks that 375 // may have timed out, at which point we'll stop accepting 376 // new tasks altogether (see below for details). This is 377 // especially imporant in the case where the task was 378 // blocked on a synchronous call to a host function which 379 // has exclusive access to the `Store`; once that call 380 // finishes, the first think we need to do is time out the 381 // task. If we were to poll for a new task first, then we'd 382 // have to wait for _that_ task to finish or time out before 383 // we could kill the instance. 384 future::poll_fn(|cx| match next_future.as_mut().poll(cx) { 385 Poll::Pending => { 386 // Note that `Pending` here doesn't necessarily mean 387 // all tasks are blocked on I/O. They might simply 388 // be waiting for some deferred work to be done by 389 // the next turn of the 390 // `StoreContextMut::run_concurrent` event loop. 391 // Therefore, we check `accept_concurrent` here and 392 // only advertise we have capacity for another task 393 // if either we have no tasks at all or all our 394 // tasks really are blocked on I/O. 395 self.set_available( 396 reuse_count < max_instance_reuse_count 397 && future_count < max_instance_concurrent_reuse_count 398 && (future_count == 0 || accept_concurrent.load(Relaxed)), 399 ); 400 401 if self.available { 402 next_task.as_mut().poll(cx).map(Some) 403 } else { 404 Poll::Pending 405 } 406 } 407 Poll::Ready(Ok(start_time)) => { 408 // Task completed; carry on! 409 if let Some(start_time) = start_time { 410 task_start_times.lock().unwrap().remove(start_time); 411 } 412 Poll::Ready(None) 413 } 414 Poll::Ready(Err(_)) => { 415 // Task timed out; stop accepting new tasks, but 416 // continue polling until any other, in-progress 417 // tasks until they have either finished or timed 418 // out. This effectively kicks off a "graceful 419 // shutdown" of the worker, allowing any other 420 // concurrent tasks time to finish before we drop 421 // the instance. 422 // 423 // TODO: We should also send a cancel request to the 424 // timed-out task to give it a chance to shut down 425 // gracefully (and delay dropping the instance for a 426 // reasonable amount of time), but as of this 427 // writing Wasmtime does not yet provide an API for 428 // doing that. See issue #11833. 429 timed_out = true; 430 reuse_count = max_instance_reuse_count; 431 Poll::Ready(None) 432 } 433 }) 434 .await 435 }; 436 437 match new_task { 438 Some(Ok(task)) => { 439 accept_task(task, &mut futures, &mut reuse_count); 440 } 441 Some(Err(_)) => break, 442 None => {} 443 } 444 } 445 446 accessor.with(|mut access| write_profile(access.as_context_mut())); 447 448 if timed_out { 449 Err(anyhow!("guest timed out")) 450 } else { 451 anyhow::Ok(()) 452 } 453 })); 454 455 let mut sleep = pin!(tokio::time::sleep(Duration::MAX)); 456 457 future::poll_fn(|cx| { 458 let poll = future.as_mut().poll(cx); 459 if poll.is_pending() { 460 // If the future returns `Pending`, that's either because it's 461 // idle (in which case it can definitely accept a new task) or 462 // because all its tasks are awaiting I/O, in which case it may 463 // have capacity for additional tasks to run concurrently. 464 // 465 // However, if one of the tasks is blocked on a sync call to a 466 // host function which has exclusive access to the `Store`, the 467 // `StoreContextMut::run_concurrent` event loop will be unable 468 // to make progress until that call finishes. Similarly, if the 469 // task loops indefinitely, subject only to epoch interruption, 470 // the event loop will also be stuck. Either way, any task 471 // timeouts created inside the `AsyncFnOnce` we passed to 472 // `run_concurrent` won't have a chance to trigger. 473 // Consequently, we need to _also_ enforce timeouts here, 474 // outside the event loop. 475 // 476 // Therefore, we check if the oldest outstanding task has been 477 // running for at least `request_timeout*2`, which is the 478 // maximum time needed for any other concurrent tasks to 479 // complete or time out, at which point we can safely discard 480 // the instance. If that deadline has not yet arrived, we 481 // schedule a wakeup to occur when it does. 482 // 483 // We uphold the "never kill an instance with a task which has 484 // been running for less than the request timeout" invariant 485 // here by noting that this timeout will only trigger if the 486 // `AsyncFnOnce` we passed to `run_concurrent` has been unable 487 // to run for at least the past `request_timeout` amount of 488 // time, meaning it can't possibly have accepted a task newer 489 // than that. 490 if let Some(deadline) = task_start_times 491 .lock() 492 .unwrap() 493 .earliest() 494 .and_then(|v| v.checked_add(request_timeout.saturating_mul(2))) 495 { 496 sleep.as_mut().reset(deadline.into()); 497 // Note that this will schedule a wakeup for later if the 498 // deadline has not yet arrived: 499 if sleep.as_mut().poll(cx).is_ready() { 500 // Deadline has been reached; kill the instance with an 501 // error. 502 return Poll::Ready(Err(anyhow!("guest timed out"))); 503 } 504 } 505 506 // Otherwise, if no timeouts have elapsed, we set 507 // `accept_concurrent` to true and, if it wasn't already true 508 // before, poll the future one more time so it can ask for 509 // another task if appropriate. 510 if !accept_concurrent.swap(true, Relaxed) { 511 return future.as_mut().poll(cx); 512 } 513 } 514 515 poll 516 }) 517 .await? 518 } 519 } 520 521 impl<S> Drop for Worker<S> 522 where 523 S: HandlerState, 524 { 525 fn drop(&mut self) { 526 self.set_available(false); 527 } 528 } 529 530 /// Represents the state of a web server. 531 /// 532 /// Note that this supports optional instance reuse, enabled when 533 /// `S::max_instance_reuse_count()` returns a number greater than one. See 534 /// [`Self::push`] for details. 535 pub struct ProxyHandler<S: HandlerState>(Arc<ProxyHandlerInner<S>>); 536 537 impl<S: HandlerState> Clone for ProxyHandler<S> { 538 fn clone(&self) -> Self { 539 Self(self.0.clone()) 540 } 541 } 542 543 impl<S> ProxyHandler<S> 544 where 545 S: HandlerState, 546 { 547 /// Create a new `ProxyHandler` with the specified application state and 548 /// pre-instance. 549 pub fn new(state: S, instance_pre: ProxyPre<S::StoreData>) -> Self { 550 Self(Arc::new(ProxyHandlerInner { 551 state, 552 instance_pre, 553 next_id: AtomicU64::from(0), 554 task_queue: Default::default(), 555 worker_count: AtomicUsize::from(0), 556 })) 557 } 558 559 /// Push a task to the task queue for this handler. 560 /// 561 /// This will either spawn a new background worker to run the task or 562 /// deliver it to an already-running worker. 563 /// 564 /// The `req_id` will be passed to `<S as HandlerState>::new_store` _if_ a 565 /// new worker is started for this task. It is intended to be used as a 566 /// "request identifier" corresponding to that task and can be used e.g. to 567 /// prefix all logging from the `Store` with that identifier. Note that a 568 /// non-`None` value only makes sense when `<S as 569 /// HandlerState>::max_instance_reuse_count == 1`; otherwise the identifier 570 /// will not match subsequent tasks handled by the worker. 571 pub fn spawn(&self, req_id: Option<u64>, task: TaskFn<S::StoreData>) { 572 match self.0.state.max_instance_reuse_count() { 573 0 => panic!("`max_instance_reuse_count` must be at least 1"), 574 _ => { 575 if self.0.worker_count.load(Relaxed) == 0 { 576 // There are no available workers; skip the queue and pass 577 // the task directly to the worker, which improves 578 // performance as measured by `wasmtime-server-rps.sh` by 579 // about 15%. 580 self.start_worker(Some(task), req_id); 581 } else { 582 self.0.task_queue.push(task); 583 // Start a new worker to handle the task if the last worker 584 // just went unavailable. See also `Worker::set_available` 585 // for what happens if the available worker count goes to 586 // zero right after we check it here, and note that we only 587 // check the count _after_ we've pushed the task to the 588 // queue. We use `SeqCst` here to ensure that we get an 589 // updated view of `worker_count` as it exists after the 590 // `Queue::push` above. 591 // 592 // The upshot is that at least one (or more) of the 593 // following will happen: 594 // 595 // - An existing worker will accept the task 596 // - We'll start a new worker here to accept the task 597 // - `Worker::set_available` will start a new worker to accept the task 598 // 599 // I.e. it should not be possible for the task to be 600 // orphaned indefinitely in the queue without being 601 // accepted. 602 if self.0.worker_count.load(SeqCst) == 0 { 603 self.start_worker(None, None); 604 } 605 } 606 } 607 } 608 } 609 610 /// Generate a unique request ID. 611 pub fn next_req_id(&self) -> u64 { 612 self.0.next_id.fetch_add(1, Relaxed) 613 } 614 615 /// Return a reference to the application state. 616 pub fn state(&self) -> &S { 617 &self.0.state 618 } 619 620 /// Return a reference to the pre-instance. 621 pub fn instance_pre(&self) -> &ProxyPre<S::StoreData> { 622 &self.0.instance_pre 623 } 624 625 fn start_worker(&self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) { 626 tokio::spawn( 627 Worker { 628 handler: self.clone(), 629 available: false, 630 } 631 .run(task, req_id), 632 ); 633 } 634 } 635