//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "table/block_based/partitioned_filter_block.h"

#include <utility>

#include "monitoring/perf_context_imp.h"
#include "port/malloc.h"
#include "port/port.h"
#include "rocksdb/filter_policy.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_reader.h"
#include "util/coding.h"

namespace ROCKSDB_NAMESPACE {

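// Builder for partitioned filters: keys accumulate in the current partition
// until the index builder grants a cut, at which point the finished filter
// is queued in `filters` under the partition's index key. `partition_size`
// (in bytes) is translated into a per-partition key budget up front.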
PartitionedFilterBlockBuilder::PartitionedFilterBlockBuilder(
    const SliceTransform* _prefix_extractor, bool whole_key_filtering,
    FilterBitsBuilder* filter_bits_builder, int index_block_restart_interval,
    const bool use_value_delta_encoding,
    PartitionedIndexBuilder* const p_index_builder,
    const uint32_t partition_size)
    : FullFilterBlockBuilder(_prefix_extractor, whole_key_filtering,
                             filter_bits_builder),
      index_on_filter_block_builder_(index_block_restart_interval,
                                     true /*use_delta_encoding*/,
                                     use_value_delta_encoding),
      index_on_filter_block_builder_without_seq_(index_block_restart_interval,
                                                 true /*use_delta_encoding*/,
                                                 use_value_delta_encoding),
      p_index_builder_(p_index_builder),
      keys_added_to_partition_(0) {
  keys_per_partition_ =
      filter_bits_builder_->CalculateNumEntry(partition_size);
}

PartitionedFilterBlockBuilder::~PartitionedFilterBlockBuilder() {}

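// Finishes the current filter partition once the index builder grants a cut.
// `next_key` (if any) contributes its prefix to the partition being closed so
// that prefix seeks land in the right partition.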
void PartitionedFilterBlockBuilder::MaybeCutAFilterBlock(
    const Slice* next_key) {
  // Use == (not >=) so the cut request is sent only once
  if (keys_added_to_partition_ == keys_per_partition_) {
    // Currently only the index builder is in charge of cutting a partition.
    // We keep requesting until the cut is granted.
    p_index_builder_->RequestPartitionCut();
  }
  if (!p_index_builder_->ShouldCutFilterBlock()) {
    return;
  }
  filter_gc.push_back(std::unique_ptr<const char[]>(nullptr));

  // Add the prefix of the next key before finishing the partition. This hack
  // fixes a bug with format_version=3 where seeking for the prefix would lead
  // us to the previous partition.
  const bool add_prefix =
      next_key && prefix_extractor() && prefix_extractor()->InDomain(*next_key);
  if (add_prefix) {
    FullFilterBlockBuilder::AddPrefix(*next_key);
  }

  Slice filter = filter_bits_builder_->Finish(&filter_gc.back());
  std::string& index_key = p_index_builder_->GetPartitionKey();
  filters.push_back({index_key, filter});
  keys_added_to_partition_ = 0;
  Reset();
}

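// The partition is cut before the key is added so that, via the prefix hack
// above, the new key's prefix still lands in the partition being closed
// rather than the next one.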
void PartitionedFilterBlockBuilder::Add(const Slice& key) {
  MaybeCutAFilterBlock(&key);
  FullFilterBlockBuilder::Add(key);
}

void PartitionedFilterBlockBuilder::AddKey(const Slice& key) {
  FullFilterBlockBuilder::AddKey(key);
  keys_added_to_partition_++;
}

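// Finish() is called repeatedly: each call returns one filter partition and
// sets *status to Incomplete() until every partition has been handed out.
// The caller writes the returned partition and passes its BlockHandle back on
// the next call, which lets us record the handle in the index on filter
// partitions. The final call returns that index block with status OK.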
Slice PartitionedFilterBlockBuilder::Finish(
    const BlockHandle& last_partition_block_handle, Status* status) {
  if (finishing_filters) {
    // Record the handle of the last written filter block in the index
    FilterEntry& last_entry = filters.front();
    std::string handle_encoding;
    last_partition_block_handle.EncodeTo(&handle_encoding);
    std::string handle_delta_encoding;
    PutVarsignedint64(
        &handle_delta_encoding,
        last_partition_block_handle.size() - last_encoded_handle_.size());
    last_encoded_handle_ = last_partition_block_handle;
    const Slice handle_delta_encoding_slice(handle_delta_encoding);
    index_on_filter_block_builder_.Add(last_entry.key, handle_encoding,
                                       &handle_delta_encoding_slice);
    if (!p_index_builder_->seperator_is_key_plus_seq()) {
      index_on_filter_block_builder_without_seq_.Add(
          ExtractUserKey(last_entry.key), handle_encoding,
          &handle_delta_encoding_slice);
    }
    filters.pop_front();
  } else {
    MaybeCutAFilterBlock(nullptr);
  }
  // If there is no filter partition left, then return the index on filter
  // partitions
  if (UNLIKELY(filters.empty())) {
    *status = Status::OK();
    if (finishing_filters) {
      if (p_index_builder_->seperator_is_key_plus_seq()) {
        return index_on_filter_block_builder_.Finish();
      } else {
        return index_on_filter_block_builder_without_seq_.Finish();
      }
    } else {
      // This is the rare case where no key was added to the filter
      return Slice();
    }
  } else {
    // Return the next filter partition in line and set Incomplete() status to
    // indicate we expect more calls to Finish
    *status = Status::Incomplete();
    finishing_filters = true;
    return filters.front().filter;
  }
}

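// Reader counterpart: the top-level filter block is an index block mapping
// partition keys to the handles of the individual filter partitions, which
// are fetched from the block cache or read from the file on demand.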
PartitionedFilterBlockReader::PartitionedFilterBlockReader(
    const BlockBasedTable* t, CachableEntry<Block>&& filter_block)
    : FilterBlockReaderCommon(t, std::move(filter_block)) {}

std::unique_ptr<FilterBlockReader> PartitionedFilterBlockReader::Create(
    const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
    bool use_cache, bool prefetch, bool pin,
    BlockCacheLookupContext* lookup_context) {
  assert(table);
  assert(table->get_rep());
  assert(!pin || prefetch);

  CachableEntry<Block> filter_block;
  if (prefetch || !use_cache) {
    const Status s = ReadFilterBlock(table, prefetch_buffer, ReadOptions(),
                                     use_cache, nullptr /* get_context */,
                                     lookup_context, &filter_block);
    if (!s.ok()) {
      return std::unique_ptr<FilterBlockReader>();
    }

    if (use_cache && !pin) {
      filter_block.Reset();
    }
  }

  return std::unique_ptr<FilterBlockReader>(
      new PartitionedFilterBlockReader(table, std::move(filter_block)));
}

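// Both match functions answer conservatively: when the corresponding kind of
// filtering is not in effect they return true (possible match), so the filter
// never produces a false negative.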
bool PartitionedFilterBlockReader::KeyMayMatch(
    const Slice& key, const SliceTransform* prefix_extractor,
    uint64_t block_offset, const bool no_io, const Slice* const const_ikey_ptr,
    GetContext* get_context, BlockCacheLookupContext* lookup_context) {
  assert(const_ikey_ptr != nullptr);
  assert(block_offset == kNotValid);
  if (!whole_key_filtering()) {
    return true;
  }

  return MayMatch(key, prefix_extractor, block_offset, no_io, const_ikey_ptr,
                  get_context, lookup_context,
                  &FullFilterBlockReader::KeyMayMatch);
}

bool PartitionedFilterBlockReader::PrefixMayMatch(
    const Slice& prefix, const SliceTransform* prefix_extractor,
    uint64_t block_offset, const bool no_io, const Slice* const const_ikey_ptr,
    GetContext* get_context, BlockCacheLookupContext* lookup_context) {
#ifdef NDEBUG
  (void)block_offset;
#endif
  assert(const_ikey_ptr != nullptr);
  assert(block_offset == kNotValid);
  if (!table_prefix_extractor() && !prefix_extractor) {
    return true;
  }

  return MayMatch(prefix, prefix_extractor, block_offset, no_io, const_ikey_ptr,
                  get_context, lookup_context,
                  &FullFilterBlockReader::PrefixMayMatch);
}

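// Searches the top-level index block for the filter partition that may
// contain `entry` and returns that partition's block handle.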
BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle(
    const CachableEntry<Block>& filter_block, const Slice& entry) const {
  IndexBlockIter iter;
  const InternalKeyComparator* const comparator = internal_comparator();
  Statistics* kNullStats = nullptr;
  filter_block.GetValue()->NewIndexIterator(
      comparator, comparator->user_comparator(),
      table()->get_rep()->get_global_seqno(BlockType::kFilter), &iter,
      kNullStats, true /* total_order_seek */, false /* have_first_key */,
      index_key_includes_seq(), index_value_is_full());
  iter.Seek(entry);
  if (UNLIKELY(!iter.Valid())) {
    // entry is larger than all the keys. However, its prefix might still be
    // present in the last partition. If this is called by PrefixMayMatch this
    // is necessary for correct behavior. Otherwise it is unnecessary but safe.
    // Assuming this is an unlikely case for full key search, the performance
    // overhead should be negligible.
    iter.SeekToLast();
  }
  assert(iter.Valid());
  BlockHandle fltr_blk_handle = iter.value().handle;
  return fltr_blk_handle;
}

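// Returns the filter partition for `fltr_blk_handle`, preferring a partition
// pinned in `filter_map_` by CacheDependencies(); otherwise falls back to the
// block cache or, unless `no_io` is set, a file read.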
Status PartitionedFilterBlockReader::GetFilterPartitionBlock(
    FilePrefetchBuffer* prefetch_buffer, const BlockHandle& fltr_blk_handle,
    bool no_io, GetContext* get_context,
    BlockCacheLookupContext* lookup_context,
    CachableEntry<ParsedFullFilterBlock>* filter_block) const {
  assert(table());
  assert(filter_block);
  assert(filter_block->IsEmpty());

  if (!filter_map_.empty()) {
    auto iter = filter_map_.find(fltr_blk_handle.offset());
    // A miss here is a possible scenario since the block cache might not have
    // had space for the partition
    if (iter != filter_map_.end()) {
      filter_block->SetUnownedValue(iter->second.GetValue());
      return Status::OK();
    }
  }

  ReadOptions read_options;
  if (no_io) {
    read_options.read_tier = kBlockCacheTier;
  }

  const Status s =
      table()->RetrieveBlock(prefetch_buffer, read_options, fltr_blk_handle,
                             UncompressionDict::GetEmptyDict(), filter_block,
                             BlockType::kFilter, get_context, lookup_context,
                             /* for_compaction */ false, /* use_cache */ true);

  return s;
}

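// Common lookup path: locate the partition via the top-level index, load it,
// and delegate to the FullFilterBlockReader member function selected by
// `filter_function`. Any failure to load a block yields true, i.e. a possible
// match, preserving the no-false-negatives contract.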
bool PartitionedFilterBlockReader::MayMatch(
    const Slice& slice, const SliceTransform* prefix_extractor,
    uint64_t block_offset, bool no_io, const Slice* const_ikey_ptr,
    GetContext* get_context, BlockCacheLookupContext* lookup_context,
    FilterFunction filter_function) const {
  CachableEntry<Block> filter_block;
  Status s =
      GetOrReadFilterBlock(no_io, get_context, lookup_context, &filter_block);
  if (UNLIKELY(!s.ok())) {
    return true;
  }

  if (UNLIKELY(filter_block.GetValue()->size() == 0)) {
    return true;
  }

  auto filter_handle = GetFilterPartitionHandle(filter_block, *const_ikey_ptr);
  if (UNLIKELY(filter_handle.size() == 0)) {  // key is out of range
    return false;
  }

  CachableEntry<ParsedFullFilterBlock> filter_partition_block;
  s = GetFilterPartitionBlock(nullptr /* prefetch_buffer */, filter_handle,
                              no_io, get_context, lookup_context,
                              &filter_partition_block);
  if (UNLIKELY(!s.ok())) {
    return true;
  }

  FullFilterBlockReader filter_partition(table(),
                                         std::move(filter_partition_block));
  return (filter_partition.*filter_function)(
      slice, prefix_extractor, block_offset, no_io, const_ikey_ptr, get_context,
      lookup_context);
}

size_t PartitionedFilterBlockReader::ApproximateMemoryUsage() const {
  size_t usage = ApproximateFilterBlockMemoryUsage();
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
  usage += malloc_usable_size(const_cast<PartitionedFilterBlockReader*>(this));
#else
  usage += sizeof(*this);
#endif  // ROCKSDB_MALLOC_USABLE_SIZE
  return usage;
  // TODO(myabandeh): better estimation for filter_map_ size
}

// TODO(myabandeh): merge this with the same function in IndexReader
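// Prefetches all filter partitions in one contiguous read and then loads them
// into the block cache one by one; if `pin` is set, the cached partitions are
// additionally kept alive in filter_map_, keyed by block offset.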
void PartitionedFilterBlockReader::CacheDependencies(bool pin) {
  assert(table());

  const BlockBasedTable::Rep* const rep = table()->get_rep();
  assert(rep);

  BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};

  CachableEntry<Block> filter_block;

  Status s = GetOrReadFilterBlock(false /* no_io */, nullptr /* get_context */,
                                  &lookup_context, &filter_block);
  if (!s.ok()) {
    ROCKS_LOG_WARN(rep->ioptions.info_log,
                   "Error retrieving top-level filter block while trying to "
                   "cache filter partitions: %s",
                   s.ToString().c_str());
    return;
  }

  // Before reading the partitions, prefetch them to avoid lots of IOs
  assert(filter_block.GetValue());

  IndexBlockIter biter;
  const InternalKeyComparator* const comparator = internal_comparator();
  Statistics* kNullStats = nullptr;
  filter_block.GetValue()->NewIndexIterator(
      comparator, comparator->user_comparator(),
      rep->get_global_seqno(BlockType::kFilter), &biter, kNullStats,
      true /* total_order_seek */, false /* have_first_key */,
      index_key_includes_seq(), index_value_is_full());
  // Filter partitions are assumed to be consecutive. Prefetch them all.
  // Read the first block's offset
  biter.SeekToFirst();
  BlockHandle handle = biter.value().handle;
  uint64_t prefetch_off = handle.offset();

  // Read the last block's offset
  biter.SeekToLast();
  handle = biter.value().handle;
  uint64_t last_off = handle.offset() + handle.size() + kBlockTrailerSize;
  uint64_t prefetch_len = last_off - prefetch_off;
  std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;

  prefetch_buffer.reset(new FilePrefetchBuffer());
  s = prefetch_buffer->Prefetch(rep->file.get(), prefetch_off,
                                static_cast<size_t>(prefetch_len));

  // After prefetch, read the partitions one by one
  ReadOptions read_options;
  for (biter.SeekToFirst(); biter.Valid(); biter.Next()) {
    handle = biter.value().handle;

    CachableEntry<ParsedFullFilterBlock> block;
    // TODO: Support counter batch update for partitioned index and
    // filter blocks
    s = table()->MaybeReadBlockAndLoadToCache(
        prefetch_buffer.get(), read_options, handle,
        UncompressionDict::GetEmptyDict(), &block, BlockType::kFilter,
        nullptr /* get_context */, &lookup_context, nullptr /* contents */);

    assert(s.ok() || block.GetValue() == nullptr);
    if (s.ok() && block.GetValue() != nullptr) {
      if (block.IsCached()) {
        if (pin) {
          filter_map_[handle.offset()] = std::move(block);
        }
      }
    }
  }
}

const InternalKeyComparator* PartitionedFilterBlockReader::internal_comparator()
    const {
  assert(table());
  assert(table()->get_rep());

  return &table()->get_rep()->internal_comparator;
}

bool PartitionedFilterBlockReader::index_key_includes_seq() const {
  assert(table());
  assert(table()->get_rep());

  return table()->get_rep()->index_key_includes_seq;
}

bool PartitionedFilterBlockReader::index_value_is_full() const {
  assert(table());
  assert(table()->get_rep());

  return table()->get_rep()->index_value_is_full;
}

}  // namespace ROCKSDB_NAMESPACE