1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "table/block_fetcher.h"
11 
12 #include <cinttypes>
13 #include <string>
14 
15 #include "logging/logging.h"
16 #include "memory/memory_allocator.h"
17 #include "monitoring/perf_context_imp.h"
18 #include "rocksdb/env.h"
19 #include "table/block_based/block.h"
20 #include "table/block_based/block_based_table_reader.h"
21 #include "table/format.h"
22 #include "table/persistent_cache_helper.h"
23 #include "util/coding.h"
24 #include "util/compression.h"
25 #include "util/crc32c.h"
26 #include "util/stop_watch.h"
27 #include "util/string_util.h"
28 #include "util/xxhash.h"
29 
30 namespace ROCKSDB_NAMESPACE {
31 
CheckBlockChecksum()32 inline void BlockFetcher::CheckBlockChecksum() {
33   // Check the crc of the type and the block contents
34   if (read_options_.verify_checksums) {
35     const char* data = slice_.data();  // Pointer to where Read put the data
36     PERF_TIMER_GUARD(block_checksum_time);
37     uint32_t value = DecodeFixed32(data + block_size_ + 1);
38     uint32_t actual = 0;
39     switch (footer_.checksum()) {
40       case kNoChecksum:
41         break;
42       case kCRC32c:
43         value = crc32c::Unmask(value);
44         actual = crc32c::Value(data, block_size_ + 1);
45         break;
46       case kxxHash:
47         actual = XXH32(data, static_cast<int>(block_size_) + 1, 0);
48         break;
49       case kxxHash64:
50         actual = static_cast<uint32_t>(
51             XXH64(data, static_cast<int>(block_size_) + 1, 0) &
52             uint64_t{0xffffffff});
53         break;
54       default:
55         status_ = Status::Corruption(
56             "unknown checksum type " + ToString(footer_.checksum()) + " in " +
57             file_->file_name() + " offset " + ToString(handle_.offset()) +
58             " size " + ToString(block_size_));
59     }
60     if (status_.ok() && actual != value) {
61       status_ = Status::Corruption(
62           "block checksum mismatch: expected " + ToString(actual) + ", got " +
63           ToString(value) + "  in " + file_->file_name() + " offset " +
64           ToString(handle_.offset()) + " size " + ToString(block_size_));
65     }
66   }
67 }
68 
TryGetUncompressBlockFromPersistentCache()69 inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
70   if (cache_options_.persistent_cache &&
71       !cache_options_.persistent_cache->IsCompressed()) {
72     Status status = PersistentCacheHelper::LookupUncompressedPage(
73         cache_options_, handle_, contents_);
74     if (status.ok()) {
75       // uncompressed page is found for the block handle
76       return true;
77     } else {
78       // uncompressed page is not found
79       if (ioptions_.info_log && !status.IsNotFound()) {
80         assert(!status.ok());
81         ROCKS_LOG_INFO(ioptions_.info_log,
82                        "Error reading from persistent cache. %s",
83                        status.ToString().c_str());
84       }
85     }
86   }
87   return false;
88 }
89 
TryGetFromPrefetchBuffer()90 inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
91   if (prefetch_buffer_ != nullptr &&
92       prefetch_buffer_->TryReadFromCache(
93           handle_.offset(),
94           static_cast<size_t>(handle_.size()) + kBlockTrailerSize, &slice_,
95           for_compaction_)) {
96     block_size_ = static_cast<size_t>(handle_.size());
97     CheckBlockChecksum();
98     if (!status_.ok()) {
99       return true;
100     }
101     got_from_prefetch_buffer_ = true;
102     used_buf_ = const_cast<char*>(slice_.data());
103   }
104   return got_from_prefetch_buffer_;
105 }
106 
TryGetCompressedBlockFromPersistentCache()107 inline bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
108   if (cache_options_.persistent_cache &&
109       cache_options_.persistent_cache->IsCompressed()) {
110     // lookup uncompressed cache mode p-cache
111     std::unique_ptr<char[]> raw_data;
112     status_ = PersistentCacheHelper::LookupRawPage(
113         cache_options_, handle_, &raw_data, block_size_ + kBlockTrailerSize);
114     if (status_.ok()) {
115       heap_buf_ = CacheAllocationPtr(raw_data.release());
116       used_buf_ = heap_buf_.get();
117       slice_ = Slice(heap_buf_.get(), block_size_);
118       return true;
119     } else if (!status_.IsNotFound() && ioptions_.info_log) {
120       assert(!status_.ok());
121       ROCKS_LOG_INFO(ioptions_.info_log,
122                      "Error reading from persistent cache. %s",
123                      status_.ToString().c_str());
124     }
125   }
126   return false;
127 }
128 
PrepareBufferForBlockFromFile()129 inline void BlockFetcher::PrepareBufferForBlockFromFile() {
130   // cache miss read from device
131   if (do_uncompress_ &&
132       block_size_ + kBlockTrailerSize < kDefaultStackBufferSize) {
133     // If we've got a small enough hunk of data, read it in to the
134     // trivially allocated stack buffer instead of needing a full malloc()
135     used_buf_ = &stack_buf_[0];
136   } else if (maybe_compressed_ && !do_uncompress_) {
137     compressed_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize,
138                                     memory_allocator_compressed_);
139     used_buf_ = compressed_buf_.get();
140   } else {
141     heap_buf_ =
142         AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_);
143     used_buf_ = heap_buf_.get();
144   }
145 }
146 
InsertCompressedBlockToPersistentCacheIfNeeded()147 inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
148   if (status_.ok() && read_options_.fill_cache &&
149       cache_options_.persistent_cache &&
150       cache_options_.persistent_cache->IsCompressed()) {
151     // insert to raw cache
152     PersistentCacheHelper::InsertRawPage(cache_options_, handle_, used_buf_,
153                                          block_size_ + kBlockTrailerSize);
154   }
155 }
156 
InsertUncompressedBlockToPersistentCacheIfNeeded()157 inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
158   if (status_.ok() && !got_from_prefetch_buffer_ && read_options_.fill_cache &&
159       cache_options_.persistent_cache &&
160       !cache_options_.persistent_cache->IsCompressed()) {
161     // insert to uncompressed cache
162     PersistentCacheHelper::InsertUncompressedPage(cache_options_, handle_,
163                                                   *contents_);
164   }
165 }
166 
CopyBufferToHeap()167 inline void BlockFetcher::CopyBufferToHeap() {
168   assert(used_buf_ != heap_buf_.get());
169   heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_);
170   memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
171 }
172 
GetBlockContents()173 inline void BlockFetcher::GetBlockContents() {
174   if (slice_.data() != used_buf_) {
175     // the slice content is not the buffer provided
176     *contents_ = BlockContents(Slice(slice_.data(), block_size_));
177   } else {
178     // page can be either uncompressed or compressed, the buffer either stack
179     // or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
180     if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
181       CopyBufferToHeap();
182     } else if (used_buf_ == compressed_buf_.get()) {
183       if (compression_type_ == kNoCompression &&
184           memory_allocator_ != memory_allocator_compressed_) {
185         CopyBufferToHeap();
186       } else {
187         heap_buf_ = std::move(compressed_buf_);
188       }
189     }
190     *contents_ = BlockContents(std::move(heap_buf_), block_size_);
191   }
192 #ifndef NDEBUG
193   contents_->is_raw_block = true;
194 #endif
195 }
196 
ReadBlockContents()197 Status BlockFetcher::ReadBlockContents() {
198   block_size_ = static_cast<size_t>(handle_.size());
199 
200   if (TryGetUncompressBlockFromPersistentCache()) {
201     compression_type_ = kNoCompression;
202 #ifndef NDEBUG
203     contents_->is_raw_block = true;
204 #endif  // NDEBUG
205     return Status::OK();
206   }
207   if (TryGetFromPrefetchBuffer()) {
208     if (!status_.ok()) {
209       return status_;
210     }
211   } else if (!TryGetCompressedBlockFromPersistentCache()) {
212     PrepareBufferForBlockFromFile();
213     Status s;
214 
215     {
216       PERF_TIMER_GUARD(block_read_time);
217       // Actual file read
218       status_ = file_->Read(handle_.offset(), block_size_ + kBlockTrailerSize,
219                             &slice_, used_buf_, nullptr, for_compaction_);
220     }
221     PERF_COUNTER_ADD(block_read_count, 1);
222 
223     // TODO: introduce dedicated perf counter for range tombstones
224     switch (block_type_) {
225       case BlockType::kFilter:
226         PERF_COUNTER_ADD(filter_block_read_count, 1);
227         break;
228 
229       case BlockType::kCompressionDictionary:
230         PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
231         break;
232 
233       case BlockType::kIndex:
234         PERF_COUNTER_ADD(index_block_read_count, 1);
235         break;
236 
237       // Nothing to do here as we don't have counters for the other types.
238       default:
239         break;
240     }
241 
242     PERF_COUNTER_ADD(block_read_byte, block_size_ + kBlockTrailerSize);
243     if (!status_.ok()) {
244       return status_;
245     }
246 
247     if (slice_.size() != block_size_ + kBlockTrailerSize) {
248       return Status::Corruption("truncated block read from " +
249                                 file_->file_name() + " offset " +
250                                 ToString(handle_.offset()) + ", expected " +
251                                 ToString(block_size_ + kBlockTrailerSize) +
252                                 " bytes, got " + ToString(slice_.size()));
253     }
254 
255     CheckBlockChecksum();
256     if (status_.ok()) {
257       InsertCompressedBlockToPersistentCacheIfNeeded();
258     } else {
259       return status_;
260     }
261   }
262 
263   PERF_TIMER_GUARD(block_decompress_time);
264 
265   compression_type_ = get_block_compression_type(slice_.data(), block_size_);
266 
267   if (do_uncompress_ && compression_type_ != kNoCompression) {
268     // compressed page, uncompress, update cache
269     UncompressionContext context(compression_type_);
270     UncompressionInfo info(context, uncompression_dict_, compression_type_);
271     status_ = UncompressBlockContents(info, slice_.data(), block_size_,
272                                       contents_, footer_.version(), ioptions_,
273                                       memory_allocator_);
274     compression_type_ = kNoCompression;
275   } else {
276     GetBlockContents();
277   }
278 
279   InsertUncompressedBlockToPersistentCacheIfNeeded();
280 
281   return status_;
282 }
283 
284 }  // namespace ROCKSDB_NAMESPACE
285