1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 #pragma once 6 7 #include <algorithm> 8 #include <deque> 9 #include <string> 10 #include <unordered_set> 11 #include <vector> 12 13 #include "db/compaction/compaction.h" 14 #include "db/compaction/compaction_iteration_stats.h" 15 #include "db/merge_helper.h" 16 #include "db/pinned_iterators_manager.h" 17 #include "db/range_del_aggregator.h" 18 #include "db/snapshot_checker.h" 19 #include "options/cf_options.h" 20 #include "rocksdb/compaction_filter.h" 21 22 namespace ROCKSDB_NAMESPACE { 23 24 class CompactionIterator { 25 public: 26 // A wrapper around Compaction. Has a much smaller interface, only what 27 // CompactionIterator uses. Tests can override it. 28 class CompactionProxy { 29 public: CompactionProxy(const Compaction * compaction)30 explicit CompactionProxy(const Compaction* compaction) 31 : compaction_(compaction) {} 32 33 virtual ~CompactionProxy() = default; 34 virtual int level(size_t /*compaction_input_level*/ = 0) const { 35 return compaction_->level(); 36 } KeyNotExistsBeyondOutputLevel(const Slice & user_key,std::vector<size_t> * level_ptrs)37 virtual bool KeyNotExistsBeyondOutputLevel( 38 const Slice& user_key, std::vector<size_t>* level_ptrs) const { 39 return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs); 40 } bottommost_level()41 virtual bool bottommost_level() const { 42 return compaction_->bottommost_level(); 43 } number_levels()44 virtual int number_levels() const { return compaction_->number_levels(); } GetLargestUserKey()45 virtual Slice GetLargestUserKey() const { 46 return compaction_->GetLargestUserKey(); 47 } allow_ingest_behind()48 virtual bool allow_ingest_behind() const { 49 return compaction_->immutable_cf_options()->allow_ingest_behind; 50 } preserve_deletes()51 virtual bool preserve_deletes() const { 52 return compaction_->immutable_cf_options()->preserve_deletes; 53 } 54 55 protected: 56 CompactionProxy() = default; 57 58 private: 59 const Compaction* compaction_; 60 }; 61 62 CompactionIterator( 63 InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, 64 SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots, 65 SequenceNumber earliest_write_conflict_snapshot, 66 const SnapshotChecker* snapshot_checker, Env* env, 67 bool report_detailed_time, bool expect_valid_internal_key, 68 CompactionRangeDelAggregator* range_del_agg, 69 const Compaction* compaction = nullptr, 70 const CompactionFilter* compaction_filter = nullptr, 71 const std::atomic<bool>* shutting_down = nullptr, 72 const SequenceNumber preserve_deletes_seqnum = 0, 73 const std::atomic<bool>* manual_compaction_paused = nullptr, 74 const std::shared_ptr<Logger> info_log = nullptr); 75 76 // Constructor with custom CompactionProxy, used for tests. 77 CompactionIterator( 78 InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, 79 SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots, 80 SequenceNumber earliest_write_conflict_snapshot, 81 const SnapshotChecker* snapshot_checker, Env* env, 82 bool report_detailed_time, bool expect_valid_internal_key, 83 CompactionRangeDelAggregator* range_del_agg, 84 std::unique_ptr<CompactionProxy> compaction, 85 const CompactionFilter* compaction_filter = nullptr, 86 const std::atomic<bool>* shutting_down = nullptr, 87 const SequenceNumber preserve_deletes_seqnum = 0, 88 const std::atomic<bool>* manual_compaction_paused = nullptr, 89 const std::shared_ptr<Logger> info_log = nullptr); 90 91 ~CompactionIterator(); 92 93 void ResetRecordCounts(); 94 95 // Seek to the beginning of the compaction iterator output. 96 // 97 // REQUIRED: Call only once. 98 void SeekToFirst(); 99 100 // Produces the next record in the compaction. 101 // 102 // REQUIRED: SeekToFirst() has been called. 103 void Next(); 104 105 // Getters key()106 const Slice& key() const { return key_; } value()107 const Slice& value() const { return value_; } status()108 const Status& status() const { return status_; } ikey()109 const ParsedInternalKey& ikey() const { return ikey_; } Valid()110 bool Valid() const { return valid_; } user_key()111 const Slice& user_key() const { return current_user_key_; } iter_stats()112 const CompactionIterationStats& iter_stats() const { return iter_stats_; } 113 114 private: 115 // Processes the input stream to find the next output 116 void NextFromInput(); 117 118 // Do last preparations before presenting the output to the callee. At this 119 // point this only zeroes out the sequence number if possible for better 120 // compression. 121 void PrepareOutput(); 122 123 // Invoke compaction filter if needed. 124 void InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until); 125 126 // Given a sequence number, return the sequence number of the 127 // earliest snapshot that this sequence number is visible in. 128 // The snapshots themselves are arranged in ascending order of 129 // sequence numbers. 130 // Employ a sequential search because the total number of 131 // snapshots are typically small. 132 inline SequenceNumber findEarliestVisibleSnapshot( 133 SequenceNumber in, SequenceNumber* prev_snapshot); 134 135 // Checks whether the currently seen ikey_ is needed for 136 // incremental (differential) snapshot and hence can't be dropped 137 // or seqnum be zero-ed out even if all other conditions for it are met. 138 inline bool ikeyNotNeededForIncrementalSnapshot(); 139 KeyCommitted(SequenceNumber sequence)140 inline bool KeyCommitted(SequenceNumber sequence) { 141 return snapshot_checker_ == nullptr || 142 snapshot_checker_->CheckInSnapshot(sequence, kMaxSequenceNumber) == 143 SnapshotCheckerResult::kInSnapshot; 144 } 145 146 bool IsInEarliestSnapshot(SequenceNumber sequence); 147 148 InternalIterator* input_; 149 const Comparator* cmp_; 150 MergeHelper* merge_helper_; 151 const std::vector<SequenceNumber>* snapshots_; 152 // List of snapshots released during compaction. 153 // findEarliestVisibleSnapshot() find them out from return of 154 // snapshot_checker, and make sure they will not be returned as 155 // earliest visible snapshot of an older value. 156 // See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3. 157 std::unordered_set<SequenceNumber> released_snapshots_; 158 std::vector<SequenceNumber>::const_iterator earliest_snapshot_iter_; 159 const SequenceNumber earliest_write_conflict_snapshot_; 160 const SnapshotChecker* const snapshot_checker_; 161 Env* env_; 162 bool report_detailed_time_; 163 bool expect_valid_internal_key_; 164 CompactionRangeDelAggregator* range_del_agg_; 165 std::unique_ptr<CompactionProxy> compaction_; 166 const CompactionFilter* compaction_filter_; 167 const std::atomic<bool>* shutting_down_; 168 const std::atomic<bool>* manual_compaction_paused_; 169 const SequenceNumber preserve_deletes_seqnum_; 170 bool bottommost_level_; 171 bool valid_ = false; 172 bool visible_at_tip_; 173 SequenceNumber earliest_snapshot_; 174 SequenceNumber latest_snapshot_; 175 176 // State 177 // 178 // Points to a copy of the current compaction iterator output (current_key_) 179 // if valid_. 180 Slice key_; 181 // Points to the value in the underlying iterator that corresponds to the 182 // current output. 183 Slice value_; 184 // The status is OK unless compaction iterator encounters a merge operand 185 // while not having a merge operator defined. 186 Status status_; 187 // Stores the user key, sequence number and type of the current compaction 188 // iterator output (or current key in the underlying iterator during 189 // NextFromInput()). 190 ParsedInternalKey ikey_; 191 // Stores whether ikey_.user_key is valid. If set to false, the user key is 192 // not compared against the current key in the underlying iterator. 193 bool has_current_user_key_ = false; 194 bool at_next_ = false; // If false, the iterator 195 // Holds a copy of the current compaction iterator output (or current key in 196 // the underlying iterator during NextFromInput()). 197 IterKey current_key_; 198 Slice current_user_key_; 199 SequenceNumber current_user_key_sequence_; 200 SequenceNumber current_user_key_snapshot_; 201 202 // True if the iterator has already returned a record for the current key. 203 bool has_outputted_key_ = false; 204 205 // truncated the value of the next key and output it without applying any 206 // compaction rules. This is used for outputting a put after a single delete. 207 bool clear_and_output_next_key_ = false; 208 209 MergeOutputIterator merge_out_iter_; 210 // PinnedIteratorsManager used to pin input_ Iterator blocks while reading 211 // merge operands and then releasing them after consuming them. 212 PinnedIteratorsManager pinned_iters_mgr_; 213 std::string compaction_filter_value_; 214 InternalKey compaction_filter_skip_until_; 215 // "level_ptrs" holds indices that remember which file of an associated 216 // level we were last checking during the last call to compaction-> 217 // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function 218 // to pick off where it left off since each subcompaction's key range is 219 // increasing so a later call to the function must be looking for a key that 220 // is in or beyond the last file checked during the previous call 221 std::vector<size_t> level_ptrs_; 222 CompactionIterationStats iter_stats_; 223 224 // Used to avoid purging uncommitted values. The application can specify 225 // uncommitted values by providing a SnapshotChecker object. 226 bool current_key_committed_; 227 std::shared_ptr<Logger> info_log_; 228 IsShuttingDown()229 bool IsShuttingDown() { 230 // This is a best-effort facility, so memory_order_relaxed is sufficient. 231 return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); 232 } 233 IsPausingManualCompaction()234 bool IsPausingManualCompaction() { 235 // This is a best-effort facility, so memory_order_relaxed is sufficient. 236 return manual_compaction_paused_ && 237 manual_compaction_paused_->load(std::memory_order_relaxed); 238 } 239 }; 240 } // namespace ROCKSDB_NAMESPACE 241