1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/db_iter.h"
11 #include <string>
12 #include <iostream>
13 #include <limits>
14 
15 #include "db/dbformat.h"
16 #include "db/merge_context.h"
17 #include "db/merge_helper.h"
18 #include "db/pinned_iterators_manager.h"
19 #include "file/filename.h"
20 #include "logging/logging.h"
21 #include "memory/arena.h"
22 #include "monitoring/perf_context_imp.h"
23 #include "rocksdb/env.h"
24 #include "rocksdb/iterator.h"
25 #include "rocksdb/merge_operator.h"
26 #include "rocksdb/options.h"
27 #include "table/internal_iterator.h"
28 #include "table/iterator_wrapper.h"
29 #include "trace_replay/trace_replay.h"
30 #include "util/mutexlock.h"
31 #include "util/string_util.h"
32 #include "util/user_comparator_wrapper.h"
33 
34 namespace ROCKSDB_NAMESPACE {
35 
// Constructs a DBIter over `iter`, the merged internal iterator, exposing a
// user-key view of the data as of sequence number `s`. Most configuration is
// copied out of the ReadOptions / column-family options so the iterator does
// not dereference them later.
DBIter::DBIter(Env* _env, const ReadOptions& read_options,
               const ImmutableCFOptions& cf_options,
               const MutableCFOptions& mutable_cf_options,
               const Comparator* cmp, InternalIterator* iter, SequenceNumber s,
               bool arena_mode, uint64_t max_sequential_skip_in_iterations,
               ReadCallback* read_callback, DBImpl* db_impl,
               ColumnFamilyData* cfd, bool allow_blob)
    : prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
      env_(_env),
      logger_(cf_options.info_log),
      user_comparator_(cmp),
      merge_operator_(cf_options.merge_operator),
      iter_(iter),
      read_callback_(read_callback),
      sequence_(s),
      statistics_(cf_options.statistics),
      // After this many sequential skips of the same user key we Seek()
      // instead of Next()-ing (see FindNextUserEntryInternal).
      max_skip_(max_sequential_skip_in_iterations),
      max_skippable_internal_keys_(read_options.max_skippable_internal_keys),
      num_internal_keys_skipped_(0),
      iterate_lower_bound_(read_options.iterate_lower_bound),
      iterate_upper_bound_(read_options.iterate_upper_bound),
      direction_(kForward),
      valid_(false),
      current_entry_is_merged_(false),
      is_key_seqnum_zero_(false),
      // prefix_same_as_start is honored only when a prefix extractor is
      // configured; otherwise there is no prefix to compare against.
      prefix_same_as_start_(mutable_cf_options.prefix_extractor
                                ? read_options.prefix_same_as_start
                                : false),
      pin_thru_lifetime_(read_options.pin_data),
      // The inner iterator yields keys in total order when there is no
      // prefix extractor, or when total-order/auto-prefix seek is requested.
      expect_total_order_inner_iter_(prefix_extractor_ == nullptr ||
                                     read_options.total_order_seek ||
                                     read_options.auto_prefix_mode),
      allow_blob_(allow_blob),
      is_blob_(false),
      arena_mode_(arena_mode),
      range_del_agg_(&cf_options.internal_comparator, s),
      db_impl_(db_impl),
      cfd_(cfd),
      start_seqnum_(read_options.iter_start_seqnum),
      timestamp_ub_(read_options.timestamp),
      timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) {
  RecordTick(statistics_, NO_ITERATOR_CREATED);
  if (pin_thru_lifetime_) {
    // ReadOptions::pin_data: keep blocks pinned for the iterator's lifetime.
    pinned_iters_mgr_.StartPinning();
  }
  if (iter_.iter()) {
    iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
  }
  // The timestamp size carried by the read options must agree with the
  // user comparator's notion of timestamp size.
  assert(timestamp_size_ == user_comparator_.timestamp_size());
}
86 
GetProperty(std::string prop_name,std::string * prop)87 Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
88   if (prop == nullptr) {
89     return Status::InvalidArgument("prop is nullptr");
90   }
91   if (prop_name == "rocksdb.iterator.super-version-number") {
92     // First try to pass the value returned from inner iterator.
93     return iter_.iter()->GetProperty(prop_name, prop);
94   } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
95     if (valid_) {
96       *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
97     } else {
98       *prop = "Iterator is not valid.";
99     }
100     return Status::OK();
101   } else if (prop_name == "rocksdb.iterator.internal-key") {
102     *prop = saved_key_.GetUserKey().ToString();
103     return Status::OK();
104   }
105   return Status::InvalidArgument("Unidentified property.");
106 }
107 
ParseKey(ParsedInternalKey * ikey)108 bool DBIter::ParseKey(ParsedInternalKey* ikey) {
109   if (!ParseInternalKey(iter_.key(), ikey)) {
110     status_ = Status::Corruption("corrupted internal key in DBIter");
111     valid_ = false;
112     ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s",
113                     iter_.key().ToString(true).c_str());
114     return false;
115   } else {
116     return true;
117   }
118 }
119 
Next()120 void DBIter::Next() {
121   assert(valid_);
122   assert(status_.ok());
123 
124   PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, env_);
125   // Release temporarily pinned blocks from last operation
126   ReleaseTempPinnedData();
127   local_stats_.skip_count_ += num_internal_keys_skipped_;
128   local_stats_.skip_count_--;
129   num_internal_keys_skipped_ = 0;
130   bool ok = true;
131   if (direction_ == kReverse) {
132     is_key_seqnum_zero_ = false;
133     if (!ReverseToForward()) {
134       ok = false;
135     }
136   } else if (!current_entry_is_merged_) {
137     // If the current value is not a merge, the iter position is the
138     // current key, which is already returned. We can safely issue a
139     // Next() without checking the current key.
140     // If the current key is a merge, very likely iter already points
141     // to the next internal position.
142     assert(iter_.Valid());
143     iter_.Next();
144     PERF_COUNTER_ADD(internal_key_skipped_count, 1);
145   }
146 
147   local_stats_.next_count_++;
148   if (ok && iter_.Valid()) {
149     Slice prefix;
150     if (prefix_same_as_start_) {
151       assert(prefix_extractor_ != nullptr);
152       prefix = prefix_.GetUserKey();
153     }
154     FindNextUserEntry(true /* skipping the current user key */,
155                       prefix_same_as_start_ ? &prefix : nullptr);
156   } else {
157     is_key_seqnum_zero_ = false;
158     valid_ = false;
159   }
160   if (statistics_ != nullptr && valid_) {
161     local_stats_.next_found_count_++;
162     local_stats_.bytes_read_ += (key().size() + value().size());
163   }
164 }
165 
166 // PRE: saved_key_ has the current user key if skipping_saved_key
167 // POST: saved_key_ should have the next user key if valid_,
168 //       if the current entry is a result of merge
169 //           current_entry_is_merged_ => true
170 //           saved_value_             => the merged value
171 //
172 // NOTE: In between, saved_key_ can point to a user key that has
173 //       a delete marker or a sequence number higher than sequence_
174 //       saved_key_ MUST have a proper user_key before calling this function
175 //
176 // The prefix parameter, if not null, indicates that we need to iterate
177 // within the prefix, and the iterator needs to be made invalid, if no
178 // more entry for the prefix can be found.
FindNextUserEntry(bool skipping_saved_key,const Slice * prefix)179 bool DBIter::FindNextUserEntry(bool skipping_saved_key, const Slice* prefix) {
180   PERF_TIMER_GUARD(find_next_user_entry_time);
181   return FindNextUserEntryInternal(skipping_saved_key, prefix);
182 }
183 
// Actual implementation of DBIter::FindNextUserEntry()
// Scans forward over internal keys until one is found that should be exposed
// to the user, handling snapshot visibility, tombstones, range deletions,
// merges, blob indexes, prefix/upper-bound limits, and reseek heuristics.
bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
                                       const Slice* prefix) {
  // Loop until we hit an acceptable entry to yield
  assert(iter_.Valid());
  assert(status_.ok());
  assert(direction_ == kForward);
  current_entry_is_merged_ = false;

  // How many times in a row we have skipped an entry with user key less than
  // or equal to saved_key_. We could skip these entries either because
  // sequence numbers were too high or because skipping_saved_key = true.
  // What saved_key_ contains throughout this method:
  //  - if skipping_saved_key : saved_key_ contains the key that we need
  //                            to skip, and we haven't seen any keys greater
  //                            than that,
  //  - if num_skipped > 0    : saved_key_ contains the key that we have skipped
  //                            num_skipped times, and we haven't seen any keys
  //                            greater than that,
  //  - none of the above     : saved_key_ can contain anything, it doesn't
  //                            matter.
  uint64_t num_skipped = 0;
  // For write unprepared, the target sequence number in reseek could be larger
  // than the snapshot, and thus needs to be skipped again. This could result in
  // an infinite loop of reseeks. To avoid that, we limit the number of reseeks
  // to one.
  bool reseek_done = false;

  is_blob_ = false;

  do {
    // Will update is_key_seqnum_zero_ as soon as we parsed the current key
    // but we need to save the previous value to be used in the loop.
    bool is_prev_key_seqnum_zero = is_key_seqnum_zero_;
    if (!ParseKey(&ikey_)) {
      is_key_seqnum_zero_ = false;
      return false;
    }

    is_key_seqnum_zero_ = (ikey_.sequence == 0);

    // Inner iterator promised the key was within bounds unless it flagged
    // MayBeOutOfUpperBound(); verify that in debug builds.
    assert(iterate_upper_bound_ == nullptr || iter_.MayBeOutOfUpperBound() ||
           user_comparator_.CompareWithoutTimestamp(
               ikey_.user_key, /*a_has_ts=*/true, *iterate_upper_bound_,
               /*b_has_ts=*/false) < 0);
    // Stop once the user-specified (exclusive) upper bound is reached.
    if (iterate_upper_bound_ != nullptr && iter_.MayBeOutOfUpperBound() &&
        user_comparator_.CompareWithoutTimestamp(
            ikey_.user_key, /*a_has_ts=*/true, *iterate_upper_bound_,
            /*b_has_ts=*/false) >= 0) {
      break;
    }

    // Stop once the key leaves the required prefix (prefix_same_as_start).
    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
        prefix_extractor_->Transform(ikey_.user_key).compare(*prefix) != 0) {
      assert(prefix_same_as_start_);
      break;
    }

    if (TooManyInternalKeysSkipped()) {
      return false;
    }

    assert(ikey_.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
    }
    if (IsVisible(ikey_.sequence, ts)) {
      // If the previous entry is of seqnum 0, the current entry will not
      // possibly be skipped. This condition can potentially be relaxed to
      // prev_key.seq <= ikey_.sequence. We are cautious because it will be more
      // prone to bugs causing the same user key with the same sequence number.
      if (!is_prev_key_seqnum_zero && skipping_saved_key &&
          user_comparator_.CompareWithoutTimestamp(
              ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
        num_skipped++;  // skip this entry
        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
      } else {
        assert(!skipping_saved_key ||
               user_comparator_.CompareWithoutTimestamp(
                   ikey_.user_key, saved_key_.GetUserKey()) > 0);
        num_skipped = 0;
        reseek_done = false;
        switch (ikey_.type) {
          case kTypeDeletion:
          case kTypeSingleDeletion:
            // Arrange to skip all upcoming entries for this key since
            // they are hidden by this deletion.
            // if iterartor specified start_seqnum we
            // 1) return internal key, including the type
            // 2) return ikey only if ikey.seqnum >= start_seqnum_
            // note that if deletion seqnum is < start_seqnum_ we
            // just skip it like in normal iterator.
            if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_)  {
              saved_key_.SetInternalKey(ikey_);
              valid_ = true;
              return true;
            } else {
              saved_key_.SetUserKey(
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
              skipping_saved_key = true;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            }
            break;
          case kTypeValue:
          case kTypeBlobIndex:
            if (start_seqnum_ > 0) {
              // we are taking incremental snapshot here
              // incremental snapshots aren't supported on DB with range deletes
              assert(ikey_.type != kTypeBlobIndex);
              if (ikey_.sequence >= start_seqnum_) {
                saved_key_.SetInternalKey(ikey_);
                valid_ = true;
                return true;
              } else {
                // this key and all previous versions shouldn't be included,
                // skipping_saved_key
                saved_key_.SetUserKey(
                    ikey_.user_key,
                    !pin_thru_lifetime_ ||
                        !iter_.iter()->IsKeyPinned() /* copy */);
                skipping_saved_key = true;
              }
            } else {
              saved_key_.SetUserKey(
                  ikey_.user_key, !pin_thru_lifetime_ ||
                                      !iter_.iter()->IsKeyPinned() /* copy */);
              if (range_del_agg_.ShouldDelete(
                      ikey_, RangeDelPositioningMode::kForwardTraversal)) {
                // Arrange to skip all upcoming entries for this key since
                // they are hidden by this deletion.
                skipping_saved_key = true;
                num_skipped = 0;
                reseek_done = false;
                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
              } else if (ikey_.type == kTypeBlobIndex) {
                if (!allow_blob_) {
                  ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
                  status_ = Status::NotSupported(
                      "Encounter unexpected blob index. Please open DB with "
                      "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
                  valid_ = false;
                  return false;
                }

                is_blob_ = true;
                valid_ = true;
                return true;
              } else {
                valid_ = true;
                return true;
              }
            }
            break;
          case kTypeMerge:
            saved_key_.SetUserKey(
                ikey_.user_key,
                !pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */);
            if (range_del_agg_.ShouldDelete(
                    ikey_, RangeDelPositioningMode::kForwardTraversal)) {
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
              skipping_saved_key = true;
              num_skipped = 0;
              reseek_done = false;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            } else {
              // By now, we are sure the current ikey is going to yield a
              // value
              current_entry_is_merged_ = true;
              valid_ = true;
              return MergeValuesNewToOld();  // Go to a different state machine
            }
            break;
          default:
            assert(false);
            break;
        }
      }
    } else {
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);

      // This key was inserted after our snapshot was taken.
      // If this happens too many times in a row for the same user key, we want
      // to seek to the target sequence number.
      int cmp = user_comparator_.CompareWithoutTimestamp(
          ikey_.user_key, saved_key_.GetUserKey());
      if (cmp == 0 || (skipping_saved_key && cmp < 0)) {
        num_skipped++;
      } else {
        saved_key_.SetUserKey(
            ikey_.user_key,
            !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
        skipping_saved_key = false;
        num_skipped = 0;
        reseek_done = false;
      }
    }

    // If we have sequentially iterated via numerous equal keys, then it's
    // better to seek so that we can avoid too many key comparisons.
    //
    // To avoid infinite loops, do not reseek if we have already attempted to
    // reseek previously.
    //
    // TODO(lth): If we reseek to sequence number greater than ikey_.sequence,
    // then it does not make sense to reseek as we would actually land further
    // away from the desired key. There is opportunity for optimization here.
    if (num_skipped > max_skip_ && !reseek_done) {
      is_key_seqnum_zero_ = false;
      num_skipped = 0;
      reseek_done = true;
      std::string last_key;
      if (skipping_saved_key) {
        // We're looking for the next user-key but all we see are the same
        // user-key with decreasing sequence numbers. Fast forward to
        // sequence number 0 and type deletion (the smallest type).
        if (timestamp_size_ == 0) {
          AppendInternalKey(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion));
        } else {
          // With user timestamps, also use the minimum possible timestamp so
          // the seek target sorts after every version of saved_key_.
          std::string min_ts(timestamp_size_, static_cast<char>(0));
          AppendInternalKeyWithDifferentTimestamp(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion),
              min_ts);
        }
        // Don't set skipping_saved_key = false because we may still see more
        // user-keys equal to saved_key_.
      } else {
        // We saw multiple entries with this user key and sequence numbers
        // higher than sequence_. Fast forward to sequence_.
        // Note that this only covers a case when a higher key was overwritten
        // many times since our snapshot was taken, not the case when a lot of
        // different keys were inserted after our snapshot was taken.
        if (timestamp_size_ == 0) {
          AppendInternalKey(
              &last_key, ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                           kValueTypeForSeek));
        } else {
          AppendInternalKeyWithDifferentTimestamp(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                kValueTypeForSeek),
              *timestamp_ub_);
        }
      }
      iter_.Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
      iter_.Next();
    }
  } while (iter_.Valid());

  // Inner iterator exhausted (or bound/prefix break): not valid, but only an
  // error status makes this a failure.
  valid_ = false;
  return iter_.status().ok();
}
444 
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
// PRE: iter_.key() points to the first merge type entry
//      saved_key_ stores the user key
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
bool DBIter::MergeValuesNewToOld() {
  if (!merge_operator_) {
    // Merge entries exist but no operator was configured: hard error.
    ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
    valid_ = false;
    return false;
  }

  // Temporarily pin the blocks that hold merge operands
  TempPinData();
  merge_context_.Clear();
  // Start the merge process by pushing the first operand
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
  TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");

  ParsedInternalKey ikey;
  Status s;
  // Walk older entries of the same user key, collecting operands until a
  // base value, a deletion, or a different user key terminates the merge.
  for (iter_.Next(); iter_.Valid(); iter_.Next()) {
    TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
    if (!ParseKey(&ikey)) {
      return false;
    }

    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
      // hit the next user key, stop right here
      break;
    } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
               range_del_agg_.ShouldDelete(
                   ikey, RangeDelPositioningMode::kForwardTraversal)) {
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
      iter_.Next();
      break;
    } else if (kTypeValue == ikey.type) {
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
      const Slice val = iter_.value();
      s = MergeHelper::TimedFullMerge(
          merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
          &saved_value_, logger_, statistics_, env_, &pinned_value_, true);
      if (!s.ok()) {
        valid_ = false;
        status_ = s;
        return false;
      }
      // iter_ is positioned after put
      iter_.Next();
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }
      return true;
    } else if (kTypeMerge == ikey.type) {
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
      PERF_COUNTER_ADD(internal_merge_count, 1);
    } else if (kTypeBlobIndex == ikey.type) {
      // Blob indexes cannot participate in merges; fail with a message that
      // depends on whether blob reading was allowed at all.
      if (!allow_blob_) {
        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
        status_ = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
      } else {
        status_ =
            Status::NotSupported("Blob DB does not support merge operator.");
      }
      valid_ = false;
      return false;
    } else {
      assert(false);
    }
  }

  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  // we either exhausted all internal keys under this user key, or hit
  // a deletion marker.
  // feed null as the existing value to the merge operator, such that
  // client can differentiate this scenario and do things accordingly.
  s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
                                  nullptr, merge_context_.GetOperands(),
                                  &saved_value_, logger_, statistics_, env_,
                                  &pinned_value_, true);
  if (!s.ok()) {
    valid_ = false;
    status_ = s;
    return false;
  }

  assert(status_.ok());
  return true;
}
549 
Prev()550 void DBIter::Prev() {
551   if (timestamp_size_ > 0) {
552     valid_ = false;
553     status_ = Status::NotSupported(
554         "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
555     return;
556   }
557 
558   assert(valid_);
559   assert(status_.ok());
560 
561   PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, env_);
562   ReleaseTempPinnedData();
563   ResetInternalKeysSkippedCounter();
564   bool ok = true;
565   if (direction_ == kForward) {
566     if (!ReverseToBackward()) {
567       ok = false;
568     }
569   }
570   if (ok) {
571     Slice prefix;
572     if (prefix_same_as_start_) {
573       assert(prefix_extractor_ != nullptr);
574       prefix = prefix_.GetUserKey();
575     }
576     PrevInternal(prefix_same_as_start_ ? &prefix : nullptr);
577   }
578 
579   if (statistics_ != nullptr) {
580     local_stats_.prev_count_++;
581     if (valid_) {
582       local_stats_.prev_found_count_++;
583       local_stats_.bytes_read_ += (key().size() + value().size());
584     }
585   }
586 }
587 
ReverseToForward()588 bool DBIter::ReverseToForward() {
589   assert(iter_.status().ok());
590 
591   // When moving backwards, iter_ is positioned on _previous_ key, which may
592   // not exist or may have different prefix than the current key().
593   // If that's the case, seek iter_ to current key.
594   if (!expect_total_order_inner_iter() || !iter_.Valid()) {
595     IterKey last_key;
596     last_key.SetInternalKey(ParsedInternalKey(
597         saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
598     iter_.Seek(last_key.GetInternalKey());
599   }
600 
601   direction_ = kForward;
602   // Skip keys less than the current key() (a.k.a. saved_key_).
603   while (iter_.Valid()) {
604     ParsedInternalKey ikey;
605     if (!ParseKey(&ikey)) {
606       return false;
607     }
608     if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) >= 0) {
609       return true;
610     }
611     iter_.Next();
612   }
613 
614   if (!iter_.status().ok()) {
615     valid_ = false;
616     return false;
617   }
618 
619   return true;
620 }
621 
622 // Move iter_ to the key before saved_key_.
ReverseToBackward()623 bool DBIter::ReverseToBackward() {
624   assert(iter_.status().ok());
625 
626   // When current_entry_is_merged_ is true, iter_ may be positioned on the next
627   // key, which may not exist or may have prefix different from current.
628   // If that's the case, seek to saved_key_.
629   if (current_entry_is_merged_ &&
630       (!expect_total_order_inner_iter() || !iter_.Valid())) {
631     IterKey last_key;
632     // Using kMaxSequenceNumber and kValueTypeForSeek
633     // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller
634     // than saved_key_.
635     last_key.SetInternalKey(ParsedInternalKey(
636         saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
637     if (!expect_total_order_inner_iter()) {
638       iter_.SeekForPrev(last_key.GetInternalKey());
639     } else {
640       // Some iterators may not support SeekForPrev(), so we avoid using it
641       // when prefix seek mode is disabled. This is somewhat expensive
642       // (an extra Prev(), as well as an extra change of direction of iter_),
643       // so we may need to reconsider it later.
644       iter_.Seek(last_key.GetInternalKey());
645       if (!iter_.Valid() && iter_.status().ok()) {
646         iter_.SeekToLast();
647       }
648     }
649   }
650 
651   direction_ = kReverse;
652   return FindUserKeyBeforeSavedKey();
653 }
654 
// Backward-iteration driver: repeatedly takes the user key under iter_,
// resolves its most recent visible value (or deletion), and stops when a
// presentable value is found, a bound/prefix is crossed, or an error occurs.
// A non-null `prefix` restricts iteration to that prefix.
void DBIter::PrevInternal(const Slice* prefix) {
  while (iter_.Valid()) {
    // Record the candidate user key; copy it unless the block is pinned for
    // the iterator's lifetime.
    saved_key_.SetUserKey(
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);

    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
        prefix_extractor_->Transform(saved_key_.GetUserKey())
                .compare(*prefix) != 0) {
      assert(prefix_same_as_start_);
      // Current key does not have the same prefix as start
      valid_ = false;
      return;
    }

    // Inner iterator promised the key was within bounds unless it flagged
    // MayBeOutOfLowerBound(); verify that in debug builds.
    assert(iterate_lower_bound_ == nullptr || iter_.MayBeOutOfLowerBound() ||
           user_comparator_.Compare(saved_key_.GetUserKey(),
                                    *iterate_lower_bound_) >= 0);
    if (iterate_lower_bound_ != nullptr && iter_.MayBeOutOfLowerBound() &&
        user_comparator_.Compare(saved_key_.GetUserKey(),
                                 *iterate_lower_bound_) < 0) {
      // We've iterated earlier than the user-specified lower bound.
      valid_ = false;
      return;
    }

    if (!FindValueForCurrentKey()) {  // assigns valid_
      return;
    }

    // Whether or not we found a value for current key, we need iter_ to end up
    // on a smaller key.
    if (!FindUserKeyBeforeSavedKey()) {
      return;
    }

    if (valid_) {
      // Found the value.
      return;
    }

    if (TooManyInternalKeysSkipped(false)) {
      return;
    }
  }

  // We haven't found any key - iterator is not valid
  valid_ = false;
}
705 
706 // Used for backwards iteration.
707 // Looks at the entries with user key saved_key_ and finds the most up-to-date
708 // value for it, or executes a merge, or determines that the value was deleted.
// Sets valid_ to true if the value is found and is ready to be presented to
// the user through value().
// Sets valid_ to false if the value was deleted, and we should try another key.
// Returns false if an error occurred, and !status().ok() and !valid_.
//
// PRE: iter_ is positioned on the last entry with user key equal to saved_key_.
// POST: iter_ is positioned on one of the entries equal to saved_key_, or on
//       the entry just before them, or on the entry just after them.
bool DBIter::FindValueForCurrentKey() {
  assert(iter_.Valid());
  merge_context_.Clear();
  current_entry_is_merged_ = false;
  // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
  // kTypeValue)
  ValueType last_not_merge_type = kTypeDeletion;
  ValueType last_key_entry_type = kTypeDeletion;

  // Temporarily pin blocks that hold (merge operands / the value)
  ReleaseTempPinnedData();
  TempPinData();
  size_t num_skipped = 0;
  // Scan all entries of saved_key_ from the oldest (largest internal key) to
  // the newest visible one; iter_.Prev() moves toward larger sequence numbers
  // for the same user key. At loop exit, last_key_entry_type holds the type of
  // the newest visible entry, which decides what the user sees.
  while (iter_.Valid()) {
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }

    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      // User timestamp is stored as the suffix of the user key.
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }
    // Stop once we move past this user key or into entries that are not
    // visible at our snapshot/read-callback.
    if (!IsVisible(ikey.sequence, ts) ||
        !user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
      break;
    }
    if (TooManyInternalKeysSkipped()) {
      return false;
    }

    // This user key has lots of entries.
    // We're going from old to new, and it's taking too long. Let's do a Seek()
    // and go from new to old. This helps when a key was overwritten many times.
    if (num_skipped >= max_skip_) {
      return FindValueForCurrentKeyUsingSeek();
    }

    last_key_entry_type = ikey.type;
    switch (last_key_entry_type) {
      case kTypeValue:
      case kTypeBlobIndex:
        if (range_del_agg_.ShouldDelete(
                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
          // A newer range tombstone covers this put; treat it as deleted.
          last_key_entry_type = kTypeRangeDeletion;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        } else {
          assert(iter_.iter()->IsValuePinned());
          pinned_value_ = iter_.value();
        }
        // A put (or deletion-by-range) supersedes any older merge operands.
        merge_context_.Clear();
        last_not_merge_type = last_key_entry_type;
        break;
      case kTypeDeletion:
      case kTypeSingleDeletion:
        // A point tombstone also supersedes older merge operands.
        merge_context_.Clear();
        last_not_merge_type = last_key_entry_type;
        PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        break;
      case kTypeMerge:
        if (range_del_agg_.ShouldDelete(
                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
          merge_context_.Clear();
          last_key_entry_type = kTypeRangeDeletion;
          last_not_merge_type = last_key_entry_type;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        } else {
          assert(merge_operator_ != nullptr);
          // PushOperandBack keeps operands in oldest-to-newest order, matching
          // the order TimedFullMerge expects.
          merge_context_.PushOperandBack(
              iter_.value(),
              iter_.iter()->IsValuePinned() /* operand_pinned */);
          PERF_COUNTER_ADD(internal_merge_count, 1);
        }
        break;
      default:
        assert(false);
    }

    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
    iter_.Prev();
    ++num_skipped;
  }

  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  // Resolve the final user-visible state from the newest entry's type.
  Status s;
  is_blob_ = false;
  switch (last_key_entry_type) {
    case kTypeDeletion:
    case kTypeSingleDeletion:
    case kTypeRangeDeletion:
      // Key is deleted: not an error, but nothing to show; caller moves on.
      valid_ = false;
      return true;
    case kTypeMerge:
      current_entry_is_merged_ = true;
      if (last_not_merge_type == kTypeDeletion ||
          last_not_merge_type == kTypeSingleDeletion ||
          last_not_merge_type == kTypeRangeDeletion) {
        // No base value under the merge operands: merge with nullptr base.
        s = MergeHelper::TimedFullMerge(
            merge_operator_, saved_key_.GetUserKey(), nullptr,
            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
            env_, &pinned_value_, true);
      } else if (last_not_merge_type == kTypeBlobIndex) {
        // Merge on top of a blob index is not supported.
        if (!allow_blob_) {
          ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
          status_ = Status::NotSupported(
              "Encounter unexpected blob index. Please open DB with "
              "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
        } else {
          status_ =
              Status::NotSupported("Blob DB does not support merge operator.");
        }
        valid_ = false;
        return false;
      } else {
        assert(last_not_merge_type == kTypeValue);
        // Merge operands on top of a plain value: use it as the base.
        s = MergeHelper::TimedFullMerge(
            merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
            env_, &pinned_value_, true);
      }
      break;
    case kTypeValue:
      // do nothing - we already have the value in pinned_value_
      break;
    case kTypeBlobIndex:
      if (!allow_blob_) {
        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
        status_ = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
        valid_ = false;
        return false;
      }
      is_blob_ = true;
      break;
    default:
      assert(false);
      break;
  }
  if (!s.ok()) {
    valid_ = false;
    status_ = s;
    return false;
  }
  valid_ = true;
  return true;
}
870 
// This function is used in FindValueForCurrentKey.
// We use Seek() function instead of Prev() to find necessary value
// TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld().
//       Would be nice to reuse some code.
bool DBIter::FindValueForCurrentKeyUsingSeek() {
  // FindValueForCurrentKey will enable pinning before calling
  // FindValueForCurrentKeyUsingSeek()
  assert(pinned_iters_mgr_.PinningEnabled());
  // Seek to the newest entry of saved_key_ that is <= our sequence number.
  std::string last_key;
  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
                                                 sequence_, kValueTypeForSeek));
  iter_.Seek(last_key);
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

  // In case read_callback presents, the value we seek to may not be visible.
  // Find the next value that's visible.
  ParsedInternalKey ikey;
  is_blob_ = false;
  while (true) {
    if (!iter_.Valid()) {
      valid_ = false;
      // Iterator exhaustion is only an error if the inner iterator failed.
      return iter_.status().ok();
    }

    if (!ParseKey(&ikey)) {
      return false;
    }
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      // User timestamp is stored as the suffix of the user key.
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }

    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
      // No visible values for this key, even though FindValueForCurrentKey()
      // has seen some. This is possible if we're using a tailing iterator, and
      // the entries were discarded in a compaction.
      valid_ = false;
      return true;
    }

    if (IsVisible(ikey.sequence, ts)) {
      break;
    }

    iter_.Next();
  }

  // iter_ now sits on the newest visible entry of saved_key_.
  if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
      range_del_agg_.ShouldDelete(
          ikey, RangeDelPositioningMode::kBackwardTraversal)) {
    // Key is deleted: nothing to show, but no error.
    valid_ = false;
    return true;
  }
  if (ikey.type == kTypeBlobIndex && !allow_blob_) {
    ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
    status_ = Status::NotSupported(
        "Encounter unexpected blob index. Please open DB with "
        "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
    valid_ = false;
    return false;
  }
  if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
    assert(iter_.iter()->IsValuePinned());
    pinned_value_ = iter_.value();
    is_blob_ = (ikey.type == kTypeBlobIndex);
    valid_ = true;
    return true;
  }

  // kTypeMerge. We need to collect all kTypeMerge values and save them
  // in operands
  assert(ikey.type == kTypeMerge);
  current_entry_is_merged_ = true;
  merge_context_.Clear();
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
  // Walk forward (newest to oldest entries of this key) collecting merge
  // operands until we hit a base value, a tombstone, or the end of the key.
  while (true) {
    iter_.Next();

    if (!iter_.Valid()) {
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }
      break;
    }
    if (!ParseKey(&ikey)) {
      return false;
    }
    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
      break;
    }

    if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
        range_del_agg_.ShouldDelete(
            ikey, RangeDelPositioningMode::kForwardTraversal)) {
      // Tombstone terminates the operand stack; merge with no base value.
      break;
    } else if (ikey.type == kTypeValue) {
      // Found the base value; merge the collected operands on top of it.
      const Slice val = iter_.value();
      Status s = MergeHelper::TimedFullMerge(
          merge_operator_, saved_key_.GetUserKey(), &val,
          merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
          env_, &pinned_value_, true);
      if (!s.ok()) {
        valid_ = false;
        status_ = s;
        return false;
      }
      valid_ = true;
      return true;
    } else if (ikey.type == kTypeMerge) {
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
      PERF_COUNTER_ADD(internal_merge_count, 1);
    } else if (ikey.type == kTypeBlobIndex) {
      if (!allow_blob_) {
        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
        status_ = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
      } else {
        status_ =
            Status::NotSupported("Blob DB does not support merge operator.");
      }
      valid_ = false;
      return false;
    } else {
      assert(false);
    }
  }

  // No base value found: full-merge with a nullptr existing value.
  Status s = MergeHelper::TimedFullMerge(
      merge_operator_, saved_key_.GetUserKey(), nullptr,
      merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_,
      &pinned_value_, true);
  if (!s.ok()) {
    valid_ = false;
    status_ = s;
    return false;
  }

  // Make sure we leave iter_ in a good state. If it's valid and we don't care
  // about prefixes, that's already good enough. Otherwise it needs to be
  // seeked to the current key.
  if (!expect_total_order_inner_iter() || !iter_.Valid()) {
    if (!expect_total_order_inner_iter()) {
      iter_.SeekForPrev(last_key);
    } else {
      iter_.Seek(last_key);
      if (!iter_.Valid() && iter_.status().ok()) {
        // last_key is past the end; position on the last entry instead.
        iter_.SeekToLast();
      }
    }
    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
  }

  valid_ = true;
  return true;
}
1032 
// Move backwards until the key smaller than saved_key_.
// Changes valid_ only if return value is false.
bool DBIter::FindUserKeyBeforeSavedKey() {
  assert(status_.ok());
  size_t num_skipped = 0;
  while (iter_.Valid()) {
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }

    // Done once we are strictly before saved_key_.
    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
      return true;
    }

    if (TooManyInternalKeysSkipped()) {
      return false;
    }

    assert(ikey.sequence != kMaxSequenceNumber);
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      // User timestamp is stored as the suffix of the user key.
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }
    // Track skips in perf context, bucketed by whether the entry was merely
    // invisible at our snapshot or an actually skipped internal key.
    if (!IsVisible(ikey.sequence, ts)) {
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
    } else {
      PERF_COUNTER_ADD(internal_key_skipped_count, 1);
    }

    if (num_skipped >= max_skip_) {
      // Too many Prev() calls for this user key; reseek to its smallest
      // possible internal key (kMaxSequenceNumber sorts first) and continue
      // with one more Prev() below, which lands before saved_key_.
      num_skipped = 0;
      IterKey last_key;
      last_key.SetInternalKey(ParsedInternalKey(
          saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
      // It would be more efficient to use SeekForPrev() here, but some
      // iterators may not support it.
      iter_.Seek(last_key.GetInternalKey());
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
      if (!iter_.Valid()) {
        break;
      }
    } else {
      ++num_skipped;
    }

    iter_.Prev();
  }

  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  return true;
}
1091 
TooManyInternalKeysSkipped(bool increment)1092 bool DBIter::TooManyInternalKeysSkipped(bool increment) {
1093   if ((max_skippable_internal_keys_ > 0) &&
1094       (num_internal_keys_skipped_ > max_skippable_internal_keys_)) {
1095     valid_ = false;
1096     status_ = Status::Incomplete("Too many internal keys skipped.");
1097     return true;
1098   } else if (increment) {
1099     num_internal_keys_skipped_++;
1100   }
1101   return false;
1102 }
1103 
IsVisible(SequenceNumber sequence,const Slice & ts)1104 bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts) {
1105   // Remember that comparator orders preceding timestamp as larger.
1106   int cmp_ts = timestamp_ub_ != nullptr
1107                    ? user_comparator_.CompareTimestamp(ts, *timestamp_ub_)
1108                    : 0;
1109   if (cmp_ts > 0) {
1110     return false;
1111   }
1112   if (read_callback_ == nullptr) {
1113     return sequence <= sequence_;
1114   } else {
1115     // TODO(yanqin): support timestamp in read_callback_.
1116     return read_callback_->IsVisible(sequence);
1117   }
1118 }
1119 
SetSavedKeyToSeekTarget(const Slice & target)1120 void DBIter::SetSavedKeyToSeekTarget(const Slice& target) {
1121   is_key_seqnum_zero_ = false;
1122   SequenceNumber seq = sequence_;
1123   saved_key_.Clear();
1124   saved_key_.SetInternalKey(target, seq, kValueTypeForSeek, timestamp_ub_);
1125 
1126   if (iterate_lower_bound_ != nullptr &&
1127       user_comparator_.CompareWithoutTimestamp(
1128           saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_,
1129           /*b_has_ts=*/false) < 0) {
1130     // Seek key is smaller than the lower bound.
1131     saved_key_.Clear();
1132     saved_key_.SetInternalKey(*iterate_lower_bound_, seq, kValueTypeForSeek,
1133                               timestamp_ub_);
1134   }
1135 }
1136 
SetSavedKeyToSeekForPrevTarget(const Slice & target)1137 void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) {
1138   is_key_seqnum_zero_ = false;
1139   saved_key_.Clear();
1140   // now saved_key is used to store internal key.
1141   saved_key_.SetInternalKey(target, 0 /* sequence_number */,
1142                             kValueTypeForSeekForPrev);
1143 
1144   if (iterate_upper_bound_ != nullptr &&
1145       user_comparator_.Compare(saved_key_.GetUserKey(),
1146                                *iterate_upper_bound_) >= 0) {
1147     saved_key_.Clear();
1148     saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber);
1149   }
1150 }
1151 
// Positions the iterator at the first user key >= target that is visible at
// this iterator's snapshot (clamped to iterate_lower_bound_ if set).
void DBIter::Seek(const Slice& target) {
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  StopWatch sw(env_, statistics_, DB_SEEK);

#ifndef ROCKSDB_LITE
  // Record the seek in the trace, if tracing is active.
  if (db_impl_ != nullptr && cfd_ != nullptr) {
    db_impl_->TraceIteratorSeek(cfd_->GetID(), target);
  }
#endif  // ROCKSDB_LITE

  status_ = Status::OK();
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();

  // Seek the inner iterator based on the target key.
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);

    SetSavedKeyToSeekTarget(target);
    iter_.Seek(saved_key_.GetInternalKey());

    // Cached range-tombstone positions are stale after a seek.
    range_del_agg_.InvalidateRangeDelMapPositions();
    RecordTick(statistics_, NUMBER_DB_SEEK);
  }
  if (!iter_.Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kForward;

  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the next key that is visible to the user.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
    Slice target_prefix = prefix_extractor_->Transform(target);
    FindNextUserEntry(false /* not skipping saved_key */,
                      &target_prefix /* prefix */);
    if (valid_) {
      // Remember the prefix of the seek key for the future Next() call to
      // check.
      prefix_.SetUserKey(target_prefix);
    }
  } else {
    FindNextUserEntry(false /* not skipping saved_key */, nullptr);
  }
  if (!valid_) {
    return;
  }

  // Updating stats and perf context counters.
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
  }
  PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
}
1212 
// Positions the iterator at the last user key <= target that is visible at
// this iterator's snapshot (clamped to iterate_upper_bound_ if set).
// Not supported when user-defined timestamps are enabled.
void DBIter::SeekForPrev(const Slice& target) {
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  StopWatch sw(env_, statistics_, DB_SEEK);

#ifndef ROCKSDB_LITE
  // Record the seek in the trace, if tracing is active.
  if (db_impl_ != nullptr && cfd_ != nullptr) {
    db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target);
  }
#endif  // ROCKSDB_LITE

  // Reverse iteration with user timestamps is not implemented yet.
  if (timestamp_size_ > 0) {
    valid_ = false;
    status_ = Status::NotSupported(
        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
    return;
  }

  status_ = Status::OK();
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();

  // Seek the inner iterator based on the target key.
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    SetSavedKeyToSeekForPrevTarget(target);
    iter_.SeekForPrev(saved_key_.GetInternalKey());
    // Cached range-tombstone positions are stale after a seek.
    range_del_agg_.InvalidateRangeDelMapPositions();
    RecordTick(statistics_, NUMBER_DB_SEEK);
  }
  if (!iter_.Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kReverse;

  // Now the inner iterator is placed to the target position. From there,
  // we need to find out the first key that is visible to the user in the
  // backward direction.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exausted
    // keys within the same prefix of the seek key.
    assert(prefix_extractor_ != nullptr);
    Slice target_prefix = prefix_extractor_->Transform(target);
    PrevInternal(&target_prefix);
    if (valid_) {
      // Remember the prefix of the seek key for the future Prev() call to
      // check.
      prefix_.SetUserKey(target_prefix);
    }
  } else {
    PrevInternal(nullptr);
  }

  // Report stats and perf context.
  if (statistics_ != nullptr && valid_) {
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
    PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
  }
}
1274 
// Positions the iterator at the first visible user key in the database
// (or at iterate_lower_bound_, if one is configured).
void DBIter::SeekToFirst() {
  // With a lower bound, "first" means the first key at or above the bound.
  if (iterate_lower_bound_ != nullptr) {
    Seek(*iterate_lower_bound_);
    return;
  }
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  // Don't use iter_::Seek() if we set a prefix extractor
  // because prefix seek will be used.
  if (!expect_total_order_inner_iter()) {
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  status_ = Status::OK();
  direction_ = kForward;
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
  ClearSavedValue();
  is_key_seqnum_zero_ = false;

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_.SeekToFirst();
    // Cached range-tombstone positions are stale after a seek.
    range_del_agg_.InvalidateRangeDelMapPositions();
  }

  RecordTick(statistics_, NUMBER_DB_SEEK);
  if (iter_.Valid()) {
    // Copy the user key unless the inner iterator pins keys for the
    // iterator's lifetime, in which case a reference suffices.
    saved_key_.SetUserKey(
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
    FindNextUserEntry(false /* not skipping saved_key */,
                      nullptr /* no prefix check */);
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
        PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
      }
    }
  } else {
    valid_ = false;
  }
  // Remember the prefix for future Next() calls to check against.
  if (valid_ && prefix_same_as_start_) {
    assert(prefix_extractor_ != nullptr);
    prefix_.SetUserKey(prefix_extractor_->Transform(saved_key_.GetUserKey()));
  }
}
1321 
// Positions the iterator at the last visible user key in the database
// (strictly below iterate_upper_bound_, if one is configured).
// Not supported when user-defined timestamps are enabled.
void DBIter::SeekToLast() {
  // Reverse iteration with user timestamps is not implemented yet.
  if (timestamp_size_ > 0) {
    valid_ = false;
    status_ = Status::NotSupported(
        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
    return;
  }

  if (iterate_upper_bound_ != nullptr) {
    // Seek to last key strictly less than ReadOptions.iterate_upper_bound.
    SeekForPrev(*iterate_upper_bound_);
    // SeekForPrev may land exactly on the upper bound (which is exclusive);
    // step back once more in that case.
    if (Valid() && user_comparator_.Equal(*iterate_upper_bound_, key())) {
      ReleaseTempPinnedData();
      PrevInternal(nullptr);
    }
    return;
  }

  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  // Don't use iter_::Seek() if we set a prefix extractor
  // because prefix seek will be used.
  if (!expect_total_order_inner_iter()) {
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  status_ = Status::OK();
  direction_ = kReverse;
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
  ClearSavedValue();
  is_key_seqnum_zero_ = false;

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_.SeekToLast();
    // Cached range-tombstone positions are stale after a seek.
    range_del_agg_.InvalidateRangeDelMapPositions();
  }
  PrevInternal(nullptr);
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK);
    if (valid_) {
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
    }
  }
  // Remember the prefix for future Prev() calls to check against.
  if (valid_ && prefix_same_as_start_) {
    assert(prefix_extractor_ != nullptr);
    prefix_.SetUserKey(prefix_extractor_->Transform(saved_key_.GetUserKey()));
  }
}
1372 
NewDBIterator(Env * env,const ReadOptions & read_options,const ImmutableCFOptions & cf_options,const MutableCFOptions & mutable_cf_options,const Comparator * user_key_comparator,InternalIterator * internal_iter,const SequenceNumber & sequence,uint64_t max_sequential_skip_in_iterations,ReadCallback * read_callback,DBImpl * db_impl,ColumnFamilyData * cfd,bool allow_blob)1373 Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
1374                         const ImmutableCFOptions& cf_options,
1375                         const MutableCFOptions& mutable_cf_options,
1376                         const Comparator* user_key_comparator,
1377                         InternalIterator* internal_iter,
1378                         const SequenceNumber& sequence,
1379                         uint64_t max_sequential_skip_in_iterations,
1380                         ReadCallback* read_callback, DBImpl* db_impl,
1381                         ColumnFamilyData* cfd, bool allow_blob) {
1382   DBIter* db_iter = new DBIter(
1383       env, read_options, cf_options, mutable_cf_options, user_key_comparator,
1384       internal_iter, sequence, false, max_sequential_skip_in_iterations,
1385       read_callback, db_impl, cfd, allow_blob);
1386   return db_iter;
1387 }
1388 
1389 }  // namespace ROCKSDB_NAMESPACE
1390