//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once
#include <atomic>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "db/dbformat.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/read_callback.h"
#include "db/version_edit.h"
#include "memory/allocator.h"
#include "memory/concurrent_arena.h"
#include "monitoring/instrumented_mutex.h"
#include "options/cf_options.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "table/multiget_context.h"
#include "util/dynamic_bloom.h"
#include "util/hash.h"

namespace ROCKSDB_NAMESPACE {

struct FlushJobInfo;
class Mutex;
class MemTableIterator;
class MergeContext;

struct ImmutableMemTableOptions {
  explicit ImmutableMemTableOptions(const ImmutableCFOptions& ioptions,
                                    const MutableCFOptions& mutable_cf_options);
  size_t arena_block_size;
  uint32_t memtable_prefix_bloom_bits;
  size_t memtable_huge_page_size;
  bool memtable_whole_key_filtering;
  bool inplace_update_support;
  size_t inplace_update_num_locks;
  UpdateStatus (*inplace_callback)(char* existing_value,
                                   uint32_t* existing_value_size,
                                   Slice delta_value,
                                   std::string* merged_value);
  size_t max_successive_merges;
  Statistics* statistics;
  MergeOperator* merge_operator;
  Logger* info_log;
};

// Batched counters to be updated when inserting keys in one write batch.
// In the post-processing of the write batch, these can be updated together.
// Only used in the concurrent memtable insert case.
struct MemTablePostProcessInfo {
  uint64_t data_size = 0;
  uint64_t num_entries = 0;
  uint64_t num_deletes = 0;
};

using MultiGetRange = MultiGetContext::Range;
// Note:  Many of the methods in this class have comments indicating that
// external synchronization is required as these methods are not thread-safe.
// It is up to higher layers of code to decide how to prevent concurrent
// invocation of these methods.  This is usually done by acquiring either
// the db mutex or the single writer thread.
//
// Some of these methods are documented to only require external
// synchronization if this memtable is immutable.  Calling MarkImmutable() is
// not sufficient to guarantee immutability.  It is up to higher layers of
// code to determine if this MemTable can still be modified by other threads.
// E.g.: the SuperVersion stores a pointer to the current MemTable (that can
// be modified) and a separate list of the MemTables that can no longer be
// written to (aka the 'immutable memtables').
class MemTable {
 public:
  struct KeyComparator : public MemTableRep::KeyComparator {
    const InternalKeyComparator comparator;
    explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
    virtual int operator()(const char* prefix_len_key1,
                           const char* prefix_len_key2) const override;
    virtual int operator()(const char* prefix_len_key,
                           const DecodedType& key) const override;
  };

  // MemTables are reference counted.  The initial reference count
  // is zero and the caller must call Ref() at least once.
  //
  // earliest_seq should be the current SequenceNumber in the db such that any
  // key inserted into this memtable will have an equal or larger seq number.
  // (When a db is first created, the earliest sequence number will be 0).
  // If the earliest sequence number is not known, kMaxSequenceNumber may be
  // used, but this may prevent some transactions from succeeding until the
  // first key is inserted into the memtable.
  explicit MemTable(const InternalKeyComparator& comparator,
                    const ImmutableCFOptions& ioptions,
                    const MutableCFOptions& mutable_cf_options,
                    WriteBufferManager* write_buffer_manager,
                    SequenceNumber earliest_seq, uint32_t column_family_id);
  // No copying allowed
  MemTable(const MemTable&) = delete;
  MemTable& operator=(const MemTable&) = delete;

  // Do not delete this MemTable unless Unref() indicates it is not in use.
  ~MemTable();

  // Increase reference count.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void Ref() { ++refs_; }

  // Drop reference count.
  // If the refcount goes to zero return this memtable, otherwise return null.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  MemTable* Unref() {
    --refs_;
    assert(refs_ >= 0);
    if (refs_ <= 0) {
      return this;
    }
    return nullptr;
  }
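
  // Illustrative sketch (not part of this header's API surface): the typical
  // ownership handshake implied by the comments above. Names such as `cmp`,
  // `ioptions`, `mopts` and `wbm` are assumed to be constructed by the caller.
  //
  //   MemTable* mem = new MemTable(cmp, ioptions, mopts, wbm,
  //                                /*earliest_seq=*/0, /*column_family_id=*/0);
  //   mem->Ref();                          // initial refcount is zero
  //   ...
  //   MemTable* to_delete = mem->Unref();  // non-null only when refcount is 0
  //   delete to_delete;                    // deleting nullptr is a no-op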

  // Returns an estimate of the number of bytes of data in use by this
  // data structure.
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  size_t ApproximateMemoryUsage();

  // As a cheap version of `ApproximateMemoryUsage()`, this function doesn't
  // require external synchronization. The value may be less accurate though.
  size_t ApproximateMemoryUsageFast() const {
    return approximate_memory_usage_.load(std::memory_order_relaxed);
  }

  // This method heuristically determines if the memtable should continue to
  // host more data.
  bool ShouldScheduleFlush() const {
    return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED;
  }

  // Returns true if a flush should be scheduled and the caller should
  // be the one to schedule it
  bool MarkFlushScheduled() {
    auto before = FLUSH_REQUESTED;
    return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED,
                                                std::memory_order_relaxed,
                                                std::memory_order_relaxed);
  }
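
  // Illustrative sketch (assumption, not code from this header): how a caller
  // might combine the two methods above so that exactly one thread schedules
  // the flush. `ScheduleFlush(mem)` stands in for whatever scheduling
  // mechanism the caller uses.
  //
  //   if (mem->ShouldScheduleFlush() && mem->MarkFlushScheduled()) {
  //     // The CAS in MarkFlushScheduled() succeeded for this thread only,
  //     // so the flush is scheduled exactly once.
  //     ScheduleFlush(mem);
  //   }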

  // Return an iterator that yields the contents of the memtable.
  //
  // The caller must ensure that the underlying MemTable remains live
  // while the returned iterator is live.  The keys returned by this
  // iterator are internal keys encoded by AppendInternalKey in the
  // db/dbformat.{h,cc} module.
  //
  // By default, it returns an iterator for prefix seek if prefix_extractor
  // is configured in Options.
  // arena: If not null, the arena needs to be used to allocate the Iterator.
  //        Calling ~Iterator of the iterator will destroy all the states but
  //        those allocated in arena.
  InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena);
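
  // Illustrative sketch (assumption): arena-allocated iteration over the
  // memtable's internal keys. The iterator is destroyed with an explicit
  // destructor call because its memory is owned by the arena.
  //
  //   Arena arena;
  //   ReadOptions ro;
  //   InternalIterator* iter = mem->NewIterator(ro, &arena);
  //   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
  //     // iter->key() is an internal key; iter->value() is the user value.
  //   }
  //   iter->~InternalIterator();  // memory itself is reclaimed by the arena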

  FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
      const ReadOptions& read_options, SequenceNumber read_seq);

  // Add an entry into memtable that maps key to value at the
  // specified sequence number and with the specified type.
  // Typically value will be empty if type==kTypeDeletion.
  //
  // REQUIRES: if allow_concurrent = false, external synchronization to prevent
  // simultaneous operations on the same MemTable.
  //
  // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
  // the <key, seq> already exists.
  bool Add(SequenceNumber seq, ValueType type, const Slice& key,
           const Slice& value, bool allow_concurrent = false,
           MemTablePostProcessInfo* post_process_info = nullptr,
           void** hint = nullptr);
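
  // Illustrative sketch (assumption): a single-threaded insert of a Put and a
  // Delete. With allow_concurrent left at its default (false), the caller is
  // responsible for external synchronization, e.g. the single writer thread.
  //
  //   bool ok = mem->Add(/*seq=*/100, kTypeValue, "key1", "value1");
  //   assert(ok);
  //   ok = mem->Add(/*seq=*/101, kTypeDeletion, "key1", "");
  //   assert(ok);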

  // Used to get the value associated with key, or to collect the merge
  // operands associated with key.
  // If do_merge = true (the default), the behavior is to get the value for
  // key, as described below:
  //   If memtable contains a value for key, store it in *value and return
  //   true.
  //   If memtable contains a deletion for key, store a NotFound() error
  //   in *status and return true.
  //   If memtable contains a Merge operation as the most recent entry for a
  //   key, and the merge process does not stop (does not reach a value or
  //   delete), prepend the current merge operand to *operands, store
  //   MergeInProgress in *s, and return false.
  //   Else, return false.
  // If any operation was found, its most recent sequence number
  // will be stored in *seq (regardless of whether true or false is
  // returned).  Otherwise, *seq will be set to kMaxSequenceNumber.
  // On success, *s may be set to OK, NotFound, or MergeInProgress.  Any other
  // status returned indicates a corruption or other unexpected error.
  // If do_merge = false, then any merge operands encountered for key are
  // simply stored in merge_context.operands_list and never actually merged to
  // a final value; the raw merge operands are eventually returned to the user.
  bool Get(const LookupKey& key, std::string* value, Status* s,
           MergeContext* merge_context,
           SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
           bool* is_blob_index = nullptr, bool do_merge = true) {
    return Get(key, value, /*timestamp=*/nullptr, s, merge_context,
               max_covering_tombstone_seq, seq, read_opts, callback,
               is_blob_index, do_merge);
  }

  bool Get(const LookupKey& key, std::string* value, std::string* timestamp,
           Status* s, MergeContext* merge_context,
           SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
           bool* is_blob_index = nullptr, bool do_merge = true);

  bool Get(const LookupKey& key, std::string* value, std::string* timestamp,
           Status* s, MergeContext* merge_context,
           SequenceNumber* max_covering_tombstone_seq,
           const ReadOptions& read_opts, ReadCallback* callback = nullptr,
           bool* is_blob_index = nullptr, bool do_merge = true) {
    SequenceNumber seq;
    return Get(key, value, timestamp, s, merge_context,
               max_covering_tombstone_seq, &seq, read_opts, callback,
               is_blob_index, do_merge);
  }
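
  // Illustrative sketch (assumption): a point lookup reading the latest
  // version of a key. LookupKey comes from db/dbformat.h and MergeContext
  // from db/merge_context.h; the real call sites live in higher layers such
  // as DBImpl.
  //
  //   LookupKey lkey("key1", /*sequence=*/kMaxSequenceNumber);
  //   std::string value;
  //   Status s;
  //   MergeContext merge_context;
  //   SequenceNumber max_covering_tombstone_seq = 0;
  //   SequenceNumber seq;
  //   bool found = mem->Get(lkey, &value, &s, &merge_context,
  //                         &max_covering_tombstone_seq, &seq, ReadOptions());
  //   // found == true with s.ok():         value holds the result.
  //   // found == true with s.IsNotFound(): key was deleted in this memtable.
  //   // found == false:                    consult older memtables/SST files.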

  void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                ReadCallback* callback, bool* is_blob);

  // Attempts to update the value for key in place, else does a normal Add.
  // Pseudocode
  //   if key exists in current memtable && prev_value is of type kTypeValue
  //     if sizeof(new_value) <= sizeof(prev_value)
  //       update inplace
  //     else add(key, new_value)
  //   else add(key, new_value)
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void Update(SequenceNumber seq,
              const Slice& key,
              const Slice& value);

  // If prev_value for key exists, attempts to update it in place;
  // else returns false.
  // Pseudocode
  //   if key exists in current memtable && prev_value is of type kTypeValue
  //     new_value = delta(prev_value)
  //     if sizeof(new_value) <= sizeof(prev_value)
  //       update inplace
  //     else add(key, new_value)
  //   else return false
  //
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  bool UpdateCallback(SequenceNumber seq,
                      const Slice& key,
                      const Slice& delta);

  // Returns the number of successive merge entries starting from the newest
  // entry for the key up to the last non-merge entry or last entry for the
  // key in the memtable.
  size_t CountSuccessiveMergeEntries(const LookupKey& key);

  // Update counters and flush status after inserting a whole write batch
  // Used in concurrent memtable inserts.
  void BatchPostProcess(const MemTablePostProcessInfo& update_counters) {
    num_entries_.fetch_add(update_counters.num_entries,
                           std::memory_order_relaxed);
    data_size_.fetch_add(update_counters.data_size, std::memory_order_relaxed);
    if (update_counters.num_deletes != 0) {
      num_deletes_.fetch_add(update_counters.num_deletes,
                             std::memory_order_relaxed);
    }
    UpdateFlushState();
  }
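
  // Illustrative sketch (assumption): the concurrent-insert pattern that
  // Add() and BatchPostProcess() support. Each writer thread accumulates
  // per-batch counters in a MemTablePostProcessInfo and folds them into the
  // shared counters once at the end. `seq`, `key`, `value` and `key2` are
  // placeholders for the caller's data.
  //
  //   MemTablePostProcessInfo info;
  //   mem->Add(seq, kTypeValue, key, value, /*allow_concurrent=*/true, &info);
  //   mem->Add(seq + 1, kTypeDeletion, key2, "", /*allow_concurrent=*/true,
  //            &info);
  //   mem->BatchPostProcess(info);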

  // Get total number of entries in the mem table.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  uint64_t num_entries() const {
    return num_entries_.load(std::memory_order_relaxed);
  }

  // Get total number of deletes in the mem table.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  uint64_t num_deletes() const {
    return num_deletes_.load(std::memory_order_relaxed);
  }

  uint64_t get_data_size() const {
    return data_size_.load(std::memory_order_relaxed);
  }

  // Dynamically change the memtable's capacity. If set below the current
  // usage, the next key added will trigger a flush. Can only increase size
  // when memtable prefix bloom is disabled, since we can't easily allocate
  // more space.
  void UpdateWriteBufferSize(size_t new_write_buffer_size) {
    if (bloom_filter_ == nullptr ||
        new_write_buffer_size < write_buffer_size_) {
      write_buffer_size_.store(new_write_buffer_size,
                               std::memory_order_relaxed);
    }
  }

  // Returns the edits area that is needed for flushing the memtable
  VersionEdit* GetEdits() { return &edit_; }

  // Returns true if no entry has been inserted into the mem table.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  bool IsEmpty() const { return first_seqno_ == 0; }

  // Returns the sequence number of the first element that was inserted
  // into the memtable.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable (unless this Memtable is immutable).
  SequenceNumber GetFirstSequenceNumber() {
    return first_seqno_.load(std::memory_order_relaxed);
  }

  // Returns the sequence number that is guaranteed to be smaller than or equal
  // to the sequence number of any key that could be inserted into this
  // memtable. It can then be assumed that any write with a larger (or equal)
  // sequence number will be present in this memtable or a later memtable.
  //
  // If the earliest sequence number could not be determined,
  // kMaxSequenceNumber will be returned.
  SequenceNumber GetEarliestSequenceNumber() {
    return earliest_seqno_.load(std::memory_order_relaxed);
  }

  // DB's latest sequence ID when the memtable is created. This number
  // may be updated to a more recent one before any key is inserted.
  SequenceNumber GetCreationSeq() const { return creation_seq_; }

  void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; }

  // Returns the next active logfile number when this memtable is about to
  // be flushed to storage.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  uint64_t GetNextLogNumber() { return mem_next_logfile_number_; }

  // Sets the next active logfile number when this memtable is about to
  // be flushed to storage.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }

  // If this memtable contains data from a committed
  // two phase transaction, we must take note of the
  // log which contains that data so we can know
  // when to release that log.
  void RefLogContainingPrepSection(uint64_t log);
  uint64_t GetMinLogContainingPrepSection();

  // Notify the underlying storage that no more items will be added.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  // After MarkImmutable() is called, you should not attempt to
  // write anything to this MemTable (i.e. do not call Add() or Update()).
  void MarkImmutable() {
    table_->MarkReadOnly();
    mem_tracker_.DoneAllocating();
  }

  // Notify the underlying storage that all data it contained has been
  // persisted.
  // REQUIRES: external synchronization to prevent simultaneous
  // operations on the same MemTable.
  void MarkFlushed() {
    table_->MarkFlushed();
  }
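
  // Illustrative sketch (assumption): the flush-side lifecycle implied by the
  // methods above and the flush bookkeeping setters further down, as driven
  // by flush code in higher layers. `sst_file_number` is a placeholder.
  //
  //   mem->MarkImmutable();            // no further Add()/Update() calls
  //   mem->SetFlushInProgress(true);
  //   ...                              // write the memtable out to an SST
  //   mem->SetFileNumber(sst_file_number);
  //   mem->SetFlushCompleted(true);
  //   mem->MarkFlushed();              // contents are now persisted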

  // Return true if the current MemTableRep supports merge operator.
  bool IsMergeOperatorSupported() const {
    return table_->IsMergeOperatorSupported();
  }

  // Return true if the current MemTableRep supports snapshots.
  // Inplace update prevents snapshots.
  bool IsSnapshotSupported() const {
    return table_->IsSnapshotSupported() && !moptions_.inplace_update_support;
  }

  struct MemTableStats {
    uint64_t size;
    uint64_t count;
  };

  MemTableStats ApproximateStats(const Slice& start_ikey,
                                 const Slice& end_ikey);

  // Get the lock associated with the key
  port::RWMutex* GetLock(const Slice& key);

  const InternalKeyComparator& GetInternalKeyComparator() const {
    return comparator_.comparator;
  }

  const ImmutableMemTableOptions* GetImmutableMemTableOptions() const {
    return &moptions_;
  }

  uint64_t ApproximateOldestKeyTime() const {
    return oldest_key_time_.load(std::memory_order_relaxed);
  }

  // REQUIRES: db_mutex held.
  void SetID(uint64_t id) { id_ = id; }

  uint64_t GetID() const { return id_; }

  void SetFlushCompleted(bool completed) { flush_completed_ = completed; }

  uint64_t GetFileNumber() const { return file_number_; }

  void SetFileNumber(uint64_t file_num) { file_number_ = file_num; }

  void SetFlushInProgress(bool in_progress) {
    flush_in_progress_ = in_progress;
  }

#ifndef ROCKSDB_LITE
  void SetFlushJobInfo(std::unique_ptr<FlushJobInfo>&& info) {
    flush_job_info_ = std::move(info);
  }

  std::unique_ptr<FlushJobInfo> ReleaseFlushJobInfo() {
    return std::move(flush_job_info_);
  }
#endif  // !ROCKSDB_LITE

 private:
  enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };

  friend class MemTableIterator;
  friend class MemTableBackwardIterator;
  friend class MemTableList;

  KeyComparator comparator_;
  const ImmutableMemTableOptions moptions_;
  int refs_;
  const size_t kArenaBlockSize;
  AllocTracker mem_tracker_;
  ConcurrentArena arena_;
  std::unique_ptr<MemTableRep> table_;
  std::unique_ptr<MemTableRep> range_del_table_;
  std::atomic_bool is_range_del_table_empty_;

  // Total data size of all data inserted
  std::atomic<uint64_t> data_size_;
  std::atomic<uint64_t> num_entries_;
  std::atomic<uint64_t> num_deletes_;

  // Dynamically changeable memtable option
  std::atomic<size_t> write_buffer_size_;

  // These are used to manage memtable flushes to storage
  bool flush_in_progress_;  // started the flush
  bool flush_completed_;    // finished the flush
  uint64_t file_number_;    // filled up after flush is complete

  // The updates to be applied to the transaction log when this
  // memtable is flushed to storage.
  VersionEdit edit_;

  // The sequence number of the kv that was inserted first
  std::atomic<SequenceNumber> first_seqno_;

  // The db sequence number at the time of creation or kMaxSequenceNumber
  // if not set.
  std::atomic<SequenceNumber> earliest_seqno_;

  SequenceNumber creation_seq_;

  // The log files earlier than this number can be deleted.
  uint64_t mem_next_logfile_number_;

  // the earliest log containing a prepared section
  // which has been inserted into this memtable.
  std::atomic<uint64_t> min_prep_log_referenced_;

  // rw locks for inplace updates
  std::vector<port::RWMutex> locks_;

  const SliceTransform* const prefix_extractor_;
  std::unique_ptr<DynamicBloom> bloom_filter_;

  std::atomic<FlushStateEnum> flush_state_;

  Env* env_;

  // Extract sequential insert prefixes.
  const SliceTransform* insert_with_hint_prefix_extractor_;

  // Insert hints for each prefix.
  std::unordered_map<Slice, void*, SliceHasher> insert_hints_;

  // Timestamp of oldest key
  std::atomic<uint64_t> oldest_key_time_;

  // Memtable id to track flush.
  uint64_t id_ = 0;

  // Sequence number of the atomic flush that is responsible for this memtable.
  // The sequence number of atomic flush is a seq, such that no writes with
  // sequence numbers greater than or equal to seq are flushed, while all
  // writes with sequence number smaller than seq are flushed.
  SequenceNumber atomic_flush_seqno_;

  // Keep track of memory usage in table_, arena_, and range_del_table_.
  // Gets refreshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow()`.
  std::atomic<uint64_t> approximate_memory_usage_;

#ifndef ROCKSDB_LITE
  // Flush job info of the current memtable.
  std::unique_ptr<FlushJobInfo> flush_job_info_;
#endif  // !ROCKSDB_LITE

  // Returns a heuristic flush decision
  bool ShouldFlushNow();

  // Updates flush_state_ using ShouldFlushNow()
  void UpdateFlushState();

  void UpdateOldestKeyTime();

  void GetFromTable(const LookupKey& key,
                    SequenceNumber max_covering_tombstone_seq, bool do_merge,
                    ReadCallback* callback, bool* is_blob_index,
                    std::string* value, std::string* timestamp, Status* s,
                    MergeContext* merge_context, SequenceNumber* seq,
                    bool* found_final_value, bool* merge_in_progress);
};

extern const char* EncodeKey(std::string* scratch, const Slice& target);

}  // namespace ROCKSDB_NAMESPACE