//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
5 #pragma once
6 
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <unordered_set>
#include <vector>

#include "db/compaction/compaction.h"
#include "db/compaction/compaction_iteration_stats.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_del_aggregator.h"
#include "db/snapshot_checker.h"
#include "options/cf_options.h"
#include "rocksdb/compaction_filter.h"
21 
22 namespace ROCKSDB_NAMESPACE {
23 
24 class CompactionIterator {
25  public:
26   // A wrapper around Compaction. Has a much smaller interface, only what
27   // CompactionIterator uses. Tests can override it.
28   class CompactionProxy {
29    public:
CompactionProxy(const Compaction * compaction)30     explicit CompactionProxy(const Compaction* compaction)
31         : compaction_(compaction) {}
32 
33     virtual ~CompactionProxy() = default;
34     virtual int level(size_t /*compaction_input_level*/ = 0) const {
35       return compaction_->level();
36     }
KeyNotExistsBeyondOutputLevel(const Slice & user_key,std::vector<size_t> * level_ptrs)37     virtual bool KeyNotExistsBeyondOutputLevel(
38         const Slice& user_key, std::vector<size_t>* level_ptrs) const {
39       return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs);
40     }
bottommost_level()41     virtual bool bottommost_level() const {
42       return compaction_->bottommost_level();
43     }
number_levels()44     virtual int number_levels() const { return compaction_->number_levels(); }
GetLargestUserKey()45     virtual Slice GetLargestUserKey() const {
46       return compaction_->GetLargestUserKey();
47     }
allow_ingest_behind()48     virtual bool allow_ingest_behind() const {
49       return compaction_->immutable_cf_options()->allow_ingest_behind;
50     }
preserve_deletes()51     virtual bool preserve_deletes() const {
52       return compaction_->immutable_cf_options()->preserve_deletes;
53     }
54 
55    protected:
56     CompactionProxy() = default;
57 
58    private:
59     const Compaction* compaction_;
60   };
61 
62   CompactionIterator(
63       InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
64       SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
65       SequenceNumber earliest_write_conflict_snapshot,
66       const SnapshotChecker* snapshot_checker, Env* env,
67       bool report_detailed_time, bool expect_valid_internal_key,
68       CompactionRangeDelAggregator* range_del_agg,
69       const Compaction* compaction = nullptr,
70       const CompactionFilter* compaction_filter = nullptr,
71       const std::atomic<bool>* shutting_down = nullptr,
72       const SequenceNumber preserve_deletes_seqnum = 0,
73       const std::atomic<bool>* manual_compaction_paused = nullptr,
74       const std::shared_ptr<Logger> info_log = nullptr);
75 
76   // Constructor with custom CompactionProxy, used for tests.
77   CompactionIterator(
78       InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
79       SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
80       SequenceNumber earliest_write_conflict_snapshot,
81       const SnapshotChecker* snapshot_checker, Env* env,
82       bool report_detailed_time, bool expect_valid_internal_key,
83       CompactionRangeDelAggregator* range_del_agg,
84       std::unique_ptr<CompactionProxy> compaction,
85       const CompactionFilter* compaction_filter = nullptr,
86       const std::atomic<bool>* shutting_down = nullptr,
87       const SequenceNumber preserve_deletes_seqnum = 0,
88       const std::atomic<bool>* manual_compaction_paused = nullptr,
89       const std::shared_ptr<Logger> info_log = nullptr);
90 
91   ~CompactionIterator();
92 
93   void ResetRecordCounts();
94 
95   // Seek to the beginning of the compaction iterator output.
96   //
97   // REQUIRED: Call only once.
98   void SeekToFirst();
99 
100   // Produces the next record in the compaction.
101   //
102   // REQUIRED: SeekToFirst() has been called.
103   void Next();
104 
105   // Getters
key()106   const Slice& key() const { return key_; }
value()107   const Slice& value() const { return value_; }
status()108   const Status& status() const { return status_; }
ikey()109   const ParsedInternalKey& ikey() const { return ikey_; }
Valid()110   bool Valid() const { return valid_; }
user_key()111   const Slice& user_key() const { return current_user_key_; }
iter_stats()112   const CompactionIterationStats& iter_stats() const { return iter_stats_; }
113 
114  private:
115   // Processes the input stream to find the next output
116   void NextFromInput();
117 
118   // Do last preparations before presenting the output to the callee. At this
119   // point this only zeroes out the sequence number if possible for better
120   // compression.
121   void PrepareOutput();
122 
123   // Invoke compaction filter if needed.
124   void InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);
125 
126   // Given a sequence number, return the sequence number of the
127   // earliest snapshot that this sequence number is visible in.
128   // The snapshots themselves are arranged in ascending order of
129   // sequence numbers.
130   // Employ a sequential search because the total number of
131   // snapshots are typically small.
132   inline SequenceNumber findEarliestVisibleSnapshot(
133       SequenceNumber in, SequenceNumber* prev_snapshot);
134 
135   // Checks whether the currently seen ikey_ is needed for
136   // incremental (differential) snapshot and hence can't be dropped
137   // or seqnum be zero-ed out even if all other conditions for it are met.
138   inline bool ikeyNotNeededForIncrementalSnapshot();
139 
KeyCommitted(SequenceNumber sequence)140   inline bool KeyCommitted(SequenceNumber sequence) {
141     return snapshot_checker_ == nullptr ||
142            snapshot_checker_->CheckInSnapshot(sequence, kMaxSequenceNumber) ==
143                SnapshotCheckerResult::kInSnapshot;
144   }
145 
146   bool IsInEarliestSnapshot(SequenceNumber sequence);
147 
148   InternalIterator* input_;
149   const Comparator* cmp_;
150   MergeHelper* merge_helper_;
151   const std::vector<SequenceNumber>* snapshots_;
152   // List of snapshots released during compaction.
153   // findEarliestVisibleSnapshot() find them out from return of
154   // snapshot_checker, and make sure they will not be returned as
155   // earliest visible snapshot of an older value.
156   // See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3.
157   std::unordered_set<SequenceNumber> released_snapshots_;
158   std::vector<SequenceNumber>::const_iterator earliest_snapshot_iter_;
159   const SequenceNumber earliest_write_conflict_snapshot_;
160   const SnapshotChecker* const snapshot_checker_;
161   Env* env_;
162   bool report_detailed_time_;
163   bool expect_valid_internal_key_;
164   CompactionRangeDelAggregator* range_del_agg_;
165   std::unique_ptr<CompactionProxy> compaction_;
166   const CompactionFilter* compaction_filter_;
167   const std::atomic<bool>* shutting_down_;
168   const std::atomic<bool>* manual_compaction_paused_;
169   const SequenceNumber preserve_deletes_seqnum_;
170   bool bottommost_level_;
171   bool valid_ = false;
172   bool visible_at_tip_;
173   SequenceNumber earliest_snapshot_;
174   SequenceNumber latest_snapshot_;
175 
176   // State
177   //
178   // Points to a copy of the current compaction iterator output (current_key_)
179   // if valid_.
180   Slice key_;
181   // Points to the value in the underlying iterator that corresponds to the
182   // current output.
183   Slice value_;
184   // The status is OK unless compaction iterator encounters a merge operand
185   // while not having a merge operator defined.
186   Status status_;
187   // Stores the user key, sequence number and type of the current compaction
188   // iterator output (or current key in the underlying iterator during
189   // NextFromInput()).
190   ParsedInternalKey ikey_;
191   // Stores whether ikey_.user_key is valid. If set to false, the user key is
192   // not compared against the current key in the underlying iterator.
193   bool has_current_user_key_ = false;
194   bool at_next_ = false;  // If false, the iterator
195   // Holds a copy of the current compaction iterator output (or current key in
196   // the underlying iterator during NextFromInput()).
197   IterKey current_key_;
198   Slice current_user_key_;
199   SequenceNumber current_user_key_sequence_;
200   SequenceNumber current_user_key_snapshot_;
201 
202   // True if the iterator has already returned a record for the current key.
203   bool has_outputted_key_ = false;
204 
205   // truncated the value of the next key and output it without applying any
206   // compaction rules.  This is used for outputting a put after a single delete.
207   bool clear_and_output_next_key_ = false;
208 
209   MergeOutputIterator merge_out_iter_;
210   // PinnedIteratorsManager used to pin input_ Iterator blocks while reading
211   // merge operands and then releasing them after consuming them.
212   PinnedIteratorsManager pinned_iters_mgr_;
213   std::string compaction_filter_value_;
214   InternalKey compaction_filter_skip_until_;
215   // "level_ptrs" holds indices that remember which file of an associated
216   // level we were last checking during the last call to compaction->
217   // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
218   // to pick off where it left off since each subcompaction's key range is
219   // increasing so a later call to the function must be looking for a key that
220   // is in or beyond the last file checked during the previous call
221   std::vector<size_t> level_ptrs_;
222   CompactionIterationStats iter_stats_;
223 
224   // Used to avoid purging uncommitted values. The application can specify
225   // uncommitted values by providing a SnapshotChecker object.
226   bool current_key_committed_;
227   std::shared_ptr<Logger> info_log_;
228 
IsShuttingDown()229   bool IsShuttingDown() {
230     // This is a best-effort facility, so memory_order_relaxed is sufficient.
231     return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
232   }
233 
IsPausingManualCompaction()234   bool IsPausingManualCompaction() {
235     // This is a best-effort facility, so memory_order_relaxed is sufficient.
236     return manual_compaction_paused_ &&
237            manual_compaction_paused_->load(std::memory_order_relaxed);
238   }
239 };
240 }  // namespace ROCKSDB_NAMESPACE
241