//! Provides utilities useful for dispatching incoming HTTP requests to
//! `wasi:http/handler` guest instances.

#[cfg(feature = "p3")]
use crate::p3;
use futures::stream::{FuturesUnordered, StreamExt};
use std::collections::VecDeque;
use std::collections::btree_map::{BTreeMap, Entry};
use std::future;
use std::pin::{Pin, pin};
use std::sync::{
    Arc, Mutex,
    atomic::{
        AtomicBool, AtomicU64, AtomicUsize,
        Ordering::{Relaxed, SeqCst},
    },
};
use std::task::Poll;
use std::time::{Duration, Instant};
use tokio::sync::Notify;
use wasmtime::AsContextMut;
use wasmtime::component::Accessor;
use wasmtime::{Result, Store, StoreContextMut, format_err};

/// Alternative p2 bindings generated with `exports: { default: async | store }`
/// so we can use `TypedFunc::call_concurrent` with both p2 and p3 instances.
#[cfg(feature = "p2")]
pub mod p2 {
    #[expect(missing_docs, reason = "bindgen-generated code")]
    pub mod bindings {
        wasmtime::component::bindgen!({
            path: "wit",
            world: "wasi:http/proxy",
            imports: { default: tracing },
            exports: { default: async | store },
            require_store_data_send: true,
            with: {
                // http is in this crate
                "wasi:http": crate::p2::bindings::http,
                // Upstream package dependencies
                "wasi:io": wasmtime_wasi::p2::bindings::io,
            }
        });

        pub use wasi::*;
    }
}

/// Represents either a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` pre-instance.
///
/// Each variant only exists when the corresponding cargo feature (`p2` or
/// `p3`) is enabled.
pub enum ProxyPre<T: 'static> {
    /// A `wasi:http/incoming-handler@0.2.x` pre-instance.
    #[cfg(feature = "p2")]
    P2(p2::bindings::ProxyPre<T>),
    /// A `wasi:http/handler@0.3.x` pre-instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::ServicePre<T>),
}

impl<T: 'static> ProxyPre<T> {
    /// Instantiate this pre-instance in `store`, producing the [`Proxy`]
    /// variant that matches `self` (`P2` -> `P2`, `P3` -> `P3`).
    ///
    /// Any error returned by the underlying `instantiate_async` call is
    /// propagated to the caller unchanged.
    async fn instantiate_async(&self, store: impl AsContextMut<Data = T>) -> Result<Proxy>
    where
        T: Send,
    {
        Ok(match self {
            #[cfg(feature = "p2")]
            Self::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?),
            #[cfg(feature = "p3")]
            Self::P3(pre) => Proxy::P3(pre.instantiate_async(store).await?),
        })
    }
}

/// Represents either a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` instance.
///
/// Each variant only exists when the corresponding cargo feature (`p2` or
/// `p3`) is enabled.
pub enum Proxy {
    /// A `wasi:http/incoming-handler@0.2.x` instance.
    #[cfg(feature = "p2")]
    P2(p2::bindings::Proxy),
    /// A `wasi:http/handler@0.3.x` instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::Service),
}

/// Represents a task to run using a `wasi:http/incoming-handler@0.2.x` or
/// `wasi:http/handler@0.3.x` instance.
///
/// The task is a boxed, `Send`, call-once closure. It receives borrowed
/// references to an [`Accessor`] and a [`Proxy`] and returns a boxed `Send`
/// future (with unit output) which may borrow from both for its lifetime.
pub type TaskFn<T> = Box<
    dyn for<'a> FnOnce(&'a Accessor<T>, &'a Proxy) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
        + Send,
>;

/// Async MPMC channel where each item is delivered to at most one consumer.
93 struct Queue<T> { 94 queue: Mutex<VecDeque<T>>, 95 notify: Notify, 96 } 97 98 impl<T> Default for Queue<T> { default() -> Self99 fn default() -> Self { 100 Self { 101 queue: Default::default(), 102 notify: Default::default(), 103 } 104 } 105 } 106 107 impl<T> Queue<T> { is_empty(&self) -> bool108 fn is_empty(&self) -> bool { 109 self.queue.lock().unwrap().is_empty() 110 } 111 push(&self, item: T)112 fn push(&self, item: T) { 113 self.queue.lock().unwrap().push_back(item); 114 self.notify.notify_one(); 115 } 116 try_pop(&self) -> Option<T>117 fn try_pop(&self) -> Option<T> { 118 self.queue.lock().unwrap().pop_front() 119 } 120 pop(&self) -> T121 async fn pop(&self) -> T { 122 // This code comes from the Unbound MPMC Channel example in [the 123 // `tokio::sync::Notify` 124 // docs](https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html). 125 126 let mut notified = pin!(self.notify.notified()); 127 128 loop { 129 notified.as_mut().enable(); 130 if let Some(item) = self.try_pop() { 131 return item; 132 } 133 notified.as_mut().await; 134 notified.set(self.notify.notified()); 135 } 136 } 137 } 138 139 /// Bundles a [`Store`] with a callback to write a profile (if configured). 140 pub struct StoreBundle<T: 'static> { 141 /// The [`Store`] to use to handle requests. 142 pub store: Store<T>, 143 /// Callback to write a profile (if enabled) once all requests have been 144 /// handled. 145 pub write_profile: Box<dyn FnOnce(StoreContextMut<T>) + Send>, 146 } 147 148 /// Represents the application-specific state of a web server. 149 pub trait HandlerState: 'static + Sync + Send { 150 /// The type of the associated data for [`Store`]s created using 151 /// [`Self::new_store`]. 152 type StoreData: Send; 153 154 /// Create a new [`Store`] for handling one or more requests. 155 /// 156 /// The `req_id` parameter is the value passed in the call to 157 /// [`ProxyHandler::spawn`] that created the worker to which the new `Store` 158 /// will belong. 
See that function's documentation for details. new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Self::StoreData>>159 fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Self::StoreData>>; 160 161 /// Maximum time allowed to handle a request. 162 /// 163 /// In practice, a guest may be allowed to run up to 2x this time in the 164 /// case of instance reuse to avoid penalizing concurrent requests being 165 /// handled by the same instance. request_timeout(&self) -> Duration166 fn request_timeout(&self) -> Duration; 167 168 /// Maximum time to keep an idle instance around before dropping it. idle_instance_timeout(&self) -> Duration169 fn idle_instance_timeout(&self) -> Duration; 170 171 /// Maximum number of requests to handle using a single instance before 172 /// dropping it. max_instance_reuse_count(&self) -> usize173 fn max_instance_reuse_count(&self) -> usize; 174 175 /// Maximum number of requests to handle concurrently using a single 176 /// instance. max_instance_concurrent_reuse_count(&self) -> usize177 fn max_instance_concurrent_reuse_count(&self) -> usize; 178 179 /// Called when a worker exits with an error. handle_worker_error(&self, error: wasmtime::Error)180 fn handle_worker_error(&self, error: wasmtime::Error); 181 } 182 183 struct ProxyHandlerInner<S: HandlerState> { 184 state: S, 185 instance_pre: ProxyPre<S::StoreData>, 186 next_id: AtomicU64, 187 task_queue: Queue<TaskFn<S::StoreData>>, 188 worker_count: AtomicUsize, 189 } 190 191 /// Helper utility to track the start times of tasks accepted by a worker. 192 /// 193 /// This is used to ensure that timeouts are enforced even when the 194 /// `StoreContextMut::run_concurrent` event loop is unable to make progress due 195 /// to the guest either busy looping or being blocked on a synchronous call to a 196 /// host function which has exclusive access to the `Store`. 
#[derive(Default)]
struct StartTimes(BTreeMap<Instant, usize>);

impl StartTimes {
    /// Record one more task under the key `time`, creating the entry if
    /// needed.
    fn add(&mut self, time: Instant) {
        *self.0.entry(time).or_insert(0) += 1;
    }

    /// Remove one task previously recorded under the key `time`, deleting the
    /// entry once its count drops to zero.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) if there is no entry for `time`, i.e. every
    /// call must be paired with an earlier [`Self::add`] of the same value.
    fn remove(&mut self, time: Instant) {
        let Entry::Occupied(mut entry) = self.0.entry(time) else {
            unreachable!()
        };
        match *entry.get() {
            // Entries are removed when they reach zero, so a stored zero
            // should never be observed.
            0 => unreachable!(),
            1 => {
                entry.remove();
            }
            _ => {
                *entry.get_mut() -= 1;
            }
        }
    }

    /// Return the smallest recorded key, or `None` if nothing is recorded.
    fn earliest(&self) -> Option<Instant> {
        self.0.first_key_value().map(|(&k, _)| k)
    }
}

/// A single background worker: owns one store/instance (created in
/// [`Worker::run_`]) and uses it to run tasks.
struct Worker<S>
where
    S: HandlerState,
{
    /// Handle to the shared handler state (task queue, available-worker
    /// count, application state).
    handler: ProxyHandler<S>,
    /// Whether this worker is currently counted in the handler's
    /// `worker_count`. Only modified through [`Worker::set_available`].
    available: bool,
}

impl<S> Worker<S>
where
    S: HandlerState,
{
    /// Update this worker's availability and keep the handler's
    /// `worker_count` in sync with it.
    ///
    /// Does nothing if `available` matches the current state. When
    /// transitioning to unavailable, this may start a replacement worker (see
    /// the comments below).
    fn set_available(&mut self, available: bool) {
        if available != self.available {
            self.available = available;
            if available {
                self.handler.0.worker_count.fetch_add(1, Relaxed);
            } else {
                // Here we use `SeqCst` to ensure the load/store is ordered
                // correctly with respect to the `Queue::is_empty` check we do
                // below.
                let count = self.handler.0.worker_count.fetch_sub(1, SeqCst);
                // This addresses what would otherwise be a race condition in
                // `ProxyHandler::spawn` where it only starts a worker if the
                // available worker count is zero.  If we decrement the count to
                // zero right after `ProxyHandler::spawn` checks it, then no
                // worker will be started; thus it becomes our responsibility to
                // start a worker here instead.
                //
                // (`fetch_sub` returns the previous value, so `count == 1`
                // means the count is now zero.)
                if count == 1 && !self.handler.0.task_queue.is_empty() {
                    self.handler.start_worker(None, None);
                }
            }
        }
    }

    /// Run this worker to completion, consuming it.
    ///
    /// Any error produced by [`Worker::run_`] is handed to
    /// `HandlerState::handle_worker_error` rather than returned.
    async fn run(mut self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
        if let Err(error) = self.run_(task, req_id).await {
            self.handler.0.state.handle_worker_error(error);
        }
    }

    /// Create a store and instance, then run `task` (if any) followed by
    /// further tasks popped from the handler's queue until a reuse limit, the
    /// idle timeout, or a request timeout ends the worker.
    ///
    /// `req_id` is forwarded to `HandlerState::new_store`.
    ///
    /// # Errors
    ///
    /// Returns an error if creating the store or instantiating the guest
    /// fails, or a "guest timed out" error if any task exceeded the request
    /// timeout (detected either inside or outside the `run_concurrent` event
    /// loop).
    async fn run_(
        &mut self,
        task: Option<TaskFn<S::StoreData>>,
        req_id: Option<u64>,
    ) -> Result<()> {
        // NB: The code that follows is rather subtle in that it is structured
        // carefully to provide a few key invariants related to how instance
        // reuse and request timeouts interact:
        //
        // - A task must never be allowed to run for more than 2x the request
        // timeout, if any.
        //
        // - Every task we accept here must be allowed to run for at least 1x
        // the request timeout, if any.
        //
        // - When more than one task is run concurrently in the same instance,
        // we must stop accepting new tasks as soon as any existing task reaches
        // the request timeout.  This serves to cap the amount of time we need
        // to keep the instance alive before _all_ tasks have either completed
        // or timed out.
        //
        // As of this writing, there's an additional wrinkle that makes
        // guaranteeing those invariants particularly tricky: per #11869 and
        // #11870, busy guest loops, epoch interruption, and host functions
        // registered using `Linker::func_{wrap,new}_async` all require
        // blocking, exclusive access to the `Store`, which effectively prevents
        // the `StoreContextMut::run_concurrent` event loop from making
        // progress.  That, in turn, prevents any concurrent tasks from
        // executing, and also prevents the `AsyncFnOnce` passed to
        // `run_concurrent` from being polled.  Consequently, we must rely on a
        // "second line of defense" to ensure tasks are timed out promptly,
        // which is to check for timeouts _outside_ the `run_concurrent` future.
        // Once the aforementioned issues have been addressed, we'll be able to
        // remove that check and its associated baggage.

        let handler = &self.handler.0;

        let StoreBundle {
            mut store,
            write_profile,
        } = handler.state.new_store(req_id)?;

        let request_timeout = handler.state.request_timeout();
        let idle_instance_timeout = handler.state.idle_instance_timeout();
        let max_instance_reuse_count = handler.state.max_instance_reuse_count();
        let max_instance_concurrent_reuse_count =
            handler.state.max_instance_concurrent_reuse_count();

        let proxy = &handler.instance_pre.instantiate_async(&mut store).await?;
        // Shared between the closure passed to `run_concurrent` and the outer
        // `poll_fn` at the bottom of this function.
        let accept_concurrent = AtomicBool::new(true);
        let task_start_times = Mutex::new(StartTimes::default());

        let mut future = pin!(store.run_concurrent(async |accessor| {
            // Number of tasks accepted so far by this worker.
            let mut reuse_count = 0;
            let mut timed_out = false;
            // In-flight tasks, each wrapped in a `request_timeout` timeout.
            let mut futures = FuturesUnordered::new();

            let accept_task = |task: TaskFn<S::StoreData>,
                               futures: &mut FuturesUnordered<_>,
                               reuse_count: &mut usize| {
                // Set `accept_concurrent` to false, conservatively assuming
                // that the new task will be CPU-bound, at least to begin with.
                // Only once the `StoreContextMut::run_concurrent` event loop
                // returns `Pending` will we set `accept_concurrent` back to
                // true and consider accepting more tasks.
                //
                // This approach avoids taking on more than one CPU-bound task
                // at a time, which would hurt throughput vs. leaving the
                // additional tasks for other workers to handle.
                accept_concurrent.store(false, Relaxed);
                *reuse_count += 1;

                // NOTE(review): despite its name, `start_time` is the
                // acceptance time *plus* `request_timeout` (or `None` if that
                // addition overflows, in which case the task is not tracked at
                // all).  The out-of-loop check in the `poll_fn` at the bottom
                // of this function adds a further `request_timeout * 2` to the
                // earliest tracked value, so that fallback fires roughly 3x
                // `request_timeout` after acceptance -- confirm this is
                // intended given the 2x bound stated in the invariants above.
                let start_time = Instant::now().checked_add(request_timeout);
                if let Some(start_time) = start_time {
                    task_start_times.lock().unwrap().add(start_time);
                }

                // On normal completion the future yields `start_time` so the
                // matching `StartTimes` entry can be removed.
                futures.push(tokio::time::timeout(request_timeout, async move {
                    (task)(accessor, proxy).await;
                    start_time
                }));
            };

            if let Some(task) = task {
                accept_task(task, &mut futures, &mut reuse_count);
            }

            let handler = self.handler.clone();
            // Keep going until nothing is in flight _and_ the reuse limit has
            // been reached (or we `break` out on idle timeout below).
            while !(futures.is_empty() && reuse_count >= max_instance_reuse_count) {
                let new_task = {
                    let future_count = futures.len();
                    let mut next_future = pin!(async {
                        if futures.is_empty() {
                            future::pending().await
                        } else {
                            futures.next().await.unwrap()
                        }
                    });
                    // The idle timeout only applies while no tasks are in
                    // flight; otherwise we wait for a new task indefinitely.
                    let mut next_task = pin!(tokio::time::timeout(
                        if future_count == 0 {
                            idle_instance_timeout
                        } else {
                            Duration::MAX
                        },
                        handler.0.task_queue.pop()
                    ));
                    // Poll any existing tasks, and if they're all `Pending`
                    // _and_ we haven't reached any reuse limits yet, poll for a
                    // new task from the queue.
                    //
                    // Note that the order of operations here is important.  By
                    // polling `next_future` first, we'll discover any tasks that
                    // may have timed out, at which point we'll stop accepting
                    // new tasks altogether (see below for details).  This is
                    // especially important in the case where the task was
                    // blocked on a synchronous call to a host function which
                    // has exclusive access to the `Store`; once that call
                    // finishes, the first thing we need to do is time out the
                    // task.  If we were to poll for a new task first, then we'd
                    // have to wait for _that_ task to finish or time out before
                    // we could kill the instance.
                    future::poll_fn(|cx| match next_future.as_mut().poll(cx) {
                        Poll::Pending => {
                            // Note that `Pending` here doesn't necessarily mean
                            // all tasks are blocked on I/O.  They might simply
                            // be waiting for some deferred work to be done by
                            // the next turn of the
                            // `StoreContextMut::run_concurrent` event loop.
                            // Therefore, we check `accept_concurrent` here and
                            // only advertise we have capacity for another task
                            // if either we have no tasks at all or all our
                            // tasks really are blocked on I/O.
                            self.set_available(
                                reuse_count < max_instance_reuse_count
                                    && future_count < max_instance_concurrent_reuse_count
                                    && (future_count == 0 || accept_concurrent.load(Relaxed)),
                            );

                            if self.available {
                                next_task.as_mut().poll(cx).map(Some)
                            } else {
                                Poll::Pending
                            }
                        }
                        Poll::Ready(Ok(start_time)) => {
                            // Task completed; carry on!
                            if let Some(start_time) = start_time {
                                task_start_times.lock().unwrap().remove(start_time);
                            }
                            Poll::Ready(None)
                        }
                        Poll::Ready(Err(_)) => {
                            // Task timed out; stop accepting new tasks, but
                            // continue polling any other in-progress tasks
                            // until they have either finished or timed out.
                            // This effectively kicks off a "graceful
                            // shutdown" of the worker, allowing any other
                            // concurrent tasks time to finish before we drop
                            // the instance.
                            //
                            // Note that the timed-out task's entry is _not_
                            // removed from `task_start_times` here.
                            //
                            // TODO: We should also send a cancel request to the
                            // timed-out task to give it a chance to shut down
                            // gracefully (and delay dropping the instance for a
                            // reasonable amount of time), but as of this
                            // writing Wasmtime does not yet provide an API for
                            // doing that.  See issue #11833.
                            timed_out = true;
                            reuse_count = max_instance_reuse_count;
                            Poll::Ready(None)
                        }
                    })
                    .await
                };

                match new_task {
                    // A new task arrived from the queue.
                    Some(Ok(task)) => {
                        accept_task(task, &mut futures, &mut reuse_count);
                    }
                    // Waiting for a new task timed out (idle instance).
                    Some(Err(_)) => break,
                    // An existing task completed or timed out; loop again.
                    None => {}
                }
            }

            // Only reached when the loop above exits on its own; the
            // out-of-loop timeout path below returns without getting here.
            accessor.with(|mut access| write_profile(access.as_context_mut()));

            if timed_out {
                Err(format_err!("guest timed out"))
            } else {
                wasmtime::error::Ok(())
            }
        }));

        // Placeholder timer; it is reset to the real deadline (if any) each
        // time the future above returns `Pending`.
        let mut sleep = pin!(tokio::time::sleep(Duration::MAX));

        future::poll_fn(|cx| {
            let poll = future.as_mut().poll(cx);
            if poll.is_pending() {
                // If the future returns `Pending`, that's either because it's
                // idle (in which case it can definitely accept a new task) or
                // because all its tasks are awaiting I/O, in which case it may
                // have capacity for additional tasks to run concurrently.
                //
                // However, if one of the tasks is blocked on a sync call to a
                // host function which has exclusive access to the `Store`, the
                // `StoreContextMut::run_concurrent` event loop will be unable
                // to make progress until that call finishes.  Similarly, if the
                // task loops indefinitely, subject only to epoch interruption,
                // the event loop will also be stuck.  Either way, any task
                // timeouts created inside the `AsyncFnOnce` we passed to
                // `run_concurrent` won't have a chance to trigger.
                // Consequently, we need to _also_ enforce timeouts here,
                // outside the event loop.
                //
                // Therefore, we check if the oldest outstanding task has been
                // running for at least `request_timeout*2`, which is the
                // maximum time needed for any other concurrent tasks to
                // complete or time out, at which point we can safely discard
                // the instance.  If that deadline has not yet arrived, we
                // schedule a wakeup to occur when it does.
                //
                // We uphold the "never kill an instance with a task which has
                // been running for less than the request timeout" invariant
                // here by noting that this timeout will only trigger if the
                // `AsyncFnOnce` we passed to `run_concurrent` has been unable
                // to run for at least the past `request_timeout` amount of
                // time, meaning it can't possibly have accepted a task newer
                // than that.
                if let Some(deadline) = task_start_times
                    .lock()
                    .unwrap()
                    .earliest()
                    .and_then(|v| v.checked_add(request_timeout.saturating_mul(2)))
                {
                    sleep.as_mut().reset(deadline.into());
                    // Note that this will schedule a wakeup for later if the
                    // deadline has not yet arrived:
                    if sleep.as_mut().poll(cx).is_ready() {
                        // Deadline has been reached; kill the instance with an
                        // error.
                        return Poll::Ready(Err(format_err!("guest timed out")));
                    }
                }

                // Otherwise, if no timeouts have elapsed, we set
                // `accept_concurrent` to true and, if it wasn't already true
                // before, poll the future one more time so it can ask for
                // another task if appropriate.
                if !accept_concurrent.swap(true, Relaxed) {
                    return future.as_mut().poll(cx);
                }
            }

            poll
        })
        .await?
    }
}

impl<S> Drop for Worker<S>
where
    S: HandlerState,
{
    /// Mark the worker unavailable so it is no longer counted in
    /// `worker_count`; see [`Worker::set_available`] for the replacement
    /// worker this may start.
    fn drop(&mut self) {
        self.set_available(false);
    }
}

/// Represents the state of a web server.
///
/// Note that this supports optional instance reuse, enabled when
/// `S::max_instance_reuse_count()` returns a number greater than one. See
/// [`Self::spawn`] for details.
538 pub struct ProxyHandler<S: HandlerState>(Arc<ProxyHandlerInner<S>>); 539 540 impl<S: HandlerState> Clone for ProxyHandler<S> { clone(&self) -> Self541 fn clone(&self) -> Self { 542 Self(self.0.clone()) 543 } 544 } 545 546 impl<S> ProxyHandler<S> 547 where 548 S: HandlerState, 549 { 550 /// Create a new `ProxyHandler` with the specified application state and 551 /// pre-instance. new(state: S, instance_pre: ProxyPre<S::StoreData>) -> Self552 pub fn new(state: S, instance_pre: ProxyPre<S::StoreData>) -> Self { 553 Self(Arc::new(ProxyHandlerInner { 554 state, 555 instance_pre, 556 next_id: AtomicU64::from(0), 557 task_queue: Default::default(), 558 worker_count: AtomicUsize::from(0), 559 })) 560 } 561 562 /// Push a task to the task queue for this handler. 563 /// 564 /// This will either spawn a new background worker to run the task or 565 /// deliver it to an already-running worker. 566 /// 567 /// The `req_id` will be passed to `<S as HandlerState>::new_store` _if_ a 568 /// new worker is started for this task. It is intended to be used as a 569 /// "request identifier" corresponding to that task and can be used e.g. to 570 /// prefix all logging from the `Store` with that identifier. Note that a 571 /// non-`None` value only makes sense when `<S as 572 /// HandlerState>::max_instance_reuse_count == 1`; otherwise the identifier 573 /// will not match subsequent tasks handled by the worker. spawn(&self, req_id: Option<u64>, task: TaskFn<S::StoreData>)574 pub fn spawn(&self, req_id: Option<u64>, task: TaskFn<S::StoreData>) { 575 match self.0.state.max_instance_reuse_count() { 576 0 => panic!("`max_instance_reuse_count` must be at least 1"), 577 _ => { 578 if self.0.worker_count.load(Relaxed) == 0 { 579 // There are no available workers; skip the queue and pass 580 // the task directly to the worker, which improves 581 // performance as measured by `wasmtime-server-rps.sh` by 582 // about 15%. 
583 self.start_worker(Some(task), req_id); 584 } else { 585 self.0.task_queue.push(task); 586 // Start a new worker to handle the task if the last worker 587 // just went unavailable. See also `Worker::set_available` 588 // for what happens if the available worker count goes to 589 // zero right after we check it here, and note that we only 590 // check the count _after_ we've pushed the task to the 591 // queue. We use `SeqCst` here to ensure that we get an 592 // updated view of `worker_count` as it exists after the 593 // `Queue::push` above. 594 // 595 // The upshot is that at least one (or more) of the 596 // following will happen: 597 // 598 // - An existing worker will accept the task 599 // - We'll start a new worker here to accept the task 600 // - `Worker::set_available` will start a new worker to accept the task 601 // 602 // I.e. it should not be possible for the task to be 603 // orphaned indefinitely in the queue without being 604 // accepted. 605 if self.0.worker_count.load(SeqCst) == 0 { 606 self.start_worker(None, None); 607 } 608 } 609 } 610 } 611 } 612 613 /// Generate a unique request ID. next_req_id(&self) -> u64614 pub fn next_req_id(&self) -> u64 { 615 self.0.next_id.fetch_add(1, Relaxed) 616 } 617 618 /// Return a reference to the application state. state(&self) -> &S619 pub fn state(&self) -> &S { 620 &self.0.state 621 } 622 623 /// Return a reference to the pre-instance. instance_pre(&self) -> &ProxyPre<S::StoreData>624 pub fn instance_pre(&self) -> &ProxyPre<S::StoreData> { 625 &self.0.instance_pre 626 } 627 start_worker(&self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>)628 fn start_worker(&self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) { 629 tokio::spawn( 630 Worker { 631 handler: self.clone(), 632 available: false, 633 } 634 .run(task, req_id), 635 ); 636 } 637 } 638