// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_iter.h"
#include <string>
#include <iostream>
#include <limits>

#include "db/dbformat.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "file/filename.h"
#include "logging/logging.h"
#include "memory/arena.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/options.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "trace_replay/trace_replay.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
#include "util/user_comparator_wrapper.h"

namespace ROCKSDB_NAMESPACE {
DBIter::DBIter(Env* _env, const ReadOptions& read_options,
               const ImmutableCFOptions& cf_options,
               const MutableCFOptions& mutable_cf_options,
               const Comparator* cmp, InternalIterator* iter, SequenceNumber s,
               bool arena_mode, uint64_t max_sequential_skip_in_iterations,
               ReadCallback* read_callback, DBImpl* db_impl,
               ColumnFamilyData* cfd, bool allow_blob)
    : prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
      env_(_env),
      logger_(cf_options.info_log),
      user_comparator_(cmp),
      merge_operator_(cf_options.merge_operator),
      iter_(iter),
      read_callback_(read_callback),
      sequence_(s),
      statistics_(cf_options.statistics),
      max_skip_(max_sequential_skip_in_iterations),
      max_skippable_internal_keys_(read_options.max_skippable_internal_keys),
      num_internal_keys_skipped_(0),
      iterate_lower_bound_(read_options.iterate_lower_bound),
      iterate_upper_bound_(read_options.iterate_upper_bound),
      direction_(kForward),
      valid_(false),
      current_entry_is_merged_(false),
      is_key_seqnum_zero_(false),
      prefix_same_as_start_(mutable_cf_options.prefix_extractor
                                ? read_options.prefix_same_as_start
                                : false),
      pin_thru_lifetime_(read_options.pin_data),
      expect_total_order_inner_iter_(prefix_extractor_ == nullptr ||
                                     read_options.total_order_seek ||
                                     read_options.auto_prefix_mode),
      allow_blob_(allow_blob),
      is_blob_(false),
      arena_mode_(arena_mode),
      range_del_agg_(&cf_options.internal_comparator, s),
      db_impl_(db_impl),
      cfd_(cfd),
      start_seqnum_(read_options.iter_start_seqnum),
      timestamp_ub_(read_options.timestamp),
      timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) {
  RecordTick(statistics_, NO_ITERATOR_CREATED);
  if (pin_thru_lifetime_) {
    pinned_iters_mgr_.StartPinning();
  }
  if (iter_.iter()) {
    iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
  }
  assert(timestamp_size_ == user_comparator_.timestamp_size());
}
Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
  if (prop == nullptr) {
    return Status::InvalidArgument("prop is nullptr");
  }
  if (prop_name == "rocksdb.iterator.super-version-number") {
    // First try to pass the value returned from inner iterator.
    return iter_.iter()->GetProperty(prop_name, prop);
  } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
    if (valid_) {
      *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
    } else {
      *prop = "Iterator is not valid.";
    }
    return Status::OK();
  } else if (prop_name == "rocksdb.iterator.internal-key") {
    *prop = saved_key_.GetUserKey().ToString();
    return Status::OK();
  }
  return Status::InvalidArgument("Unidentified property.");
}
bool DBIter::ParseKey(ParsedInternalKey* ikey) {
  if (!ParseInternalKey(iter_.key(), ikey)) {
    status_ = Status::Corruption("corrupted internal key in DBIter");
    valid_ = false;
    ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s",
                    iter_.key().ToString(true).c_str());
    return false;
  } else {
    return true;
  }
}
void DBIter::Next() {
  assert(valid_);
  assert(status_.ok());

  PERF_CPU_TIMER_GUARD(iter_next_cpu_nanos, env_);
  // Release temporarily pinned blocks from last operation
  ReleaseTempPinnedData();
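  // Fold the internal keys skipped while producing the previous entry into
  // the thread-local stats; subtract one so the entry that was actually
  // returned to the user is not itself counted as skipped.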
  local_stats_.skip_count_ += num_internal_keys_skipped_;
  local_stats_.skip_count_--;
  num_internal_keys_skipped_ = 0;
  bool ok = true;
  if (direction_ == kReverse) {
    is_key_seqnum_zero_ = false;
    if (!ReverseToForward()) {
      ok = false;
    }
  } else if (!current_entry_is_merged_) {
    // If the current value is not a merge, the iter position is the
    // current key, which is already returned. We can safely issue a
    // Next() without checking the current key.
    // If the current key is a merge, very likely iter already points
    // to the next internal position.
    assert(iter_.Valid());
    iter_.Next();
    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
  }

  local_stats_.next_count_++;
  if (ok && iter_.Valid()) {
    Slice prefix;
    if (prefix_same_as_start_) {
      assert(prefix_extractor_ != nullptr);
      prefix = prefix_.GetUserKey();
    }
    FindNextUserEntry(true /* skipping the current user key */,
                      prefix_same_as_start_ ? &prefix : nullptr);
  } else {
    is_key_seqnum_zero_ = false;
    valid_ = false;
  }
  if (statistics_ != nullptr && valid_) {
    local_stats_.next_found_count_++;
    local_stats_.bytes_read_ += (key().size() + value().size());
  }
}
// PRE: saved_key_ has the current user key if skipping_saved_key
// POST: saved_key_ should have the next user key if valid_;
//       if the current entry is a result of merge,
//           current_entry_is_merged_ => true
//           saved_value_             => the merged value
//
// NOTE: In between, saved_key_ can point to a user key that has
//       a delete marker or a sequence number higher than sequence_.
//       saved_key_ MUST have a proper user_key before calling this function
//
// The prefix parameter, if not null, indicates that we need to iterate
// within the prefix, and the iterator needs to be made invalid, if no
// more entry for the prefix can be found.
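// Illustrative example (for exposition only): given internal keys
//   k1@7:PUT  k1@5:PUT  k2@9:DELETE  k2@4:PUT
// and sequence_ == 8, after k1@7 is yielded, Next() calls this function with
// skipping_saved_key == true and saved_key_ == k1. It skips k1@5 (same user
// key), skips k2@9 (not visible at the snapshot), and yields k2 with the
// value written at sequence number 4.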
bool DBIter::FindNextUserEntry(bool skipping_saved_key, const Slice* prefix) {
  PERF_TIMER_GUARD(find_next_user_entry_time);
  return FindNextUserEntryInternal(skipping_saved_key, prefix);
}

// Actual implementation of DBIter::FindNextUserEntry()
bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
                                       const Slice* prefix) {
  // Loop until we hit an acceptable entry to yield
  assert(iter_.Valid());
  assert(status_.ok());
  assert(direction_ == kForward);
  current_entry_is_merged_ = false;

  // How many times in a row we have skipped an entry with user key less than
  // or equal to saved_key_. We could skip these entries either because
  // sequence numbers were too high or because skipping_saved_key = true.
  // What saved_key_ contains throughout this method:
  //  - if skipping_saved_key : saved_key_ contains the key that we need
  //                            to skip, and we haven't seen any keys greater
  //                            than that,
  //  - if num_skipped > 0    : saved_key_ contains the key that we have skipped
  //                            num_skipped times, and we haven't seen any keys
  //                            greater than that,
  //  - none of the above     : saved_key_ can contain anything, it doesn't
  //                            matter.
  uint64_t num_skipped = 0;
  // For write unprepared, the target sequence number in reseek could be larger
  // than the snapshot, and thus needs to be skipped again. This could result in
  // an infinite loop of reseeks. To avoid that, we limit the number of reseeks
  // to one.
  bool reseek_done = false;

  is_blob_ = false;

  do {
    // Will update is_key_seqnum_zero_ as soon as we parsed the current key
    // but we need to save the previous value to be used in the loop.
    bool is_prev_key_seqnum_zero = is_key_seqnum_zero_;
    if (!ParseKey(&ikey_)) {
      is_key_seqnum_zero_ = false;
      return false;
    }

    is_key_seqnum_zero_ = (ikey_.sequence == 0);

    assert(iterate_upper_bound_ == nullptr || iter_.MayBeOutOfUpperBound() ||
           user_comparator_.CompareWithoutTimestamp(
               ikey_.user_key, /*a_has_ts=*/true, *iterate_upper_bound_,
               /*b_has_ts=*/false) < 0);
    if (iterate_upper_bound_ != nullptr && iter_.MayBeOutOfUpperBound() &&
        user_comparator_.CompareWithoutTimestamp(
            ikey_.user_key, /*a_has_ts=*/true, *iterate_upper_bound_,
            /*b_has_ts=*/false) >= 0) {
      break;
    }

    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
        prefix_extractor_->Transform(ikey_.user_key).compare(*prefix) != 0) {
      assert(prefix_same_as_start_);
      break;
    }

    if (TooManyInternalKeysSkipped()) {
      return false;
    }

    assert(ikey_.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
    }
    if (IsVisible(ikey_.sequence, ts)) {
      // If the previous entry's seqnum is 0, the current entry cannot
      // possibly be skipped. This condition can potentially be relaxed to
      // prev_key.seq <= ikey_.sequence; we stay cautious because relaxing it
      // is more prone to bugs that yield the same user key with the same
      // sequence number.
      if (!is_prev_key_seqnum_zero && skipping_saved_key &&
          user_comparator_.CompareWithoutTimestamp(
              ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
        num_skipped++;  // skip this entry
        PERF_COUNTER_ADD(internal_key_skipped_count, 1);
      } else {
        assert(!skipping_saved_key ||
               user_comparator_.CompareWithoutTimestamp(
                   ikey_.user_key, saved_key_.GetUserKey()) > 0);
        num_skipped = 0;
        reseek_done = false;
        switch (ikey_.type) {
          case kTypeDeletion:
          case kTypeSingleDeletion:
            // Arrange to skip all upcoming entries for this key since
            // they are hidden by this deletion.
            // If the iterator specified start_seqnum, we:
            // 1) return the internal key, including the type
            // 2) return ikey only if ikey.seqnum >= start_seqnum_
            // Note that if the deletion seqnum is < start_seqnum_ we
            // just skip it, as in a normal iterator.
            if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_) {
              saved_key_.SetInternalKey(ikey_);
              valid_ = true;
              return true;
            } else {
              saved_key_.SetUserKey(
                  ikey_.user_key,
                  !pin_thru_lifetime_ ||
                      !iter_.iter()->IsKeyPinned() /* copy */);
              skipping_saved_key = true;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            }
            break;
          case kTypeValue:
          case kTypeBlobIndex:
            if (start_seqnum_ > 0) {
              // we are taking an incremental snapshot here
              // incremental snapshots aren't supported on DBs with range deletes
              assert(ikey_.type != kTypeBlobIndex);
              if (ikey_.sequence >= start_seqnum_) {
                saved_key_.SetInternalKey(ikey_);
                valid_ = true;
                return true;
              } else {
                // this key and all its previous versions shouldn't be
                // included; resume skipping via skipping_saved_key
                saved_key_.SetUserKey(
                    ikey_.user_key,
                    !pin_thru_lifetime_ ||
                        !iter_.iter()->IsKeyPinned() /* copy */);
                skipping_saved_key = true;
              }
            } else {
              saved_key_.SetUserKey(
                  ikey_.user_key,
                  !pin_thru_lifetime_ ||
                      !iter_.iter()->IsKeyPinned() /* copy */);
              if (range_del_agg_.ShouldDelete(
                      ikey_, RangeDelPositioningMode::kForwardTraversal)) {
                // Arrange to skip all upcoming entries for this key since
                // they are hidden by this deletion.
                skipping_saved_key = true;
                num_skipped = 0;
                reseek_done = false;
                PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
              } else if (ikey_.type == kTypeBlobIndex) {
                if (!allow_blob_) {
                  ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
                  status_ = Status::NotSupported(
                      "Encounter unexpected blob index. Please open DB with "
                      "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
                  valid_ = false;
                  return false;
                }

                is_blob_ = true;
                valid_ = true;
                return true;
              } else {
                valid_ = true;
                return true;
              }
            }
            break;
          case kTypeMerge:
            saved_key_.SetUserKey(
                ikey_.user_key,
                !pin_thru_lifetime_ || !iter_.iter()->IsKeyPinned() /* copy */);
            if (range_del_agg_.ShouldDelete(
                    ikey_, RangeDelPositioningMode::kForwardTraversal)) {
              // Arrange to skip all upcoming entries for this key since
              // they are hidden by this deletion.
              skipping_saved_key = true;
              num_skipped = 0;
              reseek_done = false;
              PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
            } else {
              // By now, we are sure the current ikey is going to yield a
              // value
              current_entry_is_merged_ = true;
              valid_ = true;
              return MergeValuesNewToOld();  // Go to a different state machine
            }
            break;
          default:
            assert(false);
            break;
        }
      }
    } else {
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);

      // This key was inserted after our snapshot was taken.
      // If this happens too many times in a row for the same user key, we want
      // to seek to the target sequence number.
      int cmp = user_comparator_.CompareWithoutTimestamp(
          ikey_.user_key, saved_key_.GetUserKey());
      if (cmp == 0 || (skipping_saved_key && cmp < 0)) {
        num_skipped++;
      } else {
        saved_key_.SetUserKey(
            ikey_.user_key,
            !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
        skipping_saved_key = false;
        num_skipped = 0;
        reseek_done = false;
      }
    }

    // If we have sequentially iterated via numerous equal keys, then it's
    // better to seek so that we can avoid too many key comparisons.
    //
    // To avoid infinite loops, do not reseek if we have already attempted to
    // reseek previously.
    //
    // TODO(lth): If we reseek to sequence number greater than ikey_.sequence,
    // then it does not make sense to reseek as we would actually land further
    // away from the desired key. There is opportunity for optimization here.
    if (num_skipped > max_skip_ && !reseek_done) {
      is_key_seqnum_zero_ = false;
      num_skipped = 0;
      reseek_done = true;
      std::string last_key;
      if (skipping_saved_key) {
        // We're looking for the next user-key but all we see are the same
        // user-key with decreasing sequence numbers. Fast forward to
        // sequence number 0 and type deletion (the smallest type).
        if (timestamp_size_ == 0) {
          AppendInternalKey(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion));
        } else {
          std::string min_ts(timestamp_size_, static_cast<char>(0));
          AppendInternalKeyWithDifferentTimestamp(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion),
              min_ts);
        }
        // Don't set skipping_saved_key = false because we may still see more
        // user-keys equal to saved_key_.
      } else {
        // We saw multiple entries with this user key and sequence numbers
        // higher than sequence_. Fast forward to sequence_.
        // Note that this only covers a case when a higher key was overwritten
        // many times since our snapshot was taken, not the case when a lot of
        // different keys were inserted after our snapshot was taken.
        if (timestamp_size_ == 0) {
          AppendInternalKey(
              &last_key, ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                           kValueTypeForSeek));
        } else {
          AppendInternalKeyWithDifferentTimestamp(
              &last_key,
              ParsedInternalKey(saved_key_.GetUserKey(), sequence_,
                                kValueTypeForSeek),
              *timestamp_ub_);
        }
      }
      iter_.Seek(last_key);
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
    } else {
      iter_.Next();
    }
  } while (iter_.Valid());

  valid_ = false;
  return iter_.status().ok();
}
// Merge values of the same user key starting from the current iter_ position
// Scan from the newer entries to older entries.
// PRE: iter_.key() points to the first merge type entry
//      saved_key_ stores the user key
// POST: saved_value_ has the merged value for the user key
//       iter_ points to the next entry (or invalid)
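// Illustrative example (for exposition only): with internal entries
//   k@6:MERGE(m2)  k@5:MERGE(m1)  k@3:PUT(v)
// iter_ starts at k@6. The operands m2 and m1 are collected while scanning
// toward older entries; the put at k@3 supplies the base value, and the full
// merge result lands in saved_value_. If a deletion is hit (or the key is
// exhausted) instead, the merge operator is fed nullptr as the base value.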
bool DBIter::MergeValuesNewToOld() {
  if (!merge_operator_) {
    ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null.");
    status_ = Status::InvalidArgument("merge_operator_ must be set.");
    valid_ = false;
    return false;
  }

  // Temporarily pin the blocks that hold merge operands
  TempPinData();
  merge_context_.Clear();
  // Start the merge process by pushing the first operand
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
  TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");

  ParsedInternalKey ikey;
  Status s;
  for (iter_.Next(); iter_.Valid(); iter_.Next()) {
    TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
    if (!ParseKey(&ikey)) {
      return false;
    }

    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
      // hit the next user key, stop right here
      break;
    } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type ||
               range_del_agg_.ShouldDelete(
                   ikey, RangeDelPositioningMode::kForwardTraversal)) {
      // hit a delete with the same user key, stop right here
      // iter_ is positioned after delete
      iter_.Next();
      break;
    } else if (kTypeValue == ikey.type) {
      // hit a put, merge the put value with operands and store the
      // final result in saved_value_. We are done!
      const Slice val = iter_.value();
      s = MergeHelper::TimedFullMerge(
          merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
          &saved_value_, logger_, statistics_, env_, &pinned_value_, true);
      if (!s.ok()) {
        valid_ = false;
        status_ = s;
        return false;
      }
      // iter_ is positioned after put
      iter_.Next();
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }
      return true;
    } else if (kTypeMerge == ikey.type) {
      // hit a merge, add the value as an operand and run associative merge.
      // when complete, add result to operands and continue.
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
      PERF_COUNTER_ADD(internal_merge_count, 1);
    } else if (kTypeBlobIndex == ikey.type) {
      if (!allow_blob_) {
        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
        status_ = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
      } else {
        status_ =
            Status::NotSupported("Blob DB does not support merge operator.");
      }
      valid_ = false;
      return false;
    } else {
      assert(false);
    }
  }

  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  // We either exhausted all internal keys under this user key, or hit
  // a deletion marker. Feed null as the existing value to the merge
  // operator, so that the client can differentiate this scenario and
  // act accordingly.
  s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
                                  nullptr, merge_context_.GetOperands(),
                                  &saved_value_, logger_, statistics_, env_,
                                  &pinned_value_, true);
  if (!s.ok()) {
    valid_ = false;
    status_ = s;
    return false;
  }

  assert(status_.ok());
  return true;
}

void DBIter::Prev() {
  if (timestamp_size_ > 0) {
    valid_ = false;
    status_ = Status::NotSupported(
        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
    return;
  }

  assert(valid_);
  assert(status_.ok());

  PERF_CPU_TIMER_GUARD(iter_prev_cpu_nanos, env_);
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
  bool ok = true;
  if (direction_ == kForward) {
    if (!ReverseToBackward()) {
      ok = false;
    }
  }
  if (ok) {
    Slice prefix;
    if (prefix_same_as_start_) {
      assert(prefix_extractor_ != nullptr);
      prefix = prefix_.GetUserKey();
    }
    PrevInternal(prefix_same_as_start_ ? &prefix : nullptr);
  }

  if (statistics_ != nullptr) {
    local_stats_.prev_count_++;
    if (valid_) {
      local_stats_.prev_found_count_++;
      local_stats_.bytes_read_ += (key().size() + value().size());
    }
  }
}

bool DBIter::ReverseToForward() {
  assert(iter_.status().ok());

  // When moving backwards, iter_ is positioned on the _previous_ key, which
  // may not exist or may have a different prefix than the current key().
  // If that's the case, seek iter_ to the current key.
  if (!expect_total_order_inner_iter() || !iter_.Valid()) {
    IterKey last_key;
    last_key.SetInternalKey(ParsedInternalKey(
        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
    iter_.Seek(last_key.GetInternalKey());
  }

  direction_ = kForward;
  // Skip keys less than the current key() (a.k.a. saved_key_).
  while (iter_.Valid()) {
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }
    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) >= 0) {
      return true;
    }
    iter_.Next();
  }

  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  return true;
}

// Move iter_ to the key before saved_key_.
bool DBIter::ReverseToBackward() {
  assert(iter_.status().ok());

  // When current_entry_is_merged_ is true, iter_ may be positioned on the next
  // key, which may not exist or may have a prefix different from the current
  // one. If that's the case, seek to saved_key_.
  if (current_entry_is_merged_ &&
      (!expect_total_order_inner_iter() || !iter_.Valid())) {
    IterKey last_key;
    // Using kMaxSequenceNumber and kValueTypeForSeek
    // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller
    // than saved_key_.
    last_key.SetInternalKey(ParsedInternalKey(
        saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
    if (!expect_total_order_inner_iter()) {
      iter_.SeekForPrev(last_key.GetInternalKey());
    } else {
      // Some iterators may not support SeekForPrev(), so we avoid using it
      // when prefix seek mode is disabled. This is somewhat expensive
      // (an extra Prev(), as well as an extra change of direction of iter_),
      // so we may need to reconsider it later.
      iter_.Seek(last_key.GetInternalKey());
      if (!iter_.Valid() && iter_.status().ok()) {
        iter_.SeekToLast();
      }
    }
  }

  direction_ = kReverse;
  return FindUserKeyBeforeSavedKey();
}

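// Drives the backward scan: for each candidate user key (starting with the
// one iter_ is currently positioned on), resolve its visible value, merge, or
// deletion via FindValueForCurrentKey(), then move iter_ to a smaller user
// key via FindUserKeyBeforeSavedKey(). Stops once a value is found, the
// prefix or lower bound is crossed, or iter_ is exhausted.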
void DBIter::PrevInternal(const Slice* prefix) {
  while (iter_.Valid()) {
    saved_key_.SetUserKey(
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);

    assert(prefix == nullptr || prefix_extractor_ != nullptr);
    if (prefix != nullptr &&
        prefix_extractor_->Transform(saved_key_.GetUserKey())
                .compare(*prefix) != 0) {
      assert(prefix_same_as_start_);
      // Current key does not have the same prefix as start
      valid_ = false;
      return;
    }

    assert(iterate_lower_bound_ == nullptr || iter_.MayBeOutOfLowerBound() ||
           user_comparator_.Compare(saved_key_.GetUserKey(),
                                    *iterate_lower_bound_) >= 0);
    if (iterate_lower_bound_ != nullptr && iter_.MayBeOutOfLowerBound() &&
        user_comparator_.Compare(saved_key_.GetUserKey(),
                                 *iterate_lower_bound_) < 0) {
      // We've iterated earlier than the user-specified lower bound.
      valid_ = false;
      return;
    }

    if (!FindValueForCurrentKey()) {  // assigns valid_
      return;
    }

    // Whether or not we found a value for current key, we need iter_ to end up
    // on a smaller key.
    if (!FindUserKeyBeforeSavedKey()) {
      return;
    }

    if (valid_) {
      // Found the value.
      return;
    }

    if (TooManyInternalKeysSkipped(false)) {
      return;
    }
  }

  // We haven't found any key - iterator is not valid
  valid_ = false;
}

// Used for backwards iteration.
// Looks at the entries with user key saved_key_ and finds the most up-to-date
// value for it, or executes a merge, or determines that the value was deleted.
// Sets valid_ to true if the value is found and is ready to be presented to
// the user through value().
// Sets valid_ to false if the value was deleted, and we should try another key.
// Returns false if an error occurred, and !status().ok() and !valid_.
//
// PRE: iter_ is positioned on the last entry with user key equal to saved_key_.
// POST: iter_ is positioned on one of the entries equal to saved_key_, or on
//       the entry just before them, or on the entry just after them.
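// Illustrative example (for exposition only): with internal entries
//   k@6:MERGE(m2)  k@5:MERGE(m1)  k@3:PUT(v)
// iter_ starts at k@3, the last entry for k, and walks backward through the
// newer ones: the put pins v, then m1 and m2 are appended as operands, and
// the final result is a full merge of v with the collected operands.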
bool DBIter::FindValueForCurrentKey() {
  assert(iter_.Valid());
  merge_context_.Clear();
  current_entry_is_merged_ = false;
  // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
  // kTypeValue)
  ValueType last_not_merge_type = kTypeDeletion;
  ValueType last_key_entry_type = kTypeDeletion;

  // Temporarily pin blocks that hold (merge operands / the value)
  ReleaseTempPinnedData();
  TempPinData();
  size_t num_skipped = 0;
  while (iter_.Valid()) {
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }

    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }
    if (!IsVisible(ikey.sequence, ts) ||
        !user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
      break;
    }
    if (TooManyInternalKeysSkipped()) {
      return false;
    }

    // This user key has lots of entries.
    // We're going from old to new, and it's taking too long. Let's do a Seek()
    // and go from new to old. This helps when a key was overwritten many times.
    if (num_skipped >= max_skip_) {
      return FindValueForCurrentKeyUsingSeek();
    }

    last_key_entry_type = ikey.type;
    switch (last_key_entry_type) {
      case kTypeValue:
      case kTypeBlobIndex:
        if (range_del_agg_.ShouldDelete(
                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
          last_key_entry_type = kTypeRangeDeletion;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        } else {
          assert(iter_.iter()->IsValuePinned());
          pinned_value_ = iter_.value();
        }
        merge_context_.Clear();
        last_not_merge_type = last_key_entry_type;
        break;
      case kTypeDeletion:
      case kTypeSingleDeletion:
        merge_context_.Clear();
        last_not_merge_type = last_key_entry_type;
        PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        break;
      case kTypeMerge:
        if (range_del_agg_.ShouldDelete(
                ikey, RangeDelPositioningMode::kBackwardTraversal)) {
          merge_context_.Clear();
          last_key_entry_type = kTypeRangeDeletion;
          last_not_merge_type = last_key_entry_type;
          PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
        } else {
          assert(merge_operator_ != nullptr);
          merge_context_.PushOperandBack(
              iter_.value(),
              iter_.iter()->IsValuePinned() /* operand_pinned */);
          PERF_COUNTER_ADD(internal_merge_count, 1);
        }
        break;
      default:
        assert(false);
    }

    PERF_COUNTER_ADD(internal_key_skipped_count, 1);
    iter_.Prev();
    ++num_skipped;
  }

  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  Status s;
  is_blob_ = false;
  switch (last_key_entry_type) {
    case kTypeDeletion:
    case kTypeSingleDeletion:
    case kTypeRangeDeletion:
      valid_ = false;
      return true;
    case kTypeMerge:
      current_entry_is_merged_ = true;
      if (last_not_merge_type == kTypeDeletion ||
          last_not_merge_type == kTypeSingleDeletion ||
          last_not_merge_type == kTypeRangeDeletion) {
        s = MergeHelper::TimedFullMerge(
            merge_operator_, saved_key_.GetUserKey(), nullptr,
            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
            env_, &pinned_value_, true);
      } else if (last_not_merge_type == kTypeBlobIndex) {
        if (!allow_blob_) {
          ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
          status_ = Status::NotSupported(
              "Encounter unexpected blob index. Please open DB with "
              "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
        } else {
          status_ =
              Status::NotSupported("Blob DB does not support merge operator.");
        }
        valid_ = false;
        return false;
      } else {
        assert(last_not_merge_type == kTypeValue);
        s = MergeHelper::TimedFullMerge(
            merge_operator_, saved_key_.GetUserKey(), &pinned_value_,
            merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
            env_, &pinned_value_, true);
      }
      break;
    case kTypeValue:
      // do nothing - we already have the value in pinned_value_
      break;
    case kTypeBlobIndex:
      if (!allow_blob_) {
        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
        status_ = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
        valid_ = false;
        return false;
      }
      is_blob_ = true;
      break;
    default:
      assert(false);
      break;
  }
  if (!s.ok()) {
    valid_ = false;
    status_ = s;
    return false;
  }
  valid_ = true;
  return true;
}

// This function is used in FindValueForCurrentKey.
// We use the Seek() function (rather than Prev()) to find the necessary value.
// TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld().
// Would be nice to reuse some code.
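// Rough flow: seek straight to (saved_key_, sequence_), step forward past any
// entries hidden by read_callback_, then either take the value or deletion
// found there, or, for a merge, keep collecting operands forward until a put,
// a deletion, or the next user key terminates the merge.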
bool DBIter::FindValueForCurrentKeyUsingSeek() {
  // FindValueForCurrentKey will enable pinning before calling
  // FindValueForCurrentKeyUsingSeek()
  assert(pinned_iters_mgr_.PinningEnabled());
  std::string last_key;
  AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(),
                                                 sequence_, kValueTypeForSeek));
  iter_.Seek(last_key);
  RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);

  // In case a read_callback is present, the value we seeked to may not be
  // visible. Find the next value that's visible.
  ParsedInternalKey ikey;
  is_blob_ = false;
  while (true) {
    if (!iter_.Valid()) {
      valid_ = false;
      return iter_.status().ok();
    }

    if (!ParseKey(&ikey)) {
      return false;
    }
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }

    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
      // No visible values for this key, even though FindValueForCurrentKey()
      // has seen some. This is possible if we're using a tailing iterator, and
      // the entries were discarded in a compaction.
      valid_ = false;
      return true;
    }

    if (IsVisible(ikey.sequence, ts)) {
      break;
    }

    iter_.Next();
  }

  if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
      range_del_agg_.ShouldDelete(
          ikey, RangeDelPositioningMode::kBackwardTraversal)) {
    valid_ = false;
    return true;
  }
  if (ikey.type == kTypeBlobIndex && !allow_blob_) {
    ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
    status_ = Status::NotSupported(
        "Encounter unexpected blob index. Please open DB with "
        "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
    valid_ = false;
    return false;
  }
  if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) {
    assert(iter_.iter()->IsValuePinned());
    pinned_value_ = iter_.value();
    is_blob_ = (ikey.type == kTypeBlobIndex);
    valid_ = true;
    return true;
  }

  // kTypeMerge. We need to collect all kTypeMerge values and save them
  // in operands
  assert(ikey.type == kTypeMerge);
  current_entry_is_merged_ = true;
  merge_context_.Clear();
  merge_context_.PushOperand(
      iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
  while (true) {
    iter_.Next();

    if (!iter_.Valid()) {
      if (!iter_.status().ok()) {
        valid_ = false;
        return false;
      }
      break;
    }
    if (!ParseKey(&ikey)) {
      return false;
    }
    if (!user_comparator_.Equal(ikey.user_key, saved_key_.GetUserKey())) {
      break;
    }

    if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion ||
        range_del_agg_.ShouldDelete(
            ikey, RangeDelPositioningMode::kForwardTraversal)) {
      break;
    } else if (ikey.type == kTypeValue) {
      const Slice val = iter_.value();
      Status s = MergeHelper::TimedFullMerge(
          merge_operator_, saved_key_.GetUserKey(), &val,
          merge_context_.GetOperands(), &saved_value_, logger_, statistics_,
          env_, &pinned_value_, true);
      if (!s.ok()) {
        valid_ = false;
        status_ = s;
        return false;
      }
      valid_ = true;
      return true;
    } else if (ikey.type == kTypeMerge) {
      merge_context_.PushOperand(
          iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
      PERF_COUNTER_ADD(internal_merge_count, 1);
    } else if (ikey.type == kTypeBlobIndex) {
      if (!allow_blob_) {
        ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index.");
        status_ = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
      } else {
        status_ =
            Status::NotSupported("Blob DB does not support merge operator.");
      }
      valid_ = false;
      return false;
    } else {
      assert(false);
    }
  }

  Status s = MergeHelper::TimedFullMerge(
      merge_operator_, saved_key_.GetUserKey(), nullptr,
      merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_,
      &pinned_value_, true);
  if (!s.ok()) {
    valid_ = false;
    status_ = s;
    return false;
  }

  // Make sure we leave iter_ in a good state. If it's valid and we don't care
  // about prefixes, that's already good enough. Otherwise it needs to be
  // seeked to the current key.
  if (!expect_total_order_inner_iter() || !iter_.Valid()) {
    if (!expect_total_order_inner_iter()) {
      iter_.SeekForPrev(last_key);
    } else {
      iter_.Seek(last_key);
      if (!iter_.Valid() && iter_.status().ok()) {
        iter_.SeekToLast();
      }
    }
    RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
  }

  valid_ = true;
  return true;
}

// Move backwards until a key smaller than saved_key_ is found.
// Changes valid_ only if the return value is false.
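// If too many consecutive entries of saved_key_ would have to be stepped
// over, this reseeks to the newest entry of saved_key_ (kMaxSequenceNumber
// sorts first among equal user keys), so that a single Prev() afterwards
// lands on the previous user key.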
bool DBIter::FindUserKeyBeforeSavedKey() {
  assert(status_.ok());
  size_t num_skipped = 0;
  while (iter_.Valid()) {
    ParsedInternalKey ikey;
    if (!ParseKey(&ikey)) {
      return false;
    }

    if (user_comparator_.Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) {
      return true;
    }

    if (TooManyInternalKeysSkipped()) {
      return false;
    }

    assert(ikey.sequence != kMaxSequenceNumber);
    assert(ikey.user_key.size() >= timestamp_size_);
    Slice ts;
    if (timestamp_size_ > 0) {
      ts = Slice(ikey.user_key.data() + ikey.user_key.size() - timestamp_size_,
                 timestamp_size_);
    }
    if (!IsVisible(ikey.sequence, ts)) {
      PERF_COUNTER_ADD(internal_recent_skipped_count, 1);
    } else {
      PERF_COUNTER_ADD(internal_key_skipped_count, 1);
    }

    if (num_skipped >= max_skip_) {
      num_skipped = 0;
      IterKey last_key;
      last_key.SetInternalKey(ParsedInternalKey(
          saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek));
      // It would be more efficient to use SeekForPrev() here, but some
      // iterators may not support it.
      iter_.Seek(last_key.GetInternalKey());
      RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION);
      if (!iter_.Valid()) {
        break;
      }
    } else {
      ++num_skipped;
    }

    iter_.Prev();
  }

  if (!iter_.status().ok()) {
    valid_ = false;
    return false;
  }

  return true;
}

bool DBIter::TooManyInternalKeysSkipped(bool increment) {
  if ((max_skippable_internal_keys_ > 0) &&
      (num_internal_keys_skipped_ > max_skippable_internal_keys_)) {
    valid_ = false;
    status_ = Status::Incomplete("Too many internal keys skipped.");
    return true;
  } else if (increment) {
    num_internal_keys_skipped_++;
  }
  return false;
}

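// Decides whether an internal entry should be visible to this iterator: its
// timestamp (if any) must not exceed *timestamp_ub_, and its sequence number
// must pass either the snapshot check (sequence <= sequence_) or the
// read_callback_, when one is installed.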
bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts) {
  // Remember that the comparator orders a preceding timestamp as larger.
  int cmp_ts = timestamp_ub_ != nullptr
                   ? user_comparator_.CompareTimestamp(ts, *timestamp_ub_)
                   : 0;
  if (cmp_ts > 0) {
    return false;
  }
  if (read_callback_ == nullptr) {
    return sequence <= sequence_;
  } else {
    // TODO(yanqin): support timestamp in read_callback_.
    return read_callback_->IsVisible(sequence);
  }
}

void DBIter::SetSavedKeyToSeekTarget(const Slice& target) {
  is_key_seqnum_zero_ = false;
  SequenceNumber seq = sequence_;
  saved_key_.Clear();
  saved_key_.SetInternalKey(target, seq, kValueTypeForSeek, timestamp_ub_);

  if (iterate_lower_bound_ != nullptr &&
      user_comparator_.CompareWithoutTimestamp(
          saved_key_.GetUserKey(), /*a_has_ts=*/true, *iterate_lower_bound_,
          /*b_has_ts=*/false) < 0) {
    // Seek key is smaller than the lower bound.
    saved_key_.Clear();
    saved_key_.SetInternalKey(*iterate_lower_bound_, seq, kValueTypeForSeek,
                              timestamp_ub_);
  }
}

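// Sets saved_key_ to the internal key to SeekForPrev() to, clamping to
// iterate_upper_bound_ when the target is at or past the bound. Pairing the
// bound with kMaxSequenceNumber makes the clamped key order before every real
// entry of that user key, so the (exclusive) bound itself stays excluded.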
void DBIter::SetSavedKeyToSeekForPrevTarget(const Slice& target) {
  is_key_seqnum_zero_ = false;
  saved_key_.Clear();
  // now saved_key_ is used to store the internal key
  saved_key_.SetInternalKey(target, 0 /* sequence_number */,
                            kValueTypeForSeekForPrev);

  if (iterate_upper_bound_ != nullptr &&
      user_comparator_.Compare(saved_key_.GetUserKey(),
                               *iterate_upper_bound_) >= 0) {
    saved_key_.Clear();
    saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber);
  }
}

void DBIter::Seek(const Slice& target) {
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  StopWatch sw(env_, statistics_, DB_SEEK);

#ifndef ROCKSDB_LITE
  if (db_impl_ != nullptr && cfd_ != nullptr) {
    db_impl_->TraceIteratorSeek(cfd_->GetID(), target);
  }
#endif  // ROCKSDB_LITE

  status_ = Status::OK();
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();

  // Seek the inner iterator based on the target key.
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);

    SetSavedKeyToSeekTarget(target);
    iter_.Seek(saved_key_.GetInternalKey());

    range_del_agg_.InvalidateRangeDelMapPositions();
    RecordTick(statistics_, NUMBER_DB_SEEK);
  }
  if (!iter_.Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kForward;

  // Now the inner iterator is placed at the target position. From there,
  // we need to find out the next key that is visible to the user.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exhausted
    // keys within the same prefix as the seek key.
    assert(prefix_extractor_ != nullptr);
    Slice target_prefix = prefix_extractor_->Transform(target);
    FindNextUserEntry(false /* not skipping saved_key */,
                      &target_prefix /* prefix */);
    if (valid_) {
      // Remember the prefix of the seek key for the future Next() call to
      // check.
      prefix_.SetUserKey(target_prefix);
    }
  } else {
    FindNextUserEntry(false /* not skipping saved_key */, nullptr);
  }
  if (!valid_) {
    return;
  }

  // Updating stats and perf context counters.
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
  }
  PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
}

void DBIter::SeekForPrev(const Slice& target) {
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  StopWatch sw(env_, statistics_, DB_SEEK);

#ifndef ROCKSDB_LITE
  if (db_impl_ != nullptr && cfd_ != nullptr) {
    db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target);
  }
#endif  // ROCKSDB_LITE

  if (timestamp_size_ > 0) {
    valid_ = false;
    status_ = Status::NotSupported(
        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
    return;
  }

  status_ = Status::OK();
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();

  // Seek the inner iterator based on the target key.
  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    SetSavedKeyToSeekForPrevTarget(target);
    iter_.SeekForPrev(saved_key_.GetInternalKey());
    range_del_agg_.InvalidateRangeDelMapPositions();
    RecordTick(statistics_, NUMBER_DB_SEEK);
  }
  if (!iter_.Valid()) {
    valid_ = false;
    return;
  }
  direction_ = kReverse;

  // Now the inner iterator is placed at the target position. From there,
  // we need to find out the first key that is visible to the user in the
  // backward direction.
  ClearSavedValue();
  if (prefix_same_as_start_) {
    // The case where the iterator needs to be invalidated if it has exhausted
    // keys within the same prefix as the seek key.
    assert(prefix_extractor_ != nullptr);
    Slice target_prefix = prefix_extractor_->Transform(target);
    PrevInternal(&target_prefix);
    if (valid_) {
      // Remember the prefix of the seek key for the future Prev() call to
      // check.
      prefix_.SetUserKey(target_prefix);
    }
  } else {
    PrevInternal(nullptr);
  }

  // Report stats and perf context.
  if (statistics_ != nullptr && valid_) {
    RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
    RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
    PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
  }
}

void DBIter::SeekToFirst() {
  if (iterate_lower_bound_ != nullptr) {
    Seek(*iterate_lower_bound_);
    return;
  }
  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  // Don't use iter_.Seek() if we set a prefix extractor,
  // because prefix seek will be used.
  if (!expect_total_order_inner_iter()) {
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  status_ = Status::OK();
  direction_ = kForward;
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
  ClearSavedValue();
  is_key_seqnum_zero_ = false;

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_.SeekToFirst();
    range_del_agg_.InvalidateRangeDelMapPositions();
  }

  RecordTick(statistics_, NUMBER_DB_SEEK);
  if (iter_.Valid()) {
    saved_key_.SetUserKey(
        ExtractUserKey(iter_.key()),
        !iter_.iter()->IsKeyPinned() || !pin_thru_lifetime_ /* copy */);
    FindNextUserEntry(false /* not skipping saved_key */,
                      nullptr /* no prefix check */);
    if (statistics_ != nullptr) {
      if (valid_) {
        RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
        RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
        PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
      }
    }
  } else {
    valid_ = false;
  }
  if (valid_ && prefix_same_as_start_) {
    assert(prefix_extractor_ != nullptr);
    prefix_.SetUserKey(prefix_extractor_->Transform(saved_key_.GetUserKey()));
  }
}

void DBIter::SeekToLast() {
  if (timestamp_size_ > 0) {
    valid_ = false;
    status_ = Status::NotSupported(
        "SeekToLast/SeekForPrev/Prev currently not supported with timestamp.");
    return;
  }

  if (iterate_upper_bound_ != nullptr) {
    // Seek to the last key strictly less than ReadOptions.iterate_upper_bound.
    SeekForPrev(*iterate_upper_bound_);
    if (Valid() && user_comparator_.Equal(*iterate_upper_bound_, key())) {
      ReleaseTempPinnedData();
      PrevInternal(nullptr);
    }
    return;
  }

  PERF_CPU_TIMER_GUARD(iter_seek_cpu_nanos, env_);
  // Don't use iter_.Seek() if we set a prefix extractor,
  // because prefix seek will be used.
  if (!expect_total_order_inner_iter()) {
    max_skip_ = std::numeric_limits<uint64_t>::max();
  }
  status_ = Status::OK();
  direction_ = kReverse;
  ReleaseTempPinnedData();
  ResetInternalKeysSkippedCounter();
  ClearSavedValue();
  is_key_seqnum_zero_ = false;

  {
    PERF_TIMER_GUARD(seek_internal_seek_time);
    iter_.SeekToLast();
    range_del_agg_.InvalidateRangeDelMapPositions();
  }
  PrevInternal(nullptr);
  if (statistics_ != nullptr) {
    RecordTick(statistics_, NUMBER_DB_SEEK);
    if (valid_) {
      RecordTick(statistics_, NUMBER_DB_SEEK_FOUND);
      RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size());
      PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size());
    }
  }
  if (valid_ && prefix_same_as_start_) {
    assert(prefix_extractor_ != nullptr);
    prefix_.SetUserKey(prefix_extractor_->Transform(saved_key_.GetUserKey()));
  }
}

Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
                        const ImmutableCFOptions& cf_options,
                        const MutableCFOptions& mutable_cf_options,
                        const Comparator* user_key_comparator,
                        InternalIterator* internal_iter,
                        const SequenceNumber& sequence,
                        uint64_t max_sequential_skip_in_iterations,
                        ReadCallback* read_callback, DBImpl* db_impl,
                        ColumnFamilyData* cfd, bool allow_blob) {
  DBIter* db_iter = new DBIter(
      env, read_options, cf_options, mutable_cf_options, user_key_comparator,
      internal_iter, sequence, false, max_sequential_skip_in_iterations,
      read_callback, db_impl, cfd, allow_blob);
  return db_iter;
}
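
// Illustrative call site (a sketch, not actual DBImpl code): a caller would
// wire this up roughly as
//   Iterator* it = NewDBIterator(
//       env, read_options, cf_options, mutable_cf_options,
//       cfd->user_comparator(), internal_iter, snapshot_seq,
//       mutable_cf_options.max_sequential_skip_in_iterations,
//       read_callback, db_impl, cfd, allow_blob);
// and would own (and eventually delete) the returned iterator.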

}  // namespace ROCKSDB_NAMESPACE