//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once
#include <atomic>
#include <cassert>
#include <memory>
#include <mutex>
#include <utility>

#include "memory/allocator.h"
#include "memory/arena.h"
#include "port/likely.h"
#include "util/core_local.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"

// Annotate the padding arrays as unused only under Clang, which would
// otherwise warn about them; applying the attribute under GCC 4.8.1 breaks
// the build.
#ifdef __clang__
#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
#else
#define ROCKSDB_FIELD_UNUSED
#endif  // __clang__

namespace ROCKSDB_NAMESPACE {

class Logger;

// ConcurrentArena wraps an Arena.  It makes the wrapped arena thread safe
// using a fast inlined spinlock, and adds small per-core allocation caches
// to avoid contention for small allocations.  To avoid any memory waste
// from the per-core shards, they are kept small, they are lazily
// instantiated only when ConcurrentArena actually notices concurrent use,
// and they adjust their size so that there is no fragmentation waste when
// the shard blocks are allocated from the underlying main arena.
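//
// A minimal usage sketch (illustrative only; the sizes below are made up
// and the optional AllocTracker is omitted):
//
//   ConcurrentArena arena(4 << 20 /* block_size */);
//   char* small = arena.Allocate(128);            // may be served by a shard
//   char* big = arena.AllocateAligned(1 << 20);   // large, goes to the arena
//   size_t approx = arena.ApproximateMemoryUsage();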
class ConcurrentArena : public Allocator {
 public:
  // block_size and huge_page_size are the same as for Arena (and are
  // in fact just passed to the constructor of arena_).  The core-local
  // shards compute their shard_block_size as a fraction of block_size
  // that varies according to the hardware concurrency level.
  explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
                           AllocTracker* tracker = nullptr,
                           size_t huge_page_size = 0);

  char* Allocate(size_t bytes) override {
    return AllocateImpl(bytes, false /*force_arena*/,
                        [=]() { return arena_.Allocate(bytes); });
  }

  char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
                        Logger* logger = nullptr) override {
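    // Round the request up to a multiple of the pointer size; e.g. with
    // 8-byte pointers, 13 bytes round up to 16, and an already-aligned 16
    // stays 16.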
    size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1;
    assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) &&
           (rounded_up % sizeof(void*)) == 0);

    return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/, [=]() {
      return arena_.AllocateAligned(rounded_up, huge_page_size, logger);
    });
  }

  size_t ApproximateMemoryUsage() const {
    std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
    lock.lock();
    return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
  }

  size_t MemoryAllocatedBytes() const {
    return memory_allocated_bytes_.load(std::memory_order_relaxed);
  }

  size_t AllocatedAndUnused() const {
    return arena_allocated_and_unused_.load(std::memory_order_relaxed) +
           ShardAllocatedAndUnused();
  }

  size_t IrregularBlockNum() const {
    return irregular_block_num_.load(std::memory_order_relaxed);
  }

  size_t BlockSize() const override { return arena_.BlockSize(); }

 private:
  struct Shard {
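    // Pads each Shard toward its own cache line so that shards used by
    // different cores do not falsely share a line.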
    char padding[40] ROCKSDB_FIELD_UNUSED;
    mutable SpinMutex mutex;
    char* free_begin_;
    std::atomic<size_t> allocated_and_unused_;

    Shard() : free_begin_(nullptr), allocated_and_unused_(0) {}
  };

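  // Per-thread shard hint.  Zero means the thread has not yet needed a shard
  // (Repick() has never run for it), so AllocateImpl() can try the main arena
  // first; once set, its low bits select this thread's preferred shard.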
#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
  static __thread size_t tls_cpuid;
#else
  enum ZeroFirstEnum : size_t { tls_cpuid = 0 };
#endif

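  // Padding keeps the frequently written fields below off the cache lines of
  // neighboring data, reducing false sharing.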
  char padding0[56] ROCKSDB_FIELD_UNUSED;

  size_t shard_block_size_;

  CoreLocalArray<Shard> shards_;

  Arena arena_;
  mutable SpinMutex arena_mutex_;
  std::atomic<size_t> arena_allocated_and_unused_;
  std::atomic<size_t> memory_allocated_bytes_;
  std::atomic<size_t> irregular_block_num_;

  char padding1[56] ROCKSDB_FIELD_UNUSED;

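  // Chooses a (possibly different) shard for the calling thread and records
  // the choice in tls_cpuid so later allocations go straight to that shard.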
  Shard* Repick();

  size_t ShardAllocatedAndUnused() const {
    size_t total = 0;
    for (size_t i = 0; i < shards_.Size(); ++i) {
      total += shards_.AccessAtCore(i)->allocated_and_unused_.load(
          std::memory_order_relaxed);
    }
    return total;
  }

  template <typename Func>
  char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
    size_t cpu;

    // Go directly to the arena if the allocation is too large, or if
    // we've never needed to Repick() and the arena mutex is available
    // with no waiting.  This keeps the fragmentation penalty of
    // concurrency zero unless it might actually confer an advantage.
    std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
    if (bytes > shard_block_size_ / 4 || force_arena ||
        ((cpu = tls_cpuid) == 0 &&
         !shards_.AccessAtCore(0)->allocated_and_unused_.load(
             std::memory_order_relaxed) &&
         arena_lock.try_lock())) {
      if (!arena_lock.owns_lock()) {
        arena_lock.lock();
      }
      auto rv = func();
      Fixup();
      return rv;
    }

    // pick a shard from which to allocate
    Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));
    if (!s->mutex.try_lock()) {
      s = Repick();
      s->mutex.lock();
    }
    std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);

    size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
    if (avail < bytes) {
      // Refill this shard's free range from the main arena.
      std::lock_guard<SpinMutex> reload_lock(arena_mutex_);

      // If the arena's current block is within a factor of 2 of the right
      // size, we adjust our request to avoid arena waste.
      auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
      assert(exact == arena_.AllocatedAndUnused());

      if (exact >= bytes && arena_.IsInInlineBlock()) {
        // If we haven't exhausted arena's inline block yet, allocate from arena
        // directly. This ensures that we'll do the first few small allocations
        // without allocating any blocks.
        // In particular this prevents empty memtables from using a
        // disproportionately large amount of memory: a memtable allocates on
        // the order of 1 KB of memory when created; we wouldn't want to
        // allocate a full arena block (typically a few megabytes) for that,
        // especially if there are thousands of empty memtables.
        auto rv = func();
        Fixup();
        return rv;
      }

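      // For example, with a 128 KB shard_block_size_: if the arena currently
      // has 100 KB unused we take exactly 100 KB (consuming the block with no
      // waste); if it has 300 KB or only 30 KB unused we take the standard
      // 128 KB instead.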
      avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
                  ? exact
                  : shard_block_size_;
      s->free_begin_ = arena_.AllocateAligned(avail);
      Fixup();
    }
    s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);

    char* rv;
    if ((bytes % sizeof(void*)) == 0) {
      // aligned allocation from the beginning
      rv = s->free_begin_;
      s->free_begin_ += bytes;
    } else {
      // unaligned from the end
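      // Carving unaligned requests from the tail keeps free_begin_
      // pointer-aligned for future aligned allocations from the head.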
      rv = s->free_begin_ + avail - bytes;
    }
    return rv;
  }

  void Fixup() {
    arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(),
                                      std::memory_order_relaxed);
    memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(),
                                  std::memory_order_relaxed);
    irregular_block_num_.store(arena_.IrregularBlockNum(),
                               std::memory_order_relaxed);
  }

  ConcurrentArena(const ConcurrentArena&) = delete;
  ConcurrentArena& operator=(const ConcurrentArena&) = delete;
};

}  // namespace ROCKSDB_NAMESPACE