// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/partitioned_index_reader.h"

#include "cache/simple_deleter.h"
#include "table/block_based/partitioned_index_iterator.h"

namespace ROCKSDB_NAMESPACE {
Status PartitionIndexReader::Create(
    const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
    bool use_cache, bool prefetch, bool pin,
    BlockCacheLookupContext* lookup_context,
    std::unique_ptr<IndexReader>* index_reader) {
  assert(table != nullptr);
  assert(table->get_rep());
  assert(!pin || prefetch);
  assert(index_reader != nullptr);

  CachableEntry<Block> index_block;
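  // The top-level index block is read eagerly only when prefetching was
  // requested or the block cache is not used; otherwise it is loaded lazily
  // on first access through GetOrReadIndexBlock().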
  if (prefetch || !use_cache) {
    const Status s =
        ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache,
                       /*get_context=*/nullptr, lookup_context, &index_block);
    if (!s.ok()) {
      return s;
    }

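    // The block was inserted into the block cache but pinning was not
    // requested, so drop our reference and let the cache manage its lifetime.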
    if (use_cache && !pin) {
      index_block.Reset();
    }
  }

  index_reader->reset(new PartitionIndexReader(table, std::move(index_block)));

  return Status::OK();
}

InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
    const ReadOptions& read_options, bool /* disable_prefix_seek */,
    IndexBlockIter* iter, GetContext* get_context,
    BlockCacheLookupContext* lookup_context) {
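  // kBlockCacheTier means the read must be served from the block cache only,
  // so a miss below must not trigger file I/O.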
  const bool no_io = (read_options.read_tier == kBlockCacheTier);
  CachableEntry<Block> index_block;
  const Status s =
      GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block);
  if (!s.ok()) {
    if (iter != nullptr) {
      iter->Invalidate(s);
      return iter;
    }

    return NewErrorInternalIterator<IndexValue>(s);
  }

  const BlockBasedTable::Rep* rep = table()->rep_;
  InternalIteratorBase<IndexValue>* it = nullptr;

  Statistics* kNullStats = nullptr;
  // Filters are already checked before seeking the index
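  // If CacheDependencies() pinned the partitions, serve them from
  // partition_map_ through a two-level iterator; otherwise fall back to
  // ParititionedIndexIterator, which reads partitions on demand.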
  if (!partition_map_.empty()) {
    // We don't return pinned data from index blocks, so no need
    // to set `block_contents_pinned`.
    it = NewTwoLevelIterator(
        new BlockBasedTable::PartitionedIndexIteratorState(table(),
                                                           &partition_map_),
        index_block.GetValue()->NewIndexIterator(
            internal_comparator(), internal_comparator()->user_comparator(),
            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
            index_has_first_key(), index_key_includes_seq(),
            index_value_is_full()));
  } else {
    ReadOptions ro;
    ro.fill_cache = read_options.fill_cache;
    // We don't return pinned data from index blocks, so no need
    // to set `block_contents_pinned`.
    std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(
        index_block.GetValue()->NewIndexIterator(
            internal_comparator(), internal_comparator()->user_comparator(),
            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
            index_has_first_key(), index_key_includes_seq(),
            index_value_is_full()));

    it = new ParititionedIndexIterator(
        table(), ro, *internal_comparator(), std::move(index_iter),
        lookup_context ? lookup_context->caller
                       : TableReaderCaller::kUncategorized);
  }

  assert(it != nullptr);
  index_block.TransferTo(it);

  return it;

  // TODO(myabandeh): Update TwoLevelIterator to be able to make use of
  // on-stack BlockIter while the state is on heap. Currently it assumes
  // the first level iter is always on heap and will attempt to delete it
  // in its destructor.
}
void PartitionIndexReader::CacheDependencies(bool pin) {
  // Before reading the partitions, prefetch them all to avoid lots of IOs.
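  // The flow: load the top-level index, compute the contiguous byte range
  // spanning the first through the last partition, prefetch that range in a
  // single read, then load each partition into the block cache (and, if
  // `pin` is set, keep a reference in partition_map_).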
  BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
  const BlockBasedTable::Rep* rep = table()->rep_;
  IndexBlockIter biter;
  BlockHandle handle;
  Statistics* kNullStats = nullptr;

  CachableEntry<Block> index_block;
  Status s = GetOrReadIndexBlock(false /* no_io */, nullptr /* get_context */,
                                 &lookup_context, &index_block);
  if (!s.ok()) {
    ROCKS_LOG_WARN(rep->ioptions.info_log,
                   "Error retrieving top-level index block while trying to "
                   "cache index partitions: %s",
                   s.ToString().c_str());
    return;
  }

  // We don't return pinned data from index blocks, so no need
  // to set `block_contents_pinned`.
  index_block.GetValue()->NewIndexIterator(
      internal_comparator(), internal_comparator()->user_comparator(),
      rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true,
      index_has_first_key(), index_key_includes_seq(), index_value_is_full());
  // Index partitions are assumed to be consecutive. Prefetch them all.
  // Read the first block's offset.
  biter.SeekToFirst();
  if (!biter.Valid()) {
    // Empty index.
    return;
  }
  handle = biter.value().handle;
  uint64_t prefetch_off = handle.offset();

  // Read the last block's offset
  biter.SeekToLast();
  if (!biter.Valid()) {
    // Empty index.
    return;
  }
  handle = biter.value().handle;
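  // The prefetch range covers everything from the start of the first
  // partition to the end of the last one, so a single read warms all of them.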
  uint64_t last_off = handle.offset() + block_size(handle);
  uint64_t prefetch_len = last_off - prefetch_off;
  std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
  rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer);
  s = prefetch_buffer->Prefetch(rep->file.get(), prefetch_off,
                                static_cast<size_t>(prefetch_len));

  // After prefetch, read the partitions one by one
  biter.SeekToFirst();
  auto ro = ReadOptions();
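  // Each partition is loaded into the block cache via
  // MaybeReadBlockAndLoadToCache(); when `pin` is requested and the block is
  // cached, a reference is kept in partition_map_ so NewIterator() can serve
  // it without another cache lookup.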
  for (; biter.Valid(); biter.Next()) {
    handle = biter.value().handle;
    CachableEntry<Block> block;
    // TODO: Support counter batch update for partitioned index and
    // filter blocks
    s = table()->MaybeReadBlockAndLoadToCache(
        prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
        &block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context,
        /*contents=*/nullptr);

    assert(s.ok() || block.GetValue() == nullptr);
    if (s.ok() && block.GetValue() != nullptr) {
      if (block.IsCached()) {
        if (pin) {
          partition_map_[handle.offset()] = std::move(block);
        }
      }
    }
  }
}

}  // namespace ROCKSDB_NAMESPACE