1 //! Implements the pooling instance allocator.
2 //!
3 //! The pooling instance allocator maps memory in advance and allocates
4 //! instances, memories, tables, and stacks from a pool of available resources.
5 //! Using the pooling instance allocator can speed up module instantiation when
6 //! modules can be constrained based on configurable limits
7 //! ([`InstanceLimits`]). Each new instance is stored in a "slot"; as instances
8 //! are allocated and freed, these slots are either filled or emptied:
9 //!
10 //! ```text
11 //! ┌──────┬──────┬──────┬──────┬──────┐
12 //! │Slot 0│Slot 1│Slot 2│Slot 3│......│
13 //! └──────┴──────┴──────┴──────┴──────┘
14 //! ```
15 //!
16 //! Each slot has a "slot ID"--an index into the pool. Slot IDs are handed out
17 //! by the [`index_allocator`] module. Note that each kind of pool-allocated
18 //! item is stored in its own separate pool: [`memory_pool`], [`table_pool`],
19 //! [`stack_pool`]. See those modules for more details.
20 
21 mod decommit_queue;
22 mod index_allocator;
23 mod memory_pool;
24 mod metrics;
25 mod table_pool;
26 
27 #[cfg(feature = "gc")]
28 mod gc_heap_pool;
29 
30 #[cfg(all(feature = "async"))]
31 mod generic_stack_pool;
32 #[cfg(all(feature = "async", unix, not(miri)))]
33 mod unix_stack_pool;
34 
35 #[cfg(all(feature = "async"))]
36 cfg_if::cfg_if! {
37     if #[cfg(all(unix, not(miri), not(asan)))] {
38         use unix_stack_pool as stack_pool;
39     } else {
40         use generic_stack_pool as stack_pool;
41     }
42 }
43 
44 use self::decommit_queue::DecommitQueue;
45 use self::memory_pool::MemoryPool;
46 use self::table_pool::TablePool;
47 use super::{
48     InstanceAllocationRequest, InstanceAllocator, MemoryAllocationIndex, TableAllocationIndex,
49 };
50 use crate::Enabled;
51 use crate::prelude::*;
52 use crate::runtime::vm::{
53     CompiledModuleId, Memory, Table,
54     instance::Instance,
55     mpk::{self, ProtectionKey, ProtectionMask},
56     sys::vm::PageMap,
57 };
58 use core::future::Future;
59 use core::pin::Pin;
60 use core::sync::atomic::AtomicUsize;
61 use std::borrow::Cow;
62 use std::fmt::Display;
63 use std::sync::{Mutex, MutexGuard};
64 use std::{
65     mem,
66     sync::atomic::{AtomicU64, Ordering},
67 };
68 use wasmtime_environ::{
69     DefinedMemoryIndex, DefinedTableIndex, HostPtr, Module, Tunables, VMOffsets,
70 };
71 
72 pub use self::metrics::PoolingAllocatorMetrics;
73 
74 #[cfg(feature = "gc")]
75 use super::GcHeapAllocationIndex;
76 #[cfg(feature = "gc")]
77 use crate::runtime::vm::{GcHeap, GcRuntime};
78 #[cfg(feature = "gc")]
79 use gc_heap_pool::GcHeapPool;
80 
81 #[cfg(feature = "async")]
82 use stack_pool::StackPool;
83 
84 #[cfg(feature = "component-model")]
85 use wasmtime_environ::{
86     StaticModuleIndex,
87     component::{Component, VMComponentOffsets},
88 };
89 
round_up_to_pow2(n: usize, to: usize) -> usize90 fn round_up_to_pow2(n: usize, to: usize) -> usize {
91     debug_assert!(to > 0);
92     debug_assert!(to.is_power_of_two());
93     (n + to - 1) & !(to - 1)
94 }
95 
/// Limits on instances (and the resources they own) enforced by the pooling
/// allocator.
///
/// The public-facing documentation for each knob lives on
/// `wasmtime::PoolingAllocationConfig`.
#[derive(Debug, Copy, Clone)]
pub struct InstanceLimits {
    /// Upper bound on component instances that may be live at the same time.
    pub total_component_instances: u32,

    /// Upper bound, in bytes, on a component's `VMComponentContext` plus the
    /// summed `VMContext` sizes of every core module instance it contains.
    pub component_instance_size: usize,

    /// Upper bound on core module instances that may be live at the same time.
    pub total_core_instances: u32,

    /// Upper bound on core module instances transitively contained in one
    /// component.
    pub max_core_instances_per_component: u32,

    /// Upper bound on Wasm linear memories transitively contained in one
    /// component.
    pub max_memories_per_component: u32,

    /// Upper bound on tables transitively contained in one component.
    pub max_tables_per_component: u32,

    /// Number of linear-memory slots in the pool, shared by all instances.
    pub total_memories: u32,

    /// Number of table slots in the pool, shared by all instances.
    pub total_tables: u32,

    /// Number of async stack slots in the pool, shared by all instances.
    #[cfg(feature = "async")]
    pub total_stacks: u32,

    /// Upper bound, in bytes, on a core instance's `VMContext`.
    pub core_instance_size: usize,

    /// Upper bound on tables defined by a single core module.
    pub max_tables_per_module: u32,

    /// Upper bound on word-sized elements in one table.
    ///
    /// Element types that need more than one word each (continuations, for
    /// example) fit correspondingly fewer elements.
    pub table_elements: usize,

    /// Upper bound on linear memories defined by a single core module.
    pub max_memories_per_module: u32,

    /// Upper bound, in bytes, on one linear memory. This must be smaller than
    /// `memory_reservation` in `Tunables`.
    pub max_memory_size: usize,

    /// Number of GC heap slots in the pool, shared by all instances.
    #[cfg(feature = "gc")]
    pub total_gc_heaps: u32,
}

impl Default for InstanceLimits {
    fn default() -> Self {
        // These defaults mirror the ones documented on
        // `wasmtime::PoolingAllocationConfig`. Every pooled-resource count
        // shares one value, and 32-bit hosts get a 10x smaller one.
        let total: u32 = match cfg!(target_pointer_width = "32") {
            true => 100,
            false => 1000,
        };
        let one_mib: usize = 1 << 20;

        Self {
            // Concurrent-instance and pool-slot counts.
            total_component_instances: total,
            total_core_instances: total,
            total_memories: total,
            total_tables: total,
            #[cfg(feature = "async")]
            total_stacks: total,
            #[cfg(feature = "gc")]
            total_gc_heaps: total,

            // Components are unconstrained by default.
            max_core_instances_per_component: u32::MAX,
            max_memories_per_component: u32::MAX,
            max_tables_per_component: u32::MAX,

            // Per-module resource counts.
            max_tables_per_module: 1,
            max_memories_per_module: 1,

            // Byte budgets for instance state.
            component_instance_size: one_mib,
            core_instance_size: one_mib,

            // NB: #8504 observed a C# module built in debug mode with 10k+
            // table elements, hence the generous default.
            table_elements: 20_000,

            #[cfg(target_pointer_width = "64")]
            max_memory_size: 1 << 32, // 4 GiB
            #[cfg(target_pointer_width = "32")]
            max_memory_size: 10 << 20, // 10 MiB
        }
    }
}
194 
/// Configuration options for the pooling instance allocator supplied at
/// construction.
///
/// Consumed by `PoolingInstanceAllocator::new`, which hands the relevant parts
/// to each sub-pool (memories, tables, and feature-gated GC heaps and stacks).
#[derive(Copy, Clone, Debug)]
pub struct PoolingInstanceAllocatorConfig {
    /// See `PoolingAllocatorConfig::max_unused_warm_slots` in `wasmtime`
    pub max_unused_warm_slots: u32,
    /// The target number of decommits to do per batch. This is not precise, as
    /// we can queue up decommits at times when we aren't prepared to
    /// immediately flush them, and so we may go over this target size
    /// occasionally.
    ///
    /// Compared against the decommit queue's raw length when deciding whether
    /// to flush a locally built queue directly or merge it into the shared one.
    pub decommit_batch_size: usize,
    /// The size, in bytes, of async stacks to allocate (not including the guard
    /// page).
    pub stack_size: usize,
    /// The limits to apply to instances allocated within this allocator.
    pub limits: InstanceLimits,
    /// Whether or not async stacks are zeroed after use.
    // NOTE(review): unlike `async_stack_keep_resident` this field is not gated
    // on `feature = "async"`; presumably intentional so configuration code
    // compiles without the feature — confirm.
    pub async_stack_zeroing: bool,
    /// If async stack zeroing is enabled and the host platform is Linux this is
    /// how much memory to zero out with `memset`.
    ///
    /// The rest of memory will be zeroed out with `madvise`.
    #[cfg(feature = "async")]
    pub async_stack_keep_resident: usize,
    /// How much linear memory, in bytes, to keep resident after resetting for
    /// use with the next instance. This much memory will be `memset` to zero
    /// when a linear memory is deallocated.
    ///
    /// Memory exceeding this amount in the wasm linear memory will be released
    /// with `madvise` back to the kernel.
    ///
    /// Only applicable on Linux.
    pub linear_memory_keep_resident: usize,
    /// Same as `linear_memory_keep_resident` but for tables.
    pub table_keep_resident: usize,
    /// Whether to enable memory protection keys.
    pub memory_protection_keys: Enabled,
    /// How many memory protection keys to allocate.
    pub max_memory_protection_keys: usize,
    /// Whether to enable PAGEMAP_SCAN on Linux.
    ///
    /// With `Enabled::Yes`, allocator construction fails if the host does not
    /// support it; `Enabled::Auto` uses it only when available; `Enabled::No`
    /// never uses it.
    pub pagemap_scan: Enabled,
}
237 
238 impl Default for PoolingInstanceAllocatorConfig {
default() -> PoolingInstanceAllocatorConfig239     fn default() -> PoolingInstanceAllocatorConfig {
240         PoolingInstanceAllocatorConfig {
241             max_unused_warm_slots: 100,
242             decommit_batch_size: 1,
243             stack_size: 2 << 20,
244             limits: InstanceLimits::default(),
245             async_stack_zeroing: false,
246             #[cfg(feature = "async")]
247             async_stack_keep_resident: 0,
248             linear_memory_keep_resident: 0,
249             table_keep_resident: 0,
250             memory_protection_keys: Enabled::No,
251             max_memory_protection_keys: 16,
252             pagemap_scan: Enabled::No,
253         }
254     }
255 }
256 
257 impl PoolingInstanceAllocatorConfig {
is_pagemap_scan_available() -> bool258     pub fn is_pagemap_scan_available() -> bool {
259         PageMap::new().is_some()
260     }
261 }
262 
/// Error produced when the pooling allocator refuses to hand out another
/// table, memory, instance, etc. because the configured cap on concurrently
/// live entities of that kind has already been reached.
#[derive(Debug)]
pub struct PoolConcurrencyLimitError {
    limit: usize,
    kind: Cow<'static, str>,
}

impl core::error::Error for PoolConcurrencyLimitError {}

impl Display for PoolConcurrencyLimitError {
    /// Renders as `maximum concurrent limit of <limit> for <kind> reached`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "maximum concurrent limit of {} for {} reached",
            self.limit, self.kind
        )
    }
}

impl PoolConcurrencyLimitError {
    /// Builds the error for entity `kind` (e.g. `"core instances"`) whose cap
    /// of `limit` live allocations was hit.
    fn new(limit: usize, kind: impl Into<Cow<'static, str>>) -> Self {
        let kind = kind.into();
        Self { limit, kind }
    }
}
290 
/// Implements the pooling instance allocator.
///
/// This allocator internally maintains pools of instances, memories, tables,
/// and stacks.
///
/// Note: the resource pools are manually dropped so that the fault handler
/// terminates correctly.
// NOTE(review): the `Drop` impl for this type only flushes the decommit queue
// and runs leak assertions in debug builds; nothing visible here drops the
// pools manually. Confirm whether the note above is still accurate. Also,
// core/component instances appear to be counted (`live_*_instances`) rather
// than pooled here, and GC heaps (feature `gc`) are pooled as well.
#[derive(Debug)]
pub struct PoolingInstanceAllocator {
    // Copied from `PoolingInstanceAllocatorConfig::decommit_batch_size`; the
    // threshold used by `merge_or_flush` to decide when to flush.
    decommit_batch_size: usize,
    // Limits checked by the validation and increment methods.
    limits: InstanceLimits,

    // The number of live core module and component instances at any given
    // time. Note that this can temporarily go over the configured limit. This
    // doesn't mean we have actually overshot, but that we attempted to allocate
    // a new instance and incremented the counter, we've seen (or are about to
    // see) that the counter is beyond the configured threshold, and are going
    // to decrement the counter and return an error but haven't done so yet. See
    // the increment trait methods for more details.
    live_core_instances: AtomicU64,
    live_component_instances: AtomicU64,

    // Shared queue of regions (and their owning slots) awaiting decommit.
    // Slots in this queue are not yet returned to their sub-pool; flushing it
    // can free up capacity when an allocation hits a concurrency limit.
    decommit_queue: Mutex<DecommitQueue>,

    // Pool of linear-memory slots and the number currently allocated from it
    // (incremented in `allocate_memory`, decremented in `deallocate_memory`).
    memories: MemoryPool,
    live_memories: AtomicUsize,

    // Pool of table slots and the number currently allocated from it.
    tables: TablePool,
    live_tables: AtomicUsize,

    // Pool of GC heap slots and the number currently allocated from it.
    #[cfg(feature = "gc")]
    gc_heaps: GcHeapPool,
    #[cfg(feature = "gc")]
    live_gc_heaps: AtomicUsize,

    // Pool of async stack slots and the number currently allocated from it.
    #[cfg(feature = "async")]
    stacks: StackPool,
    #[cfg(feature = "async")]
    live_stacks: AtomicUsize,

    // `Some` when PAGEMAP_SCAN was enabled (or auto-detected) at construction;
    // passed to `clear_and_remain_ready` when resetting memory images.
    pagemap: Option<PageMap>,
}
333 
334 impl Drop for PoolingInstanceAllocator {
drop(&mut self)335     fn drop(&mut self) {
336         if !cfg!(debug_assertions) {
337             return;
338         }
339 
340         // NB: when cfg(not(debug_assertions)) it is okay that we don't flush
341         // the queue, as the sub-pools will unmap those ranges anyways, so
342         // there's no point in decommitting them. But we do need to flush the
343         // queue when debug assertions are enabled to make sure that all
344         // entities get returned to their associated sub-pools and we can
345         // differentiate between a leaking slot and an enqueued-for-decommit
346         // slot.
347         let queue = self.decommit_queue.lock().unwrap();
348         self.flush_decommit_queue(queue);
349 
350         debug_assert_eq!(self.live_component_instances.load(Ordering::Acquire), 0);
351         debug_assert_eq!(self.live_core_instances.load(Ordering::Acquire), 0);
352         debug_assert_eq!(self.live_memories.load(Ordering::Acquire), 0);
353         debug_assert_eq!(self.live_tables.load(Ordering::Acquire), 0);
354 
355         debug_assert!(self.memories.is_empty());
356         debug_assert!(self.tables.is_empty());
357 
358         #[cfg(feature = "gc")]
359         {
360             debug_assert!(self.gc_heaps.is_empty());
361             debug_assert_eq!(self.live_gc_heaps.load(Ordering::Acquire), 0);
362         }
363 
364         #[cfg(feature = "async")]
365         {
366             debug_assert!(self.stacks.is_empty());
367             debug_assert_eq!(self.live_stacks.load(Ordering::Acquire), 0);
368         }
369     }
370 }
371 
372 impl PoolingInstanceAllocator {
373     /// Creates a new pooling instance allocator with the given strategy and limits.
new(config: &PoolingInstanceAllocatorConfig, tunables: &Tunables) -> Result<Self>374     pub fn new(config: &PoolingInstanceAllocatorConfig, tunables: &Tunables) -> Result<Self> {
375         Ok(Self {
376             decommit_batch_size: config.decommit_batch_size,
377             limits: config.limits,
378             live_component_instances: AtomicU64::new(0),
379             live_core_instances: AtomicU64::new(0),
380             decommit_queue: Mutex::new(DecommitQueue::default()),
381             memories: MemoryPool::new(config, tunables)?,
382             live_memories: AtomicUsize::new(0),
383             tables: TablePool::new(config)?,
384             live_tables: AtomicUsize::new(0),
385             #[cfg(feature = "gc")]
386             gc_heaps: GcHeapPool::new(config)?,
387             #[cfg(feature = "gc")]
388             live_gc_heaps: AtomicUsize::new(0),
389             #[cfg(feature = "async")]
390             stacks: StackPool::new(config)?,
391             #[cfg(feature = "async")]
392             live_stacks: AtomicUsize::new(0),
393             pagemap: match config.pagemap_scan {
394                 Enabled::Auto => PageMap::new(),
395                 Enabled::Yes => Some(PageMap::new().ok_or_else(|| {
396                     format_err!(
397                         "required to enable PAGEMAP_SCAN but this system \
398                          does not support it"
399                     )
400                 })?),
401                 Enabled::No => None,
402             },
403         })
404     }
405 
core_instance_size(&self) -> usize406     fn core_instance_size(&self) -> usize {
407         round_up_to_pow2(self.limits.core_instance_size, mem::align_of::<Instance>())
408     }
409 
validate_table_plans(&self, module: &Module) -> Result<()>410     fn validate_table_plans(&self, module: &Module) -> Result<()> {
411         self.tables.validate(module)
412     }
413 
validate_memory_plans(&self, module: &Module) -> Result<()>414     fn validate_memory_plans(&self, module: &Module) -> Result<()> {
415         self.memories.validate_memories(module)
416     }
417 
validate_core_instance_size(&self, offsets: &VMOffsets<HostPtr>) -> Result<()>418     fn validate_core_instance_size(&self, offsets: &VMOffsets<HostPtr>) -> Result<()> {
419         let layout = Instance::alloc_layout(offsets);
420         if layout.size() <= self.core_instance_size() {
421             return Ok(());
422         }
423 
424         // If this `module` exceeds the allocation size allotted to it then an
425         // error will be reported here. The error of "required N bytes but
426         // cannot allocate that" is pretty opaque, however, because it's not
427         // clear what the breakdown of the N bytes are and what to optimize
428         // next. To help provide a better error message here some fancy-ish
429         // logic is done here to report the breakdown of the byte request into
430         // the largest portions and where it's coming from.
431         let mut message = format!(
432             "instance allocation for this module \
433              requires {} bytes which exceeds the configured maximum \
434              of {} bytes; breakdown of allocation requirement:\n\n",
435             layout.size(),
436             self.core_instance_size(),
437         );
438 
439         let mut remaining = layout.size();
440         let mut push = |name: &str, bytes: usize| {
441             assert!(remaining >= bytes);
442             remaining -= bytes;
443 
444             // If the `name` region is more than 5% of the allocation request
445             // then report it here, otherwise ignore it. We have less than 20
446             // fields so we're guaranteed that something should be reported, and
447             // otherwise it's not particularly interesting to learn about 5
448             // different fields that are all 8 or 0 bytes. Only try to report
449             // the "major" sources of bytes here.
450             if bytes > layout.size() / 20 {
451                 message.push_str(&format!(
452                     " * {:.02}% - {} bytes - {}\n",
453                     ((bytes as f32) / (layout.size() as f32)) * 100.0,
454                     bytes,
455                     name,
456                 ));
457             }
458         };
459 
460         // The `Instance` itself requires some size allocated to it.
461         push("instance state management", mem::size_of::<Instance>());
462 
463         // Afterwards the `VMContext`'s regions are why we're requesting bytes,
464         // so ask it for descriptions on each region's byte size.
465         for (desc, size) in offsets.region_sizes() {
466             push(desc, size as usize);
467         }
468 
469         // double-check we accounted for all the bytes
470         assert_eq!(remaining, 0);
471 
472         bail!("{message}")
473     }
474 
475     #[cfg(feature = "component-model")]
validate_component_instance_size( &self, offsets: &VMComponentOffsets<HostPtr>, core_instances_aggregate_size: usize, ) -> Result<()>476     fn validate_component_instance_size(
477         &self,
478         offsets: &VMComponentOffsets<HostPtr>,
479         core_instances_aggregate_size: usize,
480     ) -> Result<()> {
481         let vmcomponentctx_size = usize::try_from(offsets.size_of_vmctx()).unwrap();
482         let total_instance_size = core_instances_aggregate_size.saturating_add(vmcomponentctx_size);
483         if total_instance_size <= self.limits.component_instance_size {
484             return Ok(());
485         }
486 
487         // TODO: Add context with detailed accounting of what makes up all the
488         // `VMComponentContext`'s space like we do for module instances.
489         bail!(
490             "instance allocation for this component requires {total_instance_size} bytes of `VMComponentContext` \
491              and aggregated core instance runtime space which exceeds the configured maximum of {} bytes. \
492              `VMComponentContext` used {vmcomponentctx_size} bytes, `core module instances` used \
493              {core_instances_aggregate_size} bytes.",
494             self.limits.component_instance_size
495         )
496     }
497 
flush_decommit_queue(&self, mut locked_queue: MutexGuard<'_, DecommitQueue>) -> bool498     fn flush_decommit_queue(&self, mut locked_queue: MutexGuard<'_, DecommitQueue>) -> bool {
499         // Take the queue out of the mutex and drop the lock, to minimize
500         // contention.
501         let queue = mem::take(&mut *locked_queue);
502         drop(locked_queue);
503         queue.flush(self)
504     }
505 
506     /// Execute `f` and if it returns `Err(PoolConcurrencyLimitError)`, then try
507     /// flushing the decommit queue. If flushing the queue freed up slots, then
508     /// try running `f` again.
509     #[cfg(feature = "async")]
with_flush_and_retry<T>(&self, mut f: impl FnMut() -> Result<T>) -> Result<T>510     fn with_flush_and_retry<T>(&self, mut f: impl FnMut() -> Result<T>) -> Result<T> {
511         f().or_else(|e| {
512             if e.is::<PoolConcurrencyLimitError>() {
513                 let queue = self.decommit_queue.lock().unwrap();
514                 if self.flush_decommit_queue(queue) {
515                     return f();
516                 }
517             }
518 
519             Err(e)
520         })
521     }
522 
merge_or_flush(&self, mut local_queue: DecommitQueue)523     fn merge_or_flush(&self, mut local_queue: DecommitQueue) {
524         match local_queue.raw_len() {
525             // If we didn't enqueue any regions for decommit, then we must have
526             // either memset the whole entity or eagerly remapped it to zero
527             // because we don't have linux's `madvise(DONTNEED)` semantics. In
528             // either case, the entity slot is ready for reuse immediately.
529             0 => {
530                 local_queue.flush(self);
531             }
532 
533             // We enqueued at least our batch size of regions for decommit, so
534             // flush the local queue immediately. Don't bother inspecting (or
535             // locking!) the shared queue.
536             n if n >= self.decommit_batch_size => {
537                 local_queue.flush(self);
538             }
539 
540             // If we enqueued some regions for decommit, but did not reach our
541             // batch size, so we don't want to flush it yet, then merge the
542             // local queue into the shared queue.
543             n => {
544                 debug_assert!(n < self.decommit_batch_size);
545                 let mut shared_queue = self.decommit_queue.lock().unwrap();
546                 shared_queue.append(&mut local_queue);
547                 // And if the shared queue now has at least as many regions
548                 // enqueued for decommit as our batch size, then we can flush
549                 // it.
550                 if shared_queue.raw_len() >= self.decommit_batch_size {
551                     self.flush_decommit_queue(shared_queue);
552                 }
553             }
554         }
555     }
556 }
557 
558 unsafe impl InstanceAllocator for PoolingInstanceAllocator {
559     #[cfg(feature = "component-model")]
validate_component<'a>( &self, component: &Component, offsets: &VMComponentOffsets<HostPtr>, get_module: &'a dyn Fn(StaticModuleIndex) -> &'a Module, ) -> Result<()>560     fn validate_component<'a>(
561         &self,
562         component: &Component,
563         offsets: &VMComponentOffsets<HostPtr>,
564         get_module: &'a dyn Fn(StaticModuleIndex) -> &'a Module,
565     ) -> Result<()> {
566         let mut num_core_instances = 0;
567         let mut num_memories = 0;
568         let mut num_tables = 0;
569         let mut core_instances_aggregate_size: usize = 0;
570         for init in &component.initializers {
571             use wasmtime_environ::component::GlobalInitializer::*;
572             use wasmtime_environ::component::InstantiateModule;
573             match init {
574                 InstantiateModule(InstantiateModule::Import(_, _), _) => {
575                     num_core_instances += 1;
576                     // Can't statically account for the total vmctx size, number
577                     // of memories, and number of tables in this component.
578                 }
579                 InstantiateModule(InstantiateModule::Static(static_module_index, _), _) => {
580                     let module = get_module(*static_module_index);
581                     let offsets = VMOffsets::new(HostPtr, &module);
582                     let layout = Instance::alloc_layout(&offsets);
583                     self.validate_module(module, &offsets)?;
584                     num_core_instances += 1;
585                     num_memories += module.num_defined_memories();
586                     num_tables += module.num_defined_tables();
587                     core_instances_aggregate_size += layout.size();
588                 }
589                 LowerImport { .. }
590                 | ExtractMemory(_)
591                 | ExtractTable(_)
592                 | ExtractRealloc(_)
593                 | ExtractCallback(_)
594                 | ExtractPostReturn(_)
595                 | Resource(_) => {}
596             }
597         }
598 
599         if num_core_instances
600             > usize::try_from(self.limits.max_core_instances_per_component).unwrap()
601         {
602             bail!(
603                 "The component transitively contains {num_core_instances} core module instances, \
604                  which exceeds the configured maximum of {} in the pooling allocator",
605                 self.limits.max_core_instances_per_component
606             );
607         }
608 
609         if num_memories > usize::try_from(self.limits.max_memories_per_component).unwrap() {
610             bail!(
611                 "The component transitively contains {num_memories} Wasm linear memories, which \
612                  exceeds the configured maximum of {} in the pooling allocator",
613                 self.limits.max_memories_per_component
614             );
615         }
616 
617         if num_tables > usize::try_from(self.limits.max_tables_per_component).unwrap() {
618             bail!(
619                 "The component transitively contains {num_tables} tables, which exceeds the \
620                  configured maximum of {} in the pooling allocator",
621                 self.limits.max_tables_per_component
622             );
623         }
624 
625         self.validate_component_instance_size(offsets, core_instances_aggregate_size)
626             .context("component instance size does not fit in pooling allocator requirements")?;
627 
628         Ok(())
629     }
630 
validate_module(&self, module: &Module, offsets: &VMOffsets<HostPtr>) -> Result<()>631     fn validate_module(&self, module: &Module, offsets: &VMOffsets<HostPtr>) -> Result<()> {
632         self.validate_memory_plans(module)
633             .context("module memory does not fit in pooling allocator requirements")?;
634         self.validate_table_plans(module)
635             .context("module table does not fit in pooling allocator requirements")?;
636         self.validate_core_instance_size(offsets)
637             .context("module instance size does not fit in pooling allocator requirements")?;
638         Ok(())
639     }
640 
641     #[cfg(feature = "gc")]
validate_memory(&self, memory: &wasmtime_environ::Memory) -> Result<()>642     fn validate_memory(&self, memory: &wasmtime_environ::Memory) -> Result<()> {
643         self.memories.validate_memory(memory)
644     }
645 
646     #[cfg(feature = "component-model")]
increment_component_instance_count(&self) -> Result<()>647     fn increment_component_instance_count(&self) -> Result<()> {
648         let old_count = self.live_component_instances.fetch_add(1, Ordering::AcqRel);
649         if old_count >= u64::from(self.limits.total_component_instances) {
650             self.decrement_component_instance_count();
651             return Err(PoolConcurrencyLimitError::new(
652                 usize::try_from(self.limits.total_component_instances).unwrap(),
653                 "component instances",
654             )
655             .into());
656         }
657         Ok(())
658     }
659 
660     #[cfg(feature = "component-model")]
decrement_component_instance_count(&self)661     fn decrement_component_instance_count(&self) {
662         self.live_component_instances.fetch_sub(1, Ordering::AcqRel);
663     }
664 
increment_core_instance_count(&self) -> Result<()>665     fn increment_core_instance_count(&self) -> Result<()> {
666         let old_count = self.live_core_instances.fetch_add(1, Ordering::AcqRel);
667         if old_count >= u64::from(self.limits.total_core_instances) {
668             self.decrement_core_instance_count();
669             return Err(PoolConcurrencyLimitError::new(
670                 usize::try_from(self.limits.total_core_instances).unwrap(),
671                 "core instances",
672             )
673             .into());
674         }
675         Ok(())
676     }
677 
decrement_core_instance_count(&self)678     fn decrement_core_instance_count(&self) {
679         self.live_core_instances.fetch_sub(1, Ordering::AcqRel);
680     }
681 
allocate_memory<'a, 'b: 'a, 'c: 'a>( &'a self, request: &'a mut InstanceAllocationRequest<'b, 'c>, ty: &'a wasmtime_environ::Memory, memory_index: Option<DefinedMemoryIndex>, ) -> Pin<Box<dyn Future<Output = Result<(MemoryAllocationIndex, Memory)>> + Send + 'a>>682     fn allocate_memory<'a, 'b: 'a, 'c: 'a>(
683         &'a self,
684         request: &'a mut InstanceAllocationRequest<'b, 'c>,
685         ty: &'a wasmtime_environ::Memory,
686         memory_index: Option<DefinedMemoryIndex>,
687     ) -> Pin<Box<dyn Future<Output = Result<(MemoryAllocationIndex, Memory)>> + Send + 'a>> {
688         crate::runtime::box_future(async move {
689             async {
690                 // FIXME(rust-lang/rust#145127) this should ideally use a version of
691                 // `with_flush_and_retry` but adapted for async closures instead of only
692                 // sync closures. Right now that won't compile though so this is the
693                 // manually expanded version of the method.
694                 let e = match self.memories.allocate(request, ty, memory_index).await {
695                     Ok(result) => return Ok(result),
696                     Err(e) => e,
697                 };
698 
699                 if e.is::<PoolConcurrencyLimitError>() {
700                     let queue = self.decommit_queue.lock().unwrap();
701                     if self.flush_decommit_queue(queue) {
702                         return self.memories.allocate(request, ty, memory_index).await;
703                     }
704                 }
705 
706                 Err(e)
707             }
708             .await
709             .inspect(|_| {
710                 self.live_memories.fetch_add(1, Ordering::Relaxed);
711             })
712         })
713     }
714 
deallocate_memory( &self, _memory_index: Option<DefinedMemoryIndex>, allocation_index: MemoryAllocationIndex, memory: Memory, )715     unsafe fn deallocate_memory(
716         &self,
717         _memory_index: Option<DefinedMemoryIndex>,
718         allocation_index: MemoryAllocationIndex,
719         memory: Memory,
720     ) {
721         let prev = self.live_memories.fetch_sub(1, Ordering::Relaxed);
722         debug_assert!(prev > 0);
723 
724         // Reset the image slot. Depending on whether this is successful or not
725         // the `image` is preserved for future use. On success it's queued up to
726         // get deallocated later, and on failure the slot is deallocated
727         // immediately without preserving the image.
728         let mut image = memory.unwrap_static_image();
729         let mut queue = DecommitQueue::default();
730         let bytes_resident = image.clear_and_remain_ready(
731             self.pagemap.as_ref(),
732             self.memories.keep_resident,
733             |ptr, len| {
734                 // SAFETY: the memory in `image` won't be used until this
735                 // decommit queue is flushed, and by definition the memory is
736                 // not in use when calling this function.
737                 unsafe {
738                     queue.push_raw(ptr, len);
739                 }
740             },
741         );
742 
743         match bytes_resident {
744             Ok(bytes_resident) => {
745                 // SAFETY: this image is not in use and its memory regions were enqueued
746                 // with `push_raw` above.
747                 unsafe {
748                     queue.push_memory(allocation_index, image, bytes_resident);
749                 }
750                 self.merge_or_flush(queue);
751             }
752             Err(e) => {
753                 log::warn!("ignoring clear_and_remain_ready error {e}");
754                 // SAFETY: `allocation_index` comes from this pool, as an unsafe
755                 // contract of this function itself, and it's guaranteed to be no
756                 // longer in use so safe to deallocate. The slot couldn't be
757                 // preserved so it's dropped here.
758                 //
759                 // Note that at this point it's not clear how many bytes are
760                 // resident in memory, so it's inevitably going to leave statistics
761                 // a little off. Also note though that non-Linux platforms don't
762                 // keep track of resident bytes anyway, and this path is only
763                 // reachable on non-Linux platforms because Linux can't return an
764                 // error.
765                 unsafe {
766                     self.memories.deallocate(allocation_index, None, 0);
767                 }
768             }
769         }
770     }
771 
    /// Allocates a table for `request` out of this allocator's table pool.
    ///
    /// This mirrors `allocate_memory`. If the first `self.tables.allocate`
    /// call fails with a `PoolConcurrencyLimitError`, the shared decommit
    /// queue is flushed. If `flush_decommit_queue` returns `true`, the
    /// allocation is retried exactly once. Otherwise the original error is
    /// returned.
    ///
    /// On success `live_tables` is incremented; on failure it is left alone.
    fn allocate_table<'a, 'b: 'a, 'c: 'a>(
        &'a self,
        request: &'a mut InstanceAllocationRequest<'b, 'c>,
        ty: &'a wasmtime_environ::Table,
        _table_index: DefinedTableIndex,
    ) -> Pin<Box<dyn Future<Output = Result<(super::TableAllocationIndex, Table)>> + Send + 'a>>
    {
        crate::runtime::box_future(async move {
            // The inner `async` block confines the early `return`s below, so
            // the `.inspect` after it observes every successful outcome.
            async {
                // FIXME: see `allocate_memory` above for comments about duplication
                // with `with_flush_and_retry`.
                let e = match self.tables.allocate(request, ty).await {
                    Ok(result) => return Ok(result),
                    Err(e) => e,
                };

                if e.is::<PoolConcurrencyLimitError>() {
                    // The guard is consumed by `flush_decommit_queue`, so it is
                    // not held across the `.await` of the retry.
                    let queue = self.decommit_queue.lock().unwrap();
                    if self.flush_decommit_queue(queue) {
                        return self.tables.allocate(request, ty).await;
                    }
                }

                Err(e)
            }
            .await
            // Only successful allocations are counted as live tables.
            .inspect(|_| {
                self.live_tables.fetch_add(1, Ordering::Relaxed);
            })
        })
    }
803 
deallocate_table( &self, _table_index: DefinedTableIndex, allocation_index: TableAllocationIndex, mut table: Table, )804     unsafe fn deallocate_table(
805         &self,
806         _table_index: DefinedTableIndex,
807         allocation_index: TableAllocationIndex,
808         mut table: Table,
809     ) {
810         let prev = self.live_tables.fetch_sub(1, Ordering::Relaxed);
811         debug_assert!(prev > 0);
812 
813         let mut queue = DecommitQueue::default();
814         // SAFETY: This table is no longer in use by the allocator when this
815         // method is called and additionally all image ranges are pushed with
816         // the understanding that the memory won't get used until the whole
817         // queue is flushed.
818         let bytes_resident = unsafe {
819             self.tables.reset_table_pages_to_zero(
820                 self.pagemap.as_ref(),
821                 allocation_index,
822                 &mut table,
823                 |ptr, len| {
824                     queue.push_raw(ptr, len);
825                 },
826             )
827         };
828 
829         // SAFETY: the table has had all its memory regions enqueued above.
830         unsafe {
831             queue.push_table(allocation_index, table, bytes_resident);
832         }
833         self.merge_or_flush(queue);
834     }
835 
836     #[cfg(feature = "async")]
allocate_fiber_stack(&self) -> Result<wasmtime_fiber::FiberStack>837     fn allocate_fiber_stack(&self) -> Result<wasmtime_fiber::FiberStack> {
838         let ret = self.with_flush_and_retry(|| self.stacks.allocate())?;
839         self.live_stacks.fetch_add(1, Ordering::Relaxed);
840         Ok(ret)
841     }
842 
843     #[cfg(feature = "async")]
deallocate_fiber_stack(&self, mut stack: wasmtime_fiber::FiberStack)844     unsafe fn deallocate_fiber_stack(&self, mut stack: wasmtime_fiber::FiberStack) {
845         self.live_stacks.fetch_sub(1, Ordering::Relaxed);
846         let mut queue = DecommitQueue::default();
847         // SAFETY: the stack is no longer in use by definition when this
848         // function is called and memory ranges pushed here are otherwise no
849         // longer in use.
850         let bytes_resident = unsafe {
851             self.stacks
852                 .zero_stack(&mut stack, |ptr, len| queue.push_raw(ptr, len))
853         };
854         // SAFETY: this stack's memory regions were enqueued above.
855         unsafe {
856             queue.push_stack(stack, bytes_resident);
857         }
858         self.merge_or_flush(queue);
859     }
860 
purge_module(&self, module: CompiledModuleId)861     fn purge_module(&self, module: CompiledModuleId) {
862         self.memories.purge_module(module);
863     }
864 
next_available_pkey(&self) -> Option<ProtectionKey>865     fn next_available_pkey(&self) -> Option<ProtectionKey> {
866         self.memories.next_available_pkey()
867     }
868 
restrict_to_pkey(&self, pkey: ProtectionKey)869     fn restrict_to_pkey(&self, pkey: ProtectionKey) {
870         mpk::allow(ProtectionMask::zero().or(pkey));
871     }
872 
allow_all_pkeys(&self)873     fn allow_all_pkeys(&self) {
874         mpk::allow(ProtectionMask::all());
875     }
876 
877     #[cfg(feature = "gc")]
allocate_gc_heap( &self, engine: &crate::Engine, gc_runtime: &dyn GcRuntime, memory_alloc_index: MemoryAllocationIndex, memory: Memory, ) -> Result<(GcHeapAllocationIndex, Box<dyn GcHeap>)>878     fn allocate_gc_heap(
879         &self,
880         engine: &crate::Engine,
881         gc_runtime: &dyn GcRuntime,
882         memory_alloc_index: MemoryAllocationIndex,
883         memory: Memory,
884     ) -> Result<(GcHeapAllocationIndex, Box<dyn GcHeap>)> {
885         let ret = self
886             .gc_heaps
887             .allocate(engine, gc_runtime, memory_alloc_index, memory)?;
888         self.live_gc_heaps.fetch_add(1, Ordering::Relaxed);
889         Ok(ret)
890     }
891 
892     #[cfg(feature = "gc")]
deallocate_gc_heap( &self, allocation_index: GcHeapAllocationIndex, gc_heap: Box<dyn GcHeap>, ) -> (MemoryAllocationIndex, Memory)893     fn deallocate_gc_heap(
894         &self,
895         allocation_index: GcHeapAllocationIndex,
896         gc_heap: Box<dyn GcHeap>,
897     ) -> (MemoryAllocationIndex, Memory) {
898         self.live_gc_heaps.fetch_sub(1, Ordering::Relaxed);
899         self.gc_heaps.deallocate(allocation_index, gc_heap)
900     }
901 
as_pooling(&self) -> Option<&PoolingInstanceAllocator>902     fn as_pooling(&self) -> Option<&PoolingInstanceAllocator> {
903         Some(self)
904     }
905 }
906 
#[cfg(test)]
#[cfg(target_pointer_width = "64")]
mod test {
    use super::*;

    /// A maximum memory size larger than the configured memory reservation
    /// must be rejected when the allocator is constructed.
    #[test]
    fn test_pooling_allocator_with_memory_pages_exceeded() {
        let config = PoolingInstanceAllocatorConfig {
            limits: InstanceLimits {
                total_memories: 1,
                max_memory_size: 0x100010000,
                ..Default::default()
            },
            ..PoolingInstanceAllocatorConfig::default()
        };
        let tunables = Tunables {
            memory_reservation: 0x10000,
            ..Tunables::default_host()
        };

        let message = PoolingInstanceAllocator::new(&config, &tunables)
            .map_err(|e| e.to_string())
            .expect_err("expected a failure constructing instance allocator");
        assert_eq!(
            message,
            "maximum memory size of 0x100010000 bytes exceeds the configured \
             memory reservation of 0x10000 bytes"
        );
    }

    /// Configuration shared by the stack tests below. It has a pool of
    /// exactly one 128-byte stack, no memories or tables, and
    /// `max_unused_warm_slots` set to zero. Only `async_stack_zeroing`
    /// differs between the tests.
    #[cfg(all(
        unix,
        target_pointer_width = "64",
        feature = "async",
        not(miri),
        not(asan)
    ))]
    fn single_stack_config(async_stack_zeroing: bool) -> PoolingInstanceAllocatorConfig {
        PoolingInstanceAllocatorConfig {
            max_unused_warm_slots: 0,
            limits: InstanceLimits {
                total_stacks: 1,
                total_memories: 0,
                total_tables: 0,
                ..Default::default()
            },
            stack_size: 128,
            async_stack_zeroing,
            ..PoolingInstanceAllocatorConfig::default()
        }
    }

    #[cfg(all(
        unix,
        target_pointer_width = "64",
        feature = "async",
        not(miri),
        not(asan)
    ))]
    #[test]
    fn test_stack_zeroed() -> Result<()> {
        let config = single_stack_config(true);
        let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;

        // With zeroing enabled, the single stack slot must read as zero every
        // time it is handed out, even though each iteration dirties it first.
        for _ in 0..255 {
            let stack = allocator.allocate_fiber_stack()?;
            unsafe {
                // The stack pointer starts at `top()`; step back one byte to
                // land inside the stack.
                let last_byte = stack.top().unwrap().sub(1);
                assert_eq!(*last_byte, 0);
                *last_byte = 1;
                allocator.deallocate_fiber_stack(stack);
            }
        }

        Ok(())
    }

    #[cfg(all(
        unix,
        target_pointer_width = "64",
        feature = "async",
        not(miri),
        not(asan)
    ))]
    #[test]
    fn test_stack_unzeroed() -> Result<()> {
        let config = single_stack_config(false);
        let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?;

        // Without zeroing, the byte written in one iteration must still be
        // there when the same slot is handed out in the next iteration.
        for expected in 0..255u8 {
            let stack = allocator.allocate_fiber_stack()?;
            unsafe {
                // The stack pointer starts at `top()`; step back one byte to
                // land inside the stack.
                let last_byte = stack.top().unwrap().sub(1);
                assert_eq!(*last_byte, expected);
                *last_byte = expected + 1;
                allocator.deallocate_fiber_stack(stack);
            }
        }

        Ok(())
    }
}
1017