//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/version_set.h"

#include <stdio.h>
#include <algorithm>
#include <array>
#include <cinttypes>
#include <list>
#include <map>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include "compaction/compaction.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
#include "db/version_edit_handler.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "monitoring/file_read_sample.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/format.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/merging_iterator.h"
#include "table/meta_blocks.h"
#include "table/multiget_context.h"
#include "table/plain/plain_table_factory.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/user_comparator_wrapper.h"

namespace ROCKSDB_NAMESPACE {

namespace {

// Find the first file in the LevelFilesBrief data structure whose largest
// key is >= the given key, within the index range [left, right).
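// Example: if the files on this level have largest keys "d", "j" and "p",
// a search key of "f" returns index 1 (the earliest file that could contain
// "f"), and a search key of "q" returns right (no candidate file).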
int FindFileInRange(const InternalKeyComparator& icmp,
    const LevelFilesBrief& file_level,
    const Slice& key,
    uint32_t left,
    uint32_t right) {
  auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
    return icmp.InternalKeyComparator::Compare(f.largest_key, k) < 0;
  };
  const auto& b = file_level.files;
  return static_cast<int>(std::lower_bound(b + left,
                                           b + right, key, cmp) - b);
}

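// Check whether the key range [smallest_user_key, largest_user_key] overlaps
// with any key reachable through `iter`. Seeks `iter` to the smallest
// possible internal key for smallest_user_key and sets *overlap accordingly.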
Status OverlapWithIterator(const Comparator* ucmp,
    const Slice& smallest_user_key,
    const Slice& largest_user_key,
    InternalIterator* iter,
    bool* overlap) {
  InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
                          kValueTypeForSeek);
  iter->Seek(range_start.Encode());
  if (!iter->status().ok()) {
    return iter->status();
  }

  *overlap = false;
  if (iter->Valid()) {
    ParsedInternalKey seek_result;
    if (!ParseInternalKey(iter->key(), &seek_result)) {
      return Status::Corruption("DB has corrupted keys");
    }

    if (ucmp->CompareWithoutTimestamp(seek_result.user_key, largest_user_key) <=
        0) {
      *overlap = true;
    }
  }

  return iter->status();
}

// Class to help choose the next file to search for a particular key.
// Searches and returns files level by level.
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant (unless we
// are MergeInProgress).
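//
// A caller such as Version::Get drives this class with a loop along these
// lines (sketch, details elided):
//
//   FilePicker fp(...);
//   for (FdWithKeyRange* f = fp.GetNextFile(); f != nullptr;
//        f = fp.GetNextFile()) {
//     // probe table file f for the key; stop once a definitive
//     // result (value or deletion) is found
//   }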
class FilePicker {
 public:
  FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
             const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
             unsigned int num_levels, FileIndexer* file_indexer,
             const Comparator* user_comparator,
             const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        search_left_bound_(0),
        search_right_bound_(FileIndexer::kLevelMaxIndex),
#ifndef NDEBUG
        files_(files),
#endif
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        user_key_(user_key),
        ikey_(ikey),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
#ifdef NDEBUG
    (void)files;
#endif
    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // Prefetch Level 0 table data to avoid cache miss if possible.
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          r->Prepare(ikey);
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {  // Loops over different levels.
      while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
        // Loops over all files in current level.
        FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
        int cmp_largest = -1;

        // Do key range filtering of files and/or fractional cascading if:
        // (1) not all the files are in level 0, or
        // (2) there are more than 3 current level files
        // If there are only 3 or fewer current level files in the system,
        // we skip the key range filtering. In this case, more likely, the
        // system is highly tuned to minimize the number of tables queried
        // by each query, so it is unlikely that key range filtering is more
        // efficient than querying the files.
        if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
          // Check if key is within a file's range. If search left bound and
          // right bound point to the same file, we are sure the key falls
          // in range.
          assert(curr_level_ == 0 ||
                 curr_index_in_curr_level_ == start_index_in_curr_level_ ||
                 user_comparator_->CompareWithoutTimestamp(
                     user_key_, ExtractUserKey(f->smallest_key)) <= 0);

          int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
              user_key_, ExtractUserKey(f->smallest_key));
          if (cmp_smallest >= 0) {
            cmp_largest = user_comparator_->CompareWithoutTimestamp(
                user_key_, ExtractUserKey(f->largest_key));
          }

          // Setup file search bound for the next level based on the
          // comparison results
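          // (Fractional cascading: FileIndexer uses the position of user_key_
          // relative to this file to narrow the binary search window on the
          // next level, instead of re-searching that whole level.)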
          if (curr_level_ > 0) {
            file_indexer_->GetNextLevelIndex(curr_level_,
                                            curr_index_in_curr_level_,
                                            cmp_smallest, cmp_largest,
                                            &search_left_bound_,
                                            &search_right_bound_);
          }
          // Key falls out of current file's range
          if (cmp_smallest < 0 || cmp_largest > 0) {
            if (curr_level_ == 0) {
              ++curr_index_in_curr_level_;
              continue;
            } else {
              // Search next level.
              break;
            }
          }
        }
#ifndef NDEBUG
        // Sanity check to make sure that the files are correctly sorted
        if (prev_file_) {
          if (curr_level_ != 0) {
            int comp_sign = internal_comparator_->Compare(
                prev_file_->largest_key, f->smallest_key);
            assert(comp_sign < 0);
          } else {
            // On level 0, the current file cannot be newer than the previous
            // one. LevelFilesBrief is a compressed data structure with no
            // sequence number attribute, so compare via the FileMetaData
            // in files_.
            assert(curr_index_in_curr_level_ > 0);
            assert(!NewestFirstBySeqNo(files_[0][curr_index_in_curr_level_],
                  files_[0][curr_index_in_curr_level_-1]));
          }
        }
        prev_file_ = f;
#endif
        returned_file_level_ = curr_level_;
        if (curr_level_ > 0 && cmp_largest < 0) {
          // No more files to search in this level.
          search_ended_ = !PrepareNextLevel();
        } else {
          ++curr_index_in_curr_level_;
        }
        return f;
      }
      // Start searching next level.
      search_ended_ = !PrepareNextLevel();
    }
    // Search ended.
    return nullptr;
  }

  // Getter for the current file level,
  // used for the GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counters.
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;
  int32_t search_left_bound_;
  int32_t search_right_bound_;
#ifndef NDEBUG
  std::vector<FileMetaData*>* files_;
#endif
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  unsigned int curr_index_in_curr_level_;
  unsigned int start_index_in_curr_level_;
  Slice user_key_;
  Slice ikey_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;
#ifndef NDEBUG
  FdWithKeyRange* prev_file_;
#endif

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    curr_level_++;
    while (curr_level_ < num_levels_) {
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
        // also empty.
        assert(search_left_bound_ == 0);
        assert(search_right_bound_ == -1 ||
               search_right_bound_ == FileIndexer::kLevelMaxIndex);
        // Since current level is empty, it will need to search all files in
        // the next level
        search_left_bound_ = 0;
        search_right_bound_ = FileIndexer::kLevelMaxIndex;
        curr_level_++;
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index;
      if (curr_level_ == 0) {
        // On Level-0, we read through all files to check for overlap.
        start_index = 0;
      } else {
        // On Level-n (n>=1), files are sorted. Binary search to find the
        // earliest file whose largest key >= ikey. Search left bound and
        // right bound are used to narrow the range.
        if (search_left_bound_ <= search_right_bound_) {
          if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
            search_right_bound_ =
                static_cast<int32_t>(curr_file_level_->num_files) - 1;
          }
          // `search_right_bound_` is an inclusive upper-bound, but since it was
          // determined based on user key, it is still possible the lookup key
          // falls to the right of `search_right_bound_`'s corresponding file.
          // So, pass a limit one higher, which allows us to detect this case.
          start_index =
              FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
                              static_cast<uint32_t>(search_left_bound_),
                              static_cast<uint32_t>(search_right_bound_) + 1);
          if (start_index == search_right_bound_ + 1) {
            // `ikey_` comes after `search_right_bound_`. The lookup key does
            // not exist on this level, so let's skip this level and do a full
            // binary search on the next level.
            search_left_bound_ = 0;
            search_right_bound_ = FileIndexer::kLevelMaxIndex;
            curr_level_++;
            continue;
          }
        } else {
          // search_left_bound > search_right_bound, key does not exist in
          // this level. Since no comparison is done in this level, it will
          // need to search all files in the next level.
          search_left_bound_ = 0;
          search_right_bound_ = FileIndexer::kLevelMaxIndex;
          curr_level_++;
          continue;
        }
      }
      start_index_in_curr_level_ = start_index;
      curr_index_in_curr_level_ = start_index;
#ifndef NDEBUG
      prev_file_ = nullptr;
#endif
      return true;
    }
    // curr_level_ = num_levels_. So, no more levels to search.
    return false;
  }
};

class FilePickerMultiGet {
 private:
  struct FilePickerContext;

 public:
  FilePickerMultiGet(MultiGetRange* range,
                     autovector<LevelFilesBrief>* file_levels,
                     unsigned int num_levels, FileIndexer* file_indexer,
                     const Comparator* user_comparator,
                     const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        range_(range),
        batch_iter_(range->begin()),
        batch_iter_prev_(range->begin()),
        maybe_repeat_key_(false),
        current_level_range_(*range, range->begin(), range->end()),
        current_file_range_(*range, range->begin(), range->end()),
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
    for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
      fp_ctx_array_[iter.index()] =
          FilePickerContext(0, FileIndexer::kLevelMaxIndex);
    }

    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // REVISIT
      // Prefetch Level 0 table data to avoid cache miss if possible.
      // As of now, only PlainTableReader and CuckooTableReader do any
      // prefetching. This may not be necessary anymore once we implement
      // batching in those table readers
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
            r->Prepare(iter->ikey);
          }
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  // Iterates through files in the current level until it finds a file that
  // contains at least one key from the MultiGet batch
  bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
                                  size_t* file_index, FdWithKeyRange** fd,
                                  bool* is_last_key_in_file) {
    size_t curr_file_index = *file_index;
    FdWithKeyRange* f = nullptr;
    bool file_hit = false;
    int cmp_largest = -1;
    if (curr_file_index >= curr_file_level_->num_files) {
      // In the unlikely case the next key is a duplicate of the current key,
      // and the current key is the last in the level and the internal key
      // was not found, we need to skip lookup for the remaining keys and
      // reset the search bounds
      if (batch_iter_ != current_level_range_.end()) {
        ++batch_iter_;
        for (; batch_iter_ != current_level_range_.end(); ++batch_iter_) {
          struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
          fp_ctx.search_left_bound = 0;
          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
        }
      }
      return false;
    }
    // Loops over keys in the MultiGet batch until it finds a file with
    // at least one of the keys. Then it keeps moving forward until the
    // last key in the batch that falls in that file
    while (batch_iter_ != current_level_range_.end() &&
           (fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
                curr_file_index ||
            !file_hit)) {
      struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
      f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
      Slice& user_key = batch_iter_->ukey;

      // Do key range filtering of files and/or fractional cascading if:
      // (1) not all the files are in level 0, or
      // (2) there are more than 3 current level files
      // If there are only 3 or fewer current level files in the system, we
      // skip the key range filtering. In this case, more likely, the system
      // is highly tuned to minimize the number of tables queried by each
      // query, so it is unlikely that key range filtering is more efficient
      // than querying the files.
      if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
        // Check if key is within a file's range. If search left bound and
        // right bound point to the same file, we are sure the key falls
        // in range.
        assert(curr_level_ == 0 ||
               fp_ctx.curr_index_in_curr_level ==
                   fp_ctx.start_index_in_curr_level ||
               user_comparator_->Compare(user_key,
                                         ExtractUserKey(f->smallest_key)) <= 0);

        int cmp_smallest = user_comparator_->Compare(
            user_key, ExtractUserKey(f->smallest_key));
        if (cmp_smallest >= 0) {
          cmp_largest = user_comparator_->Compare(
              user_key, ExtractUserKey(f->largest_key));
        } else {
          cmp_largest = -1;
        }

        // Setup file search bound for the next level based on the
        // comparison results
        if (curr_level_ > 0) {
          file_indexer_->GetNextLevelIndex(
              curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
              cmp_largest, &fp_ctx.search_left_bound,
              &fp_ctx.search_right_bound);
        }
        // Key falls out of current file's range
        if (cmp_smallest < 0 || cmp_largest > 0) {
          next_file_range->SkipKey(batch_iter_);
        } else {
          file_hit = true;
        }
      } else {
        file_hit = true;
      }
      if (cmp_largest == 0) {
        // cmp_largest is 0, which means the next key will not be in this
        // file, so stop looking further. Also don't increment batch_iter_,
        // as we may have to look for this key in the next file if we don't
        // find it in this one
        break;
      } else {
        if (curr_level_ == 0) {
          // We need to look through all files in level 0
          ++fp_ctx.curr_index_in_curr_level;
        }
        ++batch_iter_;
      }
      if (!file_hit) {
        curr_file_index =
            (batch_iter_ != current_level_range_.end())
                ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
                : curr_file_level_->num_files;
      }
    }

    *fd = f;
    *file_index = curr_file_index;
    *is_last_key_in_file = cmp_largest == 0;
    return file_hit;
  }

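  // Returns the next file that may contain at least one key from the batch,
  // narrowing the batch to the keys that fall in that file (exposed through
  // CurrentFileRange()), or nullptr once all levels have been searched.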
  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {
      // Start searching next level.
      if (batch_iter_ == current_level_range_.end()) {
        search_ended_ = !PrepareNextLevel();
        continue;
      } else {
        if (maybe_repeat_key_) {
          maybe_repeat_key_ = false;
          // Check if we found the final value for the last key in the
          // previous lookup range. If we did, then there's no need to look
          // any further for that key, so advance batch_iter_. Else, keep
          // batch_iter_ positioned on that key so we look it up again in
          // the next file.
          // For L0, always advance the key because we will look in the next
          // file regardless for all keys not found yet.
          if (current_level_range_.CheckKeyDone(batch_iter_) ||
              curr_level_ == 0) {
            ++batch_iter_;
          }
        }
        // batch_iter_prev_ will become the start key for the next file
        // lookup
        batch_iter_prev_ = batch_iter_;
      }

      MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
                                    current_level_range_.end());
      size_t curr_file_index =
          (batch_iter_ != current_level_range_.end())
              ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
              : curr_file_level_->num_files;
      FdWithKeyRange* f;
      bool is_last_key_in_file;
      if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
                                      &is_last_key_in_file)) {
        search_ended_ = !PrepareNextLevel();
      } else {
        MultiGetRange::Iterator upper_key = batch_iter_;
        if (is_last_key_in_file) {
          // Since cmp_largest is 0, batch_iter_ still points to the last key
          // that falls in this file, instead of the next one. Increment
          // upper_key so we can set the range properly for SST MultiGet
          ++upper_key;
          ++(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level);
          maybe_repeat_key_ = true;
        }
        // Set the range for this file
        current_file_range_ =
            MultiGetRange(next_file_range, batch_iter_prev_, upper_key);
        returned_file_level_ = curr_level_;
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_file_index == curr_file_level_->num_files - 1;
        return f;
      }
    }

    // Search ended
    return nullptr;
  }

  // Getter for the current file level,
  // used for the GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counters.
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

  const MultiGetRange& CurrentFileRange() { return current_file_range_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;

  struct FilePickerContext {
    int32_t search_left_bound;
    int32_t search_right_bound;
    unsigned int curr_index_in_curr_level;
    unsigned int start_index_in_curr_level;

    FilePickerContext(int32_t left, int32_t right)
        : search_left_bound(left), search_right_bound(right),
          curr_index_in_curr_level(0), start_index_in_curr_level(0) {}

    FilePickerContext() = default;
  };
  std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
  MultiGetRange* range_;
  // Iterator to iterate through the keys in a MultiGet batch, which is reset
  // at the beginning of each level. Each call to GetNextFile() will position
  // batch_iter_ at or right after the last key that was found in the returned
  // SST file
  MultiGetRange::Iterator batch_iter_;
  // An iterator that records the previous position of batch_iter_, i.e. the
  // last key found in the previous SST file, in order to serve as the start
  // of the batch key range for the next SST file
  MultiGetRange::Iterator batch_iter_prev_;
  bool maybe_repeat_key_;
  MultiGetRange current_level_range_;
  MultiGetRange current_file_range_;
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    if (curr_level_ == 0) {
      MultiGetRange::Iterator mget_iter = current_level_range_.begin();
      if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
          curr_file_level_->num_files) {
        batch_iter_prev_ = current_level_range_.begin();
        batch_iter_ = current_level_range_.begin();
        return true;
      }
    }

    curr_level_++;
    // Reset key range to saved value
    while (curr_level_ < num_levels_) {
      bool level_contains_keys = false;
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
        // also empty.

        for (auto mget_iter = current_level_range_.begin();
             mget_iter != current_level_range_.end(); ++mget_iter) {
          struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];

          assert(fp_ctx.search_left_bound == 0);
          assert(fp_ctx.search_right_bound == -1 ||
                 fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
          // Since current level is empty, it will need to search all files in
          // the next level
          fp_ctx.search_left_bound = 0;
          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
        }
        // Skip all subsequent empty levels
        do {
          ++curr_level_;
        } while ((curr_level_ < num_levels_) &&
                 (*level_files_brief_)[curr_level_].num_files == 0);
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index = -1;
      current_level_range_ =
          MultiGetRange(*range_, range_->begin(), range_->end());
      for (auto mget_iter = current_level_range_.begin();
           mget_iter != current_level_range_.end(); ++mget_iter) {
        struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
        if (curr_level_ == 0) {
          // On Level-0, we read through all files to check for overlap.
          start_index = 0;
          level_contains_keys = true;
        } else {
          // On Level-n (n>=1), files are sorted. Binary search to find the
          // earliest file whose largest key >= ikey. Search left bound and
          // right bound are used to narrow the range.
          if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
            if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
              fp_ctx.search_right_bound =
                  static_cast<int32_t>(curr_file_level_->num_files) - 1;
            }
            // `search_right_bound` is an inclusive upper-bound, but since it
            // was determined based on user key, it is still possible the lookup
            // key falls to the right of `search_right_bound`'s corresponding
            // file. So, pass a limit one higher, which allows us to detect this
            // case.
            Slice& ikey = mget_iter->ikey;
            start_index = FindFileInRange(
                *internal_comparator_, *curr_file_level_, ikey,
                static_cast<uint32_t>(fp_ctx.search_left_bound),
                static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
            if (start_index == fp_ctx.search_right_bound + 1) {
              // `ikey` comes after `search_right_bound`. The lookup key does
              // not exist on this level, so let's skip this level and do a full
              // binary search on the next level.
              fp_ctx.search_left_bound = 0;
              fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
              current_level_range_.SkipKey(mget_iter);
              continue;
            } else {
              level_contains_keys = true;
            }
          } else {
            // search_left_bound > search_right_bound, key does not exist in
            // this level. Since no comparison is done in this level, it will
            // need to search all files in the next level.
            fp_ctx.search_left_bound = 0;
            fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
            current_level_range_.SkipKey(mget_iter);
            continue;
          }
        }
        fp_ctx.start_index_in_curr_level = start_index;
        fp_ctx.curr_index_in_curr_level = start_index;
      }
      if (level_contains_keys) {
        batch_iter_prev_ = current_level_range_.begin();
        batch_iter_ = current_level_range_.begin();
        return true;
      }
      curr_level_++;
    }
    // curr_level_ = num_levels_. So, no more levels to search.
    return false;
  }
};
}  // anonymous namespace

VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }

Version::~Version() {
  assert(refs_ == 0);

  // Remove from linked list
  prev_->next_ = next_;
  next_->prev_ = prev_;

  // Drop references to files
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
      FileMetaData* f = storage_info_.files_[level][i];
      assert(f->refs > 0);
      f->refs--;
      if (f->refs <= 0) {
        assert(cfd_ != nullptr);
        uint32_t path_id = f->fd.GetPathId();
        assert(path_id < cfd_->ioptions()->cf_paths.size());
        vset_->obsolete_files_.push_back(
            ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
      }
    }
  }
}

int FindFile(const InternalKeyComparator& icmp,
             const LevelFilesBrief& file_level,
             const Slice& key) {
  return FindFileInRange(icmp, file_level, key, 0,
                         static_cast<uint32_t>(file_level.num_files));
}

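// Builds a LevelFilesBrief from `files`, copying each file's smallest and
// largest keys into `arena` so the resulting slices remain valid for the
// lifetime of the arena (typically the owning Version).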
void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
        const std::vector<FileMetaData*>& files,
        Arena* arena) {
  assert(file_level);
  assert(arena);

  size_t num = files.size();
  file_level->num_files = num;
  char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
  file_level->files = new (mem) FdWithKeyRange[num];

  for (size_t i = 0; i < num; i++) {
    Slice smallest_key = files[i]->smallest.Encode();
    Slice largest_key = files[i]->largest.Encode();

    // Copy key slice to sequential memory
    size_t smallest_size = smallest_key.size();
    size_t largest_size = largest_key.size();
    mem = arena->AllocateAligned(smallest_size + largest_size);
    memcpy(mem, smallest_key.data(), smallest_size);
    memcpy(mem + smallest_size, largest_key.data(), largest_size);

    FdWithKeyRange& f = file_level->files[i];
    f.fd = files[i]->fd;
    f.file_metadata = files[i];
    f.smallest_key = Slice(mem, smallest_size);
    f.largest_key = Slice(mem + smallest_size, largest_size);
  }
}

static bool AfterFile(const Comparator* ucmp,
                      const Slice* user_key, const FdWithKeyRange* f) {
  // nullptr user_key occurs before all keys and is therefore never after *f
  return (user_key != nullptr &&
          ucmp->CompareWithoutTimestamp(*user_key,
                                        ExtractUserKey(f->largest_key)) > 0);
}

static bool BeforeFile(const Comparator* ucmp,
                       const Slice* user_key, const FdWithKeyRange* f) {
  // nullptr user_key occurs after all keys and is therefore never before *f
  return (user_key != nullptr &&
          ucmp->CompareWithoutTimestamp(*user_key,
                                        ExtractUserKey(f->smallest_key)) < 0);
}

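// Example: with disjoint sorted files [a,c] and [e,g], querying the range
// [d,d] binary-searches to the file [e,g], then returns false because "d"
// sorts before that file's smallest key "e" (no overlap).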
bool SomeFileOverlapsRange(
    const InternalKeyComparator& icmp,
    bool disjoint_sorted_files,
    const LevelFilesBrief& file_level,
    const Slice* smallest_user_key,
    const Slice* largest_user_key) {
  const Comparator* ucmp = icmp.user_comparator();
  if (!disjoint_sorted_files) {
    // Need to check against all files
    for (size_t i = 0; i < file_level.num_files; i++) {
      const FdWithKeyRange* f = &(file_level.files[i]);
      if (AfterFile(ucmp, smallest_user_key, f) ||
          BeforeFile(ucmp, largest_user_key, f)) {
        // No overlap
      } else {
        return true;  // Overlap
      }
    }
    return false;
  }

  // Binary search over file list
  uint32_t index = 0;
  if (smallest_user_key != nullptr) {
    // Find the leftmost possible internal key for smallest_user_key
    InternalKey small;
    small.SetMinPossibleForUserKey(*smallest_user_key);
    index = FindFile(icmp, file_level, small.Encode());
  }

  if (index >= file_level.num_files) {
    // Beginning of range is after all files, so no overlap.
    return false;
  }

  return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
}

namespace {

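// An internal iterator over all the files of one level, opening each file's
// table iterator lazily as the cursor moves between files. It is typically
// wrapped in a merging iterator together with the other levels' iterators
// and the memtable iterators.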
class LevelIterator final : public InternalIterator {
 public:
  LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
                const FileOptions& file_options,
                const InternalKeyComparator& icomparator,
                const LevelFilesBrief* flevel,
                const SliceTransform* prefix_extractor, bool should_sample,
                HistogramImpl* file_read_hist, TableReaderCaller caller,
                bool skip_filters, int level, RangeDelAggregator* range_del_agg,
                const std::vector<AtomicCompactionUnitBoundary>*
                    compaction_boundaries = nullptr)
      : table_cache_(table_cache),
        read_options_(read_options),
        file_options_(file_options),
        icomparator_(icomparator),
        user_comparator_(icomparator.user_comparator()),
        flevel_(flevel),
        prefix_extractor_(prefix_extractor),
        file_read_hist_(file_read_hist),
        should_sample_(should_sample),
        caller_(caller),
        skip_filters_(skip_filters),
        file_index_(flevel_->num_files),
        level_(level),
        range_del_agg_(range_del_agg),
        pinned_iters_mgr_(nullptr),
        compaction_boundaries_(compaction_boundaries) {
    // Empty level is not supported.
    assert(flevel_ != nullptr && flevel_->num_files > 0);
  }

  ~LevelIterator() override { delete file_iter_.Set(nullptr); }

  void Seek(const Slice& target) override;
  void SeekForPrev(const Slice& target) override;
  void SeekToFirst() override;
  void SeekToLast() override;
  void Next() final override;
  bool NextAndGetResult(IterateResult* result) override;
  void Prev() override;

  bool Valid() const override { return file_iter_.Valid(); }
  Slice key() const override {
    assert(Valid());
    return file_iter_.key();
  }

  Slice value() const override {
    assert(Valid());
    return file_iter_.value();
  }

  Status status() const override {
    return file_iter_.iter() ? file_iter_.status() : Status::OK();
  }

  inline bool MayBeOutOfLowerBound() override {
    assert(Valid());
    return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound();
  }

  inline bool MayBeOutOfUpperBound() override {
    assert(Valid());
    return file_iter_.MayBeOutOfUpperBound();
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    if (file_iter_.iter()) {
      file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
    }
  }

  bool IsKeyPinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsKeyPinned();
  }

  bool IsValuePinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsValuePinned();
  }

 private:
  // Return true if at least one invalid file is seen and skipped.
  bool SkipEmptyFileForward();
  void SkipEmptyFileBackward();
  void SetFileIterator(InternalIterator* iter);
  void InitFileIterator(size_t new_file_index);

  // Called by both Next() and NextAndGetResult(). Force inline.
  void NextImpl() {
    assert(Valid());
    file_iter_.Next();
    SkipEmptyFileForward();
  }

  const Slice& file_smallest_key(size_t file_index) {
    assert(file_index < flevel_->num_files);
    return flevel_->files[file_index].smallest_key;
  }

  bool KeyReachedUpperBound(const Slice& internal_key) {
    return read_options_.iterate_upper_bound != nullptr &&
           user_comparator_.CompareWithoutTimestamp(
               ExtractUserKey(internal_key), /*a_has_ts=*/true,
               *read_options_.iterate_upper_bound, /*b_has_ts=*/false) >= 0;
  }

  InternalIterator* NewFileIterator() {
    assert(file_index_ < flevel_->num_files);
    auto file_meta = flevel_->files[file_index_];
    if (should_sample_) {
      sample_file_read_inc(file_meta.file_metadata);
    }

    const InternalKey* smallest_compaction_key = nullptr;
    const InternalKey* largest_compaction_key = nullptr;
    if (compaction_boundaries_ != nullptr) {
      smallest_compaction_key = (*compaction_boundaries_)[file_index_].smallest;
      largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
    }
    CheckMayBeOutOfLowerBound();
    return table_cache_->NewIterator(
        read_options_, file_options_, icomparator_, *file_meta.file_metadata,
        range_del_agg_, prefix_extractor_,
        nullptr /* don't need reference to table */, file_read_hist_, caller_,
        /*arena=*/nullptr, skip_filters_, level_, smallest_compaction_key,
        largest_compaction_key);
  }

  // Check whether the current file lies fully within iterate_lower_bound.
  //
  // Note MyRocks may update iterate bounds between seeks. To work around
  // this, we need to check and update may_be_out_of_lower_bound_ accordingly.
  void CheckMayBeOutOfLowerBound() {
    if (read_options_.iterate_lower_bound != nullptr &&
        file_index_ < flevel_->num_files) {
      may_be_out_of_lower_bound_ =
          user_comparator_.Compare(
              ExtractUserKey(file_smallest_key(file_index_)),
              *read_options_.iterate_lower_bound) < 0;
    }
  }

  TableCache* table_cache_;
  const ReadOptions read_options_;
  const FileOptions& file_options_;
  const InternalKeyComparator& icomparator_;
  const UserComparatorWrapper user_comparator_;
  const LevelFilesBrief* flevel_;
  mutable FileDescriptor current_value_;
  // `prefix_extractor_` may be non-null even for total order seek. Checking
  // this variable is not the right way to identify whether prefix iterator
  // is used.
  const SliceTransform* prefix_extractor_;

  HistogramImpl* file_read_hist_;
  bool should_sample_;
  TableReaderCaller caller_;
  bool skip_filters_;
  bool may_be_out_of_lower_bound_ = true;
  size_t file_index_;
  int level_;
  RangeDelAggregator* range_del_agg_;
  IteratorWrapper file_iter_;  // May be nullptr
  PinnedIteratorsManager* pinned_iters_mgr_;

  // To be propagated to RangeDelAggregator in order to safely truncate range
  // tombstones.
  const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
};

void LevelIterator::Seek(const Slice& target) {
  // Check whether the seek key falls within the current file's range
  bool need_to_reseek = true;
  if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) {
    const FdWithKeyRange& cur_file = flevel_->files[file_index_];
    if (icomparator_.InternalKeyComparator::Compare(
            target, cur_file.largest_key) <= 0 &&
        icomparator_.InternalKeyComparator::Compare(
            target, cur_file.smallest_key) >= 0) {
      need_to_reseek = false;
      assert(static_cast<size_t>(FindFile(icomparator_, *flevel_, target)) ==
             file_index_);
    }
  }
  if (need_to_reseek) {
    TEST_SYNC_POINT("LevelIterator::Seek:BeforeFindFile");
    size_t new_file_index = FindFile(icomparator_, *flevel_, target);
    InitFileIterator(new_file_index);
  }

  if (file_iter_.iter() != nullptr) {
    file_iter_.Seek(target);
  }
  if (SkipEmptyFileForward() && prefix_extractor_ != nullptr &&
      !read_options_.total_order_seek && !read_options_.auto_prefix_mode &&
      file_iter_.iter() != nullptr && file_iter_.Valid()) {
    // We've skipped the file we initially positioned to. In the prefix
    // seek case, it is likely that the file is skipped because of
    // prefix bloom or hash, where more keys are skipped. We then check
    // the current key and invalidate the iterator if the prefix is
    // already passed.
    // When doing prefix iterator seek, once keys for one prefix have
    // been exhausted, it can jump to any key that is larger. Here we are
    // enforcing a stricter contract than that, in order to make it easier for
    // higher layers (merging and DB iterator) to reason about correctness:
    // 1. Within the prefix, the result should be accurate.
    // 2. If keys for the prefix are exhausted, the iterator is either
    //    positioned to the next key after the prefix, or made invalid.
    // A side benefit is that it invalidates the iterator earlier so that
    // the upper level merging iterator can merge fewer child iterators.
    Slice target_user_key = ExtractUserKey(target);
    Slice file_user_key = ExtractUserKey(file_iter_.key());
    if (prefix_extractor_->InDomain(target_user_key) &&
        (!prefix_extractor_->InDomain(file_user_key) ||
         user_comparator_.Compare(
             prefix_extractor_->Transform(target_user_key),
             prefix_extractor_->Transform(file_user_key)) != 0)) {
      SetFileIterator(nullptr);
    }
  }
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekForPrev(const Slice& target) {
  size_t new_file_index = FindFile(icomparator_, *flevel_, target);
  if (new_file_index >= flevel_->num_files) {
    new_file_index = flevel_->num_files - 1;
  }

  InitFileIterator(new_file_index);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekForPrev(target);
    SkipEmptyFileBackward();
  }
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekToFirst() {
  InitFileIterator(0);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToFirst();
  }
  SkipEmptyFileForward();
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekToLast() {
  InitFileIterator(flevel_->num_files - 1);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToLast();
  }
  SkipEmptyFileBackward();
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::Next() { NextImpl(); }

bool LevelIterator::NextAndGetResult(IterateResult* result) {
  NextImpl();
  bool is_valid = Valid();
  if (is_valid) {
    result->key = key();
    result->may_be_out_of_upper_bound = MayBeOutOfUpperBound();
  }
  return is_valid;
}

void LevelIterator::Prev() {
  assert(Valid());
  file_iter_.Prev();
  SkipEmptyFileBackward();
}

bool LevelIterator::SkipEmptyFileForward() {
  bool seen_empty_file = false;
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok() &&
          !file_iter_.iter()->IsOutOfBound())) {
    seen_empty_file = true;
    // Move to next file
    if (file_index_ >= flevel_->num_files - 1) {
      // Already at the last file
      SetFileIterator(nullptr);
      break;
    }
    if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
      SetFileIterator(nullptr);
      break;
    }
    InitFileIterator(file_index_ + 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToFirst();
    }
  }
  return seen_empty_file;
}

void LevelIterator::SkipEmptyFileBackward() {
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok())) {
    // Move to previous file
    if (file_index_ == 0) {
      // Already the first file
      SetFileIterator(nullptr);
      return;
    }
    InitFileIterator(file_index_ - 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToLast();
    }
  }
}

void LevelIterator::SetFileIterator(InternalIterator* iter) {
  if (pinned_iters_mgr_ && iter) {
    iter->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  InternalIterator* old_iter = file_iter_.Set(iter);
  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    pinned_iters_mgr_->PinIterator(old_iter);
  } else {
    delete old_iter;
  }
}

void LevelIterator::InitFileIterator(size_t new_file_index) {
  if (new_file_index >= flevel_->num_files) {
    file_index_ = new_file_index;
    SetFileIterator(nullptr);
    return;
  } else {
    // If the file iterator's status is Incomplete, we try again if the user
    // seeks to the same file, as this time we may go to a different data
    // block which is cached in the block cache.
    if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
        new_file_index == file_index_) {
      // file_iter_ is already constructed with this iterator, so
      // no need to change anything
    } else {
      file_index_ = new_file_index;
      InternalIterator* iter = NewFileIterator();
      SetFileIterator(iter);
    }
  }
}
}  // anonymous namespace

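// Fetches the table properties for `file_meta`, first from the table cache
// without doing IO; if the table is not cached (Incomplete), falls back to
// reading the properties block directly from the file.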
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
                                   const FileMetaData* file_meta,
                                   const std::string* fname) const {
  auto table_cache = cfd_->table_cache();
  auto ioptions = cfd_->ioptions();
  // 1. If the table is already present in the table cache, load the table
  // properties from there.
  Status s = table_cache->GetTableProperties(
      file_options_, cfd_->internal_comparator(), file_meta->fd, tp,
      mutable_cf_options_.prefix_extractor.get(), true /* no io */);
  if (s.ok()) {
    return s;
  }

  // We only ignore error type `Incomplete` since it's by design that we
  // disallow reading the table when it's not in the table cache.
  if (!s.IsIncomplete()) {
    return s;
  }

  // 2. Table is not present in table cache, we'll read the table properties
  // directly from the properties block in the file.
  std::unique_ptr<FSRandomAccessFile> file;
  std::string file_name;
  if (fname != nullptr) {
    file_name = *fname;
  } else {
    file_name =
      TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
                    file_meta->fd.GetPathId());
  }
  s = ioptions->fs->NewRandomAccessFile(file_name, file_options_, &file,
                                        nullptr);
  if (!s.ok()) {
    return s;
  }

  TableProperties* raw_table_properties;
  // By setting the magic number to kInvalidTableMagicNumber, we can bypass
  // the magic number check in the footer.
  std::unique_ptr<RandomAccessFileReader> file_reader(
      new RandomAccessFileReader(
          std::move(file), file_name, nullptr /* env */, nullptr /* stats */,
          0 /* hist_type */, nullptr /* file_read_hist */,
          nullptr /* rate_limiter */, ioptions->listeners));
  s = ReadTableProperties(
      file_reader.get(), file_meta->fd.GetFileSize(),
      Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
      &raw_table_properties, false /* compression_type_missing */);
  if (!s.ok()) {
    return s;
  }
  RecordTick(ioptions->statistics, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);

  *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
  return s;
}

Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
  Status s;
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    s = GetPropertiesOfAllTables(props, level);
    if (!s.ok()) {
      return s;
    }
  }

  return Status::OK();
}

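// Appends up to max_entries_to_print range tombstones to *out_str, grouped
// per file. Sample output shape (hypothetical values; keys are hex-printed):
//
//   === file : /path/to/000123.sst ===
//   start: 6B6579 end: 6B657A seq: 42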
TablesRangeTombstoneSummary(int max_entries_to_print,std::string * out_str)1274 Status Version::TablesRangeTombstoneSummary(int max_entries_to_print,
1275                                             std::string* out_str) {
1276   if (max_entries_to_print <= 0) {
1277     return Status::OK();
1278   }
1279   int num_entries_left = max_entries_to_print;
1280 
1281   std::stringstream ss;
1282 
1283   for (int level = 0; level < storage_info_.num_levels_; level++) {
1284     for (const auto& file_meta : storage_info_.files_[level]) {
1285       auto fname =
1286           TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
1287                         file_meta->fd.GetPathId());
1288 
1289       ss << "=== file : " << fname << " ===\n";
1290 
1291       TableCache* table_cache = cfd_->table_cache();
1292       std::unique_ptr<FragmentedRangeTombstoneIterator> tombstone_iter;
1293 
1294       Status s = table_cache->GetRangeTombstoneIterator(
1295           ReadOptions(), cfd_->internal_comparator(), *file_meta,
1296           &tombstone_iter);
1297       if (!s.ok()) {
1298         return s;
1299       }
1300       if (tombstone_iter) {
1301         tombstone_iter->SeekToFirst();
1302 
1303         while (tombstone_iter->Valid() && num_entries_left > 0) {
1304           ss << "start: " << tombstone_iter->start_key().ToString(true)
1305              << " end: " << tombstone_iter->end_key().ToString(true)
1306              << " seq: " << tombstone_iter->seq() << '\n';
1307           tombstone_iter->Next();
1308           num_entries_left--;
1309         }
1310         if (num_entries_left <= 0) {
1311           break;
1312         }
1313       }
1314     }
1315     if (num_entries_left <= 0) {
1316       break;
1317     }
1318   }
1319   assert(num_entries_left >= 0);
1320   if (num_entries_left <= 0) {
1321     ss << "(results may not be complete)\n";
1322   }
1323 
1324   *out_str = ss.str();
1325   return Status::OK();
1326 }

Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
                                         int level) {
  for (const auto& file_meta : storage_info_.files_[level]) {
    auto fname =
        TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
                      file_meta->fd.GetPathId());
    // 1. If the table is already present in table cache, load table
    // properties from there.
    std::shared_ptr<const TableProperties> table_properties;
    Status s = GetTableProperties(&table_properties, file_meta, &fname);
    if (s.ok()) {
      props->insert({fname, table_properties});
    } else {
      return s;
    }
  }

  return Status::OK();
}

Status Version::GetPropertiesOfTablesInRange(
    const Range* range, std::size_t n, TablePropertiesCollection* props) const {
  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
    for (decltype(n) i = 0; i < n; i++) {
      // Convert user_key into a corresponding internal key.
      InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
      InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
      std::vector<FileMetaData*> files;
      storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
                                         false);
      for (const auto& file_meta : files) {
        auto fname =
            TableFileName(cfd_->ioptions()->cf_paths,
                          file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
        if (props->count(fname) == 0) {
          // 1. If the table is already present in table cache, load table
          // properties from there.
          std::shared_ptr<const TableProperties> table_properties;
          Status s = GetTableProperties(&table_properties, file_meta, &fname);
          if (s.ok()) {
            props->insert({fname, table_properties});
          } else {
            return s;
          }
        }
      }
    }
  }

  return Status::OK();
}

Status Version::GetAggregatedTableProperties(
    std::shared_ptr<const TableProperties>* tp, int level) {
  TablePropertiesCollection props;
  Status s;
  if (level < 0) {
    s = GetPropertiesOfAllTables(&props);
  } else {
    s = GetPropertiesOfAllTables(&props, level);
  }
  if (!s.ok()) {
    return s;
  }

  auto* new_tp = new TableProperties();
  for (const auto& item : props) {
    new_tp->Add(*item.second);
  }
  tp->reset(new_tp);
  return Status::OK();
}

size_t Version::GetMemoryUsageByTableReaders() {
  size_t total_usage = 0;
  for (auto& file_level : storage_info_.level_files_brief_) {
    for (size_t i = 0; i < file_level.num_files; i++) {
      total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
          file_options_, cfd_->internal_comparator(), file_level.files[i].fd,
          mutable_cf_options_.prefix_extractor.get());
    }
  }
  return total_usage;
}

void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
  assert(cf_meta);
  assert(cfd_);

  cf_meta->name = cfd_->GetName();
  cf_meta->size = 0;
  cf_meta->file_count = 0;
  cf_meta->levels.clear();

  auto* ioptions = cfd_->ioptions();
  auto* vstorage = storage_info();

  for (int level = 0; level < cfd_->NumberLevels(); level++) {
    uint64_t level_size = 0;
    cf_meta->file_count += vstorage->LevelFiles(level).size();
    std::vector<SstFileMetaData> files;
    for (const auto& file : vstorage->LevelFiles(level)) {
      uint32_t path_id = file->fd.GetPathId();
      std::string file_path;
      if (path_id < ioptions->cf_paths.size()) {
        file_path = ioptions->cf_paths[path_id].path;
      } else {
        assert(!ioptions->cf_paths.empty());
        file_path = ioptions->cf_paths.back().path;
      }
      const uint64_t file_number = file->fd.GetNumber();
      files.emplace_back(SstFileMetaData{
          MakeTableFileName("", file_number), file_number, file_path,
          static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
          file->fd.largest_seqno, file->smallest.user_key().ToString(),
          file->largest.user_key().ToString(),
          file->stats.num_reads_sampled.load(std::memory_order_relaxed),
          file->being_compacted, file->oldest_blob_file_number,
          file->TryGetOldestAncesterTime(), file->TryGetFileCreationTime(),
          file->file_checksum, file->file_checksum_func_name});
      files.back().num_entries = file->num_entries;
      files.back().num_deletions = file->num_deletions;
      level_size += file->fd.GetFileSize();
    }
    cf_meta->levels.emplace_back(
        level, level_size, std::move(files));
    cf_meta->size += level_size;
  }
}

uint64_t Version::GetSstFilesSize() {
  uint64_t sst_files_size = 0;
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (const auto& file_meta : storage_info_.LevelFiles(level)) {
      sst_files_size += file_meta->fd.GetFileSize();
    }
  }
  return sst_files_size;
}

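// Sets *creation_time to the minimum file creation time across all
// non-empty levels, or to 0 if the creation time of any file is unknown.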
void Version::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
  uint64_t oldest_time = port::kMaxUint64;
  for (int level = 0; level < storage_info_.num_non_empty_levels_; level++) {
    for (FileMetaData* meta : storage_info_.LevelFiles(level)) {
      assert(meta->fd.table_reader != nullptr);
      uint64_t file_creation_time = meta->TryGetFileCreationTime();
      if (file_creation_time == kUnknownFileCreationTime) {
        *creation_time = 0;
        return;
      }
      if (file_creation_time < oldest_time) {
        oldest_time = file_creation_time;
      }
    }
  }
  *creation_time = oldest_time;
}

uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
  // Estimation will be inaccurate when:
  // (1) there exist merge keys
  // (2) keys are directly overwritten
  // (3) there are deletions of non-existing keys
  // (4) the number of samples is low
  if (current_num_samples_ == 0) {
    return 0;
  }

  if (current_num_non_deletions_ <= current_num_deletions_) {
    return 0;
  }

  uint64_t est = current_num_non_deletions_ - current_num_deletions_;

  uint64_t file_count = 0;
  for (int level = 0; level < num_levels_; ++level) {
    file_count += files_[level].size();
  }

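  // Extrapolate from the sampled files to all files. Illustrative example:
  // est = 1000 keys seen across current_num_samples_ = 4 sampled files out
  // of file_count = 10 files scales to 1000 * 10 / 4 = 2500 active keys.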
  if (current_num_samples_ < file_count) {
    // casting to avoid overflowing
    return
      static_cast<uint64_t>(
        (est * static_cast<double>(file_count) / current_num_samples_)
      );
  } else {
    return est;
  }
}

double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
    int level) const {
  assert(level < num_levels_);
  uint64_t sum_file_size_bytes = 0;
  uint64_t sum_data_size_bytes = 0;
  for (auto* file_meta : files_[level]) {
    sum_file_size_bytes += file_meta->fd.GetFileSize();
    sum_data_size_bytes += file_meta->raw_key_size + file_meta->raw_value_size;
  }
  if (sum_file_size_bytes == 0) {
    return -1.0;
  }
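  // Illustrative example: 50MB of raw key/value data stored in 20MB of SST
  // files yields a ratio of 50 / 20 = 2.5.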
  return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes;
}

void Version::AddIterators(const ReadOptions& read_options,
                           const FileOptions& soptions,
                           MergeIteratorBuilder* merge_iter_builder,
                           RangeDelAggregator* range_del_agg) {
  assert(storage_info_.finalized_);

  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
    AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
                         range_del_agg);
  }
}

void Version::AddIteratorsForLevel(const ReadOptions& read_options,
                                   const FileOptions& soptions,
                                   MergeIteratorBuilder* merge_iter_builder,
                                   int level,
                                   RangeDelAggregator* range_del_agg) {
  assert(storage_info_.finalized_);
  if (level >= storage_info_.num_non_empty_levels()) {
    // This is an empty level
    return;
  } else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
    // No files in this level
    return;
  }

  bool should_sample = should_sample_file_read();

  auto* arena = merge_iter_builder->GetArena();
  if (level == 0) {
    // Merge all level zero files together since they may overlap
    for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
      const auto& file = storage_info_.LevelFilesBrief(0).files[i];
      merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
          read_options, soptions, cfd_->internal_comparator(),
          *file.file_metadata, range_del_agg,
          mutable_cf_options_.prefix_extractor.get(), nullptr,
          cfd_->internal_stats()->GetFileReadHist(0),
          TableReaderCaller::kUserIterator, arena,
          /*skip_filters=*/false, /*level=*/0,
          /*smallest_compaction_key=*/nullptr,
          /*largest_compaction_key=*/nullptr));
    }
    if (should_sample) {
      // Count one sample for every L0 file. This is done per iterator
      // creation rather than per Seek(), while reads of files in other
      // levels are recorded per seek. If users execute one range query per
      // iterator, there may be some discrepancy here.
      for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
        sample_file_read_inc(meta);
      }
    }
  } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
    // For levels > 0, we can use a concatenating iterator that sequentially
    // walks through the non-overlapping files in the level, opening them
    // lazily.
    auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
    merge_iter_builder->AddIterator(new (mem) LevelIterator(
        cfd_->table_cache(), read_options, soptions,
        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
        cfd_->internal_stats()->GetFileReadHist(level),
        TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
        range_del_agg, /*largest_compaction_key=*/nullptr));
  }
}

Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
                                         const FileOptions& file_options,
                                         const Slice& smallest_user_key,
                                         const Slice& largest_user_key,
                                         int level, bool* overlap) {
  assert(storage_info_.finalized_);

  auto icmp = cfd_->internal_comparator();
  auto ucmp = icmp.user_comparator();

  Arena arena;
  Status status;
  ReadRangeDelAggregator range_del_agg(&icmp,
                                       kMaxSequenceNumber /* upper_bound */);

  *overlap = false;

  if (level == 0) {
    for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
      const auto file = &storage_info_.LevelFilesBrief(0).files[i];
      if (AfterFile(ucmp, &smallest_user_key, file) ||
          BeforeFile(ucmp, &largest_user_key, file)) {
        continue;
      }
      ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
          read_options, file_options, cfd_->internal_comparator(),
          *file->file_metadata, &range_del_agg,
          mutable_cf_options_.prefix_extractor.get(), nullptr,
          cfd_->internal_stats()->GetFileReadHist(0),
          TableReaderCaller::kUserIterator, &arena,
          /*skip_filters=*/false, /*level=*/0,
          /*smallest_compaction_key=*/nullptr,
          /*largest_compaction_key=*/nullptr));
      status = OverlapWithIterator(
          ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
      if (!status.ok() || *overlap) {
        break;
      }
    }
  } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
    auto mem = arena.AllocateAligned(sizeof(LevelIterator));
    ScopedArenaIterator iter(new (mem) LevelIterator(
        cfd_->table_cache(), read_options, file_options,
        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
        cfd_->internal_stats()->GetFileReadHist(level),
        TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
        &range_del_agg));
    status = OverlapWithIterator(
        ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
  }

  if (status.ok() && *overlap == false &&
      range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
    *overlap = true;
  }
  return status;
}

VersionStorageInfo::VersionStorageInfo(
    const InternalKeyComparator* internal_comparator,
    const Comparator* user_comparator, int levels,
    CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
    bool _force_consistency_checks)
    : internal_comparator_(internal_comparator),
      user_comparator_(user_comparator),
      // cfd is nullptr if Version is dummy
      num_levels_(levels),
      num_non_empty_levels_(0),
      file_indexer_(user_comparator),
      compaction_style_(compaction_style),
      files_(new std::vector<FileMetaData*>[num_levels_]),
      base_level_(num_levels_ == 1 ? -1 : 1),
      level_multiplier_(0.0),
      files_by_compaction_pri_(num_levels_),
      level0_non_overlapping_(false),
      next_file_to_compact_by_size_(num_levels_),
      compaction_score_(num_levels_),
      compaction_level_(num_levels_),
      l0_delay_trigger_count_(0),
      accumulated_file_size_(0),
      accumulated_raw_key_size_(0),
      accumulated_raw_value_size_(0),
      accumulated_num_non_deletions_(0),
      accumulated_num_deletions_(0),
      current_num_non_deletions_(0),
      current_num_deletions_(0),
      current_num_samples_(0),
      estimated_compaction_needed_bytes_(0),
      finalized_(false),
      force_consistency_checks_(_force_consistency_checks) {
  if (ref_vstorage != nullptr) {
    accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
    accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
    accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
    accumulated_num_non_deletions_ =
        ref_vstorage->accumulated_num_non_deletions_;
    accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
    current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
    current_num_deletions_ = ref_vstorage->current_num_deletions_;
    current_num_samples_ = ref_vstorage->current_num_samples_;
    oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
  }
}

Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
                 const FileOptions& file_opt,
                 const MutableCFOptions mutable_cf_options,
                 uint64_t version_number)
    : env_(vset->env_),
      cfd_(column_family_data),
      info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
      db_statistics_((cfd_ == nullptr) ? nullptr
                                       : cfd_->ioptions()->statistics),
      table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
      merge_operator_((cfd_ == nullptr) ? nullptr
                                        : cfd_->ioptions()->merge_operator),
      storage_info_(
          (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
          (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
          cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
          cfd_ == nullptr ? kCompactionStyleLevel
                          : cfd_->ioptions()->compaction_style,
          (cfd_ == nullptr || cfd_->current() == nullptr)
              ? nullptr
              : cfd_->current()->storage_info(),
          cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks),
      vset_(vset),
      next_(this),
      prev_(this),
      refs_(0),
      file_options_(file_opt),
      mutable_cf_options_(mutable_cf_options),
      version_number_(version_number) {}

void Version::Get(const ReadOptions& read_options, const LookupKey& k,
                  PinnableSlice* value, std::string* timestamp, Status* status,
                  MergeContext* merge_context,
                  SequenceNumber* max_covering_tombstone_seq, bool* value_found,
                  bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
                  bool* is_blob, bool do_merge) {
  Slice ikey = k.internal_key();
  Slice user_key = k.user_key();

  assert(status->ok() || status->IsMergeInProgress());

  if (key_exists != nullptr) {
    // will falsify below if not found
    *key_exists = true;
  }

  PinnedIteratorsManager pinned_iters_mgr;
  uint64_t tracing_get_id = BlockCacheTraceHelper::kReservedGetId;
  if (vset_ && vset_->block_cache_tracer_ &&
      vset_->block_cache_tracer_->is_tracing_enabled()) {
    tracing_get_id = vset_->block_cache_tracer_->NextGetId();
  }
  GetContext get_context(
      user_comparator(), merge_operator_, info_log_, db_statistics_,
      status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
      do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found,
      merge_context, do_merge, max_covering_tombstone_seq, this->env_, seq,
      merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
      tracing_get_id);

  // Pin blocks that we read to hold merge operands
  if (merge_operator_) {
    pinned_iters_mgr.StartPinning();
  }

  FilePicker fp(
      storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
      storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
      user_comparator(), internal_comparator());
  FdWithKeyRange* f = fp.GetNextFile();

  while (f != nullptr) {
    if (*max_covering_tombstone_seq > 0) {
      // The remaining files we look at will only contain covered keys, so we
      // stop here.
      break;
    }
    if (get_context.sample()) {
      sample_file_read_inc(f->file_metadata);
    }

    bool timer_enabled =
        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
        get_perf_context()->per_level_perf_context_enabled;
    StopWatchNano timer(env_, timer_enabled /* auto_start */);
    *status = table_cache_->Get(
        read_options, *internal_comparator(), *f->file_metadata, ikey,
        &get_context, mutable_cf_options_.prefix_extractor.get(),
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                        fp.IsHitFileLastInLevel()),
        fp.GetCurrentLevel());
    // TODO: examine the behavior for corrupted key
    if (timer_enabled) {
      PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
                                fp.GetCurrentLevel());
    }
    if (!status->ok()) {
      return;
    }

    // report the counters before returning
    if (get_context.State() != GetContext::kNotFound &&
        get_context.State() != GetContext::kMerge &&
        db_statistics_ != nullptr) {
      get_context.ReportCounters();
    }
    switch (get_context.State()) {
      case GetContext::kNotFound:
        // Keep searching in other files
        break;
      case GetContext::kMerge:
        // TODO: update per-level perfcontext user_key_return_count for kMerge
        break;
      case GetContext::kFound:
        if (fp.GetHitFileLevel() == 0) {
          RecordTick(db_statistics_, GET_HIT_L0);
        } else if (fp.GetHitFileLevel() == 1) {
          RecordTick(db_statistics_, GET_HIT_L1);
        } else if (fp.GetHitFileLevel() >= 2) {
          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
        }
        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
                                  fp.GetHitFileLevel());
        return;
      case GetContext::kDeleted:
        // Use empty error message for speed
        *status = Status::NotFound();
        return;
      case GetContext::kCorrupt:
        *status = Status::Corruption("corrupted key for ", user_key);
        return;
      case GetContext::kBlobIndex:
        ROCKS_LOG_ERROR(info_log_, "Encountered unexpected blob index.");
        *status = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
        return;
    }
    f = fp.GetNextFile();
  }
  if (db_statistics_ != nullptr) {
    get_context.ReportCounters();
  }
  if (GetContext::kMerge == get_context.State()) {
    if (!do_merge) {
      *status = Status::OK();
      return;
    }
    if (!merge_operator_) {
      *status = Status::InvalidArgument(
          "merge_operator is not properly initialized.");
      return;
    }
    // The merge operands are in the saver and we hit the beginning of the
    // key history, so do a final merge of nullptr and the operands.
    std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
    *status = MergeHelper::TimedFullMerge(
        merge_operator_, user_key, nullptr, merge_context->GetOperands(),
        str_value, info_log_, db_statistics_, env_,
        nullptr /* result_operand */, true);
    if (LIKELY(value != nullptr)) {
      value->PinSelf();
    }
  } else {
    if (key_exists != nullptr) {
      *key_exists = false;
    }
    *status = Status::NotFound();  // Use an empty error message for speed
  }
}

void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                       ReadCallback* callback, bool* is_blob) {
  PinnedIteratorsManager pinned_iters_mgr;

  // Pin blocks that we read to hold merge operands
  if (merge_operator_) {
    pinned_iters_mgr.StartPinning();
  }
  uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;

  if (vset_ && vset_->block_cache_tracer_ &&
      vset_->block_cache_tracer_->is_tracing_enabled()) {
    tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
  }
  // Even though we know the batch size won't be > MAX_BATCH_SIZE,
  // use autovector in order to avoid unnecessary construction of GetContext
  // objects, which is expensive.
  autovector<GetContext, 16> get_ctx;
  for (auto iter = range->begin(); iter != range->end(); ++iter) {
    assert(iter->s->ok() || iter->s->IsMergeInProgress());
    get_ctx.emplace_back(
        user_comparator(), merge_operator_, info_log_, db_statistics_,
        iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
        iter->value, iter->timestamp, nullptr, &(iter->merge_context), true,
        &iter->max_covering_tombstone_seq, this->env_, nullptr,
        merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
        tracing_mget_id);
    // MergeInProgress status, if set, has been transferred to the get_context
    // state, so we set status to ok here. From now on, the iter status will
    // be used for IO errors, and the get_context state will be used for any
    // key-level errors.
    *(iter->s) = Status::OK();
  }
  int get_ctx_index = 0;
  for (auto iter = range->begin(); iter != range->end();
       ++iter, get_ctx_index++) {
    iter->get_context = &(get_ctx[get_ctx_index]);
  }

  MultiGetRange file_picker_range(*range, range->begin(), range->end());
  FilePickerMultiGet fp(
      &file_picker_range,
      &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
      &storage_info_.file_indexer_, user_comparator(), internal_comparator());
  FdWithKeyRange* f = fp.GetNextFile();

  while (f != nullptr) {
    MultiGetRange file_range = fp.CurrentFileRange();
    bool timer_enabled =
        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
        get_perf_context()->per_level_perf_context_enabled;
    StopWatchNano timer(env_, timer_enabled /* auto_start */);
    Status s = table_cache_->MultiGet(
        read_options, *internal_comparator(), *f->file_metadata, &file_range,
        mutable_cf_options_.prefix_extractor.get(),
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                        fp.IsHitFileLastInLevel()),
        fp.GetCurrentLevel());
    // TODO: examine the behavior for corrupted key
    if (timer_enabled) {
      PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
                                fp.GetCurrentLevel());
    }
    if (!s.ok()) {
      // TODO: Set status for individual keys appropriately
      for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
        *iter->s = s;
        file_range.MarkKeyDone(iter);
      }
      return;
    }
    uint64_t batch_size = 0;
    for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
      GetContext& get_context = *iter->get_context;
      Status* status = iter->s;
      // The Status in the KeyContext takes precedence over the GetContext
      // state. Status may be an error if there were any IO errors in the
      // table reader. We never expect Status to be NotFound(), as that is
      // determined by get_context.
      assert(!status->IsNotFound());
      if (!status->ok()) {
        file_range.MarkKeyDone(iter);
        continue;
      }

      if (get_context.sample()) {
        sample_file_read_inc(f->file_metadata);
      }
      batch_size++;
      // report the counters before returning
      if (get_context.State() != GetContext::kNotFound &&
          get_context.State() != GetContext::kMerge &&
          db_statistics_ != nullptr) {
        get_context.ReportCounters();
      } else {
        if (iter->max_covering_tombstone_seq > 0) {
          // The remaining files we look at will only contain covered keys, so
          // we stop here for this key
          file_picker_range.SkipKey(iter);
        }
      }
      switch (get_context.State()) {
        case GetContext::kNotFound:
          // Keep searching in other files
          break;
        case GetContext::kMerge:
          // TODO: update per-level perfcontext user_key_return_count for kMerge
          break;
        case GetContext::kFound:
          if (fp.GetHitFileLevel() == 0) {
            RecordTick(db_statistics_, GET_HIT_L0);
          } else if (fp.GetHitFileLevel() == 1) {
            RecordTick(db_statistics_, GET_HIT_L1);
          } else if (fp.GetHitFileLevel() >= 2) {
            RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
          }
          PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
                                    fp.GetHitFileLevel());
          file_range.MarkKeyDone(iter);
          continue;
        case GetContext::kDeleted:
          // Use empty error message for speed
          *status = Status::NotFound();
          file_range.MarkKeyDone(iter);
          continue;
        case GetContext::kCorrupt:
          *status =
              Status::Corruption("corrupted key for ", iter->lkey->user_key());
          file_range.MarkKeyDone(iter);
          continue;
        case GetContext::kBlobIndex:
          ROCKS_LOG_ERROR(info_log_, "Encountered unexpected blob index.");
          *status = Status::NotSupported(
              "Encounter unexpected blob index. Please open DB with "
              "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
          file_range.MarkKeyDone(iter);
          continue;
      }
    }
    RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
    if (file_picker_range.empty()) {
      break;
    }
    f = fp.GetNextFile();
  }

  // Process any left over keys
  for (auto iter = range->begin(); iter != range->end(); ++iter) {
    GetContext& get_context = *iter->get_context;
    Status* status = iter->s;
    Slice user_key = iter->lkey->user_key();

    if (db_statistics_ != nullptr) {
      get_context.ReportCounters();
    }
    if (GetContext::kMerge == get_context.State()) {
      if (!merge_operator_) {
        *status = Status::InvalidArgument(
            "merge_operator is not properly initialized.");
        range->MarkKeyDone(iter);
        continue;
      }
      // The merge operands are in the saver and we hit the beginning of the
      // key history, so do a final merge of nullptr and the operands.
      std::string* str_value =
          iter->value != nullptr ? iter->value->GetSelf() : nullptr;
      *status = MergeHelper::TimedFullMerge(
          merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
          str_value, info_log_, db_statistics_, env_,
          nullptr /* result_operand */, true);
      if (LIKELY(iter->value != nullptr)) {
        iter->value->PinSelf();
      }
    } else {
      range->MarkKeyDone(iter);
      *status = Status::NotFound();  // Use an empty error message for speed
    }
  }
}

bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
  // Reaching the bottom level implies misses at all upper levels, so we'll
  // skip checking the filters when we predict a hit.
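  // Illustrative example: with optimize_filters_for_hits set and four
  // non-empty levels, the filter is consulted for levels 0 through 2 but
  // skipped for level 3, where a lookup that got this far is likely to hit.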
  return cfd_->ioptions()->optimize_filters_for_hits &&
         (level > 0 || is_file_last_in_level) &&
         level == storage_info_.num_non_empty_levels() - 1;
}

void VersionStorageInfo::GenerateLevelFilesBrief() {
  level_files_brief_.resize(num_non_empty_levels_);
  for (int level = 0; level < num_non_empty_levels_; level++) {
    DoGenerateLevelFilesBrief(
        &level_files_brief_[level], files_[level], &arena_);
  }
}

void Version::PrepareApply(
    const MutableCFOptions& mutable_cf_options,
    bool update_stats) {
  UpdateAccumulatedStats(update_stats);
  storage_info_.UpdateNumNonEmptyLevels();
  storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
  storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
  storage_info_.GenerateFileIndexer();
  storage_info_.GenerateLevelFilesBrief();
  storage_info_.GenerateLevel0NonOverlapping();
  storage_info_.GenerateBottommostFiles();
}

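// Loads num_entries, num_deletions and the raw key/value sizes from the
// file's table properties. Returns true iff the stats were initialized by
// this call; returns false if they were initialized before or the
// properties could not be loaded.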
bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
  if (file_meta->init_stats_from_file ||
      file_meta->compensated_file_size > 0) {
    return false;
  }
  std::shared_ptr<const TableProperties> tp;
  Status s = GetTableProperties(&tp, file_meta);
  file_meta->init_stats_from_file = true;
  if (!s.ok()) {
    ROCKS_LOG_ERROR(vset_->db_options_->info_log,
                    "Unable to load table properties for file %" PRIu64
                    " --- %s\n",
                    file_meta->fd.GetNumber(), s.ToString().c_str());
    return false;
  }
  if (tp.get() == nullptr) return false;
  file_meta->num_entries = tp->num_entries;
  file_meta->num_deletions = tp->num_deletions;
  file_meta->raw_value_size = tp->raw_value_size;
  file_meta->raw_key_size = tp->raw_key_size;

  return true;
}

void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
  TEST_SYNC_POINT_CALLBACK("VersionStorageInfo::UpdateAccumulatedStats",
                           nullptr);

  assert(file_meta->init_stats_from_file);
  accumulated_file_size_ += file_meta->fd.GetFileSize();
  accumulated_raw_key_size_ += file_meta->raw_key_size;
  accumulated_raw_value_size_ += file_meta->raw_value_size;
  accumulated_num_non_deletions_ +=
      file_meta->num_entries - file_meta->num_deletions;
  accumulated_num_deletions_ += file_meta->num_deletions;

  current_num_non_deletions_ +=
      file_meta->num_entries - file_meta->num_deletions;
  current_num_deletions_ += file_meta->num_deletions;
  current_num_samples_++;
}

void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) {
  if (file_meta->init_stats_from_file) {
    current_num_non_deletions_ -=
        file_meta->num_entries - file_meta->num_deletions;
    current_num_deletions_ -= file_meta->num_deletions;
    current_num_samples_--;
  }
}

void Version::UpdateAccumulatedStats(bool update_stats) {
  if (update_stats) {
    // maximum number of table properties loaded from files.
    const int kMaxInitCount = 20;
    int init_count = 0;
    // Here, only the first kMaxInitCount files which haven't been
    // initialized from file will be updated with num_deletions.
    // The motivation is to cap the maximum I/O per Version creation.
    // The reason for choosing files from lower levels instead of higher
    // levels is that this design propagates the initialization from lower
    // levels to higher levels: when the num_deletions of lower-level files
    // are updated, those files get accurate compensated_file_size values,
    // which triggers lower-level to higher-level compactions, which in turn
    // create higher-level files whose num_deletions will be updated here.
    for (int level = 0;
         level < storage_info_.num_levels_ && init_count < kMaxInitCount;
         ++level) {
      for (auto* file_meta : storage_info_.files_[level]) {
        if (MaybeInitializeFileMetaData(file_meta)) {
          // each FileMeta will be initialized only once.
          storage_info_.UpdateAccumulatedStats(file_meta);
          // when option "max_open_files" is -1, all the file metadata has
          // already been read, so MaybeInitializeFileMetaData() won't incur
          // any I/O cost. "max_open_files=-1" means that the table cache passed
          // to the VersionSet and then to the ColumnFamilySet has a size of
          // TableCache::kInfiniteCapacity
          if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
              TableCache::kInfiniteCapacity) {
            continue;
          }
          if (++init_count >= kMaxInitCount) {
            break;
          }
        }
      }
    }
    // In case all sampled files contain only deletion entries, we load the
    // table properties of a file in a higher level to initialize that value.
    for (int level = storage_info_.num_levels_ - 1;
         storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
         --level) {
      for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
           storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
        if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
          storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
        }
      }
    }
  }

  storage_info_.ComputeCompensatedSizes();
}

void VersionStorageInfo::ComputeCompensatedSizes() {
  static const int kDeletionWeightOnCompaction = 2;
  uint64_t average_value_size = GetAverageValueSize();

  // compute the compensated size
  for (int level = 0; level < num_levels_; level++) {
    for (auto* file_meta : files_[level]) {
      // Here we only compute compensated_file_size for those file_meta
      // whose compensated_file_size is uninitialized (== 0). This is true
      // only for files that have just been created and that no other thread
      // has access to yet. That's why we can safely mutate
      // compensated_file_size.
      if (file_meta->compensated_file_size == 0) {
        file_meta->compensated_file_size = file_meta->fd.GetFileSize();
        // We boost the size of a file's deletion entries only when the
        // number of deletion entries is greater than the number of
        // non-deletion entries in the file. The motivation is that in a
        // stable workload, the number of deletion entries should be roughly
        // equal to the number of non-deletion entries. If we compensated the
        // size of deletion entries in a stable workload, the deletion
        // compensation logic might introduce unwanted side effects that
        // change the shape of the LSM tree.
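        // Illustrative example: a file with 100 entries of which 70 are
        // deletions gets a boost of (70 * 2 - 100) * average_value_size *
        // kDeletionWeightOnCompaction = 80 * average_value_size bytes.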
        if (file_meta->num_deletions * 2 >= file_meta->num_entries) {
          file_meta->compensated_file_size +=
              (file_meta->num_deletions * 2 - file_meta->num_entries) *
              average_value_size * kDeletionWeightOnCompaction;
        }
      }
    }
  }
}

int VersionStorageInfo::MaxInputLevel() const {
  if (compaction_style_ == kCompactionStyleLevel) {
    return num_levels() - 2;
  }
  return 0;
}

int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
  if (allow_ingest_behind) {
    assert(num_levels() > 1);
    return num_levels() - 2;
  }
  return num_levels() - 1;
}

void VersionStorageInfo::EstimateCompactionBytesNeeded(
    const MutableCFOptions& mutable_cf_options) {
  // Only implemented for level-based compaction
  if (compaction_style_ != kCompactionStyleLevel) {
    estimated_compaction_needed_bytes_ = 0;
    return;
  }

  // Start from level 0: if level 0 qualifies for compaction to level 1, we
  // estimate the size of that compaction. Then we move on to the next level
  // and check whether it qualifies for compaction to the level after it. The
  // size of a level is estimated as the actual size on the level plus the
  // input bytes from the previous level, if any. If it exceeds the level
  // target, we take the excess bytes as compaction input and add the
  // estimated compaction size to the total. We keep doing this for level 2,
  // 3, and so on, until the last level, and return the accumulated bytes.
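  //
  // Illustrative example: with level0_file_num_compaction_trigger = 4 and
  // max_bytes_for_level_base = 256MB, four L0 files totalling 300MB make
  // level 0 qualify, contributing 300MB. If the base level already holds
  // 200MB, its size is added as well, and the combined 500MB exceeds a
  // 256MB target by 244MB; that excess is carried to the next level, scaled
  // by the inter-level size ratio plus one, and so on down the levels.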

  uint64_t bytes_compact_to_next_level = 0;
  uint64_t level_size = 0;
  for (auto* f : files_[0]) {
    level_size += f->fd.GetFileSize();
  }
  // Level 0
  bool level0_compact_triggered = false;
  if (static_cast<int>(files_[0].size()) >=
          mutable_cf_options.level0_file_num_compaction_trigger ||
      level_size >= mutable_cf_options.max_bytes_for_level_base) {
    level0_compact_triggered = true;
    estimated_compaction_needed_bytes_ = level_size;
    bytes_compact_to_next_level = level_size;
  } else {
    estimated_compaction_needed_bytes_ = 0;
  }

  // Level 1 and up.
  uint64_t bytes_next_level = 0;
  for (int level = base_level(); level <= MaxInputLevel(); level++) {
    level_size = 0;
    if (bytes_next_level > 0) {
#ifndef NDEBUG
      uint64_t level_size2 = 0;
      for (auto* f : files_[level]) {
        level_size2 += f->fd.GetFileSize();
      }
      assert(level_size2 == bytes_next_level);
#endif
      level_size = bytes_next_level;
      bytes_next_level = 0;
    } else {
      for (auto* f : files_[level]) {
        level_size += f->fd.GetFileSize();
      }
    }
    if (level == base_level() && level0_compact_triggered) {
      // Add base level size to compaction if level0 compaction triggered.
      estimated_compaction_needed_bytes_ += level_size;
    }
    // Add size added by previous compaction
    level_size += bytes_compact_to_next_level;
    bytes_compact_to_next_level = 0;
    uint64_t level_target = MaxBytesForLevel(level);
    if (level_size > level_target) {
      bytes_compact_to_next_level = level_size - level_target;
      // Estimate the actual compaction fan-out ratio as the size ratio
      // between the two levels.
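      //
      // Illustrative example: if this level now holds 500MB and the next
      // level holds 1500MB, every excess byte compacted out of this level is
      // estimated to rewrite 1500 / 500 = 3 bytes in the next level, for a
      // total multiplier of 3 + 1 = 4.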

      assert(bytes_next_level == 0);
      if (level + 1 < num_levels_) {
        for (auto* f : files_[level + 1]) {
          bytes_next_level += f->fd.GetFileSize();
        }
      }
      if (bytes_next_level > 0) {
        assert(level_size > 0);
        estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
            static_cast<double>(bytes_compact_to_next_level) *
            (static_cast<double>(bytes_next_level) /
                 static_cast<double>(level_size) +
             1));
      }
    }
  }
}

namespace {
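// Returns the number of files in `files` that are not currently being
// compacted and whose oldest ancestor time is more than
// mutable_cf_options.ttl seconds in the past.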
uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
                                 const MutableCFOptions& mutable_cf_options,
                                 const std::vector<FileMetaData*>& files) {
  uint32_t ttl_expired_files_count = 0;

  int64_t _current_time;
  auto status = ioptions.env->GetCurrentTime(&_current_time);
  if (status.ok()) {
    const uint64_t current_time = static_cast<uint64_t>(_current_time);
    for (FileMetaData* f : files) {
      if (!f->being_compacted) {
        uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
        if (oldest_ancester_time != 0 &&
            oldest_ancester_time < (current_time - mutable_cf_options.ttl)) {
          ttl_expired_files_count++;
        }
      }
    }
  }
  return ttl_expired_files_count;
}
}  // anonymous namespace

void VersionStorageInfo::ComputeCompactionScore(
    const ImmutableCFOptions& immutable_cf_options,
    const MutableCFOptions& mutable_cf_options) {
  for (int level = 0; level <= MaxInputLevel(); level++) {
    double score;
    if (level == 0) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
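      //
      // Illustrative example: with level0_file_num_compaction_trigger = 4
      // and 6 sorted runs not under compaction, the score computed below is
      // 6 / 4 = 1.5; a score of 1 or more indicates that compaction is
      // needed.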
      int num_sorted_runs = 0;
      uint64_t total_size = 0;
      for (auto* f : files_[level]) {
        if (!f->being_compacted) {
          total_size += f->compensated_file_size;
          num_sorted_runs++;
        }
      }
      if (compaction_style_ == kCompactionStyleUniversal) {
        // For universal compaction, we use the level0 score to indicate
        // the compaction score for the whole DB. Add other levels as if
        // they were L0 files.
        for (int i = 1; i < num_levels(); i++) {
          if (!files_[i].empty() && !files_[i][0]->being_compacted) {
            num_sorted_runs++;
          }
        }
      }

      if (compaction_style_ == kCompactionStyleFIFO) {
        score = static_cast<double>(total_size) /
                mutable_cf_options.compaction_options_fifo.max_table_files_size;
        if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
          score = std::max(
              static_cast<double>(num_sorted_runs) /
                  mutable_cf_options.level0_file_num_compaction_trigger,
              score);
        }
        if (mutable_cf_options.ttl > 0) {
          score = std::max(
              static_cast<double>(GetExpiredTtlFilesCount(
                  immutable_cf_options, mutable_cf_options, files_[level])),
              score);
        }

      } else {
        score = static_cast<double>(num_sorted_runs) /
                mutable_cf_options.level0_file_num_compaction_trigger;
        if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
          // Level-based compaction involves L0->L0 compactions that can lead
          // to oversized L0 files. Take size into account as well to avoid
          // later giant compactions to the base level.
          score = std::max(
              score, static_cast<double>(total_size) /
                     mutable_cf_options.max_bytes_for_level_base);
        }
      }
    } else {
      // Compute the ratio of current size to size limit.
      uint64_t level_bytes_no_compacting = 0;
      for (auto f : files_[level]) {
        if (!f->being_compacted) {
          level_bytes_no_compacting += f->compensated_file_size;
        }
      }
      score = static_cast<double>(level_bytes_no_compacting) /
              MaxBytesForLevel(level);
    }
    compaction_level_[level] = level;
    compaction_score_[level] = score;
  }

  // Sort all the levels based on their score. Higher scores get listed
  // first. Use bubble sort because the number of entries is small.
  for (int i = 0; i < num_levels() - 2; i++) {
    for (int j = i + 1; j < num_levels() - 1; j++) {
      if (compaction_score_[i] < compaction_score_[j]) {
        double score = compaction_score_[i];
        int level = compaction_level_[i];
        compaction_score_[i] = compaction_score_[j];
        compaction_level_[i] = compaction_level_[j];
        compaction_score_[j] = score;
        compaction_level_[j] = level;
      }
    }
  }
  ComputeFilesMarkedForCompaction();
  ComputeBottommostFilesMarkedForCompaction();
  if (mutable_cf_options.ttl > 0) {
    ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
  }
  if (mutable_cf_options.periodic_compaction_seconds > 0) {
    ComputeFilesMarkedForPeriodicCompaction(
        immutable_cf_options, mutable_cf_options.periodic_compaction_seconds);
  }
  EstimateCompactionBytesNeeded(mutable_cf_options);
}

void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
  files_marked_for_compaction_.clear();
  int last_qualify_level = 0;

  // Do not include files from the last level with data. If the table
  // properties collector suggests a file on the last level, we should not
  // move it to a new level.
  for (int level = num_levels() - 1; level >= 1; level--) {
    if (!files_[level].empty()) {
      last_qualify_level = level - 1;
      break;
    }
  }

  for (int level = 0; level <= last_qualify_level; level++) {
    for (auto* f : files_[level]) {
      if (!f->being_compacted && f->marked_for_compaction) {
        files_marked_for_compaction_.emplace_back(level, f);
      }
    }
  }
}

void VersionStorageInfo::ComputeExpiredTtlFiles(
    const ImmutableCFOptions& ioptions, const uint64_t ttl) {
  assert(ttl > 0);

  expired_ttl_files_.clear();

  int64_t _current_time;
  auto status = ioptions.env->GetCurrentTime(&_current_time);
  if (!status.ok()) {
    return;
  }
  const uint64_t current_time = static_cast<uint64_t>(_current_time);

  for (int level = 0; level < num_levels() - 1; level++) {
    for (FileMetaData* f : files_[level]) {
      if (!f->being_compacted) {
        uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
        if (oldest_ancester_time > 0 &&
            oldest_ancester_time < (current_time - ttl)) {
          expired_ttl_files_.emplace_back(level, f);
        }
      }
    }
  }
}

ComputeFilesMarkedForPeriodicCompaction(const ImmutableCFOptions & ioptions,const uint64_t periodic_compaction_seconds)2502 void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
2503     const ImmutableCFOptions& ioptions,
2504     const uint64_t periodic_compaction_seconds) {
2505   assert(periodic_compaction_seconds > 0);
2506 
2507   files_marked_for_periodic_compaction_.clear();
2508 
2509   int64_t temp_current_time;
2510   auto status = ioptions.env->GetCurrentTime(&temp_current_time);
2511   if (!status.ok()) {
2512     return;
2513   }
2514   const uint64_t current_time = static_cast<uint64_t>(temp_current_time);
2515 
2516   // If periodic_compaction_seconds is larger than current time, periodic
2517   // compaction can't possibly be triggered.
2518   if (periodic_compaction_seconds > current_time) {
2519     return;
2520   }
2521 
2522   const uint64_t allowed_time_limit =
2523       current_time - periodic_compaction_seconds;
2524 
2525   for (int level = 0; level < num_levels(); level++) {
2526     for (auto f : files_[level]) {
2527       if (!f->being_compacted) {
2528         // Compute a file's modification time in the following order:
2529         // 1. Use file_creation_time table property if it is > 0.
2530         // 2. Use creation_time table property if it is > 0.
2531         // 3. Use file's mtime metadata if the above two table properties are 0.
2532         // Don't consider the file at all if the modification time cannot be
2533         // correctly determined based on the above conditions.
2534         uint64_t file_modification_time = f->TryGetFileCreationTime();
2535         if (file_modification_time == kUnknownFileCreationTime) {
2536           file_modification_time = f->TryGetOldestAncesterTime();
2537         }
2538         if (file_modification_time == kUnknownOldestAncesterTime) {
2539           auto file_path = TableFileName(ioptions.cf_paths, f->fd.GetNumber(),
2540                                          f->fd.GetPathId());
2541           status = ioptions.env->GetFileModificationTime(
2542               file_path, &file_modification_time);
2543           if (!status.ok()) {
2544             ROCKS_LOG_WARN(ioptions.info_log,
2545                            "Can't get file modification time: %s: %s",
2546                            file_path.c_str(), status.ToString().c_str());
2547             continue;
2548           }
2549         }
2550         if (file_modification_time > 0 &&
2551             file_modification_time < allowed_time_limit) {
2552           files_marked_for_periodic_compaction_.emplace_back(level, f);
2553         }
2554       }
2555     }
2556   }
2557 }
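
// The fallback chain above, as a sketch with hypothetical metadata:
//
//   f->TryGetFileCreationTime()   -> kUnknownFileCreationTime (property absent)
//   f->TryGetOldestAncesterTime() -> kUnknownOldestAncesterTime (also absent)
//   ioptions.env->GetFileModificationTime(file_path, &mtime) -> mtime is used
//
// Only if all three sources fail (or yield 0) is the file left unmarked.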
2558 
2559 namespace {
2560 
2561 // used to sort files by size
2562 struct Fsize {
2563   size_t index;
2564   FileMetaData* file;
2565 };
2566 
2567 // Comparator that is used to sort files based on their size
2568 // In normal mode: descending size
2569 bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
2570   return (first.file->compensated_file_size >
2571       second.file->compensated_file_size);
2572 }
2573 } // anonymous namespace
2574 
2575 void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
2576   auto* level_files = &files_[level];
2577   // Must not overlap
2578 #ifndef NDEBUG
2579   if (level > 0 && !level_files->empty() &&
2580       internal_comparator_->Compare(
2581           (*level_files)[level_files->size() - 1]->largest, f->smallest) >= 0) {
2582     auto* f2 = (*level_files)[level_files->size() - 1];
2583     if (info_log != nullptr) {
2584       Error(info_log, "Adding new file %" PRIu64
2585                       " range (%s, %s) to level %d but overlapping "
2586                       "with existing file %" PRIu64 " %s %s",
2587             f->fd.GetNumber(), f->smallest.DebugString(true).c_str(),
2588             f->largest.DebugString(true).c_str(), level, f2->fd.GetNumber(),
2589             f2->smallest.DebugString(true).c_str(),
2590             f2->largest.DebugString(true).c_str());
2591       LogFlush(info_log);
2592     }
2593     assert(false);
2594   }
2595 #else
2596   (void)info_log;
2597 #endif
2598   f->refs++;
2599   level_files->push_back(f);
2600 }
2601 
2602 void VersionStorageInfo::AddBlobFile(
2603     std::shared_ptr<BlobFileMetaData> blob_file_meta) {
2604   assert(blob_file_meta);
2605 
2606   const uint64_t blob_file_number = blob_file_meta->GetBlobFileNumber();
2607 
2608   auto it = blob_files_.lower_bound(blob_file_number);
2609   assert(it == blob_files_.end() || it->first != blob_file_number);
2610 
2611   blob_files_.insert(
2612       it, BlobFiles::value_type(blob_file_number, std::move(blob_file_meta)));
2613 }
2614 
2615 // Version::PrepareApply() needs to be called before calling this function,
2616 // or the following functions must have been called:
2617 // 1. UpdateNumNonEmptyLevels();
2618 // 2. CalculateBaseBytes();
2619 // 3. UpdateFilesByCompactionPri();
2620 // 4. GenerateFileIndexer();
2621 // 5. GenerateLevelFilesBrief();
2622 // 6. GenerateLevel0NonOverlapping();
2623 // 7. GenerateBottommostFiles();
2624 void VersionStorageInfo::SetFinalized() {
2625   finalized_ = true;
2626 #ifndef NDEBUG
2627   if (compaction_style_ != kCompactionStyleLevel) {
2628     // Not level based compaction.
2629     return;
2630   }
2631   assert(base_level_ < 0 || num_levels() == 1 ||
2632          (base_level_ >= 1 && base_level_ < num_levels()));
2633   // Verify that levels 1 through base_level-1 are empty
2634   for (int level = 1; level < base_level(); level++) {
2635     assert(NumLevelBytes(level) == 0);
2636   }
2637   uint64_t max_bytes_prev_level = 0;
2638   for (int level = base_level(); level < num_levels() - 1; level++) {
2639     if (LevelFiles(level).size() == 0) {
2640       continue;
2641     }
2642     assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
2643     max_bytes_prev_level = MaxBytesForLevel(level);
2644   }
2645   int num_empty_non_l0_level = 0;
2646   for (int level = 0; level < num_levels(); level++) {
2647     assert(LevelFiles(level).size() == 0 ||
2648            LevelFiles(level).size() == LevelFilesBrief(level).num_files);
2649     if (level > 0 && NumLevelBytes(level) > 0) {
2650       num_empty_non_l0_level++;
2651     }
2652     if (LevelFiles(level).size() > 0) {
2653       assert(level < num_non_empty_levels());
2654     }
2655   }
2656   assert(compaction_level_.size() > 0);
2657   assert(compaction_level_.size() == compaction_score_.size());
2658 #endif
2659 }
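
// A typical preparation sequence before SetFinalized(), matching the list
// above (a sketch only; the real call sites live in Version::PrepareApply()):
//
//   vstorage->UpdateNumNonEmptyLevels();
//   vstorage->CalculateBaseBytes(ioptions, mutable_cf_options);
//   vstorage->UpdateFilesByCompactionPri(mutable_cf_options.compaction_pri);
//   vstorage->GenerateFileIndexer();
//   vstorage->GenerateLevelFilesBrief();
//   vstorage->GenerateLevel0NonOverlapping();
//   vstorage->GenerateBottommostFiles();
//   vstorage->SetFinalized();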
2660 
2661 void VersionStorageInfo::UpdateNumNonEmptyLevels() {
2662   num_non_empty_levels_ = num_levels_;
2663   for (int i = num_levels_ - 1; i >= 0; i--) {
2664     if (files_[i].size() != 0) {
2665       return;
2666     } else {
2667       num_non_empty_levels_ = i;
2668     }
2669   }
2670 }
2671 
2672 namespace {
2673 // Sort `temp` based on ratio of overlapping size over file size
2674 void SortFileByOverlappingRatio(
2675     const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
2676     const std::vector<FileMetaData*>& next_level_files,
2677     std::vector<Fsize>* temp) {
2678   std::unordered_map<uint64_t, uint64_t> file_to_order;
2679   auto next_level_it = next_level_files.begin();
2680 
2681   for (auto& file : files) {
2682     uint64_t overlapping_bytes = 0;
2683     // Skip files in the next level that are entirely before the current file
2684     while (next_level_it != next_level_files.end() &&
2685            icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
2686       next_level_it++;
2687     }
2688 
2689     while (next_level_it != next_level_files.end() &&
2690            icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
2691       overlapping_bytes += (*next_level_it)->fd.file_size;
2692 
2693       if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
2694         // next-level file crosses the largest key of the current file.
2695         break;
2696       }
2697       next_level_it++;
2698     }
2699 
2700     assert(file->compensated_file_size != 0);
2701     file_to_order[file->fd.GetNumber()] =
2702         overlapping_bytes * 1024u / file->compensated_file_size;
2703   }
2704 
2705   std::sort(temp->begin(), temp->end(),
2706             [&](const Fsize& f1, const Fsize& f2) -> bool {
2707               return file_to_order[f1.file->fd.GetNumber()] <
2708                      file_to_order[f2.file->fd.GetNumber()];
2709             });
2710 }
2711 }  // namespace
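
// The ratio above is stored in 1/1024ths to keep integer arithmetic. For
// example (hypothetical sizes): overlapping_bytes = 6 MB against a
// compensated_file_size of 2 MB yields 6MB * 1024 / 2MB = 3072, while a file
// with no next-level overlap scores 0 and is therefore picked first.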
2712 
2713 void VersionStorageInfo::UpdateFilesByCompactionPri(
2714     CompactionPri compaction_pri) {
2715   if (compaction_style_ == kCompactionStyleNone ||
2716       compaction_style_ == kCompactionStyleFIFO ||
2717       compaction_style_ == kCompactionStyleUniversal) {
2718     // don't need this
2719     return;
2720   }
2721   // No need to sort the highest level because it is never compacted.
2722   for (int level = 0; level < num_levels() - 1; level++) {
2723     const std::vector<FileMetaData*>& files = files_[level];
2724     auto& files_by_compaction_pri = files_by_compaction_pri_[level];
2725     assert(files_by_compaction_pri.size() == 0);
2726 
2727     // populate a temp vector for sorting based on size
2728     std::vector<Fsize> temp(files.size());
2729     for (size_t i = 0; i < files.size(); i++) {
2730       temp[i].index = i;
2731       temp[i].file = files[i];
2732     }
2733 
2734     // sort only the top kNumberFilesToSort files based on file size
2735     size_t num = VersionStorageInfo::kNumberFilesToSort;
2736     if (num > temp.size()) {
2737       num = temp.size();
2738     }
2739     switch (compaction_pri) {
2740       case kByCompensatedSize:
2741         std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
2742                           CompareCompensatedSizeDescending);
2743         break;
2744       case kOldestLargestSeqFirst:
2745         std::sort(temp.begin(), temp.end(),
2746                   [](const Fsize& f1, const Fsize& f2) -> bool {
2747                     return f1.file->fd.largest_seqno <
2748                            f2.file->fd.largest_seqno;
2749                   });
2750         break;
2751       case kOldestSmallestSeqFirst:
2752         std::sort(temp.begin(), temp.end(),
2753                   [](const Fsize& f1, const Fsize& f2) -> bool {
2754                     return f1.file->fd.smallest_seqno <
2755                            f2.file->fd.smallest_seqno;
2756                   });
2757         break;
2758       case kMinOverlappingRatio:
2759         SortFileByOverlappingRatio(*internal_comparator_, files_[level],
2760                                    files_[level + 1], &temp);
2761         break;
2762       default:
2763         assert(false);
2764     }
2765     assert(temp.size() == files.size());
2766 
2767     // initialize files_by_compaction_pri_
2768     for (size_t i = 0; i < temp.size(); i++) {
2769       files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
2770     }
2771     next_file_to_compact_by_size_[level] = 0;
2772     assert(files_[level].size() == files_by_compaction_pri_[level].size());
2773   }
2774 }
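
// How the policies above order a hypothetical trio of files (sketch):
//
//   f1: compensated 10MB, seqnos [5..9]; f2: 30MB [1..4]; f3: 20MB [10..12]
//
//   kByCompensatedSize     -> f2, f3, f1  (descending compensated size)
//   kOldestLargestSeqFirst -> f2, f1, f3  (ascending largest_seqno)
//   kOldestSmallestSeqFirst-> f2, f1, f3  (ascending smallest_seqno)
//   kMinOverlappingRatio   -> least next-level overlap per byte first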
2775 
2776 void VersionStorageInfo::GenerateLevel0NonOverlapping() {
2777   assert(!finalized_);
2778   level0_non_overlapping_ = true;
2779   if (level_files_brief_.size() == 0) {
2780     return;
2781   }
2782 
2783   // A copy of L0 files sorted by smallest key
2784   std::vector<FdWithKeyRange> level0_sorted_file(
2785       level_files_brief_[0].files,
2786       level_files_brief_[0].files + level_files_brief_[0].num_files);
2787   std::sort(level0_sorted_file.begin(), level0_sorted_file.end(),
2788             [this](const FdWithKeyRange& f1, const FdWithKeyRange& f2) -> bool {
2789               return (internal_comparator_->Compare(f1.smallest_key,
2790                                                     f2.smallest_key) < 0);
2791             });
2792 
2793   for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
2794     FdWithKeyRange& f = level0_sorted_file[i];
2795     FdWithKeyRange& prev = level0_sorted_file[i - 1];
2796     if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) {
2797       level0_non_overlapping_ = false;
2798       break;
2799     }
2800   }
2801 }
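
// Example (hypothetical keys): with the L0 copy sorted by smallest key,
// [a..c], [d..f], [e..g] has prev.largest "f" >= next.smallest "e", so
// level0_non_overlapping_ becomes false; [a..c], [d..f], [g..h] keeps it true.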
2802 
2803 void VersionStorageInfo::GenerateBottommostFiles() {
2804   assert(!finalized_);
2805   assert(bottommost_files_.empty());
2806   for (size_t level = 0; level < level_files_brief_.size(); ++level) {
2807     for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
2808          ++file_idx) {
2809       const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
2810       int l0_file_idx;
2811       if (level == 0) {
2812         l0_file_idx = static_cast<int>(file_idx);
2813       } else {
2814         l0_file_idx = -1;
2815       }
2816       Slice smallest_user_key = ExtractUserKey(f.smallest_key);
2817       Slice largest_user_key = ExtractUserKey(f.largest_key);
2818       if (!RangeMightExistAfterSortedRun(smallest_user_key, largest_user_key,
2819                                          static_cast<int>(level),
2820                                          l0_file_idx)) {
2821         bottommost_files_.emplace_back(static_cast<int>(level),
2822                                        f.file_metadata);
2823       }
2824     }
2825   }
2826 }
2827 
2828 void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
2829   assert(seqnum >= oldest_snapshot_seqnum_);
2830   oldest_snapshot_seqnum_ = seqnum;
2831   if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
2832     ComputeBottommostFilesMarkedForCompaction();
2833   }
2834 }
2835 
2836 void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
2837   bottommost_files_marked_for_compaction_.clear();
2838   bottommost_files_mark_threshold_ = kMaxSequenceNumber;
2839   for (auto& level_and_file : bottommost_files_) {
2840     if (!level_and_file.second->being_compacted &&
2841         level_and_file.second->fd.largest_seqno != 0 &&
2842         level_and_file.second->num_deletions > 1) {
2843       // largest_seqno might be nonzero due to containing the final key in an
2844       // earlier compaction, whose seqnum we didn't zero out. Multiple deletions
2845       // ensure the file really contains deleted or overwritten keys.
2846       if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
2847         bottommost_files_marked_for_compaction_.push_back(level_and_file);
2848       } else {
2849         bottommost_files_mark_threshold_ =
2850             std::min(bottommost_files_mark_threshold_,
2851                      level_and_file.second->fd.largest_seqno);
2852       }
2853     }
2854   }
2855 }
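
// Sketch with hypothetical seqnos: a bottommost file with largest_seqno = 90
// and num_deletions = 5 is marked once oldest_snapshot_seqnum_ exceeds 90;
// until then, 90 feeds bottommost_files_mark_threshold_, so a later
// UpdateOldestSnapshot(95) knows a recomputation can now mark it.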
2856 
2857 void Version::Ref() {
2858   ++refs_;
2859 }
2860 
2861 bool Version::Unref() {
2862   assert(refs_ >= 1);
2863   --refs_;
2864   if (refs_ == 0) {
2865     delete this;
2866     return true;
2867   }
2868   return false;
2869 }
2870 
2871 bool VersionStorageInfo::OverlapInLevel(int level,
2872                                         const Slice* smallest_user_key,
2873                                         const Slice* largest_user_key) {
2874   if (level >= num_non_empty_levels_) {
2875     // empty level, no overlap
2876     return false;
2877   }
2878   return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
2879                                level_files_brief_[level], smallest_user_key,
2880                                largest_user_key);
2881 }
2882 
2883 // Store in "*inputs" all files in "level" that overlap [begin,end]
2884 // If hint_index is specified, then it points to a file in the
2885 // overlapping range.
2886 // If file_index is non-null, it receives the index of one overlapping file.
2887 void VersionStorageInfo::GetOverlappingInputs(
2888     int level, const InternalKey* begin, const InternalKey* end,
2889     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
2890     bool expand_range, InternalKey** next_smallest) const {
2891   if (level >= num_non_empty_levels_) {
2892     // this level is empty, no overlapping inputs
2893     return;
2894   }
2895 
2896   inputs->clear();
2897   if (file_index) {
2898     *file_index = -1;
2899   }
2900   const Comparator* user_cmp = user_comparator_;
2901   if (level > 0) {
2902     GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
2903                                           file_index, false, next_smallest);
2904     return;
2905   }
2906 
2907   if (next_smallest) {
2908     // next_smallest key only makes sense for non-level 0, where files are
2909     // non-overlapping
2910     *next_smallest = nullptr;
2911   }
2912 
2913   Slice user_begin, user_end;
2914   if (begin != nullptr) {
2915     user_begin = begin->user_key();
2916   }
2917   if (end != nullptr) {
2918     user_end = end->user_key();
2919   }
2920 
2921   // "index" holds the indices of the files that still need to be checked.
2922   std::list<size_t> index;
2923   for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
2924     index.emplace_back(i);
2925   }
2926 
2927   while (!index.empty()) {
2928     bool found_overlapping_file = false;
2929     auto iter = index.begin();
2930     while (iter != index.end()) {
2931       FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
2932       const Slice file_start = ExtractUserKey(f->smallest_key);
2933       const Slice file_limit = ExtractUserKey(f->largest_key);
2934       if (begin != nullptr &&
2935           user_cmp->CompareWithoutTimestamp(file_limit, user_begin) < 0) {
2936         // "f" is completely before specified range; skip it
2937         iter++;
2938       } else if (end != nullptr &&
2939                  user_cmp->CompareWithoutTimestamp(file_start, user_end) > 0) {
2940         // "f" is completely after specified range; skip it
2941         iter++;
2942       } else {
2943         // "f" overlaps the specified range
2944         inputs->emplace_back(files_[level][*iter]);
2945         found_overlapping_file = true;
2946         // record the first file index.
2947         if (file_index && *file_index == -1) {
2948           *file_index = static_cast<int>(*iter);
2949         }
2950         // this file overlaps; erase it so it is not checked again.
2951         iter = index.erase(iter);
2952         if (expand_range) {
2953           if (begin != nullptr &&
2954               user_cmp->CompareWithoutTimestamp(file_start, user_begin) < 0) {
2955             user_begin = file_start;
2956           }
2957           if (end != nullptr &&
2958               user_cmp->CompareWithoutTimestamp(file_limit, user_end) > 0) {
2959             user_end = file_limit;
2960           }
2961         }
2962       }
2963     }
2964     // if none of the remaining files overlap, stop
2965     if (!found_overlapping_file) {
2966       break;
2967     }
2968   }
2969 }
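
// Worked example for the L0 path above (hypothetical user keys), with
// begin = "d", end = "e" and expand_range = true:
//
//   files: f1 [a..c]  f2 [b..f]  f3 [e..g]
//   pass 1: f2 overlaps -> range widens to [b..f]; f3 then overlaps too
//   pass 2: f1 overlaps the widened range -> inputs = {f2, f3, f1}
//
// The outer loop repeats until a full pass adds no new file.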
2970 
2971 // Store in "*inputs" the files in "level" that lie within range [begin,end]
2972 // Guarantee a "clean cut" boundary between the files in inputs
2973 // and the surrounding files, while including the maximum number of files.
2974 // This will ensure that no parts of a key are lost during compaction.
2975 // If hint_index is specified, then it points to a file in the range.
2976 // If file_index is non-null, it receives the index of one file in the range.
2977 void VersionStorageInfo::GetCleanInputsWithinInterval(
2978     int level, const InternalKey* begin, const InternalKey* end,
2979     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
2980   inputs->clear();
2981   if (file_index) {
2982     *file_index = -1;
2983   }
2984   if (level >= num_non_empty_levels_ || level == 0 ||
2985       level_files_brief_[level].num_files == 0) {
2986     // this level is empty, no inputs within range
2987     // also don't support clean input interval within L0
2988     return;
2989   }
2990 
2991   GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
2992                                         hint_index, file_index,
2993                                         true /* within_interval */);
2994 }
2995 
2996 // Store in "*inputs" all files in "level" that overlap [begin,end]
2997 // Employ binary search to find at least one file that overlaps the
2998 // specified range. From that file, iterate backwards and
2999 // forwards to find all overlapping files.
3000 // If within_interval is set, then only store the maximum set of clean
3001 // inputs within range [begin, end]. "clean" means there is a boundary
3002 // between the files in "*inputs" and the surrounding files
3003 void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
3004     int level, const InternalKey* begin, const InternalKey* end,
3005     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
3006     bool within_interval, InternalKey** next_smallest) const {
3007   assert(level > 0);
3008 
3009   auto user_cmp = user_comparator_;
3010   const FdWithKeyRange* files = level_files_brief_[level].files;
3011   const int num_files = static_cast<int>(level_files_brief_[level].num_files);
3012 
3013   // Use binary search to find the lower bound
3014   // and upper bound of the overlapping file range.
3015   int start_index = 0;
3016   int end_index = num_files;
3017 
3018   if (begin != nullptr) {
3019     // if within_interval is true, compare against the file's smallest key
3020     // so std::lower_bound skips files that straddle the "begin" boundary.
3021     auto cmp = [&user_cmp, &within_interval](const FdWithKeyRange& f,
3022                                              const InternalKey* k) {
3023       auto& file_key = within_interval ? f.file_metadata->smallest
3024                                        : f.file_metadata->largest;
3025       return sstableKeyCompare(user_cmp, file_key, *k) < 0;
3026     };
3027 
3028     start_index = static_cast<int>(
3029         std::lower_bound(files,
3030                          files + (hint_index == -1 ? num_files : hint_index),
3031                          begin, cmp) -
3032         files);
3033 
3034     if (start_index > 0 && within_interval) {
3035       bool is_overlapping = true;
3036       while (is_overlapping && start_index < num_files) {
3037         auto& pre_limit = files[start_index - 1].file_metadata->largest;
3038         auto& cur_start = files[start_index].file_metadata->smallest;
3039         is_overlapping = sstableKeyCompare(user_cmp, pre_limit, cur_start) == 0;
3040         start_index += is_overlapping;
3041       }
3042     }
3043   }
3044 
3045   if (end != nullptr) {
3046     // if within_interval is true, compare against the file's largest key
3047     // so std::upper_bound excludes files that straddle the "end" boundary.
3048     auto cmp = [&user_cmp, &within_interval](const InternalKey* k,
3049                                              const FdWithKeyRange& f) {
3050       auto& file_key = within_interval ? f.file_metadata->largest
3051                                        : f.file_metadata->smallest;
3052       return sstableKeyCompare(user_cmp, *k, file_key) < 0;
3053     };
3054 
3055     end_index = static_cast<int>(
3056         std::upper_bound(files + start_index, files + num_files, end, cmp) -
3057         files);
3058 
3059     if (end_index < num_files && within_interval) {
3060       bool is_overlapping = true;
3061       while (is_overlapping && end_index > start_index) {
3062         auto& next_start = files[end_index].file_metadata->smallest;
3063         auto& cur_limit = files[end_index - 1].file_metadata->largest;
3064         is_overlapping =
3065             sstableKeyCompare(user_cmp, cur_limit, next_start) == 0;
3066         end_index -= is_overlapping;
3067       }
3068     }
3069   }
3070 
3071   assert(start_index <= end_index);
3072 
3073   // If there were no overlapping files, return immediately.
3074   if (start_index == end_index) {
3075     if (next_smallest) {
3076       *next_smallest = nullptr;
3077     }
3078     return;
3079   }
3080 
3081   assert(start_index < end_index);
3082 
3083   // returns the index where an overlap is found
3084   if (file_index) {
3085     *file_index = start_index;
3086   }
3087 
3088   // insert overlapping files into vector
3089   for (int i = start_index; i < end_index; i++) {
3090     inputs->push_back(files_[level][i]);
3091   }
3092 
3093   if (next_smallest != nullptr) {
3094     // Provide the next key outside the range covered by inputs
3095     if (end_index < static_cast<int>(files_[level].size())) {
3096       **next_smallest = files_[level][end_index]->smallest;
3097     } else {
3098       *next_smallest = nullptr;
3099     }
3100   }
3101 }
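
// Sketch of the within_interval adjustment (hypothetical files on one level):
//
//   f1 [a..c]  f2 [c..e]  f3 [f..g],  begin = "b", end = "g"
//
// lower_bound over smallest keys lands on f2, but f2's start "c" equals f1's
// limit "c" (a user key spanning both files), so start_index advances to f3
// to keep the cut clean; the result is inputs = {f3}.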
3102 
3103 uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
3104   assert(level >= 0);
3105   assert(level < num_levels());
3106   return TotalFileSize(files_[level]);
3107 }
3108 
3109 const char* VersionStorageInfo::LevelSummary(
3110     LevelSummaryStorage* scratch) const {
3111   int len = 0;
3112   if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
3113     assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
3114     if (level_multiplier_ != 0.0) {
3115       len = snprintf(
3116           scratch->buffer, sizeof(scratch->buffer),
3117           "base level %d level multiplier %.2f max bytes base %" PRIu64 " ",
3118           base_level_, level_multiplier_, level_max_bytes_[base_level_]);
3119     }
3120   }
3121   len +=
3122       snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
3123   for (int i = 0; i < num_levels(); i++) {
3124     int sz = sizeof(scratch->buffer) - len;
3125     int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
3126     if (ret < 0 || ret >= sz) break;
3127     len += ret;
3128   }
3129   if (len > 0) {
3130     // overwrite the last space
3131     --len;
3132   }
3133   len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3134                   "] max score %.2f", compaction_score_[0]);
3135 
3136   if (!files_marked_for_compaction_.empty()) {
3137     snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3138              " (%" ROCKSDB_PRIszt " files need compaction)",
3139              files_marked_for_compaction_.size());
3140   }
3141 
3142   return scratch->buffer;
3143 }
3144 
3145 const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
3146                                                  int level) const {
3147   int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
3148   for (const auto& f : files_[level]) {
3149     int sz = sizeof(scratch->buffer) - len;
3150     char sztxt[16];
3151     AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
3152     int ret = snprintf(scratch->buffer + len, sz,
3153                        "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
3154                        f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
3155                        static_cast<int>(f->being_compacted));
3156     if (ret < 0 || ret >= sz)
3157       break;
3158     len += ret;
3159   }
3160   // overwrite the last space (only if files_[level].size() is non-zero)
3161   if (files_[level].size() && len > 0) {
3162     --len;
3163   }
3164   snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
3165   return scratch->buffer;
3166 }
3167 
3168 int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
3169   uint64_t result = 0;
3170   std::vector<FileMetaData*> overlaps;
3171   for (int level = 1; level < num_levels() - 1; level++) {
3172     for (const auto& f : files_[level]) {
3173       GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
3174       const uint64_t sum = TotalFileSize(overlaps);
3175       if (sum > result) {
3176         result = sum;
3177       }
3178     }
3179   }
3180   return result;
3181 }
3182 
3183 uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
3184   // Note: the result for level zero is not really used since we set
3185   // the level-0 compaction threshold based on number of files.
3186   assert(level >= 0);
3187   assert(level < static_cast<int>(level_max_bytes_.size()));
3188   return level_max_bytes_[level];
3189 }
3190 
3191 void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
3192                                             const MutableCFOptions& options) {
3193   // Special logic to set the number of sorted runs.
3194   // It matches the previous behavior when all files were in L0.
3195   int num_l0_count = static_cast<int>(files_[0].size());
3196   if (compaction_style_ == kCompactionStyleUniversal) {
3197     // For universal compaction, we use level0 score to indicate
3198     // compaction score for the whole DB. We add other non-empty levels as
3199     // if they were L0 files.
3200     for (int i = 1; i < num_levels(); i++) {
3201       if (!files_[i].empty()) {
3202         num_l0_count++;
3203       }
3204     }
3205   }
3206   set_l0_delay_trigger_count(num_l0_count);
3207 
3208   level_max_bytes_.resize(ioptions.num_levels);
3209   if (!ioptions.level_compaction_dynamic_level_bytes) {
3210     base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;
3211 
3212     // Calculate for static bytes base case
3213     for (int i = 0; i < ioptions.num_levels; ++i) {
3214       if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
3215         level_max_bytes_[i] = options.max_bytes_for_level_base;
3216       } else if (i > 1) {
3217         level_max_bytes_[i] = MultiplyCheckOverflow(
3218             MultiplyCheckOverflow(level_max_bytes_[i - 1],
3219                                   options.max_bytes_for_level_multiplier),
3220             options.MaxBytesMultiplerAdditional(i - 1));
3221       } else {
3222         level_max_bytes_[i] = options.max_bytes_for_level_base;
3223       }
3224     }
3225   } else {
3226     uint64_t max_level_size = 0;
3227 
3228     int first_non_empty_level = -1;
3229     // Find the size of the non-L0 level holding the most data.
3230     // Cannot use the size of the last level because it can be empty or
3231     // smaller than previous levels after compaction.
3232     for (int i = 1; i < num_levels_; i++) {
3233       uint64_t total_size = 0;
3234       for (const auto& f : files_[i]) {
3235         total_size += f->fd.GetFileSize();
3236       }
3237       if (total_size > 0 && first_non_empty_level == -1) {
3238         first_non_empty_level = i;
3239       }
3240       if (total_size > max_level_size) {
3241         max_level_size = total_size;
3242       }
3243     }
3244 
3245     // Prefill every level's max bytes to disallow compaction from there.
3246     for (int i = 0; i < num_levels_; i++) {
3247       level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
3248     }
3249 
3250     if (max_level_size == 0) {
3251       // No data for L1 and up. L0 compacts to last level directly.
3252       // No compaction from L1+ needs to be scheduled.
3253       base_level_ = num_levels_ - 1;
3254     } else {
3255       uint64_t l0_size = 0;
3256       for (const auto& f : files_[0]) {
3257         l0_size += f->fd.GetFileSize();
3258       }
3259 
3260       uint64_t base_bytes_max =
3261           std::max(options.max_bytes_for_level_base, l0_size);
3262       uint64_t base_bytes_min = static_cast<uint64_t>(
3263           base_bytes_max / options.max_bytes_for_level_multiplier);
3264 
3265       // Try to make the last level's target size equal max_level_size.
3266       uint64_t cur_level_size = max_level_size;
3267       for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
3268         // Divide by the level multiplier to get this level's target size.
3269         cur_level_size = static_cast<uint64_t>(
3270             cur_level_size / options.max_bytes_for_level_multiplier);
3271       }
3272 
3273       // Calculate base level and its size.
3274       uint64_t base_level_size;
3275       if (cur_level_size <= base_bytes_min) {
3276         // Case 1. If we make target size of last level to be max_level_size,
3277         // target size of the first non-empty level would be smaller than
3278         // base_bytes_min. We set it be base_bytes_min.
3279         base_level_size = base_bytes_min + 1U;
3280         base_level_ = first_non_empty_level;
3281         ROCKS_LOG_INFO(ioptions.info_log,
3282                        "More existing levels in DB than needed. "
3283                        "max_bytes_for_level_multiplier may not be guaranteed.");
3284       } else {
3285         // Find base level (where L0 data is compacted to).
3286         base_level_ = first_non_empty_level;
3287         while (base_level_ > 1 && cur_level_size > base_bytes_max) {
3288           --base_level_;
3289           cur_level_size = static_cast<uint64_t>(
3290               cur_level_size / options.max_bytes_for_level_multiplier);
3291         }
3292         if (cur_level_size > base_bytes_max) {
3293           // Even L1 will be too large
3294           assert(base_level_ == 1);
3295           base_level_size = base_bytes_max;
3296         } else {
3297           base_level_size = cur_level_size;
3298         }
3299       }
3300 
3301       level_multiplier_ = options.max_bytes_for_level_multiplier;
3302       assert(base_level_size > 0);
3303       if (l0_size > base_level_size &&
3304           (l0_size > options.max_bytes_for_level_base ||
3305            static_cast<int>(files_[0].size() / 2) >=
3306                options.level0_file_num_compaction_trigger)) {
3307         // We adjust the base level according to actual L0 size, and adjust
3308         // the level multiplier accordingly, when:
3309         //   1. the L0 size is larger than level size base, or
3310         //   2. number of L0 files reaches twice the L0->L1 compaction trigger
3311         // We don't do this otherwise to keep the LSM-tree structure stable
3312         // unless the L0 compaction is backlogged.
3313         base_level_size = l0_size;
3314         if (base_level_ == num_levels_ - 1) {
3315           level_multiplier_ = 1.0;
3316         } else {
3317           level_multiplier_ = std::pow(
3318               static_cast<double>(max_level_size) /
3319                   static_cast<double>(base_level_size),
3320               1.0 / static_cast<double>(num_levels_ - base_level_ - 1));
3321         }
3322       }
3323 
3324       uint64_t level_size = base_level_size;
3325       for (int i = base_level_; i < num_levels_; i++) {
3326         if (i > base_level_) {
3327           level_size = MultiplyCheckOverflow(level_size, level_multiplier_);
3328         }
3329         // Don't set any level below base_bytes_max. Otherwise, the LSM can
3330         // assume an hourglass shape where L1+ sizes are smaller than L0. This
3331         // causes compaction scoring, which depends on level sizes, to favor L1+
3332         // at the expense of L0, which may fill up and stall.
3333         level_max_bytes_[i] = std::max(level_size, base_bytes_max);
3334       }
3335     }
3336   }
3337 }
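
// Worked example of the dynamic sizing above (hypothetical options, small L0):
// max_bytes_for_level_base = 256MB, multiplier = 10, num_levels_ = 7, and
// 100GB of data in L6 only. Walking down from L6, cur_level_size goes
// 100GB -> 10GB -> 1GB -> 100MB, first dropping under base_bytes_max (256MB)
// at base_level_ = 3. The per-level targets then become
// L3 = max(100MB, 256MB) = 256MB, L4 = 1GB, L5 = 10GB, L6 = 100GB.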
3338 
3339 uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
3340   // Estimate the live data size by adding up, for each key range, the size
3341   // of the deepest file covering it. Note: the estimate depends on the
3342   // ordering of files in level 0 because level-0 files can overlap.
3343   uint64_t size = 0;
3344 
3345   auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
3346     return internal_comparator_->Compare(*x, *y) < 0;
3347   };
3348   // (Ordered) map of largest keys in non-overlapping files
3349   std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
3350 
3351   for (int l = num_levels_ - 1; l >= 0; l--) {
3352     bool found_end = false;
3353     for (auto file : files_[l]) {
3354       // Find the first file where the largest key is larger than the smallest
3355       // key of the current file. If this file does not overlap with the
3356       // current file, none of the files in the map does. If there is
3357       // no potential overlap, we can safely insert the rest of this level
3358       // (if the level is not 0) into the map without checking again because
3359       // the elements in the level are sorted and non-overlapping.
3360       auto lb = (found_end && l != 0) ?
3361         ranges.end() : ranges.lower_bound(&file->smallest);
3362       found_end = (lb == ranges.end());
3363       if (found_end || internal_comparator_->Compare(
3364             file->largest, (*lb).second->smallest) < 0) {
3365           ranges.emplace_hint(lb, &file->largest, file);
3366           size += file->fd.file_size;
3367       }
3368     }
3369   }
3370   return size;
3371 }
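
// Sketch (hypothetical two-level layout): with L2 = {[a..f] 10MB} and
// L1 = {[b..d] 2MB}, the L2 file is inserted into the map first; the L1
// file's range is already covered (its largest "d" is not below the L2
// file's smallest "a"), so only the 10MB deepest file counts.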
3372 
3373 bool VersionStorageInfo::RangeMightExistAfterSortedRun(
3374     const Slice& smallest_user_key, const Slice& largest_user_key,
3375     int last_level, int last_l0_idx) {
3376   assert((last_l0_idx != -1) == (last_level == 0));
3377   // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
3378   // bottommost only if it's the oldest L0 file and there are no files on older
3379   // levels. It'd be better to consider it bottommost if there's no overlap in
3380   // older levels/files.
3381   if (last_level == 0 &&
3382       last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
3383     return true;
3384   }
3385 
3386   // Checks whether there are files living beyond the `last_level`. If lower
3387   // levels have files, it checks for overlap between [`smallest_key`,
3388   // `largest_key`] and those files. Bottomlevel optimizations can be made if
3389   // there are no files in lower levels or if there is no overlap with the files
3390   // in the lower levels.
3391   for (int level = last_level + 1; level < num_levels(); level++) {
3392     // The range is not in the bottommost level if there are files in lower
3393     // levels when the `last_level` is 0 or if there are files in lower levels
3394     // which overlap with [`smallest_key`, `largest_key`].
3395     if (files_[level].size() > 0 &&
3396         (last_level == 0 ||
3397          OverlapInLevel(level, &smallest_user_key, &largest_user_key))) {
3398       return true;
3399     }
3400   }
3401   return false;
3402 }
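
// Example (hypothetical layout): for an L1 file spanning [k1..k5], the range
// might exist below if any L2+ file overlaps [k1..k5]. For an L0 file the
// check is stricter: it must be the oldest L0 file (the last index in
// LevelFiles(0)) and every lower level must be completely empty.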
3403 
3404 void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
3405   for (int level = 0; level < storage_info_.num_levels(); level++) {
3406     const std::vector<FileMetaData*>& files = storage_info_.files_[level];
3407     for (const auto& file : files) {
3408       live->push_back(file->fd);
3409     }
3410   }
3411 }
3412 
3413 std::string Version::DebugString(bool hex, bool print_stats) const {
3414   std::string r;
3415   for (int level = 0; level < storage_info_.num_levels_; level++) {
3416     // E.g.,
3417     //   --- level 1 ---
3418     //   17:123[1 .. 124]['a' .. 'd']
3419     //   20:43[124 .. 128]['e' .. 'g']
3420     //
3421     // if print_stats=true:
3422     //   17:123[1 .. 124]['a' .. 'd'](4096)
3423     r.append("--- level ");
3424     AppendNumberTo(&r, level);
3425     r.append(" --- version# ");
3426     AppendNumberTo(&r, version_number_);
3427     r.append(" ---\n");
3428     const std::vector<FileMetaData*>& files = storage_info_.files_[level];
3429     for (size_t i = 0; i < files.size(); i++) {
3430       r.push_back(' ');
3431       AppendNumberTo(&r, files[i]->fd.GetNumber());
3432       r.push_back(':');
3433       AppendNumberTo(&r, files[i]->fd.GetFileSize());
3434       r.append("[");
3435       AppendNumberTo(&r, files[i]->fd.smallest_seqno);
3436       r.append(" .. ");
3437       AppendNumberTo(&r, files[i]->fd.largest_seqno);
3438       r.append("]");
3439       r.append("[");
3440       r.append(files[i]->smallest.DebugString(hex));
3441       r.append(" .. ");
3442       r.append(files[i]->largest.DebugString(hex));
3443       r.append("]");
3444       if (files[i]->oldest_blob_file_number != kInvalidBlobFileNumber) {
3445         r.append(" blob_file:");
3446         AppendNumberTo(&r, files[i]->oldest_blob_file_number);
3447       }
3448       if (print_stats) {
3449         r.append("(");
3450         r.append(ToString(
3451             files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
3452         r.append(")");
3453       }
3454       r.append("\n");
3455     }
3456   }
3457 
3458   const auto& blob_files = storage_info_.GetBlobFiles();
3459   if (!blob_files.empty()) {
3460     r.append("--- blob files --- version# ");
3461     AppendNumberTo(&r, version_number_);
3462     r.append(" ---\n");
3463     for (const auto& pair : blob_files) {
3464       const auto& blob_file_meta = pair.second;
3465       assert(blob_file_meta);
3466 
3467       r.append(blob_file_meta->DebugString());
3468       r.push_back('\n');
3469     }
3470   }
3471 
3472   return r;
3473 }
3474 
3475 // this is used to batch writes to the manifest file
3476 struct VersionSet::ManifestWriter {
3477   Status status;
3478   bool done;
3479   InstrumentedCondVar cv;
3480   ColumnFamilyData* cfd;
3481   const MutableCFOptions mutable_cf_options;
3482   const autovector<VersionEdit*>& edit_list;
3483 
3484   explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
3485                           const MutableCFOptions& cf_options,
3486                           const autovector<VersionEdit*>& e)
3487       : done(false),
3488         cv(mu),
3489         cfd(_cfd),
3490         mutable_cf_options(cf_options),
3491         edit_list(e) {}
3492 };
3493 
3494 Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
3495   assert(edit);
3496   if (edit->is_in_atomic_group_) {
3497     TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup");
3498     if (replay_buffer_.empty()) {
3499       replay_buffer_.resize(edit->remaining_entries_ + 1);
3500       TEST_SYNC_POINT_CALLBACK(
3501           "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit);
3502     }
3503     read_edits_in_atomic_group_++;
3504     if (read_edits_in_atomic_group_ + edit->remaining_entries_ !=
3505         static_cast<uint32_t>(replay_buffer_.size())) {
3506       TEST_SYNC_POINT_CALLBACK(
3507           "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit);
3508       return Status::Corruption("corrupted atomic group");
3509     }
3510     replay_buffer_[read_edits_in_atomic_group_ - 1] = *edit;
3511     if (read_edits_in_atomic_group_ == replay_buffer_.size()) {
3512       TEST_SYNC_POINT_CALLBACK(
3513           "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit);
3514       return Status::OK();
3515     }
3516     return Status::OK();
3517   }
3518 
3519   // A normal edit.
3520   if (!replay_buffer_.empty()) {
3521     TEST_SYNC_POINT_CALLBACK(
3522         "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit);
3523     return Status::Corruption("corrupted atomic group");
3524   }
3525   return Status::OK();
3526 }
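
// Sketch of the bookkeeping above for an atomic group of three edits
// (remaining_entries_ = 2, 1, 0 in MANIFEST order):
//
//   AddEdit(e0): buffer resized to 3; read 1, 1 + 2 == 3 -> consistent
//   AddEdit(e1): read 2, 2 + 1 == 3                      -> consistent
//   AddEdit(e2): read 3, 3 + 0 == 3 -> group complete (IsFull() is true)
//
// Any other remaining_entries_ sequence trips the Corruption status.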
3527 
3528 bool AtomicGroupReadBuffer::IsFull() const {
3529   return read_edits_in_atomic_group_ == replay_buffer_.size();
3530 }
3531 
3532 bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_.empty(); }
3533 
3534 void AtomicGroupReadBuffer::Clear() {
3535   read_edits_in_atomic_group_ = 0;
3536   replay_buffer_.clear();
3537 }
3538 
3539 VersionSet::VersionSet(const std::string& dbname,
3540                        const ImmutableDBOptions* _db_options,
3541                        const FileOptions& storage_options, Cache* table_cache,
3542                        WriteBufferManager* write_buffer_manager,
3543                        WriteController* write_controller,
3544                        BlockCacheTracer* const block_cache_tracer)
3545     : column_family_set_(new ColumnFamilySet(
3546           dbname, _db_options, storage_options, table_cache,
3547           write_buffer_manager, write_controller, block_cache_tracer)),
3548       env_(_db_options->env),
3549       fs_(_db_options->fs.get()),
3550       dbname_(dbname),
3551       db_options_(_db_options),
3552       next_file_number_(2),
3553       manifest_file_number_(0),  // Filled by Recover()
3554       options_file_number_(0),
3555       pending_manifest_file_number_(0),
3556       last_sequence_(0),
3557       last_allocated_sequence_(0),
3558       last_published_sequence_(0),
3559       prev_log_number_(0),
3560       current_version_number_(0),
3561       manifest_file_size_(0),
3562       file_options_(storage_options),
3563       block_cache_tracer_(block_cache_tracer) {}
3564 
3565 VersionSet::~VersionSet() {
3566   // we need to delete column_family_set_ because its destructor depends on
3567   // VersionSet
3568   Cache* table_cache = column_family_set_->get_table_cache();
3569   column_family_set_.reset();
3570   for (auto& file : obsolete_files_) {
3571     if (file.metadata->table_reader_handle) {
3572       table_cache->Release(file.metadata->table_reader_handle);
3573       TableCache::Evict(table_cache, file.metadata->fd.GetNumber());
3574     }
3575     file.DeleteMetadata();
3576   }
3577   obsolete_files_.clear();
3578 }
3579 
3580 void VersionSet::Reset() {
3581   if (column_family_set_) {
3582     Cache* table_cache = column_family_set_->get_table_cache();
3583     WriteBufferManager* wbm = column_family_set_->write_buffer_manager();
3584     WriteController* wc = column_family_set_->write_controller();
3585     column_family_set_.reset(new ColumnFamilySet(dbname_, db_options_,
3586                                                  file_options_, table_cache,
3587                                                  wbm, wc, block_cache_tracer_));
3588   }
3589   db_id_.clear();
3590   next_file_number_.store(2);
3591   min_log_number_to_keep_2pc_.store(0);
3592   manifest_file_number_ = 0;
3593   options_file_number_ = 0;
3594   pending_manifest_file_number_ = 0;
3595   last_sequence_.store(0);
3596   last_allocated_sequence_.store(0);
3597   last_published_sequence_.store(0);
3598   prev_log_number_ = 0;
3599   descriptor_log_.reset();
3600   current_version_number_ = 0;
3601   manifest_writers_.clear();
3602   manifest_file_size_ = 0;
3603   obsolete_files_.clear();
3604   obsolete_manifests_.clear();
3605 }
3606 
3607 void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
3608                                Version* v) {
3609   // compute new compaction score
3610   v->storage_info()->ComputeCompactionScore(
3611       *column_family_data->ioptions(),
3612       *column_family_data->GetLatestMutableCFOptions());
3613 
3614   // Mark v finalized
3615   v->storage_info_.SetFinalized();
3616 
3617   // Make "v" current
3618   assert(v->refs_ == 0);
3619   Version* current = column_family_data->current();
3620   assert(v != current);
3621   if (current != nullptr) {
3622     assert(current->refs_ > 0);
3623     current->Unref();
3624   }
3625   column_family_data->SetCurrent(v);
3626   v->Ref();
3627 
3628   // Append to linked list
3629   v->prev_ = column_family_data->dummy_versions()->prev_;
3630   v->next_ = column_family_data->dummy_versions();
3631   v->prev_->next_ = v;
3632   v->next_->prev_ = v;
3633 }
3634 
3635 Status VersionSet::ProcessManifestWrites(
3636     std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
3637     FSDirectory* db_directory, bool new_descriptor_log,
3638     const ColumnFamilyOptions* new_cf_options) {
3639   assert(!writers.empty());
3640   ManifestWriter& first_writer = writers.front();
3641   ManifestWriter* last_writer = &first_writer;
3642 
3643   assert(!manifest_writers_.empty());
3644   assert(manifest_writers_.front() == &first_writer);
3645 
3646   autovector<VersionEdit*> batch_edits;
3647   autovector<Version*> versions;
3648   autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
3649   std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;
3650 
3651   if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3652     // No group commits for column family add or drop
3653     LogAndApplyCFHelper(first_writer.edit_list.front());
3654     batch_edits.push_back(first_writer.edit_list.front());
3655   } else {
3656     auto it = manifest_writers_.cbegin();
3657     size_t group_start = std::numeric_limits<size_t>::max();
3658     while (it != manifest_writers_.cend()) {
3659       if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
3660         // no group commits for column family add or drop
3661         break;
3662       }
3663       last_writer = *(it++);
3664       assert(last_writer != nullptr);
3665       assert(last_writer->cfd != nullptr);
3666       if (last_writer->cfd->IsDropped()) {
3667         // If we detect a dropped CF at this point, and the corresponding
3668         // version edits belong to an atomic group, then we need to find out
3669         // the preceding version edits in the same atomic group, and update
3670         // their `remaining_entries_` member variable because we are NOT going
3671         // to write the version edits of the dropped CF to the MANIFEST. If we
3672         // don't update them, Recover() can report a corrupted atomic group
3673         // because the `remaining_entries_` counts do not match.
3674         if (!batch_edits.empty()) {
3675           if (batch_edits.back()->is_in_atomic_group_ &&
3676               batch_edits.back()->remaining_entries_ > 0) {
3677             assert(group_start < batch_edits.size());
3678             const auto& edit_list = last_writer->edit_list;
3679             size_t k = 0;
3680             while (k < edit_list.size()) {
3681               if (!edit_list[k]->is_in_atomic_group_) {
3682                 break;
3683               } else if (edit_list[k]->remaining_entries_ == 0) {
3684                 ++k;
3685                 break;
3686               }
3687               ++k;
3688             }
3689             for (auto i = group_start; i < batch_edits.size(); ++i) {
3690               assert(static_cast<uint32_t>(k) <=
3691                      batch_edits.back()->remaining_entries_);
3692               batch_edits[i]->remaining_entries_ -= static_cast<uint32_t>(k);
3693             }
3694           }
3695         }
3696         continue;
3697       }
3698       // We do a linear search on versions because versions is small.
3699       // TODO(yanqin) maybe consider unordered_map
3700       Version* version = nullptr;
3701       VersionBuilder* builder = nullptr;
3702       for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
3703         uint32_t cf_id = last_writer->cfd->GetID();
3704         if (versions[i]->cfd()->GetID() == cf_id) {
3705           version = versions[i];
3706           assert(!builder_guards.empty() &&
3707                  builder_guards.size() == versions.size());
3708           builder = builder_guards[i]->version_builder();
3709           TEST_SYNC_POINT_CALLBACK(
3710               "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
3711           break;
3712         }
3713       }
3714       if (version == nullptr) {
3715         version = new Version(last_writer->cfd, this, file_options_,
3716                               last_writer->mutable_cf_options,
3717                               current_version_number_++);
3718         versions.push_back(version);
3719         mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
3720         builder_guards.emplace_back(
3721             new BaseReferencedVersionBuilder(last_writer->cfd));
3722         builder = builder_guards.back()->version_builder();
3723       }
3724       assert(builder != nullptr);  // make checker happy
3725       for (const auto& e : last_writer->edit_list) {
3726         if (e->is_in_atomic_group_) {
3727           if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
3728               (batch_edits.back()->is_in_atomic_group_ &&
3729                batch_edits.back()->remaining_entries_ == 0)) {
3730             group_start = batch_edits.size();
3731           }
3732         } else if (group_start != std::numeric_limits<size_t>::max()) {
3733           group_start = std::numeric_limits<size_t>::max();
3734         }
3735         Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu);
3736         if (!s.ok()) {
3737           // free up the allocated memory
3738           for (auto v : versions) {
3739             delete v;
3740           }
3741           return s;
3742         }
3743         batch_edits.push_back(e);
3744       }
3745     }
3746     for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3747       assert(!builder_guards.empty() &&
3748              builder_guards.size() == versions.size());
3749       auto* builder = builder_guards[i]->version_builder();
3750       Status s = builder->SaveTo(versions[i]->storage_info());
3751       if (!s.ok()) {
3752         // free up the allocated memory
3753         for (auto v : versions) {
3754           delete v;
3755         }
3756         return s;
3757       }
3758     }
3759   }
3760 
3761 #ifndef NDEBUG
3762   // Verify that version edits of atomic groups have correct
3763   // remaining_entries_.
3764   size_t k = 0;
3765   while (k < batch_edits.size()) {
3766     while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) {
3767       ++k;
3768     }
3769     if (k == batch_edits.size()) {
3770       break;
3771     }
3772     size_t i = k;
3773     while (i < batch_edits.size()) {
3774       if (!batch_edits[i]->is_in_atomic_group_) {
3775         break;
3776       }
3777       assert(i - k + batch_edits[i]->remaining_entries_ ==
3778              batch_edits[k]->remaining_entries_);
3779       if (batch_edits[i]->remaining_entries_ == 0) {
3780         ++i;
3781         break;
3782       }
3783       ++i;
3784     }
3785     assert(batch_edits[i - 1]->is_in_atomic_group_);
3786     assert(0 == batch_edits[i - 1]->remaining_entries_);
3787     std::vector<VersionEdit*> tmp;
3788     for (size_t j = k; j != i; ++j) {
3789       tmp.emplace_back(batch_edits[j]);
3790     }
3791     TEST_SYNC_POINT_CALLBACK(
3792         "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
3793     k = i;
3794   }
3795 #endif  // NDEBUG
3796 
3797   uint64_t new_manifest_file_size = 0;
3798   Status s;
3799   IOStatus io_s;
3800 
3801   assert(pending_manifest_file_number_ == 0);
3802   if (!descriptor_log_ ||
3803       manifest_file_size_ > db_options_->max_manifest_file_size) {
3804     TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
3805     new_descriptor_log = true;
3806   } else {
3807     pending_manifest_file_number_ = manifest_file_number_;
3808   }
3809 
3810   // Local cached copy of state variable(s). WriteCurrentStateToManifest()
3811   // reads its content after releasing db mutex to avoid race with
3812   // SwitchMemtable().
3813   std::unordered_map<uint32_t, MutableCFState> curr_state;
3814   if (new_descriptor_log) {
3815     pending_manifest_file_number_ = NewFileNumber();
3816     batch_edits.back()->SetNextFile(next_file_number_.load());
3817 
3818     // if we are writing out a new snapshot, make sure to persist the max
3819     // column family.
3820     if (column_family_set_->GetMaxColumnFamily() > 0) {
3821       first_writer.edit_list.front()->SetMaxColumnFamily(
3822           column_family_set_->GetMaxColumnFamily());
3823     }
3824     for (const auto* cfd : *column_family_set_) {
3825       assert(curr_state.find(cfd->GetID()) == curr_state.end());
3826       curr_state[cfd->GetID()] = {cfd->GetLogNumber()};
3827     }
3828   }
3829 
3830   {
3831     FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
3832     mu->Unlock();
3833 
3834     TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
3835     if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3836       for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3837         assert(!builder_guards.empty() &&
3838                builder_guards.size() == versions.size());
3839         assert(!mutable_cf_options_ptrs.empty() &&
3840                builder_guards.size() == versions.size());
3841         ColumnFamilyData* cfd = versions[i]->cfd_;
3842         s = builder_guards[i]->version_builder()->LoadTableHandlers(
3843             cfd->internal_stats(), 1 /* max_threads */,
3844             true /* prefetch_index_and_filter_in_cache */,
3845             false /* is_initial_load */,
3846             mutable_cf_options_ptrs[i]->prefix_extractor.get());
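        // Pre-loading table handlers is best-effort: failures are swallowed
        // below unless paranoid_checks is set.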
3847         if (!s.ok()) {
3848           if (db_options_->paranoid_checks) {
3849             break;
3850           }
3851           s = Status::OK();
3852         }
3853       }
3854     }
3855 
3856     if (s.ok() && new_descriptor_log) {
3857       // This is fine because everything inside of this block is serialized --
3858       // only one thread can be here at a time.
3859       // Create the new manifest file.
3860       ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
3861                      pending_manifest_file_number_);
3862       std::string descriptor_fname =
3863           DescriptorFileName(dbname_, pending_manifest_file_number_);
3864       std::unique_ptr<FSWritableFile> descriptor_file;
3865       s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
3866                           opt_file_opts);
3867       if (s.ok()) {
3868         descriptor_file->SetPreallocationBlockSize(
3869             db_options_->manifest_preallocation_size);
3870 
3871         std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
3872             std::move(descriptor_file), descriptor_fname, opt_file_opts, env_,
3873             nullptr, db_options_->listeners));
3874         descriptor_log_.reset(
3875             new log::Writer(std::move(file_writer), 0, false));
3876         s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
3877       }
3878     }
3879 
3880     if (s.ok()) {
3881       if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3882         for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3883           versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
3884         }
3885       }
3886 
3887       // Write new records to MANIFEST log
3888 #ifndef NDEBUG
3889       size_t idx = 0;
3890 #endif
3891       for (auto& e : batch_edits) {
3892         std::string record;
3893         if (!e->EncodeTo(&record)) {
3894           s = Status::Corruption("Unable to encode VersionEdit:" +
3895                                  e->DebugString(true));
3896           break;
3897         }
3898         TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
3899                          rocksdb_kill_odds * REDUCE_ODDS2);
3900 #ifndef NDEBUG
3901         if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
3902           TEST_SYNC_POINT_CALLBACK(
3903               "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
3904               nullptr);
3905           TEST_SYNC_POINT(
3906               "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
3907         }
3908         ++idx;
3909 #endif /* !NDEBUG */
3910         io_s = descriptor_log_->AddRecord(record);
3911         if (!io_s.ok()) {
3912           io_status_ = io_s;
3913           s = io_s;
3914           break;
3915         }
3916       }
3917       if (s.ok()) {
3918         io_s = SyncManifest(env_, db_options_, descriptor_log_->file());
3919       }
3920       if (!io_s.ok()) {
3921         io_status_ = io_s;
3922         s = io_s;
3923         ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
3924                         s.ToString().c_str());
3925       }
3926     }
3927 
3928     // If we just created a new descriptor file, install it by writing a
3929     // new CURRENT file that points to it.
3930     if (s.ok() && new_descriptor_log) {
3931       io_s = SetCurrentFile(fs_, dbname_, pending_manifest_file_number_,
3932                             db_directory);
3933       if (!io_s.ok()) {
3934         io_status_ = io_s;
3935         s = io_s;
3936       }
3937       TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
3938     }
3939 
3940     if (s.ok()) {
3941       // find offset in manifest file where this version is stored.
3942       new_manifest_file_size = descriptor_log_->file()->GetFileSize();
3943     }
3944 
3945     if (first_writer.edit_list.front()->is_column_family_drop_) {
3946       TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
3947       TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
3948       TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
3949     }
3950 
3951     LogFlush(db_options_->info_log);
3952     TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
3953     mu->Lock();
3954   }
3955 
3956   // Append the old manifest file to the obsolete_manifests_ list to be
3957   // deleted by PurgeObsoleteFiles later.
3958   if (s.ok() && new_descriptor_log) {
3959     obsolete_manifests_.emplace_back(
3960         DescriptorFileName("", manifest_file_number_));
3961   }
3962 
3963   // Install the new versions
3964   if (s.ok()) {
3965     if (first_writer.edit_list.front()->is_column_family_add_) {
3966       assert(batch_edits.size() == 1);
3967       assert(new_cf_options != nullptr);
3968       CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
3969     } else if (first_writer.edit_list.front()->is_column_family_drop_) {
3970       assert(batch_edits.size() == 1);
3971       first_writer.cfd->SetDropped();
3972       first_writer.cfd->UnrefAndTryDelete();
3973     } else {
3974       // Each version in versions corresponds to a column family.
3975       // For each column family, update its log number indicating that logs
3976       // with number smaller than this should be ignored.
3977       for (const auto version : versions) {
3978         uint64_t max_log_number_in_batch = 0;
3979         uint32_t cf_id = version->cfd_->GetID();
3980         for (const auto& e : batch_edits) {
3981           if (e->has_log_number_ && e->column_family_ == cf_id) {
3982             max_log_number_in_batch =
3983                 std::max(max_log_number_in_batch, e->log_number_);
3984           }
3985         }
3986         if (max_log_number_in_batch != 0) {
3987           assert(version->cfd_->GetLogNumber() <= max_log_number_in_batch);
3988           version->cfd_->SetLogNumber(max_log_number_in_batch);
3989         }
3990       }
3991 
3992       uint64_t last_min_log_number_to_keep = 0;
3993       for (auto& e : batch_edits) {
3994         if (e->has_min_log_number_to_keep_) {
3995           last_min_log_number_to_keep =
3996               std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
3997         }
3998       }
3999 
4000       if (last_min_log_number_to_keep != 0) {
4001         // Should only be set in 2PC mode.
4002         MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
4003       }
4004 
4005       for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4006         ColumnFamilyData* cfd = versions[i]->cfd_;
4007         AppendVersion(cfd, versions[i]);
4008       }
4009     }
4010     manifest_file_number_ = pending_manifest_file_number_;
4011     manifest_file_size_ = new_manifest_file_size;
4012     prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
4013   } else {
4014     std::string version_edits;
4015     for (auto& e : batch_edits) {
4016       version_edits += ("\n" + e->DebugString(true));
4017     }
4018     ROCKS_LOG_ERROR(db_options_->info_log,
4019                     "Error in committing version edit to MANIFEST: %s",
4020                     version_edits.c_str());
4021     for (auto v : versions) {
4022       delete v;
4023     }
4024     // If manifest append failed for whatever reason, the file could be
4025     // corrupted. So we need to force the next version update to start a
4026     // new manifest file.
4027     descriptor_log_.reset();
4028     if (new_descriptor_log) {
4029       ROCKS_LOG_INFO(db_options_->info_log,
4030                      "Deleting manifest %" PRIu64 " current manifest %" PRIu64
4031                      "\n",
4032                      manifest_file_number_, pending_manifest_file_number_);
4033       env_->DeleteFile(
4034           DescriptorFileName(dbname_, pending_manifest_file_number_));
4035     }
4036   }
4037 
4038   pending_manifest_file_number_ = 0;
4039 
4040   // wake up all the waiting writers
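  // Writers belonging to this thread's own batch are not waiting on their
  // condition variables, so only writers enqueued by other threads need an
  // explicit signal.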
4041   while (true) {
4042     ManifestWriter* ready = manifest_writers_.front();
4043     manifest_writers_.pop_front();
4044     bool need_signal = true;
4045     for (const auto& w : writers) {
4046       if (&w == ready) {
4047         need_signal = false;
4048         break;
4049       }
4050     }
4051     ready->status = s;
4052     ready->done = true;
4053     if (need_signal) {
4054       ready->cv.Signal();
4055     }
4056     if (ready == last_writer) {
4057       break;
4058     }
4059   }
4060   if (!manifest_writers_.empty()) {
4061     manifest_writers_.front()->cv.Signal();
4062   }
4063   return s;
4064 }
4065 
4066 // 'datas' is grammatically incorrect. We still use this notation to indicate
4067 // that this variable represents a collection of column_family_data.
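// A minimal usage sketch (assuming the db mutex is held, as required):
//
//   VersionEdit edit;
//   // ... record file additions/deletions in the edit ...
//   Status s = version_set->LogAndApply(cfd, mutable_cf_options, &edit, &mu,
//                                       db_directory,
//                                       /*new_descriptor_log=*/false);
//
// Single-column-family overloads such as the one above wrap their arguments
// in autovectors and forward to this batched variant, which groups concurrent
// writers and commits their edits to the MANIFEST together.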
4068 Status VersionSet::LogAndApply(
4069     const autovector<ColumnFamilyData*>& column_family_datas,
4070     const autovector<const MutableCFOptions*>& mutable_cf_options_list,
4071     const autovector<autovector<VersionEdit*>>& edit_lists,
4072     InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log,
4073     const ColumnFamilyOptions* new_cf_options) {
4074   mu->AssertHeld();
4075   int num_edits = 0;
4076   for (const auto& elist : edit_lists) {
4077     num_edits += static_cast<int>(elist.size());
4078   }
4079   if (num_edits == 0) {
4080     return Status::OK();
4081   } else if (num_edits > 1) {
4082 #ifndef NDEBUG
4083     for (const auto& edit_list : edit_lists) {
4084       for (const auto& edit : edit_list) {
4085         assert(!edit->IsColumnFamilyManipulation());
4086       }
4087     }
4088 #endif /* ! NDEBUG */
4089   }
4090 
4091   int num_cfds = static_cast<int>(column_family_datas.size());
4092   if (num_cfds == 1 && column_family_datas[0] == nullptr) {
4093     assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
4094     assert(edit_lists[0][0]->is_column_family_add_);
4095     assert(new_cf_options != nullptr);
4096   }
4097   std::deque<ManifestWriter> writers;
4098   if (num_cfds > 0) {
4099     assert(static_cast<size_t>(num_cfds) == mutable_cf_options_list.size());
4100     assert(static_cast<size_t>(num_cfds) == edit_lists.size());
4101   }
4102   for (int i = 0; i < num_cfds; ++i) {
4103     writers.emplace_back(mu, column_family_datas[i],
4104                          *mutable_cf_options_list[i], edit_lists[i]);
4105     manifest_writers_.push_back(&writers[i]);
4106   }
4107   assert(!writers.empty());
4108   ManifestWriter& first_writer = writers.front();
4109   while (!first_writer.done && &first_writer != manifest_writers_.front()) {
4110     first_writer.cv.Wait();
4111   }
4112   if (first_writer.done) {
4113     // All non-CF-manipulation operations can be grouped together and committed
4114     // to MANIFEST. They should all have finished. The status code is stored in
4115     // the first manifest writer.
4116 #ifndef NDEBUG
4117     for (const auto& writer : writers) {
4118       assert(writer.done);
4119     }
4120 #endif /* !NDEBUG */
4121     return first_writer.status;
4122   }
4123 
4124   int num_undropped_cfds = 0;
4125   for (auto cfd : column_family_datas) {
4126     // if cfd == nullptr, it is a column family add.
4127     if (cfd == nullptr || !cfd->IsDropped()) {
4128       ++num_undropped_cfds;
4129     }
4130   }
4131   if (0 == num_undropped_cfds) {
4132     for (int i = 0; i != num_cfds; ++i) {
4133       manifest_writers_.pop_front();
4134     }
4135     // Notify new head of manifest write queue.
4136     if (!manifest_writers_.empty()) {
4137       manifest_writers_.front()->cv.Signal();
4138     }
4139     return Status::ColumnFamilyDropped();
4140   }
4141 
4142   return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
4143                                new_cf_options);
4144 }
4145 
4146 void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
4147   assert(edit->IsColumnFamilyManipulation());
4148   edit->SetNextFile(next_file_number_.load());
4149   // The log might have data that is not visible to the memtable and hence
4150   // has not updated the last_sequence_ yet. It is also possible that the log
4151   // is expecting some new data that is not written yet. Since LastSequence
4152   // is an upper bound on the sequence, it is ok to record
4153   // last_allocated_sequence_ as the last sequence.
4154   edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4155                                                       : last_sequence_);
4156   if (edit->is_column_family_drop_) {
4157     // If we drop a column family, we have to make sure to save the max
4158     // column family, so that we don't reuse an existing ID.
4159     edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
4160   }
4161 }
4162 
4163 Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
4164                                      VersionBuilder* builder, VersionEdit* edit,
4165                                      InstrumentedMutex* mu) {
4166 #ifdef NDEBUG
4167   (void)cfd;
4168 #endif
4169   mu->AssertHeld();
4170   assert(!edit->IsColumnFamilyManipulation());
4171 
4172   if (edit->has_log_number_) {
4173     assert(edit->log_number_ >= cfd->GetLogNumber());
4174     assert(edit->log_number_ < next_file_number_.load());
4175   }
4176 
4177   if (!edit->has_prev_log_number_) {
4178     edit->SetPrevLogNumber(prev_log_number_);
4179   }
4180   edit->SetNextFile(next_file_number_.load());
4181   // The log might have data that is not visible to the memtable and hence
4182   // has not updated the last_sequence_ yet. It is also possible that the log
4183   // is expecting some new data that is not written yet. Since LastSequence
4184   // is an upper bound on the sequence, it is ok to record
4185   // last_allocated_sequence_ as the last sequence.
4186   edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4187                                                       : last_sequence_);
4188 
4189   Status s = builder->Apply(edit);
4190 
4191   return s;
4192 }
4193 
4194 Status VersionSet::ApplyOneVersionEditToBuilder(
4195     VersionEdit& edit,
4196     const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
4197     std::unordered_map<int, std::string>& column_families_not_found,
4198     std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
4199         builders,
4200     VersionEditParams* version_edit_params) {
4201   // Not found means that the user didn't supply that column
4202   // family option AND we encountered a column family add
4203   // record. Once we encounter a column family drop record,
4204   // we will delete the column family from
4205   // column_families_not_found.
4206   bool cf_in_not_found = (column_families_not_found.find(edit.column_family_) !=
4207                           column_families_not_found.end());
4208   // In builders means that the user supplied that column family
4209   // option AND that we encountered a column family add record.
4210   bool cf_in_builders = builders.find(edit.column_family_) != builders.end();
4211 
4212   // they can't both be true
4213   assert(!(cf_in_not_found && cf_in_builders));
4214 
4215   ColumnFamilyData* cfd = nullptr;
4216 
4217   if (edit.is_column_family_add_) {
4218     if (cf_in_builders || cf_in_not_found) {
4219       return Status::Corruption(
4220           "Manifest adding the same column family twice: " +
4221           edit.column_family_name_);
4222     }
4223     auto cf_options = name_to_options.find(edit.column_family_name_);
4224     // implicitly add the persistent_stats column family without requiring
4225     // the user to specify it
4226     bool is_persistent_stats_column_family =
4227         edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0;
4228     if (cf_options == name_to_options.end() &&
4229         !is_persistent_stats_column_family) {
4230       column_families_not_found.insert(
4231           {edit.column_family_, edit.column_family_name_});
4232     } else {
4233       // recover persistent_stats CF from a DB that already contains it
4234       if (is_persistent_stats_column_family) {
4235         ColumnFamilyOptions cfo;
4236         OptimizeForPersistentStats(&cfo);
4237         cfd = CreateColumnFamily(cfo, &edit);
4238       } else {
4239         cfd = CreateColumnFamily(cf_options->second, &edit);
4240       }
4241       cfd->set_initialized();
4242       builders.insert(std::make_pair(
4243           edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
4244                                    new BaseReferencedVersionBuilder(cfd))));
4245     }
4246   } else if (edit.is_column_family_drop_) {
4247     if (cf_in_builders) {
4248       auto builder = builders.find(edit.column_family_);
4249       assert(builder != builders.end());
4250       builders.erase(builder);
4251       cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4252       assert(cfd != nullptr);
4253       if (cfd->UnrefAndTryDelete()) {
4254         cfd = nullptr;
4255       } else {
4256         // who else can have reference to cfd!?
4257         assert(false);
4258       }
4259     } else if (cf_in_not_found) {
4260       column_families_not_found.erase(edit.column_family_);
4261     } else {
4262       return Status::Corruption(
4263           "Manifest - dropping non-existing column family");
4264     }
4265   } else if (!cf_in_not_found) {
4266     if (!cf_in_builders) {
4267       return Status::Corruption(
4268           "Manifest record referencing unknown column family");
4269     }
4270 
4271     cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4272     // this should never happen since cf_in_builders is true
4273     assert(cfd != nullptr);
4274 
4275     // if it is not column family add or column family drop,
4276     // then it's a file add/delete, which should be forwarded
4277     // to builder
4278     auto builder = builders.find(edit.column_family_);
4279     assert(builder != builders.end());
4280     Status s = builder->second->version_builder()->Apply(&edit);
4281     if (!s.ok()) {
4282       return s;
4283     }
4284   }
4285   return ExtractInfoFromVersionEdit(cfd, edit, version_edit_params);
4286 }
4287 
4288 Status VersionSet::ExtractInfoFromVersionEdit(
4289     ColumnFamilyData* cfd, const VersionEdit& from_edit,
4290     VersionEditParams* version_edit_params) {
4291   if (cfd != nullptr) {
4292     if (from_edit.has_db_id_) {
4293       version_edit_params->SetDBId(from_edit.db_id_);
4294     }
4295     if (from_edit.has_log_number_) {
4296       if (cfd->GetLogNumber() > from_edit.log_number_) {
4297         ROCKS_LOG_WARN(
4298             db_options_->info_log,
4299             "MANIFEST corruption detected, but ignored - Log numbers in "
4300             "records NOT monotonically increasing");
4301       } else {
4302         cfd->SetLogNumber(from_edit.log_number_);
4303         version_edit_params->SetLogNumber(from_edit.log_number_);
4304       }
4305     }
4306     if (from_edit.has_comparator_ &&
4307         from_edit.comparator_ != cfd->user_comparator()->Name()) {
4308       return Status::InvalidArgument(
4309           cfd->user_comparator()->Name(),
4310           "does not match existing comparator " + from_edit.comparator_);
4311     }
4312   }
4313 
4314   if (from_edit.has_prev_log_number_) {
4315     version_edit_params->SetPrevLogNumber(from_edit.prev_log_number_);
4316   }
4317 
4318   if (from_edit.has_next_file_number_) {
4319     version_edit_params->SetNextFile(from_edit.next_file_number_);
4320   }
4321 
4322   if (from_edit.has_max_column_family_) {
4323     version_edit_params->SetMaxColumnFamily(from_edit.max_column_family_);
4324   }
4325 
4326   if (from_edit.has_min_log_number_to_keep_) {
4327     version_edit_params->min_log_number_to_keep_ =
4328         std::max(version_edit_params->min_log_number_to_keep_,
4329                  from_edit.min_log_number_to_keep_);
4330   }
4331 
4332   if (from_edit.has_last_sequence_) {
4333     version_edit_params->SetLastSequence(from_edit.last_sequence_);
4334   }
4335   return Status::OK();
4336 }
4337 
4338 Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
4339                                           FileSystem* fs,
4340                                           std::string* manifest_path,
4341                                           uint64_t* manifest_file_number) {
4342   assert(fs != nullptr);
4343   assert(manifest_path != nullptr);
4344   assert(manifest_file_number != nullptr);
4345 
4346   std::string fname;
4347   Status s = ReadFileToString(fs, CurrentFileName(dbname), &fname);
4348   if (!s.ok()) {
4349     return s;
4350   }
4351   if (fname.empty() || fname.back() != '\n') {
4352     return Status::Corruption("CURRENT file does not end with newline");
4353   }
4354   // remove the trailing '\n'
4355   fname.resize(fname.size() - 1);
4356   FileType type;
4357   bool parse_ok = ParseFileName(fname, manifest_file_number, &type);
4358   if (!parse_ok || type != kDescriptorFile) {
4359     return Status::Corruption("CURRENT file corrupted");
4360   }
4361   *manifest_path = dbname;
4362   if (dbname.back() != '/') {
4363     manifest_path->push_back('/');
4364   }
4365   *manifest_path += fname;
4366   return Status::OK();
4367 }
4368 
4369 Status VersionSet::ReadAndRecover(
4370     log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
4371     const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
4372     std::unordered_map<int, std::string>& column_families_not_found,
4373     std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
4374         builders,
4375     VersionEditParams* version_edit_params, std::string* db_id) {
4376   assert(reader != nullptr);
4377   assert(read_buffer != nullptr);
4378   Status s;
4379   Slice record;
4380   std::string scratch;
4381   size_t recovered_edits = 0;
4382   while (reader->ReadRecord(&record, &scratch) && s.ok()) {
4383     VersionEdit edit;
4384     s = edit.DecodeFrom(record);
4385     if (!s.ok()) {
4386       break;
4387     }
4388     if (edit.has_db_id_) {
4389       db_id_ = edit.GetDbId();
4390       if (db_id != nullptr) {
4391         db_id->assign(edit.GetDbId());
4392       }
4393     }
4394     s = read_buffer->AddEdit(&edit);
4395     if (!s.ok()) {
4396       break;
4397     }
4398     if (edit.is_in_atomic_group_) {
4399       if (read_buffer->IsFull()) {
4400         // Apply edits in an atomic group when we have read all edits in the
4401         // group.
4402         for (auto& e : read_buffer->replay_buffer()) {
4403           s = ApplyOneVersionEditToBuilder(e, name_to_options,
4404                                            column_families_not_found, builders,
4405                                            version_edit_params);
4406           if (!s.ok()) {
4407             break;
4408           }
4409           recovered_edits++;
4410         }
4411         if (!s.ok()) {
4412           break;
4413         }
4414         read_buffer->Clear();
4415       }
4416     } else {
4417       // Apply a normal edit immediately.
4418       s = ApplyOneVersionEditToBuilder(edit, name_to_options,
4419                                        column_families_not_found, builders,
4420                                        version_edit_params);
4421       if (s.ok()) {
4422         recovered_edits++;
4423       }
4424     }
4425   }
4426   if (!s.ok()) {
4427     // Clear the buffer if we fail to decode/apply an edit.
4428     read_buffer->Clear();
4429   }
4430   TEST_SYNC_POINT_CALLBACK("VersionSet::ReadAndRecover:RecoveredEdits",
4431                            &recovered_edits);
4432   return s;
4433 }
4434 
4435 Status VersionSet::Recover(
4436     const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4437     std::string* db_id) {
4438   std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
4439   for (const auto& cf : column_families) {
4440     cf_name_to_options.emplace(cf.name, cf.options);
4441   }
4442   // Keeps track of column families in the manifest that were not found in
4443   // the column_families parameter. If those column families are not dropped
4444   // by subsequent manifest records, Recover() will return a failure status.
4445   std::unordered_map<int, std::string> column_families_not_found;
4446 
4447   // Read "CURRENT" file, which contains a pointer to the current manifest file
4448   std::string manifest_path;
4449   Status s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
4450                                     &manifest_file_number_);
4451   if (!s.ok()) {
4452     return s;
4453   }
4454 
4455   ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
4456                  manifest_path.c_str());
4457 
4458   std::unique_ptr<SequentialFileReader> manifest_file_reader;
4459   {
4460     std::unique_ptr<FSSequentialFile> manifest_file;
4461     s = fs_->NewSequentialFile(manifest_path,
4462                                fs_->OptimizeForManifestRead(file_options_),
4463                                &manifest_file, nullptr);
4464     if (!s.ok()) {
4465       return s;
4466     }
4467     manifest_file_reader.reset(
4468         new SequentialFileReader(std::move(manifest_file), manifest_path,
4469                                  db_options_->log_readahead_size));
4470   }
4471 
4472   std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
4473       builders;
4474 
4475   // add default column family
4476   auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
4477   if (default_cf_iter == cf_name_to_options.end()) {
4478     return Status::InvalidArgument("Default column family not specified");
4479   }
4480   VersionEdit default_cf_edit;
4481   default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
4482   default_cf_edit.SetColumnFamily(0);
4483   ColumnFamilyData* default_cfd =
4484       CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
4485   // In recovery, nobody else can access it, so it's fine to set it to be
4486   // initialized earlier.
4487   default_cfd->set_initialized();
4488   builders.insert(
4489       std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
4490                             new BaseReferencedVersionBuilder(default_cfd))));
4491   uint64_t current_manifest_file_size = 0;
4492   VersionEditParams version_edit_params;
4493   {
4494     VersionSet::LogReporter reporter;
4495     reporter.status = &s;
4496     log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
4497                        true /* checksum */, 0 /* log_number */);
4498     AtomicGroupReadBuffer read_buffer;
4499     s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
4500                        column_families_not_found, builders,
4501                        &version_edit_params, db_id);
4502     current_manifest_file_size = reader.GetReadOffset();
4503     assert(current_manifest_file_size != 0);
4504   }
4505 
4506   if (s.ok()) {
4507     if (!version_edit_params.has_next_file_number_) {
4508       s = Status::Corruption("no meta-nextfile entry in descriptor");
4509     } else if (!version_edit_params.has_log_number_) {
4510       s = Status::Corruption("no meta-lognumber entry in descriptor");
4511     } else if (!version_edit_params.has_last_sequence_) {
4512       s = Status::Corruption("no last-sequence-number entry in descriptor");
4513     }
4514 
4515     if (!version_edit_params.has_prev_log_number_) {
4516       version_edit_params.SetPrevLogNumber(0);
4517     }
4518 
4519     column_family_set_->UpdateMaxColumnFamily(
4520         version_edit_params.max_column_family_);
4521 
4522     // When reading DB generated using old release, min_log_number_to_keep=0.
4523     // All log files will be scanned for potential prepare entries.
4524     MarkMinLogNumberToKeep2PC(version_edit_params.min_log_number_to_keep_);
4525     MarkFileNumberUsed(version_edit_params.prev_log_number_);
4526     MarkFileNumberUsed(version_edit_params.log_number_);
4527   }
4528 
4529   // There were some column families in the MANIFEST that weren't specified
4530   // in the argument. This is OK in read_only mode.
4531   if (read_only == false && !column_families_not_found.empty()) {
4532     std::string list_of_not_found;
4533     for (const auto& cf : column_families_not_found) {
4534       list_of_not_found += ", " + cf.second;
4535     }
4536     list_of_not_found = list_of_not_found.substr(2);
4537     s = Status::InvalidArgument(
4538         "You have to open all column families. Column families not opened: " +
4539         list_of_not_found);
4540   }
4541 
4542   if (s.ok()) {
4543     for (auto cfd : *column_family_set_) {
4544       assert(builders.count(cfd->GetID()) > 0);
4545       auto* builder = builders[cfd->GetID()]->version_builder();
4546       if (!builder->CheckConsistencyForNumLevels()) {
4547         s = Status::InvalidArgument(
4548             "db has more levels than options.num_levels");
4549         break;
4550       }
4551     }
4552   }
4553 
4554   if (s.ok()) {
4555     for (auto cfd : *column_family_set_) {
4556       if (cfd->IsDropped()) {
4557         continue;
4558       }
4559       if (read_only) {
4560         cfd->table_cache()->SetTablesAreImmortal();
4561       }
4562       assert(cfd->initialized());
4563       auto builders_iter = builders.find(cfd->GetID());
4564       assert(builders_iter != builders.end());
4565       auto builder = builders_iter->second->version_builder();
4566 
4567       // Unlimited table cache. Pre-load table handles now; this needs to
4568       // be done outside of the mutex.
4569       s = builder->LoadTableHandlers(
4570           cfd->internal_stats(), db_options_->max_file_opening_threads,
4571           false /* prefetch_index_and_filter_in_cache */,
4572           true /* is_initial_load */,
4573           cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
4574       if (!s.ok()) {
4575         if (db_options_->paranoid_checks) {
4576           return s;
4577         }
4578         s = Status::OK();
4579       }
4580 
4581       Version* v = new Version(cfd, this, file_options_,
4582                                *cfd->GetLatestMutableCFOptions(),
4583                                current_version_number_++);
4584       builder->SaveTo(v->storage_info());
4585 
4586       // Install recovered version
4587       v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
4588           !(db_options_->skip_stats_update_on_db_open));
4589       AppendVersion(cfd, v);
4590     }
4591 
4592     manifest_file_size_ = current_manifest_file_size;
4593     next_file_number_.store(version_edit_params.next_file_number_ + 1);
4594     last_allocated_sequence_ = version_edit_params.last_sequence_;
4595     last_published_sequence_ = version_edit_params.last_sequence_;
4596     last_sequence_ = version_edit_params.last_sequence_;
4597     prev_log_number_ = version_edit_params.prev_log_number_;
4598 
4599     ROCKS_LOG_INFO(
4600         db_options_->info_log,
4601         "Recovered from manifest file:%s succeeded,"
4602         "manifest_file_number is %" PRIu64 ", next_file_number is %" PRIu64
4603         ", last_sequence is %" PRIu64 ", log_number is %" PRIu64
4604         ",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
4605         ",min_log_number_to_keep is %" PRIu64 "\n",
4606         manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
4607         last_sequence_.load(), version_edit_params.log_number_,
4608         prev_log_number_, column_family_set_->GetMaxColumnFamily(),
4609         min_log_number_to_keep_2pc());
4610 
4611     for (auto cfd : *column_family_set_) {
4612       if (cfd->IsDropped()) {
4613         continue;
4614       }
4615       ROCKS_LOG_INFO(db_options_->info_log,
4616                      "Column family [%s] (ID %" PRIu32
4617                      "), log number is %" PRIu64 "\n",
4618                      cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
4619     }
4620   }
4621 
4622   return s;
4623 }
4624 
4625 namespace {
4626 class ManifestPicker {
4627  public:
4628   explicit ManifestPicker(const std::string& dbname, FileSystem* fs);
4629   void SeekToFirstManifest();
4630   // REQUIRES Valid() == true
4631   std::string GetNextManifest(uint64_t* file_number, std::string* file_name);
4632   bool Valid() const { return manifest_file_iter_ != manifest_files_.end(); }
4633   const Status& status() const { return status_; }
4634 
4635  private:
4636   const std::string& dbname_;
4637   FileSystem* const fs_;
4638   // MANIFEST file name(s)
4639   std::vector<std::string> manifest_files_;
4640   std::vector<std::string>::const_iterator manifest_file_iter_;
4641   Status status_;
4642 };
4643 
4644 ManifestPicker::ManifestPicker(const std::string& dbname, FileSystem* fs)
4645     : dbname_(dbname), fs_(fs) {}
4646 
4647 void ManifestPicker::SeekToFirstManifest() {
4648   assert(fs_ != nullptr);
4649   std::vector<std::string> children;
4650   Status s = fs_->GetChildren(dbname_, IOOptions(), &children, /*dbg=*/nullptr);
4651   if (!s.ok()) {
4652     status_ = s;
4653     return;
4654   }
4655   for (const auto& fname : children) {
4656     uint64_t file_num = 0;
4657     FileType file_type;
4658     bool parse_ok = ParseFileName(fname, &file_num, &file_type);
4659     if (parse_ok && file_type == kDescriptorFile) {
4660       manifest_files_.push_back(fname);
4661     }
4662   }
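  // Sort by descending file number so that recovery tries the most recent
  // manifest first.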
4663   std::sort(manifest_files_.begin(), manifest_files_.end(),
4664             [](const std::string& lhs, const std::string& rhs) {
4665               uint64_t num1 = 0;
4666               uint64_t num2 = 0;
4667               FileType type1;
4668               FileType type2;
4669               bool parse_ok1 = ParseFileName(lhs, &num1, &type1);
4670               bool parse_ok2 = ParseFileName(rhs, &num2, &type2);
4671 #ifndef NDEBUG
4672               assert(parse_ok1);
4673               assert(parse_ok2);
4674 #else
4675               (void)parse_ok1;
4676               (void)parse_ok2;
4677 #endif
4678               return num1 > num2;
4679             });
4680   manifest_file_iter_ = manifest_files_.begin();
4681 }
4682 
4683 std::string ManifestPicker::GetNextManifest(uint64_t* number,
4684                                             std::string* file_name) {
4685   assert(status_.ok());
4686   assert(Valid());
4687   std::string ret;
4688   if (manifest_file_iter_ != manifest_files_.end()) {
4689     ret.assign(dbname_);
4690     if (ret.back() != kFilePathSeparator) {
4691       ret.push_back(kFilePathSeparator);
4692     }
4693     ret.append(*manifest_file_iter_);
4694     if (number) {
4695       FileType type;
4696       bool parse = ParseFileName(*manifest_file_iter_, number, &type);
4697       assert(type == kDescriptorFile);
4698 #ifndef NDEBUG
4699       assert(parse);
4700 #else
4701       (void)parse;
4702 #endif
4703     }
4704     if (file_name) {
4705       *file_name = *manifest_file_iter_;
4706     }
4707     ++manifest_file_iter_;
4708   }
4709   return ret;
4710 }
4711 }  // namespace
4712 
4713 Status VersionSet::TryRecover(
4714     const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4715     std::string* db_id, bool* has_missing_table_file) {
4716   ManifestPicker manifest_picker(dbname_, fs_);
4717   manifest_picker.SeekToFirstManifest();
4718   Status s = manifest_picker.status();
4719   if (!s.ok()) {
4720     return s;
4721   }
4722   if (!manifest_picker.Valid()) {
4723     return Status::Corruption("Cannot locate MANIFEST file in " + dbname_);
4724   }
4725   std::string manifest_path =
4726       manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
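  // Try manifests from newest to oldest, stopping at the first one from which
  // a consistent point-in-time state can be recovered.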
4727   while (!manifest_path.empty()) {
4728     s = TryRecoverFromOneManifest(manifest_path, column_families, read_only,
4729                                   db_id, has_missing_table_file);
4730     if (s.ok() || !manifest_picker.Valid()) {
4731       break;
4732     }
4733     Reset();
4734     manifest_path =
4735         manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
4736   }
4737   return s;
4738 }
4739 
4740 Status VersionSet::TryRecoverFromOneManifest(
4741     const std::string& manifest_path,
4742     const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4743     std::string* db_id, bool* has_missing_table_file) {
4744   ROCKS_LOG_INFO(db_options_->info_log, "Trying to recover from manifest: %s\n",
4745                  manifest_path.c_str());
4746   std::unique_ptr<SequentialFileReader> manifest_file_reader;
4747   Status s;
4748   {
4749     std::unique_ptr<FSSequentialFile> manifest_file;
4750     s = fs_->NewSequentialFile(manifest_path,
4751                                fs_->OptimizeForManifestRead(file_options_),
4752                                &manifest_file, nullptr);
4753     if (!s.ok()) {
4754       return s;
4755     }
4756     manifest_file_reader.reset(
4757         new SequentialFileReader(std::move(manifest_file), manifest_path,
4758                                  db_options_->log_readahead_size));
4759   }
4760 
4761   VersionSet::LogReporter reporter;
4762   reporter.status = &s;
4763   log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
4764                      /*checksum=*/true, /*log_num=*/0);
4765   {
4766     VersionEditHandlerPointInTime handler_pit(read_only, column_families,
4767                                               const_cast<VersionSet*>(this));
4768 
4769     s = handler_pit.Iterate(reader, db_id);
4770 
4771     assert(nullptr != has_missing_table_file);
4772     *has_missing_table_file = handler_pit.HasMissingFiles();
4773   }
4774 
4775   return s;
4776 }
4777 
4778 Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
4779                                       const std::string& dbname,
4780                                       FileSystem* fs) {
4781   // These are just for performance reasons, not correctness,
4782   // so we're fine using the defaults.
4783   FileOptions soptions;
4784   // Read "CURRENT" file, which contains a pointer to the current manifest file
4785   std::string manifest_path;
4786   uint64_t manifest_file_number;
4787   Status s =
4788       GetCurrentManifestPath(dbname, fs, &manifest_path, &manifest_file_number);
4789   if (!s.ok()) {
4790     return s;
4791   }
4792 
4793   std::unique_ptr<SequentialFileReader> file_reader;
4794   {
4795     std::unique_ptr<FSSequentialFile> file;
4796     s = fs->NewSequentialFile(manifest_path, soptions, &file, nullptr);
4797     if (!s.ok()) {
4798       return s;
4799     }
4800     file_reader.reset(new SequentialFileReader(std::move(file), manifest_path));
4801   }
4802 
4803   std::map<uint32_t, std::string> column_family_names;
4804   // default column family is always implicitly there
4805   column_family_names.insert({0, kDefaultColumnFamilyName});
4806   VersionSet::LogReporter reporter;
4807   reporter.status = &s;
4808   log::Reader reader(nullptr, std::move(file_reader), &reporter,
4809                      true /* checksum */, 0 /* log_number */);
4810   Slice record;
4811   std::string scratch;
4812   while (reader.ReadRecord(&record, &scratch) && s.ok()) {
4813     VersionEdit edit;
4814     s = edit.DecodeFrom(record);
4815     if (!s.ok()) {
4816       break;
4817     }
4818     if (edit.is_column_family_add_) {
4819       if (column_family_names.find(edit.column_family_) !=
4820           column_family_names.end()) {
4821         s = Status::Corruption("Manifest adding the same column family twice");
4822         break;
4823       }
4824       column_family_names.insert(
4825           {edit.column_family_, edit.column_family_name_});
4826     } else if (edit.is_column_family_drop_) {
4827       if (column_family_names.find(edit.column_family_) ==
4828           column_family_names.end()) {
4829         s = Status::Corruption(
4830             "Manifest - dropping non-existing column family");
4831         break;
4832       }
4833       column_family_names.erase(edit.column_family_);
4834     }
4835   }
4836 
4837   column_families->clear();
4838   if (s.ok()) {
4839     for (const auto& iter : column_family_names) {
4840       column_families->push_back(iter.second);
4841     }
4842   }
4843 
4844   return s;
4845 }
4846 
4847 #ifndef ROCKSDB_LITE
4848 Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
4849                                         const Options* options,
4850                                         const FileOptions& file_options,
4851                                         int new_levels) {
4852   if (new_levels <= 1) {
4853     return Status::InvalidArgument(
4854         "Number of levels needs to be bigger than 1");
4855   }
4856 
4857   ImmutableDBOptions db_options(*options);
4858   ColumnFamilyOptions cf_options(*options);
4859   std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
4860                                         options->table_cache_numshardbits));
4861   WriteController wc(options->delayed_write_rate);
4862   WriteBufferManager wb(options->db_write_buffer_size);
4863   VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
4864                       /*block_cache_tracer=*/nullptr);
4865   Status status;
4866 
4867   std::vector<ColumnFamilyDescriptor> dummy;
4868   ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
4869                                           ColumnFamilyOptions(*options));
4870   dummy.push_back(dummy_descriptor);
4871   status = versions.Recover(dummy);
4872   if (!status.ok()) {
4873     return status;
4874   }
4875 
4876   Version* current_version =
4877       versions.GetColumnFamilySet()->GetDefault()->current();
4878   auto* vstorage = current_version->storage_info();
4879   int current_levels = vstorage->num_levels();
4880 
4881   if (current_levels <= new_levels) {
4882     return Status::OK();
4883   }
4884 
4885   // Make sure there are files on only one level from
4886   // (new_levels-1) to (current_levels-1)
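  // For example, shrinking from current_levels=7 to new_levels=3 requires
  // that at most one of levels 2..6 contains files; that level's files are
  // later reassigned to level 2 (new_levels - 1).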
4887   int first_nonempty_level = -1;
4888   int first_nonempty_level_filenum = 0;
4889   for (int i = new_levels - 1; i < current_levels; i++) {
4890     int file_num = vstorage->NumLevelFiles(i);
4891     if (file_num != 0) {
4892       if (first_nonempty_level < 0) {
4893         first_nonempty_level = i;
4894         first_nonempty_level_filenum = file_num;
4895       } else {
4896         char msg[255];
4897         snprintf(msg, sizeof(msg),
4898                  "Found at least two levels containing files: "
4899                  "[%d:%d],[%d:%d].\n",
4900                  first_nonempty_level, first_nonempty_level_filenum, i,
4901                  file_num);
4902         return Status::InvalidArgument(msg);
4903       }
4904     }
4905   }
4906 
4907   // We need to allocate an array sized to the old number of levels to
4908   // avoid SIGSEGV in WriteCurrentStateToManifest().
4909   // However, all levels greater than or equal to new_levels will be empty.
4910   std::vector<FileMetaData*>* new_files_list =
4911       new std::vector<FileMetaData*>[current_levels];
4912   for (int i = 0; i < new_levels - 1; i++) {
4913     new_files_list[i] = vstorage->LevelFiles(i);
4914   }
4915 
4916   if (first_nonempty_level > 0) {
4917     new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level);
4918   }
4919 
4920   delete[] vstorage->files_;
4921   vstorage->files_ = new_files_list;
4922   vstorage->num_levels_ = new_levels;
4923 
4924   MutableCFOptions mutable_cf_options(*options);
4925   VersionEdit ve;
4926   InstrumentedMutex dummy_mutex;
4927   InstrumentedMutexLock l(&dummy_mutex);
4928   return versions.LogAndApply(
4929       versions.GetColumnFamilySet()->GetDefault(),
4930       mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
4931 }
4932 
4933 // Get the checksum information including the checksum and checksum function
4934 // name of all SST files in VersionSet. Store the information in
4935 // FileChecksumList which contains a map from file number to its checksum info.
4936 // If the DB is not running, make sure to call VersionSet::Recover() to load
4937 // the file metadata from the Manifest into the VersionSet before calling this function.
4938 Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
4939   // Clean the previously stored checksum information if any.
4940   if (checksum_list == nullptr) {
4941     return Status::InvalidArgument("checksum_list is nullptr");
4942   }
4943   checksum_list->reset();
4944 
4945   for (auto cfd : *column_family_set_) {
4946     if (cfd->IsDropped() || !cfd->initialized()) {
4947       continue;
4948     }
4949     for (int level = 0; level < cfd->NumberLevels(); level++) {
4950       for (const auto& file :
4951            cfd->current()->storage_info()->LevelFiles(level)) {
4952         checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
4953                                              file->file_checksum,
4954                                              file->file_checksum_func_name);
4955       }
4956     }
4957   }
4958   return Status::OK();
4959 }
4960 
4961 Status VersionSet::DumpManifest(Options& options, std::string& dscname,
4962                                 bool verbose, bool hex, bool json) {
4963   // Open the specified manifest file.
4964   std::unique_ptr<SequentialFileReader> file_reader;
4965   Status s;
4966   {
4967     std::unique_ptr<FSSequentialFile> file;
4968     const std::shared_ptr<FileSystem>& fs = options.env->GetFileSystem();
4969     s = fs->NewSequentialFile(
4970         dscname,
4971         fs->OptimizeForManifestRead(file_options_), &file,
4972         nullptr);
4973     if (!s.ok()) {
4974       return s;
4975     }
4976     file_reader.reset(new SequentialFileReader(
4977         std::move(file), dscname, db_options_->log_readahead_size));
4978   }
4979 
4980   bool have_prev_log_number = false;
4981   bool have_next_file = false;
4982   bool have_last_sequence = false;
4983   uint64_t next_file = 0;
4984   uint64_t last_sequence = 0;
4985   uint64_t previous_log_number = 0;
4986   int count = 0;
4987   std::unordered_map<uint32_t, std::string> comparators;
4988   std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
4989       builders;
4990 
4991   // add default column family
4992   VersionEdit default_cf_edit;
4993   default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
4994   default_cf_edit.SetColumnFamily(0);
4995   ColumnFamilyData* default_cfd =
4996       CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
4997   builders.insert(
4998       std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
4999                             new BaseReferencedVersionBuilder(default_cfd))));
5000 
5001   {
5002     VersionSet::LogReporter reporter;
5003     reporter.status = &s;
5004     log::Reader reader(nullptr, std::move(file_reader), &reporter,
5005                        true /* checksum */, 0 /* log_number */);
5006     Slice record;
5007     std::string scratch;
5008     while (reader.ReadRecord(&record, &scratch) && s.ok()) {
5009       VersionEdit edit;
5010       s = edit.DecodeFrom(record);
5011       if (!s.ok()) {
5012         break;
5013       }
5014 
5015       // Write out each individual edit
5016       if (verbose && !json) {
5017         printf("%s\n", edit.DebugString(hex).c_str());
5018       } else if (json) {
5019         printf("%s\n", edit.DebugJSON(count, hex).c_str());
5020       }
5021       count++;
5022 
5023       bool cf_in_builders =
5024           builders.find(edit.column_family_) != builders.end();
5025 
5026       if (edit.has_comparator_) {
5027         comparators.insert({edit.column_family_, edit.comparator_});
5028       }
5029 
5030       ColumnFamilyData* cfd = nullptr;
5031 
5032       if (edit.is_column_family_add_) {
5033         if (cf_in_builders) {
5034           s = Status::Corruption(
5035               "Manifest adding the same column family twice");
5036           break;
5037         }
5038         cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
5039         cfd->set_initialized();
5040         builders.insert(std::make_pair(
5041             edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
5042                                      new BaseReferencedVersionBuilder(cfd))));
5043       } else if (edit.is_column_family_drop_) {
5044         if (!cf_in_builders) {
5045           s = Status::Corruption(
5046               "Manifest - dropping non-existing column family");
5047           break;
5048         }
5049         auto builder_iter = builders.find(edit.column_family_);
5050         builders.erase(builder_iter);
5051         comparators.erase(edit.column_family_);
5052         cfd = column_family_set_->GetColumnFamily(edit.column_family_);
5053         assert(cfd != nullptr);
5054         cfd->UnrefAndTryDelete();
5055         cfd = nullptr;
5056       } else {
5057         if (!cf_in_builders) {
5058           s = Status::Corruption(
5059               "Manifest record referencing unknown column family");
5060           break;
5061         }
5062 
5063         cfd = column_family_set_->GetColumnFamily(edit.column_family_);
5064         // this should never happen since cf_in_builders is true
5065         assert(cfd != nullptr);
5066 
5067         // if it is not column family add or column family drop,
5068         // then it's a file add/delete, which should be forwarded
5069         // to builder
5070         auto builder = builders.find(edit.column_family_);
5071         assert(builder != builders.end());
5072         s = builder->second->version_builder()->Apply(&edit);
5073         if (!s.ok()) {
5074           break;
5075         }
5076       }
5077 
5078       if (cfd != nullptr && edit.has_log_number_) {
5079         cfd->SetLogNumber(edit.log_number_);
5080       }
5081 
5082 
5083       if (edit.has_prev_log_number_) {
5084         previous_log_number = edit.prev_log_number_;
5085         have_prev_log_number = true;
5086       }
5087 
5088       if (edit.has_next_file_number_) {
5089         next_file = edit.next_file_number_;
5090         have_next_file = true;
5091       }
5092 
5093       if (edit.has_last_sequence_) {
5094         last_sequence = edit.last_sequence_;
5095         have_last_sequence = true;
5096       }
5097 
5098       if (edit.has_max_column_family_) {
5099         column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
5100       }
5101 
5102       if (edit.has_min_log_number_to_keep_) {
5103         MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_);
5104       }
5105     }
5106   }
5107   file_reader.reset();
5108 
5109   if (s.ok()) {
5110     if (!have_next_file) {
5111       s = Status::Corruption("no meta-nextfile entry in descriptor");
5112       printf("no meta-nextfile entry in descriptor");
5113     } else if (!have_last_sequence) {
5114       printf("no last-sequence-number entry in descriptor");
5115       s = Status::Corruption("no last-sequence-number entry in descriptor");
5116     }
5117 
5118     if (!have_prev_log_number) {
5119       previous_log_number = 0;
5120     }
5121   }
5122 
5123   if (s.ok()) {
5124     for (auto cfd : *column_family_set_) {
5125       if (cfd->IsDropped()) {
5126         continue;
5127       }
5128       auto builders_iter = builders.find(cfd->GetID());
5129       assert(builders_iter != builders.end());
5130       auto builder = builders_iter->second->version_builder();
5131 
5132       Version* v = new Version(cfd, this, file_options_,
5133                                *cfd->GetLatestMutableCFOptions(),
5134                                current_version_number_++);
5135       builder->SaveTo(v->storage_info());
5136       v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
5137 
5138       printf("--------------- Column family \"%s\"  (ID %" PRIu32
5139              ") --------------\n",
5140              cfd->GetName().c_str(), cfd->GetID());
5141       printf("log number: %" PRIu64 "\n", cfd->GetLogNumber());
5142       auto comparator = comparators.find(cfd->GetID());
5143       if (comparator != comparators.end()) {
5144         printf("comparator: %s\n", comparator->second.c_str());
5145       } else {
5146         printf("comparator: <NO COMPARATOR>\n");
5147       }
5148       printf("%s \n", v->DebugString(hex).c_str());
5149       delete v;
5150     }
5151 
5152     next_file_number_.store(next_file + 1);
5153     last_allocated_sequence_ = last_sequence;
5154     last_published_sequence_ = last_sequence;
5155     last_sequence_ = last_sequence;
5156     prev_log_number_ = previous_log_number;
5157 
5158     printf("next_file_number %" PRIu64 " last_sequence %" PRIu64
5159            "  prev_log_number %" PRIu64 " max_column_family %" PRIu32
5160            " min_log_number_to_keep "
5161            "%" PRIu64 "\n",
5162            next_file_number_.load(), last_sequence, previous_log_number,
5163            column_family_set_->GetMaxColumnFamily(),
5164            min_log_number_to_keep_2pc());
5165   }
5166 
5167   return s;
5168 }
5169 #endif  // ROCKSDB_LITE
5170 
5171 void VersionSet::MarkFileNumberUsed(uint64_t number) {
5172   // Only called during recovery and repair, which are single-threaded, so
5173   // this works because there can't be concurrent calls.
5174   if (next_file_number_.load(std::memory_order_relaxed) <= number) {
5175     next_file_number_.store(number + 1, std::memory_order_relaxed);
5176   }
5177 }
5178 // Called only either from ::LogAndApply which is protected by mutex or during
5179 // recovery which is single-threaded.
5180 void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
5181   if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
5182     min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
5183   }
5184 }
5185 
WriteCurrentStateToManifest(const std::unordered_map<uint32_t,MutableCFState> & curr_state,log::Writer * log)5186 Status VersionSet::WriteCurrentStateToManifest(
5187     const std::unordered_map<uint32_t, MutableCFState>& curr_state,
5188     log::Writer* log) {
5189   // TODO: Break up into multiple records to reduce memory usage on recovery?
5190 
5191   // WARNING: This method doesn't hold a mutex!!
5192 
5193   // This is done without DB mutex lock held, but only within single-threaded
5194   // LogAndApply. Column family manipulations can only happen within LogAndApply
5195   // (the same single thread), so we're safe to iterate.
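  //
  // For reference, the snapshot written below consists of, in order: an
  // optional VersionEdit carrying the DB ID (only when write_dbid_to_manifest
  // is set), then, for each live column family, one VersionEdit describing
  // the family (name/ID/comparator) followed by one VersionEdit listing its
  // files and log number.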

  if (db_options_->write_dbid_to_manifest) {
    VersionEdit edit_for_db_id;
    assert(!db_id_.empty());
    edit_for_db_id.SetDBId(db_id_);
    std::string db_id_record;
    if (!edit_for_db_id.EncodeTo(&db_id_record)) {
      return Status::Corruption("Unable to Encode VersionEdit:" +
                                edit_for_db_id.DebugString(true));
    }
    IOStatus io_s = log->AddRecord(db_id_record);
    if (!io_s.ok()) {
      io_status_ = io_s;
      return std::move(io_s);
    }
  }

  for (auto cfd : *column_family_set_) {
    if (cfd->IsDropped()) {
      continue;
    }
    assert(cfd->initialized());
    {
      // Store column family info
      VersionEdit edit;
      if (cfd->GetID() != 0) {
        // default column family is always there,
        // no need to explicitly write it
        edit.AddColumnFamily(cfd->GetName());
        edit.SetColumnFamily(cfd->GetID());
      }
      edit.SetComparatorName(
          cfd->internal_comparator().user_comparator()->Name());
      std::string record;
      if (!edit.EncodeTo(&record)) {
        return Status::Corruption(
            "Unable to Encode VersionEdit:" + edit.DebugString(true));
      }
      IOStatus io_s = log->AddRecord(record);
      if (!io_s.ok()) {
        io_status_ = io_s;
        return std::move(io_s);
      }
    }

    {
      // Save files
      VersionEdit edit;
      edit.SetColumnFamily(cfd->GetID());

      for (int level = 0; level < cfd->NumberLevels(); level++) {
        for (const auto& f :
             cfd->current()->storage_info()->LevelFiles(level)) {
          edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
                       f->fd.GetFileSize(), f->smallest, f->largest,
                       f->fd.smallest_seqno, f->fd.largest_seqno,
                       f->marked_for_compaction, f->oldest_blob_file_number,
                       f->oldest_ancester_time, f->file_creation_time,
                       f->file_checksum, f->file_checksum_func_name);
        }
      }
      const auto iter = curr_state.find(cfd->GetID());
      assert(iter != curr_state.end());
      uint64_t log_number = iter->second.log_number;
      edit.SetLogNumber(log_number);
      std::string record;
      if (!edit.EncodeTo(&record)) {
        return Status::Corruption(
            "Unable to Encode VersionEdit:" + edit.DebugString(true));
      }
      IOStatus io_s = log->AddRecord(record);
      if (!io_s.ok()) {
        io_status_ = io_s;
        return std::move(io_s);
      }
    }
  }
  return Status::OK();
}

// TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
// function is called repeatedly with consecutive pairs of slices. For example
// if the slice list is [a, b, c, d] this function is called with arguments
// (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
// we avoid doing binary search for the keys b and c twice and instead somehow
// maintain state of where they first appear in the files.
uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions& options,
                                     Version* v, const Slice& start,
                                     const Slice& end, int start_level,
                                     int end_level, TableReaderCaller caller) {
  const auto& icmp = v->cfd_->internal_comparator();

  // pre-condition
  assert(icmp.Compare(start, end) <= 0);

  uint64_t total_full_size = 0;
  const auto* vstorage = v->storage_info();
  const int num_non_empty_levels = vstorage->num_non_empty_levels();
  end_level = (end_level == -1) ? num_non_empty_levels
                                : std::min(end_level, num_non_empty_levels);

  assert(start_level <= end_level);

  // Outline of the optimization that uses options.files_size_error_margin.
  // When approximating the total size of the files used to store a range of
  // keys, we first sum up the sizes of the files that fully fall into the
  // range. Then we sum up the sizes of all the files that may intersect with
  // the range (this includes all files in L0 as well). Then, if
  // total_intersecting_size is smaller than
  // total_full_size * options.files_size_error_margin, we can infer that the
  // intersecting files have a sufficiently negligible contribution to the
  // total size, and we can approximate the storage required for the keys in
  // the range as just half of the total size of the intersecting files.
  // E.g., if the value of files_size_error_margin is 0.1, then the error of
  // the approximation is limited to ~10% of the total size of the files that
  // fully fall into the keys range. In such a case, this helps to avoid the
  // costly process of binary searching inside the intersecting files, which
  // is required only for a more precise calculation of the total size.
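  //
  // Worked example (illustrative numbers): suppose the fully-contained files
  // sum to total_full_size = 1000 MB and the boundary/L0 files sum to
  // total_intersecting_size = 80 MB. With files_size_error_margin = 0.1 the
  // threshold is 1000 * 0.1 = 100 MB; since 80 < 100 we skip the per-file
  // binary search and return 1000 + 80 / 2 = 1040 MB. The true size lies in
  // [1000, 1080] MB, so the error is bounded by 40 MB, ~4% of the
  // fully-contained total.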

  autovector<FdWithKeyRange*, 32> first_files;
  autovector<FdWithKeyRange*, 16> last_files;

  // scan all the levels
  for (int level = start_level; level < end_level; ++level) {
    const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
    if (files_brief.num_files == 0) {
      // empty level, skip exploration
      continue;
    }

    if (level == 0) {
      // level 0 files are not in sorted order, we need to iterate through
      // the list to compute the total bytes that require scanning,
      // so handle the case explicitly (similarly to first_files case)
      for (size_t i = 0; i < files_brief.num_files; i++) {
        first_files.push_back(&files_brief.files[i]);
      }
      continue;
    }

    assert(level > 0);
    assert(files_brief.num_files > 0);

    // identify the file position for start key
    const int idx_start =
        FindFileInRange(icmp, files_brief, start, 0,
                        static_cast<uint32_t>(files_brief.num_files - 1));
    assert(static_cast<size_t>(idx_start) < files_brief.num_files);

    // identify the file position for end key
    int idx_end = idx_start;
    if (icmp.Compare(files_brief.files[idx_end].largest_key, end) < 0) {
      idx_end =
          FindFileInRange(icmp, files_brief, end, idx_start,
                          static_cast<uint32_t>(files_brief.num_files - 1));
    }
    assert(idx_end >= idx_start &&
           static_cast<size_t>(idx_end) < files_brief.num_files);
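
    // Example (illustrative): if this level's files have largest keys
    // [10, 20, 30, 40] and the range is [25, 37], then idx_start == 2 (the
    // first file whose largest key is >= 25) and idx_end == 3; no
    // intermediate file is fully contained, and the two boundary files are
    // estimated separately below.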

    // scan all files from the starting index to the ending index
    // (inferred from the sorted order)

    // first scan all the intermediate full files (excluding first and last)
    for (int i = idx_start + 1; i < idx_end; ++i) {
      uint64_t file_size = files_brief.files[i].fd.GetFileSize();
      // The entire file falls into the range, so we can just take its size.
      assert(file_size ==
             ApproximateSize(v, files_brief.files[i], start, end, caller));
      total_full_size += file_size;
    }

    // save the first and the last files (which may be the same file), so we
    // can scan them later.
    first_files.push_back(&files_brief.files[idx_start]);
    if (idx_start != idx_end) {
      // we need to estimate size for both files, only if they are different
      last_files.push_back(&files_brief.files[idx_end]);
    }
  }

  // The sum of all file sizes that intersect the [start, end] keys range.
  uint64_t total_intersecting_size = 0;
  for (const auto* file_ptr : first_files) {
    total_intersecting_size += file_ptr->fd.GetFileSize();
  }
  for (const auto* file_ptr : last_files) {
    total_intersecting_size += file_ptr->fd.GetFileSize();
  }

  // Now scan all the first & last files at each level, and estimate their
  // size. If total_intersecting_size is less than
  // total_full_size * options.files_size_error_margin, we approximate the
  // result to avoid the costly binary search inside ApproximateSize, using
  // half of each intersecting file's size below.

  const double margin = options.files_size_error_margin;
  if (margin > 0 && total_intersecting_size <
                        static_cast<uint64_t>(total_full_size * margin)) {
    total_full_size += total_intersecting_size / 2;
  } else {
    // Estimate for all the first files, at each level
    for (const auto file_ptr : first_files) {
      total_full_size += ApproximateSize(v, *file_ptr, start, end, caller);
    }

    // Estimate for all the last files, at each level
    for (const auto file_ptr : last_files) {
      // We could use ApproximateSize here, but calling ApproximateOffsetOf
      // directly is just more efficient.
      total_full_size += ApproximateOffsetOf(v, *file_ptr, end, caller);
    }
  }

  return total_full_size;
}

uint64_t VersionSet::ApproximateOffsetOf(Version* v, const FdWithKeyRange& f,
                                         const Slice& key,
                                         TableReaderCaller caller) {
  // pre-condition
  assert(v);
  const auto& icmp = v->cfd_->internal_comparator();

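  // Example (illustrative): for a file covering keys [c, m] with size 100,
  // the result is 100 for key "z" (file entirely before the key), 0 for key
  // "a" (file entirely after the key), and a table-estimated offset in
  // [0, 100] for key "h".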
  uint64_t result = 0;
  if (icmp.Compare(f.largest_key, key) <= 0) {
    // Entire file is before "key", so just add the file size
    result = f.fd.GetFileSize();
  } else if (icmp.Compare(f.smallest_key, key) > 0) {
    // Entire file is after "key", so ignore
    result = 0;
  } else {
    // "key" falls in the range for this table.  Add the
    // approximate offset of "key" within the table.
    TableCache* table_cache = v->cfd_->table_cache();
    if (table_cache != nullptr) {
      result = table_cache->ApproximateOffsetOf(
          key, f.file_metadata->fd, caller, icmp,
          v->GetMutableCFOptions().prefix_extractor.get());
    }
  }
  return result;
}

uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
                                     const Slice& start, const Slice& end,
                                     TableReaderCaller caller) {
  // pre-condition
  assert(v);
  const auto& icmp = v->cfd_->internal_comparator();
  assert(icmp.Compare(start, end) <= 0);

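  // The case analysis below, pictured (illustrative):
  //
  //                 |-------- file --------|
  //   [start..end]                              disjoint        -> 0
  //       [start..........end]                  covers the head -> offset(end)
  //                      [start..........end]   covers the tail -> size - offset(start)
  //                    [start..end]             strictly inside -> table estimate
  //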
  if (icmp.Compare(f.largest_key, start) <= 0 ||
      icmp.Compare(f.smallest_key, end) > 0) {
    // Entire file is before or after the start/end keys range
    return 0;
  }

  if (icmp.Compare(f.smallest_key, start) >= 0) {
    // Start of the range is before the file start - approximate by end offset
    return ApproximateOffsetOf(v, f, end, caller);
  }

  if (icmp.Compare(f.largest_key, end) < 0) {
    // End of the range is after the file end - approximate by subtracting
    // start offset from the file size
    uint64_t start_offset = ApproximateOffsetOf(v, f, start, caller);
    assert(f.fd.GetFileSize() >= start_offset);
    return f.fd.GetFileSize() - start_offset;
  }

  // The interval falls entirely in the range for this file.
  TableCache* table_cache = v->cfd_->table_cache();
  if (table_cache == nullptr) {
    return 0;
  }
  return table_cache->ApproximateSize(
      start, end, f.file_metadata->fd, caller, icmp,
      v->GetMutableCFOptions().prefix_extractor.get());
}

void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
  // pre-calculate space requirement
  int64_t total_files = 0;
  for (auto cfd : *column_family_set_) {
    if (!cfd->initialized()) {
      continue;
    }
    Version* dummy_versions = cfd->dummy_versions();
    for (Version* v = dummy_versions->next_; v != dummy_versions;
         v = v->next_) {
      const auto* vstorage = v->storage_info();
      for (int level = 0; level < vstorage->num_levels(); level++) {
        total_files += vstorage->LevelFiles(level).size();
      }
    }
  }

  // just one time extension to the right size
  live_list->reserve(live_list->size() + static_cast<size_t>(total_files));

  for (auto cfd : *column_family_set_) {
    if (!cfd->initialized()) {
      continue;
    }
    auto* current = cfd->current();
    bool found_current = false;
    Version* dummy_versions = cfd->dummy_versions();
    for (Version* v = dummy_versions->next_; v != dummy_versions;
         v = v->next_) {
      v->AddLiveFiles(live_list);
      if (v == current) {
        found_current = true;
      }
    }
    if (!found_current && current != nullptr) {
      // Should never happen unless it is a bug.
      assert(false);
      current->AddLiveFiles(live_list);
    }
  }
}

InternalIterator* VersionSet::MakeInputIterator(
    const Compaction* c, RangeDelAggregator* range_del_agg,
    const FileOptions& file_options_compactions) {
  auto cfd = c->column_family_data();
  ReadOptions read_options;
  read_options.verify_checksums = true;
  read_options.fill_cache = false;
  // Compaction iterators shouldn't be confined to a single prefix.
  // Compactions use Seek() for
  // (a) concurrent compactions,
  // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
  read_options.total_order_seek = true;

  // Level-0 files have to be merged together.  For other levels,
  // we will make a concatenating iterator per level.
  // TODO(opt): use concatenating iterator for level-0 if there is no overlap
  const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
                                              c->num_input_levels() - 1
                                        : c->num_input_levels());
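  // For example (illustrative), an L0 compaction reading 4 L0 files plus one
  // overlapping output level needs space = 4 + 2 - 1 = 5 iterators: one per
  // L0 file and one concatenating iterator for the other level.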
  InternalIterator** list = new InternalIterator* [space];
  size_t num = 0;
  for (size_t which = 0; which < c->num_input_levels(); which++) {
    if (c->input_levels(which)->num_files != 0) {
      if (c->level(which) == 0) {
        const LevelFilesBrief* flevel = c->input_levels(which);
        for (size_t i = 0; i < flevel->num_files; i++) {
          list[num++] = cfd->table_cache()->NewIterator(
              read_options, file_options_compactions,
              cfd->internal_comparator(),
              *flevel->files[i].file_metadata, range_del_agg,
              c->mutable_cf_options()->prefix_extractor.get(),
              /*table_reader_ptr=*/nullptr,
              /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
              /*arena=*/nullptr,
              /*skip_filters=*/false, /*level=*/static_cast<int>(which),
              /*smallest_compaction_key=*/nullptr,
              /*largest_compaction_key=*/nullptr);
        }
      } else {
        // Create concatenating iterator for the files from this level
        list[num++] = new LevelIterator(
            cfd->table_cache(), read_options, file_options_compactions,
            cfd->internal_comparator(), c->input_levels(which),
            c->mutable_cf_options()->prefix_extractor.get(),
            /*should_sample=*/false,
            /*no per level latency histogram=*/nullptr,
            TableReaderCaller::kCompaction, /*skip_filters=*/false,
            /*level=*/static_cast<int>(which), range_del_agg,
            c->boundaries(which));
      }
    }
  }
  assert(num <= space);
  InternalIterator* result =
      NewMergingIterator(&c->column_family_data()->internal_comparator(), list,
                         static_cast<int>(num));
  delete[] list;
  return result;
}

// verify that the files listed in this compaction are present
// in the current version
bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
#ifndef NDEBUG
  Version* version = c->column_family_data()->current();
  const VersionStorageInfo* vstorage = version->storage_info();
  if (c->input_version() != version) {
    ROCKS_LOG_INFO(
        db_options_->info_log,
        "[%s] compaction output being applied to a different base version from"
        " input version",
        c->column_family_data()->GetName().c_str());

    if (vstorage->compaction_style_ == kCompactionStyleLevel &&
        c->start_level() == 0 && c->num_input_levels() > 2U) {
      // We are doing an L0->base_level compaction. The assumption is that if
      // the base level is not L1, then levels L1 to base_level - 1 are empty.
      // This is ensured by allowing only one L0 compaction at a time in
      // level-based compaction, so no other compaction/flush can put files
      // into those levels in the meantime.
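      // For example (illustrative), an L0 -> L4 compaction with
      // start_level == 0 and output_level == 4 requires L1, L2 and L3 to be
      // empty; a file showing up in any of them indicates an inconsistency.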
      for (int l = c->start_level() + 1; l < c->output_level(); l++) {
        if (vstorage->NumLevelFiles(l) != 0) {
          return false;
        }
      }
    }
  }

  for (size_t input = 0; input < c->num_input_levels(); ++input) {
    int level = c->level(input);
    for (size_t i = 0; i < c->num_input_files(input); ++i) {
      uint64_t number = c->input(input, i)->fd.GetNumber();
      bool found = false;
      for (size_t j = 0; j < vstorage->files_[level].size(); j++) {
        FileMetaData* f = vstorage->files_[level][j];
        if (f->fd.GetNumber() == number) {
          found = true;
          break;
        }
      }
      if (!found) {
        return false;  // input files not present in current version
      }
    }
  }
#else
  (void)c;
#endif
  return true;     // everything good
}

Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
                                      FileMetaData** meta,
                                      ColumnFamilyData** cfd) {
  for (auto cfd_iter : *column_family_set_) {
    if (!cfd_iter->initialized()) {
      continue;
    }
    Version* version = cfd_iter->current();
    const auto* vstorage = version->storage_info();
    for (int level = 0; level < vstorage->num_levels(); level++) {
      for (const auto& file : vstorage->LevelFiles(level)) {
        if (file->fd.GetNumber() == number) {
          *meta = file;
          *filelevel = level;
          *cfd = cfd_iter;
          return Status::OK();
        }
      }
    }
  }
  return Status::NotFound("File not present in any level");
}

void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
  for (auto cfd : *column_family_set_) {
    if (cfd->IsDropped() || !cfd->initialized()) {
      continue;
    }
    for (int level = 0; level < cfd->NumberLevels(); level++) {
      for (const auto& file :
           cfd->current()->storage_info()->LevelFiles(level)) {
        LiveFileMetaData filemetadata;
        filemetadata.column_family_name = cfd->GetName();
        uint32_t path_id = file->fd.GetPathId();
        if (path_id < cfd->ioptions()->cf_paths.size()) {
          filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
        } else {
          assert(!cfd->ioptions()->cf_paths.empty());
          filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
        }
        const uint64_t file_number = file->fd.GetNumber();
        filemetadata.name = MakeTableFileName("", file_number);
        filemetadata.file_number = file_number;
        filemetadata.level = level;
        filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
        filemetadata.smallestkey = file->smallest.user_key().ToString();
        filemetadata.largestkey = file->largest.user_key().ToString();
        filemetadata.smallest_seqno = file->fd.smallest_seqno;
        filemetadata.largest_seqno = file->fd.largest_seqno;
        filemetadata.num_reads_sampled = file->stats.num_reads_sampled.load(
            std::memory_order_relaxed);
        filemetadata.being_compacted = file->being_compacted;
        filemetadata.num_entries = file->num_entries;
        filemetadata.num_deletions = file->num_deletions;
        filemetadata.oldest_blob_file_number = file->oldest_blob_file_number;
        filemetadata.file_checksum = file->file_checksum;
        filemetadata.file_checksum_func_name = file->file_checksum_func_name;
        metadata->push_back(filemetadata);
      }
    }
  }
}

void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
                                  std::vector<std::string>* manifest_filenames,
                                  uint64_t min_pending_output) {
  assert(manifest_filenames->empty());
  obsolete_manifests_.swap(*manifest_filenames);
  std::vector<ObsoleteFileInfo> pending_files;
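  // Partition example (illustrative): with obsolete file numbers {8, 12, 15}
  // and min_pending_output == 13, files 8 and 12 are handed back for deletion
  // while 15 stays in obsolete_files_, since numbers at or above
  // min_pending_output may still belong to in-flight outputs.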
  for (auto& f : obsolete_files_) {
    if (f.metadata->fd.GetNumber() < min_pending_output) {
      files->push_back(std::move(f));
    } else {
      pending_files.push_back(std::move(f));
    }
  }
  obsolete_files_.swap(pending_files);
}

ColumnFamilyData* VersionSet::CreateColumnFamily(
    const ColumnFamilyOptions& cf_options, const VersionEdit* edit) {
  assert(edit->is_column_family_add_);

  MutableCFOptions dummy_cf_options;
  Version* dummy_versions =
      new Version(nullptr, this, file_options_, dummy_cf_options);
  // Ref() dummy version once so that later we can call Unref() to delete it
  // by avoiding calling "delete" explicitly (~Version is private)
  dummy_versions->Ref();
  auto new_cfd = column_family_set_->CreateColumnFamily(
      edit->column_family_name_, edit->column_family_, dummy_versions,
      cf_options);

  Version* v = new Version(new_cfd, this, file_options_,
                           *new_cfd->GetLatestMutableCFOptions(),
                           current_version_number_++);

  // Fill level target base information.
  v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
                                        *new_cfd->GetLatestMutableCFOptions());
  AppendVersion(new_cfd, v);
  // GetLatestMutableCFOptions() is safe here without mutex since the
  // cfd is not available to client
  new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
                             LastSequence());
  new_cfd->SetLogNumber(edit->log_number_);
  return new_cfd;
}

uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) {
  uint64_t count = 0;
  for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
    count++;
  }
  return count;
}

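// The total is computed over the union of files referenced by all live
// versions in the list; e.g. (illustrative) if two versions both reference
// file #7, its size is counted only once, thanks to the
// packed_number_and_path_id de-duplication below.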
uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
  std::unordered_set<uint64_t> unique_files;
  uint64_t total_files_size = 0;
  for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
    VersionStorageInfo* storage_info = v->storage_info();
    for (int level = 0; level < storage_info->num_levels_; level++) {
      for (const auto& file_meta : storage_info->LevelFiles(level)) {
        if (unique_files.find(file_meta->fd.packed_number_and_path_id) ==
            unique_files.end()) {
          unique_files.insert(file_meta->fd.packed_number_and_path_id);
          total_files_size += file_meta->fd.GetFileSize();
        }
      }
    }
  }
  return total_files_size;
}

Status VersionSet::VerifyFileMetadata(const std::string& fpath,
                                      const FileMetaData& meta) const {
  uint64_t fsize = 0;
  Status status = fs_->GetFileSize(fpath, IOOptions(), &fsize, nullptr);
  if (status.ok()) {
    if (fsize != meta.fd.GetFileSize()) {
      status = Status::Corruption("File size mismatch: " + fpath);
    }
  }
  return status;
}

ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
                                       const ImmutableDBOptions* _db_options,
                                       const FileOptions& _file_options,
                                       Cache* table_cache,
                                       WriteBufferManager* write_buffer_manager,
                                       WriteController* write_controller)
    : VersionSet(dbname, _db_options, _file_options, table_cache,
                 write_buffer_manager, write_controller,
                 /*block_cache_tracer=*/nullptr),
      number_of_edits_to_skip_(0) {}

ReactiveVersionSet::~ReactiveVersionSet() {}

Status ReactiveVersionSet::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
    std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
    std::unique_ptr<Status>* manifest_reader_status) {
  assert(manifest_reader != nullptr);
  assert(manifest_reporter != nullptr);
  assert(manifest_reader_status != nullptr);

  std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
  for (const auto& cf : column_families) {
    cf_name_to_options.insert({cf.name, cf.options});
  }

  // add default column family
  auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
  if (default_cf_iter == cf_name_to_options.end()) {
    return Status::InvalidArgument("Default column family not specified");
  }
  VersionEdit default_cf_edit;
  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
  default_cf_edit.SetColumnFamily(0);
  ColumnFamilyData* default_cfd =
      CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
  // In recovery, nobody else can access it, so it's fine to set it to be
  // initialized earlier.
  default_cfd->set_initialized();
  std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
      builders;
  std::unordered_map<int, std::string> column_families_not_found;
  builders.insert(
      std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
                            new BaseReferencedVersionBuilder(default_cfd))));

  manifest_reader_status->reset(new Status());
  manifest_reporter->reset(new LogReporter());
  static_cast<LogReporter*>(manifest_reporter->get())->status =
      manifest_reader_status->get();
  Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
  log::Reader* reader = manifest_reader->get();

  int retry = 0;
  VersionEdit version_edit;
  while (s.ok() && retry < 1) {
    assert(reader != nullptr);
    Slice record;
    std::string scratch;
    s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
                       column_families_not_found, builders, &version_edit);
    if (s.ok()) {
      bool enough = version_edit.has_next_file_number_ &&
                    version_edit.has_log_number_ &&
                    version_edit.has_last_sequence_;
      if (enough) {
        for (const auto& cf : column_families) {
          auto cfd = column_family_set_->GetColumnFamily(cf.name);
          if (cfd == nullptr) {
            enough = false;
            break;
          }
        }
      }
      if (enough) {
        for (const auto& cf : column_families) {
          auto cfd = column_family_set_->GetColumnFamily(cf.name);
          assert(cfd != nullptr);
          if (!cfd->IsDropped()) {
            auto builder_iter = builders.find(cfd->GetID());
            assert(builder_iter != builders.end());
            auto builder = builder_iter->second->version_builder();
            assert(builder != nullptr);
            s = builder->LoadTableHandlers(
                cfd->internal_stats(), db_options_->max_file_opening_threads,
                false /* prefetch_index_and_filter_in_cache */,
                true /* is_initial_load */,
                cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
            if (!s.ok()) {
              enough = false;
              if (s.IsPathNotFound()) {
                s = Status::OK();
              }
              break;
            }
          }
        }
      }
      if (enough) {
        break;
      }
    }
    ++retry;
  }

  if (s.ok()) {
    if (!version_edit.has_prev_log_number_) {
      version_edit.prev_log_number_ = 0;
    }
    column_family_set_->UpdateMaxColumnFamily(version_edit.max_column_family_);

    MarkMinLogNumberToKeep2PC(version_edit.min_log_number_to_keep_);
    MarkFileNumberUsed(version_edit.prev_log_number_);
    MarkFileNumberUsed(version_edit.log_number_);

    for (auto cfd : *column_family_set_) {
      assert(builders.count(cfd->GetID()) > 0);
      auto builder = builders[cfd->GetID()]->version_builder();
      if (!builder->CheckConsistencyForNumLevels()) {
        s = Status::InvalidArgument(
            "db has more levels than options.num_levels");
        break;
      }
    }
  }

  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      assert(cfd->initialized());
      auto builders_iter = builders.find(cfd->GetID());
      assert(builders_iter != builders.end());
      auto* builder = builders_iter->second->version_builder();

      Version* v = new Version(cfd, this, file_options_,
                               *cfd->GetLatestMutableCFOptions(),
                               current_version_number_++);
      builder->SaveTo(v->storage_info());

      // Install recovered version
      v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
                      !(db_options_->skip_stats_update_on_db_open));
      AppendVersion(cfd, v);
    }
    next_file_number_.store(version_edit.next_file_number_ + 1);
    last_allocated_sequence_ = version_edit.last_sequence_;
    last_published_sequence_ = version_edit.last_sequence_;
    last_sequence_ = version_edit.last_sequence_;
    prev_log_number_ = version_edit.prev_log_number_;
    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      ROCKS_LOG_INFO(db_options_->info_log,
                     "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
                     cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
    }
  }
  return s;
}

Status ReactiveVersionSet::ReadAndApply(
    InstrumentedMutex* mu,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
    std::unordered_set<ColumnFamilyData*>* cfds_changed) {
  assert(manifest_reader != nullptr);
  assert(cfds_changed != nullptr);
  mu->AssertHeld();

  Status s;
  uint64_t applied_edits = 0;
  while (s.ok()) {
    Slice record;
    std::string scratch;
    log::Reader* reader = manifest_reader->get();
    std::string old_manifest_path = reader->file()->file_name();
    while (reader->ReadRecord(&record, &scratch)) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (!s.ok()) {
        break;
      }

      // Skip the first VersionEdits of each MANIFEST generated by
      // VersionSet::WriteCurrentStateToManifest.
      if (number_of_edits_to_skip_ > 0) {
        ColumnFamilyData* cfd =
            column_family_set_->GetColumnFamily(edit.column_family_);
        if (cfd != nullptr && !cfd->IsDropped()) {
          --number_of_edits_to_skip_;
        }
        continue;
      }

      s = read_buffer_.AddEdit(&edit);
      if (!s.ok()) {
        break;
      }
      VersionEdit temp_edit;
      if (edit.is_in_atomic_group_) {
        if (read_buffer_.IsFull()) {
          // Apply edits in an atomic group when we have read all edits in the
          // group.
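          // E.g. (illustrative) an atomic group of three edits is buffered
          // until the third one arrives; only then is the whole group
          // applied, so a partially-written group is never made visible.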
          for (auto& e : read_buffer_.replay_buffer()) {
            s = ApplyOneVersionEditToBuilder(e, cfds_changed, &temp_edit);
            if (!s.ok()) {
              break;
            }
            applied_edits++;
          }
          if (!s.ok()) {
            break;
          }
          read_buffer_.Clear();
        }
      } else {
        // Apply a normal edit immediately.
        s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
        if (s.ok()) {
          applied_edits++;
        }
      }
    }
    if (!s.ok()) {
      // Clear the buffer if we fail to decode/apply an edit.
      read_buffer_.Clear();
    }
    // It's possible that:
    // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
    // 2) we have finished reading the current MANIFEST.
    // 3) we have encountered an IOError reading the current MANIFEST.
    // We need to look for the next MANIFEST and start from there. If we cannot
    // find the next MANIFEST, we should exit the loop.
    s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
    reader = manifest_reader->get();
    if (s.ok()) {
      if (reader->file()->file_name() == old_manifest_path) {
        // Still processing the same MANIFEST, thus no need to continue this
        // loop since no record is available if we have reached here.
        break;
      } else {
        // We have switched to a new MANIFEST whose first records have been
        // generated by VersionSet::WriteCurrentStateToManifest. Since the
        // secondary instance has already finished recovering upon start, there
        // is no need for the secondary to process these records. Actually, if
        // the secondary were to replay these records, the secondary may end up
        // adding the same SST files AGAIN to each column family, causing
        // consistency checks done by VersionBuilder to fail. Therefore, we
        // record the number of records to skip at the beginning of the new
        // MANIFEST and ignore them.
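        // For example (illustrative), with three live column families and
        // write_dbid_to_manifest == false, the loop below sets
        // number_of_edits_to_skip_ to 3 * 2 == 6, so the first six edits of
        // the new MANIFEST are ignored before normal tailing resumes.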
        number_of_edits_to_skip_ = 0;
        for (auto* cfd : *column_family_set_) {
          if (cfd->IsDropped()) {
            continue;
          }
          // Increase number_of_edits_to_skip by 2 because
          // WriteCurrentStateToManifest() writes 2 version edits for each
          // column family at the beginning of the newly-generated MANIFEST.
          // TODO(yanqin) remove hard-coded value.
          if (db_options_->write_dbid_to_manifest) {
            number_of_edits_to_skip_ += 3;
          } else {
            number_of_edits_to_skip_ += 2;
          }
        }
      }
    }
  }

  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      auto builder_iter = active_version_builders_.find(cfd->GetID());
      if (builder_iter == active_version_builders_.end()) {
        continue;
      }
      auto builder = builder_iter->second->version_builder();
      if (!builder->CheckConsistencyForNumLevels()) {
        s = Status::InvalidArgument(
            "db has more levels than options.num_levels");
        break;
      }
    }
  }
  TEST_SYNC_POINT_CALLBACK("ReactiveVersionSet::ReadAndApply:AppliedEdits",
                           &applied_edits);
  return s;
}

Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
    VersionEdit& edit, std::unordered_set<ColumnFamilyData*>* cfds_changed,
    VersionEdit* version_edit) {
  ColumnFamilyData* cfd =
      column_family_set_->GetColumnFamily(edit.column_family_);

  // If we cannot find this column family in our column family set, then it
  // may be a new column family created by the primary after the secondary
  // starts. It is also possible that the secondary instance opens only a subset
  // of column families. Ignore it for now.
  if (nullptr == cfd) {
    return Status::OK();
  }
  if (active_version_builders_.find(edit.column_family_) ==
          active_version_builders_.end() &&
      !cfd->IsDropped()) {
    std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
        new BaseReferencedVersionBuilder(cfd));
    active_version_builders_.insert(
        std::make_pair(edit.column_family_, std::move(builder_guard)));
  }

  auto builder_iter = active_version_builders_.find(edit.column_family_);
  assert(builder_iter != active_version_builders_.end());
  auto builder = builder_iter->second->version_builder();
  assert(builder != nullptr);

  if (edit.is_column_family_add_) {
    // TODO (yanqin) for now the secondary ignores column families created
    // after Open. This also simplifies handling of switching to a new MANIFEST
    // and processing the snapshot of the system at the beginning of the
    // MANIFEST.
  } else if (edit.is_column_family_drop_) {
    // Drop the column family by setting it to be 'dropped' without destroying
    // the column family handle.
    // TODO (haoyu) figure out how to handle column family drop for
    // secondary instance. (Is it possible that the ref count for cfd is 0 but
    // the ref count for its versions is higher than 0?)
    cfd->SetDropped();
    if (cfd->UnrefAndTryDelete()) {
      cfd = nullptr;
    }
    active_version_builders_.erase(builder_iter);
  } else {
    Status s = builder->Apply(&edit);
    if (!s.ok()) {
      return s;
    }
  }
  Status s = ExtractInfoFromVersionEdit(cfd, edit, version_edit);
  if (!s.ok()) {
    return s;
  }

  if (cfd != nullptr && !cfd->IsDropped()) {
    s = builder->LoadTableHandlers(
        cfd->internal_stats(), db_options_->max_file_opening_threads,
        false /* prefetch_index_and_filter_in_cache */,
        false /* is_initial_load */,
        cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
    TEST_SYNC_POINT_CALLBACK(
        "ReactiveVersionSet::ApplyOneVersionEditToBuilder:"
        "AfterLoadTableHandlers",
        &s);

    if (s.ok()) {
      auto version = new Version(cfd, this, file_options_,
                                 *cfd->GetLatestMutableCFOptions(),
                                 current_version_number_++);
      builder->SaveTo(version->storage_info());
      version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
      AppendVersion(cfd, version);
      active_version_builders_.erase(builder_iter);
      if (cfds_changed->count(cfd) == 0) {
        cfds_changed->insert(cfd);
      }
    } else if (s.IsPathNotFound()) {
      s = Status::OK();
    }
    // Some other error has occurred during LoadTableHandlers.
  }

  if (version_edit->HasNextFile()) {
    next_file_number_.store(version_edit->next_file_number_ + 1);
  }
  if (version_edit->has_last_sequence_) {
    last_allocated_sequence_ = version_edit->last_sequence_;
    last_published_sequence_ = version_edit->last_sequence_;
    last_sequence_ = version_edit->last_sequence_;
  }
  if (version_edit->has_prev_log_number_) {
    prev_log_number_ = version_edit->prev_log_number_;
    MarkFileNumberUsed(version_edit->prev_log_number_);
  }
  if (version_edit->has_log_number_) {
    MarkFileNumberUsed(version_edit->log_number_);
  }
  column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
  MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
  return s;
}

Status ReactiveVersionSet::MaybeSwitchManifest(
    log::Reader::Reporter* reporter,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader) {
  assert(manifest_reader != nullptr);
  Status s;
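  // Loop until we either open the MANIFEST named by CURRENT or hit an error
  // other than PathNotFound: the primary may swap CURRENT and delete the old
  // MANIFEST underneath us, in which case we simply re-read CURRENT.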
  do {
    std::string manifest_path;
    s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
                               &manifest_file_number_);
    std::unique_ptr<FSSequentialFile> manifest_file;
    if (s.ok()) {
      if (nullptr == manifest_reader->get() ||
          manifest_reader->get()->file()->file_name() != manifest_path) {
        TEST_SYNC_POINT(
            "ReactiveVersionSet::MaybeSwitchManifest:"
            "AfterGetCurrentManifestPath:0");
        TEST_SYNC_POINT(
            "ReactiveVersionSet::MaybeSwitchManifest:"
            "AfterGetCurrentManifestPath:1");
        s = fs_->NewSequentialFile(manifest_path,
                                   env_->OptimizeForManifestRead(file_options_),
                                   &manifest_file, nullptr);
      } else {
        // No need to switch manifest.
        break;
      }
    }
    std::unique_ptr<SequentialFileReader> manifest_file_reader;
    if (s.ok()) {
      manifest_file_reader.reset(
          new SequentialFileReader(std::move(manifest_file), manifest_path,
                                   db_options_->log_readahead_size));
      manifest_reader->reset(new log::FragmentBufferedReader(
          nullptr, std::move(manifest_file_reader), reporter,
          true /* checksum */, 0 /* log_number */));
      ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n",
                     manifest_path.c_str());
      // TODO (yanqin) every time we switch to a new MANIFEST, we clear the
      // active_version_builders_ map because we choose to construct the
      // versions from scratch, thanks to the first part of each MANIFEST
      // written by VersionSet::WriteCurrentStateToManifest. This is not
      // necessary, but we choose this at present for the sake of simplicity.
      active_version_builders_.clear();
    }
  } while (s.IsPathNotFound());
  return s;
}

}  // namespace ROCKSDB_NAMESPACE