// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once
#include <atomic>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/dbformat.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/read_callback.h"
#include "db/version_edit.h"
#include "memory/allocator.h"
#include "memory/concurrent_arena.h"
#include "monitoring/instrumented_mutex.h"
#include "options/cf_options.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "table/multiget_context.h"
#include "util/dynamic_bloom.h"
#include "util/hash.h"

namespace ROCKSDB_NAMESPACE {

struct FlushJobInfo;
class Mutex;
class MemTableIterator;
class MergeContext;

struct ImmutableMemTableOptions {
  explicit ImmutableMemTableOptions(const ImmutableCFOptions& ioptions,
                                    const MutableCFOptions& mutable_cf_options);
  size_t arena_block_size;
  uint32_t memtable_prefix_bloom_bits;
  size_t memtable_huge_page_size;
  bool memtable_whole_key_filtering;
  bool inplace_update_support;
  size_t inplace_update_num_locks;
  UpdateStatus (*inplace_callback)(char* existing_value,
                                   uint32_t* existing_value_size,
                                   Slice delta_value,
                                   std::string* merged_value);
  size_t max_successive_merges;
  Statistics* statistics;
  MergeOperator* merge_operator;
  Logger* info_log;
};

// Batched counters to be updated when inserting keys in one write batch.
// In post process of the write batch, these can be updated together.
// Only used in concurrent memtable insert case.
struct MemTablePostProcessInfo {
  uint64_t data_size = 0;
  uint64_t num_entries = 0;
  uint64_t num_deletes = 0;
};

using MultiGetRange = MultiGetContext::Range;
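// A minimal sketch (hypothetical caller code, not part of this header) of how
// a concurrent writer might use MemTablePostProcessInfo: counters are
// accumulated locally while inserting with allow_concurrent = true, then
// applied once via MemTable::BatchPostProcess() (declared below). The names
// `mem`, `seq`, `key`, `key2`, and `value` are stand-ins.
//
//   MemTablePostProcessInfo info;
//   mem->Add(seq, kTypeValue, key, value, /*allow_concurrent=*/true, &info);
//   mem->Add(seq + 1, kTypeDeletion, key2, Slice(), /*allow_concurrent=*/true,
//            &info);
//   mem->BatchPostProcess(info);  // update the shared counters once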
// Note: Many of the methods in this class have comments indicating that
// external synchronization is required as these methods are not thread-safe.
// It is up to higher layers of code to decide how to prevent concurrent
// invocation of these methods. This is usually done by acquiring either
// the db mutex or the single writer thread.
//
// Some of these methods are documented to only require external
// synchronization if this memtable is immutable. Calling MarkImmutable() is
// not sufficient to guarantee immutability. It is up to higher layers of
// code to determine if this MemTable can still be modified by other threads.
// Eg: The Superversion stores a pointer to the current MemTable (that can
// be modified) and a separate list of the MemTables that can no longer be
// written to (aka the 'immutable memtables').
class MemTable {
 public:
  struct KeyComparator : public MemTableRep::KeyComparator {
    const InternalKeyComparator comparator;
    explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {}
    virtual int operator()(const char* prefix_len_key1,
                           const char* prefix_len_key2) const override;
    virtual int operator()(const char* prefix_len_key,
                           const DecodedType& key) const override;
  };

  // MemTables are reference counted. The initial reference count
  // is zero and the caller must call Ref() at least once.
  //
  // earliest_seq should be the current SequenceNumber in the db such that any
  // key inserted into this memtable will have an equal or larger seq number.
  // (When a db is first created, the earliest sequence number will be 0).
  // If the earliest sequence number is not known, kMaxSequenceNumber may be
  // used, but this may prevent some transactions from succeeding until the
  // first key is inserted into the memtable.
  explicit MemTable(const InternalKeyComparator& comparator,
                    const ImmutableCFOptions& ioptions,
                    const MutableCFOptions& mutable_cf_options,
                    WriteBufferManager* write_buffer_manager,
                    SequenceNumber earliest_seq, uint32_t column_family_id);
  // No copying allowed
  MemTable(const MemTable&) = delete;
  MemTable& operator=(const MemTable&) = delete;

  // Do not delete this MemTable unless Unref() indicates it is not in use.
  ~MemTable();

  // Increase reference count.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void Ref() { ++refs_; }

  // Drop reference count.
  // If the refcount goes to zero return this memtable, otherwise return null.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  MemTable* Unref() {
    --refs_;
    assert(refs_ >= 0);
    if (refs_ <= 0) {
      return this;
    }
    return nullptr;
  }

  // Returns an estimate of the number of bytes of data in use by this
  // data structure.
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  size_t ApproximateMemoryUsage();

  // As a cheap version of `ApproximateMemoryUsage()`, this function doesn't
  // require external synchronization. The value may be less accurate though.
  size_t ApproximateMemoryUsageFast() const {
    return approximate_memory_usage_.load(std::memory_order_relaxed);
  }

  // This method heuristically determines if the memtable should continue to
  // host more data.
  bool ShouldScheduleFlush() const {
    return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
  }

  // Returns true if a flush should be scheduled and the caller should
  // be the one to schedule it
  bool MarkFlushScheduled() {
    auto before = FLUSH_REQUESTED;
    return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
                                                std::memory_order_relaxed,
                                                std::memory_order_relaxed);
  }
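  // A minimal lifecycle sketch (hypothetical caller code, not part of this
  // header): a new MemTable starts with a reference count of zero, so the
  // owner must Ref() it, and may only delete it once Unref() hands the
  // memtable back. The constructor argument names are stand-ins.
  //
  //   MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options,
  //                                write_buffer_manager, earliest_seq, cf_id);
  //   mem->Ref();                        // take ownership
  //   ...
  //   MemTable* to_delete = mem->Unref();
  //   if (to_delete != nullptr) {
  //     delete to_delete;                // last reference dropped
  //   }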
  // Return an iterator that yields the contents of the memtable.
  //
  // The caller must ensure that the underlying MemTable remains live
  // while the returned iterator is live. The keys returned by this
  // iterator are internal keys encoded by AppendInternalKey in the
  // db/dbformat.{h,cc} module.
  //
  // By default, it returns an iterator for prefix seek if prefix_extractor
  // is configured in Options.
  // arena: If not null, the arena needs to be used to allocate the Iterator.
  //        Calling ~Iterator of the iterator will destroy all the states but
  //        those allocated in arena.
  InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena);

  FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
      const ReadOptions& read_options, SequenceNumber read_seq);

  // Add an entry into memtable that maps key to value at the
  // specified sequence number and with the specified type.
  // Typically value will be empty if type==kTypeDeletion.
  //
  // REQUIRES: if allow_concurrent = false, external synchronization to prevent
  // simultaneous operations on the same MemTable.
  //
  // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
  // the <key, seq> already exists.
  bool Add(SequenceNumber seq, ValueType type, const Slice& key,
           const Slice& value, bool allow_concurrent = false,
           MemTablePostProcessInfo* post_process_info = nullptr,
           void** hint = nullptr);
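  // A minimal iteration sketch (hypothetical caller code): the iterator may be
  // arena-allocated, in which case it is destroyed explicitly rather than
  // deleted; memory owned by the arena outlives the iterator. `mem` and
  // `read_options` are assumed to exist in the caller.
  //
  //   Arena arena;
  //   InternalIterator* iter = mem->NewIterator(read_options, &arena);
  //   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  //     // iter->key() is an internal key; iter->value() is the user value.
  //   }
  //   iter->~InternalIterator();  // destroy states not allocated in arena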
  // Used to Get value associated with key or Get Merge Operands associated
  // with key.
  // If do_merge = true (the default), the behavior is to get the value for
  // key; the expected behavior is described right below.
  //   If memtable contains a value for key, store it in *value and return true.
  //   If memtable contains a deletion for key, store a NotFound() error
  //   in *status and return true.
  //   If memtable contains Merge operation as the most recent entry for a key,
  //     and the merge process does not stop (not reaching a value or delete),
  //     prepend the current merge operand to *operands,
  //     store MergeInProgress in s, and return false.
  //   Else, return false.
  // If any operation was found, its most recent sequence number
  // will be stored in *seq on success (regardless of whether true/false is
  // returned). Otherwise, *seq will be set to kMaxSequenceNumber.
  // On success, *s may be set to OK, NotFound, or MergeInProgress. Any other
  // status returned indicates a corruption or other unexpected error.
  // If do_merge = false then any Merge Operands encountered for key are simply
  // stored in merge_context.operands_list and never actually merged to get a
  // final value. The raw Merge Operands are eventually returned to the user.
  bool Get(const LookupKey& key, std::string* value, Status* s,
           MergeContext* merge_context,
           SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
           bool* is_blob_index = nullptr, bool do_merge = true) {
    return Get(key, value, /*timestamp=*/nullptr, s, merge_context,
               max_covering_tombstone_seq, seq, read_opts, callback,
               is_blob_index, do_merge);
  }

  bool Get(const LookupKey& key, std::string* value, std::string* timestamp,
           Status* s, MergeContext* merge_context,
           SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
           bool* is_blob_index = nullptr, bool do_merge = true);

  bool Get(const LookupKey& key, std::string* value, std::string* timestamp,
           Status* s, MergeContext* merge_context,
           SequenceNumber* max_covering_tombstone_seq,
           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
           bool* is_blob_index = nullptr, bool do_merge = true) {
    SequenceNumber seq;
    return Get(key, value, timestamp, s, merge_context,
               max_covering_tombstone_seq, &seq, read_opts, callback,
               is_blob_index, do_merge);
  }

  void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                ReadCallback* callback, bool* is_blob);

  // Attempts to update the new_value inplace, else does normal Add
  // Pseudocode
  //   if key exists in current memtable && prev_value is of type kTypeValue
  //     if new sizeof(new_value) <= sizeof(prev_value)
  //       update inplace
  //     else add(key, new_value)
  //   else add(key, new_value)
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void Update(SequenceNumber seq, const Slice& key, const Slice& value);

  // If prev_value for key exists, attempts to update it inplace.
  // else returns false
  // Pseudocode
  //   if key exists in current memtable && prev_value is of type kTypeValue
  //     new_value = delta(prev_value)
  //     if sizeof(new_value) <= sizeof(prev_value)
  //       update inplace
  //     else add(key, new_value)
  //   else return false
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  bool UpdateCallback(SequenceNumber seq, const Slice& key, const Slice& delta);

  // Returns the number of successive merge entries starting from the newest
  // entry for the key up to the last non-merge entry or last entry for the
  // key in the memtable.
  size_t CountSuccessiveMergeEntries(const LookupKey& key);

  // Update counters and flush status after inserting a whole write batch
  // Used in concurrent memtable inserts.
  void BatchPostProcess(const MemTablePostProcessInfo& update_counters) {
    num_entries_.fetch_add(update_counters.num_entries,
                           std::memory_order_relaxed);
    data_size_.fetch_add(update_counters.data_size, std::memory_order_relaxed);
    if (update_counters.num_deletes != 0) {
      num_deletes_.fetch_add(update_counters.num_deletes,
                             std::memory_order_relaxed);
    }
    UpdateFlushState();
  }
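  // A minimal point-lookup sketch (hypothetical caller code): the caller
  // builds a LookupKey for the user key and snapshot sequence number, calls
  // the first Get() overload above, and inspects the returned flag and *s.
  // `mem`, `user_key`, `snapshot_seq`, and `read_opts` are stand-ins assumed
  // to exist in the caller.
  //
  //   LookupKey lkey(user_key, snapshot_seq);
  //   std::string value;
  //   Status s;
  //   MergeContext merge_context;
  //   SequenceNumber max_covering_tombstone_seq = 0;
  //   SequenceNumber seq;
  //   if (mem->Get(lkey, &value, &s, &merge_context,
  //                &max_covering_tombstone_seq, &seq, read_opts)) {
  //     // Found something here: s is OK (value filled in) or NotFound
  //     // (a deletion shadows the key).
  //   } else {
  //     // Not found (or merge in progress); fall through to older data.
  //   }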
  // Get total number of entries in the mem table.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  uint64_t num_entries() const {
    return num_entries_.load(std::memory_order_relaxed);
  }

  // Get total number of deletes in the mem table.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  uint64_t num_deletes() const {
    return num_deletes_.load(std::memory_order_relaxed);
  }

  uint64_t get_data_size() const {
    return data_size_.load(std::memory_order_relaxed);
  }

  // Dynamically change the memtable's capacity. If set below the current
  // usage, the next key added will trigger a flush. Can only increase size
  // when memtable prefix bloom is disabled, since we can't easily allocate
  // more space.
  void UpdateWriteBufferSize(size_t new_write_buffer_size) {
    if (bloom_filter_ == nullptr ||
        new_write_buffer_size < write_buffer_size_) {
      write_buffer_size_.store(new_write_buffer_size,
                               std::memory_order_relaxed);
    }
  }

  // Returns the edits area that is needed for flushing the memtable
  VersionEdit* GetEdits() { return &edit_; }

  // Returns if there is no entry inserted to the mem table.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  bool IsEmpty() const { return first_seqno_ == 0; }

  // Returns the sequence number of the first element that was inserted
  // into the memtable.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  SequenceNumber GetFirstSequenceNumber() {
    return first_seqno_.load(std::memory_order_relaxed);
  }

  // Returns the sequence number that is guaranteed to be smaller than or equal
  // to the sequence number of any key that could be inserted into this
  // memtable. It can then be assumed that any write with a larger (or equal)
  // sequence number will be present in this memtable or a later memtable.
  //
  // If the earliest sequence number could not be determined,
  // kMaxSequenceNumber will be returned.
  SequenceNumber GetEarliestSequenceNumber() {
    return earliest_seqno_.load(std::memory_order_relaxed);
  }

  // DB's latest sequence ID when the memtable is created. This number
  // may be updated to a more recent one before any key is inserted.
  SequenceNumber GetCreationSeq() const { return creation_seq_; }

  void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; }

  // Returns the next active logfile number when this memtable is about to
  // be flushed to storage
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  uint64_t GetNextLogNumber() { return mem_next_logfile_number_; }
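  // A minimal sketch (hypothetical caller code) tying the size knobs to the
  // flush hints declared earlier: shrinking the write buffer below the current
  // usage makes the next Add() request a flush, after which exactly one caller
  // wins MarkFlushScheduled(). `ScheduleFlush` is a stand-in, not a real API.
  //
  //   mem->UpdateWriteBufferSize(new_size);   // e.g. after a SetOptions() call
  //   if (mem->ShouldScheduleFlush() && mem->MarkFlushScheduled()) {
  //     ScheduleFlush(mem);                   // only one thread reaches this
  //   }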
  // Sets the next active logfile number when this memtable is about to
  // be flushed to storage
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }

  // If this memtable contains data from a committed
  // two phase transaction we must take note of the
  // log which contains that data so we can know
  // when to release that log.
  void RefLogContainingPrepSection(uint64_t log);
  uint64_t GetMinLogContainingPrepSection();

  // Notify the underlying storage that no more items will be added.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  // After MarkImmutable() is called, you should not attempt to
  // write anything to this MemTable(). (Ie. do not call Add() or Update()).
  void MarkImmutable() {
    table_->MarkReadOnly();
    mem_tracker_.DoneAllocating();
  }

  // Notify the underlying storage that all data it contained has been
  // persisted.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void MarkFlushed() {
    table_->MarkFlushed();
  }

  // return true if the current MemTableRep supports merge operator.
  bool IsMergeOperatorSupported() const {
    return table_->IsMergeOperatorSupported();
  }

  // return true if the current MemTableRep supports snapshots.
  // inplace update prevents snapshots.
  bool IsSnapshotSupported() const {
    return table_->IsSnapshotSupported() && !moptions_.inplace_update_support;
  }

  struct MemTableStats {
    uint64_t size;
    uint64_t count;
  };

  MemTableStats ApproximateStats(const Slice& start_ikey,
                                 const Slice& end_ikey);

  // Get the lock associated for the key
  port::RWMutex* GetLock(const Slice& key);

  const InternalKeyComparator& GetInternalKeyComparator() const {
    return comparator_.comparator;
  }

  const ImmutableMemTableOptions* GetImmutableMemTableOptions() const {
    return &moptions_;
  }

  uint64_t ApproximateOldestKeyTime() const {
    return oldest_key_time_.load(std::memory_order_relaxed);
  }
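  // A minimal sketch (hypothetical flush-path code) of the immutability
  // transition: higher layers stop writing to the memtable, mark it read-only,
  // flush its contents, and finally mark it as persisted. The actual
  // switch/flush machinery lives outside this class.
  //
  //   mem->MarkImmutable();   // no more Add()/Update() after this point
  //   // ... flush job writes the memtable contents to an SST file ...
  //   mem->MarkFlushed();     // underlying storage may release resources
  //                           // tied to the flushed data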
  // REQUIRES: db_mutex held.
  void SetID(uint64_t id) { id_ = id; }

  uint64_t GetID() const { return id_; }

  void SetFlushCompleted(bool completed) { flush_completed_ = completed; }

  uint64_t GetFileNumber() const { return file_number_; }

  void SetFileNumber(uint64_t file_num) { file_number_ = file_num; }

  void SetFlushInProgress(bool in_progress) {
    flush_in_progress_ = in_progress;
  }

#ifndef ROCKSDB_LITE
  void SetFlushJobInfo(std::unique_ptr<FlushJobInfo>&& info) {
    flush_job_info_ = std::move(info);
  }

  std::unique_ptr<FlushJobInfo> ReleaseFlushJobInfo() {
    return std::move(flush_job_info_);
  }
#endif  // !ROCKSDB_LITE

 private:
  enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };

  friend class MemTableIterator;
  friend class MemTableBackwardIterator;
  friend class MemTableList;

  KeyComparator comparator_;
  const ImmutableMemTableOptions moptions_;
  int refs_;
  const size_t kArenaBlockSize;
  AllocTracker mem_tracker_;
  ConcurrentArena arena_;
  std::unique_ptr<MemTableRep> table_;
  std::unique_ptr<MemTableRep> range_del_table_;
  std::atomic_bool is_range_del_table_empty_;

  // Total data size of all data inserted
  std::atomic<uint64_t> data_size_;
  std::atomic<uint64_t> num_entries_;
  std::atomic<uint64_t> num_deletes_;

  // Dynamically changeable memtable option
  std::atomic<size_t> write_buffer_size_;

  // These are used to manage memtable flushes to storage
  bool flush_in_progress_;  // started the flush
  bool flush_completed_;    // finished the flush
  uint64_t file_number_;    // filled up after flush is complete

  // The updates to be applied to the transaction log when this
  // memtable is flushed to storage.
  VersionEdit edit_;

  // The sequence number of the kv that was inserted first
  std::atomic<SequenceNumber> first_seqno_;

  // The db sequence number at the time of creation or kMaxSequenceNumber
  // if not set.
  std::atomic<SequenceNumber> earliest_seqno_;

  SequenceNumber creation_seq_;

  // The log files earlier than this number can be deleted.
  uint64_t mem_next_logfile_number_;

  // the earliest log containing a prepared section
  // which has been inserted into this memtable.
  std::atomic<uint64_t> min_prep_log_referenced_;

  // rw locks for inplace updates
  std::vector<port::RWMutex> locks_;

  const SliceTransform* const prefix_extractor_;
  std::unique_ptr<DynamicBloom> bloom_filter_;

  std::atomic<FlushStateEnum> flush_state_;

  Env* env_;

  // Extract sequential insert prefixes.
  const SliceTransform* insert_with_hint_prefix_extractor_;

  // Insert hints for each prefix.
  std::unordered_map<Slice, void*, SliceHasher> insert_hints_;

  // Timestamp of oldest key
  std::atomic<uint64_t> oldest_key_time_;

  // Memtable id to track flush.
  uint64_t id_ = 0;
  // Sequence number of the atomic flush that is responsible for this memtable.
  // The sequence number of atomic flush is a seq, such that no writes with
  // sequence numbers greater than or equal to seq are flushed, while all
  // writes with sequence number smaller than seq are flushed.
  SequenceNumber atomic_flush_seqno_;

  // keep track of memory usage in table_, arena_, and range_del_table_.
  // Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow()`.
  std::atomic<uint64_t> approximate_memory_usage_;

#ifndef ROCKSDB_LITE
  // Flush job info of the current memtable.
  std::unique_ptr<FlushJobInfo> flush_job_info_;
#endif  // !ROCKSDB_LITE

  // Returns a heuristic flush decision
  bool ShouldFlushNow();

  // Updates flush_state_ using ShouldFlushNow()
  void UpdateFlushState();

  void UpdateOldestKeyTime();

  void GetFromTable(const LookupKey& key,
                    SequenceNumber max_covering_tombstone_seq, bool do_merge,
                    ReadCallback* callback, bool* is_blob_index,
                    std::string* value, std::string* timestamp, Status* s,
                    MergeContext* merge_context, SequenceNumber* seq,
                    bool* found_final_value, bool* merge_in_progress);
};

extern const char* EncodeKey(std::string* scratch, const Slice& target);

}  // namespace ROCKSDB_NAMESPACE