//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based/partitioned_index_reader.h"

#include "cache/simple_deleter.h"
#include "table/block_based/partitioned_index_iterator.h"

namespace ROCKSDB_NAMESPACE {
Status PartitionIndexReader::Create(
    const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
    bool use_cache, bool prefetch, bool pin,
    BlockCacheLookupContext* lookup_context,
    std::unique_ptr<IndexReader>* index_reader) {
  assert(table != nullptr);
  assert(table->get_rep());
  assert(!pin || prefetch);
  assert(index_reader != nullptr);

  CachableEntry<Block> index_block;
  if (prefetch || !use_cache) {
    const Status s =
        ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache,
                       /*get_context=*/nullptr, lookup_context, &index_block);
    if (!s.ok()) {
      return s;
    }

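    // If the index block went into the block cache but pinning was not
    // requested, release our reference here; it can be looked up in the
    // cache again on demand.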
    if (use_cache && !pin) {
      index_block.Reset();
    }
  }

  index_reader->reset(new PartitionIndexReader(table, std::move(index_block)));

  return Status::OK();
}

InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
    const ReadOptions& read_options, bool /* disable_prefix_seek */,
    IndexBlockIter* iter, GetContext* get_context,
    BlockCacheLookupContext* lookup_context) {
  const bool no_io = (read_options.read_tier == kBlockCacheTier);
  CachableEntry<Block> index_block;
  const Status s =
      GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block);
  if (!s.ok()) {
    if (iter != nullptr) {
      iter->Invalidate(s);
      return iter;
    }

    return NewErrorInternalIterator<IndexValue>(s);
  }

  const BlockBasedTable::Rep* rep = table()->rep_;
  InternalIteratorBase<IndexValue>* it = nullptr;

  Statistics* kNullStats = nullptr;
  // Filters are already checked before seeking the index
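  // If CacheDependencies() pinned the partitions, serve them from
  // partition_map_ through a two-level iterator; otherwise read the
  // partitions lazily through a partitioned index iterator.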
  if (!partition_map_.empty()) {
    // We don't return pinned data from index blocks, so no need
    // to set `block_contents_pinned`.
    it = NewTwoLevelIterator(
        new BlockBasedTable::PartitionedIndexIteratorState(table(),
                                                           &partition_map_),
        index_block.GetValue()->NewIndexIterator(
            internal_comparator(), internal_comparator()->user_comparator(),
            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
            index_has_first_key(), index_key_includes_seq(),
            index_value_is_full()));
  } else {
    ReadOptions ro;
    ro.fill_cache = read_options.fill_cache;
    // We don't return pinned data from index blocks, so no need
    // to set `block_contents_pinned`.
    std::unique_ptr<InternalIteratorBase<IndexValue>> index_iter(
        index_block.GetValue()->NewIndexIterator(
            internal_comparator(), internal_comparator()->user_comparator(),
            rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
            index_has_first_key(), index_key_includes_seq(),
            index_value_is_full()));

    it = new ParititionedIndexIterator(
        table(), ro, *internal_comparator(), std::move(index_iter),
        lookup_context ? lookup_context->caller
                       : TableReaderCaller::kUncategorized);
  }

  assert(it != nullptr);
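  // Transfer ownership of the top-level index block to the iterator so it
  // stays alive for the iterator's lifetime.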
  index_block.TransferTo(it);

  return it;

  // TODO(myabandeh): Update TwoLevelIterator to be able to make use of
  // on-stack BlockIter while the state is on heap. Currently it assumes
  // the first level iter is always on heap and will attempt to delete it
  // in its destructor.
}
void PartitionIndexReader::CacheDependencies(bool pin) {
  // Before reading the partitions, prefetch them to avoid lots of IOs
  BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
  const BlockBasedTable::Rep* rep = table()->rep_;
  IndexBlockIter biter;
  BlockHandle handle;
  Statistics* kNullStats = nullptr;

  CachableEntry<Block> index_block;
  Status s = GetOrReadIndexBlock(false /* no_io */, nullptr /* get_context */,
                                 &lookup_context, &index_block);
  if (!s.ok()) {
    ROCKS_LOG_WARN(rep->ioptions.info_log,
                   "Error retrieving top-level index block while trying to "
                   "cache index partitions: %s",
                   s.ToString().c_str());
    return;
  }

  // We don't return pinned data from index blocks, so no need
  // to set `block_contents_pinned`.
  index_block.GetValue()->NewIndexIterator(
      internal_comparator(), internal_comparator()->user_comparator(),
      rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true,
      index_has_first_key(), index_key_includes_seq(), index_value_is_full());
  // Index partitions are assumed to be consecutive. Prefetch them all.
  // Read the first block offset
  biter.SeekToFirst();
  if (!biter.Valid()) {
    // Empty index.
    return;
  }
  handle = biter.value().handle;
  uint64_t prefetch_off = handle.offset();

  // Read the last block's offset
  biter.SeekToLast();
  if (!biter.Valid()) {
    // Empty index.
    return;
  }
  handle = biter.value().handle;
  uint64_t last_off = handle.offset() + block_size(handle);
  uint64_t prefetch_len = last_off - prefetch_off;
  std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
  rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer);
  s = prefetch_buffer->Prefetch(rep->file.get(), prefetch_off,
                                static_cast<size_t>(prefetch_len));
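  // The prefetch status is not fatal: if the range did not make it into the
  // prefetch buffer, the per-partition reads below go through the regular
  // read path.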

  // After prefetch, read the partitions one by one
  biter.SeekToFirst();
  auto ro = ReadOptions();
  for (; biter.Valid(); biter.Next()) {
    handle = biter.value().handle;
    CachableEntry<Block> block;
    // TODO: Support counter batch update for partitioned index and
    // filter blocks
    s = table()->MaybeReadBlockAndLoadToCache(
        prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
        &block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context,
        /*contents=*/nullptr);

    assert(s.ok() || block.GetValue() == nullptr);
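    // If pinning was requested and the partition landed in the block cache,
    // keep a reference in partition_map_ so it stays resident for the
    // lifetime of this reader.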
    if (s.ok() && block.GetValue() != nullptr) {
      if (block.IsCached()) {
        if (pin) {
          partition_map_[handle.offset()] = std::move(block);
        }
      }
    }
  }
}

}  // namespace ROCKSDB_NAMESPACE