1 
2 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
3 //  This source code is licensed under both the GPLv2 (found in the
4 //  COPYING file in the root directory) and Apache 2.0 License
5 //  (found in the LICENSE.Apache file in the root directory).
6 #ifndef ROCKSDB_LITE
7 #include "utilities/blob_db/blob_file.h"
8 
9 #include <stdio.h>
10 #include <cinttypes>
11 
12 #include <algorithm>
13 #include <memory>
14 
15 #include "db/column_family.h"
16 #include "db/db_impl/db_impl.h"
17 #include "db/dbformat.h"
18 #include "env/composite_env_wrapper.h"
19 #include "file/filename.h"
20 #include "file/readahead_raf.h"
21 #include "logging/logging.h"
22 #include "utilities/blob_db/blob_db_impl.h"
23 
24 namespace ROCKSDB_NAMESPACE {
25 
26 namespace blob_db {
27 
BlobFile(const BlobDBImpl * p,const std::string & bdir,uint64_t fn,Logger * info_log)28 BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
29                    Logger* info_log)
30     : parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log) {}
31 
BlobFile(const BlobDBImpl * p,const std::string & bdir,uint64_t fn,Logger * info_log,uint32_t column_family_id,CompressionType compression,bool has_ttl,const ExpirationRange & expiration_range)32 BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
33                    Logger* info_log, uint32_t column_family_id,
34                    CompressionType compression, bool has_ttl,
35                    const ExpirationRange& expiration_range)
36     : parent_(p),
37       path_to_dir_(bdir),
38       file_number_(fn),
39       info_log_(info_log),
40       column_family_id_(column_family_id),
41       compression_(compression),
42       has_ttl_(has_ttl),
43       expiration_range_(expiration_range),
44       header_(column_family_id, compression, has_ttl, expiration_range),
45       header_valid_(true) {}
46 
~BlobFile()47 BlobFile::~BlobFile() {
48   if (obsolete_) {
49     std::string pn(PathName());
50     Status s = Env::Default()->DeleteFile(PathName());
51     if (!s.ok()) {
52       // ROCKS_LOG_INFO(db_options_.info_log,
53       // "File could not be deleted %s", pn.c_str());
54     }
55   }
56 }
57 
GetColumnFamilyId() const58 uint32_t BlobFile::GetColumnFamilyId() const { return column_family_id_; }
59 
PathName() const60 std::string BlobFile::PathName() const {
61   return BlobFileName(path_to_dir_, file_number_);
62 }
63 
OpenRandomAccessReader(Env * env,const DBOptions & db_options,const EnvOptions & env_options) const64 std::shared_ptr<Reader> BlobFile::OpenRandomAccessReader(
65     Env* env, const DBOptions& db_options,
66     const EnvOptions& env_options) const {
67   constexpr size_t kReadaheadSize = 2 * 1024 * 1024;
68   std::unique_ptr<RandomAccessFile> sfile;
69   std::string path_name(PathName());
70   Status s = env->NewRandomAccessFile(path_name, &sfile, env_options);
71   if (!s.ok()) {
72     // report something here.
73     return nullptr;
74   }
75   sfile = NewReadaheadRandomAccessFile(std::move(sfile), kReadaheadSize);
76 
77   std::unique_ptr<RandomAccessFileReader> sfile_reader;
78   sfile_reader.reset(new RandomAccessFileReader(
79       NewLegacyRandomAccessFileWrapper(sfile), path_name));
80 
81   std::shared_ptr<Reader> log_reader = std::make_shared<Reader>(
82       std::move(sfile_reader), db_options.env, db_options.statistics.get());
83 
84   return log_reader;
85 }
86 
DumpState() const87 std::string BlobFile::DumpState() const {
88   char str[1000];
89   snprintf(
90       str, sizeof(str),
91       "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " file_size: %" PRIu64
92       " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64
93       "), writer: %d reader: %d",
94       path_to_dir_.c_str(), file_number_, blob_count_.load(), file_size_.load(),
95       closed_.load(), obsolete_.load(), expiration_range_.first,
96       expiration_range_.second, (!!log_writer_), (!!ra_file_reader_));
97   return str;
98 }
99 
MarkObsolete(SequenceNumber sequence)100 void BlobFile::MarkObsolete(SequenceNumber sequence) {
101   assert(Immutable());
102   obsolete_sequence_ = sequence;
103   obsolete_.store(true);
104 }
105 
NeedsFsync(bool hard,uint64_t bytes_per_sync) const106 bool BlobFile::NeedsFsync(bool hard, uint64_t bytes_per_sync) const {
107   assert(last_fsync_ <= file_size_);
108   return (hard) ? file_size_ > last_fsync_
109                 : (file_size_ - last_fsync_) >= bytes_per_sync;
110 }
111 
WriteFooterAndCloseLocked(SequenceNumber sequence)112 Status BlobFile::WriteFooterAndCloseLocked(SequenceNumber sequence) {
113   BlobLogFooter footer;
114   footer.blob_count = blob_count_;
115   if (HasTTL()) {
116     footer.expiration_range = expiration_range_;
117   }
118 
119   // this will close the file and reset the Writable File Pointer.
120   Status s = log_writer_->AppendFooter(footer);
121   if (s.ok()) {
122     closed_ = true;
123     immutable_sequence_ = sequence;
124     file_size_ += BlobLogFooter::kSize;
125   }
126   // delete the sequential writer
127   log_writer_.reset();
128   return s;
129 }
130 
ReadFooter(BlobLogFooter * bf)131 Status BlobFile::ReadFooter(BlobLogFooter* bf) {
132   if (file_size_ < (BlobLogHeader::kSize + BlobLogFooter::kSize)) {
133     return Status::IOError("File does not have footer", PathName());
134   }
135 
136   uint64_t footer_offset = file_size_ - BlobLogFooter::kSize;
137   // assume that ra_file_reader_ is valid before we enter this
138   assert(ra_file_reader_);
139 
140   Slice result;
141   std::string buf;
142   AlignedBuf aligned_buf;
143   Status s;
144   if (ra_file_reader_->use_direct_io()) {
145     s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result,
146                               nullptr, &aligned_buf);
147   } else {
148     buf.reserve(BlobLogFooter::kSize + 10);
149     s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result,
150                               &buf[0], nullptr);
151   }
152   if (!s.ok()) return s;
153   if (result.size() != BlobLogFooter::kSize) {
154     // should not happen
155     return Status::IOError("EOF reached before footer");
156   }
157 
158   s = bf->DecodeFrom(result);
159   return s;
160 }
161 
SetFromFooterLocked(const BlobLogFooter & footer)162 Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) {
163   // assume that file has been fully fsync'd
164   last_fsync_.store(file_size_);
165   blob_count_ = footer.blob_count;
166   expiration_range_ = footer.expiration_range;
167   closed_ = true;
168   return Status::OK();
169 }
170 
Fsync()171 Status BlobFile::Fsync() {
172   Status s;
173   if (log_writer_.get()) {
174     s = log_writer_->Sync();
175     last_fsync_.store(file_size_.load());
176   }
177   return s;
178 }
179 
CloseRandomAccessLocked()180 void BlobFile::CloseRandomAccessLocked() {
181   ra_file_reader_.reset();
182   last_access_ = -1;
183 }
184 
GetReader(Env * env,const EnvOptions & env_options,std::shared_ptr<RandomAccessFileReader> * reader,bool * fresh_open)185 Status BlobFile::GetReader(Env* env, const EnvOptions& env_options,
186                            std::shared_ptr<RandomAccessFileReader>* reader,
187                            bool* fresh_open) {
188   assert(reader != nullptr);
189   assert(fresh_open != nullptr);
190   *fresh_open = false;
191   int64_t current_time = 0;
192   env->GetCurrentTime(&current_time);
193   last_access_.store(current_time);
194   Status s;
195 
196   {
197     ReadLock lockbfile_r(&mutex_);
198     if (ra_file_reader_) {
199       *reader = ra_file_reader_;
200       return s;
201     }
202   }
203 
204   WriteLock lockbfile_w(&mutex_);
205   // Double check.
206   if (ra_file_reader_) {
207     *reader = ra_file_reader_;
208     return s;
209   }
210 
211   std::unique_ptr<RandomAccessFile> rfile;
212   s = env->NewRandomAccessFile(PathName(), &rfile, env_options);
213   if (!s.ok()) {
214     ROCKS_LOG_ERROR(info_log_,
215                     "Failed to open blob file for random-read: %s status: '%s'"
216                     " exists: '%s'",
217                     PathName().c_str(), s.ToString().c_str(),
218                     env->FileExists(PathName()).ToString().c_str());
219     return s;
220   }
221 
222   ra_file_reader_ = std::make_shared<RandomAccessFileReader>(
223       NewLegacyRandomAccessFileWrapper(rfile), PathName());
224   *reader = ra_file_reader_;
225   *fresh_open = true;
226   return s;
227 }
228 
ReadMetadata(Env * env,const EnvOptions & env_options)229 Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) {
230   assert(Immutable());
231   // Get file size.
232   uint64_t file_size = 0;
233   Status s = env->GetFileSize(PathName(), &file_size);
234   if (s.ok()) {
235     file_size_ = file_size;
236   } else {
237     ROCKS_LOG_ERROR(info_log_,
238                     "Failed to get size of blob file %" PRIu64
239                     ", status: %s",
240                     file_number_, s.ToString().c_str());
241     return s;
242   }
243   if (file_size < BlobLogHeader::kSize) {
244     ROCKS_LOG_ERROR(info_log_,
245                     "Incomplete blob file blob file %" PRIu64
246                     ", size: %" PRIu64,
247                     file_number_, file_size);
248     return Status::Corruption("Incomplete blob file header.");
249   }
250 
251   // Create file reader.
252   std::unique_ptr<RandomAccessFile> file;
253   s = env->NewRandomAccessFile(PathName(), &file, env_options);
254   if (!s.ok()) {
255     ROCKS_LOG_ERROR(info_log_,
256                     "Failed to open blob file %" PRIu64 ", status: %s",
257                     file_number_, s.ToString().c_str());
258     return s;
259   }
260   std::unique_ptr<RandomAccessFileReader> file_reader(
261       new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file),
262                                  PathName()));
263 
264   // Read file header.
265   std::string header_buf;
266   AlignedBuf aligned_buf;
267   Slice header_slice;
268   if (file_reader->use_direct_io()) {
269     s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, nullptr,
270                           &aligned_buf);
271   } else {
272     header_buf.reserve(BlobLogHeader::kSize);
273     s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice,
274                           &header_buf[0], nullptr);
275   }
276   if (!s.ok()) {
277     ROCKS_LOG_ERROR(info_log_,
278                     "Failed to read header of blob file %" PRIu64
279                     ", status: %s",
280                     file_number_, s.ToString().c_str());
281     return s;
282   }
283   BlobLogHeader header;
284   s = header.DecodeFrom(header_slice);
285   if (!s.ok()) {
286     ROCKS_LOG_ERROR(info_log_,
287                     "Failed to decode header of blob file %" PRIu64
288                     ", status: %s",
289                     file_number_, s.ToString().c_str());
290     return s;
291   }
292   column_family_id_ = header.column_family_id;
293   compression_ = header.compression;
294   has_ttl_ = header.has_ttl;
295   if (has_ttl_) {
296     expiration_range_ = header.expiration_range;
297   }
298   header_valid_ = true;
299 
300   // Read file footer.
301   if (file_size_ < BlobLogHeader::kSize + BlobLogFooter::kSize) {
302     // OK not to have footer.
303     assert(!footer_valid_);
304     return Status::OK();
305   }
306   std::string footer_buf;
307   Slice footer_slice;
308   if (file_reader->use_direct_io()) {
309     s = file_reader->Read(file_size - BlobLogFooter::kSize,
310                           BlobLogFooter::kSize, &footer_slice, nullptr,
311                           &aligned_buf);
312   } else {
313     footer_buf.reserve(BlobLogFooter::kSize);
314     s = file_reader->Read(file_size - BlobLogFooter::kSize,
315                           BlobLogFooter::kSize, &footer_slice, &footer_buf[0],
316                           nullptr);
317   }
318   if (!s.ok()) {
319     ROCKS_LOG_ERROR(info_log_,
320                     "Failed to read footer of blob file %" PRIu64
321                     ", status: %s",
322                     file_number_, s.ToString().c_str());
323     return s;
324   }
325   BlobLogFooter footer;
326   s = footer.DecodeFrom(footer_slice);
327   if (!s.ok()) {
328     // OK not to have footer.
329     assert(!footer_valid_);
330     return Status::OK();
331   }
332   blob_count_ = footer.blob_count;
333   if (has_ttl_) {
334     assert(header.expiration_range.first <= footer.expiration_range.first);
335     assert(header.expiration_range.second >= footer.expiration_range.second);
336     expiration_range_ = footer.expiration_range;
337   }
338   footer_valid_ = true;
339   return Status::OK();
340 }
341 
342 }  // namespace blob_db
343 }  // namespace ROCKSDB_NAMESPACE
344 #endif  // ROCKSDB_LITE
345