1 //! Implements the pooling instance allocator.
2 //!
3 //! The pooling instance allocator maps memory in advance and allocates
4 //! instances, memories, tables, and stacks from a pool of available resources.
5 //! Using the pooling instance allocator can speed up module instantiation when
6 //! modules can be constrained based on configurable limits
7 //! ([`InstanceLimits`]). Each new instance is stored in a "slot"; as instances
8 //! are allocated and freed, these slots are either filled or emptied:
9 //!
10 //! ```text
11 //! ┌──────┬──────┬──────┬──────┬──────┐
12 //! │Slot 0│Slot 1│Slot 2│Slot 3│......│
13 //! └──────┴──────┴──────┴──────┴──────┘
14 //! ```
15 //!
16 //! Each slot has a "slot ID"--an index into the pool. Slot IDs are handed out
17 //! by the [`index_allocator`] module. Note that each kind of pool-allocated
18 //! item is stored in its own separate pool: [`memory_pool`], [`table_pool`],
19 //! [`stack_pool`]. See those modules for more details.
20 
21 mod decommit_queue;
22 mod index_allocator;
23 mod memory_pool;
24 mod metrics;
25 mod table_pool;
26 
27 #[cfg(feature = "gc")]
28 mod gc_heap_pool;
29 
30 #[cfg(all(feature = "async"))]
31 mod generic_stack_pool;
32 #[cfg(all(feature = "async", unix, not(miri)))]
33 mod unix_stack_pool;
34 
35 #[cfg(all(feature = "async"))]
36 cfg_if::cfg_if! {
37     if #[cfg(all(unix, not(miri), not(asan)))] {
38         use unix_stack_pool as stack_pool;
39     } else {
40         use generic_stack_pool as stack_pool;
41     }
42 }
43 
44 use self::decommit_queue::DecommitQueue;
45 use self::memory_pool::MemoryPool;
46 use self::table_pool::TablePool;
47 use super::{
48     InstanceAllocationRequest, InstanceAllocator, MemoryAllocationIndex, TableAllocationIndex,
49 };
50 use crate::Enabled;
51 use crate::prelude::*;
52 use crate::runtime::vm::{
53     CompiledModuleId, Memory, Table,
54     instance::Instance,
55     mpk::{self, ProtectionKey, ProtectionMask},
56     sys::vm::PageMap,
57 };
58 use core::sync::atomic::AtomicUsize;
59 use std::borrow::Cow;
60 use std::fmt::Display;
61 use std::sync::{Mutex, MutexGuard};
62 use std::{
63     mem,
64     sync::atomic::{AtomicU64, Ordering},
65 };
66 use wasmtime_environ::{
67     DefinedMemoryIndex, DefinedTableIndex, HostPtr, Module, Tunables, VMOffsets,
68 };
69 
70 pub use self::metrics::PoolingAllocatorMetrics;
71 
72 #[cfg(feature = "gc")]
73 use super::GcHeapAllocationIndex;
74 #[cfg(feature = "gc")]
75 use crate::runtime::vm::{GcHeap, GcRuntime};
76 #[cfg(feature = "gc")]
77 use gc_heap_pool::GcHeapPool;
78 
79 #[cfg(feature = "async")]
80 use stack_pool::StackPool;
81 
82 #[cfg(feature = "component-model")]
83 use wasmtime_environ::{
84     StaticModuleIndex,
85     component::{Component, VMComponentOffsets},
86 };
87 
/// Rounds `n` up to the nearest multiple of `to`.
///
/// `to` must be a non-zero power of two; this is only verified when debug
/// assertions are enabled.
fn round_up_to_pow2(n: usize, to: usize) -> usize {
    debug_assert!(to > 0);
    debug_assert!(to.is_power_of_two());
    // Bump `n` up to (or past) the next boundary, then clear the low bits so
    // the result lands exactly on a multiple of `to`.
    let bumped = n + to - 1;
    let low_bits = to - 1;
    bumped & !low_bits
}
93 
/// Instance-related limit configuration for pooling.
///
/// The `total_*` fields bound how many of each resource may exist
/// concurrently across the whole pool, while the remaining fields bound how
/// large or how numerous the resources of a single module or component may
/// be.
///
/// More docs on this can be found at `wasmtime::PoolingAllocationConfig`.
#[derive(Debug, Copy, Clone)]
pub struct InstanceLimits {
    /// The maximum number of component instances that may be allocated
    /// concurrently.
    pub total_component_instances: u32,

    /// The maximum size, in bytes, of a component's `VMComponentContext`, not
    /// including any of its inner core modules' `VMContext` sizes.
    pub component_instance_size: usize,

    /// The maximum number of core module instances that may be allocated
    /// concurrently.
    pub total_core_instances: u32,

    /// The maximum number of core module instances that a single component may
    /// transitively contain.
    pub max_core_instances_per_component: u32,

    /// The maximum number of Wasm linear memories that a component may
    /// transitively contain.
    pub max_memories_per_component: u32,

    /// The maximum number of tables that a component may transitively contain.
    pub max_tables_per_component: u32,

    /// The total number of linear memories in the pool, across all instances.
    pub total_memories: u32,

    /// The total number of tables in the pool, across all instances.
    pub total_tables: u32,

    /// The total number of async stacks in the pool, across all instances.
    ///
    /// Only present when the `async` feature is enabled.
    #[cfg(feature = "async")]
    pub total_stacks: u32,

    /// Maximum size, in bytes, of a core instance's `VMContext`.
    pub core_instance_size: usize,

    /// Maximum number of tables per instance.
    pub max_tables_per_module: u32,

    /// Maximum number of word-size elements per table.
    ///
    /// Note that tables for element types such as continuations
    /// that use more than one word of storage may store fewer
    /// elements.
    pub table_elements: usize,

    /// Maximum number of linear memories per instance.
    pub max_memories_per_module: u32,

    /// Maximum byte size of a linear memory. This must be smaller than
    /// `memory_reservation` in `Tunables`.
    pub max_memory_size: usize,

    /// The total number of GC heaps in the pool, across all instances.
    ///
    /// Only present when the `gc` feature is enabled.
    #[cfg(feature = "gc")]
    pub total_gc_heaps: u32,
}
156 
157 impl Default for InstanceLimits {
158     fn default() -> Self {
159         let total = if cfg!(target_pointer_width = "32") {
160             100
161         } else {
162             1000
163         };
164         // See doc comments for `wasmtime::PoolingAllocationConfig` for these
165         // default values
166         Self {
167             total_component_instances: total,
168             component_instance_size: 1 << 20, // 1 MiB
169             total_core_instances: total,
170             max_core_instances_per_component: u32::MAX,
171             max_memories_per_component: u32::MAX,
172             max_tables_per_component: u32::MAX,
173             total_memories: total,
174             total_tables: total,
175             #[cfg(feature = "async")]
176             total_stacks: total,
177             core_instance_size: 1 << 20, // 1 MiB
178             max_tables_per_module: 1,
179             // NB: in #8504 it was seen that a C# module in debug module can
180             // have 10k+ elements.
181             table_elements: 20_000,
182             max_memories_per_module: 1,
183             #[cfg(target_pointer_width = "64")]
184             max_memory_size: 1 << 32, // 4G,
185             #[cfg(target_pointer_width = "32")]
186             max_memory_size: 10 << 20, // 10 MiB
187             #[cfg(feature = "gc")]
188             total_gc_heaps: total,
189         }
190     }
191 }
192 
/// Configuration options for the pooling instance allocator supplied at
/// construction.
#[derive(Copy, Clone, Debug)]
pub struct PoolingInstanceAllocatorConfig {
    /// See `PoolingAllocationConfig::max_unused_warm_slots` in `wasmtime`.
    pub max_unused_warm_slots: u32,
    /// The target number of decommits to do per batch. This is not precise, as
    /// we can queue up decommits at times when we aren't prepared to
    /// immediately flush them, and so we may go over this target size
    /// occasionally.
    pub decommit_batch_size: usize,
    /// The size, in bytes, of async stacks to allocate (not including the guard
    /// page).
    pub stack_size: usize,
    /// The limits to apply to instances allocated within this allocator.
    pub limits: InstanceLimits,
    /// Whether or not async stacks are zeroed after use.
    pub async_stack_zeroing: bool,
    /// If async stack zeroing is enabled and the host platform is Linux this is
    /// how much memory, in bytes, to zero out with `memset`.
    ///
    /// The rest of memory will be zeroed out with `madvise`.
    ///
    /// Only present when the `async` feature is enabled.
    #[cfg(feature = "async")]
    pub async_stack_keep_resident: usize,
    /// How much linear memory, in bytes, to keep resident after resetting for
    /// use with the next instance. This much memory will be `memset` to zero
    /// when a linear memory is deallocated.
    ///
    /// Memory exceeding this amount in the wasm linear memory will be released
    /// with `madvise` back to the kernel.
    ///
    /// Only applicable on Linux.
    pub linear_memory_keep_resident: usize,
    /// Same as `linear_memory_keep_resident` but for tables.
    pub table_keep_resident: usize,
    /// Whether to enable memory protection keys.
    pub memory_protection_keys: Enabled,
    /// How many memory protection keys to allocate.
    pub max_memory_protection_keys: usize,
    /// Whether to enable PAGEMAP_SCAN on Linux.
    ///
    /// See `PoolingInstanceAllocatorConfig::is_pagemap_scan_available` for a
    /// way to probe for support.
    pub pagemap_scan: Enabled,
}
235 
236 impl Default for PoolingInstanceAllocatorConfig {
237     fn default() -> PoolingInstanceAllocatorConfig {
238         PoolingInstanceAllocatorConfig {
239             max_unused_warm_slots: 100,
240             decommit_batch_size: 1,
241             stack_size: 2 << 20,
242             limits: InstanceLimits::default(),
243             async_stack_zeroing: false,
244             #[cfg(feature = "async")]
245             async_stack_keep_resident: 0,
246             linear_memory_keep_resident: 0,
247             table_keep_resident: 0,
248             memory_protection_keys: Enabled::No,
249             max_memory_protection_keys: 16,
250             pagemap_scan: Enabled::No,
251         }
252     }
253 }
254 
255 impl PoolingInstanceAllocatorConfig {
256     pub fn is_pagemap_scan_available() -> bool {
257         PageMap::new().is_some()
258     }
259 }
260 
/// An error returned when the pooling allocator cannot allocate a table,
/// memory, etc... because the maximum number of concurrent allocations for that
/// entity has been reached.
#[derive(Debug)]
pub struct PoolConcurrencyLimitError {
    /// The configured maximum number of concurrent allocations.
    limit: usize,
    /// Human-readable name of the kind of entity that hit its limit.
    kind: Cow<'static, str>,
}

impl core::error::Error for PoolConcurrencyLimitError {}

impl Display for PoolConcurrencyLimitError {
    /// Renders the error as a single sentence naming the limit and the kind
    /// of entity that reached it.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { limit, kind } = self;
        write!(f, "maximum concurrent limit of {limit} for {kind} reached")
    }
}

impl PoolConcurrencyLimitError {
    /// Creates a new error for the entity `kind` whose concurrency limit of
    /// `limit` has been reached.
    ///
    /// `kind` may be either a `&'static str` or an owned `String`.
    fn new(limit: usize, kind: impl Into<Cow<'static, str>>) -> Self {
        let kind = kind.into();
        Self { limit, kind }
    }
}
288 
/// Implements the pooling instance allocator.
///
/// This allocator internally maintains pools of instances, memories, tables,
/// and stacks.
///
/// Note: the resource pools are manually dropped so that the fault handler
/// terminates correctly.
//
// NOTE(review): the `Drop` impl in this file only flushes the decommit queue
// and runs assertions in debug builds; confirm that the note above about
// manual dropping is still accurate.
#[derive(Debug)]
pub struct PoolingInstanceAllocator {
    // Target number of enqueued decommit regions before a queue is flushed;
    // copied from `PoolingInstanceAllocatorConfig::decommit_batch_size`.
    decommit_batch_size: usize,
    // The instance limits this allocator was configured with.
    limits: InstanceLimits,

    // The number of live core module and component instances at any given
    // time. Note that this can temporarily go over the configured limit. This
    // doesn't mean we have actually overshot, but that we attempted to allocate
    // a new instance and incremented the counter, we've seen (or are about to
    // see) that the counter is beyond the configured threshold, and are going
    // to decrement the counter and return an error but haven't done so yet. See
    // the increment trait methods for more details.
    live_core_instances: AtomicU64,
    live_component_instances: AtomicU64,

    // Shared queue of pending decommits. Per-deallocation local queues that
    // are below the batch size are merged into this one.
    decommit_queue: Mutex<DecommitQueue>,

    // Pool of linear memories, plus a count of memories currently handed out
    // (incremented on successful allocation, decremented on deallocation).
    memories: MemoryPool,
    live_memories: AtomicUsize,

    // Pool of tables, plus a count of tables currently handed out.
    tables: TablePool,
    live_tables: AtomicUsize,

    // Pool of GC heaps; only present with the `gc` feature.
    #[cfg(feature = "gc")]
    gc_heaps: GcHeapPool,

    // Pool of async fiber stacks; only present with the `async` feature.
    #[cfg(feature = "async")]
    stacks: StackPool,

    // `Some` when PAGEMAP_SCAN is in use (see `new`), `None` otherwise.
    pagemap: Option<PageMap>,
}
327 
328 impl Drop for PoolingInstanceAllocator {
329     fn drop(&mut self) {
330         if !cfg!(debug_assertions) {
331             return;
332         }
333 
334         // NB: when cfg(not(debug_assertions)) it is okay that we don't flush
335         // the queue, as the sub-pools will unmap those ranges anyways, so
336         // there's no point in decommitting them. But we do need to flush the
337         // queue when debug assertions are enabled to make sure that all
338         // entities get returned to their associated sub-pools and we can
339         // differentiate between a leaking slot and an enqueued-for-decommit
340         // slot.
341         let queue = self.decommit_queue.lock().unwrap();
342         self.flush_decommit_queue(queue);
343 
344         debug_assert_eq!(self.live_component_instances.load(Ordering::Acquire), 0);
345         debug_assert_eq!(self.live_core_instances.load(Ordering::Acquire), 0);
346         debug_assert_eq!(self.live_memories.load(Ordering::Acquire), 0);
347         debug_assert_eq!(self.live_tables.load(Ordering::Acquire), 0);
348 
349         debug_assert!(self.memories.is_empty());
350         debug_assert!(self.tables.is_empty());
351 
352         #[cfg(feature = "gc")]
353         debug_assert!(self.gc_heaps.is_empty());
354 
355         #[cfg(feature = "async")]
356         debug_assert!(self.stacks.is_empty());
357     }
358 }
359 
360 impl PoolingInstanceAllocator {
361     /// Creates a new pooling instance allocator with the given strategy and limits.
362     pub fn new(config: &PoolingInstanceAllocatorConfig, tunables: &Tunables) -> Result<Self> {
363         Ok(Self {
364             decommit_batch_size: config.decommit_batch_size,
365             limits: config.limits,
366             live_component_instances: AtomicU64::new(0),
367             live_core_instances: AtomicU64::new(0),
368             decommit_queue: Mutex::new(DecommitQueue::default()),
369             memories: MemoryPool::new(config, tunables)?,
370             live_memories: AtomicUsize::new(0),
371             tables: TablePool::new(config)?,
372             live_tables: AtomicUsize::new(0),
373             #[cfg(feature = "gc")]
374             gc_heaps: GcHeapPool::new(config)?,
375             #[cfg(feature = "async")]
376             stacks: StackPool::new(config)?,
377             pagemap: match config.pagemap_scan {
378                 Enabled::Auto => PageMap::new(),
379                 Enabled::Yes => Some(PageMap::new().ok_or_else(|| {
380                     anyhow!(
381                         "required to enable PAGEMAP_SCAN but this system \
382                          does not support it"
383                     )
384                 })?),
385                 Enabled::No => None,
386             },
387         })
388     }
389 
390     fn core_instance_size(&self) -> usize {
391         round_up_to_pow2(self.limits.core_instance_size, mem::align_of::<Instance>())
392     }
393 
    /// Checks `module`'s tables against the table pool by delegating to
    /// `TablePool::validate`.
    fn validate_table_plans(&self, module: &Module) -> Result<()> {
        self.tables.validate(module)
    }
397 
    /// Checks `module`'s memories against the memory pool by delegating to
    /// `MemoryPool::validate_memories`.
    fn validate_memory_plans(&self, module: &Module) -> Result<()> {
        self.memories.validate_memories(module)
    }
401 
402     fn validate_core_instance_size(&self, offsets: &VMOffsets<HostPtr>) -> Result<()> {
403         let layout = Instance::alloc_layout(offsets);
404         if layout.size() <= self.core_instance_size() {
405             return Ok(());
406         }
407 
408         // If this `module` exceeds the allocation size allotted to it then an
409         // error will be reported here. The error of "required N bytes but
410         // cannot allocate that" is pretty opaque, however, because it's not
411         // clear what the breakdown of the N bytes are and what to optimize
412         // next. To help provide a better error message here some fancy-ish
413         // logic is done here to report the breakdown of the byte request into
414         // the largest portions and where it's coming from.
415         let mut message = format!(
416             "instance allocation for this module \
417              requires {} bytes which exceeds the configured maximum \
418              of {} bytes; breakdown of allocation requirement:\n\n",
419             layout.size(),
420             self.core_instance_size(),
421         );
422 
423         let mut remaining = layout.size();
424         let mut push = |name: &str, bytes: usize| {
425             assert!(remaining >= bytes);
426             remaining -= bytes;
427 
428             // If the `name` region is more than 5% of the allocation request
429             // then report it here, otherwise ignore it. We have less than 20
430             // fields so we're guaranteed that something should be reported, and
431             // otherwise it's not particularly interesting to learn about 5
432             // different fields that are all 8 or 0 bytes. Only try to report
433             // the "major" sources of bytes here.
434             if bytes > layout.size() / 20 {
435                 message.push_str(&format!(
436                     " * {:.02}% - {} bytes - {}\n",
437                     ((bytes as f32) / (layout.size() as f32)) * 100.0,
438                     bytes,
439                     name,
440                 ));
441             }
442         };
443 
444         // The `Instance` itself requires some size allocated to it.
445         push("instance state management", mem::size_of::<Instance>());
446 
447         // Afterwards the `VMContext`'s regions are why we're requesting bytes,
448         // so ask it for descriptions on each region's byte size.
449         for (desc, size) in offsets.region_sizes() {
450             push(desc, size as usize);
451         }
452 
453         // double-check we accounted for all the bytes
454         assert_eq!(remaining, 0);
455 
456         bail!("{}", message)
457     }
458 
459     #[cfg(feature = "component-model")]
460     fn validate_component_instance_size(
461         &self,
462         offsets: &VMComponentOffsets<HostPtr>,
463     ) -> Result<()> {
464         if usize::try_from(offsets.size_of_vmctx()).unwrap() <= self.limits.component_instance_size
465         {
466             return Ok(());
467         }
468 
469         // TODO: Add context with detailed accounting of what makes up all the
470         // `VMComponentContext`'s space like we do for module instances.
471         bail!(
472             "instance allocation for this component requires {} bytes of `VMComponentContext` \
473              space which exceeds the configured maximum of {} bytes",
474             offsets.size_of_vmctx(),
475             self.limits.component_instance_size
476         )
477     }
478 
    /// Empties the shared decommit queue behind `locked_queue` and flushes
    /// its previous contents.
    ///
    /// The queue's contents are swapped out for a default (empty) queue and
    /// the lock is released before the flush itself runs.
    ///
    /// Returns the result of `DecommitQueue::flush`; callers in this file
    /// treat `true` as "flushing freed up slots, so retrying an allocation is
    /// worthwhile".
    fn flush_decommit_queue(&self, mut locked_queue: MutexGuard<'_, DecommitQueue>) -> bool {
        // Take the queue out of the mutex and drop the lock, to minimize
        // contention.
        let queue = mem::take(&mut *locked_queue);
        drop(locked_queue);
        queue.flush(self)
    }
486 
487     /// Execute `f` and if it returns `Err(PoolConcurrencyLimitError)`, then try
488     /// flushing the decommit queue. If flushing the queue freed up slots, then
489     /// try running `f` again.
490     #[cfg(feature = "async")]
491     fn with_flush_and_retry<T>(&self, mut f: impl FnMut() -> Result<T>) -> Result<T> {
492         f().or_else(|e| {
493             if e.is::<PoolConcurrencyLimitError>() {
494                 let queue = self.decommit_queue.lock().unwrap();
495                 if self.flush_decommit_queue(queue) {
496                     return f();
497                 }
498             }
499 
500             Err(e)
501         })
502     }
503 
504     fn merge_or_flush(&self, mut local_queue: DecommitQueue) {
505         match local_queue.raw_len() {
506             // If we didn't enqueue any regions for decommit, then we must have
507             // either memset the whole entity or eagerly remapped it to zero
508             // because we don't have linux's `madvise(DONTNEED)` semantics. In
509             // either case, the entity slot is ready for reuse immediately.
510             0 => {
511                 local_queue.flush(self);
512             }
513 
514             // We enqueued at least our batch size of regions for decommit, so
515             // flush the local queue immediately. Don't bother inspecting (or
516             // locking!) the shared queue.
517             n if n >= self.decommit_batch_size => {
518                 local_queue.flush(self);
519             }
520 
521             // If we enqueued some regions for decommit, but did not reach our
522             // batch size, so we don't want to flush it yet, then merge the
523             // local queue into the shared queue.
524             n => {
525                 debug_assert!(n < self.decommit_batch_size);
526                 let mut shared_queue = self.decommit_queue.lock().unwrap();
527                 shared_queue.append(&mut local_queue);
528                 // And if the shared queue now has at least as many regions
529                 // enqueued for decommit as our batch size, then we can flush
530                 // it.
531                 if shared_queue.raw_len() >= self.decommit_batch_size {
532                     self.flush_decommit_queue(shared_queue);
533                 }
534             }
535         }
536     }
537 }
538 
539 #[async_trait::async_trait]
540 unsafe impl InstanceAllocator for PoolingInstanceAllocator {
541     #[cfg(feature = "component-model")]
542     fn validate_component<'a>(
543         &self,
544         component: &Component,
545         offsets: &VMComponentOffsets<HostPtr>,
546         get_module: &'a dyn Fn(StaticModuleIndex) -> &'a Module,
547     ) -> Result<()> {
548         self.validate_component_instance_size(offsets)
549             .context("component instance size does not fit in pooling allocator requirements")?;
550 
551         let mut num_core_instances = 0;
552         let mut num_memories = 0;
553         let mut num_tables = 0;
554         for init in &component.initializers {
555             use wasmtime_environ::component::GlobalInitializer::*;
556             use wasmtime_environ::component::InstantiateModule;
557             match init {
558                 InstantiateModule(InstantiateModule::Import(_, _)) => {
559                     num_core_instances += 1;
560                     // Can't statically account for the total vmctx size, number
561                     // of memories, and number of tables in this component.
562                 }
563                 InstantiateModule(InstantiateModule::Static(static_module_index, _)) => {
564                     let module = get_module(*static_module_index);
565                     let offsets = VMOffsets::new(HostPtr, &module);
566                     self.validate_module(module, &offsets)?;
567                     num_core_instances += 1;
568                     num_memories += module.num_defined_memories();
569                     num_tables += module.num_defined_tables();
570                 }
571                 LowerImport { .. }
572                 | ExtractMemory(_)
573                 | ExtractTable(_)
574                 | ExtractRealloc(_)
575                 | ExtractCallback(_)
576                 | ExtractPostReturn(_)
577                 | Resource(_) => {}
578             }
579         }
580 
581         if num_core_instances
582             > usize::try_from(self.limits.max_core_instances_per_component).unwrap()
583         {
584             bail!(
585                 "The component transitively contains {num_core_instances} core module instances, \
586                  which exceeds the configured maximum of {} in the pooling allocator",
587                 self.limits.max_core_instances_per_component
588             );
589         }
590 
591         if num_memories > usize::try_from(self.limits.max_memories_per_component).unwrap() {
592             bail!(
593                 "The component transitively contains {num_memories} Wasm linear memories, which \
594                  exceeds the configured maximum of {} in the pooling allocator",
595                 self.limits.max_memories_per_component
596             );
597         }
598 
599         if num_tables > usize::try_from(self.limits.max_tables_per_component).unwrap() {
600             bail!(
601                 "The component transitively contains {num_tables} tables, which exceeds the \
602                  configured maximum of {} in the pooling allocator",
603                 self.limits.max_tables_per_component
604             );
605         }
606 
607         Ok(())
608     }
609 
    /// Checks that `module` fits within this allocator's limits.
    ///
    /// The module's memories, tables, and instance size (derived from
    /// `offsets`) are validated in that order; the first failure is returned
    /// with added context naming which requirement was not met.
    fn validate_module(&self, module: &Module, offsets: &VMOffsets<HostPtr>) -> Result<()> {
        self.validate_memory_plans(module)
            .context("module memory does not fit in pooling allocator requirements")?;
        self.validate_table_plans(module)
            .context("module table does not fit in pooling allocator requirements")?;
        self.validate_core_instance_size(offsets)
            .context("module instance size does not fit in pooling allocator requirements")?;
        Ok(())
    }
619 
    /// Checks a single memory type against the memory pool by delegating to
    /// `MemoryPool::validate_memory`.
    ///
    /// Only available when the `gc` feature is enabled.
    #[cfg(feature = "gc")]
    fn validate_memory(&self, memory: &wasmtime_environ::Memory) -> Result<()> {
        self.memories.validate_memory(memory)
    }
624 
625     #[cfg(feature = "component-model")]
626     fn increment_component_instance_count(&self) -> Result<()> {
627         let old_count = self.live_component_instances.fetch_add(1, Ordering::AcqRel);
628         if old_count >= u64::from(self.limits.total_component_instances) {
629             self.decrement_component_instance_count();
630             return Err(PoolConcurrencyLimitError::new(
631                 usize::try_from(self.limits.total_component_instances).unwrap(),
632                 "component instances",
633             )
634             .into());
635         }
636         Ok(())
637     }
638 
    /// Decrements the live component instance count by one.
    ///
    /// Also used by `increment_component_instance_count` to undo its
    /// optimistic increment when the limit has been reached.
    #[cfg(feature = "component-model")]
    fn decrement_component_instance_count(&self) {
        self.live_component_instances.fetch_sub(1, Ordering::AcqRel);
    }
643 
644     fn increment_core_instance_count(&self) -> Result<()> {
645         let old_count = self.live_core_instances.fetch_add(1, Ordering::AcqRel);
646         if old_count >= u64::from(self.limits.total_core_instances) {
647             self.decrement_core_instance_count();
648             return Err(PoolConcurrencyLimitError::new(
649                 usize::try_from(self.limits.total_core_instances).unwrap(),
650                 "core instances",
651             )
652             .into());
653         }
654         Ok(())
655     }
656 
    /// Decrements the live core instance count by one.
    ///
    /// Also used by `increment_core_instance_count` to undo its optimistic
    /// increment when the limit has been reached.
    fn decrement_core_instance_count(&self) {
        self.live_core_instances.fetch_sub(1, Ordering::AcqRel);
    }
660 
661     async fn allocate_memory(
662         &self,
663         request: &mut InstanceAllocationRequest<'_, '_>,
664         ty: &wasmtime_environ::Memory,
665         memory_index: Option<DefinedMemoryIndex>,
666     ) -> Result<(MemoryAllocationIndex, Memory)> {
667         async {
668             // FIXME(rust-lang/rust#145127) this should ideally use a version of
669             // `with_flush_and_retry` but adapted for async closures instead of only
670             // sync closures. Right now that won't compile though so this is the
671             // manually expanded version of the method.
672             let e = match self.memories.allocate(request, ty, memory_index).await {
673                 Ok(result) => return Ok(result),
674                 Err(e) => e,
675             };
676 
677             if e.is::<PoolConcurrencyLimitError>() {
678                 let queue = self.decommit_queue.lock().unwrap();
679                 if self.flush_decommit_queue(queue) {
680                     return self.memories.allocate(request, ty, memory_index).await;
681                 }
682             }
683 
684             Err(e)
685         }
686         .await
687         .inspect(|_| {
688             self.live_memories.fetch_add(1, Ordering::Relaxed);
689         })
690     }
691 
    /// Returns a pooled linear memory to the allocator.
    ///
    /// The live memory count is decremented, the memory's static image is
    /// reset, and the image is handed to a local decommit queue together with
    /// any regions reported by the reset. That queue is then either flushed
    /// or merged into the shared queue by `merge_or_flush`.
    ///
    /// # Panics
    ///
    /// Panics if resetting the memory image fails.
    ///
    /// # Safety
    ///
    /// NOTE(review): the full caller contract lives on the
    /// `InstanceAllocator` trait, which is not visible here. The `SAFETY`
    /// comments below rely on `memory` no longer being in use when this is
    /// called.
    unsafe fn deallocate_memory(
        &self,
        _memory_index: Option<DefinedMemoryIndex>,
        allocation_index: MemoryAllocationIndex,
        memory: Memory,
    ) {
        let prev = self.live_memories.fetch_sub(1, Ordering::Relaxed);
        debug_assert!(prev > 0);

        // Reset the image slot so that it remains ready for reuse. Regions
        // that still need to be decommitted are reported through the closure
        // and collected in `queue`.
        //
        // NB: a failure to clear the image is treated as fatal via the
        // `expect` below; it is not silently tolerated.
        let mut image = memory.unwrap_static_image();
        let mut queue = DecommitQueue::default();
        image
            .clear_and_remain_ready(
                self.pagemap.as_ref(),
                self.memories.keep_resident,
                |ptr, len| {
                    // SAFETY: the memory in `image` won't be used until this
                    // decommit queue is flushed, and by definition the memory is
                    // not in use when calling this function.
                    unsafe {
                        queue.push_raw(ptr, len);
                    }
                },
            )
            .expect("failed to reset memory image");

        // SAFETY: this image is not in use and its memory regions were enqueued
        // with `push_raw` above.
        unsafe {
            queue.push_memory(allocation_index, image);
        }
        self.merge_or_flush(queue);
    }
729 
730     async fn allocate_table(
731         &self,
732         request: &mut InstanceAllocationRequest<'_, '_>,
733         ty: &wasmtime_environ::Table,
734         _table_index: DefinedTableIndex,
735     ) -> Result<(super::TableAllocationIndex, Table)> {
736         async {
737             // FIXME: see `allocate_memory` above for comments about duplication
738             // with `with_flush_and_retry`.
739             let e = match self.tables.allocate(request, ty).await {
740                 Ok(result) => return Ok(result),
741                 Err(e) => e,
742             };
743 
744             if e.is::<PoolConcurrencyLimitError>() {
745                 let queue = self.decommit_queue.lock().unwrap();
746                 if self.flush_decommit_queue(queue) {
747                     return self.tables.allocate(request, ty).await;
748                 }
749             }
750 
751             Err(e)
752         }
753         .await
754         .inspect(|_| {
755             self.live_tables.fetch_add(1, Ordering::Relaxed);
756         })
757     }
758 
    /// Returns a pooled table to the allocator.
    ///
    /// The live table count is decremented, the table's pages are reset to
    /// zero via the table pool, and the table is handed to a local decommit
    /// queue together with any regions reported by the reset. That queue is
    /// then either flushed or merged into the shared queue by
    /// `merge_or_flush`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the full caller contract lives on the
    /// `InstanceAllocator` trait, which is not visible here. The `SAFETY`
    /// comments below rely on `table` no longer being in use when this is
    /// called.
    unsafe fn deallocate_table(
        &self,
        _table_index: DefinedTableIndex,
        allocation_index: TableAllocationIndex,
        mut table: Table,
    ) {
        let prev = self.live_tables.fetch_sub(1, Ordering::Relaxed);
        debug_assert!(prev > 0);

        // Local queue collecting the regions reported while resetting the
        // table.
        let mut queue = DecommitQueue::default();
        // SAFETY: This table is no longer in use by the allocator when this
        // method is called and additionally all image ranges are pushed with
        // the understanding that the memory won't get used until the whole
        // queue is flushed.
        unsafe {
            self.tables.reset_table_pages_to_zero(
                self.pagemap.as_ref(),
                allocation_index,
                &mut table,
                |ptr, len| {
                    queue.push_raw(ptr, len);
                },
            );
        }

        // SAFETY: the table has had all its memory regions enqueued above.
        unsafe {
            queue.push_table(allocation_index, table);
        }
        self.merge_or_flush(queue);
    }
790 
791     #[cfg(feature = "async")]
792     fn allocate_fiber_stack(&self) -> Result<wasmtime_fiber::FiberStack> {
793         self.with_flush_and_retry(|| self.stacks.allocate())
794     }
795 
796     #[cfg(feature = "async")]
797     unsafe fn deallocate_fiber_stack(&self, mut stack: wasmtime_fiber::FiberStack) {
798         let mut queue = DecommitQueue::default();
799         // SAFETY: the stack is no longer in use by definition when this
800         // function is called and memory ranges pushed here are otherwise no
801         // longer in use.
802         unsafe {
803             self.stacks
804                 .zero_stack(&mut stack, |ptr, len| queue.push_raw(ptr, len));
805         }
806         // SAFETY: this stack's memory regions were enqueued above.
807         unsafe {
808             queue.push_stack(stack);
809         }
810         self.merge_or_flush(queue);
811     }
812 
813     fn purge_module(&self, module: CompiledModuleId) {
814         self.memories.purge_module(module);
815     }
816 
817     fn next_available_pkey(&self) -> Option<ProtectionKey> {
818         self.memories.next_available_pkey()
819     }
820 
821     fn restrict_to_pkey(&self, pkey: ProtectionKey) {
822         mpk::allow(ProtectionMask::zero().or(pkey));
823     }
824 
825     fn allow_all_pkeys(&self) {
826         mpk::allow(ProtectionMask::all());
827     }
828 
829     #[cfg(feature = "gc")]
830     fn allocate_gc_heap(
831         &self,
832         engine: &crate::Engine,
833         gc_runtime: &dyn GcRuntime,
834         memory_alloc_index: MemoryAllocationIndex,
835         memory: Memory,
836     ) -> Result<(GcHeapAllocationIndex, Box<dyn GcHeap>)> {
837         self.gc_heaps
838             .allocate(engine, gc_runtime, memory_alloc_index, memory)
839     }
840 
841     #[cfg(feature = "gc")]
842     fn deallocate_gc_heap(
843         &self,
844         allocation_index: GcHeapAllocationIndex,
845         gc_heap: Box<dyn GcHeap>,
846     ) -> (MemoryAllocationIndex, Memory) {
847         self.gc_heaps.deallocate(allocation_index, gc_heap)
848     }
849 
850     fn as_pooling(&self) -> Option<&PoolingInstanceAllocator> {
851         Some(self)
852     }
853 }
854 
#[cfg(test)]
#[cfg(target_pointer_width = "64")]
mod test {
    use super::*;

    /// Constructing the allocator must fail, with the exact message below,
    /// when `max_memory_size` is larger than the `memory_reservation` tunable.
    #[test]
    fn test_pooling_allocator_with_memory_pages_exceeded() {
        let limits = InstanceLimits {
            total_memories: 1,
            max_memory_size: 0x100010000,
            ..Default::default()
        };
        let config = PoolingInstanceAllocatorConfig {
            limits,
            ..PoolingInstanceAllocatorConfig::default()
        };
        let tunables = Tunables {
            memory_reservation: 0x10000,
            ..Tunables::default_host()
        };

        let message = PoolingInstanceAllocator::new(&config, &tunables)
            .map_err(|e| e.to_string())
            .expect_err("expected a failure constructing instance allocator");

        assert_eq!(
            message,
            "maximum memory size of 0x100010000 bytes exceeds the configured \
             memory reservation of 0x10000 bytes"
        );
    }

    /// Builds the configuration shared by the stack tests: a pool with one
    /// stack slot of 128 bytes, no memories, no tables and no warm slots,
    /// with stack zeroing switched by `async_stack_zeroing`.
    #[cfg(all(
        unix,
        target_pointer_width = "64",
        feature = "async",
        not(miri),
        not(asan)
    ))]
    fn single_stack_config(async_stack_zeroing: bool) -> PoolingInstanceAllocatorConfig {
        PoolingInstanceAllocatorConfig {
            max_unused_warm_slots: 0,
            limits: InstanceLimits {
                total_stacks: 1,
                total_memories: 0,
                total_tables: 0,
                ..Default::default()
            },
            stack_size: 128,
            async_stack_zeroing,
            ..PoolingInstanceAllocatorConfig::default()
        }
    }

    /// With stack zeroing enabled, the byte just below the top of a freshly
    /// allocated stack reads as zero on every iteration, even though the
    /// previous iteration wrote `1` there before deallocating.
    #[cfg(all(
        unix,
        target_pointer_width = "64",
        feature = "async",
        not(miri),
        not(asan)
    ))]
    #[test]
    fn test_stack_zeroed() -> Result<()> {
        let config = single_stack_config(true);
        let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;

        for _ in 0..255 {
            let stack = allocator.allocate_fiber_stack()?;

            // SAFETY: the stack was just handed out by the allocator and is
            // only touched here before being given back.
            unsafe {
                // The stack pointer is at the top, so decrement it first
                let addr = stack.top().unwrap().sub(1);

                assert_eq!(*addr, 0);
                *addr = 1;

                allocator.deallocate_fiber_stack(stack);
            }
        }

        Ok(())
    }

    /// With stack zeroing disabled, the byte written just below the top of
    /// the stack in one iteration is still observed by the next allocation.
    #[cfg(all(
        unix,
        target_pointer_width = "64",
        feature = "async",
        not(miri),
        not(asan)
    ))]
    #[test]
    fn test_stack_unzeroed() -> Result<()> {
        let config = single_stack_config(false);
        let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;

        for generation in 0..255 {
            let stack = allocator.allocate_fiber_stack()?;

            // SAFETY: the stack was just handed out by the allocator and is
            // only touched here before being given back.
            unsafe {
                // The stack pointer is at the top, so decrement it first
                let addr = stack.top().unwrap().sub(1);

                assert_eq!(*addr, generation);
                *addr = generation + 1;

                allocator.deallocate_fiber_stack(stack);
            }
        }

        Ok(())
    }
}
965