// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/file_prefetch_buffer.h"

#include <algorithm>
#include <mutex>

#include "file/random_access_file_reader.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
Prefetch(RandomAccessFileReader * reader,uint64_t offset,size_t n,bool for_compaction)24 Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader,
25 uint64_t offset, size_t n,
26 bool for_compaction) {
27 if (!enable_ || reader == nullptr) {
28 return Status::OK();
29 }
30 size_t alignment = reader->file()->GetRequiredBufferAlignment();
31 size_t offset_ = static_cast<size_t>(offset);
32 uint64_t rounddown_offset = Rounddown(offset_, alignment);
33 uint64_t roundup_end = Roundup(offset_ + n, alignment);
34 uint64_t roundup_len = roundup_end - rounddown_offset;
35 assert(roundup_len >= alignment);
36 assert(roundup_len % alignment == 0);
37
38 // Check if requested bytes are in the existing buffer_.
39 // If all bytes exist -- return.
40 // If only a few bytes exist -- reuse them & read only what is really needed.
41 // This is typically the case of incremental reading of data.
42 // If no bytes exist in buffer -- full pread.
43
44 Status s;
45 uint64_t chunk_offset_in_buffer = 0;
46 uint64_t chunk_len = 0;
47 bool copy_data_to_new_buffer = false;
48 if (buffer_.CurrentSize() > 0 && offset >= buffer_offset_ &&
49 offset <= buffer_offset_ + buffer_.CurrentSize()) {
50 if (offset + n <= buffer_offset_ + buffer_.CurrentSize()) {
51 // All requested bytes are already in the buffer. So no need to Read
52 // again.
53 return s;
54 } else {
55 // Only a few requested bytes are in the buffer. memmove those chunk of
56 // bytes to the beginning, and memcpy them back into the new buffer if a
57 // new buffer is created.
58 chunk_offset_in_buffer =
59 Rounddown(static_cast<size_t>(offset - buffer_offset_), alignment);
60 chunk_len = buffer_.CurrentSize() - chunk_offset_in_buffer;
61 assert(chunk_offset_in_buffer % alignment == 0);
62 assert(chunk_len % alignment == 0);
63 assert(chunk_offset_in_buffer + chunk_len <=
64 buffer_offset_ + buffer_.CurrentSize());
65 if (chunk_len > 0) {
66 copy_data_to_new_buffer = true;
67 } else {
68 // this reset is not necessary, but just to be safe.
69 chunk_offset_in_buffer = 0;
70 }
71 }
72 }
73
74 // Create a new buffer only if current capacity is not sufficient, and memcopy
75 // bytes from old buffer if needed (i.e., if chunk_len is greater than 0).
76 if (buffer_.Capacity() < roundup_len) {
77 buffer_.Alignment(alignment);
78 buffer_.AllocateNewBuffer(static_cast<size_t>(roundup_len),
79 copy_data_to_new_buffer, chunk_offset_in_buffer,
80 static_cast<size_t>(chunk_len));
81 } else if (chunk_len > 0) {
82 // New buffer not needed. But memmove bytes from tail to the beginning since
83 // chunk_len is greater than 0.
84 buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
85 static_cast<size_t>(chunk_len));
86 }
87
88 Slice result;
89 s = reader->Read(rounddown_offset + chunk_len,
90 static_cast<size_t>(roundup_len - chunk_len), &result,
91 buffer_.BufferStart() + chunk_len, nullptr, for_compaction);
92 if (s.ok()) {
93 buffer_offset_ = rounddown_offset;
94 buffer_.Size(static_cast<size_t>(chunk_len) + result.size());
95 }
96 return s;
97 }
98
TryReadFromCache(uint64_t offset,size_t n,Slice * result,bool for_compaction)99 bool FilePrefetchBuffer::TryReadFromCache(uint64_t offset, size_t n,
100 Slice* result, bool for_compaction) {
101 if (track_min_offset_ && offset < min_offset_read_) {
102 min_offset_read_ = static_cast<size_t>(offset);
103 }
104 if (!enable_ || offset < buffer_offset_) {
105 return false;
106 }
107
108 // If the buffer contains only a few of the requested bytes:
109 // If readahead is enabled: prefetch the remaining bytes + readadhead bytes
110 // and satisfy the request.
111 // If readahead is not enabled: return false.
112 if (offset + n > buffer_offset_ + buffer_.CurrentSize()) {
113 if (readahead_size_ > 0) {
114 assert(file_reader_ != nullptr);
115 assert(max_readahead_size_ >= readahead_size_);
116 Status s;
117 if (for_compaction) {
118 s = Prefetch(file_reader_, offset, std::max(n, readahead_size_),
119 for_compaction);
120 } else {
121 s = Prefetch(file_reader_, offset, n + readahead_size_, for_compaction);
122 }
123 if (!s.ok()) {
124 return false;
125 }
126 readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
127 } else {
128 return false;
129 }
130 }
131
132 uint64_t offset_in_buffer = offset - buffer_offset_;
133 *result = Slice(buffer_.BufferStart() + offset_in_buffer, n);
134 return true;
135 }
}  // namespace ROCKSDB_NAMESPACE