//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/sequence_file_reader.h"

#include <algorithm>
#include <mutex>

#include "file/read_write_util.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
  Status s;
  if (use_direct_io()) {
#ifndef ROCKSDB_LITE
    size_t offset = offset_.fetch_add(n);
    size_t alignment = file_->GetRequiredBufferAlignment();
    size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
    size_t offset_advance = offset - aligned_offset;
    size_t size = Roundup(offset + n, alignment) - aligned_offset;
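    // For example, with alignment = 512, offset = 1000, and n = 100 (numbers
    // chosen purely for illustration): aligned_offset = 512,
    // offset_advance = 488, and size = Roundup(1100, 512) - 512 = 1024, so a
    // single aligned 1024-byte read covers the requested range, and the 100
    // bytes are later copied out of the buffer starting at offset 488.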
    size_t r = 0;
    AlignedBuffer buf;
    buf.Alignment(alignment);
    buf.AllocateNewBuffer(size);
    Slice tmp;
    s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp,
                              buf.BufferStart(), nullptr);
    if (s.ok() && offset_advance < tmp.size()) {
      buf.Size(tmp.size());
      r = buf.Read(scratch, offset_advance,
                   std::min(tmp.size() - offset_advance, n));
    }
    *result = Slice(scratch, r);
#endif  // !ROCKSDB_LITE
  } else {
    s = file_->Read(n, IOOptions(), result, scratch, nullptr);
  }
  IOSTATS_ADD(bytes_read, result->size());
  return s;
}

Status SequentialFileReader::Skip(uint64_t n) {
#ifndef ROCKSDB_LITE
  if (use_direct_io()) {
    offset_ += static_cast<size_t>(n);
    return Status::OK();
  }
#endif  // !ROCKSDB_LITE
  return file_->Skip(n);
}

namespace {
// This class wraps a SequentialFile, exposing the same API, with the
// difference that it can prefetch up to readahead_size bytes and then serve
// reads from memory, avoiding a round trip per read if, for example, the
// data for the file is actually remote.
class ReadaheadSequentialFile : public FSSequentialFile {
 public:
  ReadaheadSequentialFile(std::unique_ptr<FSSequentialFile>&& file,
                          size_t readahead_size)
      : file_(std::move(file)),
        alignment_(file_->GetRequiredBufferAlignment()),
        readahead_size_(Roundup(readahead_size, alignment_)),
        buffer_(),
        buffer_offset_(0),
        read_offset_(0) {
    buffer_.Alignment(alignment_);
    buffer_.AllocateNewBuffer(readahead_size_);
  }

  ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete;

  ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete;

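  // A worked example of the three paths in Read() below, with hypothetical
  // numbers readahead_size_ = 8192 and alignment_ = 512:
  //  1. n = 100 and [read_offset_, read_offset_ + n) is fully buffered:
  //     the request is served from buffer_ with no file I/O.
  //  2. n = 8000, so n + alignment_ >= readahead_size_: the buffer is
  //     bypassed and the read goes straight to file_.
  //  3. n = 1000 with only 300 of those bytes buffered: the 300 cached
  //     bytes are copied out first, then buffer_ is refilled and the
  //     remaining 700 bytes are served from it.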
  IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch,
                IODebugContext* dbg) override {
    std::unique_lock<std::mutex> lk(lock_);

    size_t cached_len = 0;
    // Check if there is a cache hit, meaning that [offset, offset + n) is
    // either completely or partially in the buffer. If it's completely cached,
    // including the end-of-file case when offset + n is greater than EOF,
    // return.
    if (TryReadFromCache(n, &cached_len, scratch) &&
        (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
      // We read exactly what we needed, or we hit end of file - return.
      *result = Slice(scratch, cached_len);
      return IOStatus::OK();
    }
    n -= cached_len;

    IOStatus s;
    // Read-ahead only makes sense if the request leaves some slack in the
    // readahead window; if n (plus up to one alignment unit) would already
    // consume readahead_size_, buffering adds a copy without saving any I/O.
    if (n + alignment_ >= readahead_size_) {
      s = file_->Read(n, opts, result, scratch + cached_len, dbg);
      if (s.ok()) {
        read_offset_ += result->size();
        *result = Slice(scratch, cached_len + result->size());
      }
      buffer_.Clear();
      return s;
    }

    s = ReadIntoBuffer(readahead_size_, opts, dbg);
    if (s.ok()) {
      // The data we need is now in cache, so we can safely read it
      size_t remaining_len;
      TryReadFromCache(n, &remaining_len, scratch + cached_len);
      *result = Slice(scratch, cached_len + remaining_len);
    }
    return s;
  }

  IOStatus Skip(uint64_t n) override {
    std::unique_lock<std::mutex> lk(lock_);
    IOStatus s = IOStatus::OK();
    // First check if we need to skip already cached data.
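    // For example (hypothetical numbers), if buffer_ covers the file range
    // [0, 4096), read_offset_ == 100, and n == 5000: the 3996 buffered bytes
    // ahead of read_offset_ are consumed here, read_offset_ becomes 4096,
    // and the remaining 1004 bytes are skipped via file_->Skip() below.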
    if (buffer_.CurrentSize() > 0) {
      // Do we need to skip beyond cached data?
      if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) {
        // Yes. Skip whatever is in memory and adjust the offset accordingly.
        n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_;
        read_offset_ = buffer_offset_ + buffer_.CurrentSize();
      } else {
        // No. The section to be skipped is entirely in the cache.
        read_offset_ += n;
        n = 0;
      }
    }
    if (n > 0) {
      // We still need to skip more, so call the file API for skipping
      s = file_->Skip(n);
      if (s.ok()) {
        read_offset_ += n;
      }
      buffer_.Clear();
    }
    return s;
  }

  IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts,
                          Slice* result, char* scratch,
                          IODebugContext* dbg) override {
    return file_->PositionedRead(offset, n, opts, result, scratch, dbg);
  }

  IOStatus InvalidateCache(size_t offset, size_t length) override {
    std::unique_lock<std::mutex> lk(lock_);
    buffer_.Clear();
    return file_->InvalidateCache(offset, length);
  }

  bool use_direct_io() const override { return file_->use_direct_io(); }

 private:
  // Tries to read n bytes from buffer_. If anything was read from the cache,
  // sets cached_len to the number of bytes actually read, copies that many
  // bytes to scratch, and returns true.
  // If nothing was read, sets cached_len to 0 and returns false.
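  // For example (hypothetical numbers), with buffer_offset_ == 4096,
  // buffer_.CurrentSize() == 4096, read_offset_ == 6000, and n == 3000:
  // offset_in_buffer == 1904, cached_len becomes min(4096 - 1904, 3000)
  // == 2192, and read_offset_ advances to 8192; the remaining 808 bytes
  // must come from file_.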
  bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) {
    if (read_offset_ < buffer_offset_ ||
        read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) {
      *cached_len = 0;
      return false;
    }
    uint64_t offset_in_buffer = read_offset_ - buffer_offset_;
    *cached_len = std::min(
        buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
    memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
    read_offset_ += *cached_len;
    return true;
  }

  // Reads into buffer_ the next n bytes from file_.
  // May read less than n bytes if EOF was reached.
  // Returns the status of the read operation on the file.
  IOStatus ReadIntoBuffer(size_t n, const IOOptions& opts,
                          IODebugContext* dbg) {
    if (n > buffer_.Capacity()) {
      n = buffer_.Capacity();
    }
    assert(IsFileSectorAligned(n, alignment_));
    Slice result;
    IOStatus s = file_->Read(n, opts, &result, buffer_.BufferStart(), dbg);
    if (s.ok()) {
      buffer_offset_ = read_offset_;
      buffer_.Size(result.size());
      assert(result.size() == 0 || buffer_.BufferStart() == result.data());
    }
    return s;
  }

  const std::unique_ptr<FSSequentialFile> file_;
  const size_t alignment_;
  const size_t readahead_size_;

  std::mutex lock_;
  // The buffer storing the prefetched data
  AlignedBuffer buffer_;
  // The offset in file_ corresponding to the data stored in buffer_
  uint64_t buffer_offset_;
  // The offset up to which data was read from file_. In fact, it can be larger
  // than the actual file size, since the file_->Skip(n) call doesn't return
  // the actual number of bytes that were skipped, which can be less than n.
  // This is not a problem, since read_offset_ is monotonically increasing and
  // its only use is to figure out whether the next piece of data should be
  // read from buffer_ or directly from file_.
  uint64_t read_offset_;
};
}  // namespace

std::unique_ptr<FSSequentialFile>
SequentialFileReader::NewReadaheadSequentialFile(
    std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size) {
  if (file->GetRequiredBufferAlignment() >= readahead_size) {
    // Short-circuit and return the original file if readahead_size is
    // too small and hence doesn't make sense to be used for prefetching.
    return std::move(file);
  }
  std::unique_ptr<FSSequentialFile> result(
      new ReadaheadSequentialFile(std::move(file), readahead_size));
  return result;
}
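
// A minimal usage sketch for the factory above (hypothetical caller code:
// `fs`, `fname`, and the 8KB readahead size are assumptions, not part of
// this file, and the factory must be accessible from the call site):
//
//   std::unique_ptr<FSSequentialFile> raw;
//   IOStatus io_s =
//       fs->NewSequentialFile(fname, FileOptions(), &raw, nullptr);
//   if (io_s.ok()) {
//     std::unique_ptr<FSSequentialFile> file =
//         SequentialFileReader::NewReadaheadSequentialFile(std::move(raw),
//                                                          8 * 1024);
//     // Reads through `file` now prefetch up to 8KB at a time.
//   }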
}  // namespace ROCKSDB_NAMESPACE