1 //! Provides utilities useful for dispatching incoming HTTP requests 2 //! `wasi:http/handler` guest instances. 3 4 #[cfg(feature = "p3")] 5 use crate::p3; 6 use futures::stream::{FuturesUnordered, StreamExt}; 7 use std::collections::VecDeque; 8 use std::collections::btree_map::{BTreeMap, Entry}; 9 use std::future; 10 use std::pin::{Pin, pin}; 11 use std::sync::{ 12 Arc, Mutex, 13 atomic::{ 14 AtomicBool, AtomicU64, AtomicUsize, 15 Ordering::{Relaxed, SeqCst}, 16 }, 17 }; 18 use std::task::Poll; 19 use std::time::{Duration, Instant}; 20 use tokio::sync::Notify; 21 use wasmtime::AsContextMut; 22 use wasmtime::component::Accessor; 23 use wasmtime::{Result, Store, StoreContextMut, format_err}; 24 25 /// Alternative p2 bindings generated with `exports: { default: async | store }` 26 /// so we can use `TypedFunc::call_concurrent` with both p2 and p3 instances. 27 pub mod p2 { 28 #[expect(missing_docs, reason = "bindgen-generated code")] 29 pub mod bindings { 30 wasmtime::component::bindgen!({ 31 path: "wit", 32 world: "wasi:http/proxy", 33 imports: { default: tracing }, 34 exports: { default: async | store }, 35 require_store_data_send: true, 36 with: { 37 // http is in this crate 38 "wasi:http": crate::bindings::http, 39 // Upstream package dependencies 40 "wasi:io": wasmtime_wasi::p2::bindings::io, 41 } 42 }); 43 44 pub use wasi::*; 45 } 46 } 47 48 /// Represents either a `wasi:http/incoming-handler@0.2.x` or 49 /// `wasi:http/handler@0.3.x` pre-instance. 50 pub enum ProxyPre<T: 'static> { 51 /// A `wasi:http/incoming-handler@0.2.x` pre-instance. 52 P2(p2::bindings::ProxyPre<T>), 53 /// A `wasi:http/handler@0.3.x` pre-instance. 54 #[cfg(feature = "p3")] 55 P3(p3::bindings::ServicePre<T>), 56 } 57 58 impl<T: 'static> ProxyPre<T> { 59 async fn instantiate_async(&self, store: impl AsContextMut<Data = T>) -> Result<Proxy> 60 where 61 T: Send, 62 { 63 Ok(match self { 64 Self::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?), 65 #[cfg(feature = "p3")] 66 Self::P3(pre) => Proxy::P3(pre.instantiate_async(store).await?), 67 }) 68 } 69 } 70 71 /// Represents either a `wasi:http/incoming-handler@0.2.x` or 72 /// `wasi:http/handler@0.3.x` instance. 73 pub enum Proxy { 74 /// A `wasi:http/incoming-handler@0.2.x` instance. 75 P2(p2::bindings::Proxy), 76 /// A `wasi:http/handler@0.3.x` instance. 77 #[cfg(feature = "p3")] 78 P3(p3::bindings::Service), 79 } 80 81 /// Represents a task to run using a `wasi:http/incoming-handler@0.2.x` or 82 /// `wasi:http/handler@0.3.x` instance. 83 pub type TaskFn<T> = Box< 84 dyn for<'a> FnOnce(&'a Accessor<T>, &'a Proxy) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> 85 + Send, 86 >; 87 88 /// Async MPMC channel where each item is delivered to at most one consumer. 89 struct Queue<T> { 90 queue: Mutex<VecDeque<T>>, 91 notify: Notify, 92 } 93 94 impl<T> Default for Queue<T> { 95 fn default() -> Self { 96 Self { 97 queue: Default::default(), 98 notify: Default::default(), 99 } 100 } 101 } 102 103 impl<T> Queue<T> { 104 fn is_empty(&self) -> bool { 105 self.queue.lock().unwrap().is_empty() 106 } 107 108 fn push(&self, item: T) { 109 self.queue.lock().unwrap().push_back(item); 110 self.notify.notify_one(); 111 } 112 113 fn try_pop(&self) -> Option<T> { 114 self.queue.lock().unwrap().pop_front() 115 } 116 117 async fn pop(&self) -> T { 118 // This code comes from the Unbound MPMC Channel example in [the 119 // `tokio::sync::Notify` 120 // docs](https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html). 121 122 let mut notified = pin!(self.notify.notified()); 123 124 loop { 125 notified.as_mut().enable(); 126 if let Some(item) = self.try_pop() { 127 return item; 128 } 129 notified.as_mut().await; 130 notified.set(self.notify.notified()); 131 } 132 } 133 } 134 135 /// Bundles a [`Store`] with a callback to write a profile (if configured). 136 pub struct StoreBundle<T: 'static> { 137 /// The [`Store`] to use to handle requests. 138 pub store: Store<T>, 139 /// Callback to write a profile (if enabled) once all requests have been 140 /// handled. 141 pub write_profile: Box<dyn FnOnce(StoreContextMut<T>) + Send>, 142 } 143 144 /// Represents the application-specific state of a web server. 145 pub trait HandlerState: 'static + Sync + Send { 146 /// The type of the associated data for [`Store`]s created using 147 /// [`Self::new_store`]. 148 type StoreData: Send; 149 150 /// Create a new [`Store`] for handling one or more requests. 151 /// 152 /// The `req_id` parameter is the value passed in the call to 153 /// [`ProxyHandler::spawn`] that created the worker to which the new `Store` 154 /// will belong. See that function's documentation for details. 155 fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Self::StoreData>>; 156 157 /// Maximum time allowed to handle a request. 158 /// 159 /// In practice, a guest may be allowed to run up to 2x this time in the 160 /// case of instance reuse to avoid penalizing concurrent requests being 161 /// handled by the same instance. 162 fn request_timeout(&self) -> Duration; 163 164 /// Maximum time to keep an idle instance around before dropping it. 165 fn idle_instance_timeout(&self) -> Duration; 166 167 /// Maximum number of requests to handle using a single instance before 168 /// dropping it. 169 fn max_instance_reuse_count(&self) -> usize; 170 171 /// Maximum number of requests to handle concurrently using a single 172 /// instance. 173 fn max_instance_concurrent_reuse_count(&self) -> usize; 174 175 /// Called when a worker exits with an error. 176 fn handle_worker_error(&self, error: wasmtime::Error); 177 } 178 179 struct ProxyHandlerInner<S: HandlerState> { 180 state: S, 181 instance_pre: ProxyPre<S::StoreData>, 182 next_id: AtomicU64, 183 task_queue: Queue<TaskFn<S::StoreData>>, 184 worker_count: AtomicUsize, 185 } 186 187 /// Helper utility to track the start times of tasks accepted by a worker. 188 /// 189 /// This is used to ensure that timeouts are enforced even when the 190 /// `StoreContextMut::run_concurrent` event loop is unable to make progress due 191 /// to the guest either busy looping or being blocked on a synchronous call to a 192 /// host function which has exclusive access to the `Store`. 193 #[derive(Default)] 194 struct StartTimes(BTreeMap<Instant, usize>); 195 196 impl StartTimes { 197 fn add(&mut self, time: Instant) { 198 *self.0.entry(time).or_insert(0) += 1; 199 } 200 201 fn remove(&mut self, time: Instant) { 202 let Entry::Occupied(mut entry) = self.0.entry(time) else { 203 unreachable!() 204 }; 205 match *entry.get() { 206 0 => unreachable!(), 207 1 => { 208 entry.remove(); 209 } 210 _ => { 211 *entry.get_mut() -= 1; 212 } 213 } 214 } 215 216 fn earliest(&self) -> Option<Instant> { 217 self.0.first_key_value().map(|(&k, _)| k) 218 } 219 } 220 221 struct Worker<S> 222 where 223 S: HandlerState, 224 { 225 handler: ProxyHandler<S>, 226 available: bool, 227 } 228 229 impl<S> Worker<S> 230 where 231 S: HandlerState, 232 { 233 fn set_available(&mut self, available: bool) { 234 if available != self.available { 235 self.available = available; 236 if available { 237 self.handler.0.worker_count.fetch_add(1, Relaxed); 238 } else { 239 // Here we use `SeqCst` to ensure the load/store is ordered 240 // correctly with respect to the `Queue::is_empty` check we do 241 // below. 242 let count = self.handler.0.worker_count.fetch_sub(1, SeqCst); 243 // This addresses what would otherwise be a race condition in 244 // `ProxyHandler::spawn` where it only starts a worker if the 245 // available worker count is zero. If we decrement the count to 246 // zero right after `ProxyHandler::spawn` checks it, then no 247 // worker will be started; thus it becomes our responsibility to 248 // start a worker here instead. 249 if count == 1 && !self.handler.0.task_queue.is_empty() { 250 self.handler.start_worker(None, None); 251 } 252 } 253 } 254 } 255 256 async fn run(mut self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) { 257 if let Err(error) = self.run_(task, req_id).await { 258 self.handler.0.state.handle_worker_error(error); 259 } 260 } 261 262 async fn run_( 263 &mut self, 264 task: Option<TaskFn<S::StoreData>>, 265 req_id: Option<u64>, 266 ) -> Result<()> { 267 // NB: The code the follows is rather subtle in that it is structured 268 // carefully to provide a few key invariants related to how instance 269 // reuse and request timeouts interact: 270 // 271 // - A task must never be allowed to run for more than 2x the request 272 // timeout, if any. 273 // 274 // - Every task we accept here must be allowed to run for at least 1x 275 // the request timeout, if any. 276 // 277 // - When more than one task is run concurrently in the same instance, 278 // we must stop accepting new tasks as soon as any existing task reaches 279 // the request timeout. This serves to cap the amount of time we need 280 // to keep the instance alive before _all_ tasks have either completed 281 // or timed out. 282 // 283 // As of this writing, there's an additional wrinkle that makes 284 // guaranteeing those invariants particularly tricky: per #11869 and 285 // #11870, busy guest loops, epoch interruption, and host functions 286 // registered using `Linker::func_{wrap,new}_async` all require 287 // blocking, exclusive access to the `Store`, which effectively prevents 288 // the `StoreContextMut::run_concurrent` event loop from making 289 // progress. That, in turn, prevents any concurrent tasks from 290 // executing, and also prevents the `AsyncFnOnce` passed to 291 // `run_concurrent` from being polled. Consequently, we must rely on a 292 // "second line of defense" to ensure tasks are timed out promptly, 293 // which is to check for timeouts _outside_ the `run_concurrent` future. 294 // Once the aforementioned issues have been addressed, we'll be able to 295 // remove that check and its associated baggage. 296 297 let handler = &self.handler.0; 298 299 let StoreBundle { 300 mut store, 301 write_profile, 302 } = handler.state.new_store(req_id)?; 303 304 let request_timeout = handler.state.request_timeout(); 305 let idle_instance_timeout = handler.state.idle_instance_timeout(); 306 let max_instance_reuse_count = handler.state.max_instance_reuse_count(); 307 let max_instance_concurrent_reuse_count = 308 handler.state.max_instance_concurrent_reuse_count(); 309 310 let proxy = &handler.instance_pre.instantiate_async(&mut store).await?; 311 let accept_concurrent = AtomicBool::new(true); 312 let task_start_times = Mutex::new(StartTimes::default()); 313 314 let mut future = pin!(store.run_concurrent(async |accessor| { 315 let mut reuse_count = 0; 316 let mut timed_out = false; 317 let mut futures = FuturesUnordered::new(); 318 319 let accept_task = |task: TaskFn<S::StoreData>, 320 futures: &mut FuturesUnordered<_>, 321 reuse_count: &mut usize| { 322 // Set `accept_concurrent` to false, conservatively assuming 323 // that the new task will be CPU-bound, at least to begin with. 324 // Only once the `StoreContextMut::run_concurrent` event loop 325 // returns `Pending` will we set `accept_concurrent` back to 326 // true and consider accepting more tasks. 327 // 328 // This approach avoids taking on more than one CPU-bound task 329 // at a time, which would hurt throughput vs. leaving the 330 // additional tasks for other workers to handle. 331 accept_concurrent.store(false, Relaxed); 332 *reuse_count += 1; 333 334 let start_time = Instant::now().checked_add(request_timeout); 335 if let Some(start_time) = start_time { 336 task_start_times.lock().unwrap().add(start_time); 337 } 338 339 futures.push(tokio::time::timeout(request_timeout, async move { 340 (task)(accessor, proxy).await; 341 start_time 342 })); 343 }; 344 345 if let Some(task) = task { 346 accept_task(task, &mut futures, &mut reuse_count); 347 } 348 349 let handler = self.handler.clone(); 350 while !(futures.is_empty() && reuse_count >= max_instance_reuse_count) { 351 let new_task = { 352 let future_count = futures.len(); 353 let mut next_future = pin!(async { 354 if futures.is_empty() { 355 future::pending().await 356 } else { 357 futures.next().await.unwrap() 358 } 359 }); 360 let mut next_task = pin!(tokio::time::timeout( 361 if future_count == 0 { 362 idle_instance_timeout 363 } else { 364 Duration::MAX 365 }, 366 handler.0.task_queue.pop() 367 )); 368 // Poll any existing tasks, and if they're all `Pending` 369 // _and_ we haven't reached any reuse limits yet, poll for a 370 // new task from the queue. 371 // 372 // Note the the order of operations here is important. By 373 // polling `next_future` first, we'll disover any tasks that 374 // may have timed out, at which point we'll stop accepting 375 // new tasks altogether (see below for details). This is 376 // especially imporant in the case where the task was 377 // blocked on a synchronous call to a host function which 378 // has exclusive access to the `Store`; once that call 379 // finishes, the first think we need to do is time out the 380 // task. If we were to poll for a new task first, then we'd 381 // have to wait for _that_ task to finish or time out before 382 // we could kill the instance. 383 future::poll_fn(|cx| match next_future.as_mut().poll(cx) { 384 Poll::Pending => { 385 // Note that `Pending` here doesn't necessarily mean 386 // all tasks are blocked on I/O. They might simply 387 // be waiting for some deferred work to be done by 388 // the next turn of the 389 // `StoreContextMut::run_concurrent` event loop. 390 // Therefore, we check `accept_concurrent` here and 391 // only advertise we have capacity for another task 392 // if either we have no tasks at all or all our 393 // tasks really are blocked on I/O. 394 self.set_available( 395 reuse_count < max_instance_reuse_count 396 && future_count < max_instance_concurrent_reuse_count 397 && (future_count == 0 || accept_concurrent.load(Relaxed)), 398 ); 399 400 if self.available { 401 next_task.as_mut().poll(cx).map(Some) 402 } else { 403 Poll::Pending 404 } 405 } 406 Poll::Ready(Ok(start_time)) => { 407 // Task completed; carry on! 408 if let Some(start_time) = start_time { 409 task_start_times.lock().unwrap().remove(start_time); 410 } 411 Poll::Ready(None) 412 } 413 Poll::Ready(Err(_)) => { 414 // Task timed out; stop accepting new tasks, but 415 // continue polling until any other, in-progress 416 // tasks until they have either finished or timed 417 // out. This effectively kicks off a "graceful 418 // shutdown" of the worker, allowing any other 419 // concurrent tasks time to finish before we drop 420 // the instance. 421 // 422 // TODO: We should also send a cancel request to the 423 // timed-out task to give it a chance to shut down 424 // gracefully (and delay dropping the instance for a 425 // reasonable amount of time), but as of this 426 // writing Wasmtime does not yet provide an API for 427 // doing that. See issue #11833. 428 timed_out = true; 429 reuse_count = max_instance_reuse_count; 430 Poll::Ready(None) 431 } 432 }) 433 .await 434 }; 435 436 match new_task { 437 Some(Ok(task)) => { 438 accept_task(task, &mut futures, &mut reuse_count); 439 } 440 Some(Err(_)) => break, 441 None => {} 442 } 443 } 444 445 accessor.with(|mut access| write_profile(access.as_context_mut())); 446 447 if timed_out { 448 Err(format_err!("guest timed out")) 449 } else { 450 wasmtime::error::Ok(()) 451 } 452 })); 453 454 let mut sleep = pin!(tokio::time::sleep(Duration::MAX)); 455 456 future::poll_fn(|cx| { 457 let poll = future.as_mut().poll(cx); 458 if poll.is_pending() { 459 // If the future returns `Pending`, that's either because it's 460 // idle (in which case it can definitely accept a new task) or 461 // because all its tasks are awaiting I/O, in which case it may 462 // have capacity for additional tasks to run concurrently. 463 // 464 // However, if one of the tasks is blocked on a sync call to a 465 // host function which has exclusive access to the `Store`, the 466 // `StoreContextMut::run_concurrent` event loop will be unable 467 // to make progress until that call finishes. Similarly, if the 468 // task loops indefinitely, subject only to epoch interruption, 469 // the event loop will also be stuck. Either way, any task 470 // timeouts created inside the `AsyncFnOnce` we passed to 471 // `run_concurrent` won't have a chance to trigger. 472 // Consequently, we need to _also_ enforce timeouts here, 473 // outside the event loop. 474 // 475 // Therefore, we check if the oldest outstanding task has been 476 // running for at least `request_timeout*2`, which is the 477 // maximum time needed for any other concurrent tasks to 478 // complete or time out, at which point we can safely discard 479 // the instance. If that deadline has not yet arrived, we 480 // schedule a wakeup to occur when it does. 481 // 482 // We uphold the "never kill an instance with a task which has 483 // been running for less than the request timeout" invariant 484 // here by noting that this timeout will only trigger if the 485 // `AsyncFnOnce` we passed to `run_concurrent` has been unable 486 // to run for at least the past `request_timeout` amount of 487 // time, meaning it can't possibly have accepted a task newer 488 // than that. 489 if let Some(deadline) = task_start_times 490 .lock() 491 .unwrap() 492 .earliest() 493 .and_then(|v| v.checked_add(request_timeout.saturating_mul(2))) 494 { 495 sleep.as_mut().reset(deadline.into()); 496 // Note that this will schedule a wakeup for later if the 497 // deadline has not yet arrived: 498 if sleep.as_mut().poll(cx).is_ready() { 499 // Deadline has been reached; kill the instance with an 500 // error. 501 return Poll::Ready(Err(format_err!("guest timed out"))); 502 } 503 } 504 505 // Otherwise, if no timeouts have elapsed, we set 506 // `accept_concurrent` to true and, if it wasn't already true 507 // before, poll the future one more time so it can ask for 508 // another task if appropriate. 509 if !accept_concurrent.swap(true, Relaxed) { 510 return future.as_mut().poll(cx); 511 } 512 } 513 514 poll 515 }) 516 .await? 517 } 518 } 519 520 impl<S> Drop for Worker<S> 521 where 522 S: HandlerState, 523 { 524 fn drop(&mut self) { 525 self.set_available(false); 526 } 527 } 528 529 /// Represents the state of a web server. 530 /// 531 /// Note that this supports optional instance reuse, enabled when 532 /// `S::max_instance_reuse_count()` returns a number greater than one. See 533 /// [`Self::spawn`] for details. 534 pub struct ProxyHandler<S: HandlerState>(Arc<ProxyHandlerInner<S>>); 535 536 impl<S: HandlerState> Clone for ProxyHandler<S> { 537 fn clone(&self) -> Self { 538 Self(self.0.clone()) 539 } 540 } 541 542 impl<S> ProxyHandler<S> 543 where 544 S: HandlerState, 545 { 546 /// Create a new `ProxyHandler` with the specified application state and 547 /// pre-instance. 548 pub fn new(state: S, instance_pre: ProxyPre<S::StoreData>) -> Self { 549 Self(Arc::new(ProxyHandlerInner { 550 state, 551 instance_pre, 552 next_id: AtomicU64::from(0), 553 task_queue: Default::default(), 554 worker_count: AtomicUsize::from(0), 555 })) 556 } 557 558 /// Push a task to the task queue for this handler. 559 /// 560 /// This will either spawn a new background worker to run the task or 561 /// deliver it to an already-running worker. 562 /// 563 /// The `req_id` will be passed to `<S as HandlerState>::new_store` _if_ a 564 /// new worker is started for this task. It is intended to be used as a 565 /// "request identifier" corresponding to that task and can be used e.g. to 566 /// prefix all logging from the `Store` with that identifier. Note that a 567 /// non-`None` value only makes sense when `<S as 568 /// HandlerState>::max_instance_reuse_count == 1`; otherwise the identifier 569 /// will not match subsequent tasks handled by the worker. 570 pub fn spawn(&self, req_id: Option<u64>, task: TaskFn<S::StoreData>) { 571 match self.0.state.max_instance_reuse_count() { 572 0 => panic!("`max_instance_reuse_count` must be at least 1"), 573 _ => { 574 if self.0.worker_count.load(Relaxed) == 0 { 575 // There are no available workers; skip the queue and pass 576 // the task directly to the worker, which improves 577 // performance as measured by `wasmtime-server-rps.sh` by 578 // about 15%. 579 self.start_worker(Some(task), req_id); 580 } else { 581 self.0.task_queue.push(task); 582 // Start a new worker to handle the task if the last worker 583 // just went unavailable. See also `Worker::set_available` 584 // for what happens if the available worker count goes to 585 // zero right after we check it here, and note that we only 586 // check the count _after_ we've pushed the task to the 587 // queue. We use `SeqCst` here to ensure that we get an 588 // updated view of `worker_count` as it exists after the 589 // `Queue::push` above. 590 // 591 // The upshot is that at least one (or more) of the 592 // following will happen: 593 // 594 // - An existing worker will accept the task 595 // - We'll start a new worker here to accept the task 596 // - `Worker::set_available` will start a new worker to accept the task 597 // 598 // I.e. it should not be possible for the task to be 599 // orphaned indefinitely in the queue without being 600 // accepted. 601 if self.0.worker_count.load(SeqCst) == 0 { 602 self.start_worker(None, None); 603 } 604 } 605 } 606 } 607 } 608 609 /// Generate a unique request ID. 610 pub fn next_req_id(&self) -> u64 { 611 self.0.next_id.fetch_add(1, Relaxed) 612 } 613 614 /// Return a reference to the application state. 615 pub fn state(&self) -> &S { 616 &self.0.state 617 } 618 619 /// Return a reference to the pre-instance. 620 pub fn instance_pre(&self) -> &ProxyPre<S::StoreData> { 621 &self.0.instance_pre 622 } 623 624 fn start_worker(&self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) { 625 tokio::spawn( 626 Worker { 627 handler: self.clone(), 628 available: false, 629 } 630 .run(task, req_id), 631 ); 632 } 633 } 634