// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once
#include <atomic>
#include <memory>
#include <utility>
#include "memory/allocator.h"
#include "memory/arena.h"
#include "port/likely.h"
#include "util/core_local.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"

// Only mark the padding arrays as unused under Clang; applying the
// attribute under GCC 4.8.1 would break the build.
#ifdef __clang__
#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
#else
#define ROCKSDB_FIELD_UNUSED
#endif  // __clang__

namespace ROCKSDB_NAMESPACE {

class Logger;

// ConcurrentArena wraps an Arena. It makes it thread safe using a fast
// inlined spinlock, and adds small per-core allocation caches to avoid
// contention for small allocations. To avoid any memory waste from the
// per-core shards, they are kept small, they are lazily instantiated
// only if ConcurrentArena actually notices concurrent use, and they
// adjust their size so that there is no fragmentation waste when the
// shard blocks are allocated from the underlying main arena.
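//
// Illustrative usage (a minimal sketch, not taken from the RocksDB sources;
// block and request sizes below are arbitrary):
//
//   ConcurrentArena arena(4 << 20 /* block_size */);
//   char* raw = arena.Allocate(48);              // safe from any thread
//   char* aligned = arena.AllocateAligned(64);   // size rounded to sizeof(void*)
//   size_t approx = arena.ApproximateMemoryUsage();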
class ConcurrentArena : public Allocator {
 public:
  // block_size and huge_page_size are the same as for Arena (and are
  // in fact just passed to the constructor of arena_). The core-local
  // shards compute their shard_block_size as a fraction of block_size
  // that varies according to the hardware concurrency level.
  explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
                           AllocTracker* tracker = nullptr,
                           size_t huge_page_size = 0);

  char* Allocate(size_t bytes) override {
    return AllocateImpl(bytes, false /*force_arena*/,
                        [=]() { return arena_.Allocate(bytes); });
  }

  char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
                        Logger* logger = nullptr) override {
    // Round the request up to the next multiple of sizeof(void*).
    size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1;
    assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) &&
           (rounded_up % sizeof(void*)) == 0);

    return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/, [=]() {
      return arena_.AllocateAligned(rounded_up, huge_page_size, logger);
    });
  }

  size_t ApproximateMemoryUsage() const {
    std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
    lock.lock();
    return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
  }

  size_t MemoryAllocatedBytes() const {
    return memory_allocated_bytes_.load(std::memory_order_relaxed);
  }

  size_t AllocatedAndUnused() const {
    return arena_allocated_and_unused_.load(std::memory_order_relaxed) +
           ShardAllocatedAndUnused();
  }

  size_t IrregularBlockNum() const {
    return irregular_block_num_.load(std::memory_order_relaxed);
  }

  size_t BlockSize() const override { return arena_.BlockSize(); }

 private:
  struct Shard {
    char padding[40] ROCKSDB_FIELD_UNUSED;
    mutable SpinMutex mutex;
    char* free_begin_;
    std::atomic<size_t> allocated_and_unused_;

    Shard() : free_begin_(nullptr), allocated_and_unused_(0) {}
  };

#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
  static __thread size_t tls_cpuid;
#else
  enum ZeroFirstEnum : size_t { tls_cpuid = 0 };
#endif

  char padding0[56] ROCKSDB_FIELD_UNUSED;

  size_t shard_block_size_;

  CoreLocalArray<Shard> shards_;

  Arena arena_;
  mutable SpinMutex arena_mutex_;
  std::atomic<size_t> arena_allocated_and_unused_;
  std::atomic<size_t> memory_allocated_bytes_;
  std::atomic<size_t> irregular_block_num_;

  char padding1[56] ROCKSDB_FIELD_UNUSED;

  Shard* Repick();

  size_t ShardAllocatedAndUnused() const {
    size_t total = 0;
    for (size_t i = 0; i < shards_.Size(); ++i) {
      total += shards_.AccessAtCore(i)->allocated_and_unused_.load(
          std::memory_order_relaxed);
    }
    return total;
  }
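
  // Summary of the routing below: oversized or force_arena requests (and
  // requests from threads that have never needed a shard, when arena_mutex_
  // is free) go straight to arena_; everything else is carved out of a
  // core-local shard, which refills itself from arena_ when its current
  // block runs out.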
  template <typename Func>
  char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
    size_t cpu;

    // Go directly to the arena if the allocation is too large, or if
    // we've never needed to Repick() and the arena mutex is available
    // with no waiting. This keeps the fragmentation penalty of
    // concurrency zero unless it might actually confer an advantage.
    std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
    if (bytes > shard_block_size_ / 4 || force_arena ||
        ((cpu = tls_cpuid) == 0 &&
         !shards_.AccessAtCore(0)->allocated_and_unused_.load(
             std::memory_order_relaxed) &&
         arena_lock.try_lock())) {
      if (!arena_lock.owns_lock()) {
        arena_lock.lock();
      }
      auto rv = func();
      Fixup();
      return rv;
    }

    // pick a shard from which to allocate
    Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));
    if (!s->mutex.try_lock()) {
      s = Repick();
      s->mutex.lock();
    }
    std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);

    size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
    if (avail < bytes) {
      // reload
      std::lock_guard<SpinMutex> reload_lock(arena_mutex_);

      // If the arena's current block is within a factor of 2 of the right
      // size, we adjust our request to avoid arena waste.
      auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
      assert(exact == arena_.AllocatedAndUnused());

      if (exact >= bytes && arena_.IsInInlineBlock()) {
        // If we haven't exhausted arena's inline block yet, allocate from arena
        // directly. This ensures that we'll do the first few small allocations
        // without allocating any blocks.
        // In particular this prevents empty memtables from using
        // disproportionately large amount of memory: a memtable allocates on
        // the order of 1 KB of memory when created; we wouldn't want to
        // allocate a full arena block (typically a few megabytes) for that,
        // especially if there are thousands of empty memtables.
        auto rv = func();
        Fixup();
        return rv;
      }

      avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
                  ? exact
                  : shard_block_size_;
      s->free_begin_ = arena_.AllocateAligned(avail);
      Fixup();
    }
    s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);

    char* rv;
    if ((bytes % sizeof(void*)) == 0) {
      // aligned allocation from the beginning
      rv = s->free_begin_;
      s->free_begin_ += bytes;
    } else {
      // unaligned from the end
      rv = s->free_begin_ + avail - bytes;
    }
    return rv;
  }

  void Fixup() {
    arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(),
                                      std::memory_order_relaxed);
    memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(),
                                  std::memory_order_relaxed);
    irregular_block_num_.store(arena_.IrregularBlockNum(),
                               std::memory_order_relaxed);
  }

  ConcurrentArena(const ConcurrentArena&) = delete;
  ConcurrentArena& operator=(const ConcurrentArena&) = delete;
};

}  // namespace ROCKSDB_NAMESPACE