1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 #pragma once
6 #ifndef ROCKSDB_LITE
7 
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <limits>
#include <memory>
#include <string>
#include <unordered_set>

#include "file/random_access_file_reader.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "utilities/blob_db/blob_log_format.h"
#include "utilities/blob_db/blob_log_reader.h"
#include "utilities/blob_db/blob_log_writer.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 namespace blob_db {
23 
24 class BlobDBImpl;
25 
26 class BlobFile {
27   friend class BlobDBImpl;
28   friend struct BlobFileComparator;
29   friend struct BlobFileComparatorTTL;
30   friend class BlobIndexCompactionFilterGC;
31 
32  private:
33   // access to parent
34   const BlobDBImpl* parent_{nullptr};
35 
36   // path to blob directory
37   std::string path_to_dir_;
38 
39   // the id of the file.
40   // the above 2 are created during file creation and never changed
41   // after that
42   uint64_t file_number_{0};
43 
44   // The file numbers of the SST files whose oldest blob file reference
45   // points to this blob file.
46   std::unordered_set<uint64_t> linked_sst_files_;
47 
48   // Info log.
49   Logger* info_log_{nullptr};
50 
51   // Column family id.
52   uint32_t column_family_id_{std::numeric_limits<uint32_t>::max()};
53 
54   // Compression type of blobs in the file
55   CompressionType compression_{kNoCompression};
56 
57   // If true, the keys in this file all has TTL. Otherwise all keys don't
58   // have TTL.
59   bool has_ttl_{false};
60 
61   // TTL range of blobs in the file.
62   ExpirationRange expiration_range_;
63 
64   // number of blobs in the file
65   std::atomic<uint64_t> blob_count_{0};
66 
67   // size of the file
68   std::atomic<uint64_t> file_size_{0};
69 
70   BlobLogHeader header_;
71 
72   // closed_ = true implies the file is no more mutable
73   // no more blobs will be appended and the footer has been written out
74   std::atomic<bool> closed_{false};
75 
76   // The latest sequence number when the file was closed/made immutable.
77   SequenceNumber immutable_sequence_{0};
78 
79   // Whether the file was marked obsolete (due to either TTL or GC).
80   // obsolete_ still needs to do iterator/snapshot checks
81   std::atomic<bool> obsolete_{false};
82 
83   // The last sequence number by the time the file marked as obsolete.
84   // Data in this file is visible to a snapshot taken before the sequence.
85   SequenceNumber obsolete_sequence_{0};
86 
87   // Sequential/Append writer for blobs
88   std::shared_ptr<Writer> log_writer_;
89 
90   // random access file reader for GET calls
91   std::shared_ptr<RandomAccessFileReader> ra_file_reader_;
92 
93   // This Read-Write mutex is per file specific and protects
94   // all the datastructures
95   mutable port::RWMutex mutex_;
96 
97   // time when the random access reader was last created.
98   std::atomic<std::int64_t> last_access_{-1};
99 
100   // last time file was fsync'd/fdatasyncd
101   std::atomic<uint64_t> last_fsync_{0};
102 
103   bool header_valid_{false};
104 
105   bool footer_valid_{false};
106 
107  public:
108   BlobFile() = default;
109 
110   BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum,
111            Logger* info_log);
112 
113   BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum,
114            Logger* info_log, uint32_t column_family_id,
115            CompressionType compression, bool has_ttl,
116            const ExpirationRange& expiration_range);
117 
118   ~BlobFile();
119 
120   uint32_t GetColumnFamilyId() const;
121 
122   // Returns log file's absolute pathname.
123   std::string PathName() const;
124 
125   // Primary identifier for blob file.
126   // once the file is created, this never changes
BlobFileNumber()127   uint64_t BlobFileNumber() const { return file_number_; }
128 
129   // Get the set of SST files whose oldest blob file reference points to
130   // this file.
GetLinkedSstFiles()131   const std::unordered_set<uint64_t>& GetLinkedSstFiles() const {
132     return linked_sst_files_;
133   }
134 
135   // Link an SST file whose oldest blob file reference points to this file.
LinkSstFile(uint64_t sst_file_number)136   void LinkSstFile(uint64_t sst_file_number) {
137     assert(linked_sst_files_.find(sst_file_number) == linked_sst_files_.end());
138     linked_sst_files_.insert(sst_file_number);
139   }
140 
141   // Unlink an SST file whose oldest blob file reference points to this file.
UnlinkSstFile(uint64_t sst_file_number)142   void UnlinkSstFile(uint64_t sst_file_number) {
143     auto it = linked_sst_files_.find(sst_file_number);
144     assert(it != linked_sst_files_.end());
145     linked_sst_files_.erase(it);
146   }
147 
148   // the following functions are atomic, and don't need
149   // read lock
BlobCount()150   uint64_t BlobCount() const {
151     return blob_count_.load(std::memory_order_acquire);
152   }
153 
154   std::string DumpState() const;
155 
156   // if the file is not taking any more appends.
Immutable()157   bool Immutable() const { return closed_.load(); }
158 
159   // Mark the file as immutable.
160   // REQUIRES: write lock held, or access from single thread (on DB open).
MarkImmutable(SequenceNumber sequence)161   void MarkImmutable(SequenceNumber sequence) {
162     closed_ = true;
163     immutable_sequence_ = sequence;
164   }
165 
GetImmutableSequence()166   SequenceNumber GetImmutableSequence() const {
167     assert(Immutable());
168     return immutable_sequence_;
169   }
170 
171   // Whether the file was marked obsolete (due to either TTL or GC).
Obsolete()172   bool Obsolete() const {
173     assert(Immutable() || !obsolete_.load());
174     return obsolete_.load();
175   }
176 
177   // Mark file as obsolete (due to either TTL or GC). The file is not visible to
178   // snapshots with sequence greater or equal to the given sequence.
179   void MarkObsolete(SequenceNumber sequence);
180 
GetObsoleteSequence()181   SequenceNumber GetObsoleteSequence() const {
182     assert(Obsolete());
183     return obsolete_sequence_;
184   }
185 
186   // we will assume this is atomic
187   bool NeedsFsync(bool hard, uint64_t bytes_per_sync) const;
188 
189   Status Fsync();
190 
GetFileSize()191   uint64_t GetFileSize() const {
192     return file_size_.load(std::memory_order_acquire);
193   }
194 
195   // All Get functions which are not atomic, will need ReadLock on the mutex
196 
GetExpirationRange()197   ExpirationRange GetExpirationRange() const { return expiration_range_; }
198 
ExtendExpirationRange(uint64_t expiration)199   void ExtendExpirationRange(uint64_t expiration) {
200     expiration_range_.first = std::min(expiration_range_.first, expiration);
201     expiration_range_.second = std::max(expiration_range_.second, expiration);
202   }
203 
HasTTL()204   bool HasTTL() const { return has_ttl_; }
205 
SetHasTTL(bool has_ttl)206   void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; }
207 
GetCompressionType()208   CompressionType GetCompressionType() const { return compression_; }
209 
GetWriter()210   std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
211 
212   // Read blob file header and footer. Return corruption if file header is
213   // malform or incomplete. If footer is malform or incomplete, set
214   // footer_valid_ to false and return Status::OK.
215   Status ReadMetadata(Env* env, const EnvOptions& env_options);
216 
217   Status GetReader(Env* env, const EnvOptions& env_options,
218                    std::shared_ptr<RandomAccessFileReader>* reader,
219                    bool* fresh_open);
220 
221  private:
222   std::shared_ptr<Reader> OpenRandomAccessReader(
223       Env* env, const DBOptions& db_options,
224       const EnvOptions& env_options) const;
225 
226   Status ReadFooter(BlobLogFooter* footer);
227 
228   Status WriteFooterAndCloseLocked(SequenceNumber sequence);
229 
230   void CloseRandomAccessLocked();
231 
232   // this is used, when you are reading only the footer of a
233   // previously closed file
234   Status SetFromFooterLocked(const BlobLogFooter& footer);
235 
set_expiration_range(const ExpirationRange & expiration_range)236   void set_expiration_range(const ExpirationRange& expiration_range) {
237     expiration_range_ = expiration_range;
238   }
239 
240   // The following functions are atomic, and don't need locks
SetFileSize(uint64_t fs)241   void SetFileSize(uint64_t fs) { file_size_ = fs; }
242 
SetBlobCount(uint64_t bc)243   void SetBlobCount(uint64_t bc) { blob_count_ = bc; }
244 
BlobRecordAdded(uint64_t record_size)245   void BlobRecordAdded(uint64_t record_size) {
246     ++blob_count_;
247     file_size_ += record_size;
248   }
249 };
250 }  // namespace blob_db
251 }  // namespace ROCKSDB_NAMESPACE
252 #endif  // ROCKSDB_LITE
253