1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #pragma once
10 #include "table/block_based/block_based_table_reader.h"
11
12 #include "table/block_based/reader_common.h"
13
// This file contains member functions of BlockBasedTable that cannot be
// implemented in block_based_table_reader.cc because they are templates
// and are called from other files (e.g. block_based_iterator.h).
18
19 namespace ROCKSDB_NAMESPACE {
// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
// If input_iter is null, a new iterator is heap-allocated and returned.
// If input_iter is not null, that iterator is updated and returned.
24 template <typename TBlockIter>
NewDataBlockIterator(const ReadOptions & ro,const BlockHandle & handle,TBlockIter * input_iter,BlockType block_type,GetContext * get_context,BlockCacheLookupContext * lookup_context,Status s,FilePrefetchBuffer * prefetch_buffer,bool for_compaction)25 TBlockIter* BlockBasedTable::NewDataBlockIterator(
26 const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter,
27 BlockType block_type, GetContext* get_context,
28 BlockCacheLookupContext* lookup_context, Status s,
29 FilePrefetchBuffer* prefetch_buffer, bool for_compaction) const {
30 PERF_TIMER_GUARD(new_table_block_iter_nanos);
31
32 TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
33 if (!s.ok()) {
34 iter->Invalidate(s);
35 return iter;
36 }
37
38 CachableEntry<UncompressionDict> uncompression_dict;
39 if (rep_->uncompression_dict_reader) {
40 const bool no_io = (ro.read_tier == kBlockCacheTier);
41 s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
42 prefetch_buffer, no_io, get_context, lookup_context,
43 &uncompression_dict);
44 if (!s.ok()) {
45 iter->Invalidate(s);
46 return iter;
47 }
48 }
49
50 const UncompressionDict& dict = uncompression_dict.GetValue()
51 ? *uncompression_dict.GetValue()
52 : UncompressionDict::GetEmptyDict();
53
54 CachableEntry<Block> block;
55 s = RetrieveBlock(prefetch_buffer, ro, handle, dict, &block, block_type,
56 get_context, lookup_context, for_compaction,
57 /* use_cache */ true);
58
59 if (!s.ok()) {
60 assert(block.IsEmpty());
61 iter->Invalidate(s);
62 return iter;
63 }
64
65 assert(block.GetValue() != nullptr);
66
67 // Block contents are pinned and it is still pinned after the iterator
68 // is destroyed as long as cleanup functions are moved to another object,
69 // when:
70 // 1. block cache handle is set to be released in cleanup function, or
71 // 2. it's pointing to immortal source. If own_bytes is true then we are
72 // not reading data from the original source, whether immortal or not.
73 // Otherwise, the block is pinned iff the source is immortal.
74 const bool block_contents_pinned =
75 block.IsCached() ||
76 (!block.GetValue()->own_bytes() && rep_->immortal_table);
77 iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), block_type, iter,
78 block_contents_pinned);
79
80 if (!block.IsCached()) {
81 if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) {
82 // insert a dummy record to block cache to track the memory usage
83 Cache* const block_cache = rep_->table_options.block_cache.get();
84 Cache::Handle* cache_handle = nullptr;
85 // There are two other types of cache keys: 1) SST cache key added in
86 // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in
87 // `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate
88 // from SST cache key(31 bytes), and use non-zero prefix to
89 // differentiate from `write_buffer_manager`
90 const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
91 char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length];
92 // Prefix: use rep_->cache_key_prefix padded by 0s
93 memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length);
94 assert(rep_->cache_key_prefix_size != 0);
95 assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix);
96 memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size);
97 char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix,
98 next_cache_key_id_++);
99 assert(end - cache_key <=
100 static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length));
101 const Slice unique_key(cache_key, static_cast<size_t>(end - cache_key));
102 s = block_cache->Insert(unique_key, nullptr,
103 block.GetValue()->ApproximateMemoryUsage(),
104 nullptr, &cache_handle);
105
106 if (s.ok()) {
107 assert(cache_handle != nullptr);
108 iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
109 cache_handle);
110 }
111 }
112 } else {
113 iter->SetCacheHandle(block.GetCacheHandle());
114 }
115
116 block.TransferTo(iter);
117
118 return iter;
119 }
120
// Convert an uncompressed data block (i.e., a CachableEntry<Block>)
// into an iterator over the contents of the corresponding block.
// If input_iter is null, a new iterator is heap-allocated and returned.
// If input_iter is not null, that iterator is updated and returned.
125 template <typename TBlockIter>
NewDataBlockIterator(const ReadOptions & ro,CachableEntry<Block> & block,TBlockIter * input_iter,Status s)126 TBlockIter* BlockBasedTable::NewDataBlockIterator(const ReadOptions& ro,
127 CachableEntry<Block>& block,
128 TBlockIter* input_iter,
129 Status s) const {
130 PERF_TIMER_GUARD(new_table_block_iter_nanos);
131
132 TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
133 if (!s.ok()) {
134 iter->Invalidate(s);
135 return iter;
136 }
137
138 assert(block.GetValue() != nullptr);
139 // Block contents are pinned and it is still pinned after the iterator
140 // is destroyed as long as cleanup functions are moved to another object,
141 // when:
142 // 1. block cache handle is set to be released in cleanup function, or
143 // 2. it's pointing to immortal source. If own_bytes is true then we are
144 // not reading data from the original source, whether immortal or not.
145 // Otherwise, the block is pinned iff the source is immortal.
146 const bool block_contents_pinned =
147 block.IsCached() ||
148 (!block.GetValue()->own_bytes() && rep_->immortal_table);
149 iter = InitBlockIterator<TBlockIter>(rep_, block.GetValue(), BlockType::kData,
150 iter, block_contents_pinned);
151
152 if (!block.IsCached()) {
153 if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) {
154 // insert a dummy record to block cache to track the memory usage
155 Cache* const block_cache = rep_->table_options.block_cache.get();
156 Cache::Handle* cache_handle = nullptr;
157 // There are two other types of cache keys: 1) SST cache key added in
158 // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in
159 // `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate
160 // from SST cache key(31 bytes), and use non-zero prefix to
161 // differentiate from `write_buffer_manager`
162 const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
163 char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length];
164 // Prefix: use rep_->cache_key_prefix padded by 0s
165 memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length);
166 assert(rep_->cache_key_prefix_size != 0);
167 assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix);
168 memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size);
169 char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix,
170 next_cache_key_id_++);
171 assert(end - cache_key <=
172 static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length));
173 const Slice unique_key(cache_key, static_cast<size_t>(end - cache_key));
174 s = block_cache->Insert(unique_key, nullptr,
175 block.GetValue()->ApproximateMemoryUsage(),
176 nullptr, &cache_handle);
177 if (s.ok()) {
178 assert(cache_handle != nullptr);
179 iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
180 cache_handle);
181 }
182 }
183 } else {
184 iter->SetCacheHandle(block.GetCacheHandle());
185 }
186
187 block.TransferTo(iter);
188 return iter;
189 }
190 } // namespace ROCKSDB_NAMESPACE
191