//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
#include "db/memtable_list.h"

#include <cinttypes>
#include <limits>
#include <queue>
#include <string>

#include "db/db_impl/db_impl.h"
#include "db/memtable.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_set.h"
#include "logging/log_buffer.h"
#include "monitoring/thread_status_util.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "table/merging_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"

namespace ROCKSDB_NAMESPACE {

class InternalKeyComparator;
class Mutex;
class VersionSet;

void MemTableListVersion::AddMemTable(MemTable* m) {
  memlist_.push_front(m);
  *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage();
}

void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete,
                                        MemTable* m) {
  if (m->Unref()) {
    to_delete->push_back(m);
    assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage());
    *parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage();
  }
}

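// Note: the member initializer list of the constructor below reads from `old`
// before the in-body null check runs, so callers are expected to pass a
// non-null `old`; the check is best understood as defensive.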
MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage, MemTableListVersion* old)
    : max_write_buffer_number_to_maintain_(
          old->max_write_buffer_number_to_maintain_),
      max_write_buffer_size_to_maintain_(
          old->max_write_buffer_size_to_maintain_),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {
  if (old != nullptr) {
    memlist_ = old->memlist_;
    for (auto& m : memlist_) {
      m->Ref();
    }

    memlist_history_ = old->memlist_history_;
    for (auto& m : memlist_history_) {
      m->Ref();
    }
  }
}

MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage,
    int max_write_buffer_number_to_maintain,
    int64_t max_write_buffer_size_to_maintain)
    : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain),
      max_write_buffer_size_to_maintain_(max_write_buffer_size_to_maintain),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {}

void MemTableListVersion::Ref() { ++refs_; }

// called by superversion::clean()
void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
  assert(refs_ >= 1);
  --refs_;
  if (refs_ == 0) {
    // Callers may pass a nullptr to_delete only when they are confident that
    // refs_ will not drop to zero here.
    assert(to_delete != nullptr);
    for (const auto& m : memlist_) {
      UnrefMemTable(to_delete, m);
    }
    for (const auto& m : memlist_history_) {
      UnrefMemTable(to_delete, m);
    }
    delete this;
  }
}

int MemTableList::NumNotFlushed() const {
  int size = static_cast<int>(current_->memlist_.size());
  assert(num_flush_not_started_ <= size);
  return size;
}

int MemTableList::NumFlushed() const {
  return static_cast<int>(current_->memlist_history_.size());
}

// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
// merge_context holds the list of merge operands encountered so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
                              std::string* timestamp, Status* s,
                              MergeContext* merge_context,
                              SequenceNumber* max_covering_tombstone_seq,
                              SequenceNumber* seq, const ReadOptions& read_opts,
                              ReadCallback* callback, bool* is_blob_index) {
  return GetFromList(&memlist_, key, value, timestamp, s, merge_context,
                     max_covering_tombstone_seq, seq, read_opts, callback,
                     is_blob_index);
}

void MemTableListVersion::MultiGet(const ReadOptions& read_options,
                                   MultiGetRange* range, ReadCallback* callback,
                                   bool* is_blob) {
  for (auto memtable : memlist_) {
    memtable->MultiGet(read_options, range, callback, is_blob);
    if (range->empty()) {
      return;
    }
  }
}

bool MemTableListVersion::GetMergeOperands(
    const LookupKey& key, Status* s, MergeContext* merge_context,
    SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) {
  for (MemTable* memtable : memlist_) {
    bool done = memtable->Get(key, /*value*/ nullptr, /*timestamp*/ nullptr, s,
                              merge_context, max_covering_tombstone_seq,
                              read_opts, nullptr, nullptr, false);
    if (done) {
      return true;
    }
  }
  return false;
}
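// Note on the call above: the trailing `false` maps to MemTable::Get()'s
// do_merge flag, so merge operands are collected into merge_context rather
// than being combined into a final value.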

bool MemTableListVersion::GetFromHistory(
    const LookupKey& key, std::string* value, std::string* timestamp, Status* s,
    MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq,
    SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index) {
  return GetFromList(&memlist_history_, key, value, timestamp, s, merge_context,
                     max_covering_tombstone_seq, seq, read_opts,
                     nullptr /*read_callback*/, is_blob_index);
}

bool MemTableListVersion::GetFromList(
    std::list<MemTable*>* list, const LookupKey& key, std::string* value,
    std::string* timestamp, Status* s, MergeContext* merge_context,
    SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
    const ReadOptions& read_opts, ReadCallback* callback, bool* is_blob_index) {
  *seq = kMaxSequenceNumber;

  for (auto& memtable : *list) {
    SequenceNumber current_seq = kMaxSequenceNumber;

    bool done = memtable->Get(key, value, timestamp, s, merge_context,
                              max_covering_tombstone_seq, &current_seq,
                              read_opts, callback, is_blob_index);
    if (*seq == kMaxSequenceNumber) {
      // Store the most recent sequence number of any operation on this key.
      // Since we only care about the most recent change, we only need to
      // return the first operation found when searching memtables in
      // reverse-chronological order.
      // current_seq would be equal to kMaxSequenceNumber if the value was to be
      // skipped. This allows seq to be assigned again when the next value is
      // read.
      *seq = current_seq;
    }

    if (done) {
      assert(*seq != kMaxSequenceNumber || s->IsNotFound());
      return true;
    }
    if (!done && !s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) {
      return false;
    }
  }
  return false;
}
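// A rough sketch of how a caller might drive this lookup (illustrative only;
// `imm_version` stands for some MemTableListVersion the caller holds a
// reference on):
//
//   SequenceNumber seq;
//   bool found = imm_version->Get(lkey, &value, &ts, &s, &merge_context,
//                                 &max_covering_tombstone_seq, &seq,
//                                 read_opts, callback, &is_blob_index);
//   if (!found) {
//     // Fall through to the SST files; `seq` still reports the newest
//     // operation seen on this key in the immutable memtables, if any.
//   }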

Status MemTableListVersion::AddRangeTombstoneIterators(
    const ReadOptions& read_opts, Arena* /*arena*/,
    RangeDelAggregator* range_del_agg) {
  assert(range_del_agg != nullptr);
  // Except for snapshot read, using kMaxSequenceNumber is OK because these
  // are immutable memtables.
  SequenceNumber read_seq = read_opts.snapshot != nullptr
                                ? read_opts.snapshot->GetSequenceNumber()
                                : kMaxSequenceNumber;
  for (auto& m : memlist_) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        m->NewRangeTombstoneIterator(read_opts, read_seq));
    range_del_agg->AddTombstones(std::move(range_del_iter));
  }
  return Status::OK();
}
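// Note: NewRangeTombstoneIterator() may return nullptr for a memtable that
// contains no range tombstones; AddTombstones() is expected to tolerate a
// null iterator, so no check is needed in the loop above.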

void MemTableListVersion::AddIterators(
    const ReadOptions& options, std::vector<InternalIterator*>* iterator_list,
    Arena* arena) {
  for (auto& m : memlist_) {
    iterator_list->push_back(m->NewIterator(options, arena));
  }
}

void MemTableListVersion::AddIterators(
    const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) {
  for (auto& m : memlist_) {
    merge_iter_builder->AddIterator(
        m->NewIterator(options, merge_iter_builder->GetArena()));
  }
}

uint64_t MemTableListVersion::GetTotalNumEntries() const {
  uint64_t total_num = 0;
  for (auto& m : memlist_) {
    total_num += m->num_entries();
  }
  return total_num;
}

MemTable::MemTableStats MemTableListVersion::ApproximateStats(
    const Slice& start_ikey, const Slice& end_ikey) {
  MemTable::MemTableStats total_stats = {0, 0};
  for (auto& m : memlist_) {
    auto mStats = m->ApproximateStats(start_ikey, end_ikey);
    total_stats.size += mStats.size;
    total_stats.count += mStats.count;
  }
  return total_stats;
}

uint64_t MemTableListVersion::GetTotalNumDeletes() const {
  uint64_t total_num = 0;
  for (auto& m : memlist_) {
    total_num += m->num_deletes();
  }
  return total_num;
}

SequenceNumber MemTableListVersion::GetEarliestSequenceNumber(
    bool include_history) const {
  if (include_history && !memlist_history_.empty()) {
    return memlist_history_.back()->GetEarliestSequenceNumber();
  } else if (!memlist_.empty()) {
    return memlist_.back()->GetEarliestSequenceNumber();
  } else {
    return kMaxSequenceNumber;
  }
}

// caller is responsible for referencing m
void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  AddMemTable(m);

  TrimHistory(to_delete, m->ApproximateMemoryUsage());
}

// Removes m from list of memtables not flushed.  Caller should NOT Unref m.
void MemTableListVersion::Remove(MemTable* m,
                                 autovector<MemTable*>* to_delete) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  memlist_.remove(m);

  m->MarkFlushed();
  if (max_write_buffer_size_to_maintain_ > 0 ||
      max_write_buffer_number_to_maintain_ > 0) {
    memlist_history_.push_front(m);
    // The size of the mutable memtable is not available at this point; pass 0
    // to TrimHistory as a best effort.
    TrimHistory(to_delete, 0);
  } else {
    UnrefMemTable(to_delete, m);
  }
}

// Return the total memory usage assuming the oldest flushed memtable is
// dropped.
size_t MemTableListVersion::ApproximateMemoryUsageExcludingLast() const {
  size_t total_memtable_size = 0;
  for (auto& memtable : memlist_) {
    total_memtable_size += memtable->ApproximateMemoryUsage();
  }
  for (auto& memtable : memlist_history_) {
    total_memtable_size += memtable->ApproximateMemoryUsage();
  }
  if (!memlist_history_.empty()) {
    total_memtable_size -= memlist_history_.back()->ApproximateMemoryUsage();
  }
  return total_memtable_size;
}

bool MemTableListVersion::MemtableLimitExceeded(size_t usage) {
  if (max_write_buffer_size_to_maintain_ > 0) {
    // Calculate the total memory usage after dropping the oldest flushed
    // memtable, and compare it with max_write_buffer_size_to_maintain_ to
    // decide whether to trim history.
    return ApproximateMemoryUsageExcludingLast() + usage >=
           static_cast<size_t>(max_write_buffer_size_to_maintain_);
  } else if (max_write_buffer_number_to_maintain_ > 0) {
    return memlist_.size() + memlist_history_.size() >
           static_cast<size_t>(max_write_buffer_number_to_maintain_);
  } else {
    return false;
  }
}
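// Illustrative numbers for the size-based branch: with
// max_write_buffer_size_to_maintain_ = 64MB, two unflushed memtables of 20MB
// each, one 20MB memtable in history (the oldest), and usage = 20MB for the
// incoming mutable memtable, ApproximateMemoryUsageExcludingLast() is 40MB,
// and 40MB + 20MB < 64MB, so the history entry is kept.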

// Make sure we don't use up too much space in history
void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete,
                                      size_t usage) {
  while (MemtableLimitExceeded(usage) && !memlist_history_.empty()) {
    MemTable* x = memlist_history_.back();
    memlist_history_.pop_back();

    UnrefMemTable(to_delete, x);
  }
}

// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() const {
  if ((flush_requested_ && num_flush_not_started_ > 0) ||
      (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
    assert(imm_flush_needed.load(std::memory_order_relaxed));
    return true;
  }
  return false;
}
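// Note on the predicate above: a flush becomes pending either via an explicit
// request (flush_requested_) with at least one memtable not yet picked, or
// once num_flush_not_started_ reaches min_write_buffer_number_to_merge_ even
// without an explicit request.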

// Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id,
                                        autovector<MemTable*>* ret) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
  const auto& memlist = current_->memlist_;
  bool atomic_flush = false;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    MemTable* m = *it;
    if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
      atomic_flush = true;
    }
    if (max_memtable_id != nullptr && m->GetID() > *max_memtable_id) {
      break;
    }
    if (!m->flush_in_progress_) {
      assert(!m->flush_completed_);
      num_flush_not_started_--;
      if (num_flush_not_started_ == 0) {
        imm_flush_needed.store(false, std::memory_order_release);
      }
      m->flush_in_progress_ = true;  // flushing will start very soon
      ret->push_back(m);
    }
  }
  if (!atomic_flush || num_flush_not_started_ == 0) {
    flush_requested_ = false;  // start-flush request is complete
  }
}
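// Note on the final check above: flush_requested_ is cleared only when this
// is not an atomic flush or when no unflushed memtables remain, so a pending
// atomic-flush request survives a partial pick and a later pick can finish
// the job.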

void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
                                         uint64_t /*file_number*/) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
  assert(!mems.empty());

  // If the flush was not successful, then just reset state.
  // Perhaps a subsequent attempt to flush will succeed.
  for (MemTable* m : mems) {
    assert(m->flush_in_progress_);
    assert(m->file_number_ == 0);

    m->flush_in_progress_ = false;
    m->flush_completed_ = false;
    m->edit_.Clear();
    num_flush_not_started_++;
  }
  imm_flush_needed.store(true, std::memory_order_release);
}

// Try to record a successful flush in the manifest file. It might just return
// Status::OK letting a concurrent flush do the actual recording.
Status MemTableList::TryInstallMemtableFlushResults(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker,
    VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
    autovector<MemTable*>* to_delete, FSDirectory* db_directory,
    LogBuffer* log_buffer,
    std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info,
    IOStatus* io_s) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  // Flush was successful.
  // Record the status on the memtable object. Either this call or a call by a
  // concurrent flush thread will read the status and write it to the manifest.
  for (size_t i = 0; i < mems.size(); ++i) {
    // All the edits are associated with the first memtable of this batch.
    assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);

    mems[i]->flush_completed_ = true;
    mems[i]->file_number_ = file_number;
  }

  // If some other thread is already committing, then return.
  Status s;
  if (commit_in_progress_) {
    TEST_SYNC_POINT("MemTableList::TryInstallMemtableFlushResults:InProgress");
    return s;
  }

  // Only a single thread can be executing this piece of code.
  commit_in_progress_ = true;

  // Retry until all completed flushes are committed. New flushes can finish
  // while the current thread is writing the manifest, during which the mutex
  // is released.
  while (s.ok()) {
    auto& memlist = current_->memlist_;
    // The back is the oldest; if its flush_completed_ is not set, it means
    // that we were assigned a more recent memtable. The memtables' flushes
    // must be recorded in the manifest in order. A concurrent flush thread,
    // which is assigned to flush the oldest memtable, will later wake up and
    // do all the pending writes to the manifest, in order.
    if (memlist.empty() || !memlist.back()->flush_completed_) {
      break;
    }
    // Scan all memtables from the earliest, and commit those
    // (in that order) that have finished flushing. Memtables
    // are always committed in the order that they were created.
    uint64_t batch_file_number = 0;
    size_t batch_count = 0;
    autovector<VersionEdit*> edit_list;
    autovector<MemTable*> memtables_to_flush;
    // Enumerate from the last (earliest) element to see how many batches have
    // finished.
    for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
      MemTable* m = *it;
      if (!m->flush_completed_) {
        break;
      }
      if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
        batch_file_number = m->file_number_;
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64 " started",
                         cfd->GetName().c_str(), m->file_number_);
        edit_list.push_back(&m->edit_);
        memtables_to_flush.push_back(m);
#ifndef ROCKSDB_LITE
        std::unique_ptr<FlushJobInfo> info = m->ReleaseFlushJobInfo();
        if (info != nullptr) {
          committed_flush_jobs_info->push_back(std::move(info));
        }
#else
        (void)committed_flush_jobs_info;
#endif  // !ROCKSDB_LITE
      }
      batch_count++;
    }

    // TODO(myabandeh): Not sure how batch_count could be 0 here.
    if (batch_count > 0) {
      if (vset->db_options()->allow_2pc) {
        assert(edit_list.size() > 0);
        // We piggyback the information about the earliest log file to keep in
        // the manifest entry for the last file flushed.
        edit_list.back()->SetMinLogNumberToKeep(PrecomputeMinLogNumberToKeep(
            vset, *cfd, edit_list, memtables_to_flush, prep_tracker));
      }

      // This can release and reacquire the mutex.
      vset->SetIOStatusOK();
      s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
                            db_directory);
      *io_s = vset->io_status();

      // We will be changing the version in the next code path,
      // so we better create a new one, since versions are immutable.
      InstallNewVersion();

      // All the later memtables that have the same filenum
      // are part of the same batch. They can be committed now.
      uint64_t mem_id = 1;  // how many memtables have been flushed.

      // Commit the new state only if the column family is NOT dropped.
      // The reason is as follows (refer to
      // ColumnFamilyTest.FlushAndDropRaceCondition).
      // If the column family is dropped, then according to LogAndApply, its
      // corresponding flush operation is NOT written to the MANIFEST. This
      // means the DB is not aware of the L0 files generated from the flush.
      // By committing the new state, we remove the memtable from the memtable
      // list. Creating an iterator on this column family will not be able to
      // read full data since the memtable is removed, and the DB is not aware
      // of the L0 files, leaving MergingIterator unable to build child
      // iterators. The RocksDB contract requires that the iterator can be
      // created on a dropped column family, and we must be able to read full
      // data as long as the column family handle is not deleted, even if the
      // column family is dropped.
      if (s.ok() && !cfd->IsDropped()) {  // commit new state
        while (batch_count-- > 0) {
          MemTable* m = current_->memlist_.back();
          ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " done",
                           cfd->GetName().c_str(), m->file_number_, mem_id);
          assert(m->file_number_ > 0);
          current_->Remove(m, to_delete);
          UpdateCachedValuesFromMemTableListVersion();
          ResetTrimHistoryNeeded();
          ++mem_id;
        }
      } else {
        for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
          MemTable* m = *it;
          // Commit failed. Set up state so that we can flush again.
          ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " failed",
                           m->file_number_, mem_id);
          m->flush_completed_ = false;
          m->flush_in_progress_ = false;
          m->edit_.Clear();
          num_flush_not_started_++;
          m->file_number_ = 0;
          imm_flush_needed.store(true, std::memory_order_release);
          ++mem_id;
        }
      }
    }
  }
  commit_in_progress_ = false;
  return s;
}
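// Hand-off pattern: a flush thread that finds commit_in_progress_ set merely
// marks its memtables flush_completed_ and returns OK; the thread that owns
// the commit loops above, re-reading memlist_ after each LogAndApply() (which
// can release the mutex), until no fully flushed batch remains at the back.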

// New memtables are inserted at the front of the list.
void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
  assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
  InstallNewVersion();
  // This method is used to move a mutable memtable into the immutable list.
  // Since the mutable memtable is already refcounted by the DBImpl, and we do
  // not unref it when moving it to the immutable list, we don't have to ref
  // the memtable here: we just take over the reference from the DBImpl.
  current_->Add(m, to_delete);
  m->MarkImmutable();
  num_flush_not_started_++;
  if (num_flush_not_started_ == 1) {
    imm_flush_needed.store(true, std::memory_order_release);
  }
  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
}

void MemTableList::TrimHistory(autovector<MemTable*>* to_delete, size_t usage) {
  InstallNewVersion();
  current_->TrimHistory(to_delete, usage);
  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
}

// Returns an estimate of the number of bytes of data in use.
size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
  size_t total_size = 0;
  for (auto& memtable : current_->memlist_) {
    total_size += memtable->ApproximateMemoryUsage();
  }
  return total_size;
}

size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }

size_t MemTableList::ApproximateMemoryUsageExcludingLast() const {
  const size_t usage =
      current_memory_usage_excluding_last_.load(std::memory_order_relaxed);
  return usage;
}

bool MemTableList::HasHistory() const {
  const bool has_history = current_has_history_.load(std::memory_order_relaxed);
  return has_history;
}

void MemTableList::UpdateCachedValuesFromMemTableListVersion() {
  const size_t total_memtable_size =
      current_->ApproximateMemoryUsageExcludingLast();
  current_memory_usage_excluding_last_.store(total_memtable_size,
                                             std::memory_order_relaxed);

  const bool has_history = current_->HasHistory();
  current_has_history_.store(has_history, std::memory_order_relaxed);
}

uint64_t MemTableList::ApproximateOldestKeyTime() const {
  if (!current_->memlist_.empty()) {
    return current_->memlist_.back()->ApproximateOldestKeyTime();
  }
  return std::numeric_limits<uint64_t>::max();
}

void MemTableList::InstallNewVersion() {
  if (current_->refs_ == 1) {
    // we're the only one using the version, just keep using it
  } else {
    // somebody else holds the current version; we need to create a new one
    MemTableListVersion* version = current_;
    current_ = new MemTableListVersion(&current_memory_usage_, current_);
    current_->Ref();
    version->Unref();
  }
}
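// Copy-on-write versioning: readers pin a MemTableListVersion via Ref() and
// see a consistent snapshot of the list while writers mutate a fresh copy.
// A minimal sketch of the reader side (illustrative; in practice this pinning
// happens through the SuperVersion machinery):
//
//   MemTableListVersion* v = imm->current();
//   v->Ref();
//   // ... read from v without worrying about concurrent Add()/Remove() ...
//   v->Unref(&to_delete);  // memtables whose refcount hit zero land here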

uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
    const autovector<MemTable*>& memtables_to_flush) {
  uint64_t min_log = 0;

  for (auto& m : current_->memlist_) {
    // Assume the list is very short, so we can live with O(m*n). We can
    // optimize if performance becomes a problem.
    bool should_skip = false;
    for (MemTable* m_to_flush : memtables_to_flush) {
      if (m == m_to_flush) {
        should_skip = true;
        break;
      }
    }
    if (should_skip) {
      continue;
    }

    auto log = m->GetMinLogContainingPrepSection();

    if (log > 0 && (min_log == 0 || log < min_log)) {
      min_log = log;
    }
  }

  return min_log;
}

// Commit a successful atomic flush in the manifest file.
Status InstallMemtableAtomicFlushResults(
    const autovector<MemTableList*>* imm_lists,
    const autovector<ColumnFamilyData*>& cfds,
    const autovector<const MutableCFOptions*>& mutable_cf_options_list,
    const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
    InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
    autovector<MemTable*>* to_delete, FSDirectory* db_directory,
    LogBuffer* log_buffer) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  size_t num = mems_list.size();
  assert(cfds.size() == num);
  if (imm_lists != nullptr) {
    assert(imm_lists->size() == num);
  }
  for (size_t k = 0; k != num; ++k) {
#ifndef NDEBUG
    const auto* imm =
        (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    if (!mems_list[k]->empty()) {
      assert((*mems_list[k])[0]->GetID() == imm->GetEarliestMemTableID());
    }
#endif
    assert(nullptr != file_metas[k]);
    for (size_t i = 0; i != mems_list[k]->size(); ++i) {
      assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0);
      (*mems_list[k])[i]->SetFlushCompleted(true);
      (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
    }
  }

  Status s;

  autovector<autovector<VersionEdit*>> edit_lists;
  uint32_t num_entries = 0;
  for (const auto mems : mems_list) {
    assert(mems != nullptr);
    autovector<VersionEdit*> edits;
    assert(!mems->empty());
    edits.emplace_back((*mems)[0]->GetEdits());
    ++num_entries;
    edit_lists.emplace_back(edits);
  }
  // Mark the version edits as an atomic group if the number of version edits
  // exceeds 1.
  if (cfds.size() > 1) {
    for (auto& edits : edit_lists) {
      assert(edits.size() == 1);
      edits[0]->MarkAtomicGroup(--num_entries);
    }
    assert(0 == num_entries);
  }
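  // MarkAtomicGroup(n) records how many edits remain after this one in the
  // group. With three column families, the edits are tagged 2, 1, 0, so
  // recovery can tell whether it has read the whole group before applying it.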

  // This can release and reacquire the mutex.
  s = vset->LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu,
                        db_directory);

  for (size_t k = 0; k != cfds.size(); ++k) {
    auto* imm = (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    imm->InstallNewVersion();
  }

  if (s.ok() || s.IsColumnFamilyDropped()) {
    for (size_t i = 0; i != cfds.size(); ++i) {
      if (cfds[i]->IsDropped()) {
        continue;
      }
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        assert(m->GetFileNumber() > 0);
        uint64_t mem_id = m->GetID();
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64
                         ": memtable #%" PRIu64 " done",
                         cfds[i]->GetName().c_str(), m->GetFileNumber(),
                         mem_id);
        imm->current_->Remove(m, to_delete);
        imm->UpdateCachedValuesFromMemTableListVersion();
        imm->ResetTrimHistoryNeeded();
      }
    }
  } else {
    for (size_t i = 0; i != cfds.size(); ++i) {
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        uint64_t mem_id = m->GetID();
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64
                         ": memtable #%" PRIu64 " failed",
                         cfds[i]->GetName().c_str(), m->GetFileNumber(),
                         mem_id);
        m->SetFlushCompleted(false);
        m->SetFlushInProgress(false);
        m->GetEdits()->Clear();
        m->SetFileNumber(0);
        imm->num_flush_not_started_++;
      }
      imm->imm_flush_needed.store(true, std::memory_order_release);
    }
  }

  return s;
}

void MemTableList::RemoveOldMemTables(uint64_t log_number,
                                      autovector<MemTable*>* to_delete) {
  assert(to_delete != nullptr);
  InstallNewVersion();
  auto& memlist = current_->memlist_;
  autovector<MemTable*> old_memtables;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    MemTable* mem = *it;
    if (mem->GetNextLogNumber() > log_number) {
      break;
    }
    old_memtables.push_back(mem);
  }

  for (auto it = old_memtables.begin(); it != old_memtables.end(); ++it) {
    MemTable* mem = *it;
    current_->Remove(mem, to_delete);
    --num_flush_not_started_;
    if (0 == num_flush_not_started_) {
      imm_flush_needed.store(false, std::memory_order_release);
    }
  }

  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
}

}  // namespace ROCKSDB_NAMESPACE