1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 #pragma once
7 
8 #include <deque>
9 #include <limits>
10 #include <list>
11 #include <set>
12 #include <string>
13 #include <vector>
14 
15 #include "db/dbformat.h"
16 #include "db/logs_with_prep_tracker.h"
17 #include "db/memtable.h"
18 #include "db/range_del_aggregator.h"
19 #include "file/filename.h"
20 #include "logging/log_buffer.h"
21 #include "monitoring/instrumented_mutex.h"
22 #include "rocksdb/db.h"
23 #include "rocksdb/iterator.h"
24 #include "rocksdb/options.h"
25 #include "rocksdb/types.h"
26 #include "util/autovector.h"
27 
28 namespace ROCKSDB_NAMESPACE {
29 
30 class ColumnFamilyData;
31 class InternalKeyComparator;
32 class InstrumentedMutex;
33 class MergeIteratorBuilder;
34 class MemTableList;
35 
36 struct FlushJobInfo;
37 
// Keeps a list of immutable memtables (note: despite the historical wording,
// they are stored in std::lists, not a vector). The list is immutable if the
// refcount is bigger than one. It is used as a state for Get() and
// Iterator code paths.
//
// This class is not thread-safe.  External synchronization is required
// (such as holding the db mutex or being on the write thread).
class MemTableListVersion {
 public:
  // Constructs a version sharing the memtable lists of `old` when provided.
  // `parent_memtable_list_memory_usage` points at the owning MemTableList's
  // memory-usage counter.
  explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage,
                               MemTableListVersion* old = nullptr);
  explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage,
                               int max_write_buffer_number_to_maintain,
                               int64_t max_write_buffer_size_to_maintain);

  // Reference counting for this version. On Unref(), memtables whose last
  // reference was held through this version are appended to *to_delete so
  // the caller can free them.
  void Ref();
  void Unref(autovector<MemTable*>* to_delete = nullptr);

  // Search all the memtables starting from the most recent one.
  // Return the most recent value found, if any.
  //
  // If any operation was found for this key, its most recent sequence number
  // will be stored in *seq on success (regardless of whether true/false is
  // returned).  Otherwise, *seq will be set to kMaxSequenceNumber.
  bool Get(const LookupKey& key, std::string* value, std::string* timestamp,
           Status* s, MergeContext* merge_context,
           SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
           bool* is_blob_index = nullptr);

  // Same as above, for callers that do not need the sequence number.
  bool Get(const LookupKey& key, std::string* value, std::string* timestamp,
           Status* s, MergeContext* merge_context,
           SequenceNumber* max_covering_tombstone_seq,
           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
           bool* is_blob_index = nullptr) {
    SequenceNumber seq;
    return Get(key, value, timestamp, s, merge_context,
               max_covering_tombstone_seq, &seq, read_opts, callback,
               is_blob_index);
  }

  // Batched lookup for multiple keys across the memtables in this version.
  void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                ReadCallback* callback, bool* is_blob);

  // Returns all the merge operands corresponding to the key by searching all
  // memtables starting from the most recent one.
  bool GetMergeOperands(const LookupKey& key, Status* s,
                        MergeContext* merge_context,
                        SequenceNumber* max_covering_tombstone_seq,
                        const ReadOptions& read_opts);

  // Similar to Get(), but searches the Memtable history of memtables that
  // have already been flushed.  Should only be used from in-memory only
  // queries (such as Transaction validation) as the history may contain
  // writes that are also present in the SST files.
  bool GetFromHistory(const LookupKey& key, std::string* value,
                      std::string* timestamp, Status* s,
                      MergeContext* merge_context,
                      SequenceNumber* max_covering_tombstone_seq,
                      SequenceNumber* seq, const ReadOptions& read_opts,
                      bool* is_blob_index = nullptr);
  // Same as above, for callers that do not need the sequence number.
  bool GetFromHistory(const LookupKey& key, std::string* value,
                      std::string* timestamp, Status* s,
                      MergeContext* merge_context,
                      SequenceNumber* max_covering_tombstone_seq,
                      const ReadOptions& read_opts,
                      bool* is_blob_index = nullptr) {
    SequenceNumber seq;
    return GetFromHistory(key, value, timestamp, s, merge_context,
                          max_covering_tombstone_seq, &seq, read_opts,
                          is_blob_index);
  }

  // Collects range tombstone iterators from the memtables in this version
  // into *range_del_agg. Iterators are allocated from `arena`.
  Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena,
                                    RangeDelAggregator* range_del_agg);

  // Appends one iterator per unflushed memtable to *iterator_list.
  // Iterators are allocated from `arena`.
  void AddIterators(const ReadOptions& options,
                    std::vector<InternalIterator*>* iterator_list,
                    Arena* arena);

  // Adds one iterator per unflushed memtable to `merge_iter_builder`.
  void AddIterators(const ReadOptions& options,
                    MergeIteratorBuilder* merge_iter_builder);

  // Total number of entries across the memtables in this version.
  uint64_t GetTotalNumEntries() const;

  // Total number of delete entries across the memtables in this version.
  uint64_t GetTotalNumDeletes() const;

  // Approximate size/count statistics for keys in [start_ikey, end_ikey).
  MemTable::MemTableStats ApproximateStats(const Slice& start_ikey,
                                           const Slice& end_ikey);

  // Returns the value of MemTable::GetEarliestSequenceNumber() on the most
  // recent MemTable in this list or kMaxSequenceNumber if the list is empty.
  // If include_history=true, will also search Memtables in MemTableList
  // History.
  SequenceNumber GetEarliestSequenceNumber(bool include_history = false) const;

 private:
  friend class MemTableList;

  friend Status InstallMemtableAtomicFlushResults(
      const autovector<MemTableList*>* imm_lists,
      const autovector<ColumnFamilyData*>& cfds,
      const autovector<const MutableCFOptions*>& mutable_cf_options_list,
      const autovector<const autovector<MemTable*>*>& mems_list,
      VersionSet* vset, InstrumentedMutex* mu,
      const autovector<FileMetaData*>& file_meta,
      autovector<MemTable*>* to_delete, FSDirectory* db_directory,
      LogBuffer* log_buffer);

  // REQUIRE: m is an immutable memtable
  void Add(MemTable* m, autovector<MemTable*>* to_delete);
  // REQUIRE: m is an immutable memtable
  void Remove(MemTable* m, autovector<MemTable*>* to_delete);

  // Trims the flushed-memtable history. `usage` is the current size of the
  // mutable memtable (see MemTableList::TrimHistory). Dropped memtables are
  // recorded in *to_delete.
  void TrimHistory(autovector<MemTable*>* to_delete, size_t usage);

  // Shared implementation behind Get()/GetFromHistory(): searches `list`
  // starting from the most recent memtable.
  bool GetFromList(std::list<MemTable*>* list, const LookupKey& key,
                   std::string* value, std::string* timestamp, Status* s,
                   MergeContext* merge_context,
                   SequenceNumber* max_covering_tombstone_seq,
                   SequenceNumber* seq, const ReadOptions& read_opts,
                   ReadCallback* callback = nullptr,
                   bool* is_blob_index = nullptr);

  // Helper to add m to memlist_ (reference handling is in the .cc file).
  void AddMemTable(MemTable* m);

  // Helper to drop a reference on m, recording it in *to_delete when the
  // last reference is released.
  void UnrefMemTable(autovector<MemTable*>* to_delete, MemTable* m);

  // Calculate the total amount of memory used by memlist_ and memlist_history_
  // excluding the last MemTable in memlist_history_. The reason for excluding
  // the last MemTable is to see if dropping the last MemTable will keep total
  // memory usage above or equal to max_write_buffer_size_to_maintain_
  size_t ApproximateMemoryUsageExcludingLast() const;

  // Whether this version contains flushed memtables that are only kept around
  // for transaction conflict checking.
  bool HasHistory() const { return !memlist_history_.empty(); }

  // Whether the memtables (given `usage` for the mutable one) exceed the
  // configured maintenance limits.
  bool MemtableLimitExceeded(size_t usage);

  // Immutable MemTables that have not yet been flushed.
  std::list<MemTable*> memlist_;

  // MemTables that have already been flushed
  // (used during Transaction validation)
  std::list<MemTable*> memlist_history_;

  // Maximum number of MemTables to keep in memory (including both flushed
  // and not-yet-flushed tables).
  const int max_write_buffer_number_to_maintain_;
  // Maximum size of MemTables to keep in memory (including both flushed
  // and not-yet-flushed tables).
  const int64_t max_write_buffer_size_to_maintain_;

  // Reference count of this version; relies on external synchronization.
  int refs_ = 0;

  // Points at the owning MemTableList's memory-usage counter.
  size_t* parent_memtable_list_memory_usage_;
};
194 
// This class stores references to all the immutable memtables.
// The memtables are flushed to L0 as soon as possible and in
// any order. If there is more than one immutable memtable, their
// flushes can occur concurrently.  However, they are 'committed'
// to the manifest in FIFO order to maintain correctness and
// recoverability from a crash.
//
// Other than imm_flush_needed and imm_trim_needed, this class is not
// thread-safe and requires external synchronization (such as holding the db
// mutex or being on the write thread.)
class MemTableList {
 public:
  // A list of memtables.
  explicit MemTableList(int min_write_buffer_number_to_merge,
                        int max_write_buffer_number_to_maintain,
                        int64_t max_write_buffer_size_to_maintain)
      : imm_flush_needed(false),
        imm_trim_needed(false),
        min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge),
        current_(new MemTableListVersion(&current_memory_usage_,
                                         max_write_buffer_number_to_maintain,
                                         max_write_buffer_size_to_maintain)),
        num_flush_not_started_(0),
        commit_in_progress_(false),
        flush_requested_(false),
        current_memory_usage_(0),
        current_memory_usage_excluding_last_(0),
        current_has_history_(false) {
    current_->Ref();
  }

  // Should not delete MemTableList without making sure MemTableList::current()
  // is Unref()'d.
  ~MemTableList() {}

  // Returns the current (most recent) version of the immutable memtable list.
  MemTableListVersion* current() const { return current_; }

  // so that background threads can detect non-nullptr pointer to
  // determine whether there is anything more to start flushing.
  std::atomic<bool> imm_flush_needed;

  // Set when the memtable history should be trimmed; see
  // MarkTrimHistoryNeeded() / ResetTrimHistoryNeeded().
  std::atomic<bool> imm_trim_needed;

  // Returns the total number of memtables in the list that haven't yet
  // been flushed and logged.
  int NumNotFlushed() const;

  // Returns total number of memtables in the list that have been
  // completely flushed and logged.
  int NumFlushed() const;

  // Returns true if there is at least one memtable on which flush has
  // not yet started.
  bool IsFlushPending() const;

  // Returns the earliest memtables that needs to be flushed. The returned
  // memtables are guaranteed to be in the ascending order of created time.
  void PickMemtablesToFlush(const uint64_t* max_memtable_id,
                            autovector<MemTable*>* mems);

  // Reset status of the given memtable list back to pending state so that
  // they can get picked up again on the next round of flush.
  void RollbackMemtableFlush(const autovector<MemTable*>& mems,
                             uint64_t file_number);

  // Try commit a successful flush in the manifest file. It might just return
  // Status::OK letting a concurrent flush do the actual recording.
  Status TryInstallMemtableFlushResults(
      ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
      const autovector<MemTable*>& m, LogsWithPrepTracker* prep_tracker,
      VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
      autovector<MemTable*>* to_delete, FSDirectory* db_directory,
      LogBuffer* log_buffer,
      std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info,
      IOStatus* io_s);

  // New memtables are inserted at the front of the list.
  // Takes ownership of the reference held on *m by the caller of Add().
  void Add(MemTable* m, autovector<MemTable*>* to_delete);

  // Returns an estimate of the number of bytes of data in use.
  size_t ApproximateMemoryUsage();

  // Returns the cached current_memory_usage_excluding_last_ value.
  size_t ApproximateMemoryUsageExcludingLast() const;

  // Returns the cached current_has_history_ value.
  bool HasHistory() const;

  // Updates current_memory_usage_excluding_last_ and current_has_history_
  // from MemTableListVersion. Must be called whenever InstallNewVersion is
  // called.
  void UpdateCachedValuesFromMemTableListVersion();

  // `usage` is the current size of the mutable Memtable. When
  // max_write_buffer_size_to_maintain is used, total size of mutable and
  // immutable memtables is checked against it to decide whether to trim
  // memtable list.
  void TrimHistory(autovector<MemTable*>* to_delete, size_t usage);

  // Returns an estimate of the number of bytes of data used by
  // the unflushed mem-tables.
  size_t ApproximateUnflushedMemTablesMemoryUsage();

  // Returns an estimate of the timestamp of the earliest key.
  uint64_t ApproximateOldestKeyTime() const;

  // Request a flush of all existing memtables to storage.  This will
  // cause future calls to IsFlushPending() to return true if this list is
  // non-empty (regardless of the min_write_buffer_number_to_merge
  // parameter). This flush request will persist until the next time
  // PickMemtablesToFlush() is called.
  void FlushRequested() { flush_requested_ = true; }

  // Whether a flush has been requested and not yet consumed by
  // PickMemtablesToFlush().
  bool HasFlushRequested() { return flush_requested_; }

  // Returns true if a trim history should be scheduled and the caller should
  // be the one to schedule it
  bool MarkTrimHistoryNeeded() {
    auto expected = false;
    // Only the caller that flips false -> true gets to schedule the trim.
    return imm_trim_needed.compare_exchange_strong(
        expected, true, std::memory_order_relaxed, std::memory_order_relaxed);
  }

  // Clears the trim-needed flag (no-op if it was already false).
  void ResetTrimHistoryNeeded() {
    auto expected = true;
    imm_trim_needed.compare_exchange_strong(
        expected, false, std::memory_order_relaxed, std::memory_order_relaxed);
  }

  // Copying allowed
  // MemTableList(const MemTableList&);
  // void operator=(const MemTableList&);

  // Pointer to the counter tracking total memory used by the memtables.
  size_t* current_memory_usage() { return &current_memory_usage_; }

  // Returns the min log containing the prep section after memtables listed in
  // `memtables_to_flush` are flushed and their status is persisted in manifest.
  uint64_t PrecomputeMinLogContainingPrepSection(
      const autovector<MemTable*>& memtables_to_flush);

  // ID of the oldest unflushed memtable, or uint64_t max if there is none.
  uint64_t GetEarliestMemTableID() const {
    auto& memlist = current_->memlist_;
    if (memlist.empty()) {
      return std::numeric_limits<uint64_t>::max();
    }
    return memlist.back()->GetID();
  }

  // ID of the newest unflushed memtable, or 0 if there is none.
  uint64_t GetLatestMemTableID() const {
    auto& memlist = current_->memlist_;
    if (memlist.empty()) {
      return 0;
    }
    return memlist.front()->GetID();
  }

  // Assigns `seq` as the atomic flush sequence number of every unflushed
  // memtable that does not have one yet.
  void AssignAtomicFlushSeq(const SequenceNumber& seq) {
    const auto& memlist = current_->memlist_;
    // Scan the memtable list from new to old
    for (auto it = memlist.begin(); it != memlist.end(); ++it) {
      MemTable* mem = *it;
      if (mem->atomic_flush_seqno_ == kMaxSequenceNumber) {
        mem->atomic_flush_seqno_ = seq;
      } else {
        // Earlier memtables must have been assigned an atomic flush seq, no
        // need to continue scan.
        break;
      }
    }
  }

  // Used only by DBImplSecondary during log replay.
  // Remove memtables whose data were written before the WAL with log_number
  // was created, i.e. mem->GetNextLogNumber() <= log_number. The memtables are
  // not freed, but put into a vector for future deref and reclamation.
  void RemoveOldMemTables(uint64_t log_number,
                          autovector<MemTable*>* to_delete);

 private:
  friend Status InstallMemtableAtomicFlushResults(
      const autovector<MemTableList*>* imm_lists,
      const autovector<ColumnFamilyData*>& cfds,
      const autovector<const MutableCFOptions*>& mutable_cf_options_list,
      const autovector<const autovector<MemTable*>*>& mems_list,
      VersionSet* vset, InstrumentedMutex* mu,
      const autovector<FileMetaData*>& file_meta,
      autovector<MemTable*>* to_delete, FSDirectory* db_directory,
      LogBuffer* log_buffer);

  // DB mutex held
  void InstallNewVersion();

  const int min_write_buffer_number_to_merge_;

  MemTableListVersion* current_;

  // the number of elements that still need flushing
  int num_flush_not_started_;

  // committing in progress
  bool commit_in_progress_;

  // Requested a flush of memtables to storage. It's possible to request that
  // a subset of memtables be flushed.
  bool flush_requested_;

  // The current memory usage.
  size_t current_memory_usage_;

  // Cached value of current_->ApproximateMemoryUsageExcludingLast().
  std::atomic<size_t> current_memory_usage_excluding_last_;

  // Cached value of current_->HasHistory().
  std::atomic<bool> current_has_history_;
};
412 
// Installs memtable atomic flush results.
// In most cases, imm_lists is nullptr, and the function simply uses the
// immutable memtable lists associated with the cfds. There are unit tests
// that install flush results for external immutable memtable lists other
// than the cfds' own immutable memtable lists, e.g. MemTableListTest. In
// this case, the imm_lists parameter is not nullptr.
extern Status InstallMemtableAtomicFlushResults(
    const autovector<MemTableList*>* imm_lists,
    const autovector<ColumnFamilyData*>& cfds,
    const autovector<const MutableCFOptions*>& mutable_cf_options_list,
    const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
    InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
    autovector<MemTable*>* to_delete, FSDirectory* db_directory,
    LogBuffer* log_buffer);
}  // namespace ROCKSDB_NAMESPACE
428