// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/random_access_file_reader.h"

#include <algorithm>
#include <mutex>

#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {

Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
                                    char* scratch, AlignedBuf* aligned_buf,
                                    bool for_compaction) const {
  (void)aligned_buf;
  Status s;
  uint64_t elapsed = 0;
  {
    StopWatch sw(env_, stats_, hist_type_,
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
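      // Direct I/O requires the file offset and the read size to be aligned,
      // so widen the caller's range to aligned boundaries and remember where
      // the requested data starts within the aligned window. For example
      // (hypothetical numbers), with alignment = 4096, offset = 5000 and
      // n = 100: aligned_offset = 4096, offset_advance = 904 and
      // read_size = 4096.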
      size_t alignment = file_->GetRequiredBufferAlignment();
      size_t aligned_offset =
          TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
      size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
      size_t read_size =
          Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(read_size);
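      // Read the aligned window, possibly in multiple chunks: a rate-limited
      // compaction read is capped per iteration by the tokens the rate
      // limiter grants, otherwise the whole window is read in one call.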
      while (buf.CurrentSize() < read_size) {
        size_t allowed;
        if (for_compaction && rate_limiter_ != nullptr) {
          allowed = rate_limiter_->RequestToken(
              buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
              Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead);
        } else {
          assert(buf.CurrentSize() == 0);
          allowed = read_size;
        }
        Slice tmp;

        FileOperationInfo::TimePoint start_ts;
        uint64_t orig_offset = 0;
        if (ShouldNotifyListeners()) {
          start_ts = std::chrono::system_clock::now();
          orig_offset = aligned_offset + buf.CurrentSize();
        }
        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
          s = file_->Read(aligned_offset + buf.CurrentSize(), allowed,
                          IOOptions(), &tmp, buf.Destination(), nullptr);
        }
        if (ShouldNotifyListeners()) {
          auto finish_ts = std::chrono::system_clock::now();
          NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
                                 s);
        }

        buf.Size(buf.CurrentSize() + tmp.size());
        if (!s.ok() || tmp.size() < allowed) {
          break;
        }
      }
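      // Return the requested sub-range: either copy it out of the aligned
      // buffer into the caller's `scratch`, or, when an `aligned_buf` was
      // supplied, hand over ownership of the aligned buffer and point the
      // result directly at the requested bytes to avoid the copy.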
      size_t res_len = 0;
      if (s.ok() && offset_advance < buf.CurrentSize()) {
        res_len = std::min(buf.CurrentSize() - offset_advance, n);
        if (aligned_buf == nullptr) {
          buf.Read(scratch, offset_advance, res_len);
        } else {
          scratch = buf.BufferStart() + offset_advance;
          aligned_buf->reset(buf.Release());
        }
      }
      *result = Slice(scratch, res_len);
#endif  // !ROCKSDB_LITE
    } else {
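      // Buffered (non-direct) read path: read straight into the caller's
      // `scratch`, looping only when the rate limiter caps the chunk size.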
      size_t pos = 0;
      const char* res_scratch = nullptr;
      while (pos < n) {
        size_t allowed;
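        // Time spent waiting for rate-limiter tokens is bracketed by
        // DelayStart()/DelayStop() so the StopWatch can subtract it from the
        // recorded read latency.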
        if (for_compaction && rate_limiter_ != nullptr) {
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStart();
          }
          allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
                                                Env::IOPriority::IO_LOW, stats_,
                                                RateLimiter::OpType::kRead);
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStop();
          }
        } else {
          allowed = n;
        }
        Slice tmp_result;

#ifndef ROCKSDB_LITE
        FileOperationInfo::TimePoint start_ts;
        if (ShouldNotifyListeners()) {
          start_ts = std::chrono::system_clock::now();
        }
#endif
        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
          s = file_->Read(offset + pos, allowed, IOOptions(), &tmp_result,
                          scratch + pos, nullptr);
        }
#ifndef ROCKSDB_LITE
        if (ShouldNotifyListeners()) {
          auto finish_ts = std::chrono::system_clock::now();
          NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
                                 finish_ts, s);
        }
#endif

        if (res_scratch == nullptr) {
          // we can't simply use `scratch` because reads of mmap'd files return
          // data in a different buffer.
          res_scratch = tmp_result.data();
        } else {
          // make sure chunks are inserted contiguously into `res_scratch`.
          assert(tmp_result.data() == res_scratch + pos);
        }
        pos += tmp_result.size();
        if (!s.ok() || tmp_result.size() < allowed) {
          break;
        }
      }
      *result = Slice(res_scratch, s.ok() ? pos : 0);
    }
    IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return s;
}

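// Returns the right end of the request's interval, i.e. offset + len.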
size_t End(const FSReadRequest& r) {
  return static_cast<size_t>(r.offset) + r.len;
}

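// Returns a copy of `r` widened to the given alignment: the offset is rounded
// down and the end is rounded up to alignment boundaries. The scratch pointer
// is left null; it is assigned later once the merged requests are known.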
FSReadRequest Align(const FSReadRequest& r, size_t alignment) {
  FSReadRequest req;
  req.offset = static_cast<uint64_t>(
      TruncateToPageBoundary(alignment, static_cast<size_t>(r.offset)));
  req.len = Roundup(End(r), alignment) - req.offset;
  req.scratch = nullptr;
  return req;
}

// Try to merge src to dest if they have overlap.
//
// Each request represents an inclusive interval [offset, offset + len].
// If the intervals have overlap, update offset and len to represent the
// merged interval, and return true.
// Otherwise, do nothing and return false.
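//
// For example (hypothetical offsets), [4096, 8192] and [8192, 12288] touch at
// 8192 and merge into [4096, 12288], while [0, 4096] and [8192, 12288] are
// disjoint and stay separate.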
bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
  size_t dest_offset = static_cast<size_t>(dest->offset);
  size_t src_offset = static_cast<size_t>(src.offset);
  size_t dest_end = End(*dest);
  size_t src_end = End(src);
  if (std::max(dest_offset, src_offset) > std::min(dest_end, src_end)) {
    return false;
  }
  dest->offset = static_cast<uint64_t>(std::min(dest_offset, src_offset));
  dest->len = std::max(dest_end, src_end) - dest->offset;
  return true;
}

Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs,
                                         size_t num_reqs,
                                         AlignedBuf* aligned_buf) const {
  (void)aligned_buf;  // suppress warning of unused variable in LITE mode
  assert(num_reqs > 0);
  Status s;
  uint64_t elapsed = 0;
  {
    StopWatch sw(env_, stats_, hist_type_,
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);

    FSReadRequest* fs_reqs = read_reqs;
    size_t num_fs_reqs = num_reqs;
#ifndef ROCKSDB_LITE
    std::vector<FSReadRequest> aligned_reqs;
    if (use_direct_io()) {
      // num_reqs is the max possible size, so reserving it up front can
      // reduce std::vector's internal resize operations.
      aligned_reqs.reserve(num_reqs);
      // Align and merge the read requests.
      size_t alignment = file_->GetRequiredBufferAlignment();
      aligned_reqs.push_back(Align(read_reqs[0], alignment));
      for (size_t i = 1; i < num_reqs; i++) {
        const auto& r = Align(read_reqs[i], alignment);
        if (!TryMerge(&aligned_reqs.back(), r)) {
          aligned_reqs.push_back(r);
        }
      }
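      // aligned_reqs now holds aligned requests covering all the originals.
      // Since TryMerge only considers the most recently appended request,
      // this (and the result-population loop below) relies on read_reqs
      // being sorted by offset.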

      // Allocate aligned buffer and let scratch buffers point to it.
      size_t total_len = 0;
      for (const auto& r : aligned_reqs) {
        total_len += r.len;
      }
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(total_len);
      char* scratch = buf.BufferStart();
      for (auto& r : aligned_reqs) {
        r.scratch = scratch;
        scratch += r.len;
      }
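      // Transfer ownership of the single backing allocation to the caller's
      // aligned_buf so the result Slices built below stay valid after this
      // function returns.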

      aligned_buf->reset(buf.Release());
      fs_reqs = aligned_reqs.data();
      num_fs_reqs = aligned_reqs.size();
    }
#endif  // ROCKSDB_LITE

#ifndef ROCKSDB_LITE
    FileOperationInfo::TimePoint start_ts;
    if (ShouldNotifyListeners()) {
      start_ts = std::chrono::system_clock::now();
    }
#endif  // ROCKSDB_LITE
    {
      IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
      s = file_->MultiRead(fs_reqs, num_fs_reqs, IOOptions(), nullptr);
    }

#ifndef ROCKSDB_LITE
    if (use_direct_io()) {
      // Populate results in the unaligned read requests.
      size_t aligned_i = 0;
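      // Walk the original requests in order, advancing aligned_i whenever the
      // current request starts past the end of the current merged request,
      // and carve each result out of the merged request's scratch buffer.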
      for (size_t i = 0; i < num_reqs; i++) {
        auto& r = read_reqs[i];
        if (static_cast<size_t>(r.offset) > End(aligned_reqs[aligned_i])) {
          aligned_i++;
        }
        const auto& fs_r = fs_reqs[aligned_i];
        r.status = fs_r.status;
        if (r.status.ok()) {
          uint64_t offset = r.offset - fs_r.offset;
          size_t len = std::min(r.len, static_cast<size_t>(fs_r.len - offset));
          r.result = Slice(fs_r.scratch + offset, len);
        } else {
          r.result = Slice();
        }
      }
    }
#endif  // ROCKSDB_LITE

    for (size_t i = 0; i < num_reqs; ++i) {
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
                               start_ts, finish_ts, read_reqs[i].status);
      }
#endif  // ROCKSDB_LITE
      IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size());
    }
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return s;
}

}  // namespace ROCKSDB_NAMESPACE