//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBufferManager is for managing memory allocation for one or more
// MemTables.

#pragma once

#include <atomic>
#include <cstddef>
#include "rocksdb/cache.h"

namespace ROCKSDB_NAMESPACE {

class WriteBufferManager {
 public:
  // _buffer_size = 0 indicates no limit: memory won't be capped,
  // memory_usage() won't be valid, and ShouldFlush() will always return
  // false. If `cache` is provided, we'll insert dummy entries into the cache
  // to charge the memory allocated by memtables against the cache's
  // capacity. Charging to the cache works even if _buffer_size = 0.
  explicit WriteBufferManager(size_t _buffer_size,
                              std::shared_ptr<Cache> cache = {});
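  //
  // A usage sketch (illustrative values, not defaults): cap all memtables at
  // 64 MB and charge reservations to a shared block cache, then hand the
  // manager to the DB via DBOptions::write_buffer_manager:
  //
  //   std::shared_ptr<Cache> cache = NewLRUCache(1 << 30 /* 1 GB */);
  //   auto wbm = std::make_shared<WriteBufferManager>(64 << 20, cache);
  //   // options.write_buffer_manager = wbm;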
  // No copying allowed
  WriteBufferManager(const WriteBufferManager&) = delete;
  WriteBufferManager& operator=(const WriteBufferManager&) = delete;

  ~WriteBufferManager();

  bool enabled() const { return buffer_size_ != 0; }

  bool cost_to_cache() const { return cache_rep_ != nullptr; }

  // Only valid if enabled()
  size_t memory_usage() const {
    return memory_used_.load(std::memory_order_relaxed);
  }
  size_t mutable_memtable_memory_usage() const {
    return memory_active_.load(std::memory_order_relaxed);
  }
  size_t buffer_size() const { return buffer_size_; }

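  // For example, a caller monitoring write buffer pressure might poll the
  // accessors above (a sketch, not a prescribed pattern):
  //
  //   if (wbm.enabled() && wbm.memory_usage() > wbm.buffer_size() / 2) {
  //     // more than half of the write buffer budget is in use
  //   }
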
  // Should only be called from the write thread.
  bool ShouldFlush() const {
    if (enabled()) {
      if (mutable_memtable_memory_usage() > mutable_limit_) {
        return true;
      }
      if (memory_usage() >= buffer_size_ &&
          mutable_memtable_memory_usage() >= buffer_size_ / 2) {
        // If total memory usage exceeds the buffer size, trigger a more
        // aggressive flush. But if more than half of the memory is already
        // being flushed, triggering another flush may not help, so hold off
        // instead.
        return true;
      }
    }
    return false;
  }
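
  // A worked sketch of the thresholds above, assuming buffer_size_ = 100 MB
  // and mutable_limit_ set to a fixed fraction of buffer_size_ (7/8, i.e.
  // 87.5 MB here, in the implementation at the time of writing):
  //
  //   active = 90 MB, total =  95 MB  -> flush (active > mutable_limit_)
  //   active = 60 MB, total = 105 MB  -> flush (at the cap, and most of the
  //                                             memory is still mutable)
  //   active = 40 MB, total = 105 MB  -> no flush (over half of the memory
  //                                                is already being flushed)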

  // Called when memory is allocated for a memtable.
  void ReserveMem(size_t mem) {
    if (cache_rep_ != nullptr) {
      ReserveMemWithCache(mem);
    } else if (enabled()) {
      memory_used_.fetch_add(mem, std::memory_order_relaxed);
    }
    if (enabled()) {
      memory_active_.fetch_add(mem, std::memory_order_relaxed);
    }
  }
  // We are in the process of freeing `mem` bytes, so it is not considered
  // when checking the soft limit.
  void ScheduleFreeMem(size_t mem) {
    if (enabled()) {
      memory_active_.fetch_sub(mem, std::memory_order_relaxed);
    }
  }
  // Called when `mem` bytes are released, e.g. when a memtable is freed
  // after its flush completes.
  void FreeMem(size_t mem) {
    if (cache_rep_ != nullptr) {
      FreeMemWithCache(mem);
    } else if (enabled()) {
      memory_used_.fetch_sub(mem, std::memory_order_relaxed);
    }
  }
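
  // A lifecycle sketch (hypothetical sizes): a memtable reserves memory as
  // it grows, schedules those bytes for freeing when it is picked for
  // flush, and frees them once the flushed memtable is destroyed:
  //
  //   wbm.ReserveMem(4096);       // arena allocates a 4 KB block
  //   wbm.ScheduleFreeMem(4096);  // memtable marked immutable, flush queued
  //   wbm.FreeMem(4096);          // flush finished, memtable destroyed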

 private:
  const size_t buffer_size_;
  const size_t mutable_limit_;
  std::atomic<size_t> memory_used_;
  // Memory that hasn't been scheduled to be freed.
  std::atomic<size_t> memory_active_;
  struct CacheRep;
  std::unique_ptr<CacheRep> cache_rep_;

  void ReserveMemWithCache(size_t mem);
  void FreeMemWithCache(size_t mem);
};
}  // namespace ROCKSDB_NAMESPACE
