1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "table/block_fetcher.h"
11
12 #include <cinttypes>
13 #include <string>
14
15 #include "logging/logging.h"
16 #include "memory/memory_allocator.h"
17 #include "monitoring/perf_context_imp.h"
18 #include "rocksdb/env.h"
19 #include "table/block_based/block.h"
20 #include "table/block_based/block_based_table_reader.h"
21 #include "table/format.h"
22 #include "table/persistent_cache_helper.h"
23 #include "util/coding.h"
24 #include "util/compression.h"
25 #include "util/crc32c.h"
26 #include "util/stop_watch.h"
27 #include "util/string_util.h"
28 #include "util/xxhash.h"
29
30 namespace ROCKSDB_NAMESPACE {
31
CheckBlockChecksum()32 inline void BlockFetcher::CheckBlockChecksum() {
33 // Check the crc of the type and the block contents
34 if (read_options_.verify_checksums) {
35 const char* data = slice_.data(); // Pointer to where Read put the data
36 PERF_TIMER_GUARD(block_checksum_time);
37 uint32_t value = DecodeFixed32(data + block_size_ + 1);
38 uint32_t actual = 0;
39 switch (footer_.checksum()) {
40 case kNoChecksum:
41 break;
42 case kCRC32c:
43 value = crc32c::Unmask(value);
44 actual = crc32c::Value(data, block_size_ + 1);
45 break;
46 case kxxHash:
47 actual = XXH32(data, static_cast<int>(block_size_) + 1, 0);
48 break;
49 case kxxHash64:
50 actual = static_cast<uint32_t>(
51 XXH64(data, static_cast<int>(block_size_) + 1, 0) &
52 uint64_t{0xffffffff});
53 break;
54 default:
55 status_ = Status::Corruption(
56 "unknown checksum type " + ToString(footer_.checksum()) + " in " +
57 file_->file_name() + " offset " + ToString(handle_.offset()) +
58 " size " + ToString(block_size_));
59 }
60 if (status_.ok() && actual != value) {
61 status_ = Status::Corruption(
62 "block checksum mismatch: expected " + ToString(actual) + ", got " +
63 ToString(value) + " in " + file_->file_name() + " offset " +
64 ToString(handle_.offset()) + " size " + ToString(block_size_));
65 }
66 }
67 }
68
TryGetUncompressBlockFromPersistentCache()69 inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
70 if (cache_options_.persistent_cache &&
71 !cache_options_.persistent_cache->IsCompressed()) {
72 Status status = PersistentCacheHelper::LookupUncompressedPage(
73 cache_options_, handle_, contents_);
74 if (status.ok()) {
75 // uncompressed page is found for the block handle
76 return true;
77 } else {
78 // uncompressed page is not found
79 if (ioptions_.info_log && !status.IsNotFound()) {
80 assert(!status.ok());
81 ROCKS_LOG_INFO(ioptions_.info_log,
82 "Error reading from persistent cache. %s",
83 status.ToString().c_str());
84 }
85 }
86 }
87 return false;
88 }
89
TryGetFromPrefetchBuffer()90 inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
91 if (prefetch_buffer_ != nullptr &&
92 prefetch_buffer_->TryReadFromCache(
93 handle_.offset(),
94 static_cast<size_t>(handle_.size()) + kBlockTrailerSize, &slice_,
95 for_compaction_)) {
96 block_size_ = static_cast<size_t>(handle_.size());
97 CheckBlockChecksum();
98 if (!status_.ok()) {
99 return true;
100 }
101 got_from_prefetch_buffer_ = true;
102 used_buf_ = const_cast<char*>(slice_.data());
103 }
104 return got_from_prefetch_buffer_;
105 }
106
TryGetCompressedBlockFromPersistentCache()107 inline bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
108 if (cache_options_.persistent_cache &&
109 cache_options_.persistent_cache->IsCompressed()) {
110 // lookup uncompressed cache mode p-cache
111 std::unique_ptr<char[]> raw_data;
112 status_ = PersistentCacheHelper::LookupRawPage(
113 cache_options_, handle_, &raw_data, block_size_ + kBlockTrailerSize);
114 if (status_.ok()) {
115 heap_buf_ = CacheAllocationPtr(raw_data.release());
116 used_buf_ = heap_buf_.get();
117 slice_ = Slice(heap_buf_.get(), block_size_);
118 return true;
119 } else if (!status_.IsNotFound() && ioptions_.info_log) {
120 assert(!status_.ok());
121 ROCKS_LOG_INFO(ioptions_.info_log,
122 "Error reading from persistent cache. %s",
123 status_.ToString().c_str());
124 }
125 }
126 return false;
127 }
128
PrepareBufferForBlockFromFile()129 inline void BlockFetcher::PrepareBufferForBlockFromFile() {
130 // cache miss read from device
131 if (do_uncompress_ &&
132 block_size_ + kBlockTrailerSize < kDefaultStackBufferSize) {
133 // If we've got a small enough hunk of data, read it in to the
134 // trivially allocated stack buffer instead of needing a full malloc()
135 used_buf_ = &stack_buf_[0];
136 } else if (maybe_compressed_ && !do_uncompress_) {
137 compressed_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize,
138 memory_allocator_compressed_);
139 used_buf_ = compressed_buf_.get();
140 } else {
141 heap_buf_ =
142 AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_);
143 used_buf_ = heap_buf_.get();
144 }
145 }
146
InsertCompressedBlockToPersistentCacheIfNeeded()147 inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
148 if (status_.ok() && read_options_.fill_cache &&
149 cache_options_.persistent_cache &&
150 cache_options_.persistent_cache->IsCompressed()) {
151 // insert to raw cache
152 PersistentCacheHelper::InsertRawPage(cache_options_, handle_, used_buf_,
153 block_size_ + kBlockTrailerSize);
154 }
155 }
156
InsertUncompressedBlockToPersistentCacheIfNeeded()157 inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
158 if (status_.ok() && !got_from_prefetch_buffer_ && read_options_.fill_cache &&
159 cache_options_.persistent_cache &&
160 !cache_options_.persistent_cache->IsCompressed()) {
161 // insert to uncompressed cache
162 PersistentCacheHelper::InsertUncompressedPage(cache_options_, handle_,
163 *contents_);
164 }
165 }
166
CopyBufferToHeap()167 inline void BlockFetcher::CopyBufferToHeap() {
168 assert(used_buf_ != heap_buf_.get());
169 heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_);
170 memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
171 }
172
GetBlockContents()173 inline void BlockFetcher::GetBlockContents() {
174 if (slice_.data() != used_buf_) {
175 // the slice content is not the buffer provided
176 *contents_ = BlockContents(Slice(slice_.data(), block_size_));
177 } else {
178 // page can be either uncompressed or compressed, the buffer either stack
179 // or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
180 if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
181 CopyBufferToHeap();
182 } else if (used_buf_ == compressed_buf_.get()) {
183 if (compression_type_ == kNoCompression &&
184 memory_allocator_ != memory_allocator_compressed_) {
185 CopyBufferToHeap();
186 } else {
187 heap_buf_ = std::move(compressed_buf_);
188 }
189 }
190 *contents_ = BlockContents(std::move(heap_buf_), block_size_);
191 }
192 #ifndef NDEBUG
193 contents_->is_raw_block = true;
194 #endif
195 }
196
ReadBlockContents()197 Status BlockFetcher::ReadBlockContents() {
198 block_size_ = static_cast<size_t>(handle_.size());
199
200 if (TryGetUncompressBlockFromPersistentCache()) {
201 compression_type_ = kNoCompression;
202 #ifndef NDEBUG
203 contents_->is_raw_block = true;
204 #endif // NDEBUG
205 return Status::OK();
206 }
207 if (TryGetFromPrefetchBuffer()) {
208 if (!status_.ok()) {
209 return status_;
210 }
211 } else if (!TryGetCompressedBlockFromPersistentCache()) {
212 PrepareBufferForBlockFromFile();
213 Status s;
214
215 {
216 PERF_TIMER_GUARD(block_read_time);
217 // Actual file read
218 status_ = file_->Read(handle_.offset(), block_size_ + kBlockTrailerSize,
219 &slice_, used_buf_, nullptr, for_compaction_);
220 }
221 PERF_COUNTER_ADD(block_read_count, 1);
222
223 // TODO: introduce dedicated perf counter for range tombstones
224 switch (block_type_) {
225 case BlockType::kFilter:
226 PERF_COUNTER_ADD(filter_block_read_count, 1);
227 break;
228
229 case BlockType::kCompressionDictionary:
230 PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
231 break;
232
233 case BlockType::kIndex:
234 PERF_COUNTER_ADD(index_block_read_count, 1);
235 break;
236
237 // Nothing to do here as we don't have counters for the other types.
238 default:
239 break;
240 }
241
242 PERF_COUNTER_ADD(block_read_byte, block_size_ + kBlockTrailerSize);
243 if (!status_.ok()) {
244 return status_;
245 }
246
247 if (slice_.size() != block_size_ + kBlockTrailerSize) {
248 return Status::Corruption("truncated block read from " +
249 file_->file_name() + " offset " +
250 ToString(handle_.offset()) + ", expected " +
251 ToString(block_size_ + kBlockTrailerSize) +
252 " bytes, got " + ToString(slice_.size()));
253 }
254
255 CheckBlockChecksum();
256 if (status_.ok()) {
257 InsertCompressedBlockToPersistentCacheIfNeeded();
258 } else {
259 return status_;
260 }
261 }
262
263 PERF_TIMER_GUARD(block_decompress_time);
264
265 compression_type_ = get_block_compression_type(slice_.data(), block_size_);
266
267 if (do_uncompress_ && compression_type_ != kNoCompression) {
268 // compressed page, uncompress, update cache
269 UncompressionContext context(compression_type_);
270 UncompressionInfo info(context, uncompression_dict_, compression_type_);
271 status_ = UncompressBlockContents(info, slice_.data(), block_size_,
272 contents_, footer_.version(), ioptions_,
273 memory_allocator_);
274 compression_type_ = kNoCompression;
275 } else {
276 GetBlockContents();
277 }
278
279 InsertUncompressedBlockToPersistentCacheIfNeeded();
280
281 return status_;
282 }
283
284 } // namespace ROCKSDB_NAMESPACE
285