1 //! Implements the pooling instance allocator.
2 //!
3 //! The pooling instance allocator maps memory in advance and allocates
4 //! instances, memories, tables, and stacks from a pool of available resources.
5 //! Using the pooling instance allocator can speed up module instantiation when
6 //! modules can be constrained based on configurable limits
7 //! ([`InstanceLimits`]). Each new instance is stored in a "slot"; as instances
8 //! are allocated and freed, these slots are either filled or emptied:
9 //!
10 //! ```text
11 //! ┌──────┬──────┬──────┬──────┬──────┐
12 //! │Slot 0│Slot 1│Slot 2│Slot 3│......│
13 //! └──────┴──────┴──────┴──────┴──────┘
14 //! ```
15 //!
16 //! Each slot has a "slot ID"--an index into the pool. Slot IDs are handed out
17 //! by the [`index_allocator`] module. Note that each kind of pool-allocated
18 //! item is stored in its own separate pool: [`memory_pool`], [`table_pool`],
19 //! [`stack_pool`]. See those modules for more details.
20 
21 mod decommit_queue;
22 mod index_allocator;
23 mod memory_pool;
24 mod metrics;
25 mod table_pool;
26 
27 #[cfg(feature = "gc")]
28 mod gc_heap_pool;
29 
30 #[cfg(all(feature = "async"))]
31 mod generic_stack_pool;
32 #[cfg(all(feature = "async", unix, not(miri)))]
33 mod unix_stack_pool;
34 
35 #[cfg(all(feature = "async"))]
36 cfg_if::cfg_if! {
37     if #[cfg(all(unix, not(miri), not(asan)))] {
38         use unix_stack_pool as stack_pool;
39     } else {
40         use generic_stack_pool as stack_pool;
41     }
42 }
43 
44 use self::decommit_queue::DecommitQueue;
45 use self::memory_pool::MemoryPool;
46 use self::table_pool::TablePool;
47 use super::{
48     InstanceAllocationRequest, InstanceAllocator, MemoryAllocationIndex, TableAllocationIndex,
49 };
50 use crate::Enabled;
51 use crate::prelude::*;
52 use crate::runtime::vm::{
53     CompiledModuleId, Memory, Table,
54     instance::Instance,
55     mpk::{self, ProtectionKey, ProtectionMask},
56     sys::vm::PageMap,
57 };
58 use core::sync::atomic::AtomicUsize;
59 use std::borrow::Cow;
60 use std::fmt::Display;
61 use std::sync::{Mutex, MutexGuard};
62 use std::{
63     mem,
64     sync::atomic::{AtomicU64, Ordering},
65 };
66 use wasmtime_environ::{
67     DefinedMemoryIndex, DefinedTableIndex, HostPtr, Module, Tunables, VMOffsets,
68 };
69 
70 pub use self::metrics::PoolingAllocatorMetrics;
71 
72 #[cfg(feature = "gc")]
73 use super::GcHeapAllocationIndex;
74 #[cfg(feature = "gc")]
75 use crate::runtime::vm::{GcHeap, GcRuntime};
76 #[cfg(feature = "gc")]
77 use gc_heap_pool::GcHeapPool;
78 
79 #[cfg(feature = "async")]
80 use stack_pool::StackPool;
81 
82 #[cfg(feature = "component-model")]
83 use wasmtime_environ::{
84     StaticModuleIndex,
85     component::{Component, VMComponentOffsets},
86 };
87 
88 fn round_up_to_pow2(n: usize, to: usize) -> usize {
89     debug_assert!(to > 0);
90     debug_assert!(to.is_power_of_two());
91     (n + to - 1) & !(to - 1)
92 }
93 
94 /// Instance-related limit configuration for pooling.
95 ///
96 /// More docs on this can be found at `wasmtime::PoolingAllocationConfig`.
97 #[derive(Debug, Copy, Clone)]
98 pub struct InstanceLimits {
99     /// The maximum number of component instances that may be allocated
100     /// concurrently.
101     pub total_component_instances: u32,
102 
103     /// The maximum size of a component's `VMComponentContext`, not including
104     /// any of its inner core modules' `VMContext` sizes.
105     pub component_instance_size: usize,
106 
107     /// The maximum number of core module instances that may be allocated
108     /// concurrently.
109     pub total_core_instances: u32,
110 
111     /// The maximum number of core module instances that a single component may
112     /// transitively contain.
113     pub max_core_instances_per_component: u32,
114 
115     /// The maximum number of Wasm linear memories that a component may
116     /// transitively contain.
117     pub max_memories_per_component: u32,
118 
119     /// The maximum number of tables that a component may transitively contain.
120     pub max_tables_per_component: u32,
121 
122     /// The total number of linear memories in the pool, across all instances.
123     pub total_memories: u32,
124 
125     /// The total number of tables in the pool, across all instances.
126     pub total_tables: u32,
127 
128     /// The total number of async stacks in the pool, across all instances.
129     #[cfg(feature = "async")]
130     pub total_stacks: u32,
131 
132     /// Maximum size of a core instance's `VMContext`.
133     pub core_instance_size: usize,
134 
135     /// Maximum number of tables per instance.
136     pub max_tables_per_module: u32,
137 
138     /// Maximum number of word-size elements per table.
139     ///
140     /// Note that tables for element types such as continuations
141     /// that use more than one word of storage may store fewer
142     /// elements.
143     pub table_elements: usize,
144 
145     /// Maximum number of linear memories per instance.
146     pub max_memories_per_module: u32,
147 
148     /// Maximum byte size of a linear memory, must be smaller than
149     /// `memory_reservation` in `Tunables`.
150     pub max_memory_size: usize,
151 
152     /// The total number of GC heaps in the pool, across all instances.
153     #[cfg(feature = "gc")]
154     pub total_gc_heaps: u32,
155 }
156 
157 impl Default for InstanceLimits {
158     fn default() -> Self {
159         let total = if cfg!(target_pointer_width = "32") {
160             100
161         } else {
162             1000
163         };
164         // See doc comments for `wasmtime::PoolingAllocationConfig` for these
165         // default values
166         Self {
167             total_component_instances: total,
168             component_instance_size: 1 << 20, // 1 MiB
169             total_core_instances: total,
170             max_core_instances_per_component: u32::MAX,
171             max_memories_per_component: u32::MAX,
172             max_tables_per_component: u32::MAX,
173             total_memories: total,
174             total_tables: total,
175             #[cfg(feature = "async")]
176             total_stacks: total,
177             core_instance_size: 1 << 20, // 1 MiB
178             max_tables_per_module: 1,
179             // NB: in #8504 it was seen that a C# module in debug module can
180             // have 10k+ elements.
181             table_elements: 20_000,
182             max_memories_per_module: 1,
183             #[cfg(target_pointer_width = "64")]
184             max_memory_size: 1 << 32, // 4G,
185             #[cfg(target_pointer_width = "32")]
186             max_memory_size: 10 << 20, // 10 MiB
187             #[cfg(feature = "gc")]
188             total_gc_heaps: total,
189         }
190     }
191 }
192 
193 /// Configuration options for the pooling instance allocator supplied at
194 /// construction.
195 #[derive(Copy, Clone, Debug)]
196 pub struct PoolingInstanceAllocatorConfig {
197     /// See `PoolingAllocatorConfig::max_unused_warm_slots` in `wasmtime`
198     pub max_unused_warm_slots: u32,
199     /// The target number of decommits to do per batch. This is not precise, as
200     /// we can queue up decommits at times when we aren't prepared to
201     /// immediately flush them, and so we may go over this target size
202     /// occasionally.
203     pub decommit_batch_size: usize,
204     /// The size, in bytes, of async stacks to allocate (not including the guard
205     /// page).
206     pub stack_size: usize,
207     /// The limits to apply to instances allocated within this allocator.
208     pub limits: InstanceLimits,
209     /// Whether or not async stacks are zeroed after use.
210     pub async_stack_zeroing: bool,
211     /// If async stack zeroing is enabled and the host platform is Linux this is
212     /// how much memory to zero out with `memset`.
213     ///
214     /// The rest of memory will be zeroed out with `madvise`.
215     #[cfg(feature = "async")]
216     pub async_stack_keep_resident: usize,
217     /// How much linear memory, in bytes, to keep resident after resetting for
218     /// use with the next instance. This much memory will be `memset` to zero
219     /// when a linear memory is deallocated.
220     ///
221     /// Memory exceeding this amount in the wasm linear memory will be released
222     /// with `madvise` back to the kernel.
223     ///
224     /// Only applicable on Linux.
225     pub linear_memory_keep_resident: usize,
226     /// Same as `linear_memory_keep_resident` but for tables.
227     pub table_keep_resident: usize,
228     /// Whether to enable memory protection keys.
229     pub memory_protection_keys: Enabled,
230     /// How many memory protection keys to allocate.
231     pub max_memory_protection_keys: usize,
232     /// Whether to enable PAGEMAP_SCAN on Linux.
233     pub pagemap_scan: Enabled,
234 }
235 
236 impl Default for PoolingInstanceAllocatorConfig {
237     fn default() -> PoolingInstanceAllocatorConfig {
238         PoolingInstanceAllocatorConfig {
239             max_unused_warm_slots: 100,
240             decommit_batch_size: 1,
241             stack_size: 2 << 20,
242             limits: InstanceLimits::default(),
243             async_stack_zeroing: false,
244             #[cfg(feature = "async")]
245             async_stack_keep_resident: 0,
246             linear_memory_keep_resident: 0,
247             table_keep_resident: 0,
248             memory_protection_keys: Enabled::No,
249             max_memory_protection_keys: 16,
250             pagemap_scan: Enabled::No,
251         }
252     }
253 }
254 
255 impl PoolingInstanceAllocatorConfig {
256     pub fn is_pagemap_scan_available() -> bool {
257         PageMap::new().is_some()
258     }
259 }
260 
261 /// An error returned when the pooling allocator cannot allocate a table,
262 /// memory, etc... because the maximum number of concurrent allocations for that
263 /// entity has been reached.
264 #[derive(Debug)]
265 pub struct PoolConcurrencyLimitError {
266     limit: usize,
267     kind: Cow<'static, str>,
268 }
269 
270 impl core::error::Error for PoolConcurrencyLimitError {}
271 
272 impl Display for PoolConcurrencyLimitError {
273     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274         let limit = self.limit;
275         let kind = &self.kind;
276         write!(f, "maximum concurrent limit of {limit} for {kind} reached")
277     }
278 }
279 
280 impl PoolConcurrencyLimitError {
281     fn new(limit: usize, kind: impl Into<Cow<'static, str>>) -> Self {
282         Self {
283             limit,
284             kind: kind.into(),
285         }
286     }
287 }
288 
289 /// Implements the pooling instance allocator.
290 ///
291 /// This allocator internally maintains pools of instances, memories, tables,
292 /// and stacks.
293 ///
294 /// Note: the resource pools are manually dropped so that the fault handler
295 /// terminates correctly.
296 #[derive(Debug)]
297 pub struct PoolingInstanceAllocator {
298     decommit_batch_size: usize,
299     limits: InstanceLimits,
300 
301     // The number of live core module and component instances at any given
302     // time. Note that this can temporarily go over the configured limit. This
303     // doesn't mean we have actually overshot, but that we attempted to allocate
304     // a new instance and incremented the counter, we've seen (or are about to
305     // see) that the counter is beyond the configured threshold, and are going
306     // to decrement the counter and return an error but haven't done so yet. See
307     // the increment trait methods for more details.
308     live_core_instances: AtomicU64,
309     live_component_instances: AtomicU64,
310 
311     decommit_queue: Mutex<DecommitQueue>,
312 
313     memories: MemoryPool,
314     live_memories: AtomicUsize,
315 
316     tables: TablePool,
317     live_tables: AtomicUsize,
318 
319     #[cfg(feature = "gc")]
320     gc_heaps: GcHeapPool,
321     #[cfg(feature = "gc")]
322     live_gc_heaps: AtomicUsize,
323 
324     #[cfg(feature = "async")]
325     stacks: StackPool,
326     #[cfg(feature = "async")]
327     live_stacks: AtomicUsize,
328 
329     pagemap: Option<PageMap>,
330 }
331 
332 impl Drop for PoolingInstanceAllocator {
333     fn drop(&mut self) {
334         if !cfg!(debug_assertions) {
335             return;
336         }
337 
338         // NB: when cfg(not(debug_assertions)) it is okay that we don't flush
339         // the queue, as the sub-pools will unmap those ranges anyways, so
340         // there's no point in decommitting them. But we do need to flush the
341         // queue when debug assertions are enabled to make sure that all
342         // entities get returned to their associated sub-pools and we can
343         // differentiate between a leaking slot and an enqueued-for-decommit
344         // slot.
345         let queue = self.decommit_queue.lock().unwrap();
346         self.flush_decommit_queue(queue);
347 
348         debug_assert_eq!(self.live_component_instances.load(Ordering::Acquire), 0);
349         debug_assert_eq!(self.live_core_instances.load(Ordering::Acquire), 0);
350         debug_assert_eq!(self.live_memories.load(Ordering::Acquire), 0);
351         debug_assert_eq!(self.live_tables.load(Ordering::Acquire), 0);
352 
353         debug_assert!(self.memories.is_empty());
354         debug_assert!(self.tables.is_empty());
355 
356         #[cfg(feature = "gc")]
357         {
358             debug_assert!(self.gc_heaps.is_empty());
359             debug_assert_eq!(self.live_gc_heaps.load(Ordering::Acquire), 0);
360         }
361 
362         #[cfg(feature = "async")]
363         {
364             debug_assert!(self.stacks.is_empty());
365             debug_assert_eq!(self.live_stacks.load(Ordering::Acquire), 0);
366         }
367     }
368 }
369 
370 impl PoolingInstanceAllocator {
371     /// Creates a new pooling instance allocator with the given strategy and limits.
372     pub fn new(config: &PoolingInstanceAllocatorConfig, tunables: &Tunables) -> Result<Self> {
373         Ok(Self {
374             decommit_batch_size: config.decommit_batch_size,
375             limits: config.limits,
376             live_component_instances: AtomicU64::new(0),
377             live_core_instances: AtomicU64::new(0),
378             decommit_queue: Mutex::new(DecommitQueue::default()),
379             memories: MemoryPool::new(config, tunables)?,
380             live_memories: AtomicUsize::new(0),
381             tables: TablePool::new(config)?,
382             live_tables: AtomicUsize::new(0),
383             #[cfg(feature = "gc")]
384             gc_heaps: GcHeapPool::new(config)?,
385             #[cfg(feature = "gc")]
386             live_gc_heaps: AtomicUsize::new(0),
387             #[cfg(feature = "async")]
388             stacks: StackPool::new(config)?,
389             #[cfg(feature = "async")]
390             live_stacks: AtomicUsize::new(0),
391             pagemap: match config.pagemap_scan {
392                 Enabled::Auto => PageMap::new(),
393                 Enabled::Yes => Some(PageMap::new().ok_or_else(|| {
394                     format_err!(
395                         "required to enable PAGEMAP_SCAN but this system \
396                          does not support it"
397                     )
398                 })?),
399                 Enabled::No => None,
400             },
401         })
402     }
403 
404     fn core_instance_size(&self) -> usize {
405         round_up_to_pow2(self.limits.core_instance_size, mem::align_of::<Instance>())
406     }
407 
408     fn validate_table_plans(&self, module: &Module) -> Result<()> {
409         self.tables.validate(module)
410     }
411 
412     fn validate_memory_plans(&self, module: &Module) -> Result<()> {
413         self.memories.validate_memories(module)
414     }
415 
416     fn validate_core_instance_size(&self, offsets: &VMOffsets<HostPtr>) -> Result<()> {
417         let layout = Instance::alloc_layout(offsets);
418         if layout.size() <= self.core_instance_size() {
419             return Ok(());
420         }
421 
422         // If this `module` exceeds the allocation size allotted to it then an
423         // error will be reported here. The error of "required N bytes but
424         // cannot allocate that" is pretty opaque, however, because it's not
425         // clear what the breakdown of the N bytes are and what to optimize
426         // next. To help provide a better error message here some fancy-ish
427         // logic is done here to report the breakdown of the byte request into
428         // the largest portions and where it's coming from.
429         let mut message = format!(
430             "instance allocation for this module \
431              requires {} bytes which exceeds the configured maximum \
432              of {} bytes; breakdown of allocation requirement:\n\n",
433             layout.size(),
434             self.core_instance_size(),
435         );
436 
437         let mut remaining = layout.size();
438         let mut push = |name: &str, bytes: usize| {
439             assert!(remaining >= bytes);
440             remaining -= bytes;
441 
442             // If the `name` region is more than 5% of the allocation request
443             // then report it here, otherwise ignore it. We have less than 20
444             // fields so we're guaranteed that something should be reported, and
445             // otherwise it's not particularly interesting to learn about 5
446             // different fields that are all 8 or 0 bytes. Only try to report
447             // the "major" sources of bytes here.
448             if bytes > layout.size() / 20 {
449                 message.push_str(&format!(
450                     " * {:.02}% - {} bytes - {}\n",
451                     ((bytes as f32) / (layout.size() as f32)) * 100.0,
452                     bytes,
453                     name,
454                 ));
455             }
456         };
457 
458         // The `Instance` itself requires some size allocated to it.
459         push("instance state management", mem::size_of::<Instance>());
460 
461         // Afterwards the `VMContext`'s regions are why we're requesting bytes,
462         // so ask it for descriptions on each region's byte size.
463         for (desc, size) in offsets.region_sizes() {
464             push(desc, size as usize);
465         }
466 
467         // double-check we accounted for all the bytes
468         assert_eq!(remaining, 0);
469 
470         bail!("{message}")
471     }
472 
473     #[cfg(feature = "component-model")]
474     fn validate_component_instance_size(
475         &self,
476         offsets: &VMComponentOffsets<HostPtr>,
477     ) -> Result<()> {
478         if usize::try_from(offsets.size_of_vmctx()).unwrap() <= self.limits.component_instance_size
479         {
480             return Ok(());
481         }
482 
483         // TODO: Add context with detailed accounting of what makes up all the
484         // `VMComponentContext`'s space like we do for module instances.
485         bail!(
486             "instance allocation for this component requires {} bytes of `VMComponentContext` \
487              space which exceeds the configured maximum of {} bytes",
488             offsets.size_of_vmctx(),
489             self.limits.component_instance_size
490         )
491     }
492 
493     fn flush_decommit_queue(&self, mut locked_queue: MutexGuard<'_, DecommitQueue>) -> bool {
494         // Take the queue out of the mutex and drop the lock, to minimize
495         // contention.
496         let queue = mem::take(&mut *locked_queue);
497         drop(locked_queue);
498         queue.flush(self)
499     }
500 
501     /// Execute `f` and if it returns `Err(PoolConcurrencyLimitError)`, then try
502     /// flushing the decommit queue. If flushing the queue freed up slots, then
503     /// try running `f` again.
504     #[cfg(feature = "async")]
505     fn with_flush_and_retry<T>(&self, mut f: impl FnMut() -> Result<T>) -> Result<T> {
506         f().or_else(|e| {
507             if e.is::<PoolConcurrencyLimitError>() {
508                 let queue = self.decommit_queue.lock().unwrap();
509                 if self.flush_decommit_queue(queue) {
510                     return f();
511                 }
512             }
513 
514             Err(e)
515         })
516     }
517 
518     fn merge_or_flush(&self, mut local_queue: DecommitQueue) {
519         match local_queue.raw_len() {
520             // If we didn't enqueue any regions for decommit, then we must have
521             // either memset the whole entity or eagerly remapped it to zero
522             // because we don't have linux's `madvise(DONTNEED)` semantics. In
523             // either case, the entity slot is ready for reuse immediately.
524             0 => {
525                 local_queue.flush(self);
526             }
527 
528             // We enqueued at least our batch size of regions for decommit, so
529             // flush the local queue immediately. Don't bother inspecting (or
530             // locking!) the shared queue.
531             n if n >= self.decommit_batch_size => {
532                 local_queue.flush(self);
533             }
534 
535             // If we enqueued some regions for decommit, but did not reach our
536             // batch size, so we don't want to flush it yet, then merge the
537             // local queue into the shared queue.
538             n => {
539                 debug_assert!(n < self.decommit_batch_size);
540                 let mut shared_queue = self.decommit_queue.lock().unwrap();
541                 shared_queue.append(&mut local_queue);
542                 // And if the shared queue now has at least as many regions
543                 // enqueued for decommit as our batch size, then we can flush
544                 // it.
545                 if shared_queue.raw_len() >= self.decommit_batch_size {
546                     self.flush_decommit_queue(shared_queue);
547                 }
548             }
549         }
550     }
551 }
552 
553 #[async_trait::async_trait]
554 unsafe impl InstanceAllocator for PoolingInstanceAllocator {
555     #[cfg(feature = "component-model")]
556     fn validate_component<'a>(
557         &self,
558         component: &Component,
559         offsets: &VMComponentOffsets<HostPtr>,
560         get_module: &'a dyn Fn(StaticModuleIndex) -> &'a Module,
561     ) -> Result<()> {
562         self.validate_component_instance_size(offsets)
563             .context("component instance size does not fit in pooling allocator requirements")?;
564 
565         let mut num_core_instances = 0;
566         let mut num_memories = 0;
567         let mut num_tables = 0;
568         for init in &component.initializers {
569             use wasmtime_environ::component::GlobalInitializer::*;
570             use wasmtime_environ::component::InstantiateModule;
571             match init {
572                 InstantiateModule(InstantiateModule::Import(_, _), _) => {
573                     num_core_instances += 1;
574                     // Can't statically account for the total vmctx size, number
575                     // of memories, and number of tables in this component.
576                 }
577                 InstantiateModule(InstantiateModule::Static(static_module_index, _), _) => {
578                     let module = get_module(*static_module_index);
579                     let offsets = VMOffsets::new(HostPtr, &module);
580                     self.validate_module(module, &offsets)?;
581                     num_core_instances += 1;
582                     num_memories += module.num_defined_memories();
583                     num_tables += module.num_defined_tables();
584                 }
585                 LowerImport { .. }
586                 | ExtractMemory(_)
587                 | ExtractTable(_)
588                 | ExtractRealloc(_)
589                 | ExtractCallback(_)
590                 | ExtractPostReturn(_)
591                 | Resource(_) => {}
592             }
593         }
594 
595         if num_core_instances
596             > usize::try_from(self.limits.max_core_instances_per_component).unwrap()
597         {
598             bail!(
599                 "The component transitively contains {num_core_instances} core module instances, \
600                  which exceeds the configured maximum of {} in the pooling allocator",
601                 self.limits.max_core_instances_per_component
602             );
603         }
604 
605         if num_memories > usize::try_from(self.limits.max_memories_per_component).unwrap() {
606             bail!(
607                 "The component transitively contains {num_memories} Wasm linear memories, which \
608                  exceeds the configured maximum of {} in the pooling allocator",
609                 self.limits.max_memories_per_component
610             );
611         }
612 
613         if num_tables > usize::try_from(self.limits.max_tables_per_component).unwrap() {
614             bail!(
615                 "The component transitively contains {num_tables} tables, which exceeds the \
616                  configured maximum of {} in the pooling allocator",
617                 self.limits.max_tables_per_component
618             );
619         }
620 
621         Ok(())
622     }
623 
624     fn validate_module(&self, module: &Module, offsets: &VMOffsets<HostPtr>) -> Result<()> {
625         self.validate_memory_plans(module)
626             .context("module memory does not fit in pooling allocator requirements")?;
627         self.validate_table_plans(module)
628             .context("module table does not fit in pooling allocator requirements")?;
629         self.validate_core_instance_size(offsets)
630             .context("module instance size does not fit in pooling allocator requirements")?;
631         Ok(())
632     }
633 
634     #[cfg(feature = "gc")]
635     fn validate_memory(&self, memory: &wasmtime_environ::Memory) -> Result<()> {
636         self.memories.validate_memory(memory)
637     }
638 
639     #[cfg(feature = "component-model")]
640     fn increment_component_instance_count(&self) -> Result<()> {
641         let old_count = self.live_component_instances.fetch_add(1, Ordering::AcqRel);
642         if old_count >= u64::from(self.limits.total_component_instances) {
643             self.decrement_component_instance_count();
644             return Err(PoolConcurrencyLimitError::new(
645                 usize::try_from(self.limits.total_component_instances).unwrap(),
646                 "component instances",
647             )
648             .into());
649         }
650         Ok(())
651     }
652 
653     #[cfg(feature = "component-model")]
654     fn decrement_component_instance_count(&self) {
655         self.live_component_instances.fetch_sub(1, Ordering::AcqRel);
656     }
657 
658     fn increment_core_instance_count(&self) -> Result<()> {
659         let old_count = self.live_core_instances.fetch_add(1, Ordering::AcqRel);
660         if old_count >= u64::from(self.limits.total_core_instances) {
661             self.decrement_core_instance_count();
662             return Err(PoolConcurrencyLimitError::new(
663                 usize::try_from(self.limits.total_core_instances).unwrap(),
664                 "core instances",
665             )
666             .into());
667         }
668         Ok(())
669     }
670 
671     fn decrement_core_instance_count(&self) {
672         self.live_core_instances.fetch_sub(1, Ordering::AcqRel);
673     }
674 
675     async fn allocate_memory(
676         &self,
677         request: &mut InstanceAllocationRequest<'_, '_>,
678         ty: &wasmtime_environ::Memory,
679         memory_index: Option<DefinedMemoryIndex>,
680     ) -> Result<(MemoryAllocationIndex, Memory)> {
681         async {
682             // FIXME(rust-lang/rust#145127) this should ideally use a version of
683             // `with_flush_and_retry` but adapted for async closures instead of only
684             // sync closures. Right now that won't compile though so this is the
685             // manually expanded version of the method.
686             let e = match self.memories.allocate(request, ty, memory_index).await {
687                 Ok(result) => return Ok(result),
688                 Err(e) => e,
689             };
690 
691             if e.is::<PoolConcurrencyLimitError>() {
692                 let queue = self.decommit_queue.lock().unwrap();
693                 if self.flush_decommit_queue(queue) {
694                     return self.memories.allocate(request, ty, memory_index).await;
695                 }
696             }
697 
698             Err(e)
699         }
700         .await
701         .inspect(|_| {
702             self.live_memories.fetch_add(1, Ordering::Relaxed);
703         })
704     }
705 
706     unsafe fn deallocate_memory(
707         &self,
708         _memory_index: Option<DefinedMemoryIndex>,
709         allocation_index: MemoryAllocationIndex,
710         memory: Memory,
711     ) {
712         let prev = self.live_memories.fetch_sub(1, Ordering::Relaxed);
713         debug_assert!(prev > 0);
714 
715         // Reset the image slot. If there is any error clearing the
716         // image, just drop it here, and let the drop handler for the
717         // slot unmap in a way that retains the address space
718         // reservation.
719         let mut image = memory.unwrap_static_image();
720         let mut queue = DecommitQueue::default();
721         let bytes_resident = image
722             .clear_and_remain_ready(
723                 self.pagemap.as_ref(),
724                 self.memories.keep_resident,
725                 |ptr, len| {
726                     // SAFETY: the memory in `image` won't be used until this
727                     // decommit queue is flushed, and by definition the memory is
728                     // not in use when calling this function.
729                     unsafe {
730                         queue.push_raw(ptr, len);
731                     }
732                 },
733             )
734             .expect("failed to reset memory image");
735 
736         // SAFETY: this image is not in use and its memory regions were enqueued
737         // with `push_raw` above.
738         unsafe {
739             queue.push_memory(allocation_index, image, bytes_resident);
740         }
741         self.merge_or_flush(queue);
742     }
743 
744     async fn allocate_table(
745         &self,
746         request: &mut InstanceAllocationRequest<'_, '_>,
747         ty: &wasmtime_environ::Table,
748         _table_index: DefinedTableIndex,
749     ) -> Result<(super::TableAllocationIndex, Table)> {
750         async {
751             // FIXME: see `allocate_memory` above for comments about duplication
752             // with `with_flush_and_retry`.
753             let e = match self.tables.allocate(request, ty).await {
754                 Ok(result) => return Ok(result),
755                 Err(e) => e,
756             };
757 
758             if e.is::<PoolConcurrencyLimitError>() {
759                 let queue = self.decommit_queue.lock().unwrap();
760                 if self.flush_decommit_queue(queue) {
761                     return self.tables.allocate(request, ty).await;
762                 }
763             }
764 
765             Err(e)
766         }
767         .await
768         .inspect(|_| {
769             self.live_tables.fetch_add(1, Ordering::Relaxed);
770         })
771     }
772 
773     unsafe fn deallocate_table(
774         &self,
775         _table_index: DefinedTableIndex,
776         allocation_index: TableAllocationIndex,
777         mut table: Table,
778     ) {
779         let prev = self.live_tables.fetch_sub(1, Ordering::Relaxed);
780         debug_assert!(prev > 0);
781 
782         let mut queue = DecommitQueue::default();
783         // SAFETY: This table is no longer in use by the allocator when this
784         // method is called and additionally all image ranges are pushed with
785         // the understanding that the memory won't get used until the whole
786         // queue is flushed.
787         let bytes_resident = unsafe {
788             self.tables.reset_table_pages_to_zero(
789                 self.pagemap.as_ref(),
790                 allocation_index,
791                 &mut table,
792                 |ptr, len| {
793                     queue.push_raw(ptr, len);
794                 },
795             )
796         };
797 
798         // SAFETY: the table has had all its memory regions enqueued above.
799         unsafe {
800             queue.push_table(allocation_index, table, bytes_resident);
801         }
802         self.merge_or_flush(queue);
803     }
804 
805     #[cfg(feature = "async")]
806     fn allocate_fiber_stack(&self) -> Result<wasmtime_fiber::FiberStack> {
807         let ret = self.with_flush_and_retry(|| self.stacks.allocate())?;
808         self.live_stacks.fetch_add(1, Ordering::Relaxed);
809         Ok(ret)
810     }
811 
812     #[cfg(feature = "async")]
813     unsafe fn deallocate_fiber_stack(&self, mut stack: wasmtime_fiber::FiberStack) {
814         self.live_stacks.fetch_sub(1, Ordering::Relaxed);
815         let mut queue = DecommitQueue::default();
816         // SAFETY: the stack is no longer in use by definition when this
817         // function is called and memory ranges pushed here are otherwise no
818         // longer in use.
819         let bytes_resident = unsafe {
820             self.stacks
821                 .zero_stack(&mut stack, |ptr, len| queue.push_raw(ptr, len))
822         };
823         // SAFETY: this stack's memory regions were enqueued above.
824         unsafe {
825             queue.push_stack(stack, bytes_resident);
826         }
827         self.merge_or_flush(queue);
828     }
829 
830     fn purge_module(&self, module: CompiledModuleId) {
831         self.memories.purge_module(module);
832     }
833 
834     fn next_available_pkey(&self) -> Option<ProtectionKey> {
835         self.memories.next_available_pkey()
836     }
837 
838     fn restrict_to_pkey(&self, pkey: ProtectionKey) {
839         mpk::allow(ProtectionMask::zero().or(pkey));
840     }
841 
842     fn allow_all_pkeys(&self) {
843         mpk::allow(ProtectionMask::all());
844     }
845 
846     #[cfg(feature = "gc")]
847     fn allocate_gc_heap(
848         &self,
849         engine: &crate::Engine,
850         gc_runtime: &dyn GcRuntime,
851         memory_alloc_index: MemoryAllocationIndex,
852         memory: Memory,
853     ) -> Result<(GcHeapAllocationIndex, Box<dyn GcHeap>)> {
854         let ret = self
855             .gc_heaps
856             .allocate(engine, gc_runtime, memory_alloc_index, memory)?;
857         self.live_gc_heaps.fetch_add(1, Ordering::Relaxed);
858         Ok(ret)
859     }
860 
861     #[cfg(feature = "gc")]
862     fn deallocate_gc_heap(
863         &self,
864         allocation_index: GcHeapAllocationIndex,
865         gc_heap: Box<dyn GcHeap>,
866     ) -> (MemoryAllocationIndex, Memory) {
867         self.live_gc_heaps.fetch_sub(1, Ordering::Relaxed);
868         self.gc_heaps.deallocate(allocation_index, gc_heap)
869     }
870 
871     fn as_pooling(&self) -> Option<&PoolingInstanceAllocator> {
872         Some(self)
873     }
874 }
875 
876 #[cfg(test)]
877 #[cfg(target_pointer_width = "64")]
878 mod test {
879     use super::*;
880 
881     #[test]
882     fn test_pooling_allocator_with_memory_pages_exceeded() {
883         let config = PoolingInstanceAllocatorConfig {
884             limits: InstanceLimits {
885                 total_memories: 1,
886                 max_memory_size: 0x100010000,
887                 ..Default::default()
888             },
889             ..PoolingInstanceAllocatorConfig::default()
890         };
891         assert_eq!(
892             PoolingInstanceAllocator::new(
893                 &config,
894                 &Tunables {
895                     memory_reservation: 0x10000,
896                     ..Tunables::default_host()
897                 },
898             )
899             .map_err(|e| e.to_string())
900             .expect_err("expected a failure constructing instance allocator"),
901             "maximum memory size of 0x100010000 bytes exceeds the configured \
902              memory reservation of 0x10000 bytes"
903         );
904     }
905 
906     #[cfg(all(
907         unix,
908         target_pointer_width = "64",
909         feature = "async",
910         not(miri),
911         not(asan)
912     ))]
913     #[test]
914     fn test_stack_zeroed() -> Result<()> {
915         let config = PoolingInstanceAllocatorConfig {
916             max_unused_warm_slots: 0,
917             limits: InstanceLimits {
918                 total_stacks: 1,
919                 total_memories: 0,
920                 total_tables: 0,
921                 ..Default::default()
922             },
923             stack_size: 128,
924             async_stack_zeroing: true,
925             ..PoolingInstanceAllocatorConfig::default()
926         };
927         let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;
928 
929         unsafe {
930             for _ in 0..255 {
931                 let stack = allocator.allocate_fiber_stack()?;
932 
933                 // The stack pointer is at the top, so decrement it first
934                 let addr = stack.top().unwrap().sub(1);
935 
936                 assert_eq!(*addr, 0);
937                 *addr = 1;
938 
939                 allocator.deallocate_fiber_stack(stack);
940             }
941         }
942 
943         Ok(())
944     }
945 
946     #[cfg(all(
947         unix,
948         target_pointer_width = "64",
949         feature = "async",
950         not(miri),
951         not(asan)
952     ))]
953     #[test]
954     fn test_stack_unzeroed() -> Result<()> {
955         let config = PoolingInstanceAllocatorConfig {
956             max_unused_warm_slots: 0,
957             limits: InstanceLimits {
958                 total_stacks: 1,
959                 total_memories: 0,
960                 total_tables: 0,
961                 ..Default::default()
962             },
963             stack_size: 128,
964             async_stack_zeroing: false,
965             ..PoolingInstanceAllocatorConfig::default()
966         };
967         let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;
968 
969         unsafe {
970             for i in 0..255 {
971                 let stack = allocator.allocate_fiber_stack()?;
972 
973                 // The stack pointer is at the top, so decrement it first
974                 let addr = stack.top().unwrap().sub(1);
975 
976                 assert_eq!(*addr, i);
977                 *addr = i + 1;
978 
979                 allocator.deallocate_fiber_stack(stack);
980             }
981         }
982 
983         Ok(())
984     }
985 }
986