//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/file_prefetch_buffer.h"

#include <algorithm>
#include <mutex>

#include "file/random_access_file_reader.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
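// Reads up to n bytes at `offset` into buffer_, widening the request to the
// file's required buffer alignment and reusing any overlapping bytes that a
// previous read already cached.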
Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader,
                                    uint64_t offset, size_t n,
                                    bool for_compaction) {
  if (!enable_ || reader == nullptr) {
    return Status::OK();
  }
  size_t alignment = reader->file()->GetRequiredBufferAlignment();
  size_t offset_ = static_cast<size_t>(offset);
  uint64_t rounddown_offset = Rounddown(offset_, alignment);
  uint64_t roundup_end = Roundup(offset_ + n, alignment);
  uint64_t roundup_len = roundup_end - rounddown_offset;
  assert(roundup_len >= alignment);
  assert(roundup_len % alignment == 0);
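  // Example (for illustration): with alignment = 4096, offset = 5000 and
  // n = 100, the aligned read covers [4096, 8192): rounddown_offset = 4096,
  // roundup_end = 8192, roundup_len = 4096.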

  // Check if requested bytes are in the existing buffer_.
  // If all bytes exist -- return.
  // If only a few bytes exist -- reuse them & read only what is really needed.
  //     This is typically the case of incremental reading of data.
  // If no bytes exist in buffer -- full pread.

  Status s;
  uint64_t chunk_offset_in_buffer = 0;
  uint64_t chunk_len = 0;
  bool copy_data_to_new_buffer = false;
  if (buffer_.CurrentSize() > 0 && offset >= buffer_offset_ &&
      offset <= buffer_offset_ + buffer_.CurrentSize()) {
    if (offset + n <= buffer_offset_ + buffer_.CurrentSize()) {
      // All requested bytes are already in the buffer, so there is no need to
      // Read again.
      return s;
    } else {
      // Only some of the requested bytes are in the buffer. memmove that
      // chunk of bytes to the beginning, and memcpy them back into the new
      // buffer if a new buffer is created.
      chunk_offset_in_buffer =
          Rounddown(static_cast<size_t>(offset - buffer_offset_), alignment);
      chunk_len = buffer_.CurrentSize() - chunk_offset_in_buffer;
      assert(chunk_offset_in_buffer % alignment == 0);
      assert(chunk_len % alignment == 0);
      assert(chunk_offset_in_buffer + chunk_len <=
             buffer_offset_ + buffer_.CurrentSize());
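      // Example (for illustration): with buffer_offset_ = 4096,
      // CurrentSize() = 8192, offset = 9000 and alignment = 4096:
      // chunk_offset_in_buffer = Rounddown(4904, 4096) = 4096 and
      // chunk_len = 4096, so the last 4096 cached bytes are reused.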
      if (chunk_len > 0) {
        copy_data_to_new_buffer = true;
      } else {
        // This reset is not strictly necessary, but it keeps the state
        // consistent.
        chunk_offset_in_buffer = 0;
      }
    }
  }

  // Create a new buffer only if current capacity is not sufficient, and memcpy
  // bytes from old buffer if needed (i.e., if chunk_len is greater than 0).
  if (buffer_.Capacity() < roundup_len) {
    buffer_.Alignment(alignment);
    buffer_.AllocateNewBuffer(static_cast<size_t>(roundup_len),
                              copy_data_to_new_buffer, chunk_offset_in_buffer,
                              static_cast<size_t>(chunk_len));
  } else if (chunk_len > 0) {
    // New buffer not needed. But memmove bytes from tail to the beginning
    // since chunk_len is greater than 0.
    buffer_.RefitTail(static_cast<size_t>(chunk_offset_in_buffer),
                      static_cast<size_t>(chunk_len));
  }

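  // Read only the missing tail of the aligned range; the first chunk_len
  // bytes of buffer_ already hold valid data reused from the previous read.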
  Slice result;
  s = reader->Read(rounddown_offset + chunk_len,
                   static_cast<size_t>(roundup_len - chunk_len), &result,
                   buffer_.BufferStart() + chunk_len, nullptr, for_compaction);
  if (s.ok()) {
    buffer_offset_ = rounddown_offset;
    buffer_.Size(static_cast<size_t>(chunk_len) + result.size());
  }
  return s;
}

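// Serves the request from buffer_ when possible. On a partial hit with
// readahead enabled, prefetches the missing bytes plus the current readahead
// window before serving; returns false on a miss so the caller can fall back
// to a direct read.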
bool FilePrefetchBuffer::TryReadFromCache(uint64_t offset, size_t n,
                                          Slice* result, bool for_compaction) {
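  // When tracking is enabled, remember the smallest offset requested so far.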
  if (track_min_offset_ && offset < min_offset_read_) {
    min_offset_read_ = static_cast<size_t>(offset);
  }
  if (!enable_ || offset < buffer_offset_) {
    return false;
  }

  // If the buffer contains only some of the requested bytes:
  //    If readahead is enabled: prefetch the remaining bytes + readahead bytes
  //        and satisfy the request.
  //    If readahead is not enabled: return false.
  if (offset + n > buffer_offset_ + buffer_.CurrentSize()) {
    if (readahead_size_ > 0) {
      assert(file_reader_ != nullptr);
      assert(max_readahead_size_ >= readahead_size_);
      Status s;
      if (for_compaction) {
        s = Prefetch(file_reader_, offset, std::max(n, readahead_size_),
                     for_compaction);
      } else {
        s = Prefetch(file_reader_, offset, n + readahead_size_, for_compaction);
      }
      if (!s.ok()) {
        return false;
      }
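      // Double the readahead window for the next partial hit, capped at
      // max_readahead_size_, so sequential scans ramp up prefetching.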
      readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
    } else {
      return false;
    }
  }

  uint64_t offset_in_buffer = offset - buffer_offset_;
  *result = Slice(buffer_.BufferStart() + offset_in_buffer, n);
  return true;
}
}  // namespace ROCKSDB_NAMESPACE