// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/memtable.h"

#include <algorithm>
#include <array>
#include <limits>
#include <memory>
#include "db/dbformat.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/read_callback.h"
#include "memory/arena.h"
#include "memory/memory_usage.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "port/port.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/util.h"

namespace ROCKSDB_NAMESPACE {

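// Note on sizing: memtable_prefix_bloom_bits below is derived from the write
// buffer size. For illustration only (numbers not taken from this file), a
// 64 MB write_buffer_size with memtable_prefix_bloom_size_ratio = 0.1
// reserves roughly 6.4 MB for the bloom filter, i.e. about
// 6.4 * 2^20 * 8 ~= 53.7 million bits.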
ImmutableMemTableOptions::ImmutableMemTableOptions(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options)
    : arena_block_size(mutable_cf_options.arena_block_size),
      memtable_prefix_bloom_bits(
          static_cast<uint32_t>(
              static_cast<double>(mutable_cf_options.write_buffer_size) *
              mutable_cf_options.memtable_prefix_bloom_size_ratio) *
          8u),
      memtable_huge_page_size(mutable_cf_options.memtable_huge_page_size),
      memtable_whole_key_filtering(
          mutable_cf_options.memtable_whole_key_filtering),
      inplace_update_support(ioptions.inplace_update_support),
      inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
      inplace_callback(ioptions.inplace_callback),
      max_successive_merges(mutable_cf_options.max_successive_merges),
      statistics(ioptions.statistics),
      merge_operator(ioptions.merge_operator),
      info_log(ioptions.info_log) {}

MemTable::MemTable(const InternalKeyComparator& cmp,
                   const ImmutableCFOptions& ioptions,
                   const MutableCFOptions& mutable_cf_options,
                   WriteBufferManager* write_buffer_manager,
                   SequenceNumber latest_seq, uint32_t column_family_id)
    : comparator_(cmp),
      moptions_(ioptions, mutable_cf_options),
      refs_(0),
      kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
      mem_tracker_(write_buffer_manager),
      arena_(moptions_.arena_block_size,
             (write_buffer_manager != nullptr &&
              (write_buffer_manager->enabled() ||
               write_buffer_manager->cost_to_cache()))
                 ? &mem_tracker_
                 : nullptr,
             mutable_cf_options.memtable_huge_page_size),
      table_(ioptions.memtable_factory->CreateMemTableRep(
          comparator_, &arena_, mutable_cf_options.prefix_extractor.get(),
          ioptions.info_log, column_family_id)),
      range_del_table_(SkipListFactory().CreateMemTableRep(
          comparator_, &arena_, nullptr /* transform */, ioptions.info_log,
          column_family_id)),
      is_range_del_table_empty_(true),
      data_size_(0),
      num_entries_(0),
      num_deletes_(0),
      write_buffer_size_(mutable_cf_options.write_buffer_size),
      flush_in_progress_(false),
      flush_completed_(false),
      file_number_(0),
      first_seqno_(0),
      earliest_seqno_(latest_seq),
      creation_seq_(latest_seq),
      mem_next_logfile_number_(0),
      min_prep_log_referenced_(0),
      locks_(moptions_.inplace_update_support
                 ? moptions_.inplace_update_num_locks
                 : 0),
      prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
      flush_state_(FLUSH_NOT_REQUESTED),
      env_(ioptions.env),
      insert_with_hint_prefix_extractor_(
          ioptions.memtable_insert_with_hint_prefix_extractor),
      oldest_key_time_(std::numeric_limits<uint64_t>::max()),
      atomic_flush_seqno_(kMaxSequenceNumber),
      approximate_memory_usage_(0) {
  UpdateFlushState();
  // something went wrong if we need to flush before inserting anything
  assert(!ShouldScheduleFlush());

  // use bloom_filter_ for both whole key and prefix bloom filter
  if ((prefix_extractor_ || moptions_.memtable_whole_key_filtering) &&
      moptions_.memtable_prefix_bloom_bits > 0) {
    bloom_filter_.reset(
        new DynamicBloom(&arena_, moptions_.memtable_prefix_bloom_bits,
                         6 /* hard coded 6 probes */,
                         moptions_.memtable_huge_page_size, ioptions.info_log));
  }
}

MemTable::~MemTable() {
  mem_tracker_.FreeMem();
  assert(refs_ == 0);
}

size_t MemTable::ApproximateMemoryUsage() {
  autovector<size_t> usages = {
      arena_.ApproximateMemoryUsage(), table_->ApproximateMemoryUsage(),
      range_del_table_->ApproximateMemoryUsage(),
      ROCKSDB_NAMESPACE::ApproximateMemoryUsage(insert_hints_)};
  size_t total_usage = 0;
  for (size_t usage : usages) {
    // If usage + total_usage >= kMaxSizet, return kMaxSizet.
    // The comparison is written with a subtraction to avoid numeric overflow.
    if (usage >= port::kMaxSizet - total_usage) {
      return port::kMaxSizet;
    }
    total_usage += usage;
  }
  approximate_memory_usage_.store(total_usage, std::memory_order_relaxed);
  // otherwise, return the actual usage
  return total_usage;
}

bool MemTable::ShouldFlushNow() {
  size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
  // Often we cannot allocate arena blocks that exactly match the buffer size.
  // Thus we have to decide whether to over-allocate or under-allocate.
  // This constant can be interpreted as: if we still have more than
  // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'll try to
  // over-allocate one more block.
  const double kAllowOverAllocationRatio = 0.6;

  // If the arena still has room for a new block allocation, we can safely say
  // we shouldn't flush.
  auto allocated_memory = table_->ApproximateMemoryUsage() +
                          range_del_table_->ApproximateMemoryUsage() +
                          arena_.MemoryAllocatedBytes();

  approximate_memory_usage_.store(allocated_memory, std::memory_order_relaxed);

  // If we can still allocate one more block without exceeding the
  // over-allocation ratio, then we should not flush.
  if (allocated_memory + kArenaBlockSize <
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return false;
  }

  // If the user keeps adding entries so that allocated memory exceeds
  // write_buffer_size, we need to flush earlier even though we may still have
  // plenty of available memory left in the last block.
  if (allocated_memory >
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    return true;
  }

  // In this code path, the arena has already allocated its "last block", which
  // means the total allocated memory is either:
  // (1) "moderately" over-allocated (by no more than `0.6 * arena block
  // size`), or
  // (2) less than the write buffer size, but we stop here because allocating
  // one more arena block would over-allocate too much more (over half of an
  // arena block).
  //
  // In either case, to avoid over-allocating, the last block stops accepting
  // new entries once its usage reaches a certain ratio. We carefully choose
  // "0.75 full" as the stop condition because it addresses the following
  // issue with great simplicity: what if the next inserted entry is bigger
  // than AllocatedAndUnused()?
  //
  // The answer: if the entry is also bigger than 0.25 * kArenaBlockSize, a
  // dedicated block will be allocated for it; otherwise the arena skips the
  // AllocatedAndUnused() space anyway and allocates a new, empty, regular
  // block. In either case we would *overly* over-allocate.
  //
  // Therefore, flushing once the last block is more than "0.75 full" avoids
  // both cases.
  //
  // NOTE: the average fraction of wasted space with this approach is roughly
  // "arena block size * 0.25 / write buffer size". Users who specify a small
  // write buffer size and/or a big arena block size may suffer.
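  //
  // Illustrative sketch (hypothetical numbers, not derived from this file):
  // with write_buffer_size = 64 MB and kArenaBlockSize = 8 MB, the first
  // check above keeps accepting writes while allocated_memory + 8 MB <
  // 64 MB + 0.6 * 8 MB = 68.8 MB, the second check forces a flush once
  // allocated_memory > 68.8 MB, and otherwise we flush below when less than
  // 8 MB / 4 = 2 MB of the last block remains unused.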
  return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
}

void MemTable::UpdateFlushState() {
  auto state = flush_state_.load(std::memory_order_relaxed);
  if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) {
    // ignore CAS failure, because that means somebody else requested
    // a flush
    flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED,
                                         std::memory_order_relaxed,
                                         std::memory_order_relaxed);
  }
}

void MemTable::UpdateOldestKeyTime() {
  uint64_t oldest_key_time = oldest_key_time_.load(std::memory_order_relaxed);
  if (oldest_key_time == std::numeric_limits<uint64_t>::max()) {
    int64_t current_time = 0;
    auto s = env_->GetCurrentTime(&current_time);
    if (s.ok()) {
      assert(current_time >= 0);
      // If fail, the timestamp is already set.
      oldest_key_time_.compare_exchange_strong(
          oldest_key_time, static_cast<uint64_t>(current_time),
          std::memory_order_relaxed, std::memory_order_relaxed);
    }
  }
}

int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
                                        const char* prefix_len_key2) const {
  // Internal keys are encoded as length-prefixed strings.
  Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
  Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);
  return comparator.CompareKeySeq(k1, k2);
}

int MemTable::KeyComparator::operator()(const char* prefix_len_key,
                                        const KeyComparator::DecodedType& key)
    const {
  // Internal keys are encoded as length-prefixed strings.
  Slice a = GetLengthPrefixedSlice(prefix_len_key);
  return comparator.CompareKeySeq(a, key);
}

void MemTableRep::InsertConcurrently(KeyHandle /*handle*/) {
#ifndef ROCKSDB_LITE
  throw std::runtime_error("concurrent insert not supported");
#else
  abort();
#endif
}

Slice MemTableRep::UserKey(const char* key) const {
  Slice slice = GetLengthPrefixedSlice(key);
  return Slice(slice.data(), slice.size() - 8);
}

KeyHandle MemTableRep::Allocate(const size_t len, char** buf) {
  *buf = allocator_->Allocate(len);
  return static_cast<KeyHandle>(*buf);
}

// Encode a suitable internal key target for "target" and return it.
// Uses *scratch as scratch space, and the returned pointer will point
// into this scratch space.
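// For example (illustration only), EncodeKey(&scratch, Slice("abc")) leaves
// "\x03abc" in *scratch: a varint32 length (a single byte 0x03 here) followed
// by the key bytes.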
const char* EncodeKey(std::string* scratch, const Slice& target) {
  scratch->clear();
  PutVarint32(scratch, static_cast<uint32_t>(target.size()));
  scratch->append(target.data(), target.size());
  return scratch->data();
}

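// An InternalIterator over the memtable contents. It wraps the iterator
// returned by the underlying MemTableRep and, when a prefix extractor is
// configured and total-order seek is not requested, consults the memtable's
// prefix bloom filter on Seek()/SeekForPrev() to skip probes for prefixes
// that are definitely absent.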
class MemTableIterator : public InternalIterator {
 public:
  MemTableIterator(const MemTable& mem, const ReadOptions& read_options,
                   Arena* arena, bool use_range_del_table = false)
      : bloom_(nullptr),
        prefix_extractor_(mem.prefix_extractor_),
        comparator_(mem.comparator_),
        valid_(false),
        arena_mode_(arena != nullptr),
        value_pinned_(
            !mem.GetImmutableMemTableOptions()->inplace_update_support) {
    if (use_range_del_table) {
      iter_ = mem.range_del_table_->GetIterator(arena);
    } else if (prefix_extractor_ != nullptr && !read_options.total_order_seek &&
               !read_options.auto_prefix_mode) {
      // Auto prefix mode is not implemented in memtable yet.
      bloom_ = mem.bloom_filter_.get();
      iter_ = mem.table_->GetDynamicPrefixIterator(arena);
    } else {
      iter_ = mem.table_->GetIterator(arena);
    }
  }
  // No copying allowed
  MemTableIterator(const MemTableIterator&) = delete;
  void operator=(const MemTableIterator&) = delete;

  ~MemTableIterator() override {
#ifndef NDEBUG
    // Assert that the MemTableIterator is never deleted while
    // Pinning is Enabled.
    assert(!pinned_iters_mgr_ || !pinned_iters_mgr_->PinningEnabled());
#endif
    if (arena_mode_) {
      iter_->~Iterator();
    } else {
      delete iter_;
    }
  }

#ifndef NDEBUG
  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
  }
  PinnedIteratorsManager* pinned_iters_mgr_ = nullptr;
#endif

  bool Valid() const override { return valid_; }
  void Seek(const Slice& k) override {
    PERF_TIMER_GUARD(seek_on_memtable_time);
    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
    if (bloom_) {
      // iterator should only use prefix bloom filter
      Slice user_k(ExtractUserKey(k));
      if (prefix_extractor_->InDomain(user_k) &&
          !bloom_->MayContain(prefix_extractor_->Transform(user_k))) {
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
        valid_ = false;
        return;
      } else {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
      }
    }
    iter_->Seek(k, nullptr);
    valid_ = iter_->Valid();
  }
  void SeekForPrev(const Slice& k) override {
    PERF_TIMER_GUARD(seek_on_memtable_time);
    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
    if (bloom_) {
      Slice user_k(ExtractUserKey(k));
      if (prefix_extractor_->InDomain(user_k) &&
          !bloom_->MayContain(prefix_extractor_->Transform(user_k))) {
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
        valid_ = false;
        return;
      } else {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
      }
    }
    iter_->Seek(k, nullptr);
    valid_ = iter_->Valid();
    if (!Valid()) {
      SeekToLast();
    }
    while (Valid() && comparator_.comparator.Compare(k, key()) < 0) {
      Prev();
    }
  }
  void SeekToFirst() override {
    iter_->SeekToFirst();
    valid_ = iter_->Valid();
  }
  void SeekToLast() override {
    iter_->SeekToLast();
    valid_ = iter_->Valid();
  }
  void Next() override {
    PERF_COUNTER_ADD(next_on_memtable_count, 1);
    assert(Valid());
    iter_->Next();
    valid_ = iter_->Valid();
  }
  void Prev() override {
    PERF_COUNTER_ADD(prev_on_memtable_count, 1);
    assert(Valid());
    iter_->Prev();
    valid_ = iter_->Valid();
  }
  Slice key() const override {
    assert(Valid());
    return GetLengthPrefixedSlice(iter_->key());
  }
  Slice value() const override {
    assert(Valid());
    Slice key_slice = GetLengthPrefixedSlice(iter_->key());
    return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
  }

  Status status() const override { return Status::OK(); }

  bool IsKeyPinned() const override {
    // memtable data is always pinned
    return true;
  }

  bool IsValuePinned() const override {
    // memtable value is always pinned, except if we allow inplace update.
    return value_pinned_;
  }

 private:
  DynamicBloom* bloom_;
  const SliceTransform* const prefix_extractor_;
  const MemTable::KeyComparator comparator_;
  MemTableRep::Iterator* iter_;
  bool valid_;
  bool arena_mode_;
  bool value_pinned_;
};

InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
                                        Arena* arena) {
  assert(arena != nullptr);
  auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
  return new (mem) MemTableIterator(*this, read_options, arena);
}

FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
    const ReadOptions& read_options, SequenceNumber read_seq) {
  if (read_options.ignore_range_deletions ||
      is_range_del_table_empty_.load(std::memory_order_relaxed)) {
    return nullptr;
  }
  auto* unfragmented_iter = new MemTableIterator(
      *this, read_options, nullptr /* arena */, true /* use_range_del_table */);
  if (unfragmented_iter == nullptr) {
    return nullptr;
  }
  auto fragmented_tombstone_list =
      std::make_shared<FragmentedRangeTombstoneList>(
          std::unique_ptr<InternalIterator>(unfragmented_iter),
          comparator_.comparator);

  auto* fragmented_iter = new FragmentedRangeTombstoneIterator(
      fragmented_tombstone_list, comparator_.comparator, read_seq);
  return fragmented_iter;
}

port::RWMutex* MemTable::GetLock(const Slice& key) {
  return &locks_[fastrange64(GetSliceNPHash64(key), locks_.size())];
}

MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
                                                   const Slice& end_ikey) {
  uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey);
  entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey);
  if (entry_count == 0) {
    return {0, 0};
  }
  uint64_t n = num_entries_.load(std::memory_order_relaxed);
  if (n == 0) {
    return {0, 0};
  }
  if (entry_count > n) {
    // (range_del_)table_->ApproximateNumEntries() is just an estimate so it
    // can be larger than actual entries we have. Cap it to entries we have to
    // limit the inaccuracy.
    entry_count = n;
  }
  uint64_t data_size = data_size_.load(std::memory_order_relaxed);
  return {entry_count * (data_size / n), entry_count};
}

bool MemTable::Add(SequenceNumber s, ValueType type,
                   const Slice& key, /* user key */
                   const Slice& value, bool allow_concurrent,
                   MemTablePostProcessInfo* post_process_info, void** hint) {
  // Format of an entry is concatenation of:
  //  key_size     : varint32 of internal_key.size()
  //  key bytes    : char[internal_key.size()]
  //  value_size   : varint32 of value.size()
  //  value bytes  : char[value.size()]
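  //
  // Illustrative example (hypothetical values): for user key "k", seq = 100,
  // type = kTypeValue and value "v", internal_key_size is 1 + 8 = 9, so the
  // encoded entry is 0x09, 'k', the 8-byte fixed64 tag (seq << 8 | type),
  // 0x01, 'v' -- 12 bytes total, matching encoded_len below.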
  uint32_t key_size = static_cast<uint32_t>(key.size());
  uint32_t val_size = static_cast<uint32_t>(value.size());
  uint32_t internal_key_size = key_size + 8;
  const uint32_t encoded_len = VarintLength(internal_key_size) +
                               internal_key_size + VarintLength(val_size) +
                               val_size;
  char* buf = nullptr;
  std::unique_ptr<MemTableRep>& table =
      type == kTypeRangeDeletion ? range_del_table_ : table_;
  KeyHandle handle = table->Allocate(encoded_len, &buf);

  char* p = EncodeVarint32(buf, internal_key_size);
  memcpy(p, key.data(), key_size);
  Slice key_slice(p, key_size);
  p += key_size;
  uint64_t packed = PackSequenceAndType(s, type);
  EncodeFixed64(p, packed);
  p += 8;
  p = EncodeVarint32(p, val_size);
  memcpy(p, value.data(), val_size);
  assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
  size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();

  if (!allow_concurrent) {
    // Extract prefix for insert with hint.
    if (insert_with_hint_prefix_extractor_ != nullptr &&
        insert_with_hint_prefix_extractor_->InDomain(key_slice)) {
      Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice);
      bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]);
      if (UNLIKELY(!res)) {
        return res;
      }
    } else {
      bool res = table->InsertKey(handle);
      if (UNLIKELY(!res)) {
        return res;
      }
    }

    // this is a bit ugly, but is the way to avoid locked instructions
    // when incrementing an atomic
    num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1,
                       std::memory_order_relaxed);
    data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len,
                     std::memory_order_relaxed);
    if (type == kTypeDeletion) {
      num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1,
                         std::memory_order_relaxed);
    }

    if (bloom_filter_ && prefix_extractor_ &&
        prefix_extractor_->InDomain(key)) {
      bloom_filter_->Add(prefix_extractor_->Transform(key));
    }
    if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
      bloom_filter_->Add(StripTimestampFromUserKey(key, ts_sz));
    }

    // The first sequence number inserted into the memtable
    assert(first_seqno_ == 0 || s >= first_seqno_);
    if (first_seqno_ == 0) {
      first_seqno_.store(s, std::memory_order_relaxed);

      if (earliest_seqno_ == kMaxSequenceNumber) {
        earliest_seqno_.store(GetFirstSequenceNumber(),
                              std::memory_order_relaxed);
      }
      assert(first_seqno_.load() >= earliest_seqno_.load());
    }
    assert(post_process_info == nullptr);
    UpdateFlushState();
  } else {
    bool res = (hint == nullptr)
                   ? table->InsertKeyConcurrently(handle)
                   : table->InsertKeyWithHintConcurrently(handle, hint);
    if (UNLIKELY(!res)) {
      return res;
    }

    assert(post_process_info != nullptr);
    post_process_info->num_entries++;
    post_process_info->data_size += encoded_len;
    if (type == kTypeDeletion) {
      post_process_info->num_deletes++;
    }

    if (bloom_filter_ && prefix_extractor_ &&
        prefix_extractor_->InDomain(key)) {
      bloom_filter_->AddConcurrently(prefix_extractor_->Transform(key));
    }
    if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
      bloom_filter_->AddConcurrently(StripTimestampFromUserKey(key, ts_sz));
    }

    // atomically update first_seqno_ and earliest_seqno_.
    uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed);
    while ((cur_seq_num == 0 || s < cur_seq_num) &&
           !first_seqno_.compare_exchange_weak(cur_seq_num, s)) {
    }
    uint64_t cur_earliest_seqno =
        earliest_seqno_.load(std::memory_order_relaxed);
    while (
        (cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) &&
        !earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
    }
  }
  if (type == kTypeRangeDeletion) {
    is_range_del_table_empty_.store(false, std::memory_order_relaxed);
  }
  UpdateOldestKeyTime();
  return true;
}

// Callback from MemTable::Get()
namespace {

struct Saver {
  Status* status;
  const LookupKey* key;
  bool* found_final_value;  // Is value set correctly? Used by KeyMayExist
  bool* merge_in_progress;
  std::string* value;
  SequenceNumber seq;
  std::string* timestamp;
  const MergeOperator* merge_operator;
  // the merge operations encountered
  MergeContext* merge_context;
  SequenceNumber max_covering_tombstone_seq;
  MemTable* mem;
  Logger* logger;
  Statistics* statistics;
  bool inplace_update_support;
  bool do_merge;
  Env* env_;
  ReadCallback* callback_;
  bool* is_blob_index;

  bool CheckCallback(SequenceNumber _seq) {
    if (callback_) {
      return callback_->IsVisible(_seq);
    }
    return true;
  }
};
}  // namespace

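// SaveValue() is invoked by MemTableRep::Get() for each entry starting at the
// lookup position, newest sequence number first. It returns true to keep
// scanning and false to stop, either because a final value (or error) has
// been recorded in the Saver or because the user key no longer matches.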
static bool SaveValue(void* arg, const char* entry) {
  Saver* s = reinterpret_cast<Saver*>(arg);
  assert(s != nullptr);
  MergeContext* merge_context = s->merge_context;
  SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
  const MergeOperator* merge_operator = s->merge_operator;

  assert(merge_context != nullptr);

  // entry format is:
  //    klength  varint32
  //    userkey  char[klength-8]
  //    tag      uint64
  //    vlength  varint32
  //    value    char[vlength]
  // Check that it belongs to same user key. We do not check the
  // sequence number since the Seek() call above should have skipped
  // all entries with overly large sequence numbers.
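  // The 8-byte tag decoded below packs the sequence number in its upper
  // 56 bits and the ValueType in its low 8 bits; UnPackSequenceAndType()
  // splits it back apart.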
  uint32_t key_length;
  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
  Slice user_key_slice = Slice(key_ptr, key_length - 8);
  const Comparator* user_comparator =
      s->mem->GetInternalKeyComparator().user_comparator();
  size_t ts_sz = user_comparator->timestamp_size();
  if (user_comparator->CompareWithoutTimestamp(user_key_slice,
                                               s->key->user_key()) == 0) {
    // Correct user key
    const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
    ValueType type;
    SequenceNumber seq;
    UnPackSequenceAndType(tag, &seq, &type);
    // If the value is not in the snapshot, skip it
    if (!s->CheckCallback(seq)) {
      return true;  // to continue to the next seq
    }

    s->seq = seq;

    if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
        max_covering_tombstone_seq > seq) {
      type = kTypeRangeDeletion;
    }
    switch (type) {
      case kTypeBlobIndex:
        if (s->is_blob_index == nullptr) {
          ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index.");
          *(s->status) = Status::NotSupported(
              "Encounter unsupported blob value. Please open DB with "
              "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
        } else if (*(s->merge_in_progress)) {
          *(s->status) =
              Status::NotSupported("Blob DB does not support merge operator.");
        }
        if (!s->status->ok()) {
          *(s->found_final_value) = true;
          return false;
        }
        FALLTHROUGH_INTENDED;
      case kTypeValue: {
        if (s->inplace_update_support) {
          s->mem->GetLock(s->key->user_key())->ReadLock();
        }
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
        *(s->status) = Status::OK();
        if (*(s->merge_in_progress)) {
          if (s->do_merge) {
            if (s->value != nullptr) {
              *(s->status) = MergeHelper::TimedFullMerge(
                  merge_operator, s->key->user_key(), &v,
                  merge_context->GetOperands(), s->value, s->logger,
                  s->statistics, s->env_, nullptr /* result_operand */, true);
            }
          } else {
            // Preserve the value with the goal of returning it as part of
            // raw merge operands to the user
            merge_context->PushOperand(
                v, s->inplace_update_support == false /* operand_pinned */);
          }
        } else if (!s->do_merge) {
          // Preserve the value with the goal of returning it as part of
          // raw merge operands to the user
          merge_context->PushOperand(
              v, s->inplace_update_support == false /* operand_pinned */);
        } else if (s->value != nullptr) {
          s->value->assign(v.data(), v.size());
        }
        if (s->inplace_update_support) {
          s->mem->GetLock(s->key->user_key())->ReadUnlock();
        }
        *(s->found_final_value) = true;
        if (s->is_blob_index != nullptr) {
          *(s->is_blob_index) = (type == kTypeBlobIndex);
        }

        if (ts_sz > 0 && s->timestamp != nullptr) {
          Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
          s->timestamp->assign(ts.data(), ts.size());
        }
        return false;
      }
      case kTypeDeletion:
      case kTypeSingleDeletion:
      case kTypeRangeDeletion: {
        if (*(s->merge_in_progress)) {
          if (s->value != nullptr) {
            *(s->status) = MergeHelper::TimedFullMerge(
                merge_operator, s->key->user_key(), nullptr,
                merge_context->GetOperands(), s->value, s->logger,
                s->statistics, s->env_, nullptr /* result_operand */, true);
          }
        } else {
          *(s->status) = Status::NotFound();
        }
        *(s->found_final_value) = true;
        return false;
      }
      case kTypeMerge: {
        if (!merge_operator) {
          *(s->status) = Status::InvalidArgument(
              "merge_operator is not properly initialized.");
          // Normally we continue the loop (return true) when we see a merge
          // operand. But in case of an error, we should stop the loop
          // immediately and pretend we have found the value to stop further
          // seek. Otherwise, the later call will override this error status.
          *(s->found_final_value) = true;
          return false;
        }
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
        *(s->merge_in_progress) = true;
        merge_context->PushOperand(
            v, s->inplace_update_support == false /* operand_pinned */);
        if (s->do_merge && merge_operator->ShouldMerge(
                               merge_context->GetOperandsDirectionBackward())) {
          *(s->status) = MergeHelper::TimedFullMerge(
              merge_operator, s->key->user_key(), nullptr,
              merge_context->GetOperands(), s->value, s->logger, s->statistics,
              s->env_, nullptr /* result_operand */, true);
          *(s->found_final_value) = true;
          return false;
        }
        return true;
      }
      default:
        assert(false);
        return true;
    }
  }

  // s->status could be Corruption, MergeInProgress or NotFound at this point
  return false;
}

bool MemTable::Get(const LookupKey& key, std::string* value,
                   std::string* timestamp, Status* s,
                   MergeContext* merge_context,
                   SequenceNumber* max_covering_tombstone_seq,
                   SequenceNumber* seq, const ReadOptions& read_opts,
                   ReadCallback* callback, bool* is_blob_index, bool do_merge) {
  // The sequence number is updated synchronously in version_set.h
  if (IsEmpty()) {
    // Avoiding recording stats for speed.
    return false;
  }
  PERF_TIMER_GUARD(get_from_memtable_time);

  std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
      NewRangeTombstoneIterator(read_opts,
                                GetInternalKeySeqno(key.internal_key())));
  if (range_del_iter != nullptr) {
    *max_covering_tombstone_seq =
        std::max(*max_covering_tombstone_seq,
                 range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key()));
  }

  Slice user_key = key.user_key();
  bool found_final_value = false;
  bool merge_in_progress = s->IsMergeInProgress();
  bool may_contain = true;
  size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
  if (bloom_filter_) {
    // when both memtable_whole_key_filtering and prefix_extractor_ are set,
    // only do whole key filtering for Get() to save CPU
    if (moptions_.memtable_whole_key_filtering) {
      may_contain =
          bloom_filter_->MayContain(StripTimestampFromUserKey(user_key, ts_sz));
    } else {
      assert(prefix_extractor_);
      may_contain =
          !prefix_extractor_->InDomain(user_key) ||
          bloom_filter_->MayContain(prefix_extractor_->Transform(user_key));
    }
  }

  if (bloom_filter_ && !may_contain) {
    // iter is null if prefix bloom says the key does not exist
    PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
    *seq = kMaxSequenceNumber;
  } else {
    if (bloom_filter_) {
      PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
    }
    GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback,
                 is_blob_index, value, timestamp, s, merge_context, seq,
                 &found_final_value, &merge_in_progress);
  }

  // No change to value, since we have not yet found a Put/Delete
  if (!found_final_value && merge_in_progress) {
    *s = Status::MergeInProgress();
  }
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
  return found_final_value;
}

void MemTable::GetFromTable(const LookupKey& key,
                            SequenceNumber max_covering_tombstone_seq,
                            bool do_merge, ReadCallback* callback,
                            bool* is_blob_index, std::string* value,
                            std::string* timestamp, Status* s,
                            MergeContext* merge_context, SequenceNumber* seq,
                            bool* found_final_value, bool* merge_in_progress) {
  Saver saver;
  saver.status = s;
  saver.found_final_value = found_final_value;
  saver.merge_in_progress = merge_in_progress;
  saver.key = &key;
  saver.value = value;
  saver.timestamp = timestamp;
  saver.seq = kMaxSequenceNumber;
  saver.mem = this;
  saver.merge_context = merge_context;
  saver.max_covering_tombstone_seq = max_covering_tombstone_seq;
  saver.merge_operator = moptions_.merge_operator;
  saver.logger = moptions_.info_log;
  saver.inplace_update_support = moptions_.inplace_update_support;
  saver.statistics = moptions_.statistics;
  saver.env_ = env_;
  saver.callback_ = callback;
  saver.is_blob_index = is_blob_index;
  saver.do_merge = do_merge;
  table_->Get(key, &saver, SaveValue);
  *seq = saver.seq;
}

void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                        ReadCallback* callback, bool* is_blob) {
  // The sequence number is updated synchronously in version_set.h
  if (IsEmpty()) {
    // Avoiding recording stats for speed.
    return;
  }
  PERF_TIMER_GUARD(get_from_memtable_time);

  MultiGetRange temp_range(*range, range->begin(), range->end());
  if (bloom_filter_) {
    std::array<Slice*, MultiGetContext::MAX_BATCH_SIZE> keys;
    std::array<bool, MultiGetContext::MAX_BATCH_SIZE> may_match = {{true}};
    autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> prefixes;
    int num_keys = 0;
    for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
      if (!prefix_extractor_) {
        keys[num_keys++] = &iter->ukey;
      } else if (prefix_extractor_->InDomain(iter->ukey)) {
        prefixes.emplace_back(prefix_extractor_->Transform(iter->ukey));
        keys[num_keys++] = &prefixes.back();
      }
    }
    bloom_filter_->MayContain(num_keys, &keys[0], &may_match[0]);
    int idx = 0;
    for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
      if (prefix_extractor_ && !prefix_extractor_->InDomain(iter->ukey)) {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
        continue;
      }
      if (!may_match[idx]) {
        temp_range.SkipKey(iter);
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
      } else {
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
      }
      idx++;
    }
  }
  for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
    SequenceNumber seq = kMaxSequenceNumber;
    bool found_final_value{false};
    bool merge_in_progress = iter->s->IsMergeInProgress();
    std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
        NewRangeTombstoneIterator(
            read_options, GetInternalKeySeqno(iter->lkey->internal_key())));
    if (range_del_iter != nullptr) {
      iter->max_covering_tombstone_seq = std::max(
          iter->max_covering_tombstone_seq,
          range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()));
    }
    GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
                 callback, is_blob, iter->value->GetSelf(), iter->timestamp,
                 iter->s, &(iter->merge_context), &seq, &found_final_value,
                 &merge_in_progress);

    if (!found_final_value && merge_in_progress) {
      *(iter->s) = Status::MergeInProgress();
    }

    if (found_final_value) {
      iter->value->PinSelf();
      range->MarkKeyDone(iter);
      RecordTick(moptions_.statistics, MEMTABLE_HIT);
    }
  }
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
}

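// Update() overwrites the value in place only when the existing entry for
// this user key is a kTypeValue and the new value is no larger than the old
// one; otherwise it falls back to a regular Add() of a new entry.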
void MemTable::Update(SequenceNumber seq,
                      const Slice& key,
                      const Slice& value) {
  LookupKey lkey(key, seq);
  Slice mem_key = lkey.memtable_key();

  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(lkey.internal_key(), mem_key.data());

  if (iter->Valid()) {
    // entry format is:
    //    key_length  varint32
    //    userkey     char[klength-8]
    //    tag         uint64
    //    vlength     varint32
    //    value       char[vlength]
    // Check that it belongs to same user key. We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Equal(
            Slice(key_ptr, key_length - 8), lkey.user_key())) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      ValueType type;
      SequenceNumber existing_seq;
      UnPackSequenceAndType(tag, &existing_seq, &type);
      assert(existing_seq != seq);
      if (type == kTypeValue) {
        Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
        uint32_t new_size = static_cast<uint32_t>(value.size());

        // Update value, if new value size <= previous value size
        if (new_size <= prev_size) {
          char* p =
              EncodeVarint32(const_cast<char*>(key_ptr) + key_length, new_size);
          WriteLock wl(GetLock(lkey.user_key()));
          memcpy(p, value.data(), value.size());
          assert((unsigned)((p + value.size()) - entry) ==
                 (unsigned)(VarintLength(key_length) + key_length +
                            VarintLength(value.size()) + value.size()));
          RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
          return;
        }
      }
    }
  }

  // key doesn't exist
  bool add_res __attribute__((__unused__));
  add_res = Add(seq, kTypeValue, key, value);
  // We already asserted above that existing_seq != seq, so Add should not
  // fail.
  assert(add_res);
}

bool MemTable::UpdateCallback(SequenceNumber seq,
                              const Slice& key,
                              const Slice& delta) {
  LookupKey lkey(key, seq);
  Slice memkey = lkey.memtable_key();

  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(lkey.internal_key(), memkey.data());

  if (iter->Valid()) {
    // entry format is:
    //    key_length  varint32
    //    userkey     char[klength-8]
    //    tag         uint64
    //    vlength     varint32
    //    value       char[vlength]
    // Check that it belongs to same user key. We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Equal(
            Slice(key_ptr, key_length - 8), lkey.user_key())) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      ValueType type;
      uint64_t unused;
      UnPackSequenceAndType(tag, &unused, &type);
      switch (type) {
        case kTypeValue: {
          Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
          uint32_t prev_size = static_cast<uint32_t>(prev_value.size());

          char* prev_buffer = const_cast<char*>(prev_value.data());
          uint32_t new_prev_size = prev_size;

          std::string str_value;
          WriteLock wl(GetLock(lkey.user_key()));
          auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
                                                   delta, &str_value);
          if (status == UpdateStatus::UPDATED_INPLACE) {
            // Value already updated by callback.
            assert(new_prev_size <= prev_size);
            if (new_prev_size < prev_size) {
              // overwrite the new prev_size
              char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
                                       new_prev_size);
              if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
                // shift the value buffer as well.
                memcpy(p, prev_buffer, new_prev_size);
              }
            }
            RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
            UpdateFlushState();
            return true;
          } else if (status == UpdateStatus::UPDATED) {
            Add(seq, kTypeValue, key, Slice(str_value));
            RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
            UpdateFlushState();
            return true;
          } else if (status == UpdateStatus::UPDATE_FAILED) {
            // No action required. Return.
            UpdateFlushState();
            return true;
          }
        }
        default:
          break;
      }
    }
  }
  // If the latest value is not kTypeValue
  // or key doesn't exist
  return false;
}

size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
  Slice memkey = key.memtable_key();

  // A total ordered iterator is costly for some memtablerep (prefix aware
  // reps). By passing in the user key, we allow efficient iterator creation.
  // The iterator only needs to be ordered within the same user key.
  std::unique_ptr<MemTableRep::Iterator> iter(
      table_->GetDynamicPrefixIterator());
  iter->Seek(key.internal_key(), memkey.data());

  size_t num_successive_merges = 0;

  for (; iter->Valid(); iter->Next()) {
    const char* entry = iter->key();
    uint32_t key_length = 0;
    const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (!comparator_.comparator.user_comparator()->Equal(
            Slice(iter_key_ptr, key_length - 8), key.user_key())) {
      break;
    }

    const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
    ValueType type;
    uint64_t unused;
    UnPackSequenceAndType(tag, &unused, &type);
    if (type != kTypeMerge) {
      break;
    }

    ++num_successive_merges;
  }

  return num_successive_merges;
}

void MemTableRep::Get(const LookupKey& k, void* callback_args,
                      bool (*callback_func)(void* arg, const char* entry)) {
  auto iter = GetDynamicPrefixIterator();
  for (iter->Seek(k.internal_key(), k.memtable_key().data());
       iter->Valid() && callback_func(callback_args, iter->key());
       iter->Next()) {
  }
}

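// Tracks the smallest WAL number that contains a prepared (two-phase commit)
// section referenced by this memtable; the CAS loop below only ever lowers
// the stored value, treating 0 as "unset".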
void MemTable::RefLogContainingPrepSection(uint64_t log) {
  assert(log > 0);
  auto cur = min_prep_log_referenced_.load();
  while ((log < cur || cur == 0) &&
         !min_prep_log_referenced_.compare_exchange_strong(cur, log)) {
    cur = min_prep_log_referenced_.load();
  }
}

uint64_t MemTable::GetMinLogContainingPrepSection() {
  return min_prep_log_referenced_.load();
}

}  // namespace ROCKSDB_NAMESPACE