1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 #pragma once 6 #ifndef ROCKSDB_LITE 7 8 #include <atomic> 9 #include <limits> 10 #include <memory> 11 #include <unordered_set> 12 13 #include "file/random_access_file_reader.h" 14 #include "port/port.h" 15 #include "rocksdb/env.h" 16 #include "rocksdb/options.h" 17 #include "utilities/blob_db/blob_log_format.h" 18 #include "utilities/blob_db/blob_log_reader.h" 19 #include "utilities/blob_db/blob_log_writer.h" 20 21 namespace ROCKSDB_NAMESPACE { 22 namespace blob_db { 23 24 class BlobDBImpl; 25 26 class BlobFile { 27 friend class BlobDBImpl; 28 friend struct BlobFileComparator; 29 friend struct BlobFileComparatorTTL; 30 friend class BlobIndexCompactionFilterGC; 31 32 private: 33 // access to parent 34 const BlobDBImpl* parent_{nullptr}; 35 36 // path to blob directory 37 std::string path_to_dir_; 38 39 // the id of the file. 40 // the above 2 are created during file creation and never changed 41 // after that 42 uint64_t file_number_{0}; 43 44 // The file numbers of the SST files whose oldest blob file reference 45 // points to this blob file. 46 std::unordered_set<uint64_t> linked_sst_files_; 47 48 // Info log. 49 Logger* info_log_{nullptr}; 50 51 // Column family id. 52 uint32_t column_family_id_{std::numeric_limits<uint32_t>::max()}; 53 54 // Compression type of blobs in the file 55 CompressionType compression_{kNoCompression}; 56 57 // If true, the keys in this file all has TTL. Otherwise all keys don't 58 // have TTL. 59 bool has_ttl_{false}; 60 61 // TTL range of blobs in the file. 62 ExpirationRange expiration_range_; 63 64 // number of blobs in the file 65 std::atomic<uint64_t> blob_count_{0}; 66 67 // size of the file 68 std::atomic<uint64_t> file_size_{0}; 69 70 BlobLogHeader header_; 71 72 // closed_ = true implies the file is no more mutable 73 // no more blobs will be appended and the footer has been written out 74 std::atomic<bool> closed_{false}; 75 76 // The latest sequence number when the file was closed/made immutable. 77 SequenceNumber immutable_sequence_{0}; 78 79 // Whether the file was marked obsolete (due to either TTL or GC). 80 // obsolete_ still needs to do iterator/snapshot checks 81 std::atomic<bool> obsolete_{false}; 82 83 // The last sequence number by the time the file marked as obsolete. 84 // Data in this file is visible to a snapshot taken before the sequence. 85 SequenceNumber obsolete_sequence_{0}; 86 87 // Sequential/Append writer for blobs 88 std::shared_ptr<Writer> log_writer_; 89 90 // random access file reader for GET calls 91 std::shared_ptr<RandomAccessFileReader> ra_file_reader_; 92 93 // This Read-Write mutex is per file specific and protects 94 // all the datastructures 95 mutable port::RWMutex mutex_; 96 97 // time when the random access reader was last created. 98 std::atomic<std::int64_t> last_access_{-1}; 99 100 // last time file was fsync'd/fdatasyncd 101 std::atomic<uint64_t> last_fsync_{0}; 102 103 bool header_valid_{false}; 104 105 bool footer_valid_{false}; 106 107 public: 108 BlobFile() = default; 109 110 BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum, 111 Logger* info_log); 112 113 BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum, 114 Logger* info_log, uint32_t column_family_id, 115 CompressionType compression, bool has_ttl, 116 const ExpirationRange& expiration_range); 117 118 ~BlobFile(); 119 120 uint32_t GetColumnFamilyId() const; 121 122 // Returns log file's absolute pathname. 123 std::string PathName() const; 124 125 // Primary identifier for blob file. 126 // once the file is created, this never changes BlobFileNumber()127 uint64_t BlobFileNumber() const { return file_number_; } 128 129 // Get the set of SST files whose oldest blob file reference points to 130 // this file. GetLinkedSstFiles()131 const std::unordered_set<uint64_t>& GetLinkedSstFiles() const { 132 return linked_sst_files_; 133 } 134 135 // Link an SST file whose oldest blob file reference points to this file. LinkSstFile(uint64_t sst_file_number)136 void LinkSstFile(uint64_t sst_file_number) { 137 assert(linked_sst_files_.find(sst_file_number) == linked_sst_files_.end()); 138 linked_sst_files_.insert(sst_file_number); 139 } 140 141 // Unlink an SST file whose oldest blob file reference points to this file. UnlinkSstFile(uint64_t sst_file_number)142 void UnlinkSstFile(uint64_t sst_file_number) { 143 auto it = linked_sst_files_.find(sst_file_number); 144 assert(it != linked_sst_files_.end()); 145 linked_sst_files_.erase(it); 146 } 147 148 // the following functions are atomic, and don't need 149 // read lock BlobCount()150 uint64_t BlobCount() const { 151 return blob_count_.load(std::memory_order_acquire); 152 } 153 154 std::string DumpState() const; 155 156 // if the file is not taking any more appends. Immutable()157 bool Immutable() const { return closed_.load(); } 158 159 // Mark the file as immutable. 160 // REQUIRES: write lock held, or access from single thread (on DB open). MarkImmutable(SequenceNumber sequence)161 void MarkImmutable(SequenceNumber sequence) { 162 closed_ = true; 163 immutable_sequence_ = sequence; 164 } 165 GetImmutableSequence()166 SequenceNumber GetImmutableSequence() const { 167 assert(Immutable()); 168 return immutable_sequence_; 169 } 170 171 // Whether the file was marked obsolete (due to either TTL or GC). Obsolete()172 bool Obsolete() const { 173 assert(Immutable() || !obsolete_.load()); 174 return obsolete_.load(); 175 } 176 177 // Mark file as obsolete (due to either TTL or GC). The file is not visible to 178 // snapshots with sequence greater or equal to the given sequence. 179 void MarkObsolete(SequenceNumber sequence); 180 GetObsoleteSequence()181 SequenceNumber GetObsoleteSequence() const { 182 assert(Obsolete()); 183 return obsolete_sequence_; 184 } 185 186 // we will assume this is atomic 187 bool NeedsFsync(bool hard, uint64_t bytes_per_sync) const; 188 189 Status Fsync(); 190 GetFileSize()191 uint64_t GetFileSize() const { 192 return file_size_.load(std::memory_order_acquire); 193 } 194 195 // All Get functions which are not atomic, will need ReadLock on the mutex 196 GetExpirationRange()197 ExpirationRange GetExpirationRange() const { return expiration_range_; } 198 ExtendExpirationRange(uint64_t expiration)199 void ExtendExpirationRange(uint64_t expiration) { 200 expiration_range_.first = std::min(expiration_range_.first, expiration); 201 expiration_range_.second = std::max(expiration_range_.second, expiration); 202 } 203 HasTTL()204 bool HasTTL() const { return has_ttl_; } 205 SetHasTTL(bool has_ttl)206 void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; } 207 GetCompressionType()208 CompressionType GetCompressionType() const { return compression_; } 209 GetWriter()210 std::shared_ptr<Writer> GetWriter() const { return log_writer_; } 211 212 // Read blob file header and footer. Return corruption if file header is 213 // malform or incomplete. If footer is malform or incomplete, set 214 // footer_valid_ to false and return Status::OK. 215 Status ReadMetadata(Env* env, const EnvOptions& env_options); 216 217 Status GetReader(Env* env, const EnvOptions& env_options, 218 std::shared_ptr<RandomAccessFileReader>* reader, 219 bool* fresh_open); 220 221 private: 222 std::shared_ptr<Reader> OpenRandomAccessReader( 223 Env* env, const DBOptions& db_options, 224 const EnvOptions& env_options) const; 225 226 Status ReadFooter(BlobLogFooter* footer); 227 228 Status WriteFooterAndCloseLocked(SequenceNumber sequence); 229 230 void CloseRandomAccessLocked(); 231 232 // this is used, when you are reading only the footer of a 233 // previously closed file 234 Status SetFromFooterLocked(const BlobLogFooter& footer); 235 set_expiration_range(const ExpirationRange & expiration_range)236 void set_expiration_range(const ExpirationRange& expiration_range) { 237 expiration_range_ = expiration_range; 238 } 239 240 // The following functions are atomic, and don't need locks SetFileSize(uint64_t fs)241 void SetFileSize(uint64_t fs) { file_size_ = fs; } 242 SetBlobCount(uint64_t bc)243 void SetBlobCount(uint64_t bc) { blob_count_ = bc; } 244 BlobRecordAdded(uint64_t record_size)245 void BlobRecordAdded(uint64_t record_size) { 246 ++blob_count_; 247 file_size_ += record_size; 248 } 249 }; 250 } // namespace blob_db 251 } // namespace ROCKSDB_NAMESPACE 252 #endif // ROCKSDB_LITE 253