1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 7 // Use of this source code is governed by a BSD-style license that can be 8 // found in the LICENSE file. See the AUTHORS file for names of contributors. 9 // 10 // The representation of a DBImpl consists of a set of Versions. The 11 // newest version is called "current". Older versions may be kept 12 // around to provide a consistent view to live iterators. 13 // 14 // Each Version keeps track of a set of table files per level, as well as a 15 // set of blob files. The entire set of versions is maintained in a 16 // VersionSet. 17 // 18 // Version,VersionSet are thread-compatible, but require external 19 // synchronization on all accesses. 20 21 #pragma once 22 #include <atomic> 23 #include <deque> 24 #include <limits> 25 #include <map> 26 #include <memory> 27 #include <set> 28 #include <string> 29 #include <utility> 30 #include <vector> 31 32 #include "db/blob/blob_file_meta.h" 33 #include "db/column_family.h" 34 #include "db/compaction/compaction.h" 35 #include "db/compaction/compaction_picker.h" 36 #include "db/dbformat.h" 37 #include "db/file_indexer.h" 38 #include "db/log_reader.h" 39 #include "db/range_del_aggregator.h" 40 #include "db/read_callback.h" 41 #include "db/table_cache.h" 42 #include "db/version_builder.h" 43 #include "db/version_edit.h" 44 #include "db/write_controller.h" 45 #include "monitoring/instrumented_mutex.h" 46 #include "options/db_options.h" 47 #include "port/port.h" 48 #include "rocksdb/env.h" 49 #include "rocksdb/file_checksum.h" 50 #include "table/get_context.h" 51 #include "table/multiget_context.h" 52 #include "trace_replay/block_cache_tracer.h" 53 54 namespace ROCKSDB_NAMESPACE { 55 56 namespace log { 57 class Writer; 58 } 59 60 class Compaction; 61 class LogBuffer; 62 class LookupKey; 63 class MemTable; 64 class Version; 65 class VersionSet; 66 class WriteBufferManager; 67 class MergeContext; 68 class ColumnFamilySet; 69 class MergeIteratorBuilder; 70 71 // VersionEdit is always supposed to be valid and it is used to point at 72 // entries in Manifest. Ideally it should not be used as a container to 73 // carry around few of its fields as function params because it can cause 74 // readers to think it's a valid entry from Manifest. To avoid that confusion 75 // introducing VersionEditParams to simply carry around multiple VersionEdit 76 // params. It need not point to a valid record in Manifest. 77 using VersionEditParams = VersionEdit; 78 79 // Return the smallest index i such that file_level.files[i]->largest >= key. 80 // Return file_level.num_files if there is no such file. 81 // REQUIRES: "file_level.files" contains a sorted list of 82 // non-overlapping files. 83 extern int FindFile(const InternalKeyComparator& icmp, 84 const LevelFilesBrief& file_level, const Slice& key); 85 86 // Returns true iff some file in "files" overlaps the user key range 87 // [*smallest,*largest]. 88 // smallest==nullptr represents a key smaller than all keys in the DB. 89 // largest==nullptr represents a key largest than all keys in the DB. 90 // REQUIRES: If disjoint_sorted_files, file_level.files[] 91 // contains disjoint ranges in sorted order. 92 extern bool SomeFileOverlapsRange(const InternalKeyComparator& icmp, 93 bool disjoint_sorted_files, 94 const LevelFilesBrief& file_level, 95 const Slice* smallest_user_key, 96 const Slice* largest_user_key); 97 98 // Generate LevelFilesBrief from vector<FdWithKeyRange*> 99 // Would copy smallest_key and largest_key data to sequential memory 100 // arena: Arena used to allocate the memory 101 extern void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level, 102 const std::vector<FileMetaData*>& files, 103 Arena* arena); 104 105 // Information of the storage associated with each Version, including number of 106 // levels of LSM tree, files information at each level, files marked for 107 // compaction, blob files, etc. 108 class VersionStorageInfo { 109 public: 110 VersionStorageInfo(const InternalKeyComparator* internal_comparator, 111 const Comparator* user_comparator, int num_levels, 112 CompactionStyle compaction_style, 113 VersionStorageInfo* src_vstorage, 114 bool _force_consistency_checks); 115 // No copying allowed 116 VersionStorageInfo(const VersionStorageInfo&) = delete; 117 void operator=(const VersionStorageInfo&) = delete; 118 ~VersionStorageInfo(); 119 Reserve(int level,size_t size)120 void Reserve(int level, size_t size) { files_[level].reserve(size); } 121 122 void AddFile(int level, FileMetaData* f, Logger* info_log = nullptr); 123 124 void AddBlobFile(std::shared_ptr<BlobFileMetaData> blob_file_meta); 125 126 void SetFinalized(); 127 128 // Update num_non_empty_levels_. 129 void UpdateNumNonEmptyLevels(); 130 GenerateFileIndexer()131 void GenerateFileIndexer() { 132 file_indexer_.UpdateIndex(&arena_, num_non_empty_levels_, files_); 133 } 134 135 // Update the accumulated stats from a file-meta. 136 void UpdateAccumulatedStats(FileMetaData* file_meta); 137 138 // Decrease the current stat from a to-be-deleted file-meta 139 void RemoveCurrentStats(FileMetaData* file_meta); 140 141 void ComputeCompensatedSizes(); 142 143 // Updates internal structures that keep track of compaction scores 144 // We use compaction scores to figure out which compaction to do next 145 // REQUIRES: db_mutex held!! 146 // TODO find a better way to pass compaction_options_fifo. 147 void ComputeCompactionScore(const ImmutableCFOptions& immutable_cf_options, 148 const MutableCFOptions& mutable_cf_options); 149 150 // Estimate est_comp_needed_bytes_ 151 void EstimateCompactionBytesNeeded( 152 const MutableCFOptions& mutable_cf_options); 153 154 // This computes files_marked_for_compaction_ and is called by 155 // ComputeCompactionScore() 156 void ComputeFilesMarkedForCompaction(); 157 158 // This computes ttl_expired_files_ and is called by 159 // ComputeCompactionScore() 160 void ComputeExpiredTtlFiles(const ImmutableCFOptions& ioptions, 161 const uint64_t ttl); 162 163 // This computes files_marked_for_periodic_compaction_ and is called by 164 // ComputeCompactionScore() 165 void ComputeFilesMarkedForPeriodicCompaction( 166 const ImmutableCFOptions& ioptions, 167 const uint64_t periodic_compaction_seconds); 168 169 // This computes bottommost_files_marked_for_compaction_ and is called by 170 // ComputeCompactionScore() or UpdateOldestSnapshot(). 171 // 172 // Among bottommost files (assumes they've already been computed), marks the 173 // ones that have keys that would be eliminated if recompacted, according to 174 // the seqnum of the oldest existing snapshot. Must be called every time 175 // oldest snapshot changes as that is when bottom-level files can become 176 // eligible for compaction. 177 // 178 // REQUIRES: DB mutex held 179 void ComputeBottommostFilesMarkedForCompaction(); 180 181 // Generate level_files_brief_ from files_ 182 void GenerateLevelFilesBrief(); 183 // Sort all files for this version based on their file size and 184 // record results in files_by_compaction_pri_. The largest files are listed 185 // first. 186 void UpdateFilesByCompactionPri(CompactionPri compaction_pri); 187 188 void GenerateLevel0NonOverlapping(); level0_non_overlapping()189 bool level0_non_overlapping() const { 190 return level0_non_overlapping_; 191 } 192 193 // Check whether each file in this version is bottommost (i.e., nothing in its 194 // key-range could possibly exist in an older file/level). 195 // REQUIRES: This version has not been saved 196 void GenerateBottommostFiles(); 197 198 // Updates the oldest snapshot and related internal state, like the bottommost 199 // files marked for compaction. 200 // REQUIRES: DB mutex held 201 void UpdateOldestSnapshot(SequenceNumber oldest_snapshot_seqnum); 202 203 int MaxInputLevel() const; 204 int MaxOutputLevel(bool allow_ingest_behind) const; 205 206 // Return level number that has idx'th highest score CompactionScoreLevel(int idx)207 int CompactionScoreLevel(int idx) const { return compaction_level_[idx]; } 208 209 // Return idx'th highest score CompactionScore(int idx)210 double CompactionScore(int idx) const { return compaction_score_[idx]; } 211 212 void GetOverlappingInputs( 213 int level, const InternalKey* begin, // nullptr means before all keys 214 const InternalKey* end, // nullptr means after all keys 215 std::vector<FileMetaData*>* inputs, 216 int hint_index = -1, // index of overlap file 217 int* file_index = nullptr, // return index of overlap file 218 bool expand_range = true, // if set, returns files which overlap the 219 // range and overlap each other. If false, 220 // then just files intersecting the range 221 InternalKey** next_smallest = nullptr) // if non-null, returns the 222 const; // smallest key of next file not included 223 void GetCleanInputsWithinInterval( 224 int level, const InternalKey* begin, // nullptr means before all keys 225 const InternalKey* end, // nullptr means after all keys 226 std::vector<FileMetaData*>* inputs, 227 int hint_index = -1, // index of overlap file 228 int* file_index = nullptr) // return index of overlap file 229 const; 230 231 void GetOverlappingInputsRangeBinarySearch( 232 int level, // level > 0 233 const InternalKey* begin, // nullptr means before all keys 234 const InternalKey* end, // nullptr means after all keys 235 std::vector<FileMetaData*>* inputs, 236 int hint_index, // index of overlap file 237 int* file_index, // return index of overlap file 238 bool within_interval = false, // if set, force the inputs within interval 239 InternalKey** next_smallest = nullptr) // if non-null, returns the 240 const; // smallest key of next file not included 241 242 // Returns true iff some file in the specified level overlaps 243 // some part of [*smallest_user_key,*largest_user_key]. 244 // smallest_user_key==NULL represents a key smaller than all keys in the DB. 245 // largest_user_key==NULL represents a key largest than all keys in the DB. 246 bool OverlapInLevel(int level, const Slice* smallest_user_key, 247 const Slice* largest_user_key); 248 249 // Returns true iff the first or last file in inputs contains 250 // an overlapping user key to the file "just outside" of it (i.e. 251 // just after the last file, or just before the first file) 252 // REQUIRES: "*inputs" is a sorted list of non-overlapping files 253 bool HasOverlappingUserKey(const std::vector<FileMetaData*>* inputs, 254 int level); 255 num_levels()256 int num_levels() const { return num_levels_; } 257 258 // REQUIRES: This version has been saved (see VersionSet::SaveTo) num_non_empty_levels()259 int num_non_empty_levels() const { 260 assert(finalized_); 261 return num_non_empty_levels_; 262 } 263 264 // REQUIRES: This version has been finalized. 265 // (CalculateBaseBytes() is called) 266 // This may or may not return number of level files. It is to keep backward 267 // compatible behavior in universal compaction. l0_delay_trigger_count()268 int l0_delay_trigger_count() const { return l0_delay_trigger_count_; } 269 set_l0_delay_trigger_count(int v)270 void set_l0_delay_trigger_count(int v) { l0_delay_trigger_count_ = v; } 271 272 // REQUIRES: This version has been saved (see VersionSet::SaveTo) NumLevelFiles(int level)273 int NumLevelFiles(int level) const { 274 assert(finalized_); 275 return static_cast<int>(files_[level].size()); 276 } 277 278 // Return the combined file size of all files at the specified level. 279 uint64_t NumLevelBytes(int level) const; 280 281 // REQUIRES: This version has been saved (see VersionSet::SaveTo) LevelFiles(int level)282 const std::vector<FileMetaData*>& LevelFiles(int level) const { 283 return files_[level]; 284 } 285 286 // REQUIRES: This version has been saved (see VersionSet::SaveTo) 287 using BlobFiles = std::map<uint64_t, std::shared_ptr<BlobFileMetaData>>; GetBlobFiles()288 const BlobFiles& GetBlobFiles() const { return blob_files_; } 289 LevelFilesBrief(int level)290 const ROCKSDB_NAMESPACE::LevelFilesBrief& LevelFilesBrief(int level) const { 291 assert(level < static_cast<int>(level_files_brief_.size())); 292 return level_files_brief_[level]; 293 } 294 295 // REQUIRES: This version has been saved (see VersionSet::SaveTo) FilesByCompactionPri(int level)296 const std::vector<int>& FilesByCompactionPri(int level) const { 297 assert(finalized_); 298 return files_by_compaction_pri_[level]; 299 } 300 301 // REQUIRES: This version has been saved (see VersionSet::SaveTo) 302 // REQUIRES: DB mutex held during access FilesMarkedForCompaction()303 const autovector<std::pair<int, FileMetaData*>>& FilesMarkedForCompaction() 304 const { 305 assert(finalized_); 306 return files_marked_for_compaction_; 307 } 308 309 // REQUIRES: This version has been saved (see VersionSet::SaveTo) 310 // REQUIRES: DB mutex held during access ExpiredTtlFiles()311 const autovector<std::pair<int, FileMetaData*>>& ExpiredTtlFiles() const { 312 assert(finalized_); 313 return expired_ttl_files_; 314 } 315 316 // REQUIRES: This version has been saved (see VersionSet::SaveTo) 317 // REQUIRES: DB mutex held during access 318 const autovector<std::pair<int, FileMetaData*>>& FilesMarkedForPeriodicCompaction()319 FilesMarkedForPeriodicCompaction() const { 320 assert(finalized_); 321 return files_marked_for_periodic_compaction_; 322 } 323 TEST_AddFileMarkedForPeriodicCompaction(int level,FileMetaData * f)324 void TEST_AddFileMarkedForPeriodicCompaction(int level, FileMetaData* f) { 325 files_marked_for_periodic_compaction_.emplace_back(level, f); 326 } 327 328 // REQUIRES: This version has been saved (see VersionSet::SaveTo) 329 // REQUIRES: DB mutex held during access 330 const autovector<std::pair<int, FileMetaData*>>& BottommostFilesMarkedForCompaction()331 BottommostFilesMarkedForCompaction() const { 332 assert(finalized_); 333 return bottommost_files_marked_for_compaction_; 334 } 335 base_level()336 int base_level() const { return base_level_; } level_multiplier()337 double level_multiplier() const { return level_multiplier_; } 338 339 // REQUIRES: lock is held 340 // Set the index that is used to offset into files_by_compaction_pri_ to find 341 // the next compaction candidate file. SetNextCompactionIndex(int level,int index)342 void SetNextCompactionIndex(int level, int index) { 343 next_file_to_compact_by_size_[level] = index; 344 } 345 346 // REQUIRES: lock is held NextCompactionIndex(int level)347 int NextCompactionIndex(int level) const { 348 return next_file_to_compact_by_size_[level]; 349 } 350 351 // REQUIRES: This version has been saved (see VersionSet::SaveTo) file_indexer()352 const FileIndexer& file_indexer() const { 353 assert(finalized_); 354 return file_indexer_; 355 } 356 357 // Only the first few entries of files_by_compaction_pri_ are sorted. 358 // There is no need to sort all the files because it is likely 359 // that on a running system, we need to look at only the first 360 // few largest files because a new version is created every few 361 // seconds/minutes (because of concurrent compactions). 362 static const size_t kNumberFilesToSort = 50; 363 364 // Return a human-readable short (single-line) summary of the number 365 // of files per level. Uses *scratch as backing store. 366 struct LevelSummaryStorage { 367 char buffer[1000]; 368 }; 369 struct FileSummaryStorage { 370 char buffer[3000]; 371 }; 372 const char* LevelSummary(LevelSummaryStorage* scratch) const; 373 // Return a human-readable short (single-line) summary of files 374 // in a specified level. Uses *scratch as backing store. 375 const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const; 376 377 // Return the maximum overlapping data (in bytes) at next level for any 378 // file at a level >= 1. 379 int64_t MaxNextLevelOverlappingBytes(); 380 381 // Return a human readable string that describes this version's contents. 382 std::string DebugString(bool hex = false) const; 383 GetAverageValueSize()384 uint64_t GetAverageValueSize() const { 385 if (accumulated_num_non_deletions_ == 0) { 386 return 0; 387 } 388 assert(accumulated_raw_key_size_ + accumulated_raw_value_size_ > 0); 389 assert(accumulated_file_size_ > 0); 390 return accumulated_raw_value_size_ / accumulated_num_non_deletions_ * 391 accumulated_file_size_ / 392 (accumulated_raw_key_size_ + accumulated_raw_value_size_); 393 } 394 395 uint64_t GetEstimatedActiveKeys() const; 396 397 double GetEstimatedCompressionRatioAtLevel(int level) const; 398 399 // re-initializes the index that is used to offset into 400 // files_by_compaction_pri_ 401 // to find the next compaction candidate file. ResetNextCompactionIndex(int level)402 void ResetNextCompactionIndex(int level) { 403 next_file_to_compact_by_size_[level] = 0; 404 } 405 InternalComparator()406 const InternalKeyComparator* InternalComparator() { 407 return internal_comparator_; 408 } 409 410 // Returns maximum total bytes of data on a given level. 411 uint64_t MaxBytesForLevel(int level) const; 412 413 // Must be called after any change to MutableCFOptions. 414 void CalculateBaseBytes(const ImmutableCFOptions& ioptions, 415 const MutableCFOptions& options); 416 417 // Returns an estimate of the amount of live data in bytes. 418 uint64_t EstimateLiveDataSize() const; 419 estimated_compaction_needed_bytes()420 uint64_t estimated_compaction_needed_bytes() const { 421 return estimated_compaction_needed_bytes_; 422 } 423 TEST_set_estimated_compaction_needed_bytes(uint64_t v)424 void TEST_set_estimated_compaction_needed_bytes(uint64_t v) { 425 estimated_compaction_needed_bytes_ = v; 426 } 427 force_consistency_checks()428 bool force_consistency_checks() const { return force_consistency_checks_; } 429 bottommost_files_mark_threshold()430 SequenceNumber bottommost_files_mark_threshold() const { 431 return bottommost_files_mark_threshold_; 432 } 433 434 // Returns whether any key in [`smallest_key`, `largest_key`] could appear in 435 // an older L0 file than `last_l0_idx` or in a greater level than `last_level` 436 // 437 // @param last_level Level after which we check for overlap 438 // @param last_l0_idx If `last_level == 0`, index of L0 file after which we 439 // check for overlap; otherwise, must be -1 440 bool RangeMightExistAfterSortedRun(const Slice& smallest_user_key, 441 const Slice& largest_user_key, 442 int last_level, int last_l0_idx); 443 444 private: 445 const InternalKeyComparator* internal_comparator_; 446 const Comparator* user_comparator_; 447 int num_levels_; // Number of levels 448 int num_non_empty_levels_; // Number of levels. Any level larger than it 449 // is guaranteed to be empty. 450 // Per-level max bytes 451 std::vector<uint64_t> level_max_bytes_; 452 453 // A short brief metadata of files per level 454 autovector<ROCKSDB_NAMESPACE::LevelFilesBrief> level_files_brief_; 455 FileIndexer file_indexer_; 456 Arena arena_; // Used to allocate space for file_levels_ 457 458 CompactionStyle compaction_style_; 459 460 // List of files per level, files in each level are arranged 461 // in increasing order of keys 462 std::vector<FileMetaData*>* files_; 463 464 // Map of blob files in version by number. 465 BlobFiles blob_files_; 466 467 // Level that L0 data should be compacted to. All levels < base_level_ should 468 // be empty. -1 if it is not level-compaction so it's not applicable. 469 int base_level_; 470 471 double level_multiplier_; 472 473 // A list for the same set of files that are stored in files_, 474 // but files in each level are now sorted based on file 475 // size. The file with the largest size is at the front. 476 // This vector stores the index of the file from files_. 477 std::vector<std::vector<int>> files_by_compaction_pri_; 478 479 // If true, means that files in L0 have keys with non overlapping ranges 480 bool level0_non_overlapping_; 481 482 // An index into files_by_compaction_pri_ that specifies the first 483 // file that is not yet compacted 484 std::vector<int> next_file_to_compact_by_size_; 485 486 // Only the first few entries of files_by_compaction_pri_ are sorted. 487 // There is no need to sort all the files because it is likely 488 // that on a running system, we need to look at only the first 489 // few largest files because a new version is created every few 490 // seconds/minutes (because of concurrent compactions). 491 static const size_t number_of_files_to_sort_ = 50; 492 493 // This vector contains list of files marked for compaction and also not 494 // currently being compacted. It is protected by DB mutex. It is calculated in 495 // ComputeCompactionScore() 496 autovector<std::pair<int, FileMetaData*>> files_marked_for_compaction_; 497 498 autovector<std::pair<int, FileMetaData*>> expired_ttl_files_; 499 500 autovector<std::pair<int, FileMetaData*>> 501 files_marked_for_periodic_compaction_; 502 503 // These files are considered bottommost because none of their keys can exist 504 // at lower levels. They are not necessarily all in the same level. The marked 505 // ones are eligible for compaction because they contain duplicate key 506 // versions that are no longer protected by snapshot. These variables are 507 // protected by DB mutex and are calculated in `GenerateBottommostFiles()` and 508 // `ComputeBottommostFilesMarkedForCompaction()`. 509 autovector<std::pair<int, FileMetaData*>> bottommost_files_; 510 autovector<std::pair<int, FileMetaData*>> 511 bottommost_files_marked_for_compaction_; 512 513 // Threshold for needing to mark another bottommost file. Maintain it so we 514 // can quickly check when releasing a snapshot whether more bottommost files 515 // became eligible for compaction. It's defined as the min of the max nonzero 516 // seqnums of unmarked bottommost files. 517 SequenceNumber bottommost_files_mark_threshold_ = kMaxSequenceNumber; 518 519 // Monotonically increases as we release old snapshots. Zero indicates no 520 // snapshots have been released yet. When no snapshots remain we set it to the 521 // current seqnum, which needs to be protected as a snapshot can still be 522 // created that references it. 523 SequenceNumber oldest_snapshot_seqnum_ = 0; 524 525 // Level that should be compacted next and its compaction score. 526 // Score < 1 means compaction is not strictly needed. These fields 527 // are initialized by Finalize(). 528 // The most critical level to be compacted is listed first 529 // These are used to pick the best compaction level 530 std::vector<double> compaction_score_; 531 std::vector<int> compaction_level_; 532 int l0_delay_trigger_count_ = 0; // Count used to trigger slow down and stop 533 // for number of L0 files. 534 535 // the following are the sampled temporary stats. 536 // the current accumulated size of sampled files. 537 uint64_t accumulated_file_size_; 538 // the current accumulated size of all raw keys based on the sampled files. 539 uint64_t accumulated_raw_key_size_; 540 // the current accumulated size of all raw keys based on the sampled files. 541 uint64_t accumulated_raw_value_size_; 542 // total number of non-deletion entries 543 uint64_t accumulated_num_non_deletions_; 544 // total number of deletion entries 545 uint64_t accumulated_num_deletions_; 546 // current number of non_deletion entries 547 uint64_t current_num_non_deletions_; 548 // current number of deletion entries 549 uint64_t current_num_deletions_; 550 // current number of file samples 551 uint64_t current_num_samples_; 552 // Estimated bytes needed to be compacted until all levels' size is down to 553 // target sizes. 554 uint64_t estimated_compaction_needed_bytes_; 555 556 bool finalized_; 557 558 // If set to true, we will run consistency checks even if RocksDB 559 // is compiled in release mode 560 bool force_consistency_checks_; 561 562 friend class Version; 563 friend class VersionSet; 564 }; 565 566 using MultiGetRange = MultiGetContext::Range; 567 // A column family's version consists of the table and blob files owned by 568 // the column family at a certain point in time. 569 class Version { 570 public: 571 // Append to *iters a sequence of iterators that will 572 // yield the contents of this Version when merged together. 573 // REQUIRES: This version has been saved (see VersionSet::SaveTo) 574 void AddIterators(const ReadOptions&, const FileOptions& soptions, 575 MergeIteratorBuilder* merger_iter_builder, 576 RangeDelAggregator* range_del_agg); 577 578 void AddIteratorsForLevel(const ReadOptions&, const FileOptions& soptions, 579 MergeIteratorBuilder* merger_iter_builder, 580 int level, RangeDelAggregator* range_del_agg); 581 582 Status OverlapWithLevelIterator(const ReadOptions&, const FileOptions&, 583 const Slice& smallest_user_key, 584 const Slice& largest_user_key, 585 int level, bool* overlap); 586 587 // Lookup the value for key or get all merge operands for key. 588 // If do_merge = true (default) then lookup value for key. 589 // Behavior if do_merge = true: 590 // If found, store it in *value and 591 // return OK. Else return a non-OK status. 592 // Uses *operands to store merge_operator operations to apply later. 593 // 594 // If the ReadOptions.read_tier is set to do a read-only fetch, then 595 // *value_found will be set to false if it cannot be determined whether 596 // this value exists without doing IO. 597 // 598 // If the key is Deleted, *status will be set to NotFound and 599 // *key_exists will be set to true. 600 // If no key was found, *status will be set to NotFound and 601 // *key_exists will be set to false. 602 // If seq is non-null, *seq will be set to the sequence number found 603 // for the key if a key was found. 604 // Behavior if do_merge = false 605 // If the key has any merge operands then store them in 606 // merge_context.operands_list and don't merge the operands 607 // REQUIRES: lock is not held 608 void Get(const ReadOptions&, const LookupKey& key, PinnableSlice* value, 609 std::string* timestamp, Status* status, MergeContext* merge_context, 610 SequenceNumber* max_covering_tombstone_seq, 611 bool* value_found = nullptr, bool* key_exists = nullptr, 612 SequenceNumber* seq = nullptr, ReadCallback* callback = nullptr, 613 bool* is_blob = nullptr, bool do_merge = true); 614 615 void MultiGet(const ReadOptions&, MultiGetRange* range, 616 ReadCallback* callback = nullptr, bool* is_blob = nullptr); 617 618 // Loads some stats information from files. Call without mutex held. It needs 619 // to be called before applying the version to the version set. 620 void PrepareApply(const MutableCFOptions& mutable_cf_options, 621 bool update_stats); 622 623 // Reference count management (so Versions do not disappear out from 624 // under live iterators) 625 void Ref(); 626 // Decrease reference count. Delete the object if no reference left 627 // and return true. Otherwise, return false. 628 bool Unref(); 629 630 // Add all files listed in the current version to *live. 631 void AddLiveFiles(std::vector<FileDescriptor>* live); 632 633 // Return a human readable string that describes this version's contents. 634 std::string DebugString(bool hex = false, bool print_stats = false) const; 635 636 // Returns the version number of this version GetVersionNumber()637 uint64_t GetVersionNumber() const { return version_number_; } 638 639 // REQUIRES: lock is held 640 // On success, "tp" will contains the table properties of the file 641 // specified in "file_meta". If the file name of "file_meta" is 642 // known ahead, passing it by a non-null "fname" can save a 643 // file-name conversion. 644 Status GetTableProperties(std::shared_ptr<const TableProperties>* tp, 645 const FileMetaData* file_meta, 646 const std::string* fname = nullptr) const; 647 648 // REQUIRES: lock is held 649 // On success, *props will be populated with all SSTables' table properties. 650 // The keys of `props` are the sst file name, the values of `props` are the 651 // tables' properties, represented as std::shared_ptr. 652 Status GetPropertiesOfAllTables(TablePropertiesCollection* props); 653 Status GetPropertiesOfAllTables(TablePropertiesCollection* props, int level); 654 Status GetPropertiesOfTablesInRange(const Range* range, std::size_t n, 655 TablePropertiesCollection* props) const; 656 657 // Print summary of range delete tombstones in SST files into out_str, 658 // with maximum max_entries_to_print entries printed out. 659 Status TablesRangeTombstoneSummary(int max_entries_to_print, 660 std::string* out_str); 661 662 // REQUIRES: lock is held 663 // On success, "tp" will contains the aggregated table property among 664 // the table properties of all sst files in this version. 665 Status GetAggregatedTableProperties( 666 std::shared_ptr<const TableProperties>* tp, int level = -1); 667 GetEstimatedActiveKeys()668 uint64_t GetEstimatedActiveKeys() { 669 return storage_info_.GetEstimatedActiveKeys(); 670 } 671 672 size_t GetMemoryUsageByTableReaders(); 673 cfd()674 ColumnFamilyData* cfd() const { return cfd_; } 675 676 // Return the next Version in the linked list. Used for debug only TEST_Next()677 Version* TEST_Next() const { 678 return next_; 679 } 680 TEST_refs()681 int TEST_refs() const { return refs_; } 682 storage_info()683 VersionStorageInfo* storage_info() { return &storage_info_; } 684 version_set()685 VersionSet* version_set() { return vset_; } 686 687 void GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta); 688 689 uint64_t GetSstFilesSize(); 690 691 // Retrieves the file_creation_time of the oldest file in the DB. 692 // Prerequisite for this API is max_open_files = -1 693 void GetCreationTimeOfOldestFile(uint64_t* creation_time); 694 GetMutableCFOptions()695 const MutableCFOptions& GetMutableCFOptions() { return mutable_cf_options_; } 696 697 private: 698 Env* env_; 699 FileSystem* fs_; 700 friend class ReactiveVersionSet; 701 friend class VersionSet; 702 friend class VersionEditHandler; 703 friend class VersionEditHandlerPointInTime; 704 internal_comparator()705 const InternalKeyComparator* internal_comparator() const { 706 return storage_info_.internal_comparator_; 707 } user_comparator()708 const Comparator* user_comparator() const { 709 return storage_info_.user_comparator_; 710 } 711 712 // Returns true if the filter blocks in the specified level will not be 713 // checked during read operations. In certain cases (trivial move or preload), 714 // the filter block may already be cached, but we still do not access it such 715 // that it eventually expires from the cache. 716 bool IsFilterSkipped(int level, bool is_file_last_in_level = false); 717 718 // The helper function of UpdateAccumulatedStats, which may fill the missing 719 // fields of file_meta from its associated TableProperties. 720 // Returns true if it does initialize FileMetaData. 721 bool MaybeInitializeFileMetaData(FileMetaData* file_meta); 722 723 // Update the accumulated stats associated with the current version. 724 // This accumulated stats will be used in compaction. 725 void UpdateAccumulatedStats(bool update_stats); 726 727 // Sort all files for this version based on their file size and 728 // record results in files_by_compaction_pri_. The largest files are listed 729 // first. 730 void UpdateFilesByCompactionPri(); 731 732 ColumnFamilyData* cfd_; // ColumnFamilyData to which this Version belongs 733 Logger* info_log_; 734 Statistics* db_statistics_; 735 TableCache* table_cache_; 736 const MergeOperator* merge_operator_; 737 738 VersionStorageInfo storage_info_; 739 VersionSet* vset_; // VersionSet to which this Version belongs 740 Version* next_; // Next version in linked list 741 Version* prev_; // Previous version in linked list 742 int refs_; // Number of live refs to this version 743 const FileOptions file_options_; 744 const MutableCFOptions mutable_cf_options_; 745 746 // A version number that uniquely represents this version. This is 747 // used for debugging and logging purposes only. 748 uint64_t version_number_; 749 750 Version(ColumnFamilyData* cfd, VersionSet* vset, const FileOptions& file_opt, 751 MutableCFOptions mutable_cf_options, uint64_t version_number = 0); 752 753 ~Version(); 754 755 // No copying allowed 756 Version(const Version&) = delete; 757 void operator=(const Version&) = delete; 758 }; 759 760 struct ObsoleteFileInfo { 761 FileMetaData* metadata; 762 std::string path; 763 ObsoleteFileInfoObsoleteFileInfo764 ObsoleteFileInfo() noexcept : metadata(nullptr) {} ObsoleteFileInfoObsoleteFileInfo765 ObsoleteFileInfo(FileMetaData* f, const std::string& file_path) 766 : metadata(f), path(file_path) {} 767 768 ObsoleteFileInfo(const ObsoleteFileInfo&) = delete; 769 ObsoleteFileInfo& operator=(const ObsoleteFileInfo&) = delete; 770 ObsoleteFileInfoObsoleteFileInfo771 ObsoleteFileInfo(ObsoleteFileInfo&& rhs) noexcept : 772 ObsoleteFileInfo() { 773 *this = std::move(rhs); 774 } 775 776 ObsoleteFileInfo& operator=(ObsoleteFileInfo&& rhs) noexcept { 777 path = std::move(rhs.path); 778 metadata = rhs.metadata; 779 rhs.metadata = nullptr; 780 781 return *this; 782 } 783 DeleteMetadataObsoleteFileInfo784 void DeleteMetadata() { 785 delete metadata; 786 metadata = nullptr; 787 } 788 }; 789 790 class BaseReferencedVersionBuilder; 791 792 class AtomicGroupReadBuffer { 793 public: 794 Status AddEdit(VersionEdit* edit); 795 void Clear(); 796 bool IsFull() const; 797 bool IsEmpty() const; 798 TEST_read_edits_in_atomic_group()799 uint64_t TEST_read_edits_in_atomic_group() const { 800 return read_edits_in_atomic_group_; 801 } replay_buffer()802 std::vector<VersionEdit>& replay_buffer() { return replay_buffer_; } 803 804 private: 805 uint64_t read_edits_in_atomic_group_ = 0; 806 std::vector<VersionEdit> replay_buffer_; 807 }; 808 809 // VersionSet is the collection of versions of all the column families of the 810 // database. Each database owns one VersionSet. A VersionSet has access to all 811 // column families via ColumnFamilySet, i.e. set of the column families. 812 class VersionSet { 813 public: 814 VersionSet(const std::string& dbname, const ImmutableDBOptions* db_options, 815 const FileOptions& file_options, Cache* table_cache, 816 WriteBufferManager* write_buffer_manager, 817 WriteController* write_controller, 818 BlockCacheTracer* const block_cache_tracer); 819 // No copying allowed 820 VersionSet(const VersionSet&) = delete; 821 void operator=(const VersionSet&) = delete; 822 823 virtual ~VersionSet(); 824 825 // Apply *edit to the current version to form a new descriptor that 826 // is both saved to persistent state and installed as the new 827 // current version. Will release *mu while actually writing to the file. 828 // column_family_options has to be set if edit is column family add 829 // REQUIRES: *mu is held on entry. 830 // REQUIRES: no other thread concurrently calls LogAndApply() 831 Status LogAndApply( 832 ColumnFamilyData* column_family_data, 833 const MutableCFOptions& mutable_cf_options, VersionEdit* edit, 834 InstrumentedMutex* mu, FSDirectory* db_directory = nullptr, 835 bool new_descriptor_log = false, 836 const ColumnFamilyOptions* column_family_options = nullptr) { 837 autovector<ColumnFamilyData*> cfds; 838 cfds.emplace_back(column_family_data); 839 autovector<const MutableCFOptions*> mutable_cf_options_list; 840 mutable_cf_options_list.emplace_back(&mutable_cf_options); 841 autovector<autovector<VersionEdit*>> edit_lists; 842 autovector<VersionEdit*> edit_list; 843 edit_list.emplace_back(edit); 844 edit_lists.emplace_back(edit_list); 845 return LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu, 846 db_directory, new_descriptor_log, column_family_options); 847 } 848 // The batch version. If edit_list.size() > 1, caller must ensure that 849 // no edit in the list column family add or drop 850 Status LogAndApply( 851 ColumnFamilyData* column_family_data, 852 const MutableCFOptions& mutable_cf_options, 853 const autovector<VersionEdit*>& edit_list, InstrumentedMutex* mu, 854 FSDirectory* db_directory = nullptr, bool new_descriptor_log = false, 855 const ColumnFamilyOptions* column_family_options = nullptr) { 856 autovector<ColumnFamilyData*> cfds; 857 cfds.emplace_back(column_family_data); 858 autovector<const MutableCFOptions*> mutable_cf_options_list; 859 mutable_cf_options_list.emplace_back(&mutable_cf_options); 860 autovector<autovector<VersionEdit*>> edit_lists; 861 edit_lists.emplace_back(edit_list); 862 return LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu, 863 db_directory, new_descriptor_log, column_family_options); 864 } 865 866 // The across-multi-cf batch version. If edit_lists contain more than 867 // 1 version edits, caller must ensure that no edit in the []list is column 868 // family manipulation. 869 virtual Status LogAndApply( 870 const autovector<ColumnFamilyData*>& cfds, 871 const autovector<const MutableCFOptions*>& mutable_cf_options_list, 872 const autovector<autovector<VersionEdit*>>& edit_lists, 873 InstrumentedMutex* mu, FSDirectory* db_directory = nullptr, 874 bool new_descriptor_log = false, 875 const ColumnFamilyOptions* new_cf_options = nullptr); 876 877 static Status GetCurrentManifestPath(const std::string& dbname, 878 FileSystem* fs, 879 std::string* manifest_filename, 880 uint64_t* manifest_file_number); 881 882 // Recover the last saved descriptor from persistent storage. 883 // If read_only == true, Recover() will not complain if some column families 884 // are not opened 885 Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families, 886 bool read_only = false, std::string* db_id = nullptr); 887 888 Status TryRecover(const std::vector<ColumnFamilyDescriptor>& column_families, 889 bool read_only, std::string* db_id, 890 bool* has_missing_table_file); 891 892 // Try to recover the version set to the most recent consistent state 893 // recorded in the specified manifest. 894 Status TryRecoverFromOneManifest( 895 const std::string& manifest_path, 896 const std::vector<ColumnFamilyDescriptor>& column_families, 897 bool read_only, std::string* db_id, bool* has_missing_table_file); 898 899 // Reads a manifest file and returns a list of column families in 900 // column_families. 901 static Status ListColumnFamilies(std::vector<std::string>* column_families, 902 const std::string& dbname, FileSystem* fs); 903 904 #ifndef ROCKSDB_LITE 905 // Try to reduce the number of levels. This call is valid when 906 // only one level from the new max level to the old 907 // max level containing files. 908 // The call is static, since number of levels is immutable during 909 // the lifetime of a RocksDB instance. It reduces number of levels 910 // in a DB by applying changes to manifest. 911 // For example, a db currently has 7 levels [0-6], and a call to 912 // to reduce to 5 [0-4] can only be executed when only one level 913 // among [4-6] contains files. 914 static Status ReduceNumberOfLevels(const std::string& dbname, 915 const Options* options, 916 const FileOptions& file_options, 917 int new_levels); 918 919 // Get the checksum information of all live files 920 Status GetLiveFilesChecksumInfo(FileChecksumList* checksum_list); 921 922 // printf contents (for debugging) 923 Status DumpManifest(Options& options, std::string& manifestFileName, 924 bool verbose, bool hex = false, bool json = false); 925 926 #endif // ROCKSDB_LITE 927 928 // Return the current manifest file number manifest_file_number()929 uint64_t manifest_file_number() const { return manifest_file_number_; } 930 options_file_number()931 uint64_t options_file_number() const { return options_file_number_; } 932 pending_manifest_file_number()933 uint64_t pending_manifest_file_number() const { 934 return pending_manifest_file_number_; 935 } 936 current_next_file_number()937 uint64_t current_next_file_number() const { return next_file_number_.load(); } 938 min_log_number_to_keep_2pc()939 uint64_t min_log_number_to_keep_2pc() const { 940 return min_log_number_to_keep_2pc_.load(); 941 } 942 943 // Allocate and return a new file number NewFileNumber()944 uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); } 945 946 // Fetch And Add n new file number FetchAddFileNumber(uint64_t n)947 uint64_t FetchAddFileNumber(uint64_t n) { 948 return next_file_number_.fetch_add(n); 949 } 950 951 // Return the last sequence number. LastSequence()952 uint64_t LastSequence() const { 953 return last_sequence_.load(std::memory_order_acquire); 954 } 955 956 // Note: memory_order_acquire must be sufficient. LastAllocatedSequence()957 uint64_t LastAllocatedSequence() const { 958 return last_allocated_sequence_.load(std::memory_order_seq_cst); 959 } 960 961 // Note: memory_order_acquire must be sufficient. LastPublishedSequence()962 uint64_t LastPublishedSequence() const { 963 return last_published_sequence_.load(std::memory_order_seq_cst); 964 } 965 966 // Set the last sequence number to s. SetLastSequence(uint64_t s)967 void SetLastSequence(uint64_t s) { 968 assert(s >= last_sequence_); 969 // Last visible sequence must always be less than last written seq 970 assert(!db_options_->two_write_queues || s <= last_allocated_sequence_); 971 last_sequence_.store(s, std::memory_order_release); 972 } 973 974 // Note: memory_order_release must be sufficient SetLastPublishedSequence(uint64_t s)975 void SetLastPublishedSequence(uint64_t s) { 976 assert(s >= last_published_sequence_); 977 last_published_sequence_.store(s, std::memory_order_seq_cst); 978 } 979 980 // Note: memory_order_release must be sufficient SetLastAllocatedSequence(uint64_t s)981 void SetLastAllocatedSequence(uint64_t s) { 982 assert(s >= last_allocated_sequence_); 983 last_allocated_sequence_.store(s, std::memory_order_seq_cst); 984 } 985 986 // Note: memory_order_release must be sufficient FetchAddLastAllocatedSequence(uint64_t s)987 uint64_t FetchAddLastAllocatedSequence(uint64_t s) { 988 return last_allocated_sequence_.fetch_add(s, std::memory_order_seq_cst); 989 } 990 991 // Mark the specified file number as used. 992 // REQUIRED: this is only called during single-threaded recovery or repair. 993 void MarkFileNumberUsed(uint64_t number); 994 995 // Mark the specified log number as deleted 996 // REQUIRED: this is only called during single-threaded recovery or repair, or 997 // from ::LogAndApply where the global mutex is held. 998 void MarkMinLogNumberToKeep2PC(uint64_t number); 999 1000 // Return the log file number for the log file that is currently 1001 // being compacted, or zero if there is no such log file. prev_log_number()1002 uint64_t prev_log_number() const { return prev_log_number_; } 1003 1004 // Returns the minimum log number which still has data not flushed to any SST 1005 // file. 1006 // In non-2PC mode, all the log numbers smaller than this number can be safely 1007 // deleted. MinLogNumberWithUnflushedData()1008 uint64_t MinLogNumberWithUnflushedData() const { 1009 return PreComputeMinLogNumberWithUnflushedData(nullptr); 1010 } 1011 // Returns the minimum log number which still has data not flushed to any SST 1012 // file, except data from `cfd_to_skip`. PreComputeMinLogNumberWithUnflushedData(const ColumnFamilyData * cfd_to_skip)1013 uint64_t PreComputeMinLogNumberWithUnflushedData( 1014 const ColumnFamilyData* cfd_to_skip) const { 1015 uint64_t min_log_num = std::numeric_limits<uint64_t>::max(); 1016 for (auto cfd : *column_family_set_) { 1017 if (cfd == cfd_to_skip) { 1018 continue; 1019 } 1020 // It's safe to ignore dropped column families here: 1021 // cfd->IsDropped() becomes true after the drop is persisted in MANIFEST. 1022 if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) { 1023 min_log_num = cfd->GetLogNumber(); 1024 } 1025 } 1026 return min_log_num; 1027 } 1028 1029 // Create an iterator that reads over the compaction inputs for "*c". 1030 // The caller should delete the iterator when no longer needed. 1031 InternalIterator* MakeInputIterator( 1032 const Compaction* c, RangeDelAggregator* range_del_agg, 1033 const FileOptions& file_options_compactions); 1034 1035 // Add all files listed in any live version to *live. 1036 void AddLiveFiles(std::vector<FileDescriptor>* live_list); 1037 1038 // Return the approximate size of data to be scanned for range [start, end) 1039 // in levels [start_level, end_level). If end_level == -1 it will search 1040 // through all non-empty levels 1041 uint64_t ApproximateSize(const SizeApproximationOptions& options, Version* v, 1042 const Slice& start, const Slice& end, 1043 int start_level, int end_level, 1044 TableReaderCaller caller); 1045 1046 // Return the size of the current manifest file manifest_file_size()1047 uint64_t manifest_file_size() const { return manifest_file_size_; } 1048 1049 // verify that the files that we started with for a compaction 1050 // still exist in the current version and in the same original level. 1051 // This ensures that a concurrent compaction did not erroneously 1052 // pick the same files to compact. 1053 bool VerifyCompactionFileConsistency(Compaction* c); 1054 1055 Status GetMetadataForFile(uint64_t number, int* filelevel, 1056 FileMetaData** metadata, ColumnFamilyData** cfd); 1057 1058 // This function doesn't support leveldb SST filenames 1059 void GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata); 1060 1061 void GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files, 1062 std::vector<std::string>* manifest_filenames, 1063 uint64_t min_pending_output); 1064 GetColumnFamilySet()1065 ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); } file_options()1066 const FileOptions& file_options() { return file_options_; } ChangeFileOptions(const MutableDBOptions & new_options)1067 void ChangeFileOptions(const MutableDBOptions& new_options) { 1068 file_options_.writable_file_max_buffer_size = 1069 new_options.writable_file_max_buffer_size; 1070 } 1071 db_options()1072 const ImmutableDBOptions* db_options() const { return db_options_; } 1073 1074 static uint64_t GetNumLiveVersions(Version* dummy_versions); 1075 1076 static uint64_t GetTotalSstFilesSize(Version* dummy_versions); 1077 1078 // Get the IO Status returned by written Manifest. io_status()1079 IOStatus io_status() const { return io_status_; } 1080 1081 // Set the IO Status to OK. Called before Manifest write if needed. SetIOStatusOK()1082 void SetIOStatusOK() { io_status_ = IOStatus::OK(); } 1083 1084 protected: 1085 struct ManifestWriter; 1086 1087 friend class Version; 1088 friend class VersionEditHandler; 1089 friend class VersionEditHandlerPointInTime; 1090 friend class DBImpl; 1091 friend class DBImplReadOnly; 1092 1093 struct LogReporter : public log::Reader::Reporter { 1094 Status* status; CorruptionLogReporter1095 virtual void Corruption(size_t /*bytes*/, const Status& s) override { 1096 if (this->status->ok()) *this->status = s; 1097 } 1098 }; 1099 1100 void Reset(); 1101 1102 // Returns approximated offset of a key in a file for a given version. 1103 uint64_t ApproximateOffsetOf(Version* v, const FdWithKeyRange& f, 1104 const Slice& key, TableReaderCaller caller); 1105 1106 // Returns approximated data size between start and end keys in a file 1107 // for a given version. 1108 uint64_t ApproximateSize(Version* v, const FdWithKeyRange& f, 1109 const Slice& start, const Slice& end, 1110 TableReaderCaller caller); 1111 1112 struct MutableCFState { 1113 uint64_t log_number; 1114 }; 1115 1116 // Save current contents to *log 1117 Status WriteCurrentStateToManifest( 1118 const std::unordered_map<uint32_t, MutableCFState>& curr_state, 1119 log::Writer* log); 1120 1121 void AppendVersion(ColumnFamilyData* column_family_data, Version* v); 1122 1123 ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options, 1124 const VersionEdit* edit); 1125 1126 Status ReadAndRecover( 1127 log::Reader* reader, AtomicGroupReadBuffer* read_buffer, 1128 const std::unordered_map<std::string, ColumnFamilyOptions>& 1129 name_to_options, 1130 std::unordered_map<int, std::string>& column_families_not_found, 1131 std::unordered_map< 1132 uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders, 1133 VersionEditParams* version_edit, std::string* db_id = nullptr); 1134 1135 // REQUIRES db mutex 1136 Status ApplyOneVersionEditToBuilder( 1137 VersionEdit& edit, 1138 const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_opts, 1139 std::unordered_map<int, std::string>& column_families_not_found, 1140 std::unordered_map< 1141 uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders, 1142 VersionEditParams* version_edit); 1143 1144 Status ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, 1145 const VersionEdit& from_edit, 1146 VersionEditParams* version_edit_params); 1147 1148 Status VerifyFileMetadata(const std::string& fpath, 1149 const FileMetaData& meta) const; 1150 1151 std::unique_ptr<ColumnFamilySet> column_family_set_; 1152 Env* const env_; 1153 FileSystem* const fs_; 1154 const std::string dbname_; 1155 std::string db_id_; 1156 const ImmutableDBOptions* const db_options_; 1157 std::atomic<uint64_t> next_file_number_; 1158 // Any log number equal or lower than this should be ignored during recovery, 1159 // and is qualified for being deleted in 2PC mode. In non-2PC mode, this 1160 // number is ignored. 1161 std::atomic<uint64_t> min_log_number_to_keep_2pc_ = {0}; 1162 uint64_t manifest_file_number_; 1163 uint64_t options_file_number_; 1164 uint64_t pending_manifest_file_number_; 1165 // The last seq visible to reads. It normally indicates the last sequence in 1166 // the memtable but when using two write queues it could also indicate the 1167 // last sequence in the WAL visible to reads. 1168 std::atomic<uint64_t> last_sequence_; 1169 // The last seq that is already allocated. It is applicable only when we have 1170 // two write queues. In that case seq might or might not have appreated in 1171 // memtable but it is expected to appear in the WAL. 1172 // We have last_sequence <= last_allocated_sequence_ 1173 std::atomic<uint64_t> last_allocated_sequence_; 1174 // The last allocated sequence that is also published to the readers. This is 1175 // applicable only when last_seq_same_as_publish_seq_ is not set. Otherwise 1176 // last_sequence_ also indicates the last published seq. 1177 // We have last_sequence <= last_published_sequence_ <= 1178 // last_allocated_sequence_ 1179 std::atomic<uint64_t> last_published_sequence_; 1180 uint64_t prev_log_number_; // 0 or backing store for memtable being compacted 1181 1182 // Opened lazily 1183 std::unique_ptr<log::Writer> descriptor_log_; 1184 1185 // generates a increasing version number for every new version 1186 uint64_t current_version_number_; 1187 1188 // Queue of writers to the manifest file 1189 std::deque<ManifestWriter*> manifest_writers_; 1190 1191 // Current size of manifest file 1192 uint64_t manifest_file_size_; 1193 1194 std::vector<ObsoleteFileInfo> obsolete_files_; 1195 std::vector<std::string> obsolete_manifests_; 1196 1197 // env options for all reads and writes except compactions 1198 FileOptions file_options_; 1199 1200 BlockCacheTracer* const block_cache_tracer_; 1201 1202 // Store the IO status when Manifest is written 1203 IOStatus io_status_; 1204 1205 private: 1206 // REQUIRES db mutex at beginning. may release and re-acquire db mutex 1207 Status ProcessManifestWrites(std::deque<ManifestWriter>& writers, 1208 InstrumentedMutex* mu, FSDirectory* db_directory, 1209 bool new_descriptor_log, 1210 const ColumnFamilyOptions* new_cf_options); 1211 1212 void LogAndApplyCFHelper(VersionEdit* edit); 1213 Status LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, 1214 VersionEdit* edit, InstrumentedMutex* mu); 1215 }; 1216 1217 // ReactiveVersionSet represents a collection of versions of the column 1218 // families of the database. Users of ReactiveVersionSet, e.g. DBImplSecondary, 1219 // need to replay the MANIFEST (description log in older terms) in order to 1220 // reconstruct and install versions. 1221 class ReactiveVersionSet : public VersionSet { 1222 public: 1223 ReactiveVersionSet(const std::string& dbname, 1224 const ImmutableDBOptions* _db_options, 1225 const FileOptions& _file_options, Cache* table_cache, 1226 WriteBufferManager* write_buffer_manager, 1227 WriteController* write_controller); 1228 1229 ~ReactiveVersionSet() override; 1230 1231 Status ReadAndApply( 1232 InstrumentedMutex* mu, 1233 std::unique_ptr<log::FragmentBufferedReader>* manifest_reader, 1234 std::unordered_set<ColumnFamilyData*>* cfds_changed); 1235 1236 Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families, 1237 std::unique_ptr<log::FragmentBufferedReader>* manifest_reader, 1238 std::unique_ptr<log::Reader::Reporter>* manifest_reporter, 1239 std::unique_ptr<Status>* manifest_reader_status); 1240 TEST_read_edits_in_atomic_group()1241 uint64_t TEST_read_edits_in_atomic_group() const { 1242 return read_buffer_.TEST_read_edits_in_atomic_group(); 1243 } replay_buffer()1244 std::vector<VersionEdit>& replay_buffer() { 1245 return read_buffer_.replay_buffer(); 1246 } 1247 1248 protected: 1249 using VersionSet::ApplyOneVersionEditToBuilder; 1250 1251 // REQUIRES db mutex 1252 Status ApplyOneVersionEditToBuilder( 1253 VersionEdit& edit, std::unordered_set<ColumnFamilyData*>* cfds_changed, 1254 VersionEdit* version_edit); 1255 1256 Status MaybeSwitchManifest( 1257 log::Reader::Reporter* reporter, 1258 std::unique_ptr<log::FragmentBufferedReader>* manifest_reader); 1259 1260 private: 1261 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>> 1262 active_version_builders_; 1263 AtomicGroupReadBuffer read_buffer_; 1264 // Number of version edits to skip by ReadAndApply at the beginning of a new 1265 // MANIFEST created by primary. 1266 int number_of_edits_to_skip_; 1267 1268 using VersionSet::LogAndApply; 1269 using VersionSet::Recover; 1270 LogAndApply(const autovector<ColumnFamilyData * > &,const autovector<const MutableCFOptions * > &,const autovector<autovector<VersionEdit * >> &,InstrumentedMutex *,FSDirectory *,bool,const ColumnFamilyOptions *)1271 Status LogAndApply( 1272 const autovector<ColumnFamilyData*>& /*cfds*/, 1273 const autovector<const MutableCFOptions*>& /*mutable_cf_options_list*/, 1274 const autovector<autovector<VersionEdit*>>& /*edit_lists*/, 1275 InstrumentedMutex* /*mu*/, FSDirectory* /*db_directory*/, 1276 bool /*new_descriptor_log*/, 1277 const ColumnFamilyOptions* /*new_cf_option*/) override { 1278 return Status::NotSupported("not supported in reactive mode"); 1279 } 1280 1281 // No copy allowed 1282 ReactiveVersionSet(const ReactiveVersionSet&); 1283 ReactiveVersionSet& operator=(const ReactiveVersionSet&); 1284 }; 1285 1286 } // namespace ROCKSDB_NAMESPACE 1287