1 //! Implements the pooling instance allocator. 2 //! 3 //! The pooling instance allocator maps memory in advance and allocates 4 //! instances, memories, tables, and stacks from a pool of available resources. 5 //! Using the pooling instance allocator can speed up module instantiation when 6 //! modules can be constrained based on configurable limits 7 //! ([`InstanceLimits`]). Each new instance is stored in a "slot"; as instances 8 //! are allocated and freed, these slots are either filled or emptied: 9 //! 10 //! ```text 11 //! ┌──────┬──────┬──────┬──────┬──────┐ 12 //! │Slot 0│Slot 1│Slot 2│Slot 3│......│ 13 //! └──────┴──────┴──────┴──────┴──────┘ 14 //! ``` 15 //! 16 //! Each slot has a "slot ID"--an index into the pool. Slot IDs are handed out 17 //! by the [`index_allocator`] module. Note that each kind of pool-allocated 18 //! item is stored in its own separate pool: [`memory_pool`], [`table_pool`], 19 //! [`stack_pool`]. See those modules for more details. 20 21 mod decommit_queue; 22 mod index_allocator; 23 mod memory_pool; 24 mod metrics; 25 mod table_pool; 26 27 #[cfg(feature = "gc")] 28 mod gc_heap_pool; 29 30 #[cfg(all(feature = "async"))] 31 mod generic_stack_pool; 32 #[cfg(all(feature = "async", unix, not(miri)))] 33 mod unix_stack_pool; 34 35 #[cfg(all(feature = "async"))] 36 cfg_if::cfg_if! { 37 if #[cfg(all(unix, not(miri), not(asan)))] { 38 use unix_stack_pool as stack_pool; 39 } else { 40 use generic_stack_pool as stack_pool; 41 } 42 } 43 44 use self::decommit_queue::DecommitQueue; 45 use self::memory_pool::MemoryPool; 46 use self::table_pool::TablePool; 47 use super::{ 48 InstanceAllocationRequest, InstanceAllocator, MemoryAllocationIndex, TableAllocationIndex, 49 }; 50 use crate::Enabled; 51 use crate::prelude::*; 52 use crate::runtime::vm::{ 53 CompiledModuleId, Memory, Table, 54 instance::Instance, 55 mpk::{self, ProtectionKey, ProtectionMask}, 56 sys::vm::PageMap, 57 }; 58 use core::future::Future; 59 use core::pin::Pin; 60 use core::sync::atomic::AtomicUsize; 61 use std::borrow::Cow; 62 use std::fmt::Display; 63 use std::sync::{Mutex, MutexGuard}; 64 use std::{ 65 mem, 66 sync::atomic::{AtomicU64, Ordering}, 67 }; 68 use wasmtime_environ::{ 69 DefinedMemoryIndex, DefinedTableIndex, HostPtr, Module, Tunables, VMOffsets, 70 }; 71 72 pub use self::metrics::PoolingAllocatorMetrics; 73 74 #[cfg(feature = "gc")] 75 use super::GcHeapAllocationIndex; 76 #[cfg(feature = "gc")] 77 use crate::runtime::vm::{GcHeap, GcRuntime}; 78 #[cfg(feature = "gc")] 79 use gc_heap_pool::GcHeapPool; 80 81 #[cfg(feature = "async")] 82 use stack_pool::StackPool; 83 84 #[cfg(feature = "component-model")] 85 use wasmtime_environ::{ 86 StaticModuleIndex, 87 component::{Component, VMComponentOffsets}, 88 }; 89 90 fn round_up_to_pow2(n: usize, to: usize) -> usize { 91 debug_assert!(to > 0); 92 debug_assert!(to.is_power_of_two()); 93 (n + to - 1) & !(to - 1) 94 } 95 96 /// Instance-related limit configuration for pooling. 97 /// 98 /// More docs on this can be found at `wasmtime::PoolingAllocationConfig`. 99 #[derive(Debug, Copy, Clone)] 100 pub struct InstanceLimits { 101 /// The maximum number of component instances that may be allocated 102 /// concurrently. 103 pub total_component_instances: u32, 104 105 /// The maximum size of a component's `VMComponentContext`, including 106 /// the aggregate size of all its inner core modules' `VMContext` sizes. 107 pub component_instance_size: usize, 108 109 /// The maximum number of core module instances that may be allocated 110 /// concurrently. 111 pub total_core_instances: u32, 112 113 /// The maximum number of core module instances that a single component may 114 /// transitively contain. 115 pub max_core_instances_per_component: u32, 116 117 /// The maximum number of Wasm linear memories that a component may 118 /// transitively contain. 119 pub max_memories_per_component: u32, 120 121 /// The maximum number of tables that a component may transitively contain. 122 pub max_tables_per_component: u32, 123 124 /// The total number of linear memories in the pool, across all instances. 125 pub total_memories: u32, 126 127 /// The total number of tables in the pool, across all instances. 128 pub total_tables: u32, 129 130 /// The total number of async stacks in the pool, across all instances. 131 #[cfg(feature = "async")] 132 pub total_stacks: u32, 133 134 /// Maximum size of a core instance's `VMContext`. 135 pub core_instance_size: usize, 136 137 /// Maximum number of tables per instance. 138 pub max_tables_per_module: u32, 139 140 /// Maximum number of word-size elements per table. 141 /// 142 /// Note that tables for element types such as continuations 143 /// that use more than one word of storage may store fewer 144 /// elements. 145 pub table_elements: usize, 146 147 /// Maximum number of linear memories per instance. 148 pub max_memories_per_module: u32, 149 150 /// Maximum byte size of a linear memory, must be smaller than 151 /// `memory_reservation` in `Tunables`. 152 pub max_memory_size: usize, 153 154 /// The total number of GC heaps in the pool, across all instances. 155 #[cfg(feature = "gc")] 156 pub total_gc_heaps: u32, 157 } 158 159 impl Default for InstanceLimits { 160 fn default() -> Self { 161 let total = if cfg!(target_pointer_width = "32") { 162 100 163 } else { 164 1000 165 }; 166 // See doc comments for `wasmtime::PoolingAllocationConfig` for these 167 // default values 168 Self { 169 total_component_instances: total, 170 component_instance_size: 1 << 20, // 1 MiB 171 total_core_instances: total, 172 max_core_instances_per_component: u32::MAX, 173 max_memories_per_component: u32::MAX, 174 max_tables_per_component: u32::MAX, 175 total_memories: total, 176 total_tables: total, 177 #[cfg(feature = "async")] 178 total_stacks: total, 179 core_instance_size: 1 << 20, // 1 MiB 180 max_tables_per_module: 1, 181 // NB: in #8504 it was seen that a C# module in debug module can 182 // have 10k+ elements. 183 table_elements: 20_000, 184 max_memories_per_module: 1, 185 #[cfg(target_pointer_width = "64")] 186 max_memory_size: 1 << 32, // 4G, 187 #[cfg(target_pointer_width = "32")] 188 max_memory_size: 10 << 20, // 10 MiB 189 #[cfg(feature = "gc")] 190 total_gc_heaps: total, 191 } 192 } 193 } 194 195 /// Configuration options for the pooling instance allocator supplied at 196 /// construction. 197 #[derive(Copy, Clone, Debug)] 198 pub struct PoolingInstanceAllocatorConfig { 199 /// See `PoolingAllocatorConfig::max_unused_warm_slots` in `wasmtime` 200 pub max_unused_warm_slots: u32, 201 /// The target number of decommits to do per batch. This is not precise, as 202 /// we can queue up decommits at times when we aren't prepared to 203 /// immediately flush them, and so we may go over this target size 204 /// occasionally. 205 pub decommit_batch_size: usize, 206 /// The size, in bytes, of async stacks to allocate (not including the guard 207 /// page). 208 pub stack_size: usize, 209 /// The limits to apply to instances allocated within this allocator. 210 pub limits: InstanceLimits, 211 /// Whether or not async stacks are zeroed after use. 212 pub async_stack_zeroing: bool, 213 /// If async stack zeroing is enabled and the host platform is Linux this is 214 /// how much memory to zero out with `memset`. 215 /// 216 /// The rest of memory will be zeroed out with `madvise`. 217 #[cfg(feature = "async")] 218 pub async_stack_keep_resident: usize, 219 /// How much linear memory, in bytes, to keep resident after resetting for 220 /// use with the next instance. This much memory will be `memset` to zero 221 /// when a linear memory is deallocated. 222 /// 223 /// Memory exceeding this amount in the wasm linear memory will be released 224 /// with `madvise` back to the kernel. 225 /// 226 /// Only applicable on Linux. 227 pub linear_memory_keep_resident: usize, 228 /// Same as `linear_memory_keep_resident` but for tables. 229 pub table_keep_resident: usize, 230 /// Whether to enable memory protection keys. 231 pub memory_protection_keys: Enabled, 232 /// How many memory protection keys to allocate. 233 pub max_memory_protection_keys: usize, 234 /// Whether to enable PAGEMAP_SCAN on Linux. 235 pub pagemap_scan: Enabled, 236 } 237 238 impl Default for PoolingInstanceAllocatorConfig { 239 fn default() -> PoolingInstanceAllocatorConfig { 240 PoolingInstanceAllocatorConfig { 241 max_unused_warm_slots: 100, 242 decommit_batch_size: 1, 243 stack_size: 2 << 20, 244 limits: InstanceLimits::default(), 245 async_stack_zeroing: false, 246 #[cfg(feature = "async")] 247 async_stack_keep_resident: 0, 248 linear_memory_keep_resident: 0, 249 table_keep_resident: 0, 250 memory_protection_keys: Enabled::No, 251 max_memory_protection_keys: 16, 252 pagemap_scan: Enabled::No, 253 } 254 } 255 } 256 257 impl PoolingInstanceAllocatorConfig { 258 pub fn is_pagemap_scan_available() -> bool { 259 PageMap::new().is_some() 260 } 261 } 262 263 /// An error returned when the pooling allocator cannot allocate a table, 264 /// memory, etc... because the maximum number of concurrent allocations for that 265 /// entity has been reached. 266 #[derive(Debug)] 267 pub struct PoolConcurrencyLimitError { 268 limit: usize, 269 kind: Cow<'static, str>, 270 } 271 272 impl core::error::Error for PoolConcurrencyLimitError {} 273 274 impl Display for PoolConcurrencyLimitError { 275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 276 let limit = self.limit; 277 let kind = &self.kind; 278 write!(f, "maximum concurrent limit of {limit} for {kind} reached") 279 } 280 } 281 282 impl PoolConcurrencyLimitError { 283 fn new(limit: usize, kind: impl Into<Cow<'static, str>>) -> Self { 284 Self { 285 limit, 286 kind: kind.into(), 287 } 288 } 289 } 290 291 /// Implements the pooling instance allocator. 292 /// 293 /// This allocator internally maintains pools of instances, memories, tables, 294 /// and stacks. 295 /// 296 /// Note: the resource pools are manually dropped so that the fault handler 297 /// terminates correctly. 298 #[derive(Debug)] 299 pub struct PoolingInstanceAllocator { 300 decommit_batch_size: usize, 301 limits: InstanceLimits, 302 303 // The number of live core module and component instances at any given 304 // time. Note that this can temporarily go over the configured limit. This 305 // doesn't mean we have actually overshot, but that we attempted to allocate 306 // a new instance and incremented the counter, we've seen (or are about to 307 // see) that the counter is beyond the configured threshold, and are going 308 // to decrement the counter and return an error but haven't done so yet. See 309 // the increment trait methods for more details. 310 live_core_instances: AtomicU64, 311 live_component_instances: AtomicU64, 312 313 decommit_queue: Mutex<DecommitQueue>, 314 315 memories: MemoryPool, 316 live_memories: AtomicUsize, 317 318 tables: TablePool, 319 live_tables: AtomicUsize, 320 321 #[cfg(feature = "gc")] 322 gc_heaps: GcHeapPool, 323 #[cfg(feature = "gc")] 324 live_gc_heaps: AtomicUsize, 325 326 #[cfg(feature = "async")] 327 stacks: StackPool, 328 #[cfg(feature = "async")] 329 live_stacks: AtomicUsize, 330 331 pagemap: Option<PageMap>, 332 } 333 334 impl Drop for PoolingInstanceAllocator { 335 fn drop(&mut self) { 336 if !cfg!(debug_assertions) { 337 return; 338 } 339 340 // NB: when cfg(not(debug_assertions)) it is okay that we don't flush 341 // the queue, as the sub-pools will unmap those ranges anyways, so 342 // there's no point in decommitting them. But we do need to flush the 343 // queue when debug assertions are enabled to make sure that all 344 // entities get returned to their associated sub-pools and we can 345 // differentiate between a leaking slot and an enqueued-for-decommit 346 // slot. 347 let queue = self.decommit_queue.lock().unwrap(); 348 self.flush_decommit_queue(queue); 349 350 debug_assert_eq!(self.live_component_instances.load(Ordering::Acquire), 0); 351 debug_assert_eq!(self.live_core_instances.load(Ordering::Acquire), 0); 352 debug_assert_eq!(self.live_memories.load(Ordering::Acquire), 0); 353 debug_assert_eq!(self.live_tables.load(Ordering::Acquire), 0); 354 355 debug_assert!(self.memories.is_empty()); 356 debug_assert!(self.tables.is_empty()); 357 358 #[cfg(feature = "gc")] 359 { 360 debug_assert!(self.gc_heaps.is_empty()); 361 debug_assert_eq!(self.live_gc_heaps.load(Ordering::Acquire), 0); 362 } 363 364 #[cfg(feature = "async")] 365 { 366 debug_assert!(self.stacks.is_empty()); 367 debug_assert_eq!(self.live_stacks.load(Ordering::Acquire), 0); 368 } 369 } 370 } 371 372 impl PoolingInstanceAllocator { 373 /// Creates a new pooling instance allocator with the given strategy and limits. 374 pub fn new(config: &PoolingInstanceAllocatorConfig, tunables: &Tunables) -> Result<Self> { 375 Ok(Self { 376 decommit_batch_size: config.decommit_batch_size, 377 limits: config.limits, 378 live_component_instances: AtomicU64::new(0), 379 live_core_instances: AtomicU64::new(0), 380 decommit_queue: Mutex::new(DecommitQueue::default()), 381 memories: MemoryPool::new(config, tunables)?, 382 live_memories: AtomicUsize::new(0), 383 tables: TablePool::new(config)?, 384 live_tables: AtomicUsize::new(0), 385 #[cfg(feature = "gc")] 386 gc_heaps: GcHeapPool::new(config)?, 387 #[cfg(feature = "gc")] 388 live_gc_heaps: AtomicUsize::new(0), 389 #[cfg(feature = "async")] 390 stacks: StackPool::new(config)?, 391 #[cfg(feature = "async")] 392 live_stacks: AtomicUsize::new(0), 393 pagemap: match config.pagemap_scan { 394 Enabled::Auto => PageMap::new(), 395 Enabled::Yes => Some(PageMap::new().ok_or_else(|| { 396 format_err!( 397 "required to enable PAGEMAP_SCAN but this system \ 398 does not support it" 399 ) 400 })?), 401 Enabled::No => None, 402 }, 403 }) 404 } 405 406 fn core_instance_size(&self) -> usize { 407 round_up_to_pow2(self.limits.core_instance_size, mem::align_of::<Instance>()) 408 } 409 410 fn validate_table_plans(&self, module: &Module) -> Result<()> { 411 self.tables.validate(module) 412 } 413 414 fn validate_memory_plans(&self, module: &Module) -> Result<()> { 415 self.memories.validate_memories(module) 416 } 417 418 fn validate_core_instance_size(&self, offsets: &VMOffsets<HostPtr>) -> Result<()> { 419 let layout = Instance::alloc_layout(offsets); 420 if layout.size() <= self.core_instance_size() { 421 return Ok(()); 422 } 423 424 // If this `module` exceeds the allocation size allotted to it then an 425 // error will be reported here. The error of "required N bytes but 426 // cannot allocate that" is pretty opaque, however, because it's not 427 // clear what the breakdown of the N bytes are and what to optimize 428 // next. To help provide a better error message here some fancy-ish 429 // logic is done here to report the breakdown of the byte request into 430 // the largest portions and where it's coming from. 431 let mut message = format!( 432 "instance allocation for this module \ 433 requires {} bytes which exceeds the configured maximum \ 434 of {} bytes; breakdown of allocation requirement:\n\n", 435 layout.size(), 436 self.core_instance_size(), 437 ); 438 439 let mut remaining = layout.size(); 440 let mut push = |name: &str, bytes: usize| { 441 assert!(remaining >= bytes); 442 remaining -= bytes; 443 444 // If the `name` region is more than 5% of the allocation request 445 // then report it here, otherwise ignore it. We have less than 20 446 // fields so we're guaranteed that something should be reported, and 447 // otherwise it's not particularly interesting to learn about 5 448 // different fields that are all 8 or 0 bytes. Only try to report 449 // the "major" sources of bytes here. 450 if bytes > layout.size() / 20 { 451 message.push_str(&format!( 452 " * {:.02}% - {} bytes - {}\n", 453 ((bytes as f32) / (layout.size() as f32)) * 100.0, 454 bytes, 455 name, 456 )); 457 } 458 }; 459 460 // The `Instance` itself requires some size allocated to it. 461 push("instance state management", mem::size_of::<Instance>()); 462 463 // Afterwards the `VMContext`'s regions are why we're requesting bytes, 464 // so ask it for descriptions on each region's byte size. 465 for (desc, size) in offsets.region_sizes() { 466 push(desc, size as usize); 467 } 468 469 // double-check we accounted for all the bytes 470 assert_eq!(remaining, 0); 471 472 bail!("{message}") 473 } 474 475 #[cfg(feature = "component-model")] 476 fn validate_component_instance_size( 477 &self, 478 offsets: &VMComponentOffsets<HostPtr>, 479 core_instances_aggregate_size: usize, 480 ) -> Result<()> { 481 let vmcomponentctx_size = usize::try_from(offsets.size_of_vmctx()).unwrap(); 482 let total_instance_size = core_instances_aggregate_size.saturating_add(vmcomponentctx_size); 483 if total_instance_size <= self.limits.component_instance_size { 484 return Ok(()); 485 } 486 487 // TODO: Add context with detailed accounting of what makes up all the 488 // `VMComponentContext`'s space like we do for module instances. 489 bail!( 490 "instance allocation for this component requires {total_instance_size} bytes of `VMComponentContext` \ 491 and aggregated core instance runtime space which exceeds the configured maximum of {} bytes. \ 492 `VMComponentContext` used {vmcomponentctx_size} bytes, `core module instances` used \ 493 {core_instances_aggregate_size} bytes.", 494 self.limits.component_instance_size 495 ) 496 } 497 498 fn flush_decommit_queue(&self, mut locked_queue: MutexGuard<'_, DecommitQueue>) -> bool { 499 // Take the queue out of the mutex and drop the lock, to minimize 500 // contention. 501 let queue = mem::take(&mut *locked_queue); 502 drop(locked_queue); 503 queue.flush(self) 504 } 505 506 /// Execute `f` and if it returns `Err(PoolConcurrencyLimitError)`, then try 507 /// flushing the decommit queue. If flushing the queue freed up slots, then 508 /// try running `f` again. 509 #[cfg(feature = "async")] 510 fn with_flush_and_retry<T>(&self, mut f: impl FnMut() -> Result<T>) -> Result<T> { 511 f().or_else(|e| { 512 if e.is::<PoolConcurrencyLimitError>() { 513 let queue = self.decommit_queue.lock().unwrap(); 514 if self.flush_decommit_queue(queue) { 515 return f(); 516 } 517 } 518 519 Err(e) 520 }) 521 } 522 523 fn merge_or_flush(&self, mut local_queue: DecommitQueue) { 524 match local_queue.raw_len() { 525 // If we didn't enqueue any regions for decommit, then we must have 526 // either memset the whole entity or eagerly remapped it to zero 527 // because we don't have linux's `madvise(DONTNEED)` semantics. In 528 // either case, the entity slot is ready for reuse immediately. 529 0 => { 530 local_queue.flush(self); 531 } 532 533 // We enqueued at least our batch size of regions for decommit, so 534 // flush the local queue immediately. Don't bother inspecting (or 535 // locking!) the shared queue. 536 n if n >= self.decommit_batch_size => { 537 local_queue.flush(self); 538 } 539 540 // If we enqueued some regions for decommit, but did not reach our 541 // batch size, so we don't want to flush it yet, then merge the 542 // local queue into the shared queue. 543 n => { 544 debug_assert!(n < self.decommit_batch_size); 545 let mut shared_queue = self.decommit_queue.lock().unwrap(); 546 shared_queue.append(&mut local_queue); 547 // And if the shared queue now has at least as many regions 548 // enqueued for decommit as our batch size, then we can flush 549 // it. 550 if shared_queue.raw_len() >= self.decommit_batch_size { 551 self.flush_decommit_queue(shared_queue); 552 } 553 } 554 } 555 } 556 } 557 558 unsafe impl InstanceAllocator for PoolingInstanceAllocator { 559 #[cfg(feature = "component-model")] 560 fn validate_component<'a>( 561 &self, 562 component: &Component, 563 offsets: &VMComponentOffsets<HostPtr>, 564 get_module: &'a dyn Fn(StaticModuleIndex) -> &'a Module, 565 ) -> Result<()> { 566 let mut num_core_instances = 0; 567 let mut num_memories = 0; 568 let mut num_tables = 0; 569 let mut core_instances_aggregate_size: usize = 0; 570 for init in &component.initializers { 571 use wasmtime_environ::component::GlobalInitializer::*; 572 use wasmtime_environ::component::InstantiateModule; 573 match init { 574 InstantiateModule(InstantiateModule::Import(_, _), _) => { 575 num_core_instances += 1; 576 // Can't statically account for the total vmctx size, number 577 // of memories, and number of tables in this component. 578 } 579 InstantiateModule(InstantiateModule::Static(static_module_index, _), _) => { 580 let module = get_module(*static_module_index); 581 let offsets = VMOffsets::new(HostPtr, &module); 582 let layout = Instance::alloc_layout(&offsets); 583 self.validate_module(module, &offsets)?; 584 num_core_instances += 1; 585 num_memories += module.num_defined_memories(); 586 num_tables += module.num_defined_tables(); 587 core_instances_aggregate_size += layout.size(); 588 } 589 LowerImport { .. } 590 | ExtractMemory(_) 591 | ExtractTable(_) 592 | ExtractRealloc(_) 593 | ExtractCallback(_) 594 | ExtractPostReturn(_) 595 | Resource(_) => {} 596 } 597 } 598 599 if num_core_instances 600 > usize::try_from(self.limits.max_core_instances_per_component).unwrap() 601 { 602 bail!( 603 "The component transitively contains {num_core_instances} core module instances, \ 604 which exceeds the configured maximum of {} in the pooling allocator", 605 self.limits.max_core_instances_per_component 606 ); 607 } 608 609 if num_memories > usize::try_from(self.limits.max_memories_per_component).unwrap() { 610 bail!( 611 "The component transitively contains {num_memories} Wasm linear memories, which \ 612 exceeds the configured maximum of {} in the pooling allocator", 613 self.limits.max_memories_per_component 614 ); 615 } 616 617 if num_tables > usize::try_from(self.limits.max_tables_per_component).unwrap() { 618 bail!( 619 "The component transitively contains {num_tables} tables, which exceeds the \ 620 configured maximum of {} in the pooling allocator", 621 self.limits.max_tables_per_component 622 ); 623 } 624 625 self.validate_component_instance_size(offsets, core_instances_aggregate_size) 626 .context("component instance size does not fit in pooling allocator requirements")?; 627 628 Ok(()) 629 } 630 631 fn validate_module(&self, module: &Module, offsets: &VMOffsets<HostPtr>) -> Result<()> { 632 self.validate_memory_plans(module) 633 .context("module memory does not fit in pooling allocator requirements")?; 634 self.validate_table_plans(module) 635 .context("module table does not fit in pooling allocator requirements")?; 636 self.validate_core_instance_size(offsets) 637 .context("module instance size does not fit in pooling allocator requirements")?; 638 Ok(()) 639 } 640 641 #[cfg(feature = "gc")] 642 fn validate_memory(&self, memory: &wasmtime_environ::Memory) -> Result<()> { 643 self.memories.validate_memory(memory) 644 } 645 646 #[cfg(feature = "component-model")] 647 fn increment_component_instance_count(&self) -> Result<()> { 648 let old_count = self.live_component_instances.fetch_add(1, Ordering::AcqRel); 649 if old_count >= u64::from(self.limits.total_component_instances) { 650 self.decrement_component_instance_count(); 651 return Err(PoolConcurrencyLimitError::new( 652 usize::try_from(self.limits.total_component_instances).unwrap(), 653 "component instances", 654 ) 655 .into()); 656 } 657 Ok(()) 658 } 659 660 #[cfg(feature = "component-model")] 661 fn decrement_component_instance_count(&self) { 662 self.live_component_instances.fetch_sub(1, Ordering::AcqRel); 663 } 664 665 fn increment_core_instance_count(&self) -> Result<()> { 666 let old_count = self.live_core_instances.fetch_add(1, Ordering::AcqRel); 667 if old_count >= u64::from(self.limits.total_core_instances) { 668 self.decrement_core_instance_count(); 669 return Err(PoolConcurrencyLimitError::new( 670 usize::try_from(self.limits.total_core_instances).unwrap(), 671 "core instances", 672 ) 673 .into()); 674 } 675 Ok(()) 676 } 677 678 fn decrement_core_instance_count(&self) { 679 self.live_core_instances.fetch_sub(1, Ordering::AcqRel); 680 } 681 682 fn allocate_memory<'a, 'b: 'a, 'c: 'a>( 683 &'a self, 684 request: &'a mut InstanceAllocationRequest<'b, 'c>, 685 ty: &'a wasmtime_environ::Memory, 686 memory_index: Option<DefinedMemoryIndex>, 687 ) -> Pin<Box<dyn Future<Output = Result<(MemoryAllocationIndex, Memory)>> + Send + 'a>> { 688 crate::runtime::box_future(async move { 689 async { 690 // FIXME(rust-lang/rust#145127) this should ideally use a version of 691 // `with_flush_and_retry` but adapted for async closures instead of only 692 // sync closures. Right now that won't compile though so this is the 693 // manually expanded version of the method. 694 let e = match self.memories.allocate(request, ty, memory_index).await { 695 Ok(result) => return Ok(result), 696 Err(e) => e, 697 }; 698 699 if e.is::<PoolConcurrencyLimitError>() { 700 let queue = self.decommit_queue.lock().unwrap(); 701 if self.flush_decommit_queue(queue) { 702 return self.memories.allocate(request, ty, memory_index).await; 703 } 704 } 705 706 Err(e) 707 } 708 .await 709 .inspect(|_| { 710 self.live_memories.fetch_add(1, Ordering::Relaxed); 711 }) 712 }) 713 } 714 715 unsafe fn deallocate_memory( 716 &self, 717 _memory_index: Option<DefinedMemoryIndex>, 718 allocation_index: MemoryAllocationIndex, 719 memory: Memory, 720 ) { 721 let prev = self.live_memories.fetch_sub(1, Ordering::Relaxed); 722 debug_assert!(prev > 0); 723 724 // Reset the image slot. Depending on whether this is successful or not 725 // the `image` is preserved for future use. On success it's queued up to 726 // get deallocated later, and on failure the slot is deallocated 727 // immediately without preserving the image. 728 let mut image = memory.unwrap_static_image(); 729 let mut queue = DecommitQueue::default(); 730 let bytes_resident = image.clear_and_remain_ready( 731 self.pagemap.as_ref(), 732 self.memories.keep_resident, 733 |ptr, len| { 734 // SAFETY: the memory in `image` won't be used until this 735 // decommit queue is flushed, and by definition the memory is 736 // not in use when calling this function. 737 unsafe { 738 queue.push_raw(ptr, len); 739 } 740 }, 741 ); 742 743 match bytes_resident { 744 Ok(bytes_resident) => { 745 // SAFETY: this image is not in use and its memory regions were enqueued 746 // with `push_raw` above. 747 unsafe { 748 queue.push_memory(allocation_index, image, bytes_resident); 749 } 750 self.merge_or_flush(queue); 751 } 752 Err(e) => { 753 log::warn!("ignoring clear_and_remain_ready error {e}"); 754 // SAFETY: `allocation_index` comes from this pool, as an unsafe 755 // contract of this function itself, and it's guaranteed to be no 756 // longer in use so safe to deallocate. The slot couldn't be 757 // preserved so it's dropped here. 758 // 759 // Note that at this point it's not clear how many bytes are 760 // resident in memory, so it's inevitably going to leave statistics 761 // a little off. Also note though that non-Linux platforms don't 762 // keep track of resident bytes anyway, and this path is only 763 // reachable on non-Linux platforms because Linux can't return an 764 // error. 765 unsafe { 766 self.memories.deallocate(allocation_index, None, 0); 767 } 768 } 769 } 770 } 771 772 fn allocate_table<'a, 'b: 'a, 'c: 'a>( 773 &'a self, 774 request: &'a mut InstanceAllocationRequest<'b, 'c>, 775 ty: &'a wasmtime_environ::Table, 776 _table_index: DefinedTableIndex, 777 ) -> Pin<Box<dyn Future<Output = Result<(super::TableAllocationIndex, Table)>> + Send + 'a>> 778 { 779 crate::runtime::box_future(async move { 780 async { 781 // FIXME: see `allocate_memory` above for comments about duplication 782 // with `with_flush_and_retry`. 783 let e = match self.tables.allocate(request, ty).await { 784 Ok(result) => return Ok(result), 785 Err(e) => e, 786 }; 787 788 if e.is::<PoolConcurrencyLimitError>() { 789 let queue = self.decommit_queue.lock().unwrap(); 790 if self.flush_decommit_queue(queue) { 791 return self.tables.allocate(request, ty).await; 792 } 793 } 794 795 Err(e) 796 } 797 .await 798 .inspect(|_| { 799 self.live_tables.fetch_add(1, Ordering::Relaxed); 800 }) 801 }) 802 } 803 804 unsafe fn deallocate_table( 805 &self, 806 _table_index: DefinedTableIndex, 807 allocation_index: TableAllocationIndex, 808 mut table: Table, 809 ) { 810 let prev = self.live_tables.fetch_sub(1, Ordering::Relaxed); 811 debug_assert!(prev > 0); 812 813 let mut queue = DecommitQueue::default(); 814 // SAFETY: This table is no longer in use by the allocator when this 815 // method is called and additionally all image ranges are pushed with 816 // the understanding that the memory won't get used until the whole 817 // queue is flushed. 818 let bytes_resident = unsafe { 819 self.tables.reset_table_pages_to_zero( 820 self.pagemap.as_ref(), 821 allocation_index, 822 &mut table, 823 |ptr, len| { 824 queue.push_raw(ptr, len); 825 }, 826 ) 827 }; 828 829 // SAFETY: the table has had all its memory regions enqueued above. 830 unsafe { 831 queue.push_table(allocation_index, table, bytes_resident); 832 } 833 self.merge_or_flush(queue); 834 } 835 836 #[cfg(feature = "async")] 837 fn allocate_fiber_stack(&self) -> Result<wasmtime_fiber::FiberStack> { 838 let ret = self.with_flush_and_retry(|| self.stacks.allocate())?; 839 self.live_stacks.fetch_add(1, Ordering::Relaxed); 840 Ok(ret) 841 } 842 843 #[cfg(feature = "async")] 844 unsafe fn deallocate_fiber_stack(&self, mut stack: wasmtime_fiber::FiberStack) { 845 self.live_stacks.fetch_sub(1, Ordering::Relaxed); 846 let mut queue = DecommitQueue::default(); 847 // SAFETY: the stack is no longer in use by definition when this 848 // function is called and memory ranges pushed here are otherwise no 849 // longer in use. 850 let bytes_resident = unsafe { 851 self.stacks 852 .zero_stack(&mut stack, |ptr, len| queue.push_raw(ptr, len)) 853 }; 854 // SAFETY: this stack's memory regions were enqueued above. 855 unsafe { 856 queue.push_stack(stack, bytes_resident); 857 } 858 self.merge_or_flush(queue); 859 } 860 861 fn purge_module(&self, module: CompiledModuleId) { 862 self.memories.purge_module(module); 863 } 864 865 fn next_available_pkey(&self) -> Option<ProtectionKey> { 866 self.memories.next_available_pkey() 867 } 868 869 fn restrict_to_pkey(&self, pkey: ProtectionKey) { 870 mpk::allow(ProtectionMask::zero().or(pkey)); 871 } 872 873 fn allow_all_pkeys(&self) { 874 mpk::allow(ProtectionMask::all()); 875 } 876 877 #[cfg(feature = "gc")] 878 fn allocate_gc_heap( 879 &self, 880 engine: &crate::Engine, 881 gc_runtime: &dyn GcRuntime, 882 memory_alloc_index: MemoryAllocationIndex, 883 memory: Memory, 884 ) -> Result<(GcHeapAllocationIndex, Box<dyn GcHeap>)> { 885 let ret = self 886 .gc_heaps 887 .allocate(engine, gc_runtime, memory_alloc_index, memory)?; 888 self.live_gc_heaps.fetch_add(1, Ordering::Relaxed); 889 Ok(ret) 890 } 891 892 #[cfg(feature = "gc")] 893 fn deallocate_gc_heap( 894 &self, 895 allocation_index: GcHeapAllocationIndex, 896 gc_heap: Box<dyn GcHeap>, 897 ) -> (MemoryAllocationIndex, Memory) { 898 self.live_gc_heaps.fetch_sub(1, Ordering::Relaxed); 899 self.gc_heaps.deallocate(allocation_index, gc_heap) 900 } 901 902 fn as_pooling(&self) -> Option<&PoolingInstanceAllocator> { 903 Some(self) 904 } 905 } 906 907 #[cfg(test)] 908 #[cfg(target_pointer_width = "64")] 909 mod test { 910 use super::*; 911 912 #[test] 913 fn test_pooling_allocator_with_memory_pages_exceeded() { 914 let config = PoolingInstanceAllocatorConfig { 915 limits: InstanceLimits { 916 total_memories: 1, 917 max_memory_size: 0x100010000, 918 ..Default::default() 919 }, 920 ..PoolingInstanceAllocatorConfig::default() 921 }; 922 assert_eq!( 923 PoolingInstanceAllocator::new( 924 &config, 925 &Tunables { 926 memory_reservation: 0x10000, 927 ..Tunables::default_host() 928 }, 929 ) 930 .map_err(|e| e.to_string()) 931 .expect_err("expected a failure constructing instance allocator"), 932 "maximum memory size of 0x100010000 bytes exceeds the configured \ 933 memory reservation of 0x10000 bytes" 934 ); 935 } 936 937 #[cfg(all( 938 unix, 939 target_pointer_width = "64", 940 feature = "async", 941 not(miri), 942 not(asan) 943 ))] 944 #[test] 945 fn test_stack_zeroed() -> Result<()> { 946 let config = PoolingInstanceAllocatorConfig { 947 max_unused_warm_slots: 0, 948 limits: InstanceLimits { 949 total_stacks: 1, 950 total_memories: 0, 951 total_tables: 0, 952 ..Default::default() 953 }, 954 stack_size: 128, 955 async_stack_zeroing: true, 956 ..PoolingInstanceAllocatorConfig::default() 957 }; 958 let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?; 959 960 unsafe { 961 for _ in 0..255 { 962 let stack = allocator.allocate_fiber_stack()?; 963 964 // The stack pointer is at the top, so decrement it first 965 let addr = stack.top().unwrap().sub(1); 966 967 assert_eq!(*addr, 0); 968 *addr = 1; 969 970 allocator.deallocate_fiber_stack(stack); 971 } 972 } 973 974 Ok(()) 975 } 976 977 #[cfg(all( 978 unix, 979 target_pointer_width = "64", 980 feature = "async", 981 not(miri), 982 not(asan) 983 ))] 984 #[test] 985 fn test_stack_unzeroed() -> Result<()> { 986 let config = PoolingInstanceAllocatorConfig { 987 max_unused_warm_slots: 0, 988 limits: InstanceLimits { 989 total_stacks: 1, 990 total_memories: 0, 991 total_tables: 0, 992 ..Default::default() 993 }, 994 stack_size: 128, 995 async_stack_zeroing: false, 996 ..PoolingInstanceAllocatorConfig::default() 997 }; 998 let allocator = PoolingInstanceAllocator::new(&config, &Tunables::default_host())?; 999 1000 unsafe { 1001 for i in 0..255 { 1002 let stack = allocator.allocate_fiber_stack()?; 1003 1004 // The stack pointer is at the top, so decrement it first 1005 let addr = stack.top().unwrap().sub(1); 1006 1007 assert_eq!(*addr, i); 1008 *addr = i + 1; 1009 1010 allocator.deallocate_fiber_stack(stack); 1011 } 1012 } 1013 1014 Ok(()) 1015 } 1016 } 1017