1 //! Implements the pooling instance allocator.
2 //!
3 //! The pooling instance allocator maps memory in advance and allocates
4 //! instances, memories, tables, and stacks from a pool of available resources.
5 //! Using the pooling instance allocator can speed up module instantiation when
6 //! modules can be constrained based on configurable limits
7 //! ([`InstanceLimits`]). Each new instance is stored in a "slot"; as instances
8 //! are allocated and freed, these slots are either filled or emptied:
9 //!
10 //! ```text
11 //! ┌──────┬──────┬──────┬──────┬──────┐
12 //! │Slot 0│Slot 1│Slot 2│Slot 3│......│
13 //! └──────┴──────┴──────┴──────┴──────┘
14 //! ```
15 //!
16 //! Each slot has a "slot ID"--an index into the pool. Slot IDs are handed out
17 //! by the [`index_allocator`] module. Note that each kind of pool-allocated
18 //! item is stored in its own separate pool: [`memory_pool`], [`table_pool`],
19 //! [`stack_pool`]. See those modules for more details.
20 
21 mod decommit_queue;
22 mod index_allocator;
23 mod memory_pool;
24 mod metrics;
25 mod table_pool;
26 
27 #[cfg(feature = "gc")]
28 mod gc_heap_pool;
29 
30 #[cfg(all(feature = "async"))]
31 mod generic_stack_pool;
32 #[cfg(all(feature = "async", unix, not(miri)))]
33 mod unix_stack_pool;
34 
35 #[cfg(all(feature = "async"))]
36 cfg_if::cfg_if! {
37     if #[cfg(all(unix, not(miri), not(asan)))] {
38         use unix_stack_pool as stack_pool;
39     } else {
40         use generic_stack_pool as stack_pool;
41     }
42 }
43 
44 use self::decommit_queue::DecommitQueue;
45 use self::memory_pool::MemoryPool;
46 use self::table_pool::TablePool;
47 use super::{
48     InstanceAllocationRequest, InstanceAllocator, MemoryAllocationIndex, TableAllocationIndex,
49 };
50 use crate::Enabled;
51 use crate::prelude::*;
52 use crate::runtime::vm::{
53     CompiledModuleId, Memory, Table,
54     instance::Instance,
55     mpk::{self, ProtectionKey, ProtectionMask},
56     sys::vm::PageMap,
57 };
58 use core::future::Future;
59 use core::pin::Pin;
60 use core::sync::atomic::AtomicUsize;
61 use std::borrow::Cow;
62 use std::fmt::Display;
63 use std::sync::{Mutex, MutexGuard};
64 use std::{
65     mem,
66     sync::atomic::{AtomicU64, Ordering},
67 };
68 use wasmtime_environ::{
69     DefinedMemoryIndex, DefinedTableIndex, HostPtr, Module, Tunables, VMOffsets,
70 };
71 
72 pub use self::metrics::PoolingAllocatorMetrics;
73 
74 #[cfg(feature = "gc")]
75 use super::GcHeapAllocationIndex;
76 #[cfg(feature = "gc")]
77 use crate::runtime::vm::{GcHeap, GcRuntime};
78 #[cfg(feature = "gc")]
79 use gc_heap_pool::GcHeapPool;
80 
81 #[cfg(feature = "async")]
82 use stack_pool::StackPool;
83 
84 #[cfg(feature = "component-model")]
85 use wasmtime_environ::{
86     StaticModuleIndex,
87     component::{Component, VMComponentOffsets},
88 };
89 
90 fn round_up_to_pow2(n: usize, to: usize) -> usize {
91     debug_assert!(to > 0);
92     debug_assert!(to.is_power_of_two());
93     (n + to - 1) & !(to - 1)
94 }
95 
96 /// Instance-related limit configuration for pooling.
97 ///
98 /// More docs on this can be found at `wasmtime::PoolingAllocationConfig`.
99 #[derive(Debug, Copy, Clone)]
100 pub struct InstanceLimits {
101     /// The maximum number of component instances that may be allocated
102     /// concurrently.
103     pub total_component_instances: u32,
104 
105     /// The maximum size of a component's `VMComponentContext`, including
106     /// the aggregate size of all its inner core modules' `VMContext` sizes.
107     pub component_instance_size: usize,
108 
109     /// The maximum number of core module instances that may be allocated
110     /// concurrently.
111     pub total_core_instances: u32,
112 
113     /// The maximum number of core module instances that a single component may
114     /// transitively contain.
115     pub max_core_instances_per_component: u32,
116 
117     /// The maximum number of Wasm linear memories that a component may
118     /// transitively contain.
119     pub max_memories_per_component: u32,
120 
121     /// The maximum number of tables that a component may transitively contain.
122     pub max_tables_per_component: u32,
123 
124     /// The total number of linear memories in the pool, across all instances.
125     pub total_memories: u32,
126 
127     /// The total number of tables in the pool, across all instances.
128     pub total_tables: u32,
129 
130     /// The total number of async stacks in the pool, across all instances.
131     #[cfg(feature = "async")]
132     pub total_stacks: u32,
133 
134     /// Maximum size of a core instance's `VMContext`.
135     pub core_instance_size: usize,
136 
137     /// Maximum number of tables per instance.
138     pub max_tables_per_module: u32,
139 
140     /// Maximum number of word-size elements per table.
141     ///
142     /// Note that tables for element types such as continuations
143     /// that use more than one word of storage may store fewer
144     /// elements.
145     pub table_elements: usize,
146 
147     /// Maximum number of linear memories per instance.
148     pub max_memories_per_module: u32,
149 
150     /// Maximum byte size of a linear memory, must be smaller than
151     /// `memory_reservation` in `Tunables`.
152     pub max_memory_size: usize,
153 
154     /// The total number of GC heaps in the pool, across all instances.
155     #[cfg(feature = "gc")]
156     pub total_gc_heaps: u32,
157 }
158 
159 impl Default for InstanceLimits {
160     fn default() -> Self {
161         let total = if cfg!(target_pointer_width = "32") {
162             100
163         } else {
164             1000
165         };
166         // See doc comments for `wasmtime::PoolingAllocationConfig` for these
167         // default values
168         Self {
169             total_component_instances: total,
170             component_instance_size: 1 << 20, // 1 MiB
171             total_core_instances: total,
172             max_core_instances_per_component: u32::MAX,
173             max_memories_per_component: u32::MAX,
174             max_tables_per_component: u32::MAX,
175             total_memories: total,
176             total_tables: total,
177             #[cfg(feature = "async")]
178             total_stacks: total,
179             core_instance_size: 1 << 20, // 1 MiB
180             max_tables_per_module: 1,
181             // NB: in #8504 it was seen that a C# module in debug module can
182             // have 10k+ elements.
183             table_elements: 20_000,
184             max_memories_per_module: 1,
185             #[cfg(target_pointer_width = "64")]
186             max_memory_size: 1 << 32, // 4G,
187             #[cfg(target_pointer_width = "32")]
188             max_memory_size: 10 << 20, // 10 MiB
189             #[cfg(feature = "gc")]
190             total_gc_heaps: total,
191         }
192     }
193 }
194 
195 /// Configuration options for the pooling instance allocator supplied at
196 /// construction.
197 #[derive(Copy, Clone, Debug)]
198 pub struct PoolingInstanceAllocatorConfig {
199     /// See `PoolingAllocatorConfig::max_unused_warm_slots` in `wasmtime`
200     pub max_unused_warm_slots: u32,
201     /// The target number of decommits to do per batch. This is not precise, as
202     /// we can queue up decommits at times when we aren't prepared to
203     /// immediately flush them, and so we may go over this target size
204     /// occasionally.
205     pub decommit_batch_size: usize,
206     /// The size, in bytes, of async stacks to allocate (not including the guard
207     /// page).
208     pub stack_size: usize,
209     /// The limits to apply to instances allocated within this allocator.
210     pub limits: InstanceLimits,
211     /// Whether or not async stacks are zeroed after use.
212     pub async_stack_zeroing: bool,
213     /// If async stack zeroing is enabled and the host platform is Linux this is
214     /// how much memory to zero out with `memset`.
215     ///
216     /// The rest of memory will be zeroed out with `madvise`.
217     #[cfg(feature = "async")]
218     pub async_stack_keep_resident: usize,
219     /// How much linear memory, in bytes, to keep resident after resetting for
220     /// use with the next instance. This much memory will be `memset` to zero
221     /// when a linear memory is deallocated.
222     ///
223     /// Memory exceeding this amount in the wasm linear memory will be released
224     /// with `madvise` back to the kernel.
225     ///
226     /// Only applicable on Linux.
227     pub linear_memory_keep_resident: usize,
228     /// Same as `linear_memory_keep_resident` but for tables.
229     pub table_keep_resident: usize,
230     /// Whether to enable memory protection keys.
231     pub memory_protection_keys: Enabled,
232     /// How many memory protection keys to allocate.
233     pub max_memory_protection_keys: usize,
234     /// Whether to enable PAGEMAP_SCAN on Linux.
235     pub pagemap_scan: Enabled,
236 }
237 
238 impl Default for PoolingInstanceAllocatorConfig {
239     fn default() -> PoolingInstanceAllocatorConfig {
240         PoolingInstanceAllocatorConfig {
241             max_unused_warm_slots: 100,
242             decommit_batch_size: 1,
243             stack_size: 2 << 20,
244             limits: InstanceLimits::default(),
245             async_stack_zeroing: false,
246             #[cfg(feature = "async")]
247             async_stack_keep_resident: 0,
248             linear_memory_keep_resident: 0,
249             table_keep_resident: 0,
250             memory_protection_keys: Enabled::No,
251             max_memory_protection_keys: 16,
252             pagemap_scan: Enabled::No,
253         }
254     }
255 }
256 
257 impl PoolingInstanceAllocatorConfig {
258     pub fn is_pagemap_scan_available() -> bool {
259         PageMap::new().is_some()
260     }
261 }
262 
263 /// An error returned when the pooling allocator cannot allocate a table,
264 /// memory, etc... because the maximum number of concurrent allocations for that
265 /// entity has been reached.
266 #[derive(Debug)]
267 pub struct PoolConcurrencyLimitError {
268     limit: usize,
269     kind: Cow<'static, str>,
270 }
271 
272 impl core::error::Error for PoolConcurrencyLimitError {}
273 
274 impl Display for PoolConcurrencyLimitError {
275     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276         let limit = self.limit;
277         let kind = &self.kind;
278         write!(f, "maximum concurrent limit of {limit} for {kind} reached")
279     }
280 }
281 
282 impl PoolConcurrencyLimitError {
283     fn new(limit: usize, kind: impl Into<Cow<'static, str>>) -> Self {
284         Self {
285             limit,
286             kind: kind.into(),
287         }
288     }
289 }
290 
291 /// Implements the pooling instance allocator.
292 ///
293 /// This allocator internally maintains pools of instances, memories, tables,
294 /// and stacks.
295 ///
296 /// Note: the resource pools are manually dropped so that the fault handler
297 /// terminates correctly.
298 #[derive(Debug)]
299 pub struct PoolingInstanceAllocator {
300     decommit_batch_size: usize,
301     limits: InstanceLimits,
302 
303     // The number of live core module and component instances at any given
304     // time. Note that this can temporarily go over the configured limit. This
305     // doesn't mean we have actually overshot, but that we attempted to allocate
306     // a new instance and incremented the counter, we've seen (or are about to
307     // see) that the counter is beyond the configured threshold, and are going
308     // to decrement the counter and return an error but haven't done so yet. See
309     // the increment trait methods for more details.
310     live_core_instances: AtomicU64,
311     live_component_instances: AtomicU64,
312 
313     decommit_queue: Mutex<DecommitQueue>,
314 
315     memories: MemoryPool,
316     live_memories: AtomicUsize,
317 
318     tables: TablePool,
319     live_tables: AtomicUsize,
320 
321     #[cfg(feature = "gc")]
322     gc_heaps: GcHeapPool,
323     #[cfg(feature = "gc")]
324     live_gc_heaps: AtomicUsize,
325 
326     #[cfg(feature = "async")]
327     stacks: StackPool,
328     #[cfg(feature = "async")]
329     live_stacks: AtomicUsize,
330 
331     pagemap: Option<PageMap>,
332 }
333 
334 impl Drop for PoolingInstanceAllocator {
335     fn drop(&mut self) {
336         if !cfg!(debug_assertions) {
337             return;
338         }
339 
340         // NB: when cfg(not(debug_assertions)) it is okay that we don't flush
341         // the queue, as the sub-pools will unmap those ranges anyways, so
342         // there's no point in decommitting them. But we do need to flush the
343         // queue when debug assertions are enabled to make sure that all
344         // entities get returned to their associated sub-pools and we can
345         // differentiate between a leaking slot and an enqueued-for-decommit
346         // slot.
347         let queue = self.decommit_queue.lock().unwrap();
348         self.flush_decommit_queue(queue);
349 
350         debug_assert_eq!(self.live_component_instances.load(Ordering::Acquire), 0);
351         debug_assert_eq!(self.live_core_instances.load(Ordering::Acquire), 0);
352         debug_assert_eq!(self.live_memories.load(Ordering::Acquire), 0);
353         debug_assert_eq!(self.live_tables.load(Ordering::Acquire), 0);
354 
355         debug_assert!(self.memories.is_empty());
356         debug_assert!(self.tables.is_empty());
357 
358         #[cfg(feature = "gc")]
359         {
360             debug_assert!(self.gc_heaps.is_empty());
361             debug_assert_eq!(self.live_gc_heaps.load(Ordering::Acquire), 0);
362         }
363 
364         #[cfg(feature = "async")]
365         {
366             debug_assert!(self.stacks.is_empty());
367             debug_assert_eq!(self.live_stacks.load(Ordering::Acquire), 0);
368         }
369     }
370 }
371 
372 impl PoolingInstanceAllocator {
373     /// Creates a new pooling instance allocator with the given strategy and limits.
374     pub fn new(config: &PoolingInstanceAllocatorConfig, tunables: &Tunables) -> Result<Self> {
375         Ok(Self {
376             decommit_batch_size: config.decommit_batch_size,
377             limits: config.limits,
378             live_component_instances: AtomicU64::new(0),
379             live_core_instances: AtomicU64::new(0),
380             decommit_queue: Mutex::new(DecommitQueue::default()),
381             memories: MemoryPool::new(config, tunables)?,
382             live_memories: AtomicUsize::new(0),
383             tables: TablePool::new(config)?,
384             live_tables: AtomicUsize::new(0),
385             #[cfg(feature = "gc")]
386             gc_heaps: GcHeapPool::new(config)?,
387             #[cfg(feature = "gc")]
388             live_gc_heaps: AtomicUsize::new(0),
389             #[cfg(feature = "async")]
390             stacks: StackPool::new(config)?,
391             #[cfg(feature = "async")]
392             live_stacks: AtomicUsize::new(0),
393             pagemap: match config.pagemap_scan {
394                 Enabled::Auto => PageMap::new(),
395                 Enabled::Yes => Some(PageMap::new().ok_or_else(|| {
396                     format_err!(
397                         "required to enable PAGEMAP_SCAN but this system \
398                          does not support it"
399                     )
400                 })?),
401                 Enabled::No => None,
402             },
403         })
404     }
405 
406     fn core_instance_size(&self) -> usize {
407         round_up_to_pow2(self.limits.core_instance_size, mem::align_of::<Instance>())
408     }
409 
410     fn validate_table_plans(&self, module: &Module) -> Result<()> {
411         self.tables.validate(module)
412     }
413 
414     fn validate_memory_plans(&self, module: &Module) -> Result<()> {
415         self.memories.validate_memories(module)
416     }
417 
418     fn validate_core_instance_size(&self, offsets: &VMOffsets<HostPtr>) -> Result<()> {
419         let layout = Instance::alloc_layout(offsets);
420         if layout.size() <= self.core_instance_size() {
421             return Ok(());
422         }
423 
424         // If this `module` exceeds the allocation size allotted to it then an
425         // error will be reported here. The error of "required N bytes but
426         // cannot allocate that" is pretty opaque, however, because it's not
427         // clear what the breakdown of the N bytes are and what to optimize
428         // next. To help provide a better error message here some fancy-ish
429         // logic is done here to report the breakdown of the byte request into
430         // the largest portions and where it's coming from.
431         let mut message = format!(
432             "instance allocation for this module \
433              requires {} bytes which exceeds the configured maximum \
434              of {} bytes; breakdown of allocation requirement:\n\n",
435             layout.size(),
436             self.core_instance_size(),
437         );
438 
439         let mut remaining = layout.size();
440         let mut push = |name: &str, bytes: usize| {
441             assert!(remaining >= bytes);
442             remaining -= bytes;
443 
444             // If the `name` region is more than 5% of the allocation request
445             // then report it here, otherwise ignore it. We have less than 20
446             // fields so we're guaranteed that something should be reported, and
447             // otherwise it's not particularly interesting to learn about 5
448             // different fields that are all 8 or 0 bytes. Only try to report
449             // the "major" sources of bytes here.
450             if bytes > layout.size() / 20 {
451                 message.push_str(&format!(
452                     " * {:.02}% - {} bytes - {}\n",
453                     ((bytes as f32) / (layout.size() as f32)) * 100.0,
454                     bytes,
455                     name,
456                 ));
457             }
458         };
459 
460         // The `Instance` itself requires some size allocated to it.
461         push("instance state management", mem::size_of::<Instance>());
462 
463         // Afterwards the `VMContext`'s regions are why we're requesting bytes,
464         // so ask it for descriptions on each region's byte size.
465         for (desc, size) in offsets.region_sizes() {
466             push(desc, size as usize);
467         }
468 
469         // double-check we accounted for all the bytes
470         assert_eq!(remaining, 0);
471 
472         bail!("{message}")
473     }
474 
475     #[cfg(feature = "component-model")]
476     fn validate_component_instance_size(
477         &self,
478         offsets: &VMComponentOffsets<HostPtr>,
479         core_instances_aggregate_size: usize,
480     ) -> Result<()> {
481         let vmcomponentctx_size = usize::try_from(offsets.size_of_vmctx()).unwrap();
482         let total_instance_size = core_instances_aggregate_size.saturating_add(vmcomponentctx_size);
483         if total_instance_size <= self.limits.component_instance_size {
484             return Ok(());
485         }
486 
487         // TODO: Add context with detailed accounting of what makes up all the
488         // `VMComponentContext`'s space like we do for module instances.
489         bail!(
490             "instance allocation for this component requires {total_instance_size} bytes of `VMComponentContext` \
491              and aggregated core instance runtime space which exceeds the configured maximum of {} bytes. \
492              `VMComponentContext` used {vmcomponentctx_size} bytes, `core module instances` used \
493              {core_instances_aggregate_size} bytes.",
494             self.limits.component_instance_size
495         )
496     }
497 
498     fn flush_decommit_queue(&self, mut locked_queue: MutexGuard<'_, DecommitQueue>) -> bool {
499         // Take the queue out of the mutex and drop the lock, to minimize
500         // contention.
501         let queue = mem::take(&mut *locked_queue);
502         drop(locked_queue);
503         queue.flush(self)
504     }
505 
506     /// Execute `f` and if it returns `Err(PoolConcurrencyLimitError)`, then try
507     /// flushing the decommit queue. If flushing the queue freed up slots, then
508     /// try running `f` again.
509     #[cfg(feature = "async")]
510     fn with_flush_and_retry<T>(&self, mut f: impl FnMut() -> Result<T>) -> Result<T> {
511         f().or_else(|e| {
512             if e.is::<PoolConcurrencyLimitError>() {
513                 let queue = self.decommit_queue.lock().unwrap();
514                 if self.flush_decommit_queue(queue) {
515                     return f();
516                 }
517             }
518 
519             Err(e)
520         })
521     }
522 
523     fn merge_or_flush(&self, mut local_queue: DecommitQueue) {
524         match local_queue.raw_len() {
525             // If we didn't enqueue any regions for decommit, then we must have
526             // either memset the whole entity or eagerly remapped it to zero
527             // because we don't have linux's `madvise(DONTNEED)` semantics. In
528             // either case, the entity slot is ready for reuse immediately.
529             0 => {
530                 local_queue.flush(self);
531             }
532 
533             // We enqueued at least our batch size of regions for decommit, so
534             // flush the local queue immediately. Don't bother inspecting (or
535             // locking!) the shared queue.
536             n if n >= self.decommit_batch_size => {
537                 local_queue.flush(self);
538             }
539 
540             // If we enqueued some regions for decommit, but did not reach our
541             // batch size, so we don't want to flush it yet, then merge the
542             // local queue into the shared queue.
543             n => {
544                 debug_assert!(n < self.decommit_batch_size);
545                 let mut shared_queue = self.decommit_queue.lock().unwrap();
546                 shared_queue.append(&mut local_queue);
547                 // And if the shared queue now has at least as many regions
548                 // enqueued for decommit as our batch size, then we can flush
549                 // it.
550                 if shared_queue.raw_len() >= self.decommit_batch_size {
551                     self.flush_decommit_queue(shared_queue);
552                 }
553             }
554         }
555     }
556 }
557 
558 unsafe impl InstanceAllocator for PoolingInstanceAllocator {
559     #[cfg(feature = "component-model")]
560     fn validate_component<'a>(
561         &self,
562         component: &Component,
563         offsets: &VMComponentOffsets<HostPtr>,
564         get_module: &'a dyn Fn(StaticModuleIndex) -> &'a Module,
565     ) -> Result<()> {
566         let mut num_core_instances = 0;
567         let mut num_memories = 0;
568         let mut num_tables = 0;
569         let mut core_instances_aggregate_size: usize = 0;
570         for init in &component.initializers {
571             use wasmtime_environ::component::GlobalInitializer::*;
572             use wasmtime_environ::component::InstantiateModule;
573             match init {
574                 InstantiateModule(InstantiateModule::Import(_, _), _) => {
575                     num_core_instances += 1;
576                     // Can't statically account for the total vmctx size, number
577                     // of memories, and number of tables in this component.
578                 }
579                 InstantiateModule(InstantiateModule::Static(static_module_index, _), _) => {
580                     let module = get_module(*static_module_index);
581                     let offsets = VMOffsets::new(HostPtr, &module);
582                     let layout = Instance::alloc_layout(&offsets);
583                     self.validate_module(module, &offsets)?;
584                     num_core_instances += 1;
585                     num_memories += module.num_defined_memories();
586                     num_tables += module.num_defined_tables();
587                     core_instances_aggregate_size += layout.size();
588                 }
589                 LowerImport { .. }
590                 | ExtractMemory(_)
591                 | ExtractTable(_)
592                 | ExtractRealloc(_)
593                 | ExtractCallback(_)
594                 | ExtractPostReturn(_)
595                 | Resource(_) => {}
596             }
597         }
598 
599         if num_core_instances
600             > usize::try_from(self.limits.max_core_instances_per_component).unwrap()
601         {
602             bail!(
603                 "The component transitively contains {num_core_instances} core module instances, \
604                  which exceeds the configured maximum of {} in the pooling allocator",
605                 self.limits.max_core_instances_per_component
606             );
607         }
608 
609         if num_memories > usize::try_from(self.limits.max_memories_per_component).unwrap() {
610             bail!(
611                 "The component transitively contains {num_memories} Wasm linear memories, which \
612                  exceeds the configured maximum of {} in the pooling allocator",
613                 self.limits.max_memories_per_component
614             );
615         }
616 
617         if num_tables > usize::try_from(self.limits.max_tables_per_component).unwrap() {
618             bail!(
619                 "The component transitively contains {num_tables} tables, which exceeds the \
620                  configured maximum of {} in the pooling allocator",
621                 self.limits.max_tables_per_component
622             );
623         }
624 
625         self.validate_component_instance_size(offsets, core_instances_aggregate_size)
626             .context("component instance size does not fit in pooling allocator requirements")?;
627 
628         Ok(())
629     }
630 
631     fn validate_module(&self, module: &Module, offsets: &VMOffsets<HostPtr>) -> Result<()> {
632         self.validate_memory_plans(module)
633             .context("module memory does not fit in pooling allocator requirements")?;
634         self.validate_table_plans(module)
635             .context("module table does not fit in pooling allocator requirements")?;
636         self.validate_core_instance_size(offsets)
637             .context("module instance size does not fit in pooling allocator requirements")?;
638         Ok(())
639     }
640 
641     #[cfg(feature = "gc")]
642     fn validate_memory(&self, memory: &wasmtime_environ::Memory) -> Result<()> {
643         self.memories.validate_memory(memory)
644     }
645 
646     #[cfg(feature = "component-model")]
647     fn increment_component_instance_count(&self) -> Result<()> {
648         let old_count = self.live_component_instances.fetch_add(1, Ordering::AcqRel);
649         if old_count >= u64::from(self.limits.total_component_instances) {
650             self.decrement_component_instance_count();
651             return Err(PoolConcurrencyLimitError::new(
652                 usize::try_from(self.limits.total_component_instances).unwrap(),
653                 "component instances",
654             )
655             .into());
656         }
657         Ok(())
658     }
659 
660     #[cfg(feature = "component-model")]
661     fn decrement_component_instance_count(&self) {
662         self.live_component_instances.fetch_sub(1, Ordering::AcqRel);
663     }
664 
665     fn increment_core_instance_count(&self) -> Result<()> {
666         let old_count = self.live_core_instances.fetch_add(1, Ordering::AcqRel);
667         if old_count >= u64::from(self.limits.total_core_instances) {
668             self.decrement_core_instance_count();
669             return Err(PoolConcurrencyLimitError::new(
670                 usize::try_from(self.limits.total_core_instances).unwrap(),
671                 "core instances",
672             )
673             .into());
674         }
675         Ok(())
676     }
677 
678     fn decrement_core_instance_count(&self) {
679         self.live_core_instances.fetch_sub(1, Ordering::AcqRel);
680     }
681 
682     fn allocate_memory<'a, 'b: 'a, 'c: 'a>(
683         &'a self,
684         request: &'a mut InstanceAllocationRequest<'b, 'c>,
685         ty: &'a wasmtime_environ::Memory,
686         memory_index: Option<DefinedMemoryIndex>,
687     ) -> Result<
688         Pin<Box<dyn Future<Output = Result<(MemoryAllocationIndex, Memory)>> + Send + 'a>>,
689         OutOfMemory,
690     > {
691         Ok(Box::into_pin(try_new::<Box<_>>(async move {
692             async {
693                 // FIXME(rust-lang/rust#145127) this should ideally use a version of
694                 // `with_flush_and_retry` but adapted for async closures instead of only
695                 // sync closures. Right now that won't compile though so this is the
696                 // manually expanded version of the method.
697                 let e = match self.memories.allocate(request, ty, memory_index).await {
698                     Ok(result) => return Ok(result),
699                     Err(e) => e,
700                 };
701 
702                 if e.is::<PoolConcurrencyLimitError>() {
703                     let queue = self.decommit_queue.lock().unwrap();
704                     if self.flush_decommit_queue(queue) {
705                         return self.memories.allocate(request, ty, memory_index).await;
706                     }
707                 }
708 
709                 Err(e)
710             }
711             .await
712             .inspect(|_| {
713                 self.live_memories.fetch_add(1, Ordering::Relaxed);
714             })
715         })?))
716     }
717 
718     unsafe fn deallocate_memory(
719         &self,
720         _memory_index: Option<DefinedMemoryIndex>,
721         allocation_index: MemoryAllocationIndex,
722         memory: Memory,
723     ) {
724         let prev = self.live_memories.fetch_sub(1, Ordering::Relaxed);
725         debug_assert!(prev > 0);
726 
727         // Reset the image slot. If there is any error clearing the
728         // image, just drop it here, and let the drop handler for the
729         // slot unmap in a way that retains the address space
730         // reservation.
731         let mut image = memory.unwrap_static_image();
732         let mut queue = DecommitQueue::default();
733         let bytes_resident = image
734             .clear_and_remain_ready(
735                 self.pagemap.as_ref(),
736                 self.memories.keep_resident,
737                 |ptr, len| {
738                     // SAFETY: the memory in `image` won't be used until this
739                     // decommit queue is flushed, and by definition the memory is
740                     // not in use when calling this function.
741                     unsafe {
742                         queue.push_raw(ptr, len);
743                     }
744                 },
745             )
746             .expect("failed to reset memory image");
747 
748         // SAFETY: this image is not in use and its memory regions were enqueued
749         // with `push_raw` above.
750         unsafe {
751             queue.push_memory(allocation_index, image, bytes_resident);
752         }
753         self.merge_or_flush(queue);
754     }
755 
756     fn allocate_table<'a, 'b: 'a, 'c: 'a>(
757         &'a self,
758         request: &'a mut InstanceAllocationRequest<'b, 'c>,
759         ty: &'a wasmtime_environ::Table,
760         _table_index: DefinedTableIndex,
761     ) -> Result<
762         Pin<Box<dyn Future<Output = Result<(super::TableAllocationIndex, Table)>> + Send + 'a>>,
763         OutOfMemory,
764     > {
765         Ok(Box::into_pin(try_new::<Box<_>>(async move {
766             async {
767                 // FIXME: see `allocate_memory` above for comments about duplication
768                 // with `with_flush_and_retry`.
769                 let e = match self.tables.allocate(request, ty).await {
770                     Ok(result) => return Ok(result),
771                     Err(e) => e,
772                 };
773 
774                 if e.is::<PoolConcurrencyLimitError>() {
775                     let queue = self.decommit_queue.lock().unwrap();
776                     if self.flush_decommit_queue(queue) {
777                         return self.tables.allocate(request, ty).await;
778                     }
779                 }
780 
781                 Err(e)
782             }
783             .await
784             .inspect(|_| {
785                 self.live_tables.fetch_add(1, Ordering::Relaxed);
786             })
787         })?))
788     }
789 
790     unsafe fn deallocate_table(
791         &self,
792         _table_index: DefinedTableIndex,
793         allocation_index: TableAllocationIndex,
794         mut table: Table,
795     ) {
796         let prev = self.live_tables.fetch_sub(1, Ordering::Relaxed);
797         debug_assert!(prev > 0);
798 
799         let mut queue = DecommitQueue::default();
800         // SAFETY: This table is no longer in use by the allocator when this
801         // method is called and additionally all image ranges are pushed with
802         // the understanding that the memory won't get used until the whole
803         // queue is flushed.
804         let bytes_resident = unsafe {
805             self.tables.reset_table_pages_to_zero(
806                 self.pagemap.as_ref(),
807                 allocation_index,
808                 &mut table,
809                 |ptr, len| {
810                     queue.push_raw(ptr, len);
811                 },
812             )
813         };
814 
815         // SAFETY: the table has had all its memory regions enqueued above.
816         unsafe {
817             queue.push_table(allocation_index, table, bytes_resident);
818         }
819         self.merge_or_flush(queue);
820     }
821 
822     #[cfg(feature = "async")]
823     fn allocate_fiber_stack(&self) -> Result<wasmtime_fiber::FiberStack> {
824         let ret = self.with_flush_and_retry(|| self.stacks.allocate())?;
825         self.live_stacks.fetch_add(1, Ordering::Relaxed);
826         Ok(ret)
827     }
828 
829     #[cfg(feature = "async")]
830     unsafe fn deallocate_fiber_stack(&self, mut stack: wasmtime_fiber::FiberStack) {
831         self.live_stacks.fetch_sub(1, Ordering::Relaxed);
832         let mut queue = DecommitQueue::default();
833         // SAFETY: the stack is no longer in use by definition when this
834         // function is called and memory ranges pushed here are otherwise no
835         // longer in use.
836         let bytes_resident = unsafe {
837             self.stacks
838                 .zero_stack(&mut stack, |ptr, len| queue.push_raw(ptr, len))
839         };
840         // SAFETY: this stack's memory regions were enqueued above.
841         unsafe {
842             queue.push_stack(stack, bytes_resident);
843         }
844         self.merge_or_flush(queue);
845     }
846 
847     fn purge_module(&self, module: CompiledModuleId) {
848         self.memories.purge_module(module);
849     }
850 
851     fn next_available_pkey(&self) -> Option<ProtectionKey> {
852         self.memories.next_available_pkey()
853     }
854 
855     fn restrict_to_pkey(&self, pkey: ProtectionKey) {
856         mpk::allow(ProtectionMask::zero().or(pkey));
857     }
858 
859     fn allow_all_pkeys(&self) {
860         mpk::allow(ProtectionMask::all());
861     }
862 
863     #[cfg(feature = "gc")]
864     fn allocate_gc_heap(
865         &self,
866         engine: &crate::Engine,
867         gc_runtime: &dyn GcRuntime,
868         memory_alloc_index: MemoryAllocationIndex,
869         memory: Memory,
870     ) -> Result<(GcHeapAllocationIndex, Box<dyn GcHeap>)> {
871         let ret = self
872             .gc_heaps
873             .allocate(engine, gc_runtime, memory_alloc_index, memory)?;
874         self.live_gc_heaps.fetch_add(1, Ordering::Relaxed);
875         Ok(ret)
876     }
877 
878     #[cfg(feature = "gc")]
879     fn deallocate_gc_heap(
880         &self,
881         allocation_index: GcHeapAllocationIndex,
882         gc_heap: Box<dyn GcHeap>,
883     ) -> (MemoryAllocationIndex, Memory) {
884         self.live_gc_heaps.fetch_sub(1, Ordering::Relaxed);
885         self.gc_heaps.deallocate(allocation_index, gc_heap)
886     }
887 
888     fn as_pooling(&self) -> Option<&PoolingInstanceAllocator> {
889         Some(self)
890     }
891 }
892 
893 #[cfg(test)]
894 #[cfg(target_pointer_width = "64")]
895 mod test {
896     use super::*;
897 
898     #[test]
899     fn test_pooling_allocator_with_memory_pages_exceeded() {
900         let config = PoolingInstanceAllocatorConfig {
901             limits: InstanceLimits {
902                 total_memories: 1,
903                 max_memory_size: 0x100010000,
904                 ..Default::default()
905             },
906             ..PoolingInstanceAllocatorConfig::default()
907         };
908         assert_eq!(
909             PoolingInstanceAllocator::new(
910                 &config,
911                 &Tunables {
912                     memory_reservation: 0x10000,
913                     ..Tunables::default_host()
914                 },
915             )
916             .map_err(|e| e.to_string())
917             .expect_err("expected a failure constructing instance allocator"),
918             "maximum memory size of 0x100010000 bytes exceeds the configured \
919              memory reservation of 0x10000 bytes"
920         );
921     }
922 
923     #[cfg(all(
924         unix,
925         target_pointer_width = "64",
926         feature = "async",
927         not(miri),
928         not(asan)
929     ))]
930     #[test]
931     fn test_stack_zeroed() -> Result<()> {
932         let config = PoolingInstanceAllocatorConfig {
933             max_unused_warm_slots: 0,
934             limits: InstanceLimits {
935                 total_stacks: 1,
936                 total_memories: 0,
937                 total_tables: 0,
938                 ..Default::default()
939             },
940             stack_size: 128,
941             async_stack_zeroing: true,
942             ..PoolingInstanceAllocatorConfig::default()
943         };
944         let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;
945 
946         unsafe {
947             for _ in 0..255 {
948                 let stack = allocator.allocate_fiber_stack()?;
949 
950                 // The stack pointer is at the top, so decrement it first
951                 let addr = stack.top().unwrap().sub(1);
952 
953                 assert_eq!(*addr, 0);
954                 *addr = 1;
955 
956                 allocator.deallocate_fiber_stack(stack);
957             }
958         }
959 
960         Ok(())
961     }
962 
963     #[cfg(all(
964         unix,
965         target_pointer_width = "64",
966         feature = "async",
967         not(miri),
968         not(asan)
969     ))]
970     #[test]
971     fn test_stack_unzeroed() -> Result<()> {
972         let config = PoolingInstanceAllocatorConfig {
973             max_unused_warm_slots: 0,
974             limits: InstanceLimits {
975                 total_stacks: 1,
976                 total_memories: 0,
977                 total_tables: 0,
978                 ..Default::default()
979             },
980             stack_size: 128,
981             async_stack_zeroing: false,
982             ..PoolingInstanceAllocatorConfig::default()
983         };
984         let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;
985 
986         unsafe {
987             for i in 0..255 {
988                 let stack = allocator.allocate_fiber_stack()?;
989 
990                 // The stack pointer is at the top, so decrement it first
991                 let addr = stack.top().unwrap().sub(1);
992 
993                 assert_eq!(*addr, i);
994                 *addr = i + 1;
995 
996                 allocator.deallocate_fiber_stack(stack);
997             }
998         }
999 
1000         Ok(())
1001     }
1002 }
1003