1 //! Implements the pooling instance allocator.
2 //!
3 //! The pooling instance allocator maps memory in advance and allocates
4 //! instances, memories, tables, and stacks from a pool of available resources.
5 //! Using the pooling instance allocator can speed up module instantiation when
6 //! modules can be constrained based on configurable limits
7 //! ([`InstanceLimits`]). Each new instance is stored in a "slot"; as instances
8 //! are allocated and freed, these slots are either filled or emptied:
9 //!
10 //! ```text
11 //! ┌──────┬──────┬──────┬──────┬──────┐
12 //! │Slot 0│Slot 1│Slot 2│Slot 3│......│
13 //! └──────┴──────┴──────┴──────┴──────┘
14 //! ```
15 //!
16 //! Each slot has a "slot ID"--an index into the pool. Slot IDs are handed out
17 //! by the [`index_allocator`] module. Note that each kind of pool-allocated
18 //! item is stored in its own separate pool: [`memory_pool`], [`table_pool`],
19 //! [`stack_pool`]. See those modules for more details.
20
21 mod decommit_queue;
22 mod index_allocator;
23 mod memory_pool;
24 mod metrics;
25 mod table_pool;
26
27 #[cfg(feature = "gc")]
28 mod gc_heap_pool;
29
30 #[cfg(all(feature = "async"))]
31 mod generic_stack_pool;
32 #[cfg(all(feature = "async", unix, not(miri)))]
33 mod unix_stack_pool;
34
35 #[cfg(all(feature = "async"))]
36 cfg_if::cfg_if! {
37 if #[cfg(all(unix, not(miri), not(asan)))] {
38 use unix_stack_pool as stack_pool;
39 } else {
40 use generic_stack_pool as stack_pool;
41 }
42 }
43
44 use self::decommit_queue::DecommitQueue;
45 use self::memory_pool::MemoryPool;
46 use self::table_pool::TablePool;
47 use super::{
48 InstanceAllocationRequest, InstanceAllocator, MemoryAllocationIndex, TableAllocationIndex,
49 };
50 use crate::Enabled;
51 use crate::prelude::*;
52 use crate::runtime::vm::{
53 CompiledModuleId, Memory, Table,
54 instance::Instance,
55 mpk::{self, ProtectionKey, ProtectionMask},
56 sys::vm::PageMap,
57 };
58 use core::future::Future;
59 use core::pin::Pin;
60 use core::sync::atomic::AtomicUsize;
61 use std::borrow::Cow;
62 use std::fmt::Display;
63 use std::sync::{Mutex, MutexGuard};
64 use std::{
65 mem,
66 sync::atomic::{AtomicU64, Ordering},
67 };
68 use wasmtime_environ::{
69 DefinedMemoryIndex, DefinedTableIndex, HostPtr, Module, Tunables, VMOffsets,
70 };
71
72 pub use self::metrics::PoolingAllocatorMetrics;
73
74 #[cfg(feature = "gc")]
75 use super::GcHeapAllocationIndex;
76 #[cfg(feature = "gc")]
77 use crate::runtime::vm::{GcHeap, GcRuntime};
78 #[cfg(feature = "gc")]
79 use gc_heap_pool::GcHeapPool;
80
81 #[cfg(feature = "async")]
82 use stack_pool::StackPool;
83
84 #[cfg(feature = "component-model")]
85 use wasmtime_environ::{
86 StaticModuleIndex,
87 component::{Component, VMComponentOffsets},
88 };
89
/// Rounds `n` up to the nearest multiple of `to`.
///
/// `to` must be a non-zero power of two; this is only checked when debug
/// assertions are enabled.
///
/// If the rounded-up value does not fit in a `usize` the result saturates to
/// the largest representable multiple of `to`, rather than panicking on
/// overflow (debug builds) or silently wrapping around to a tiny value
/// (release builds).
fn round_up_to_pow2(n: usize, to: usize) -> usize {
    debug_assert!(to > 0);
    debug_assert!(to.is_power_of_two());
    // For a power of two, `to - 1` is a mask of the low bits. Adding the mask
    // and then clearing those bits rounds up; the saturating add keeps the
    // intermediate sum from overflowing when `n` is close to `usize::MAX`.
    let mask = to - 1;
    n.saturating_add(mask) & !mask
}
95
/// Instance-related limit configuration for pooling.
///
/// More docs on this can be found at `wasmtime::PoolingAllocationConfig`.
#[derive(Debug, Copy, Clone)]
pub struct InstanceLimits {
    /// The maximum number of component instances that may be allocated
    /// concurrently.
    pub total_component_instances: u32,

    /// The maximum size of a component's `VMComponentContext`, including
    /// the aggregate size of all its inner core modules' `VMContext` sizes.
    pub component_instance_size: usize,

    /// The maximum number of core module instances that may be allocated
    /// concurrently.
    pub total_core_instances: u32,

    /// The maximum number of core module instances that a single component may
    /// transitively contain.
    pub max_core_instances_per_component: u32,

    /// The maximum number of Wasm linear memories that a component may
    /// transitively contain.
    pub max_memories_per_component: u32,

    /// The maximum number of tables that a component may transitively contain.
    pub max_tables_per_component: u32,

    /// The total number of linear memories in the pool, across all instances.
    pub total_memories: u32,

    /// The total number of tables in the pool, across all instances.
    pub total_tables: u32,

    /// The total number of async stacks in the pool, across all instances.
    #[cfg(feature = "async")]
    pub total_stacks: u32,

    /// Maximum size of a core instance's `VMContext`.
    pub core_instance_size: usize,

    /// Maximum number of tables per instance.
    pub max_tables_per_module: u32,

    /// Maximum number of word-size elements per table.
    ///
    /// Note that tables for element types such as continuations
    /// that use more than one word of storage may store fewer
    /// elements.
    pub table_elements: usize,

    /// Maximum number of linear memories per instance.
    pub max_memories_per_module: u32,

    /// Maximum byte size of a linear memory, must be smaller than
    /// `memory_reservation` in `Tunables`.
    pub max_memory_size: usize,

    /// The total number of GC heaps in the pool, across all instances.
    #[cfg(feature = "gc")]
    pub total_gc_heaps: u32,
}

impl Default for InstanceLimits {
    /// Builds the default limits.
    ///
    /// See the doc comments for `wasmtime::PoolingAllocationConfig` for the
    /// rationale behind these values.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;

        // Every pool-wide "total" shares one value, which is ten times
        // smaller on 32-bit targets.
        let pool_total: u32 = if cfg!(target_pointer_width = "32") {
            100
        } else {
            1000
        };

        // The largest allowed linear memory also depends on the pointer width.
        #[cfg(target_pointer_width = "64")]
        let max_memory_size: usize = 1 << 32; // 4 GiB
        #[cfg(target_pointer_width = "32")]
        let max_memory_size: usize = 10 << 20; // 10 MiB

        Self {
            total_component_instances: pool_total,
            component_instance_size: ONE_MIB,
            total_core_instances: pool_total,
            max_core_instances_per_component: u32::MAX,
            max_memories_per_component: u32::MAX,
            max_tables_per_component: u32::MAX,
            total_memories: pool_total,
            total_tables: pool_total,
            #[cfg(feature = "async")]
            total_stacks: pool_total,
            core_instance_size: ONE_MIB,
            max_tables_per_module: 1,
            // NB: in #8504 it was seen that a C# module in debug mode can
            // have 10k+ elements.
            table_elements: 20_000,
            max_memories_per_module: 1,
            max_memory_size,
            #[cfg(feature = "gc")]
            total_gc_heaps: pool_total,
        }
    }
}
194
/// Configuration options for the pooling instance allocator supplied at
/// construction.
#[derive(Copy, Clone, Debug)]
pub struct PoolingInstanceAllocatorConfig {
    /// See `PoolingAllocationConfig::max_unused_warm_slots` in `wasmtime`.
    pub max_unused_warm_slots: u32,
    /// The target number of decommits to do per batch. This is not precise, as
    /// we can queue up decommits at times when we aren't prepared to
    /// immediately flush them, and so we may go over this target size
    /// occasionally.
    pub decommit_batch_size: usize,
    /// The size, in bytes, of async stacks to allocate (not including the guard
    /// page).
    pub stack_size: usize,
    /// The limits to apply to instances allocated within this allocator.
    pub limits: InstanceLimits,
    /// Whether or not async stacks are zeroed after use.
    pub async_stack_zeroing: bool,
    /// If async stack zeroing is enabled and the host platform is Linux this is
    /// how much memory to zero out with `memset`.
    ///
    /// The rest of memory will be zeroed out with `madvise`.
    #[cfg(feature = "async")]
    pub async_stack_keep_resident: usize,
    /// How much linear memory, in bytes, to keep resident after resetting for
    /// use with the next instance. This much memory will be `memset` to zero
    /// when a linear memory is deallocated.
    ///
    /// Memory exceeding this amount in the wasm linear memory will be released
    /// with `madvise` back to the kernel.
    ///
    /// Only applicable on Linux.
    pub linear_memory_keep_resident: usize,
    /// Same as `linear_memory_keep_resident` but for tables.
    pub table_keep_resident: usize,
    /// Whether to enable memory protection keys.
    pub memory_protection_keys: Enabled,
    /// How many memory protection keys to allocate.
    pub max_memory_protection_keys: usize,
    /// Whether to enable PAGEMAP_SCAN on Linux.
    pub pagemap_scan: Enabled,
}
237
238 impl Default for PoolingInstanceAllocatorConfig {
default() -> PoolingInstanceAllocatorConfig239 fn default() -> PoolingInstanceAllocatorConfig {
240 PoolingInstanceAllocatorConfig {
241 max_unused_warm_slots: 100,
242 decommit_batch_size: 1,
243 stack_size: 2 << 20,
244 limits: InstanceLimits::default(),
245 async_stack_zeroing: false,
246 #[cfg(feature = "async")]
247 async_stack_keep_resident: 0,
248 linear_memory_keep_resident: 0,
249 table_keep_resident: 0,
250 memory_protection_keys: Enabled::No,
251 max_memory_protection_keys: 16,
252 pagemap_scan: Enabled::No,
253 }
254 }
255 }
256
257 impl PoolingInstanceAllocatorConfig {
is_pagemap_scan_available() -> bool258 pub fn is_pagemap_scan_available() -> bool {
259 PageMap::new().is_some()
260 }
261 }
262
/// An error returned when the pooling allocator cannot allocate a table,
/// memory, etc... because the maximum number of concurrent allocations for that
/// entity has been reached.
#[derive(Debug)]
pub struct PoolConcurrencyLimitError {
    /// The concurrency limit that was reached.
    limit: usize,
    /// Name of the kind of entity whose limit was reached; only used when
    /// rendering the error message.
    kind: Cow<'static, str>,
}

impl core::error::Error for PoolConcurrencyLimitError {}

impl Display for PoolConcurrencyLimitError {
    /// Renders the error as a single sentence naming the limit and the entity.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { limit, kind } = self;
        write!(f, "maximum concurrent limit of {limit} for {kind} reached")
    }
}

impl PoolConcurrencyLimitError {
    /// Creates an error for the entity described by `kind` whose concurrency
    /// `limit` was reached.
    fn new(limit: usize, kind: impl Into<Cow<'static, str>>) -> Self {
        let kind = kind.into();
        Self { limit, kind }
    }
}
290
/// Implements the pooling instance allocator.
///
/// This allocator internally maintains pools of instances, memories, tables,
/// and stacks.
///
/// Note: the resource pools are manually dropped so that the fault handler
/// terminates correctly.
#[derive(Debug)]
pub struct PoolingInstanceAllocator {
    // Copied from `PoolingInstanceAllocatorConfig::decommit_batch_size`; the
    // number of enqueued regions at which a decommit queue gets flushed.
    decommit_batch_size: usize,
    // Copied from `PoolingInstanceAllocatorConfig::limits`.
    limits: InstanceLimits,

    // The number of live core module and component instances at any given
    // time. Note that this can temporarily go over the configured limit. This
    // doesn't mean we have actually overshot, but that we attempted to allocate
    // a new instance and incremented the counter, we've seen (or are about to
    // see) that the counter is beyond the configured threshold, and are going
    // to decrement the counter and return an error but haven't done so yet. See
    // the increment trait methods for more details.
    live_core_instances: AtomicU64,
    live_component_instances: AtomicU64,

    // Shared queue that deallocations merge their pending decommits into when
    // they are too small to be flushed on their own.
    decommit_queue: Mutex<DecommitQueue>,

    // Pool of linear memories, plus the number of memories currently handed
    // out (incremented on successful allocation, decremented on deallocation).
    memories: MemoryPool,
    live_memories: AtomicUsize,

    // Pool of tables, plus the number of tables currently handed out.
    tables: TablePool,
    live_tables: AtomicUsize,

    // Pool of GC heaps, plus the number of GC heaps currently handed out.
    #[cfg(feature = "gc")]
    gc_heaps: GcHeapPool,
    #[cfg(feature = "gc")]
    live_gc_heaps: AtomicUsize,

    // Pool of async fiber stacks, plus the number of stacks currently handed
    // out.
    #[cfg(feature = "async")]
    stacks: StackPool,
    #[cfg(feature = "async")]
    live_stacks: AtomicUsize,

    // `Some` when PAGEMAP_SCAN was requested via the configuration and a
    // `PageMap` could be created; passed along when resetting memories and
    // tables.
    pagemap: Option<PageMap>,
}
333
334 impl Drop for PoolingInstanceAllocator {
drop(&mut self)335 fn drop(&mut self) {
336 if !cfg!(debug_assertions) {
337 return;
338 }
339
340 // NB: when cfg(not(debug_assertions)) it is okay that we don't flush
341 // the queue, as the sub-pools will unmap those ranges anyways, so
342 // there's no point in decommitting them. But we do need to flush the
343 // queue when debug assertions are enabled to make sure that all
344 // entities get returned to their associated sub-pools and we can
345 // differentiate between a leaking slot and an enqueued-for-decommit
346 // slot.
347 let queue = self.decommit_queue.lock().unwrap();
348 self.flush_decommit_queue(queue);
349
350 debug_assert_eq!(self.live_component_instances.load(Ordering::Acquire), 0);
351 debug_assert_eq!(self.live_core_instances.load(Ordering::Acquire), 0);
352 debug_assert_eq!(self.live_memories.load(Ordering::Acquire), 0);
353 debug_assert_eq!(self.live_tables.load(Ordering::Acquire), 0);
354
355 debug_assert!(self.memories.is_empty());
356 debug_assert!(self.tables.is_empty());
357
358 #[cfg(feature = "gc")]
359 {
360 debug_assert!(self.gc_heaps.is_empty());
361 debug_assert_eq!(self.live_gc_heaps.load(Ordering::Acquire), 0);
362 }
363
364 #[cfg(feature = "async")]
365 {
366 debug_assert!(self.stacks.is_empty());
367 debug_assert_eq!(self.live_stacks.load(Ordering::Acquire), 0);
368 }
369 }
370 }
371
372 impl PoolingInstanceAllocator {
373 /// Creates a new pooling instance allocator with the given strategy and limits.
new(config: &PoolingInstanceAllocatorConfig, tunables: &Tunables) -> Result<Self>374 pub fn new(config: &PoolingInstanceAllocatorConfig, tunables: &Tunables) -> Result<Self> {
375 Ok(Self {
376 decommit_batch_size: config.decommit_batch_size,
377 limits: config.limits,
378 live_component_instances: AtomicU64::new(0),
379 live_core_instances: AtomicU64::new(0),
380 decommit_queue: Mutex::new(DecommitQueue::default()),
381 memories: MemoryPool::new(config, tunables)?,
382 live_memories: AtomicUsize::new(0),
383 tables: TablePool::new(config)?,
384 live_tables: AtomicUsize::new(0),
385 #[cfg(feature = "gc")]
386 gc_heaps: GcHeapPool::new(config)?,
387 #[cfg(feature = "gc")]
388 live_gc_heaps: AtomicUsize::new(0),
389 #[cfg(feature = "async")]
390 stacks: StackPool::new(config)?,
391 #[cfg(feature = "async")]
392 live_stacks: AtomicUsize::new(0),
393 pagemap: match config.pagemap_scan {
394 Enabled::Auto => PageMap::new(),
395 Enabled::Yes => Some(PageMap::new().ok_or_else(|| {
396 format_err!(
397 "required to enable PAGEMAP_SCAN but this system \
398 does not support it"
399 )
400 })?),
401 Enabled::No => None,
402 },
403 })
404 }
405
core_instance_size(&self) -> usize406 fn core_instance_size(&self) -> usize {
407 round_up_to_pow2(self.limits.core_instance_size, mem::align_of::<Instance>())
408 }
409
/// Checks `module` against the table pool by delegating to
/// `TablePool::validate`, returning whatever error it reports.
fn validate_table_plans(&self, module: &Module) -> Result<()> {
    self.tables.validate(module)
}
413
/// Checks `module` against the memory pool by delegating to
/// `MemoryPool::validate_memories`, returning whatever error it reports.
fn validate_memory_plans(&self, module: &Module) -> Result<()> {
    self.memories.validate_memories(module)
}
417
validate_core_instance_size(&self, offsets: &VMOffsets<HostPtr>) -> Result<()>418 fn validate_core_instance_size(&self, offsets: &VMOffsets<HostPtr>) -> Result<()> {
419 let layout = Instance::alloc_layout(offsets);
420 if layout.size() <= self.core_instance_size() {
421 return Ok(());
422 }
423
424 // If this `module` exceeds the allocation size allotted to it then an
425 // error will be reported here. The error of "required N bytes but
426 // cannot allocate that" is pretty opaque, however, because it's not
427 // clear what the breakdown of the N bytes are and what to optimize
428 // next. To help provide a better error message here some fancy-ish
429 // logic is done here to report the breakdown of the byte request into
430 // the largest portions and where it's coming from.
431 let mut message = format!(
432 "instance allocation for this module \
433 requires {} bytes which exceeds the configured maximum \
434 of {} bytes; breakdown of allocation requirement:\n\n",
435 layout.size(),
436 self.core_instance_size(),
437 );
438
439 let mut remaining = layout.size();
440 let mut push = |name: &str, bytes: usize| {
441 assert!(remaining >= bytes);
442 remaining -= bytes;
443
444 // If the `name` region is more than 5% of the allocation request
445 // then report it here, otherwise ignore it. We have less than 20
446 // fields so we're guaranteed that something should be reported, and
447 // otherwise it's not particularly interesting to learn about 5
448 // different fields that are all 8 or 0 bytes. Only try to report
449 // the "major" sources of bytes here.
450 if bytes > layout.size() / 20 {
451 message.push_str(&format!(
452 " * {:.02}% - {} bytes - {}\n",
453 ((bytes as f32) / (layout.size() as f32)) * 100.0,
454 bytes,
455 name,
456 ));
457 }
458 };
459
460 // The `Instance` itself requires some size allocated to it.
461 push("instance state management", mem::size_of::<Instance>());
462
463 // Afterwards the `VMContext`'s regions are why we're requesting bytes,
464 // so ask it for descriptions on each region's byte size.
465 for (desc, size) in offsets.region_sizes() {
466 push(desc, size as usize);
467 }
468
469 // double-check we accounted for all the bytes
470 assert_eq!(remaining, 0);
471
472 bail!("{message}")
473 }
474
/// Checks that a component's `VMComponentContext` plus the aggregate size
/// of its core instances fits within `limits.component_instance_size`.
///
/// The two sizes are combined with a saturating add, so a sum that would
/// overflow is treated as the largest possible size.
#[cfg(feature = "component-model")]
fn validate_component_instance_size(
    &self,
    offsets: &VMComponentOffsets<HostPtr>,
    core_instances_aggregate_size: usize,
) -> Result<()> {
    let limit = self.limits.component_instance_size;
    let vmcomponentctx_size = usize::try_from(offsets.size_of_vmctx()).unwrap();
    let total_instance_size = vmcomponentctx_size.saturating_add(core_instances_aggregate_size);

    if total_instance_size > limit {
        // TODO: Add context with detailed accounting of what makes up all the
        // `VMComponentContext`'s space like we do for module instances.
        bail!(
            "instance allocation for this component requires {total_instance_size} bytes of `VMComponentContext` \
             and aggregated core instance runtime space which exceeds the configured maximum of {} bytes. \
             `VMComponentContext` used {vmcomponentctx_size} bytes, `core module instances` used \
             {core_instances_aggregate_size} bytes.",
            limit
        )
    }

    Ok(())
}
497
flush_decommit_queue(&self, mut locked_queue: MutexGuard<'_, DecommitQueue>) -> bool498 fn flush_decommit_queue(&self, mut locked_queue: MutexGuard<'_, DecommitQueue>) -> bool {
499 // Take the queue out of the mutex and drop the lock, to minimize
500 // contention.
501 let queue = mem::take(&mut *locked_queue);
502 drop(locked_queue);
503 queue.flush(self)
504 }
505
/// Runs `f`, and if it fails with a `PoolConcurrencyLimitError`, flushes the
/// shared decommit queue and runs `f` one more time, provided the flush
/// reported success.
///
/// Any other error, or a limit error when the flush did not report success,
/// is returned unchanged from the first attempt.
#[cfg(feature = "async")]
fn with_flush_and_retry<T>(&self, mut f: impl FnMut() -> Result<T>) -> Result<T> {
    let first_failure = match f() {
        Ok(value) => return Ok(value),
        Err(err) => err,
    };

    if first_failure.is::<PoolConcurrencyLimitError>() {
        let guard = self.decommit_queue.lock().unwrap();
        if self.flush_decommit_queue(guard) {
            return f();
        }
    }

    Err(first_failure)
}
522
merge_or_flush(&self, mut local_queue: DecommitQueue)523 fn merge_or_flush(&self, mut local_queue: DecommitQueue) {
524 match local_queue.raw_len() {
525 // If we didn't enqueue any regions for decommit, then we must have
526 // either memset the whole entity or eagerly remapped it to zero
527 // because we don't have linux's `madvise(DONTNEED)` semantics. In
528 // either case, the entity slot is ready for reuse immediately.
529 0 => {
530 local_queue.flush(self);
531 }
532
533 // We enqueued at least our batch size of regions for decommit, so
534 // flush the local queue immediately. Don't bother inspecting (or
535 // locking!) the shared queue.
536 n if n >= self.decommit_batch_size => {
537 local_queue.flush(self);
538 }
539
540 // If we enqueued some regions for decommit, but did not reach our
541 // batch size, so we don't want to flush it yet, then merge the
542 // local queue into the shared queue.
543 n => {
544 debug_assert!(n < self.decommit_batch_size);
545 let mut shared_queue = self.decommit_queue.lock().unwrap();
546 shared_queue.append(&mut local_queue);
547 // And if the shared queue now has at least as many regions
548 // enqueued for decommit as our batch size, then we can flush
549 // it.
550 if shared_queue.raw_len() >= self.decommit_batch_size {
551 self.flush_decommit_queue(shared_queue);
552 }
553 }
554 }
555 }
556 }
557
558 unsafe impl InstanceAllocator for PoolingInstanceAllocator {
/// Validates that `component` can be instantiated within this allocator's
/// limits.
///
/// Every statically-known core module is validated with `validate_module`,
/// and the number of core instances, memories and tables the component
/// transitively contains is checked against the per-component limits, as is
/// the combined size of the component context and its core instances.
///
/// Imported modules only count towards the number of core instances, since
/// their sizes, memories and tables are not statically known.
#[cfg(feature = "component-model")]
fn validate_component<'a>(
    &self,
    component: &Component,
    offsets: &VMComponentOffsets<HostPtr>,
    get_module: &'a dyn Fn(StaticModuleIndex) -> &'a Module,
) -> Result<()> {
    let mut num_core_instances = 0;
    let mut num_memories = 0;
    let mut num_tables = 0;
    let mut core_instances_aggregate_size: usize = 0;
    for init in &component.initializers {
        use wasmtime_environ::component::GlobalInitializer::*;
        use wasmtime_environ::component::InstantiateModule;
        match init {
            InstantiateModule(InstantiateModule::Import(_, _), _) => {
                num_core_instances += 1;
                // Can't statically account for the total vmctx size, number
                // of memories, and number of tables in this component.
            }
            InstantiateModule(InstantiateModule::Static(static_module_index, _), _) => {
                let module = get_module(*static_module_index);
                let offsets = VMOffsets::new(HostPtr, &module);
                let layout = Instance::alloc_layout(&offsets);
                self.validate_module(module, &offsets)?;
                num_core_instances += 1;
                num_memories += module.num_defined_memories();
                num_tables += module.num_defined_tables();
                // Saturate rather than overflow: the same module may be
                // instantiated many times, and an aggregate pinned at
                // `usize::MAX` is still rejected by the size check below
                // (which itself combines sizes with a saturating add).
                core_instances_aggregate_size =
                    core_instances_aggregate_size.saturating_add(layout.size());
            }
            LowerImport { .. }
            | ExtractMemory(_)
            | ExtractTable(_)
            | ExtractRealloc(_)
            | ExtractCallback(_)
            | ExtractPostReturn(_)
            | Resource(_) => {}
        }
    }

    if num_core_instances
        > usize::try_from(self.limits.max_core_instances_per_component).unwrap()
    {
        bail!(
            "The component transitively contains {num_core_instances} core module instances, \
             which exceeds the configured maximum of {} in the pooling allocator",
            self.limits.max_core_instances_per_component
        );
    }

    if num_memories > usize::try_from(self.limits.max_memories_per_component).unwrap() {
        bail!(
            "The component transitively contains {num_memories} Wasm linear memories, which \
             exceeds the configured maximum of {} in the pooling allocator",
            self.limits.max_memories_per_component
        );
    }

    if num_tables > usize::try_from(self.limits.max_tables_per_component).unwrap() {
        bail!(
            "The component transitively contains {num_tables} tables, which exceeds the \
             configured maximum of {} in the pooling allocator",
            self.limits.max_tables_per_component
        );
    }

    self.validate_component_instance_size(offsets, core_instances_aggregate_size)
        .context("component instance size does not fit in pooling allocator requirements")?;

    Ok(())
}
630
validate_module(&self, module: &Module, offsets: &VMOffsets<HostPtr>) -> Result<()>631 fn validate_module(&self, module: &Module, offsets: &VMOffsets<HostPtr>) -> Result<()> {
632 self.validate_memory_plans(module)
633 .context("module memory does not fit in pooling allocator requirements")?;
634 self.validate_table_plans(module)
635 .context("module table does not fit in pooling allocator requirements")?;
636 self.validate_core_instance_size(offsets)
637 .context("module instance size does not fit in pooling allocator requirements")?;
638 Ok(())
639 }
640
/// Checks a single memory type against the memory pool by delegating to
/// `MemoryPool::validate_memory`, returning whatever error it reports.
#[cfg(feature = "gc")]
fn validate_memory(&self, memory: &wasmtime_environ::Memory) -> Result<()> {
    self.memories.validate_memory(memory)
}
645
/// Reserves one component instance, failing with a
/// `PoolConcurrencyLimitError` if `limits.total_component_instances` are
/// already live.
///
/// The counter is bumped optimistically and rolled back on failure, so it
/// may briefly exceed the limit.
#[cfg(feature = "component-model")]
fn increment_component_instance_count(&self) -> Result<()> {
    let limit = self.limits.total_component_instances;
    let previously_live = self.live_component_instances.fetch_add(1, Ordering::AcqRel);
    if previously_live < u64::from(limit) {
        return Ok(());
    }

    // Over the limit: undo the optimistic increment before reporting.
    self.decrement_component_instance_count();
    let err = PoolConcurrencyLimitError::new(
        usize::try_from(limit).unwrap(),
        "component instances",
    );
    Err(err.into())
}
659
/// Decrements the live component instance counter by one.
#[cfg(feature = "component-model")]
fn decrement_component_instance_count(&self) {
    self.live_component_instances.fetch_sub(1, Ordering::AcqRel);
}
664
increment_core_instance_count(&self) -> Result<()>665 fn increment_core_instance_count(&self) -> Result<()> {
666 let old_count = self.live_core_instances.fetch_add(1, Ordering::AcqRel);
667 if old_count >= u64::from(self.limits.total_core_instances) {
668 self.decrement_core_instance_count();
669 return Err(PoolConcurrencyLimitError::new(
670 usize::try_from(self.limits.total_core_instances).unwrap(),
671 "core instances",
672 )
673 .into());
674 }
675 Ok(())
676 }
677
/// Decrements the live core instance counter by one.
fn decrement_core_instance_count(&self) {
    self.live_core_instances.fetch_sub(1, Ordering::AcqRel);
}
681
allocate_memory<'a, 'b: 'a, 'c: 'a>( &'a self, request: &'a mut InstanceAllocationRequest<'b, 'c>, ty: &'a wasmtime_environ::Memory, memory_index: Option<DefinedMemoryIndex>, ) -> Pin<Box<dyn Future<Output = Result<(MemoryAllocationIndex, Memory)>> + Send + 'a>>682 fn allocate_memory<'a, 'b: 'a, 'c: 'a>(
683 &'a self,
684 request: &'a mut InstanceAllocationRequest<'b, 'c>,
685 ty: &'a wasmtime_environ::Memory,
686 memory_index: Option<DefinedMemoryIndex>,
687 ) -> Pin<Box<dyn Future<Output = Result<(MemoryAllocationIndex, Memory)>> + Send + 'a>> {
688 crate::runtime::box_future(async move {
689 async {
690 // FIXME(rust-lang/rust#145127) this should ideally use a version of
691 // `with_flush_and_retry` but adapted for async closures instead of only
692 // sync closures. Right now that won't compile though so this is the
693 // manually expanded version of the method.
694 let e = match self.memories.allocate(request, ty, memory_index).await {
695 Ok(result) => return Ok(result),
696 Err(e) => e,
697 };
698
699 if e.is::<PoolConcurrencyLimitError>() {
700 let queue = self.decommit_queue.lock().unwrap();
701 if self.flush_decommit_queue(queue) {
702 return self.memories.allocate(request, ty, memory_index).await;
703 }
704 }
705
706 Err(e)
707 }
708 .await
709 .inspect(|_| {
710 self.live_memories.fetch_add(1, Ordering::Relaxed);
711 })
712 })
713 }
714
/// Returns a linear memory to the memory pool.
///
/// The live-memory counter is decremented and the memory's image slot is
/// reset. If the reset succeeds the slot is pushed onto a decommit queue
/// (which is then merged or flushed); if it fails the slot is deallocated
/// immediately without preserving its image.
///
/// # Safety
///
/// As relied upon by the `SAFETY` comments below, `allocation_index` must
/// come from this pool and the memory must no longer be in use.
unsafe fn deallocate_memory(
    &self,
    _memory_index: Option<DefinedMemoryIndex>,
    allocation_index: MemoryAllocationIndex,
    memory: Memory,
) {
    let prev = self.live_memories.fetch_sub(1, Ordering::Relaxed);
    // Catch counter underflow, i.e. more deallocations than allocations.
    debug_assert!(prev > 0);

    // Reset the image slot. Depending on whether this is successful or not
    // the `image` is preserved for future use. On success it's queued up to
    // get deallocated later, and on failure the slot is deallocated
    // immediately without preserving the image.
    let mut image = memory.unwrap_static_image();
    let mut queue = DecommitQueue::default();
    let bytes_resident = image.clear_and_remain_ready(
        self.pagemap.as_ref(),
        self.memories.keep_resident,
        |ptr, len| {
            // SAFETY: the memory in `image` won't be used until this
            // decommit queue is flushed, and by definition the memory is
            // not in use when calling this function.
            unsafe {
                queue.push_raw(ptr, len);
            }
        },
    );

    match bytes_resident {
        Ok(bytes_resident) => {
            // SAFETY: this image is not in use and its memory regions were enqueued
            // with `push_raw` above.
            unsafe {
                queue.push_memory(allocation_index, image, bytes_resident);
            }
            self.merge_or_flush(queue);
        }
        Err(e) => {
            log::warn!("ignoring clear_and_remain_ready error {e}");
            // SAFETY: `allocation_index` comes from this pool, as an unsafe
            // contract of this function itself, and it's guaranteed to be no
            // longer in use so safe to deallocate. The slot couldn't be
            // preserved so it's dropped here.
            //
            // Note that at this point it's not clear how many bytes are
            // resident in memory, so it's inevitably going to leave statistics
            // a little off. Also note though that non-Linux platforms don't
            // keep track of resident bytes anyway, and this path is only
            // reachable on non-Linux platforms because Linux can't return an
            // error.
            unsafe {
                self.memories.deallocate(allocation_index, None, 0);
            }
        }
    }
}
771
allocate_table<'a, 'b: 'a, 'c: 'a>( &'a self, request: &'a mut InstanceAllocationRequest<'b, 'c>, ty: &'a wasmtime_environ::Table, _table_index: DefinedTableIndex, ) -> Pin<Box<dyn Future<Output = Result<(super::TableAllocationIndex, Table)>> + Send + 'a>>772 fn allocate_table<'a, 'b: 'a, 'c: 'a>(
773 &'a self,
774 request: &'a mut InstanceAllocationRequest<'b, 'c>,
775 ty: &'a wasmtime_environ::Table,
776 _table_index: DefinedTableIndex,
777 ) -> Pin<Box<dyn Future<Output = Result<(super::TableAllocationIndex, Table)>> + Send + 'a>>
778 {
779 crate::runtime::box_future(async move {
780 async {
781 // FIXME: see `allocate_memory` above for comments about duplication
782 // with `with_flush_and_retry`.
783 let e = match self.tables.allocate(request, ty).await {
784 Ok(result) => return Ok(result),
785 Err(e) => e,
786 };
787
788 if e.is::<PoolConcurrencyLimitError>() {
789 let queue = self.decommit_queue.lock().unwrap();
790 if self.flush_decommit_queue(queue) {
791 return self.tables.allocate(request, ty).await;
792 }
793 }
794
795 Err(e)
796 }
797 .await
798 .inspect(|_| {
799 self.live_tables.fetch_add(1, Ordering::Relaxed);
800 })
801 })
802 }
803
/// Returns a table to the table pool.
///
/// The live-table counter is decremented, the table's pages are reset to
/// zero, and the table is pushed onto a decommit queue which is then merged
/// or flushed.
///
/// # Safety
///
/// As relied upon by the `SAFETY` comments below, the table must no longer
/// be in use when this method is called.
unsafe fn deallocate_table(
    &self,
    _table_index: DefinedTableIndex,
    allocation_index: TableAllocationIndex,
    mut table: Table,
) {
    let prev = self.live_tables.fetch_sub(1, Ordering::Relaxed);
    // Catch counter underflow, i.e. more deallocations than allocations.
    debug_assert!(prev > 0);

    let mut queue = DecommitQueue::default();
    // SAFETY: This table is no longer in use by the allocator when this
    // method is called and additionally all image ranges are pushed with
    // the understanding that the memory won't get used until the whole
    // queue is flushed.
    let bytes_resident = unsafe {
        self.tables.reset_table_pages_to_zero(
            self.pagemap.as_ref(),
            allocation_index,
            &mut table,
            |ptr, len| {
                queue.push_raw(ptr, len);
            },
        )
    };

    // SAFETY: the table has had all its memory regions enqueued above.
    unsafe {
        queue.push_table(allocation_index, table, bytes_resident);
    }
    self.merge_or_flush(queue);
}
835
/// Allocates an async fiber stack from the stack pool, flushing the
/// decommit queue and retrying once if the pool is at its concurrency
/// limit. The live-stack counter is incremented only on success.
#[cfg(feature = "async")]
fn allocate_fiber_stack(&self) -> Result<wasmtime_fiber::FiberStack> {
    self.with_flush_and_retry(|| self.stacks.allocate())
        .inspect(|_| {
            self.live_stacks.fetch_add(1, Ordering::Relaxed);
        })
}
842
/// Returns an async fiber stack to the stack pool.
///
/// The live-stack counter is decremented, the stack is zeroed, and the
/// stack is pushed onto a decommit queue which is then merged or flushed.
///
/// # Safety
///
/// As relied upon by the `SAFETY` comments below, the stack must no longer
/// be in use when this function is called.
#[cfg(feature = "async")]
unsafe fn deallocate_fiber_stack(&self, mut stack: wasmtime_fiber::FiberStack) {
    let prev = self.live_stacks.fetch_sub(1, Ordering::Relaxed);
    // Catch counter underflow (more deallocations than allocations), matching
    // the check done when deallocating memories and tables.
    debug_assert!(prev > 0);

    let mut queue = DecommitQueue::default();
    // SAFETY: the stack is no longer in use by definition when this
    // function is called and memory ranges pushed here are otherwise no
    // longer in use.
    let bytes_resident = unsafe {
        self.stacks
            .zero_stack(&mut stack, |ptr, len| queue.push_raw(ptr, len))
    };
    // SAFETY: this stack's memory regions were enqueued above.
    unsafe {
        queue.push_stack(stack, bytes_resident);
    }
    self.merge_or_flush(queue);
}
860
purge_module(&self, module: CompiledModuleId)861 fn purge_module(&self, module: CompiledModuleId) {
862 self.memories.purge_module(module);
863 }
864
next_available_pkey(&self) -> Option<ProtectionKey>865 fn next_available_pkey(&self) -> Option<ProtectionKey> {
866 self.memories.next_available_pkey()
867 }
868
restrict_to_pkey(&self, pkey: ProtectionKey)869 fn restrict_to_pkey(&self, pkey: ProtectionKey) {
870 mpk::allow(ProtectionMask::zero().or(pkey));
871 }
872
allow_all_pkeys(&self)873 fn allow_all_pkeys(&self) {
874 mpk::allow(ProtectionMask::all());
875 }
876
/// Allocates a GC heap from the GC heap pool.
///
/// All arguments are forwarded to the pool's `allocate`. On success the
/// live-GC-heap counter is incremented and the allocation index is returned
/// together with the heap; on failure the error is propagated and the
/// counter is left untouched.
#[cfg(feature = "gc")]
fn allocate_gc_heap(
    &self,
    engine: &crate::Engine,
    gc_runtime: &dyn GcRuntime,
    memory_alloc_index: MemoryAllocationIndex,
    memory: Memory,
) -> Result<(GcHeapAllocationIndex, Box<dyn GcHeap>)> {
    let (heap_index, heap) =
        self.gc_heaps
            .allocate(engine, gc_runtime, memory_alloc_index, memory)?;

    // Only reached when the pool handed out a heap.
    self.live_gc_heaps.fetch_add(1, Ordering::Relaxed);
    Ok((heap_index, heap))
}
891
/// Returns a GC heap to the GC heap pool.
///
/// Decrements the live-GC-heap counter and forwards to the pool's
/// `deallocate`, returning the memory allocation index and memory that the
/// pool hands back.
#[cfg(feature = "gc")]
fn deallocate_gc_heap(
    &self,
    allocation_index: GcHeapAllocationIndex,
    gc_heap: Box<dyn GcHeap>,
) -> (MemoryAllocationIndex, Memory) {
    // `fetch_sub` returns the previous value and wraps on underflow, so an
    // unbalanced deallocation would otherwise silently corrupt the counter.
    // Mirror the check performed when deallocating tables.
    let prev = self.live_gc_heaps.fetch_sub(1, Ordering::Relaxed);
    debug_assert!(prev > 0);

    self.gc_heaps.deallocate(allocation_index, gc_heap)
}
901
as_pooling(&self) -> Option<&PoolingInstanceAllocator>902 fn as_pooling(&self) -> Option<&PoolingInstanceAllocator> {
903 Some(self)
904 }
905 }
906
#[cfg(test)]
#[cfg(target_pointer_width = "64")]
mod test {
    use super::*;

    /// Constructing the allocator must fail, with the exact message below,
    /// when `max_memory_size` is larger than the configured
    /// `memory_reservation`.
    #[test]
    fn test_pooling_allocator_with_memory_pages_exceeded() {
        let limits = InstanceLimits {
            total_memories: 1,
            max_memory_size: 0x100010000,
            ..Default::default()
        };
        let config = PoolingInstanceAllocatorConfig {
            limits,
            ..PoolingInstanceAllocatorConfig::default()
        };
        let tunables = Tunables {
            memory_reservation: 0x10000,
            ..Tunables::default_host()
        };

        let message = PoolingInstanceAllocator::new(&config, &tunables)
            .map_err(|e| e.to_string())
            .expect_err("expected a failure constructing instance allocator");

        assert_eq!(
            message,
            "maximum memory size of 0x100010000 bytes exceeds the configured \
             memory reservation of 0x10000 bytes"
        );
    }

    /// With `async_stack_zeroing` enabled and a single stack slot, the byte
    /// just below the top of a freshly allocated stack must read zero on
    /// every round, even though the previous round wrote `1` there.
    #[cfg(all(
        unix,
        target_pointer_width = "64",
        feature = "async",
        not(miri),
        not(asan)
    ))]
    #[test]
    fn test_stack_zeroed() -> Result<()> {
        let limits = InstanceLimits {
            total_stacks: 1,
            total_memories: 0,
            total_tables: 0,
            ..Default::default()
        };
        let config = PoolingInstanceAllocatorConfig {
            max_unused_warm_slots: 0,
            limits,
            stack_size: 128,
            async_stack_zeroing: true,
            ..PoolingInstanceAllocatorConfig::default()
        };
        let tunables = Tunables::default_host();
        let allocator = PoolingInstanceAllocator::new(&config, &tunables)?;

        for _ in 0..255 {
            let stack = allocator.allocate_fiber_stack()?;

            unsafe {
                // The stack pointer is at the top, so step back one byte
                // before reading or writing.
                let top_byte = stack.top().unwrap().sub(1);

                assert_eq!(*top_byte, 0);
                *top_byte = 1;

                allocator.deallocate_fiber_stack(stack);
            }
        }

        Ok(())
    }

    /// With `async_stack_zeroing` disabled and a single stack slot, the byte
    /// written just below the top of the stack in one round must still be
    /// there when the stack is allocated again in the next round.
    #[cfg(all(
        unix,
        target_pointer_width = "64",
        feature = "async",
        not(miri),
        not(asan)
    ))]
    #[test]
    fn test_stack_unzeroed() -> Result<()> {
        let limits = InstanceLimits {
            total_stacks: 1,
            total_memories: 0,
            total_tables: 0,
            ..Default::default()
        };
        let config = PoolingInstanceAllocatorConfig {
            max_unused_warm_slots: 0,
            limits,
            stack_size: 128,
            async_stack_zeroing: false,
            ..PoolingInstanceAllocatorConfig::default()
        };
        let tunables = Tunables::default_host();
        let allocator = PoolingInstanceAllocator::new(&config, &tunables)?;

        for round in 0..255 {
            let stack = allocator.allocate_fiber_stack()?;

            unsafe {
                // The stack pointer is at the top, so step back one byte
                // before reading or writing.
                let top_byte = stack.top().unwrap().sub(1);

                // The previous round left `round` behind; leave `round + 1`
                // for the next one.
                assert_eq!(*top_byte, round);
                *top_byte = round + 1;

                allocator.deallocate_fiber_stack(stack);
            }
        }

        Ok(())
    }
}
1017