//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/random_access_file_reader.h"

#include <algorithm>
#include <mutex>

#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {

Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
                                    char* scratch, AlignedBuf* aligned_buf,
                                    bool for_compaction) const {
  (void)aligned_buf;  // suppress warning of unused variable in LITE mode
  Status s;
  uint64_t elapsed = 0;
  {
    StopWatch sw(env_, stats_, hist_type_,
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
      size_t alignment = file_->GetRequiredBufferAlignment();
      size_t aligned_offset =
          TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
      size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
      size_t read_size =
          Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
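      // Worked example (illustrative values): with alignment = 4096,
      // offset = 5000 and n = 300, aligned_offset = 4096, offset_advance =
      // 904, and read_size = Roundup(5300, 4096) - 4096 = 4096, i.e. one
      // aligned page covers the requested range.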
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(read_size);
      while (buf.CurrentSize() < read_size) {
        size_t allowed;
        if (for_compaction && rate_limiter_ != nullptr) {
          allowed = rate_limiter_->RequestToken(
              buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
              Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead);
        } else {
          assert(buf.CurrentSize() == 0);
          allowed = read_size;
        }
        Slice tmp;

        FileOperationInfo::TimePoint start_ts;
        uint64_t orig_offset = 0;
        if (ShouldNotifyListeners()) {
          start_ts = std::chrono::system_clock::now();
          orig_offset = aligned_offset + buf.CurrentSize();
        }
        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
          s = file_->Read(aligned_offset + buf.CurrentSize(), allowed,
                          IOOptions(), &tmp, buf.Destination(), nullptr);
        }
        if (ShouldNotifyListeners()) {
          auto finish_ts = std::chrono::system_clock::now();
          NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
                                 s);
        }

        buf.Size(buf.CurrentSize() + tmp.size());
        if (!s.ok() || tmp.size() < allowed) {
          break;
        }
      }
      size_t res_len = 0;
      if (s.ok() && offset_advance < buf.CurrentSize()) {
        res_len = std::min(buf.CurrentSize() - offset_advance, n);
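        // Either copy the requested range out of the internal buffer into
        // the caller's scratch, or, when an AlignedBuf was supplied, hand
        // ownership of the aligned buffer to the caller and point scratch
        // at it to avoid a copy.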
        if (aligned_buf == nullptr) {
          buf.Read(scratch, offset_advance, res_len);
        } else {
          scratch = buf.BufferStart();
          aligned_buf->reset(buf.Release());
        }
      }
      *result = Slice(scratch, res_len);
#endif  // !ROCKSDB_LITE
    } else {
      size_t pos = 0;
      const char* res_scratch = nullptr;
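      // Read in chunks: for a rate-limited compaction read, each iteration
      // reads at most the number of bytes granted by the rate limiter;
      // otherwise the whole range is requested at once.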
      while (pos < n) {
        size_t allowed;
        if (for_compaction && rate_limiter_ != nullptr) {
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStart();
          }
          allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
                                                Env::IOPriority::IO_LOW, stats_,
                                                RateLimiter::OpType::kRead);
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStop();
          }
        } else {
          allowed = n;
        }
        Slice tmp_result;

#ifndef ROCKSDB_LITE
        FileOperationInfo::TimePoint start_ts;
        if (ShouldNotifyListeners()) {
          start_ts = std::chrono::system_clock::now();
        }
#endif
        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
          s = file_->Read(offset + pos, allowed, IOOptions(), &tmp_result,
                          scratch + pos, nullptr);
        }
#ifndef ROCKSDB_LITE
        if (ShouldNotifyListeners()) {
          auto finish_ts = std::chrono::system_clock::now();
          NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
                                 finish_ts, s);
        }
#endif

        if (res_scratch == nullptr) {
          // we can't simply use `scratch` because reads of mmap'd files return
          // data in a different buffer.
          res_scratch = tmp_result.data();
        } else {
          // make sure chunks are inserted contiguously into `res_scratch`.
          assert(tmp_result.data() == res_scratch + pos);
        }
        pos += tmp_result.size();
        if (!s.ok() || tmp_result.size() < allowed) {
          break;
        }
      }
      *result = Slice(res_scratch, s.ok() ? pos : 0);
    }
    IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return s;
}

size_t End(const FSReadRequest& r) {
  return static_cast<size_t>(r.offset) + r.len;
}

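// Expand `r` into an aligned read request: round the offset down and the
// end up to `alignment` boundaries. For example, with alignment = 4096,
// offset = 5000 and len = 300, the aligned request covers [4096, 8192).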
FSReadRequest Align(const FSReadRequest& r, size_t alignment) {
  FSReadRequest req;
  req.offset = static_cast<uint64_t>(
    TruncateToPageBoundary(alignment, static_cast<size_t>(r.offset)));
  req.len = Roundup(End(r), alignment) - req.offset;
  req.scratch = nullptr;
  return req;
}

// Try to merge src into dest if they overlap.
//
// Each request represents an inclusive interval [offset, offset + len].
// If the intervals overlap, update dest's offset and len to represent the
// merged interval, and return true.
// Otherwise, do nothing and return false.
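// For example, {offset = 0, len = 15} and {offset = 10, len = 20} merge
// into {offset = 0, len = 30}, while {offset = 0, len = 5} and
// {offset = 10, len = 5} do not overlap, so dest is left unchanged.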
bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
  size_t dest_offset = static_cast<size_t>(dest->offset);
  size_t src_offset = static_cast<size_t>(src.offset);
  size_t dest_end = End(*dest);
  size_t src_end = End(src);
  if (std::max(dest_offset, src_offset) > std::min(dest_end, src_end)) {
    return false;
  }
  dest->offset = static_cast<uint64_t>(std::min(dest_offset, src_offset));
  dest->len = std::max(dest_end, src_end) - dest->offset;
  return true;
}

Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs,
                                         size_t num_reqs,
                                         AlignedBuf* aligned_buf) const {
  (void)aligned_buf;  // suppress warning of unused variable in LITE mode
  assert(num_reqs > 0);
  Status s;
  uint64_t elapsed = 0;
  {
    StopWatch sw(env_, stats_, hist_type_,
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);

    FSReadRequest* fs_reqs = read_reqs;
    size_t num_fs_reqs = num_reqs;
#ifndef ROCKSDB_LITE
    std::vector<FSReadRequest> aligned_reqs;
    if (use_direct_io()) {
      // num_reqs is the max possible size; reserving it up front avoids
      // std::vector's internal resize operations.
      aligned_reqs.reserve(num_reqs);
      // Align and merge the read requests.
      size_t alignment = file_->GetRequiredBufferAlignment();
      aligned_reqs.push_back(Align(read_reqs[0], alignment));
      for (size_t i = 1; i < num_reqs; i++) {
        const auto& r = Align(read_reqs[i], alignment);
        if (!TryMerge(&aligned_reqs.back(), r)) {
          aligned_reqs.push_back(r);
        }
      }
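      // Note: merging is only attempted against the most recent aligned
      // request, so this pass (and the result-population scan below) relies
      // on read_reqs being sorted by offset.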

      // Allocate aligned buffer and let scratch buffers point to it.
      size_t total_len = 0;
      for (const auto& r : aligned_reqs) {
        total_len += r.len;
      }
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(total_len);
      char* scratch = buf.BufferStart();
      for (auto& r : aligned_reqs) {
        r.scratch = scratch;
        scratch += r.len;
      }

      aligned_buf->reset(buf.Release());
      fs_reqs = aligned_reqs.data();
      num_fs_reqs = aligned_reqs.size();
    }
#endif  // ROCKSDB_LITE

#ifndef ROCKSDB_LITE
    FileOperationInfo::TimePoint start_ts;
    if (ShouldNotifyListeners()) {
      start_ts = std::chrono::system_clock::now();
    }
#endif  // ROCKSDB_LITE
    {
      IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
      s = file_->MultiRead(fs_reqs, num_fs_reqs, IOOptions(), nullptr);
    }

#ifndef ROCKSDB_LITE
    if (use_direct_io()) {
      // Populate results in the unaligned read requests.
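      // Both lists are in offset order and each original request lies
      // entirely within one merged request, so a single forward scan pairs
      // them up: advance aligned_i once the current original request starts
      // past the end of the current merged request.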
      size_t aligned_i = 0;
      for (size_t i = 0; i < num_reqs; i++) {
        auto& r = read_reqs[i];
        if (static_cast<size_t>(r.offset) > End(aligned_reqs[aligned_i])) {
          aligned_i++;
        }
        const auto& fs_r = fs_reqs[aligned_i];
        r.status = fs_r.status;
        if (r.status.ok()) {
          uint64_t offset = r.offset - fs_r.offset;
          size_t len = std::min(r.len, static_cast<size_t>(fs_r.len - offset));
          r.result = Slice(fs_r.scratch + offset, len);
        } else {
          r.result = Slice();
        }
      }
    }
#endif  // ROCKSDB_LITE

    for (size_t i = 0; i < num_reqs; ++i) {
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
                               start_ts, finish_ts, read_reqs[i].status);
      }
#endif  // ROCKSDB_LITE
      IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size());
    }
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return s;
}

}  // namespace ROCKSDB_NAMESPACE