//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "table/block_based/block_based_table_reader.h"

#include "table/block_based/reader_common.h"

// This file contains some member functions of BlockBasedTable that cannot
// be implemented in block_based_table_reader.cc because they are templates
// and are called from other files (e.g. block_based_iterator.h).

namespace ROCKSDB_NAMESPACE {
// Converts an index iterator value (i.e., an encoded BlockHandle) into an
// iterator over the contents of the corresponding block.
// If input_iter is null, a new iterator is heap-allocated and returned;
// otherwise, input_iter is updated in place and returned.
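//
// Illustrative usage sketch (not part of this file's API; `table`,
// `index_iter`, `read_opts`, etc. are hypothetical caller-side names):
//
//   BlockHandle handle = index_iter->value().handle;
//   DataBlockIter* biter = table->NewDataBlockIterator<DataBlockIter>(
//       read_opts, handle, /*input_iter=*/nullptr, BlockType::kData,
//       get_context, lookup_context, Status(), prefetch_buffer,
//       /*for_compaction=*/false);
//   // When input_iter was null, the caller owns biter and must delete it.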
template <typename TBlockIter>
TBlockIter* BlockBasedTable::NewDataBlockIterator(
    const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter,
    BlockType block_type, GetContext* get_context,
    BlockCacheLookupContext* lookup_context, Status s,
    FilePrefetchBuffer* prefetch_buffer, bool for_compaction) const {
  PERF_TIMER_GUARD(new_table_block_iter_nanos);

  TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
  if (!s.ok()) {
    iter->Invalidate(s);
    return iter;
  }

  CachableEntry<UncompressionDict> uncompression_dict;
  if (rep_->uncompression_dict_reader) {
    const bool no_io = (ro.read_tier == kBlockCacheTier);
    s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
        prefetch_buffer, no_io, get_context, lookup_context,
        &uncompression_dict);
    if (!s.ok()) {
      iter->Invalidate(s);
      return iter;
    }
  }

  const UncompressionDict& dict = uncompression_dict.GetValue()
                                      ? *uncompression_dict.GetValue()
                                      : UncompressionDict::GetEmptyDict();

  CachableEntry<Block> block;
  s = RetrieveBlock(prefetch_buffer, ro, handle, dict, &block, block_type,
                    get_context, lookup_context, for_compaction,
                    /* use_cache */ true);

  if (!s.ok()) {
    assert(block.IsEmpty());
    iter->Invalidate(s);
    return iter;
  }

  assert(block.GetValue() != nullptr);

  // Block contents are pinned, and remain pinned even after the iterator is
  // destroyed (as long as the cleanup functions are moved to another
  // object), when:
  // 1. the block cache handle is set to be released in a cleanup function, or
  // 2. the block points into an immortal source. If own_bytes is true, we
  //    are not reading data from the original source, whether immortal or
  //    not. Otherwise, the block is pinned iff the source is immortal.
  const bool block_contents_pinned =
      block.IsCached() ||
      (!block.GetValue()->own_bytes() && rep_->immortal_table);
  iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), block_type, iter,
                                       block_contents_pinned);

  if (!block.IsCached()) {
    if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) {
      // Insert a dummy record into the block cache to track memory usage.
      Cache* const block_cache = rep_->table_options.block_cache.get();
      Cache::Handle* cache_handle = nullptr;
      // There are two other types of cache keys: 1) the SST cache key added
      // in `MaybeReadBlockAndLoadToCache`, and 2) the dummy cache key added
      // by `write_buffer_manager`. Use a longer prefix (41 bytes) to
      // differentiate from the SST cache key (31 bytes), and a non-zero
      // prefix to differentiate from `write_buffer_manager`.
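      //
      // Resulting dummy-key layout (an informal sketch, not normative; the
      // sizes follow from kMaxVarint64Length == 10 below):
      //
      //   [ cache_key_prefix | zero padding ][ varint64 counter ]
      //   '-- kExtraCacheKeyPrefix = 41 ----''--- <= 10 bytes --'
      //
      // The counter (next_cache_key_id_) makes each dummy key unique.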
      const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
      char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length];
      // Prefix: use rep_->cache_key_prefix padded by 0s
      memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length);
      assert(rep_->cache_key_prefix_size != 0);
      assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix);
      memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size);
      char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix,
                                 next_cache_key_id_++);
      assert(end - cache_key <=
             static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length));
      const Slice unique_key(cache_key, static_cast<size_t>(end - cache_key));
      s = block_cache->Insert(unique_key, nullptr,
                              block.GetValue()->ApproximateMemoryUsage(),
                              nullptr, &cache_handle);

      if (s.ok()) {
        assert(cache_handle != nullptr);
        iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
                              cache_handle);
      }
    }
  } else {
    iter->SetCacheHandle(block.GetCacheHandle());
  }

  block.TransferTo(iter);

  return iter;
}

// Converts an uncompressed data block (i.e., a CachableEntry<Block>) into an
// iterator over the contents of the corresponding block.
// If input_iter is null, a new iterator is heap-allocated and returned;
// otherwise, input_iter is updated in place and returned.
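//
// Illustrative usage sketch (hypothetical caller-side names; assumes the
// block has already been retrieved, e.g. via RetrieveBlock):
//
//   CachableEntry<Block> block;
//   ...  // fill `block` from the block cache or from the file
//   DataBlockIter* biter = table->NewDataBlockIterator<DataBlockIter>(
//       read_opts, block, /*input_iter=*/nullptr, Status());
//   // Ownership of `block` is transferred to biter via TransferTo() below.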
template <typename TBlockIter>
TBlockIter* BlockBasedTable::NewDataBlockIterator(const ReadOptions& ro,
                                                  CachableEntry<Block>& block,
                                                  TBlockIter* input_iter,
                                                  Status s) const {
  PERF_TIMER_GUARD(new_table_block_iter_nanos);

  TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
  if (!s.ok()) {
    iter->Invalidate(s);
    return iter;
  }

  assert(block.GetValue() != nullptr);
  // Block contents are pinned, and remain pinned even after the iterator is
  // destroyed (as long as the cleanup functions are moved to another
  // object), when:
  // 1. the block cache handle is set to be released in a cleanup function, or
  // 2. the block points into an immortal source. If own_bytes is true, we
  //    are not reading data from the original source, whether immortal or
  //    not. Otherwise, the block is pinned iff the source is immortal.
  const bool block_contents_pinned =
      block.IsCached() ||
      (!block.GetValue()->own_bytes() && rep_->immortal_table);
  iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), BlockType::kData,
                                       iter, block_contents_pinned);

  if (!block.IsCached()) {
    if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) {
      // Insert a dummy record into the block cache to track memory usage.
      Cache* const block_cache = rep_->table_options.block_cache.get();
      Cache::Handle* cache_handle = nullptr;
      // There are two other types of cache keys: 1) the SST cache key added
      // in `MaybeReadBlockAndLoadToCache`, and 2) the dummy cache key added
      // by `write_buffer_manager`. Use a longer prefix (41 bytes) to
      // differentiate from the SST cache key (31 bytes), and a non-zero
      // prefix to differentiate from `write_buffer_manager`.
      const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
      char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length];
      // Prefix: use rep_->cache_key_prefix padded by 0s
      memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length);
      assert(rep_->cache_key_prefix_size != 0);
      assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix);
      memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size);
      char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix,
                                 next_cache_key_id_++);
      assert(end - cache_key <=
             static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length));
      const Slice unique_key(cache_key, static_cast<size_t>(end - cache_key));
      s = block_cache->Insert(unique_key, nullptr,
                              block.GetValue()->ApproximateMemoryUsage(),
                              nullptr, &cache_handle);
      if (s.ok()) {
        assert(cache_handle != nullptr);
        iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
                              cache_handle);
      }
    }
  } else {
    iter->SetCacheHandle(block.GetCacheHandle());
  }

  block.TransferTo(iter);
  return iter;
}
}  // namespace ROCKSDB_NAMESPACE