1
2 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
3 // This source code is licensed under both the GPLv2 (found in the
4 // COPYING file in the root directory) and Apache 2.0 License
5 // (found in the LICENSE.Apache file in the root directory).
6 #ifndef ROCKSDB_LITE
7 #include "utilities/blob_db/blob_file.h"
8
9 #include <stdio.h>
10 #include <cinttypes>
11
12 #include <algorithm>
13 #include <memory>
14
15 #include "db/column_family.h"
16 #include "db/db_impl/db_impl.h"
17 #include "db/dbformat.h"
18 #include "env/composite_env_wrapper.h"
19 #include "file/filename.h"
20 #include "file/readahead_raf.h"
21 #include "logging/logging.h"
22 #include "utilities/blob_db/blob_db_impl.h"
23
24 namespace ROCKSDB_NAMESPACE {
25
26 namespace blob_db {
27
BlobFile(const BlobDBImpl * p,const std::string & bdir,uint64_t fn,Logger * info_log)28 BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
29 Logger* info_log)
30 : parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log) {}
31
BlobFile(const BlobDBImpl * p,const std::string & bdir,uint64_t fn,Logger * info_log,uint32_t column_family_id,CompressionType compression,bool has_ttl,const ExpirationRange & expiration_range)32 BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
33 Logger* info_log, uint32_t column_family_id,
34 CompressionType compression, bool has_ttl,
35 const ExpirationRange& expiration_range)
36 : parent_(p),
37 path_to_dir_(bdir),
38 file_number_(fn),
39 info_log_(info_log),
40 column_family_id_(column_family_id),
41 compression_(compression),
42 has_ttl_(has_ttl),
43 expiration_range_(expiration_range),
44 header_(column_family_id, compression, has_ttl, expiration_range),
45 header_valid_(true) {}
46
~BlobFile()47 BlobFile::~BlobFile() {
48 if (obsolete_) {
49 std::string pn(PathName());
50 Status s = Env::Default()->DeleteFile(PathName());
51 if (!s.ok()) {
52 // ROCKS_LOG_INFO(db_options_.info_log,
53 // "File could not be deleted %s", pn.c_str());
54 }
55 }
56 }
57
GetColumnFamilyId() const58 uint32_t BlobFile::GetColumnFamilyId() const { return column_family_id_; }
59
PathName() const60 std::string BlobFile::PathName() const {
61 return BlobFileName(path_to_dir_, file_number_);
62 }
63
OpenRandomAccessReader(Env * env,const DBOptions & db_options,const EnvOptions & env_options) const64 std::shared_ptr<Reader> BlobFile::OpenRandomAccessReader(
65 Env* env, const DBOptions& db_options,
66 const EnvOptions& env_options) const {
67 constexpr size_t kReadaheadSize = 2 * 1024 * 1024;
68 std::unique_ptr<RandomAccessFile> sfile;
69 std::string path_name(PathName());
70 Status s = env->NewRandomAccessFile(path_name, &sfile, env_options);
71 if (!s.ok()) {
72 // report something here.
73 return nullptr;
74 }
75 sfile = NewReadaheadRandomAccessFile(std::move(sfile), kReadaheadSize);
76
77 std::unique_ptr<RandomAccessFileReader> sfile_reader;
78 sfile_reader.reset(new RandomAccessFileReader(
79 NewLegacyRandomAccessFileWrapper(sfile), path_name));
80
81 std::shared_ptr<Reader> log_reader = std::make_shared<Reader>(
82 std::move(sfile_reader), db_options.env, db_options.statistics.get());
83
84 return log_reader;
85 }
86
DumpState() const87 std::string BlobFile::DumpState() const {
88 char str[1000];
89 snprintf(
90 str, sizeof(str),
91 "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " file_size: %" PRIu64
92 " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64
93 "), writer: %d reader: %d",
94 path_to_dir_.c_str(), file_number_, blob_count_.load(), file_size_.load(),
95 closed_.load(), obsolete_.load(), expiration_range_.first,
96 expiration_range_.second, (!!log_writer_), (!!ra_file_reader_));
97 return str;
98 }
99
MarkObsolete(SequenceNumber sequence)100 void BlobFile::MarkObsolete(SequenceNumber sequence) {
101 assert(Immutable());
102 obsolete_sequence_ = sequence;
103 obsolete_.store(true);
104 }
105
NeedsFsync(bool hard,uint64_t bytes_per_sync) const106 bool BlobFile::NeedsFsync(bool hard, uint64_t bytes_per_sync) const {
107 assert(last_fsync_ <= file_size_);
108 return (hard) ? file_size_ > last_fsync_
109 : (file_size_ - last_fsync_) >= bytes_per_sync;
110 }
111
WriteFooterAndCloseLocked(SequenceNumber sequence)112 Status BlobFile::WriteFooterAndCloseLocked(SequenceNumber sequence) {
113 BlobLogFooter footer;
114 footer.blob_count = blob_count_;
115 if (HasTTL()) {
116 footer.expiration_range = expiration_range_;
117 }
118
119 // this will close the file and reset the Writable File Pointer.
120 Status s = log_writer_->AppendFooter(footer);
121 if (s.ok()) {
122 closed_ = true;
123 immutable_sequence_ = sequence;
124 file_size_ += BlobLogFooter::kSize;
125 }
126 // delete the sequential writer
127 log_writer_.reset();
128 return s;
129 }
130
ReadFooter(BlobLogFooter * bf)131 Status BlobFile::ReadFooter(BlobLogFooter* bf) {
132 if (file_size_ < (BlobLogHeader::kSize + BlobLogFooter::kSize)) {
133 return Status::IOError("File does not have footer", PathName());
134 }
135
136 uint64_t footer_offset = file_size_ - BlobLogFooter::kSize;
137 // assume that ra_file_reader_ is valid before we enter this
138 assert(ra_file_reader_);
139
140 Slice result;
141 std::string buf;
142 AlignedBuf aligned_buf;
143 Status s;
144 if (ra_file_reader_->use_direct_io()) {
145 s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result,
146 nullptr, &aligned_buf);
147 } else {
148 buf.reserve(BlobLogFooter::kSize + 10);
149 s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result,
150 &buf[0], nullptr);
151 }
152 if (!s.ok()) return s;
153 if (result.size() != BlobLogFooter::kSize) {
154 // should not happen
155 return Status::IOError("EOF reached before footer");
156 }
157
158 s = bf->DecodeFrom(result);
159 return s;
160 }
161
SetFromFooterLocked(const BlobLogFooter & footer)162 Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) {
163 // assume that file has been fully fsync'd
164 last_fsync_.store(file_size_);
165 blob_count_ = footer.blob_count;
166 expiration_range_ = footer.expiration_range;
167 closed_ = true;
168 return Status::OK();
169 }
170
Fsync()171 Status BlobFile::Fsync() {
172 Status s;
173 if (log_writer_.get()) {
174 s = log_writer_->Sync();
175 last_fsync_.store(file_size_.load());
176 }
177 return s;
178 }
179
CloseRandomAccessLocked()180 void BlobFile::CloseRandomAccessLocked() {
181 ra_file_reader_.reset();
182 last_access_ = -1;
183 }
184
GetReader(Env * env,const EnvOptions & env_options,std::shared_ptr<RandomAccessFileReader> * reader,bool * fresh_open)185 Status BlobFile::GetReader(Env* env, const EnvOptions& env_options,
186 std::shared_ptr<RandomAccessFileReader>* reader,
187 bool* fresh_open) {
188 assert(reader != nullptr);
189 assert(fresh_open != nullptr);
190 *fresh_open = false;
191 int64_t current_time = 0;
192 env->GetCurrentTime(¤t_time);
193 last_access_.store(current_time);
194 Status s;
195
196 {
197 ReadLock lockbfile_r(&mutex_);
198 if (ra_file_reader_) {
199 *reader = ra_file_reader_;
200 return s;
201 }
202 }
203
204 WriteLock lockbfile_w(&mutex_);
205 // Double check.
206 if (ra_file_reader_) {
207 *reader = ra_file_reader_;
208 return s;
209 }
210
211 std::unique_ptr<RandomAccessFile> rfile;
212 s = env->NewRandomAccessFile(PathName(), &rfile, env_options);
213 if (!s.ok()) {
214 ROCKS_LOG_ERROR(info_log_,
215 "Failed to open blob file for random-read: %s status: '%s'"
216 " exists: '%s'",
217 PathName().c_str(), s.ToString().c_str(),
218 env->FileExists(PathName()).ToString().c_str());
219 return s;
220 }
221
222 ra_file_reader_ = std::make_shared<RandomAccessFileReader>(
223 NewLegacyRandomAccessFileWrapper(rfile), PathName());
224 *reader = ra_file_reader_;
225 *fresh_open = true;
226 return s;
227 }
228
ReadMetadata(Env * env,const EnvOptions & env_options)229 Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) {
230 assert(Immutable());
231 // Get file size.
232 uint64_t file_size = 0;
233 Status s = env->GetFileSize(PathName(), &file_size);
234 if (s.ok()) {
235 file_size_ = file_size;
236 } else {
237 ROCKS_LOG_ERROR(info_log_,
238 "Failed to get size of blob file %" PRIu64
239 ", status: %s",
240 file_number_, s.ToString().c_str());
241 return s;
242 }
243 if (file_size < BlobLogHeader::kSize) {
244 ROCKS_LOG_ERROR(info_log_,
245 "Incomplete blob file blob file %" PRIu64
246 ", size: %" PRIu64,
247 file_number_, file_size);
248 return Status::Corruption("Incomplete blob file header.");
249 }
250
251 // Create file reader.
252 std::unique_ptr<RandomAccessFile> file;
253 s = env->NewRandomAccessFile(PathName(), &file, env_options);
254 if (!s.ok()) {
255 ROCKS_LOG_ERROR(info_log_,
256 "Failed to open blob file %" PRIu64 ", status: %s",
257 file_number_, s.ToString().c_str());
258 return s;
259 }
260 std::unique_ptr<RandomAccessFileReader> file_reader(
261 new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file),
262 PathName()));
263
264 // Read file header.
265 std::string header_buf;
266 AlignedBuf aligned_buf;
267 Slice header_slice;
268 if (file_reader->use_direct_io()) {
269 s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, nullptr,
270 &aligned_buf);
271 } else {
272 header_buf.reserve(BlobLogHeader::kSize);
273 s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice,
274 &header_buf[0], nullptr);
275 }
276 if (!s.ok()) {
277 ROCKS_LOG_ERROR(info_log_,
278 "Failed to read header of blob file %" PRIu64
279 ", status: %s",
280 file_number_, s.ToString().c_str());
281 return s;
282 }
283 BlobLogHeader header;
284 s = header.DecodeFrom(header_slice);
285 if (!s.ok()) {
286 ROCKS_LOG_ERROR(info_log_,
287 "Failed to decode header of blob file %" PRIu64
288 ", status: %s",
289 file_number_, s.ToString().c_str());
290 return s;
291 }
292 column_family_id_ = header.column_family_id;
293 compression_ = header.compression;
294 has_ttl_ = header.has_ttl;
295 if (has_ttl_) {
296 expiration_range_ = header.expiration_range;
297 }
298 header_valid_ = true;
299
300 // Read file footer.
301 if (file_size_ < BlobLogHeader::kSize + BlobLogFooter::kSize) {
302 // OK not to have footer.
303 assert(!footer_valid_);
304 return Status::OK();
305 }
306 std::string footer_buf;
307 Slice footer_slice;
308 if (file_reader->use_direct_io()) {
309 s = file_reader->Read(file_size - BlobLogFooter::kSize,
310 BlobLogFooter::kSize, &footer_slice, nullptr,
311 &aligned_buf);
312 } else {
313 footer_buf.reserve(BlobLogFooter::kSize);
314 s = file_reader->Read(file_size - BlobLogFooter::kSize,
315 BlobLogFooter::kSize, &footer_slice, &footer_buf[0],
316 nullptr);
317 }
318 if (!s.ok()) {
319 ROCKS_LOG_ERROR(info_log_,
320 "Failed to read footer of blob file %" PRIu64
321 ", status: %s",
322 file_number_, s.ToString().c_str());
323 return s;
324 }
325 BlobLogFooter footer;
326 s = footer.DecodeFrom(footer_slice);
327 if (!s.ok()) {
328 // OK not to have footer.
329 assert(!footer_valid_);
330 return Status::OK();
331 }
332 blob_count_ = footer.blob_count;
333 if (has_ttl_) {
334 assert(header.expiration_range.first <= footer.expiration_range.first);
335 assert(header.expiration_range.second >= footer.expiration_range.second);
336 expiration_range_ = footer.expiration_range;
337 }
338 footer_valid_ = true;
339 return Status::OK();
340 }
341
342 } // namespace blob_db
343 } // namespace ROCKSDB_NAMESPACE
344 #endif // ROCKSDB_LITE
345