// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include "db/memtable_list.h"

#include <cinttypes>
#include <limits>
#include <queue>
#include <string>
#include "db/db_impl/db_impl.h"
#include "db/memtable.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_set.h"
#include "logging/log_buffer.h"
#include "monitoring/thread_status_util.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "table/merging_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"

namespace ROCKSDB_NAMESPACE {

class InternalKeyComparator;
class Mutex;
class VersionSet;

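// Add m to the front (most recent end) of memlist_ and charge its approximate
// memory usage to the owning MemTableList.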
void MemTableListVersion::AddMemTable(MemTable* m) {
  memlist_.push_front(m);
  *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage();
}

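// Drop one reference to m; if it was the last one, report the memtable via
// to_delete and release its share of the tracked memory usage.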
void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete,
                                        MemTable* m) {
  if (m->Unref()) {
    to_delete->push_back(m);
    assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage());
    *parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage();
  }
}

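// Create a new version that shares (and references) all memtables of "old",
// both the not-yet-flushed list and the flushed history.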
MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage, MemTableListVersion* old)
    : max_write_buffer_number_to_maintain_(
          old->max_write_buffer_number_to_maintain_),
      max_write_buffer_size_to_maintain_(
          old->max_write_buffer_size_to_maintain_),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {
  if (old != nullptr) {
    memlist_ = old->memlist_;
    for (auto& m : memlist_) {
      m->Ref();
    }

    memlist_history_ = old->memlist_history_;
    for (auto& m : memlist_history_) {
      m->Ref();
    }
  }
}

MemTableListVersion::MemTableListVersion(
    size_t* parent_memtable_list_memory_usage,
    int max_write_buffer_number_to_maintain,
    int64_t max_write_buffer_size_to_maintain)
    : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain),
      max_write_buffer_size_to_maintain_(max_write_buffer_size_to_maintain),
      parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {}

void MemTableListVersion::Ref() { ++refs_; }

// called by SuperVersion::Cleanup()
void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) {
  assert(refs_ >= 1);
  --refs_;
  if (refs_ == 0) {
    // If to_delete is nullptr, the caller was confident that refs_ could not
    // drop to zero here.
    assert(to_delete != nullptr);
    for (const auto& m : memlist_) {
      UnrefMemTable(to_delete, m);
    }
    for (const auto& m : memlist_history_) {
      UnrefMemTable(to_delete, m);
    }
    delete this;
  }
}

int MemTableList::NumNotFlushed() const {
  int size = static_cast<int>(current_->memlist_.size());
  assert(num_flush_not_started_ <= size);
  return size;
}

int MemTableList::NumFlushed() const {
  return static_cast<int>(current_->memlist_history_.size());
}

// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
// merge_context stores the list of merge operands collected so far.
bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
                              std::string* timestamp, Status* s,
                              MergeContext* merge_context,
                              SequenceNumber* max_covering_tombstone_seq,
                              SequenceNumber* seq, const ReadOptions& read_opts,
                              ReadCallback* callback, bool* is_blob_index) {
  return GetFromList(&memlist_, key, value, timestamp, s, merge_context,
                     max_covering_tombstone_seq, seq, read_opts, callback,
                     is_blob_index);
}

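// Batched lookup. Each memtable is probed from newest to oldest; keys leave
// the range once resolved, and the loop stops when none remain.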
void MemTableListVersion::MultiGet(const ReadOptions& read_options,
                                   MultiGetRange* range, ReadCallback* callback,
                                   bool* is_blob) {
  for (auto memtable : memlist_) {
    memtable->MultiGet(read_options, range, callback, is_blob);
    if (range->empty()) {
      return;
    }
  }
}

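// Collect merge operands for key into merge_context without combining them.
// Returns true as soon as a memtable reports the lookup as complete.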
bool MemTableListVersion::GetMergeOperands(
    const LookupKey& key, Status* s, MergeContext* merge_context,
    SequenceNumber* max_covering_tombstone_seq, const ReadOptions& read_opts) {
  for (MemTable* memtable : memlist_) {
    bool done = memtable->Get(key, /*value*/ nullptr, /*timestamp*/ nullptr, s,
                              merge_context, max_covering_tombstone_seq,
                              read_opts, nullptr, nullptr, false);
    if (done) {
      return true;
    }
  }
  return false;
}

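// Same as Get(), but searches the memtables that were already flushed and are
// kept around only as history (see max_write_buffer_size_to_maintain).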
bool MemTableListVersion::GetFromHistory(
    const LookupKey& key, std::string* value, std::string* timestamp, Status* s,
    MergeContext* merge_context, SequenceNumber* max_covering_tombstone_seq,
    SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index) {
  return GetFromList(&memlist_history_, key, value, timestamp, s, merge_context,
                     max_covering_tombstone_seq, seq, read_opts,
                     nullptr /*read_callback*/, is_blob_index);
}

bool MemTableListVersion::GetFromList(
    std::list<MemTable*>* list, const LookupKey& key, std::string* value,
    std::string* timestamp, Status* s, MergeContext* merge_context,
    SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
    const ReadOptions& read_opts, ReadCallback* callback, bool* is_blob_index) {
  *seq = kMaxSequenceNumber;

  for (auto& memtable : *list) {
    SequenceNumber current_seq = kMaxSequenceNumber;

    bool done = memtable->Get(key, value, timestamp, s, merge_context,
                              max_covering_tombstone_seq, &current_seq,
                              read_opts, callback, is_blob_index);
    if (*seq == kMaxSequenceNumber) {
      // Store the most recent sequence number of any operation on this key.
      // Since we only care about the most recent change, we only need to
      // return the first operation found when searching memtables in
      // reverse-chronological order.
      // current_seq would remain kMaxSequenceNumber if the entry was skipped,
      // which allows seq to be assigned again when the next value is read.
      *seq = current_seq;
    }

    if (done) {
      assert(*seq != kMaxSequenceNumber || s->IsNotFound());
      return true;
    }
    if (!done && !s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) {
      return false;
    }
  }
  return false;
}

Status MemTableListVersion::AddRangeTombstoneIterators(
    const ReadOptions& read_opts, Arena* /*arena*/,
    RangeDelAggregator* range_del_agg) {
  assert(range_del_agg != nullptr);
  // Except for snapshot reads, using kMaxSequenceNumber is OK because these
  // are immutable memtables.
  SequenceNumber read_seq = read_opts.snapshot != nullptr
                                ? read_opts.snapshot->GetSequenceNumber()
                                : kMaxSequenceNumber;
  for (auto& m : memlist_) {
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        m->NewRangeTombstoneIterator(read_opts, read_seq));
    range_del_agg->AddTombstones(std::move(range_del_iter));
  }
  return Status::OK();
}

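// Append one iterator per memtable (newest first) to iterator_list; the arena
// variant allocates the iterators from the supplied arena.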
void MemTableListVersion::AddIterators(
    const ReadOptions& options, std::vector<InternalIterator*>* iterator_list,
    Arena* arena) {
  for (auto& m : memlist_) {
    iterator_list->push_back(m->NewIterator(options, arena));
  }
}

void MemTableListVersion::AddIterators(
    const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) {
  for (auto& m : memlist_) {
    merge_iter_builder->AddIterator(
        m->NewIterator(options, merge_iter_builder->GetArena()));
  }
}

uint64_t MemTableListVersion::GetTotalNumEntries() const {
  uint64_t total_num = 0;
  for (auto& m : memlist_) {
    total_num += m->num_entries();
  }
  return total_num;
}

MemTable::MemTableStats MemTableListVersion::ApproximateStats(
    const Slice& start_ikey, const Slice& end_ikey) {
  MemTable::MemTableStats total_stats = {0, 0};
  for (auto& m : memlist_) {
    auto mStats = m->ApproximateStats(start_ikey, end_ikey);
    total_stats.size += mStats.size;
    total_stats.count += mStats.count;
  }
  return total_stats;
}

uint64_t MemTableListVersion::GetTotalNumDeletes() const {
  uint64_t total_num = 0;
  for (auto& m : memlist_) {
    total_num += m->num_deletes();
  }
  return total_num;
}

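// The back of each list holds the oldest memtable, so the earliest sequence
// number lives there; fall back to kMaxSequenceNumber when the lists are empty.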
SequenceNumber MemTableListVersion::GetEarliestSequenceNumber(
    bool include_history) const {
  if (include_history && !memlist_history_.empty()) {
    return memlist_history_.back()->GetEarliestSequenceNumber();
  } else if (!memlist_.empty()) {
    return memlist_.back()->GetEarliestSequenceNumber();
  } else {
    return kMaxSequenceNumber;
  }
}

// caller is responsible for referencing m
void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  AddMemTable(m);

  TrimHistory(to_delete, m->ApproximateMemoryUsage());
}

// Removes m from list of memtables not flushed. Caller should NOT Unref m.
void MemTableListVersion::Remove(MemTable* m,
                                 autovector<MemTable*>* to_delete) {
  assert(refs_ == 1);  // only when refs_ == 1 is MemTableListVersion mutable
  memlist_.remove(m);

  m->MarkFlushed();
  if (max_write_buffer_size_to_maintain_ > 0 ||
      max_write_buffer_number_to_maintain_ > 0) {
    memlist_history_.push_front(m);
    // The size of the mutable memtable is not available at this point; pass 0
    // to TrimHistory as a best effort.
    TrimHistory(to_delete, 0);
  } else {
    UnrefMemTable(to_delete, m);
  }
}

// Return the total memory usage assuming the oldest flushed memtable is
// dropped.
size_t MemTableListVersion::ApproximateMemoryUsageExcludingLast() const {
  size_t total_memtable_size = 0;
  for (auto& memtable : memlist_) {
    total_memtable_size += memtable->ApproximateMemoryUsage();
  }
  for (auto& memtable : memlist_history_) {
    total_memtable_size += memtable->ApproximateMemoryUsage();
  }
  if (!memlist_history_.empty()) {
    total_memtable_size -= memlist_history_.back()->ApproximateMemoryUsage();
  }
  return total_memtable_size;
}

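// Decide whether history must be trimmed, given that "usage" additional bytes
// (e.g. from a memtable being added) are about to be accounted for.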
bool MemTableListVersion::MemtableLimitExceeded(size_t usage) {
  if (max_write_buffer_size_to_maintain_ > 0) {
    // calculate the total memory usage after dropping the oldest flushed
    // memtable, compare with max_write_buffer_size_to_maintain_ to decide
    // whether to trim history
    return ApproximateMemoryUsageExcludingLast() + usage >=
           static_cast<size_t>(max_write_buffer_size_to_maintain_);
  } else if (max_write_buffer_number_to_maintain_ > 0) {
    return memlist_.size() + memlist_history_.size() >
           static_cast<size_t>(max_write_buffer_number_to_maintain_);
  } else {
    return false;
  }
}

// Make sure we don't use up too much space in history
void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete,
                                      size_t usage) {
  while (MemtableLimitExceeded(usage) && !memlist_history_.empty()) {
    MemTable* x = memlist_history_.back();
    memlist_history_.pop_back();

    UnrefMemTable(to_delete, x);
  }
}

// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() const {
  if ((flush_requested_ && num_flush_not_started_ > 0) ||
      (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
    assert(imm_flush_needed.load(std::memory_order_relaxed));
    return true;
  }
  return false;
}

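// Pick the oldest memtables that have not yet started flushing, up to and
// including *max_memtable_id when it is non-null. When any memtable belongs
// to an atomic flush, flush_requested_ is only cleared once no not-yet-started
// memtables remain.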
// Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id,
                                        autovector<MemTable*>* ret) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
  const auto& memlist = current_->memlist_;
  bool atomic_flush = false;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    MemTable* m = *it;
    if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
      atomic_flush = true;
    }
    if (max_memtable_id != nullptr && m->GetID() > *max_memtable_id) {
      break;
    }
    if (!m->flush_in_progress_) {
      assert(!m->flush_completed_);
      num_flush_not_started_--;
      if (num_flush_not_started_ == 0) {
        imm_flush_needed.store(false, std::memory_order_release);
      }
      m->flush_in_progress_ = true;  // flushing will start very soon
      ret->push_back(m);
    }
  }
  if (!atomic_flush || num_flush_not_started_ == 0) {
    flush_requested_ = false;  // start-flush request is complete
  }
}

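// A flush attempt failed: clear the in-progress state of the picked memtables
// so that a later attempt can pick them again, and re-arm imm_flush_needed.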
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
                                         uint64_t /*file_number*/) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
  assert(!mems.empty());

  // If the flush was not successful, then just reset state.
  // Maybe a subsequent attempt to flush will be successful.
  for (MemTable* m : mems) {
    assert(m->flush_in_progress_);
    assert(m->file_number_ == 0);

    m->flush_in_progress_ = false;
    m->flush_completed_ = false;
    m->edit_.Clear();
    num_flush_not_started_++;
  }
  imm_flush_needed.store(true, std::memory_order_release);
}

// Try to record a successful flush in the manifest file. It might just return
// Status::OK, letting a concurrent flush thread do the actual recording.
Status MemTableList::TryInstallMemtableFlushResults(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker,
    VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
    autovector<MemTable*>* to_delete, FSDirectory* db_directory,
    LogBuffer* log_buffer,
    std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info,
    IOStatus* io_s) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  // Flush was successful.
  // Record the status on the memtable object. Either this call or a call by a
  // concurrent flush thread will read the status and write it to the manifest.
  for (size_t i = 0; i < mems.size(); ++i) {
    // All the edits are associated with the first memtable of this batch.
    assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0);

    mems[i]->flush_completed_ = true;
    mems[i]->file_number_ = file_number;
  }

  // if some other thread is already committing, then return
  Status s;
  if (commit_in_progress_) {
    TEST_SYNC_POINT("MemTableList::TryInstallMemtableFlushResults:InProgress");
    return s;
  }

  // Only a single thread can be executing this piece of code
  commit_in_progress_ = true;

  // Retry until all completed flushes are committed. New flushes can finish
  // while the current thread is writing the manifest, during which the mutex
  // is released.
  while (s.ok()) {
    auto& memlist = current_->memlist_;
    // The back is the oldest; if flush_completed_ is not set on it, it means
    // that we were assigned a more recent memtable. The memtables' flushes
    // must be recorded in the manifest in order. The concurrent flush thread
    // that is assigned to flush the oldest memtable will later wake up and do
    // all the pending writes to the manifest, in order.
    if (memlist.empty() || !memlist.back()->flush_completed_) {
      break;
    }
    // scan all memtables from the earliest, and commit those
    // (in that order) that have finished flushing. Memtables
    // are always committed in the order that they were created.
    uint64_t batch_file_number = 0;
    size_t batch_count = 0;
    autovector<VersionEdit*> edit_list;
    autovector<MemTable*> memtables_to_flush;
    // enumerate from the last (earliest) element to see how many batches have
    // finished
    for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
      MemTable* m = *it;
      if (!m->flush_completed_) {
        break;
      }
      if (it == memlist.rbegin() || batch_file_number != m->file_number_) {
        batch_file_number = m->file_number_;
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64 " started",
                         cfd->GetName().c_str(), m->file_number_);
        edit_list.push_back(&m->edit_);
        memtables_to_flush.push_back(m);
#ifndef ROCKSDB_LITE
        std::unique_ptr<FlushJobInfo> info = m->ReleaseFlushJobInfo();
        if (info != nullptr) {
          committed_flush_jobs_info->push_back(std::move(info));
        }
#else
        (void)committed_flush_jobs_info;
#endif  // !ROCKSDB_LITE
      }
      batch_count++;
    }

    // TODO(myabandeh): Not sure how batch_count could be 0 here.
    if (batch_count > 0) {
      if (vset->db_options()->allow_2pc) {
        assert(edit_list.size() > 0);
        // We piggyback the information of the earliest log file to keep in
        // the manifest entry for the last file flushed.
        edit_list.back()->SetMinLogNumberToKeep(PrecomputeMinLogNumberToKeep(
            vset, *cfd, edit_list, memtables_to_flush, prep_tracker));
      }

      // this can release and reacquire the mutex.
      vset->SetIOStatusOK();
      s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
                            db_directory);
      *io_s = vset->io_status();

      // we will be changing the version in the next code path,
      // so we had better create a new one, since versions are immutable
      InstallNewVersion();

      // All the later memtables that have the same filenum
      // are part of the same batch. They can be committed now.
      uint64_t mem_id = 1;  // how many memtables have been flushed.

      // Commit the new state only if the column family is NOT dropped.
      // The reason is as follows (refer to
      // ColumnFamilyTest.FlushAndDropRaceCondition).
      // If the column family is dropped, then according to LogAndApply, its
      // corresponding flush operation is NOT written to the MANIFEST. This
      // means the DB is not aware of the L0 files generated from the flush.
      // By committing the new state, we remove the memtable from the memtable
      // list. An iterator created on this column family would then be unable
      // to read the full data, since the memtable is removed and the DB is
      // not aware of the L0 files, leaving MergingIterator unable to build
      // child iterators. The RocksDB contract requires that an iterator can
      // be created on a dropped column family, and that the full data remains
      // readable as long as the column family handle is not deleted, even if
      // the column family is dropped.
      if (s.ok() && !cfd->IsDropped()) {  // commit new state
        while (batch_count-- > 0) {
          MemTable* m = current_->memlist_.back();
          ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " done",
                           cfd->GetName().c_str(), m->file_number_, mem_id);
          assert(m->file_number_ > 0);
          current_->Remove(m, to_delete);
          UpdateCachedValuesFromMemTableListVersion();
          ResetTrimHistoryNeeded();
          ++mem_id;
        }
      } else {
        for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) {
          MemTable* m = *it;
          // commit failed. setup state so that we can flush again.
          ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64
                                       ": memtable #%" PRIu64 " failed",
                           m->file_number_, mem_id);
          m->flush_completed_ = false;
          m->flush_in_progress_ = false;
          m->edit_.Clear();
          num_flush_not_started_++;
          m->file_number_ = 0;
          imm_flush_needed.store(true, std::memory_order_release);
          ++mem_id;
        }
      }
    }
  }
  commit_in_progress_ = false;
  return s;
}

// New memtables are inserted at the front of the list.
void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) {
  assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_);
  InstallNewVersion();
  // this method is used to move a mutable memtable into the immutable list.
  // since the mutable memtable is already refcounted by the DBImpl,
  // and when moving to the immutable list we don't unref it,
  // we don't have to ref the memtable here. we just take over the
  // reference from the DBImpl.
  current_->Add(m, to_delete);
  m->MarkImmutable();
  num_flush_not_started_++;
  if (num_flush_not_started_ == 1) {
    imm_flush_needed.store(true, std::memory_order_release);
  }
  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
}

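// Install a new version with the flushed history trimmed to the configured
// limits; "usage" is extra memory to account for while deciding how much to
// trim.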
void MemTableList::TrimHistory(autovector<MemTable*>* to_delete, size_t usage) {
  InstallNewVersion();
  current_->TrimHistory(to_delete, usage);
  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
}

// Returns an estimate of the number of bytes of data in use.
size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
  size_t total_size = 0;
  for (auto& memtable : current_->memlist_) {
    total_size += memtable->ApproximateMemoryUsage();
  }
  return total_size;
}

size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }

size_t MemTableList::ApproximateMemoryUsageExcludingLast() const {
  const size_t usage =
      current_memory_usage_excluding_last_.load(std::memory_order_relaxed);
  return usage;
}

bool MemTableList::HasHistory() const {
  const bool has_history = current_has_history_.load(std::memory_order_relaxed);
  return has_history;
}

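// Refresh the atomically cached usage/history values so that callers can read
// them without holding the DB mutex.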
void MemTableList::UpdateCachedValuesFromMemTableListVersion() {
  const size_t total_memtable_size =
      current_->ApproximateMemoryUsageExcludingLast();
  current_memory_usage_excluding_last_.store(total_memtable_size,
                                             std::memory_order_relaxed);

  const bool has_history = current_->HasHistory();
  current_has_history_.store(has_history, std::memory_order_relaxed);
}

uint64_t MemTableList::ApproximateOldestKeyTime() const {
  if (!current_->memlist_.empty()) {
    return current_->memlist_.back()->ApproximateOldestKeyTime();
  }
  return std::numeric_limits<uint64_t>::max();
}

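// Copy-on-write: if somebody else also holds current_, clone it and switch to
// the clone so that the mutation does not disturb existing readers.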
void MemTableList::InstallNewVersion() {
  if (current_->refs_ == 1) {
    // we're the only one using the version, just keep using it
  } else {
    // somebody else holds the current version, we need to create a new one
    MemTableListVersion* version = current_;
    current_ = new MemTableListVersion(&current_memory_usage_, current_);
    current_->Ref();
    version->Unref();
  }
}

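// Find the minimum WAL number that still contains a prepared (2PC) section,
// ignoring the memtables that are about to be flushed.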
uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
    const autovector<MemTable*>& memtables_to_flush) {
  uint64_t min_log = 0;

  for (auto& m : current_->memlist_) {
    // Assume the list is very short, so we can live with O(m*n). We can
    // optimize if performance becomes a problem.
    bool should_skip = false;
    for (MemTable* m_to_flush : memtables_to_flush) {
      if (m == m_to_flush) {
        should_skip = true;
        break;
      }
    }
    if (should_skip) {
      continue;
    }

    auto log = m->GetMinLogContainingPrepSection();

    if (log > 0 && (min_log == 0 || log < min_log)) {
      min_log = log;
    }
  }

  return min_log;
}

// Commit a successful atomic flush in the manifest file.
Status InstallMemtableAtomicFlushResults(
    const autovector<MemTableList*>* imm_lists,
    const autovector<ColumnFamilyData*>& cfds,
    const autovector<const MutableCFOptions*>& mutable_cf_options_list,
    const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
    InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
    autovector<MemTable*>* to_delete, FSDirectory* db_directory,
    LogBuffer* log_buffer) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
  mu->AssertHeld();

  size_t num = mems_list.size();
  assert(cfds.size() == num);
  if (imm_lists != nullptr) {
    assert(imm_lists->size() == num);
  }
  for (size_t k = 0; k != num; ++k) {
#ifndef NDEBUG
    const auto* imm =
        (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    if (!mems_list[k]->empty()) {
      assert((*mems_list[k])[0]->GetID() == imm->GetEarliestMemTableID());
    }
#endif
    assert(nullptr != file_metas[k]);
    for (size_t i = 0; i != mems_list[k]->size(); ++i) {
      assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0);
      (*mems_list[k])[i]->SetFlushCompleted(true);
      (*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
    }
  }

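  // Build one single-edit list per column family; when more than one column
  // family is involved, the edits are marked below as an atomic group so that
  // recovery applies all of them or none.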
  Status s;

  autovector<autovector<VersionEdit*>> edit_lists;
  uint32_t num_entries = 0;
  for (const auto mems : mems_list) {
    assert(mems != nullptr);
    autovector<VersionEdit*> edits;
    assert(!mems->empty());
    edits.emplace_back((*mems)[0]->GetEdits());
    ++num_entries;
    edit_lists.emplace_back(edits);
  }
  // Mark the version edits as an atomic group if the number of version edits
  // exceeds 1.
  if (cfds.size() > 1) {
    for (auto& edits : edit_lists) {
      assert(edits.size() == 1);
      edits[0]->MarkAtomicGroup(--num_entries);
    }
    assert(0 == num_entries);
  }

  // this can release and reacquire the mutex.
  s = vset->LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu,
                        db_directory);

  for (size_t k = 0; k != cfds.size(); ++k) {
    auto* imm = (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k);
    imm->InstallNewVersion();
  }

  if (s.ok() || s.IsColumnFamilyDropped()) {
    for (size_t i = 0; i != cfds.size(); ++i) {
      if (cfds[i]->IsDropped()) {
        continue;
      }
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        assert(m->GetFileNumber() > 0);
        uint64_t mem_id = m->GetID();
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64
                         ": memtable #%" PRIu64 " done",
                         cfds[i]->GetName().c_str(), m->GetFileNumber(),
                         mem_id);
        imm->current_->Remove(m, to_delete);
        imm->UpdateCachedValuesFromMemTableListVersion();
        imm->ResetTrimHistoryNeeded();
      }
    }
  } else {
    for (size_t i = 0; i != cfds.size(); ++i) {
      auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i);
      for (auto m : *mems_list[i]) {
        uint64_t mem_id = m->GetID();
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Level-0 commit table #%" PRIu64
                         ": memtable #%" PRIu64 " failed",
                         cfds[i]->GetName().c_str(), m->GetFileNumber(),
                         mem_id);
        m->SetFlushCompleted(false);
        m->SetFlushInProgress(false);
        m->GetEdits()->Clear();
        m->SetFileNumber(0);
        imm->num_flush_not_started_++;
      }
      imm->imm_flush_needed.store(true, std::memory_order_release);
    }
  }

  return s;
}

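// Remove memtables whose data were written before the WAL with log_number was
// created; their contents no longer need to be kept for recovery.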
void MemTableList::RemoveOldMemTables(uint64_t log_number,
                                      autovector<MemTable*>* to_delete) {
  assert(to_delete != nullptr);
  InstallNewVersion();
  auto& memlist = current_->memlist_;
  autovector<MemTable*> old_memtables;
  for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
    MemTable* mem = *it;
    if (mem->GetNextLogNumber() > log_number) {
      break;
    }
    old_memtables.push_back(mem);
  }

  for (auto it = old_memtables.begin(); it != old_memtables.end(); ++it) {
    MemTable* mem = *it;
    current_->Remove(mem, to_delete);
    --num_flush_not_started_;
    if (0 == num_flush_not_started_) {
      imm_flush_needed.store(false, std::memory_order_release);
    }
  }

  UpdateCachedValuesFromMemTableListVersion();
  ResetTrimHistoryNeeded();
}

}  // namespace ROCKSDB_NAMESPACE