// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/version_set.h"

#include <stdio.h>
#include <algorithm>
#include <array>
#include <cinttypes>
#include <list>
#include <map>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include "compaction/compaction.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
#include "db/version_edit_handler.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "monitoring/file_read_sample.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/format.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/merging_iterator.h"
#include "table/meta_blocks.h"
#include "table/multiget_context.h"
#include "table/plain/plain_table_factory.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/user_comparator_wrapper.h"

namespace ROCKSDB_NAMESPACE {

namespace {

// Find the first file in the LevelFilesBrief data structure whose largest
// key is >= `key`, within the index range defined by `left` and `right`.
int FindFileInRange(const InternalKeyComparator& icmp,
                    const LevelFilesBrief& file_level,
                    const Slice& key,
                    uint32_t left,
                    uint32_t right) {
  auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
    return icmp.InternalKeyComparator::Compare(f.largest_key, k) < 0;
  };
  const auto& b = file_level.files;
  return static_cast<int>(std::lower_bound(b + left, b + right, key, cmp) - b);
}
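// Illustrative example of the std::lower_bound semantics above: if a level
// holds three files whose largest internal keys sort as "b", "d" and "f",
// then FindFileInRange(icmp, level, "c", 0, 3) returns 1 -- file 1 is the
// first file whose largest key is >= "c", and therefore the first file that
// could contain "c". If the key sorts after every file's largest key, the
// function returns `right`.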

Status OverlapWithIterator(const Comparator* ucmp,
                           const Slice& smallest_user_key,
                           const Slice& largest_user_key,
                           InternalIterator* iter,
                           bool* overlap) {
  InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
                          kValueTypeForSeek);
  iter->Seek(range_start.Encode());
  if (!iter->status().ok()) {
    return iter->status();
  }

  *overlap = false;
  if (iter->Valid()) {
    ParsedInternalKey seek_result;
    if (!ParseInternalKey(iter->key(), &seek_result)) {
      return Status::Corruption("DB have corrupted keys");
    }

    if (ucmp->CompareWithoutTimestamp(seek_result.user_key, largest_user_key) <=
        0) {
      *overlap = true;
    }
  }

  return iter->status();
}
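// In other words: seek the iterator to the first entry at or after
// smallest_user_key; if such an entry exists and its user key is <=
// largest_user_key, some entry falls inside [smallest_user_key,
// largest_user_key] and the ranges overlap. As a rough sketch, an iterator
// over keys {"b", "e"} overlaps the range ["a", "c"] (the seek lands on "b")
// but not the range ["f", "g"] (the seek runs past the last entry).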

// Class to help choose the next file to search for the particular key.
// Searches and returns files level by level.
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant (unless we
// are MergeInProgress).
class FilePicker {
 public:
  FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
             const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
             unsigned int num_levels, FileIndexer* file_indexer,
             const Comparator* user_comparator,
             const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        search_left_bound_(0),
        search_right_bound_(FileIndexer::kLevelMaxIndex),
#ifndef NDEBUG
        files_(files),
#endif
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        user_key_(user_key),
        ikey_(ikey),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
#ifdef NDEBUG
    (void)files;
#endif
    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // Prefetch Level 0 table data to avoid cache miss if possible.
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          r->Prepare(ikey);
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {  // Loops over different levels.
      while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
        // Loops over all files in current level.
        FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
        int cmp_largest = -1;

        // Do key range filtering of files and/or fractional cascading if:
        // (1) not all the files are in level 0, or
        // (2) there are more than 3 current level files
        // If there are only 3 or fewer current level files in the system, we
        // skip the key range filtering. In this case, more likely, the
        // system is highly tuned to minimize the number of tables queried by
        // each query, so it is unlikely that key range filtering is more
        // efficient than querying the files.
        if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
          // Check if key is within a file's range. If search left bound and
          // right bound point to the same file, we are sure key falls in
          // range.
          assert(curr_level_ == 0 ||
                 curr_index_in_curr_level_ == start_index_in_curr_level_ ||
                 user_comparator_->CompareWithoutTimestamp(
                     user_key_, ExtractUserKey(f->smallest_key)) <= 0);

          int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
              user_key_, ExtractUserKey(f->smallest_key));
          if (cmp_smallest >= 0) {
            cmp_largest = user_comparator_->CompareWithoutTimestamp(
                user_key_, ExtractUserKey(f->largest_key));
          }

          // Setup file search bound for the next level based on the
          // comparison results
          if (curr_level_ > 0) {
            file_indexer_->GetNextLevelIndex(curr_level_,
                                             curr_index_in_curr_level_,
                                             cmp_smallest, cmp_largest,
                                             &search_left_bound_,
                                             &search_right_bound_);
          }
          // Key falls out of current file's range
          if (cmp_smallest < 0 || cmp_largest > 0) {
            if (curr_level_ == 0) {
              ++curr_index_in_curr_level_;
              continue;
            } else {
              // Search next level.
              break;
            }
          }
        }
#ifndef NDEBUG
        // Sanity check to make sure that the files are correctly sorted
        if (prev_file_) {
          if (curr_level_ != 0) {
            int comp_sign = internal_comparator_->Compare(
                prev_file_->largest_key, f->smallest_key);
            assert(comp_sign < 0);
          } else {
            // level == 0, the current file cannot be newer than the previous
            // one. The compressed (LevelFilesBrief) data structure has no
            // seqNo attribute, so check the corresponding FileMetaData.
            assert(curr_index_in_curr_level_ > 0);
            assert(!NewestFirstBySeqNo(
                files_[0][curr_index_in_curr_level_],
                files_[0][curr_index_in_curr_level_ - 1]));
          }
        }
        prev_file_ = f;
#endif
        returned_file_level_ = curr_level_;
        if (curr_level_ > 0 && cmp_largest < 0) {
          // No more files to search in this level.
          search_ended_ = !PrepareNextLevel();
        } else {
          ++curr_index_in_curr_level_;
        }
        return f;
      }
      // Start searching next level.
      search_ended_ = !PrepareNextLevel();
    }
    // Search ended.
    return nullptr;
  }

  // getter for current file level
  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;
  int32_t search_left_bound_;
  int32_t search_right_bound_;
#ifndef NDEBUG
  std::vector<FileMetaData*>* files_;
#endif
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  unsigned int curr_index_in_curr_level_;
  unsigned int start_index_in_curr_level_;
  Slice user_key_;
  Slice ikey_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;
#ifndef NDEBUG
  FdWithKeyRange* prev_file_;
#endif

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    curr_level_++;
    while (curr_level_ < num_levels_) {
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
        // also empty.
        assert(search_left_bound_ == 0);
        assert(search_right_bound_ == -1 ||
               search_right_bound_ == FileIndexer::kLevelMaxIndex);
        // Since current level is empty, it will need to search all files in
        // the next level
        search_left_bound_ = 0;
        search_right_bound_ = FileIndexer::kLevelMaxIndex;
        curr_level_++;
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index;
      if (curr_level_ == 0) {
        // On Level-0, we read through all files to check for overlap.
        start_index = 0;
      } else {
        // On Level-n (n>=1), files are sorted. Binary search to find the
        // earliest file whose largest key >= ikey. Search left bound and
        // right bound are used to narrow the range.
        if (search_left_bound_ <= search_right_bound_) {
          if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
            search_right_bound_ =
                static_cast<int32_t>(curr_file_level_->num_files) - 1;
          }
          // `search_right_bound_` is an inclusive upper-bound, but since it
          // was determined based on user key, it is still possible the lookup
          // key falls to the right of `search_right_bound_`'s corresponding
          // file. So, pass a limit one higher, which allows us to detect this
          // case.
          start_index =
              FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
                              static_cast<uint32_t>(search_left_bound_),
                              static_cast<uint32_t>(search_right_bound_) + 1);
          if (start_index == search_right_bound_ + 1) {
            // `ikey_` comes after `search_right_bound_`. The lookup key does
            // not exist on this level, so let's skip this level and do a full
            // binary search on the next level.
            search_left_bound_ = 0;
            search_right_bound_ = FileIndexer::kLevelMaxIndex;
            curr_level_++;
            continue;
          }
        } else {
          // search_left_bound > search_right_bound, key does not exist in
          // this level. Since no comparison is done in this level, it will
          // need to search all files in the next level.
          search_left_bound_ = 0;
          search_right_bound_ = FileIndexer::kLevelMaxIndex;
          curr_level_++;
          continue;
        }
      }
      start_index_in_curr_level_ = start_index;
      curr_index_in_curr_level_ = start_index;
#ifndef NDEBUG
      prev_file_ = nullptr;
#endif
      return true;
    }
    // curr_level_ == num_levels_. So, no more levels to search.
    return false;
  }
};
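// A minimal usage sketch (simplified; the real caller is Version::Get, which
// also handles merge operands, statistics and read callbacks): construct a
// FilePicker for the lookup key, then pull candidate files from newest to
// oldest until the key is resolved:
//
//   FilePicker fp(files, user_key, ikey, &file_levels, num_levels,
//                 &file_indexer, ucmp, icmp);
//   FdWithKeyRange* f = fp.GetNextFile();
//   while (f != nullptr) {
//     // ... probe the table file; stop once a definitive result is found ...
//     f = fp.GetNextFile();
//   }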

class FilePickerMultiGet {
 private:
  struct FilePickerContext;

 public:
  FilePickerMultiGet(MultiGetRange* range,
                     autovector<LevelFilesBrief>* file_levels,
                     unsigned int num_levels, FileIndexer* file_indexer,
                     const Comparator* user_comparator,
                     const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        range_(range),
        batch_iter_(range->begin()),
        batch_iter_prev_(range->begin()),
        maybe_repeat_key_(false),
        current_level_range_(*range, range->begin(), range->end()),
        current_file_range_(*range, range->begin(), range->end()),
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
    for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
      fp_ctx_array_[iter.index()] =
          FilePickerContext(0, FileIndexer::kLevelMaxIndex);
    }

    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // REVISIT
      // Prefetch Level 0 table data to avoid cache miss if possible.
      // As of now, only PlainTableReader and CuckooTableReader do any
      // prefetching. This may not be necessary anymore once we implement
      // batching in those table readers
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
            r->Prepare(iter->ikey);
          }
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  // Iterates through files in the current level until it finds a file that
  // contains at least one key from the MultiGet batch
  bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
                                  size_t* file_index, FdWithKeyRange** fd,
                                  bool* is_last_key_in_file) {
    size_t curr_file_index = *file_index;
    FdWithKeyRange* f = nullptr;
    bool file_hit = false;
    int cmp_largest = -1;
    if (curr_file_index >= curr_file_level_->num_files) {
      // In the unlikely case the next key is a duplicate of the current key,
      // and the current key is the last in the level and the internal key
      // was not found, we need to skip lookup for the remaining keys and
      // reset the search bounds
      if (batch_iter_ != current_level_range_.end()) {
        ++batch_iter_;
        for (; batch_iter_ != current_level_range_.end(); ++batch_iter_) {
          struct FilePickerContext& fp_ctx =
              fp_ctx_array_[batch_iter_.index()];
          fp_ctx.search_left_bound = 0;
          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
        }
      }
      return false;
    }
    // Loops over keys in the MultiGet batch until it finds a file with
    // at least one of the keys. Then it keeps moving forward until the
    // last key in the batch that falls in that file
    while (batch_iter_ != current_level_range_.end() &&
           (fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
                curr_file_index ||
            !file_hit)) {
      struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
      f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
      Slice& user_key = batch_iter_->ukey;

      // Do key range filtering of files and/or fractional cascading if:
      // (1) not all the files are in level 0, or
      // (2) there are more than 3 current level files
      // If there are only 3 or fewer current level files in the system, we
      // skip the key range filtering. In this case, more likely, the system
      // is highly tuned to minimize the number of tables queried by each
      // query, so it is unlikely that key range filtering is more efficient
      // than querying the files.
      if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
        // Check if key is within a file's range. If search left bound and
        // right bound point to the same file, we are sure key falls in
        // range.
        assert(curr_level_ == 0 ||
               fp_ctx.curr_index_in_curr_level ==
                   fp_ctx.start_index_in_curr_level ||
               user_comparator_->Compare(user_key,
                                         ExtractUserKey(f->smallest_key)) <= 0);

        int cmp_smallest = user_comparator_->Compare(
            user_key, ExtractUserKey(f->smallest_key));
        if (cmp_smallest >= 0) {
          cmp_largest = user_comparator_->Compare(
              user_key, ExtractUserKey(f->largest_key));
        } else {
          cmp_largest = -1;
        }

        // Setup file search bound for the next level based on the
        // comparison results
        if (curr_level_ > 0) {
          file_indexer_->GetNextLevelIndex(
              curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
              cmp_largest, &fp_ctx.search_left_bound,
              &fp_ctx.search_right_bound);
        }
        // Key falls out of current file's range
        if (cmp_smallest < 0 || cmp_largest > 0) {
          next_file_range->SkipKey(batch_iter_);
        } else {
          file_hit = true;
        }
      } else {
        file_hit = true;
      }
      if (cmp_largest == 0) {
        // cmp_largest is 0, which means the next key will not be in this
        // file, so stop looking further. Also don't increment batch_iter_
        // as we may have to look for this key in the next file if we don't
        // find it in this one
        break;
      } else {
        if (curr_level_ == 0) {
          // We need to look through all files in level 0
          ++fp_ctx.curr_index_in_curr_level;
        }
        ++batch_iter_;
      }
      if (!file_hit) {
        curr_file_index =
            (batch_iter_ != current_level_range_.end())
                ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
                : curr_file_level_->num_files;
      }
    }

    *fd = f;
    *file_index = curr_file_index;
    *is_last_key_in_file = cmp_largest == 0;
    return file_hit;
  }

  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {
      // Start searching next level.
      if (batch_iter_ == current_level_range_.end()) {
        search_ended_ = !PrepareNextLevel();
        continue;
      } else {
        if (maybe_repeat_key_) {
          maybe_repeat_key_ = false;
          // Check if we found the final value for the last key in the
          // previous lookup range. If we did, then there's no need to look
          // any further for that key, so advance batch_iter_. Else, keep
          // batch_iter_ positioned on that key so we look it up again in
          // the next file
          // For L0, always advance the key because we will look in the next
          // file regardless for all keys not found yet
          if (current_level_range_.CheckKeyDone(batch_iter_) ||
              curr_level_ == 0) {
            ++batch_iter_;
          }
        }
        // batch_iter_prev_ will become the start key for the next file
        // lookup
        batch_iter_prev_ = batch_iter_;
      }

      MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
                                    current_level_range_.end());
      size_t curr_file_index =
          (batch_iter_ != current_level_range_.end())
              ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
              : curr_file_level_->num_files;
      FdWithKeyRange* f;
      bool is_last_key_in_file;
      if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
                                      &is_last_key_in_file)) {
        search_ended_ = !PrepareNextLevel();
      } else {
        MultiGetRange::Iterator upper_key = batch_iter_;
        if (is_last_key_in_file) {
          // Since cmp_largest is 0, batch_iter_ still points to the last key
          // that falls in this file, instead of the next one. Increment
          // upper_key so we can set the range properly for SST MultiGet
          ++upper_key;
          ++(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level);
          maybe_repeat_key_ = true;
        }
        // Set the range for this file
        current_file_range_ =
            MultiGetRange(next_file_range, batch_iter_prev_, upper_key);
        returned_file_level_ = curr_level_;
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_file_index == curr_file_level_->num_files - 1;
        return f;
      }
    }

    // Search ended
    return nullptr;
  }

  // getter for current file level
  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

  const MultiGetRange& CurrentFileRange() { return current_file_range_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;

  struct FilePickerContext {
    int32_t search_left_bound;
    int32_t search_right_bound;
    unsigned int curr_index_in_curr_level;
    unsigned int start_index_in_curr_level;

    FilePickerContext(int32_t left, int32_t right)
        : search_left_bound(left), search_right_bound(right),
          curr_index_in_curr_level(0), start_index_in_curr_level(0) {}

    FilePickerContext() = default;
  };
  std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
  MultiGetRange* range_;
  // Iterator to iterate through the keys in a MultiGet batch, that gets reset
  // at the beginning of each level. Each call to GetNextFile() will position
  // batch_iter_ at or right after the last key that was found in the returned
  // SST file
  MultiGetRange::Iterator batch_iter_;
  // An iterator that records the previous position of batch_iter_, i.e. the
  // last key found in the previous SST file, in order to serve as the start
  // of the batch key range for the next SST file
  MultiGetRange::Iterator batch_iter_prev_;
  bool maybe_repeat_key_;
  MultiGetRange current_level_range_;
  MultiGetRange current_file_range_;
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    if (curr_level_ == 0) {
      MultiGetRange::Iterator mget_iter = current_level_range_.begin();
      if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
          curr_file_level_->num_files) {
        batch_iter_prev_ = current_level_range_.begin();
        batch_iter_ = current_level_range_.begin();
        return true;
      }
    }

    curr_level_++;
    // Reset key range to saved value
    while (curr_level_ < num_levels_) {
      bool level_contains_keys = false;
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
        // also empty.

        for (auto mget_iter = current_level_range_.begin();
             mget_iter != current_level_range_.end(); ++mget_iter) {
          struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];

          assert(fp_ctx.search_left_bound == 0);
          assert(fp_ctx.search_right_bound == -1 ||
                 fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
          // Since current level is empty, it will need to search all files in
          // the next level
          fp_ctx.search_left_bound = 0;
          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
        }
        // Skip all subsequent empty levels
        do {
          ++curr_level_;
        } while ((curr_level_ < num_levels_) &&
                 (*level_files_brief_)[curr_level_].num_files == 0);
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index = -1;
      current_level_range_ =
          MultiGetRange(*range_, range_->begin(), range_->end());
      for (auto mget_iter = current_level_range_.begin();
           mget_iter != current_level_range_.end(); ++mget_iter) {
        struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
        if (curr_level_ == 0) {
          // On Level-0, we read through all files to check for overlap.
          start_index = 0;
          level_contains_keys = true;
        } else {
          // On Level-n (n>=1), files are sorted. Binary search to find the
          // earliest file whose largest key >= ikey. Search left bound and
          // right bound are used to narrow the range.
          if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
            if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
              fp_ctx.search_right_bound =
                  static_cast<int32_t>(curr_file_level_->num_files) - 1;
            }
            // `search_right_bound_` is an inclusive upper-bound, but since it
            // was determined based on user key, it is still possible the
            // lookup key falls to the right of `search_right_bound_`'s
            // corresponding file. So, pass a limit one higher, which allows
            // us to detect this case.
            Slice& ikey = mget_iter->ikey;
            start_index = FindFileInRange(
                *internal_comparator_, *curr_file_level_, ikey,
                static_cast<uint32_t>(fp_ctx.search_left_bound),
                static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
            if (start_index == fp_ctx.search_right_bound + 1) {
              // `ikey_` comes after `search_right_bound_`. The lookup key
              // does not exist on this level, so let's skip this level and
              // do a full binary search on the next level.
              fp_ctx.search_left_bound = 0;
              fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
              current_level_range_.SkipKey(mget_iter);
              continue;
            } else {
              level_contains_keys = true;
            }
          } else {
            // search_left_bound > search_right_bound, key does not exist in
            // this level. Since no comparison is done in this level, it will
            // need to search all files in the next level.
            fp_ctx.search_left_bound = 0;
            fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
            current_level_range_.SkipKey(mget_iter);
            continue;
          }
        }
        fp_ctx.start_index_in_curr_level = start_index;
        fp_ctx.curr_index_in_curr_level = start_index;
      }
      if (level_contains_keys) {
        batch_iter_prev_ = current_level_range_.begin();
        batch_iter_ = current_level_range_.begin();
        return true;
      }
      curr_level_++;
    }
    // curr_level_ == num_levels_. So, no more levels to search.
    return false;
  }
};
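// The MultiGet analogue of the FilePicker loop above, sketched under the same
// simplifications: the caller (Version::MultiGet) repeatedly asks for the
// next file that covers at least one not-yet-resolved key in the batch, and
// probes it with the key subrange reported by CurrentFileRange():
//
//   FilePickerMultiGet fp(&range, &file_levels, num_levels, &file_indexer,
//                         ucmp, icmp);
//   for (FdWithKeyRange* f = fp.GetNextFile(); f != nullptr;
//        f = fp.GetNextFile()) {
//     const MultiGetRange& file_range = fp.CurrentFileRange();
//     // ... look up the keys in file_range within this table file ...
//   }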
}  // anonymous namespace

VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }

Version::~Version() {
  assert(refs_ == 0);

  // Remove from linked list
  prev_->next_ = next_;
  next_->prev_ = prev_;

  // Drop references to files
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
      FileMetaData* f = storage_info_.files_[level][i];
      assert(f->refs > 0);
      f->refs--;
      if (f->refs <= 0) {
        assert(cfd_ != nullptr);
        uint32_t path_id = f->fd.GetPathId();
        assert(path_id < cfd_->ioptions()->cf_paths.size());
        vset_->obsolete_files_.push_back(
            ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
      }
    }
  }
}

int FindFile(const InternalKeyComparator& icmp,
             const LevelFilesBrief& file_level,
             const Slice& key) {
  return FindFileInRange(icmp, file_level, key, 0,
                         static_cast<uint32_t>(file_level.num_files));
}
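// FindFile is simply FindFileInRange over the whole level: it returns the
// index of the first file whose largest internal key is >= `key`, or
// file_level.num_files if `key` sorts after every file in the level.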

void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
                               const std::vector<FileMetaData*>& files,
                               Arena* arena) {
  assert(file_level);
  assert(arena);

  size_t num = files.size();
  file_level->num_files = num;
  char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
  file_level->files = new (mem) FdWithKeyRange[num];

  for (size_t i = 0; i < num; i++) {
    Slice smallest_key = files[i]->smallest.Encode();
    Slice largest_key = files[i]->largest.Encode();

    // Copy key slice to sequential memory
    size_t smallest_size = smallest_key.size();
    size_t largest_size = largest_key.size();
    mem = arena->AllocateAligned(smallest_size + largest_size);
    memcpy(mem, smallest_key.data(), smallest_size);
    memcpy(mem + smallest_size, largest_key.data(), largest_size);

    FdWithKeyRange& f = file_level->files[i];
    f.fd = files[i]->fd;
    f.file_metadata = files[i];
    f.smallest_key = Slice(mem, smallest_size);
    f.largest_key = Slice(mem + smallest_size, largest_size);
  }
}
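// Layout note: each file's smallest and largest keys are copied back to back
// into one arena allocation, so a per-file block looks roughly like
//
//   [ smallest_key bytes | largest_key bytes ]
//     ^ f.smallest_key     ^ f.largest_key
//
// Keeping the keys in arena-backed sequential memory keeps the per-level
// FdWithKeyRange array compact for scans and ties the keys' lifetime to the
// arena rather than to the individual FileMetaData objects.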

static bool AfterFile(const Comparator* ucmp,
                      const Slice* user_key, const FdWithKeyRange* f) {
  // nullptr user_key occurs before all keys and is therefore never after *f
  return (user_key != nullptr &&
          ucmp->CompareWithoutTimestamp(*user_key,
                                        ExtractUserKey(f->largest_key)) > 0);
}

static bool BeforeFile(const Comparator* ucmp,
                       const Slice* user_key, const FdWithKeyRange* f) {
  // nullptr user_key occurs after all keys and is therefore never before *f
  return (user_key != nullptr &&
          ucmp->CompareWithoutTimestamp(*user_key,
                                        ExtractUserKey(f->smallest_key)) < 0);
}

bool SomeFileOverlapsRange(
    const InternalKeyComparator& icmp,
    bool disjoint_sorted_files,
    const LevelFilesBrief& file_level,
    const Slice* smallest_user_key,
    const Slice* largest_user_key) {
  const Comparator* ucmp = icmp.user_comparator();
  if (!disjoint_sorted_files) {
    // Need to check against all files
    for (size_t i = 0; i < file_level.num_files; i++) {
      const FdWithKeyRange* f = &(file_level.files[i]);
      if (AfterFile(ucmp, smallest_user_key, f) ||
          BeforeFile(ucmp, largest_user_key, f)) {
        // No overlap
      } else {
        return true;  // Overlap
      }
    }
    return false;
  }

  // Binary search over file list
  uint32_t index = 0;
  if (smallest_user_key != nullptr) {
    // Find the leftmost possible internal key for smallest_user_key
    InternalKey small;
    small.SetMinPossibleForUserKey(*smallest_user_key);
    index = FindFile(icmp, file_level, small.Encode());
  }

  if (index >= file_level.num_files) {
    // beginning of range is after all files, so no overlap.
    return false;
  }

  return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
}
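// Overlap test in a nutshell: a query range [smallest, largest] misses a file
// [f.smallest, f.largest] exactly when the whole query range is after the
// file (smallest > f.largest) or before it (largest < f.smallest). For
// disjoint sorted levels it suffices to binary search for the first file that
// could contain `smallest` and check that file alone. For example, with files
// covering ["a","c"] and ["e","g"], the range ["c","d"] lands on the first
// file and overlaps it, while ["d","d"] lands on the second file and does
// not overlap anything.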

namespace {

class LevelIterator final : public InternalIterator {
 public:
  LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
                const FileOptions& file_options,
                const InternalKeyComparator& icomparator,
                const LevelFilesBrief* flevel,
                const SliceTransform* prefix_extractor, bool should_sample,
                HistogramImpl* file_read_hist, TableReaderCaller caller,
                bool skip_filters, int level, RangeDelAggregator* range_del_agg,
                const std::vector<AtomicCompactionUnitBoundary>*
                    compaction_boundaries = nullptr)
      : table_cache_(table_cache),
        read_options_(read_options),
        file_options_(file_options),
        icomparator_(icomparator),
        user_comparator_(icomparator.user_comparator()),
        flevel_(flevel),
        prefix_extractor_(prefix_extractor),
        file_read_hist_(file_read_hist),
        should_sample_(should_sample),
        caller_(caller),
        skip_filters_(skip_filters),
        file_index_(flevel_->num_files),
        level_(level),
        range_del_agg_(range_del_agg),
        pinned_iters_mgr_(nullptr),
        compaction_boundaries_(compaction_boundaries) {
    // Empty level is not supported.
    assert(flevel_ != nullptr && flevel_->num_files > 0);
  }

  ~LevelIterator() override { delete file_iter_.Set(nullptr); }

  void Seek(const Slice& target) override;
  void SeekForPrev(const Slice& target) override;
  void SeekToFirst() override;
  void SeekToLast() override;
  void Next() final override;
  bool NextAndGetResult(IterateResult* result) override;
  void Prev() override;

  bool Valid() const override { return file_iter_.Valid(); }
  Slice key() const override {
    assert(Valid());
    return file_iter_.key();
  }

  Slice value() const override {
    assert(Valid());
    return file_iter_.value();
  }

  Status status() const override {
    return file_iter_.iter() ? file_iter_.status() : Status::OK();
  }

  inline bool MayBeOutOfLowerBound() override {
    assert(Valid());
    return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound();
  }

  inline bool MayBeOutOfUpperBound() override {
    assert(Valid());
    return file_iter_.MayBeOutOfUpperBound();
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    if (file_iter_.iter()) {
      file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
    }
  }

  bool IsKeyPinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsKeyPinned();
  }

  bool IsValuePinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsValuePinned();
  }

 private:
  // Return true if at least one invalid file is seen and skipped.
  bool SkipEmptyFileForward();
  void SkipEmptyFileBackward();
  void SetFileIterator(InternalIterator* iter);
  void InitFileIterator(size_t new_file_index);

  // Called by both of Next() and NextAndGetResult(). Force inline.
  void NextImpl() {
    assert(Valid());
    file_iter_.Next();
    SkipEmptyFileForward();
  }

  const Slice& file_smallest_key(size_t file_index) {
    assert(file_index < flevel_->num_files);
    return flevel_->files[file_index].smallest_key;
  }

  bool KeyReachedUpperBound(const Slice& internal_key) {
    return read_options_.iterate_upper_bound != nullptr &&
           user_comparator_.CompareWithoutTimestamp(
               ExtractUserKey(internal_key), /*a_has_ts=*/true,
               *read_options_.iterate_upper_bound, /*b_has_ts=*/false) >= 0;
  }

  InternalIterator* NewFileIterator() {
    assert(file_index_ < flevel_->num_files);
    auto file_meta = flevel_->files[file_index_];
    if (should_sample_) {
      sample_file_read_inc(file_meta.file_metadata);
    }

    const InternalKey* smallest_compaction_key = nullptr;
    const InternalKey* largest_compaction_key = nullptr;
    if (compaction_boundaries_ != nullptr) {
      smallest_compaction_key = (*compaction_boundaries_)[file_index_].smallest;
      largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
    }
    CheckMayBeOutOfLowerBound();
    return table_cache_->NewIterator(
        read_options_, file_options_, icomparator_, *file_meta.file_metadata,
        range_del_agg_, prefix_extractor_,
        nullptr /* don't need reference to table */, file_read_hist_, caller_,
        /*arena=*/nullptr, skip_filters_, level_, smallest_compaction_key,
        largest_compaction_key);
  }

  // Check whether the current file falls fully within iterate_lower_bound.
  //
  // Note MyRocks may update iterate bounds between seeks. To work around
  // this, we need to check and update may_be_out_of_lower_bound_ accordingly.
  void CheckMayBeOutOfLowerBound() {
    if (read_options_.iterate_lower_bound != nullptr &&
        file_index_ < flevel_->num_files) {
      may_be_out_of_lower_bound_ =
          user_comparator_.Compare(
              ExtractUserKey(file_smallest_key(file_index_)),
              *read_options_.iterate_lower_bound) < 0;
    }
  }

  TableCache* table_cache_;
  const ReadOptions read_options_;
  const FileOptions& file_options_;
  const InternalKeyComparator& icomparator_;
  const UserComparatorWrapper user_comparator_;
  const LevelFilesBrief* flevel_;
  mutable FileDescriptor current_value_;
  // `prefix_extractor_` may be non-null even for total order seek. Checking
  // this variable is not the right way to identify whether prefix iterator
  // is used.
  const SliceTransform* prefix_extractor_;

  HistogramImpl* file_read_hist_;
  bool should_sample_;
  TableReaderCaller caller_;
  bool skip_filters_;
  bool may_be_out_of_lower_bound_ = true;
  size_t file_index_;
  int level_;
  RangeDelAggregator* range_del_agg_;
  IteratorWrapper file_iter_;  // May be nullptr
  PinnedIteratorsManager* pinned_iters_mgr_;

  // To be propagated to RangeDelAggregator in order to safely truncate range
  // tombstones.
  const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
};
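// Conceptually, LevelIterator is a lazy two-level iterator: it keeps a cursor
// over the sorted files of one level (file_index_) and a child iterator over
// the current file (file_iter_), opening each table iterator only when the
// cursor actually reaches that file. A simplified picture of a forward scan:
//
//   file 0: [a..c]   file 1: [e..g]   file 2: [k..m]
//            ^ file_iter_ walks a..c, then InitFileIterator(1) opens file 1,
//              and so on, skipping files that turn out to be empty for this
//              read and stopping early at iterate_upper_bound.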

void LevelIterator::Seek(const Slice& target) {
  // Check whether the seek key falls within the current file's range
  bool need_to_reseek = true;
  if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) {
    const FdWithKeyRange& cur_file = flevel_->files[file_index_];
    if (icomparator_.InternalKeyComparator::Compare(
            target, cur_file.largest_key) <= 0 &&
        icomparator_.InternalKeyComparator::Compare(
            target, cur_file.smallest_key) >= 0) {
      need_to_reseek = false;
      assert(static_cast<size_t>(FindFile(icomparator_, *flevel_, target)) ==
             file_index_);
    }
  }
  if (need_to_reseek) {
    TEST_SYNC_POINT("LevelIterator::Seek:BeforeFindFile");
    size_t new_file_index = FindFile(icomparator_, *flevel_, target);
    InitFileIterator(new_file_index);
  }

  if (file_iter_.iter() != nullptr) {
    file_iter_.Seek(target);
  }
  if (SkipEmptyFileForward() && prefix_extractor_ != nullptr &&
      !read_options_.total_order_seek && !read_options_.auto_prefix_mode &&
      file_iter_.iter() != nullptr && file_iter_.Valid()) {
    // We've skipped the file we initially positioned to. In the prefix
    // seek case, it is likely that the file is skipped because of
    // prefix bloom or hash, where more keys are skipped. We then check
    // the current key and invalidate the iterator if the prefix is
    // already passed.
    // When doing a prefix iterator seek, once the keys for one prefix have
    // been exhausted, the iterator may jump to any key that is larger. Here
    // we enforce a stricter contract than that, in order to make it easier
    // for higher layers (merging and DB iterator) to reason about
    // correctness:
    // 1. Within the prefix, the result should be accurate.
    // 2. If keys for the prefix are exhausted, the iterator is either
    //    positioned to the next key after the prefix, or made invalid.
    // A side benefit is that this invalidates the iterator earlier, so that
    // the upper level merging iterator can merge fewer child iterators.
    Slice target_user_key = ExtractUserKey(target);
    Slice file_user_key = ExtractUserKey(file_iter_.key());
    if (prefix_extractor_->InDomain(target_user_key) &&
        (!prefix_extractor_->InDomain(file_user_key) ||
         user_comparator_.Compare(
             prefix_extractor_->Transform(target_user_key),
             prefix_extractor_->Transform(file_user_key)) != 0)) {
      SetFileIterator(nullptr);
    }
  }
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekForPrev(const Slice& target) {
  size_t new_file_index = FindFile(icomparator_, *flevel_, target);
  if (new_file_index >= flevel_->num_files) {
    new_file_index = flevel_->num_files - 1;
  }

  InitFileIterator(new_file_index);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekForPrev(target);
    SkipEmptyFileBackward();
  }
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekToFirst() {
  InitFileIterator(0);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToFirst();
  }
  SkipEmptyFileForward();
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekToLast() {
  InitFileIterator(flevel_->num_files - 1);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToLast();
  }
  SkipEmptyFileBackward();
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::Next() { NextImpl(); }

bool LevelIterator::NextAndGetResult(IterateResult* result) {
  NextImpl();
  bool is_valid = Valid();
  if (is_valid) {
    result->key = key();
    result->may_be_out_of_upper_bound = MayBeOutOfUpperBound();
  }
  return is_valid;
}

void LevelIterator::Prev() {
  assert(Valid());
  file_iter_.Prev();
  SkipEmptyFileBackward();
}

bool LevelIterator::SkipEmptyFileForward() {
  bool seen_empty_file = false;
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok() &&
          !file_iter_.iter()->IsOutOfBound())) {
    seen_empty_file = true;
    // Move to next file
    if (file_index_ >= flevel_->num_files - 1) {
      // Already at the last file
      SetFileIterator(nullptr);
      break;
    }
    if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
      SetFileIterator(nullptr);
      break;
    }
    InitFileIterator(file_index_ + 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToFirst();
    }
  }
  return seen_empty_file;
}

void LevelIterator::SkipEmptyFileBackward() {
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok())) {
    // Move to previous file
    if (file_index_ == 0) {
      // Already the first file
      SetFileIterator(nullptr);
      return;
    }
    InitFileIterator(file_index_ - 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToLast();
    }
  }
}

void LevelIterator::SetFileIterator(InternalIterator* iter) {
  if (pinned_iters_mgr_ && iter) {
    iter->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  InternalIterator* old_iter = file_iter_.Set(iter);
  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    pinned_iters_mgr_->PinIterator(old_iter);
  } else {
    delete old_iter;
  }
}

void LevelIterator::InitFileIterator(size_t new_file_index) {
  if (new_file_index >= flevel_->num_files) {
    file_index_ = new_file_index;
    SetFileIterator(nullptr);
    return;
  } else {
    // If the file iterator's status shows incomplete, we try again when
    // users seek to the same file, as this time we may go to a different
    // data block which is cached in block cache.
    if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
        new_file_index == file_index_) {
      // file_iter_ is already constructed with this iterator, so
      // no need to change anything
    } else {
      file_index_ = new_file_index;
      InternalIterator* iter = NewFileIterator();
      SetFileIterator(iter);
    }
  }
}
}  // anonymous namespace

Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
                                   const FileMetaData* file_meta,
                                   const std::string* fname) const {
  auto table_cache = cfd_->table_cache();
  auto ioptions = cfd_->ioptions();
  Status s = table_cache->GetTableProperties(
      file_options_, cfd_->internal_comparator(), file_meta->fd, tp,
      mutable_cf_options_.prefix_extractor.get(), true /* no io */);
  if (s.ok()) {
    return s;
  }

  // We only ignore error type `Incomplete` since it's by design that we
  // disallow table reads when the table is not in the table cache.
  if (!s.IsIncomplete()) {
    return s;
  }

  // 2. Table is not present in table cache, we'll read the table properties
  // directly from the properties block in the file.
  std::unique_ptr<FSRandomAccessFile> file;
  std::string file_name;
  if (fname != nullptr) {
    file_name = *fname;
  } else {
    file_name =
        TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
                      file_meta->fd.GetPathId());
  }
  s = ioptions->fs->NewRandomAccessFile(file_name, file_options_, &file,
                                        nullptr);
  if (!s.ok()) {
    return s;
  }

  TableProperties* raw_table_properties;
  // By setting the magic number to kInvalidTableMagicNumber, we can bypass
  // the magic number check in the footer.
  std::unique_ptr<RandomAccessFileReader> file_reader(
      new RandomAccessFileReader(
          std::move(file), file_name, nullptr /* env */, nullptr /* stats */,
          0 /* hist_type */, nullptr /* file_read_hist */,
          nullptr /* rate_limiter */, ioptions->listeners));
  s = ReadTableProperties(
      file_reader.get(), file_meta->fd.GetFileSize(),
      Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
      &raw_table_properties, false /* compression_type_missing */);
  if (!s.ok()) {
    return s;
  }
  RecordTick(ioptions->statistics, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);

  *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
  return s;
}
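// The lookup order above is: (1) ask the table cache with no_io=true, which
// returns Status::Incomplete() rather than reading from disk when the table
// is not already open; (2) on Incomplete, open the file directly and read
// just the properties block. Only path (2) bumps the
// NUMBER_DIRECT_LOAD_TABLE_PROPERTIES tick.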
1261
GetPropertiesOfAllTables(TablePropertiesCollection * props)1262 Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
1263 Status s;
1264 for (int level = 0; level < storage_info_.num_levels_; level++) {
1265 s = GetPropertiesOfAllTables(props, level);
1266 if (!s.ok()) {
1267 return s;
1268 }
1269 }
1270
1271 return Status::OK();
1272 }
1273
TablesRangeTombstoneSummary(int max_entries_to_print,std::string * out_str)1274 Status Version::TablesRangeTombstoneSummary(int max_entries_to_print,
1275 std::string* out_str) {
1276 if (max_entries_to_print <= 0) {
1277 return Status::OK();
1278 }
1279 int num_entries_left = max_entries_to_print;
1280
1281 std::stringstream ss;
1282
1283 for (int level = 0; level < storage_info_.num_levels_; level++) {
1284 for (const auto& file_meta : storage_info_.files_[level]) {
1285 auto fname =
1286 TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
1287 file_meta->fd.GetPathId());
1288
1289 ss << "=== file : " << fname << " ===\n";
1290
1291 TableCache* table_cache = cfd_->table_cache();
1292 std::unique_ptr<FragmentedRangeTombstoneIterator> tombstone_iter;
1293
1294 Status s = table_cache->GetRangeTombstoneIterator(
1295 ReadOptions(), cfd_->internal_comparator(), *file_meta,
1296 &tombstone_iter);
1297 if (!s.ok()) {
1298 return s;
1299 }
1300 if (tombstone_iter) {
1301 tombstone_iter->SeekToFirst();
1302
1303 while (tombstone_iter->Valid() && num_entries_left > 0) {
1304 ss << "start: " << tombstone_iter->start_key().ToString(true)
1305 << " end: " << tombstone_iter->end_key().ToString(true)
1306 << " seq: " << tombstone_iter->seq() << '\n';
1307 tombstone_iter->Next();
1308 num_entries_left--;
1309 }
1310 if (num_entries_left <= 0) {
1311 break;
1312 }
1313 }
1314 }
1315 if (num_entries_left <= 0) {
1316 break;
1317 }
1318 }
1319 assert(num_entries_left >= 0);
1320 if (num_entries_left <= 0) {
1321 ss << "(results may not be complete)\n";
1322 }
1323
1324 *out_str = ss.str();
1325 return Status::OK();
1326 }
1327
GetPropertiesOfAllTables(TablePropertiesCollection * props,int level)1328 Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
1329 int level) {
1330 for (const auto& file_meta : storage_info_.files_[level]) {
1331 auto fname =
1332 TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
1333 file_meta->fd.GetPathId());
1334 // 1. If the table is already present in table cache, load table
1335 // properties from there.
1336 std::shared_ptr<const TableProperties> table_properties;
1337 Status s = GetTableProperties(&table_properties, file_meta, &fname);
1338 if (s.ok()) {
1339 props->insert({fname, table_properties});
1340 } else {
1341 return s;
1342 }
1343 }
1344
1345 return Status::OK();
1346 }
1347
GetPropertiesOfTablesInRange(const Range * range,std::size_t n,TablePropertiesCollection * props) const1348 Status Version::GetPropertiesOfTablesInRange(
1349 const Range* range, std::size_t n, TablePropertiesCollection* props) const {
1350 for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
1351 for (decltype(n) i = 0; i < n; i++) {
1352 // Convert user_key into a corresponding internal key.
1353 InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
1354 InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1355 std::vector<FileMetaData*> files;
1356 storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
1357 false);
1358 for (const auto& file_meta : files) {
1359 auto fname =
1360 TableFileName(cfd_->ioptions()->cf_paths,
1361 file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
1362 if (props->count(fname) == 0) {
1363 // 1. If the table is already present in table cache, load table
1364 // properties from there.
1365 std::shared_ptr<const TableProperties> table_properties;
1366 Status s = GetTableProperties(&table_properties, file_meta, &fname);
1367 if (s.ok()) {
1368 props->insert({fname, table_properties});
1369 } else {
1370 return s;
1371 }
1372 }
1373 }
1374 }
1375 }
1376
1377 return Status::OK();
1378 }
1379
GetAggregatedTableProperties(std::shared_ptr<const TableProperties> * tp,int level)1380 Status Version::GetAggregatedTableProperties(
1381 std::shared_ptr<const TableProperties>* tp, int level) {
1382 TablePropertiesCollection props;
1383 Status s;
1384 if (level < 0) {
1385 s = GetPropertiesOfAllTables(&props);
1386 } else {
1387 s = GetPropertiesOfAllTables(&props, level);
1388 }
1389 if (!s.ok()) {
1390 return s;
1391 }
1392
1393 auto* new_tp = new TableProperties();
1394 for (const auto& item : props) {
1395 new_tp->Add(*item.second);
1396 }
1397 tp->reset(new_tp);
1398 return Status::OK();
1399 }
1400
GetMemoryUsageByTableReaders()1401 size_t Version::GetMemoryUsageByTableReaders() {
1402 size_t total_usage = 0;
1403 for (auto& file_level : storage_info_.level_files_brief_) {
1404 for (size_t i = 0; i < file_level.num_files; i++) {
1405 total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
1406 file_options_, cfd_->internal_comparator(), file_level.files[i].fd,
1407 mutable_cf_options_.prefix_extractor.get());
1408 }
1409 }
1410 return total_usage;
1411 }
1412
GetColumnFamilyMetaData(ColumnFamilyMetaData * cf_meta)1413 void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
1414 assert(cf_meta);
1415 assert(cfd_);
1416
1417 cf_meta->name = cfd_->GetName();
1418 cf_meta->size = 0;
1419 cf_meta->file_count = 0;
1420 cf_meta->levels.clear();
1421
1422 auto* ioptions = cfd_->ioptions();
1423 auto* vstorage = storage_info();
1424
1425 for (int level = 0; level < cfd_->NumberLevels(); level++) {
1426 uint64_t level_size = 0;
1427 cf_meta->file_count += vstorage->LevelFiles(level).size();
1428 std::vector<SstFileMetaData> files;
1429 for (const auto& file : vstorage->LevelFiles(level)) {
1430 uint32_t path_id = file->fd.GetPathId();
1431 std::string file_path;
1432 if (path_id < ioptions->cf_paths.size()) {
1433 file_path = ioptions->cf_paths[path_id].path;
1434 } else {
1435 assert(!ioptions->cf_paths.empty());
1436 file_path = ioptions->cf_paths.back().path;
1437 }
1438 const uint64_t file_number = file->fd.GetNumber();
1439 files.emplace_back(SstFileMetaData{
1440 MakeTableFileName("", file_number), file_number, file_path,
1441 static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
1442 file->fd.largest_seqno, file->smallest.user_key().ToString(),
1443 file->largest.user_key().ToString(),
1444 file->stats.num_reads_sampled.load(std::memory_order_relaxed),
1445 file->being_compacted, file->oldest_blob_file_number,
1446 file->TryGetOldestAncesterTime(), file->TryGetFileCreationTime(),
1447 file->file_checksum, file->file_checksum_func_name});
1448 files.back().num_entries = file->num_entries;
1449 files.back().num_deletions = file->num_deletions;
1450 level_size += file->fd.GetFileSize();
1451 }
1452 cf_meta->levels.emplace_back(
1453 level, level_size, std::move(files));
1454 cf_meta->size += level_size;
1455 }
1456 }
1457
GetSstFilesSize()1458 uint64_t Version::GetSstFilesSize() {
1459 uint64_t sst_files_size = 0;
1460 for (int level = 0; level < storage_info_.num_levels_; level++) {
1461 for (const auto& file_meta : storage_info_.LevelFiles(level)) {
1462 sst_files_size += file_meta->fd.GetFileSize();
1463 }
1464 }
1465 return sst_files_size;
1466 }
1467
GetCreationTimeOfOldestFile(uint64_t * creation_time)1468 void Version::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
1469 uint64_t oldest_time = port::kMaxUint64;
1470 for (int level = 0; level < storage_info_.num_non_empty_levels_; level++) {
1471 for (FileMetaData* meta : storage_info_.LevelFiles(level)) {
1472 assert(meta->fd.table_reader != nullptr);
1473 uint64_t file_creation_time = meta->TryGetFileCreationTime();
1474 if (file_creation_time == kUnknownFileCreationTime) {
1475 *creation_time = 0;
1476 return;
1477 }
1478 if (file_creation_time < oldest_time) {
1479 oldest_time = file_creation_time;
1480 }
1481 }
1482 }
1483 *creation_time = oldest_time;
1484 }
1485
GetEstimatedActiveKeys() const1486 uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
1487 // The estimate will be inaccurate when:
1488 // (1) there are merge keys,
1489 // (2) keys are directly overwritten,
1490 // (3) there are deletions of non-existing keys, or
1491 // (4) the number of samples is low.
1492 if (current_num_samples_ == 0) {
1493 return 0;
1494 }
1495
1496 if (current_num_non_deletions_ <= current_num_deletions_) {
1497 return 0;
1498 }
1499
1500 uint64_t est = current_num_non_deletions_ - current_num_deletions_;
1501
1502 uint64_t file_count = 0;
1503 for (int level = 0; level < num_levels_; ++level) {
1504 file_count += files_[level].size();
1505 }
1506
1507 if (current_num_samples_ < file_count) {
1508 // casting to avoid overflowing
1509 return
1510 static_cast<uint64_t>(
1511 (est * static_cast<double>(file_count) / current_num_samples_)
1512 );
1513 } else {
1514 return est;
1515 }
1516 }
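// Editor's note: a rough worked example of the extrapolation above, using
// hypothetical numbers. If the sampled files yielded
// current_num_non_deletions_ = 1200 and current_num_deletions_ = 200, then
// est = 1000. With only current_num_samples_ = 5 out of file_count = 20
// files sampled, the estimate is scaled up to 1000 * 20 / 5 = 4000 keys.
// A minimal caller sketch (assuming a valid Version* v):
//
//   uint64_t active_keys = v->storage_info()->GetEstimatedActiveKeys();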
1517
1518 double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
1519 int level) const {
1520 assert(level < num_levels_);
1521 uint64_t sum_file_size_bytes = 0;
1522 uint64_t sum_data_size_bytes = 0;
1523 for (auto* file_meta : files_[level]) {
1524 sum_file_size_bytes += file_meta->fd.GetFileSize();
1525 sum_data_size_bytes += file_meta->raw_key_size + file_meta->raw_value_size;
1526 }
1527 if (sum_file_size_bytes == 0) {
1528 return -1.0;
1529 }
1530 return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes;
1531 }
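// Editor's note: despite the name, the value returned above is the ratio of
// raw (uncompressed) data size to on-disk file size. For example, with
// hypothetical sizes, a level holding 100 MB of raw keys and values in
// 40 MB of SST files yields 100.0 / 40.0 = 2.5, i.e. the data compresses
// to 40% of its raw size; -1.0 signals an empty level.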
1532
1533 void Version::AddIterators(const ReadOptions& read_options,
1534 const FileOptions& soptions,
1535 MergeIteratorBuilder* merge_iter_builder,
1536 RangeDelAggregator* range_del_agg) {
1537 assert(storage_info_.finalized_);
1538
1539 for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
1540 AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
1541 range_del_agg);
1542 }
1543 }
1544
1545 void Version::AddIteratorsForLevel(const ReadOptions& read_options,
1546 const FileOptions& soptions,
1547 MergeIteratorBuilder* merge_iter_builder,
1548 int level,
1549 RangeDelAggregator* range_del_agg) {
1550 assert(storage_info_.finalized_);
1551 if (level >= storage_info_.num_non_empty_levels()) {
1552 // This is an empty level
1553 return;
1554 } else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
1555 // No files in this level
1556 return;
1557 }
1558
1559 bool should_sample = should_sample_file_read();
1560
1561 auto* arena = merge_iter_builder->GetArena();
1562 if (level == 0) {
1563 // Merge all level zero files together since they may overlap
1564 for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
1565 const auto& file = storage_info_.LevelFilesBrief(0).files[i];
1566 merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
1567 read_options, soptions, cfd_->internal_comparator(),
1568 *file.file_metadata, range_del_agg,
1569 mutable_cf_options_.prefix_extractor.get(), nullptr,
1570 cfd_->internal_stats()->GetFileReadHist(0),
1571 TableReaderCaller::kUserIterator, arena,
1572 /*skip_filters=*/false, /*level=*/0,
1573 /*smallest_compaction_key=*/nullptr,
1574 /*largest_compaction_key=*/nullptr));
1575 }
1576 if (should_sample) {
1577 // Count one sample for each L0 file. This is done per iterator
1578 // creation rather than per Seek(), while files in other levels are
1579 // recorded per seek. If users execute one range query per iterator,
1580 // there may be some discrepancy here.
1581 for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
1582 sample_file_read_inc(meta);
1583 }
1584 }
1585 } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
1586 // For levels > 0, we can use a concatenating iterator that sequentially
1587 // walks through the non-overlapping files in the level, opening them
1588 // lazily.
1589 auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
1590 merge_iter_builder->AddIterator(new (mem) LevelIterator(
1591 cfd_->table_cache(), read_options, soptions,
1592 cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
1593 mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
1594 cfd_->internal_stats()->GetFileReadHist(level),
1595 TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
1596 range_del_agg, /*largest_compaction_key=*/nullptr));
1597 }
1598 }
1599
1600 Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
1601 const FileOptions& file_options,
1602 const Slice& smallest_user_key,
1603 const Slice& largest_user_key,
1604 int level, bool* overlap) {
1605 assert(storage_info_.finalized_);
1606
1607 auto icmp = cfd_->internal_comparator();
1608 auto ucmp = icmp.user_comparator();
1609
1610 Arena arena;
1611 Status status;
1612 ReadRangeDelAggregator range_del_agg(&icmp,
1613 kMaxSequenceNumber /* upper_bound */);
1614
1615 *overlap = false;
1616
1617 if (level == 0) {
1618 for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
1619 const auto file = &storage_info_.LevelFilesBrief(0).files[i];
1620 if (AfterFile(ucmp, &smallest_user_key, file) ||
1621 BeforeFile(ucmp, &largest_user_key, file)) {
1622 continue;
1623 }
1624 ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
1625 read_options, file_options, cfd_->internal_comparator(),
1626 *file->file_metadata, &range_del_agg,
1627 mutable_cf_options_.prefix_extractor.get(), nullptr,
1628 cfd_->internal_stats()->GetFileReadHist(0),
1629 TableReaderCaller::kUserIterator, &arena,
1630 /*skip_filters=*/false, /*level=*/0,
1631 /*smallest_compaction_key=*/nullptr,
1632 /*largest_compaction_key=*/nullptr));
1633 status = OverlapWithIterator(
1634 ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
1635 if (!status.ok() || *overlap) {
1636 break;
1637 }
1638 }
1639 } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
1640 auto mem = arena.AllocateAligned(sizeof(LevelIterator));
1641 ScopedArenaIterator iter(new (mem) LevelIterator(
1642 cfd_->table_cache(), read_options, file_options,
1643 cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
1644 mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
1645 cfd_->internal_stats()->GetFileReadHist(level),
1646 TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
1647 &range_del_agg));
1648 status = OverlapWithIterator(
1649 ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
1650 }
1651
1652 if (status.ok() && *overlap == false &&
1653 range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
1654 *overlap = true;
1655 }
1656 return status;
1657 }
1658
1659 VersionStorageInfo::VersionStorageInfo(
1660 const InternalKeyComparator* internal_comparator,
1661 const Comparator* user_comparator, int levels,
1662 CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
1663 bool _force_consistency_checks)
1664 : internal_comparator_(internal_comparator),
1665 user_comparator_(user_comparator),
1666 // cfd is nullptr if Version is dummy
1667 num_levels_(levels),
1668 num_non_empty_levels_(0),
1669 file_indexer_(user_comparator),
1670 compaction_style_(compaction_style),
1671 files_(new std::vector<FileMetaData*>[num_levels_]),
1672 base_level_(num_levels_ == 1 ? -1 : 1),
1673 level_multiplier_(0.0),
1674 files_by_compaction_pri_(num_levels_),
1675 level0_non_overlapping_(false),
1676 next_file_to_compact_by_size_(num_levels_),
1677 compaction_score_(num_levels_),
1678 compaction_level_(num_levels_),
1679 l0_delay_trigger_count_(0),
1680 accumulated_file_size_(0),
1681 accumulated_raw_key_size_(0),
1682 accumulated_raw_value_size_(0),
1683 accumulated_num_non_deletions_(0),
1684 accumulated_num_deletions_(0),
1685 current_num_non_deletions_(0),
1686 current_num_deletions_(0),
1687 current_num_samples_(0),
1688 estimated_compaction_needed_bytes_(0),
1689 finalized_(false),
1690 force_consistency_checks_(_force_consistency_checks) {
1691 if (ref_vstorage != nullptr) {
1692 accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
1693 accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
1694 accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
1695 accumulated_num_non_deletions_ =
1696 ref_vstorage->accumulated_num_non_deletions_;
1697 accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
1698 current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
1699 current_num_deletions_ = ref_vstorage->current_num_deletions_;
1700 current_num_samples_ = ref_vstorage->current_num_samples_;
1701 oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
1702 }
1703 }
1704
1705 Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
1706 const FileOptions& file_opt,
1707 const MutableCFOptions mutable_cf_options,
1708 uint64_t version_number)
1709 : env_(vset->env_),
1710 cfd_(column_family_data),
1711 info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
1712 db_statistics_((cfd_ == nullptr) ? nullptr
1713 : cfd_->ioptions()->statistics),
1714 table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
1715 merge_operator_((cfd_ == nullptr) ? nullptr
1716 : cfd_->ioptions()->merge_operator),
1717 storage_info_(
1718 (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
1719 (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
1720 cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
1721 cfd_ == nullptr ? kCompactionStyleLevel
1722 : cfd_->ioptions()->compaction_style,
1723 (cfd_ == nullptr || cfd_->current() == nullptr)
1724 ? nullptr
1725 : cfd_->current()->storage_info(),
1726 cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks),
1727 vset_(vset),
1728 next_(this),
1729 prev_(this),
1730 refs_(0),
1731 file_options_(file_opt),
1732 mutable_cf_options_(mutable_cf_options),
1733 version_number_(version_number) {}
1734
1735 void Version::Get(const ReadOptions& read_options, const LookupKey& k,
1736 PinnableSlice* value, std::string* timestamp, Status* status,
1737 MergeContext* merge_context,
1738 SequenceNumber* max_covering_tombstone_seq, bool* value_found,
1739 bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
1740 bool* is_blob, bool do_merge) {
1741 Slice ikey = k.internal_key();
1742 Slice user_key = k.user_key();
1743
1744 assert(status->ok() || status->IsMergeInProgress());
1745
1746 if (key_exists != nullptr) {
1747 // will be set to false below if the key is not found
1748 *key_exists = true;
1749 }
1750
1751 PinnedIteratorsManager pinned_iters_mgr;
1752 uint64_t tracing_get_id = BlockCacheTraceHelper::kReservedGetId;
1753 if (vset_ && vset_->block_cache_tracer_ &&
1754 vset_->block_cache_tracer_->is_tracing_enabled()) {
1755 tracing_get_id = vset_->block_cache_tracer_->NextGetId();
1756 }
1757 GetContext get_context(
1758 user_comparator(), merge_operator_, info_log_, db_statistics_,
1759 status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
1760 do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found,
1761 merge_context, do_merge, max_covering_tombstone_seq, this->env_, seq,
1762 merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
1763 tracing_get_id);
1764
1765 // Pin blocks that we read to hold merge operands
1766 if (merge_operator_) {
1767 pinned_iters_mgr.StartPinning();
1768 }
1769
1770 FilePicker fp(
1771 storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
1772 storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
1773 user_comparator(), internal_comparator());
1774 FdWithKeyRange* f = fp.GetNextFile();
1775
1776 while (f != nullptr) {
1777 if (*max_covering_tombstone_seq > 0) {
1778 // The remaining files we look at will only contain covered keys, so we
1779 // stop here.
1780 break;
1781 }
1782 if (get_context.sample()) {
1783 sample_file_read_inc(f->file_metadata);
1784 }
1785
1786 bool timer_enabled =
1787 GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
1788 get_perf_context()->per_level_perf_context_enabled;
1789 StopWatchNano timer(env_, timer_enabled /* auto_start */);
1790 *status = table_cache_->Get(
1791 read_options, *internal_comparator(), *f->file_metadata, ikey,
1792 &get_context, mutable_cf_options_.prefix_extractor.get(),
1793 cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
1794 IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
1795 fp.IsHitFileLastInLevel()),
1796 fp.GetCurrentLevel());
1797 // TODO: examine the behavior for corrupted key
1798 if (timer_enabled) {
1799 PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
1800 fp.GetCurrentLevel());
1801 }
1802 if (!status->ok()) {
1803 return;
1804 }
1805
1806 // report the counters before returning
1807 if (get_context.State() != GetContext::kNotFound &&
1808 get_context.State() != GetContext::kMerge &&
1809 db_statistics_ != nullptr) {
1810 get_context.ReportCounters();
1811 }
1812 switch (get_context.State()) {
1813 case GetContext::kNotFound:
1814 // Keep searching in other files
1815 break;
1816 case GetContext::kMerge:
1817 // TODO: update per-level perfcontext user_key_return_count for kMerge
1818 break;
1819 case GetContext::kFound:
1820 if (fp.GetHitFileLevel() == 0) {
1821 RecordTick(db_statistics_, GET_HIT_L0);
1822 } else if (fp.GetHitFileLevel() == 1) {
1823 RecordTick(db_statistics_, GET_HIT_L1);
1824 } else if (fp.GetHitFileLevel() >= 2) {
1825 RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
1826 }
1827 PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
1828 fp.GetHitFileLevel());
1829 return;
1830 case GetContext::kDeleted:
1831 // Use empty error message for speed
1832 *status = Status::NotFound();
1833 return;
1834 case GetContext::kCorrupt:
1835 *status = Status::Corruption("corrupted key for ", user_key);
1836 return;
1837 case GetContext::kBlobIndex:
1838 ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
1839 *status = Status::NotSupported(
1840 "Encounter unexpected blob index. Please open DB with "
1841 "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
1842 return;
1843 }
1844 f = fp.GetNextFile();
1845 }
1846 if (db_statistics_ != nullptr) {
1847 get_context.ReportCounters();
1848 }
1849 if (GetContext::kMerge == get_context.State()) {
1850 if (!do_merge) {
1851 *status = Status::OK();
1852 return;
1853 }
1854 if (!merge_operator_) {
1855 *status = Status::InvalidArgument(
1856 "merge_operator is not properly initialized.");
1857 return;
1858 }
1859 // The merge operands are in the saver and we hit the beginning of the key
1860 // history; do a final merge of nullptr and the operands.
1861 std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
1862 *status = MergeHelper::TimedFullMerge(
1863 merge_operator_, user_key, nullptr, merge_context->GetOperands(),
1864 str_value, info_log_, db_statistics_, env_,
1865 nullptr /* result_operand */, true);
1866 if (LIKELY(value != nullptr)) {
1867 value->PinSelf();
1868 }
1869 } else {
1870 if (key_exists != nullptr) {
1871 *key_exists = false;
1872 }
1873 *status = Status::NotFound(); // Use an empty error message for speed
1874 }
1875 }
1876
1877 void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
1878 ReadCallback* callback, bool* is_blob) {
1879 PinnedIteratorsManager pinned_iters_mgr;
1880
1881 // Pin blocks that we read to hold merge operands
1882 if (merge_operator_) {
1883 pinned_iters_mgr.StartPinning();
1884 }
1885 uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
1886
1887 if (vset_ && vset_->block_cache_tracer_ &&
1888 vset_->block_cache_tracer_->is_tracing_enabled()) {
1889 tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
1890 }
1891 // Even though we know the batch size won't be > MAX_BATCH_SIZE,
1892 // use autovector in order to avoid unnecessary construction of GetContext
1893 // objects, which is expensive
1894 autovector<GetContext, 16> get_ctx;
1895 for (auto iter = range->begin(); iter != range->end(); ++iter) {
1896 assert(iter->s->ok() || iter->s->IsMergeInProgress());
1897 get_ctx.emplace_back(
1898 user_comparator(), merge_operator_, info_log_, db_statistics_,
1899 iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
1900 iter->value, iter->timestamp, nullptr, &(iter->merge_context), true,
1901 &iter->max_covering_tombstone_seq, this->env_, nullptr,
1902 merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
1903 tracing_mget_id);
1904 // MergeInProgress status, if set, has been transferred to the get_context
1905 // state, so we set status to ok here. From now on, the iter status will
1906 // be used for IO errors, and get_context state will be used for any
1907 // key level errors
1908 *(iter->s) = Status::OK();
1909 }
1910 int get_ctx_index = 0;
1911 for (auto iter = range->begin(); iter != range->end();
1912 ++iter, get_ctx_index++) {
1913 iter->get_context = &(get_ctx[get_ctx_index]);
1914 }
1915
1916 MultiGetRange file_picker_range(*range, range->begin(), range->end());
1917 FilePickerMultiGet fp(
1918 &file_picker_range,
1919 &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
1920 &storage_info_.file_indexer_, user_comparator(), internal_comparator());
1921 FdWithKeyRange* f = fp.GetNextFile();
1922
1923 while (f != nullptr) {
1924 MultiGetRange file_range = fp.CurrentFileRange();
1925 bool timer_enabled =
1926 GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
1927 get_perf_context()->per_level_perf_context_enabled;
1928 StopWatchNano timer(env_, timer_enabled /* auto_start */);
1929 Status s = table_cache_->MultiGet(
1930 read_options, *internal_comparator(), *f->file_metadata, &file_range,
1931 mutable_cf_options_.prefix_extractor.get(),
1932 cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
1933 IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
1934 fp.IsHitFileLastInLevel()),
1935 fp.GetCurrentLevel());
1936 // TODO: examine the behavior for corrupted key
1937 if (timer_enabled) {
1938 PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
1939 fp.GetCurrentLevel());
1940 }
1941 if (!s.ok()) {
1942 // TODO: Set status for individual keys appropriately
1943 for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
1944 *iter->s = s;
1945 file_range.MarkKeyDone(iter);
1946 }
1947 return;
1948 }
1949 uint64_t batch_size = 0;
1950 for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
1951 GetContext& get_context = *iter->get_context;
1952 Status* status = iter->s;
1953 // The Status in the KeyContext takes precedence over GetContext state
1954 // Status may be an error if there were any IO errors in the table
1955 // reader. We never expect Status to be NotFound(), as that is
1956 // determined by get_context
1957 assert(!status->IsNotFound());
1958 if (!status->ok()) {
1959 file_range.MarkKeyDone(iter);
1960 continue;
1961 }
1962
1963 if (get_context.sample()) {
1964 sample_file_read_inc(f->file_metadata);
1965 }
1966 batch_size++;
1967 // report the counters before returning
1968 if (get_context.State() != GetContext::kNotFound &&
1969 get_context.State() != GetContext::kMerge &&
1970 db_statistics_ != nullptr) {
1971 get_context.ReportCounters();
1972 } else {
1973 if (iter->max_covering_tombstone_seq > 0) {
1974 // The remaining files we look at will only contain covered keys, so
1975 // we stop here for this key
1976 file_picker_range.SkipKey(iter);
1977 }
1978 }
1979 switch (get_context.State()) {
1980 case GetContext::kNotFound:
1981 // Keep searching in other files
1982 break;
1983 case GetContext::kMerge:
1984 // TODO: update per-level perfcontext user_key_return_count for kMerge
1985 break;
1986 case GetContext::kFound:
1987 if (fp.GetHitFileLevel() == 0) {
1988 RecordTick(db_statistics_, GET_HIT_L0);
1989 } else if (fp.GetHitFileLevel() == 1) {
1990 RecordTick(db_statistics_, GET_HIT_L1);
1991 } else if (fp.GetHitFileLevel() >= 2) {
1992 RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
1993 }
1994 PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
1995 fp.GetHitFileLevel());
1996 file_range.MarkKeyDone(iter);
1997 continue;
1998 case GetContext::kDeleted:
1999 // Use empty error message for speed
2000 *status = Status::NotFound();
2001 file_range.MarkKeyDone(iter);
2002 continue;
2003 case GetContext::kCorrupt:
2004 *status =
2005 Status::Corruption("corrupted key for ", iter->lkey->user_key());
2006 file_range.MarkKeyDone(iter);
2007 continue;
2008 case GetContext::kBlobIndex:
2009 ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
2010 *status = Status::NotSupported(
2011 "Encounter unexpected blob index. Please open DB with "
2012 "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
2013 file_range.MarkKeyDone(iter);
2014 continue;
2015 }
2016 }
2017 RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
2018 if (file_picker_range.empty()) {
2019 break;
2020 }
2021 f = fp.GetNextFile();
2022 }
2023
2024 // Process any leftover keys
2025 for (auto iter = range->begin(); iter != range->end(); ++iter) {
2026 GetContext& get_context = *iter->get_context;
2027 Status* status = iter->s;
2028 Slice user_key = iter->lkey->user_key();
2029
2030 if (db_statistics_ != nullptr) {
2031 get_context.ReportCounters();
2032 }
2033 if (GetContext::kMerge == get_context.State()) {
2034 if (!merge_operator_) {
2035 *status = Status::InvalidArgument(
2036 "merge_operator is not properly initialized.");
2037 range->MarkKeyDone(iter);
2038 continue;
2039 }
2040 // The merge operands are in the saver and we hit the beginning of the key
2041 // history; do a final merge of nullptr and the operands.
2042 std::string* str_value =
2043 iter->value != nullptr ? iter->value->GetSelf() : nullptr;
2044 *status = MergeHelper::TimedFullMerge(
2045 merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
2046 str_value, info_log_, db_statistics_, env_,
2047 nullptr /* result_operand */, true);
2048 if (LIKELY(iter->value != nullptr)) {
2049 iter->value->PinSelf();
2050 }
2051 } else {
2052 range->MarkKeyDone(iter);
2053 *status = Status::NotFound(); // Use an empty error message for speed
2054 }
2055 }
2056 }
2057
2058 bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
2059 // Reaching the bottom level implies misses at all upper levels, so we'll
2060 // skip checking the filters when we predict a hit.
2061 return cfd_->ioptions()->optimize_filters_for_hits &&
2062 (level > 0 || is_file_last_in_level) &&
2063 level == storage_info_.num_non_empty_levels() - 1;
2064 }
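// Editor's note: a hypothetical illustration of the predicate above. With
// optimize_filters_for_hits = true and five non-empty levels, filters are
// skipped only at level 4, the bottommost non-empty level; levels 0 through
// 3 still consult their filters. L0 skips its filter only when it is the
// sole non-empty level and the file is the last one searched there.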
2065
2066 void VersionStorageInfo::GenerateLevelFilesBrief() {
2067 level_files_brief_.resize(num_non_empty_levels_);
2068 for (int level = 0; level < num_non_empty_levels_; level++) {
2069 DoGenerateLevelFilesBrief(
2070 &level_files_brief_[level], files_[level], &arena_);
2071 }
2072 }
2073
2074 void Version::PrepareApply(
2075 const MutableCFOptions& mutable_cf_options,
2076 bool update_stats) {
2077 UpdateAccumulatedStats(update_stats);
2078 storage_info_.UpdateNumNonEmptyLevels();
2079 storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
2080 storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
2081 storage_info_.GenerateFileIndexer();
2082 storage_info_.GenerateLevelFilesBrief();
2083 storage_info_.GenerateLevel0NonOverlapping();
2084 storage_info_.GenerateBottommostFiles();
2085 }
2086
2087 bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
2088 if (file_meta->init_stats_from_file ||
2089 file_meta->compensated_file_size > 0) {
2090 return false;
2091 }
2092 std::shared_ptr<const TableProperties> tp;
2093 Status s = GetTableProperties(&tp, file_meta);
2094 file_meta->init_stats_from_file = true;
2095 if (!s.ok()) {
2096 ROCKS_LOG_ERROR(vset_->db_options_->info_log,
2097 "Unable to load table properties for file %" PRIu64
2098 " --- %s\n",
2099 file_meta->fd.GetNumber(), s.ToString().c_str());
2100 return false;
2101 }
2102 if (tp.get() == nullptr) return false;
2103 file_meta->num_entries = tp->num_entries;
2104 file_meta->num_deletions = tp->num_deletions;
2105 file_meta->raw_value_size = tp->raw_value_size;
2106 file_meta->raw_key_size = tp->raw_key_size;
2107
2108 return true;
2109 }
2110
2111 void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
2112 TEST_SYNC_POINT_CALLBACK("VersionStorageInfo::UpdateAccumulatedStats",
2113 nullptr);
2114
2115 assert(file_meta->init_stats_from_file);
2116 accumulated_file_size_ += file_meta->fd.GetFileSize();
2117 accumulated_raw_key_size_ += file_meta->raw_key_size;
2118 accumulated_raw_value_size_ += file_meta->raw_value_size;
2119 accumulated_num_non_deletions_ +=
2120 file_meta->num_entries - file_meta->num_deletions;
2121 accumulated_num_deletions_ += file_meta->num_deletions;
2122
2123 current_num_non_deletions_ +=
2124 file_meta->num_entries - file_meta->num_deletions;
2125 current_num_deletions_ += file_meta->num_deletions;
2126 current_num_samples_++;
2127 }
2128
2129 void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) {
2130 if (file_meta->init_stats_from_file) {
2131 current_num_non_deletions_ -=
2132 file_meta->num_entries - file_meta->num_deletions;
2133 current_num_deletions_ -= file_meta->num_deletions;
2134 current_num_samples_--;
2135 }
2136 }
2137
2138 void Version::UpdateAccumulatedStats(bool update_stats) {
2139 if (update_stats) {
2140 // maximum number of table properties loaded from files.
2141 const int kMaxInitCount = 20;
2142 int init_count = 0;
2143 // Here, only the first kMaxInitCount files that haven't been
2144 // initialized from file will be updated with num_deletions.
2145 // The motivation is to cap the maximum I/O per Version creation.
2146 // The reason for choosing files from lower levels instead of higher
2147 // levels is that this design propagates the initialization from
2148 // lower levels to higher levels: when the num_deletions of
2149 // lower-level files are updated, those files get an accurate
2150 // compensated_file_size, which triggers lower-level to higher-level
2151 // compactions; those compactions in turn create higher-level files
2152 // whose num_deletions will be updated here.
2153 for (int level = 0;
2154 level < storage_info_.num_levels_ && init_count < kMaxInitCount;
2155 ++level) {
2156 for (auto* file_meta : storage_info_.files_[level]) {
2157 if (MaybeInitializeFileMetaData(file_meta)) {
2158 // each FileMeta will be initialized only once.
2159 storage_info_.UpdateAccumulatedStats(file_meta);
2160 // when option "max_open_files" is -1, all the file metadata has
2161 // already been read, so MaybeInitializeFileMetaData() won't incur
2162 // any I/O cost. "max_open_files=-1" means that the table cache passed
2163 // to the VersionSet and then to the ColumnFamilySet has a size of
2164 // TableCache::kInfiniteCapacity
2165 if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
2166 TableCache::kInfiniteCapacity) {
2167 continue;
2168 }
2169 if (++init_count >= kMaxInitCount) {
2170 break;
2171 }
2172 }
2173 }
2174 }
2175 // In case all sampled files contain only deletion entries, we load
2176 // the table properties of a file from a higher level to initialize
2177 // that value.
2178 for (int level = storage_info_.num_levels_ - 1;
2179 storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
2180 --level) {
2181 for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
2182 storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
2183 if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
2184 storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
2185 }
2186 }
2187 }
2188 }
2189
2190 storage_info_.ComputeCompensatedSizes();
2191 }
2192
2193 void VersionStorageInfo::ComputeCompensatedSizes() {
2194 static const int kDeletionWeightOnCompaction = 2;
2195 uint64_t average_value_size = GetAverageValueSize();
2196
2197 // compute the compensated size
2198 for (int level = 0; level < num_levels_; level++) {
2199 for (auto* file_meta : files_[level]) {
2200 // Here we only compute compensated_file_size for those file_meta
2201 // whose compensated_file_size is uninitialized (== 0). This is true only
2202 // for files that have just been created and that no other thread has
2203 // access to yet. That's why we can safely mutate compensated_file_size.
2204 if (file_meta->compensated_file_size == 0) {
2205 file_meta->compensated_file_size = file_meta->fd.GetFileSize();
2206 // We boost the size of deletion entries of a file only when the
2207 // number of deletion entries is greater than the number of
2208 // non-deletion entries in the file. The motivation here is that in
2209 // a stable workload, the number of deletion entries should be roughly
2210 // equal to the number of non-deletion entries. If we compensated the
2211 // size of deletion entries in a stable workload, the deletion
2212 // compensation logic might introduce unwanted effects that change the
2213 // shape of the LSM tree.
2214 if (file_meta->num_deletions * 2 >= file_meta->num_entries) {
2215 file_meta->compensated_file_size +=
2216 (file_meta->num_deletions * 2 - file_meta->num_entries) *
2217 average_value_size * kDeletionWeightOnCompaction;
2218 }
2219 }
2220 }
2221 }
2222 }
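// Editor's note: a rough worked example of the compensation above, using
// hypothetical numbers. Take a 10 MB file with num_entries = 100000, of
// which num_deletions = 80000, and average_value_size = 100 bytes:
//
//   excess = 80000 * 2 - 100000 = 60000
//   boost  = 60000 * 100 * kDeletionWeightOnCompaction(2) = ~12 MB
//
// so compensated_file_size becomes roughly 22 MB, making this
// deletion-heavy file a more attractive compaction input.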
2223
2224 int VersionStorageInfo::MaxInputLevel() const {
2225 if (compaction_style_ == kCompactionStyleLevel) {
2226 return num_levels() - 2;
2227 }
2228 return 0;
2229 }
2230
2231 int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
2232 if (allow_ingest_behind) {
2233 assert(num_levels() > 1);
2234 return num_levels() - 2;
2235 }
2236 return num_levels() - 1;
2237 }
2238
2239 void VersionStorageInfo::EstimateCompactionBytesNeeded(
2240 const MutableCFOptions& mutable_cf_options) {
2241 // Only implemented for level-based compaction
2242 if (compaction_style_ != kCompactionStyleLevel) {
2243 estimated_compaction_needed_bytes_ = 0;
2244 return;
2245 }
2246
2247 // Start from level 0: if level 0 qualifies for compaction to level 1,
2248 // we estimate the size of that compaction.
2249 // Then we move on to the next level and see whether it qualifies for
2250 // compaction to the following level. The size of a level is estimated as the
2251 // actual size on the level plus the input bytes from the previous level, if
2252 // any. If that exceeds the level's target, take the excess bytes as
2253 // compaction input and add the estimated compaction size to the total size.
2254 // We keep doing this for levels 2, 3, etc., until the last level, and return
2255 // the accumulated bytes.
2256
2257 uint64_t bytes_compact_to_next_level = 0;
2258 uint64_t level_size = 0;
2259 for (auto* f : files_[0]) {
2260 level_size += f->fd.GetFileSize();
2261 }
2262 // Level 0
2263 bool level0_compact_triggered = false;
2264 if (static_cast<int>(files_[0].size()) >=
2265 mutable_cf_options.level0_file_num_compaction_trigger ||
2266 level_size >= mutable_cf_options.max_bytes_for_level_base) {
2267 level0_compact_triggered = true;
2268 estimated_compaction_needed_bytes_ = level_size;
2269 bytes_compact_to_next_level = level_size;
2270 } else {
2271 estimated_compaction_needed_bytes_ = 0;
2272 }
2273
2274 // Level 1 and up.
2275 uint64_t bytes_next_level = 0;
2276 for (int level = base_level(); level <= MaxInputLevel(); level++) {
2277 level_size = 0;
2278 if (bytes_next_level > 0) {
2279 #ifndef NDEBUG
2280 uint64_t level_size2 = 0;
2281 for (auto* f : files_[level]) {
2282 level_size2 += f->fd.GetFileSize();
2283 }
2284 assert(level_size2 == bytes_next_level);
2285 #endif
2286 level_size = bytes_next_level;
2287 bytes_next_level = 0;
2288 } else {
2289 for (auto* f : files_[level]) {
2290 level_size += f->fd.GetFileSize();
2291 }
2292 }
2293 if (level == base_level() && level0_compact_triggered) {
2294 // Add base level size to compaction if level0 compaction triggered.
2295 estimated_compaction_needed_bytes_ += level_size;
2296 }
2297 // Add size added by previous compaction
2298 level_size += bytes_compact_to_next_level;
2299 bytes_compact_to_next_level = 0;
2300 uint64_t level_target = MaxBytesForLevel(level);
2301 if (level_size > level_target) {
2302 bytes_compact_to_next_level = level_size - level_target;
2303 // Estimate the actual compaction fan-out ratio as size ratio between
2304 // the two levels.
2305
2306 assert(bytes_next_level == 0);
2307 if (level + 1 < num_levels_) {
2308 for (auto* f : files_[level + 1]) {
2309 bytes_next_level += f->fd.GetFileSize();
2310 }
2311 }
2312 if (bytes_next_level > 0) {
2313 assert(level_size > 0);
2314 estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
2315 static_cast<double>(bytes_compact_to_next_level) *
2316 (static_cast<double>(bytes_next_level) /
2317 static_cast<double>(level_size) +
2318 1));
2319 }
2320 }
2321 }
2322 }
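// Editor's note: a rough worked example of the estimate above, using
// hypothetical sizes (level compaction, base_level() == 1):
//
//   L0 holds 40 MB and has hit its file-count trigger:
//     estimated = 40, bytes_compact_to_next_level = 40
//   L1 holds 120 MB against a 100 MB target:
//     estimated += 120 (base level, L0 triggered)          -> 160
//     level_size = 120 + 40 = 160 > 100, excess = 60
//     L2 holds 600 MB, so estimated += 60 * (600/160 + 1)  -> 445
//   L2 then sits at 660 MB, under a (say) 1000 MB target, so the cascade
//   stops with estimated_compaction_needed_bytes_ at roughly 445 MB.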
2323
2324 namespace {
2325 uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
2326 const MutableCFOptions& mutable_cf_options,
2327 const std::vector<FileMetaData*>& files) {
2328 uint32_t ttl_expired_files_count = 0;
2329
2330 int64_t _current_time;
2331 auto status = ioptions.env->GetCurrentTime(&_current_time);
2332 if (status.ok()) {
2333 const uint64_t current_time = static_cast<uint64_t>(_current_time);
2334 for (FileMetaData* f : files) {
2335 if (!f->being_compacted) {
2336 uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
2337 if (oldest_ancester_time != 0 &&
2338 oldest_ancester_time < (current_time - mutable_cf_options.ttl)) {
2339 ttl_expired_files_count++;
2340 }
2341 }
2342 }
2343 }
2344 return ttl_expired_files_count;
2345 }
2346 } // anonymous namespace
2347
2348 void VersionStorageInfo::ComputeCompactionScore(
2349 const ImmutableCFOptions& immutable_cf_options,
2350 const MutableCFOptions& mutable_cf_options) {
2351 for (int level = 0; level <= MaxInputLevel(); level++) {
2352 double score;
2353 if (level == 0) {
2354 // We treat level-0 specially by bounding the number of files
2355 // instead of number of bytes for two reasons:
2356 //
2357 // (1) With larger write-buffer sizes, it is nice not to do too
2358 // many level-0 compactions.
2359 //
2360 // (2) The files in level-0 are merged on every read and
2361 // therefore we wish to avoid too many files when the individual
2362 // file size is small (perhaps because of a small write-buffer
2363 // setting, or very high compression ratios, or lots of
2364 // overwrites/deletions).
2365 int num_sorted_runs = 0;
2366 uint64_t total_size = 0;
2367 for (auto* f : files_[level]) {
2368 if (!f->being_compacted) {
2369 total_size += f->compensated_file_size;
2370 num_sorted_runs++;
2371 }
2372 }
2373 if (compaction_style_ == kCompactionStyleUniversal) {
2374 // For universal compaction, we use level0 score to indicate
2375 // compaction score for the whole DB. Other levels are counted as if
2376 // they were additional L0 files.
2377 for (int i = 1; i < num_levels(); i++) {
2378 if (!files_[i].empty() && !files_[i][0]->being_compacted) {
2379 num_sorted_runs++;
2380 }
2381 }
2382 }
2383
2384 if (compaction_style_ == kCompactionStyleFIFO) {
2385 score = static_cast<double>(total_size) /
2386 mutable_cf_options.compaction_options_fifo.max_table_files_size;
2387 if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
2388 score = std::max(
2389 static_cast<double>(num_sorted_runs) /
2390 mutable_cf_options.level0_file_num_compaction_trigger,
2391 score);
2392 }
2393 if (mutable_cf_options.ttl > 0) {
2394 score = std::max(
2395 static_cast<double>(GetExpiredTtlFilesCount(
2396 immutable_cf_options, mutable_cf_options, files_[level])),
2397 score);
2398 }
2399
2400 } else {
2401 score = static_cast<double>(num_sorted_runs) /
2402 mutable_cf_options.level0_file_num_compaction_trigger;
2403 if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
2404 // Level-based involves L0->L0 compactions that can lead to oversized
2405 // L0 files. Take into account size as well to avoid later giant
2406 // compactions to the base level.
2407 score = std::max(
2408 score, static_cast<double>(total_size) /
2409 mutable_cf_options.max_bytes_for_level_base);
2410 }
2411 }
2412 } else {
2413 // Compute the ratio of current size to size limit.
2414 uint64_t level_bytes_no_compacting = 0;
2415 for (auto f : files_[level]) {
2416 if (!f->being_compacted) {
2417 level_bytes_no_compacting += f->compensated_file_size;
2418 }
2419 }
2420 score = static_cast<double>(level_bytes_no_compacting) /
2421 MaxBytesForLevel(level);
2422 }
2423 compaction_level_[level] = level;
2424 compaction_score_[level] = score;
2425 }
2426
2427 // Sort all the levels based on their score. Higher scores get listed
2428 // first. Use bubble sort because the number of entries is small.
2429 for (int i = 0; i < num_levels() - 2; i++) {
2430 for (int j = i + 1; j < num_levels() - 1; j++) {
2431 if (compaction_score_[i] < compaction_score_[j]) {
2432 double score = compaction_score_[i];
2433 int level = compaction_level_[i];
2434 compaction_score_[i] = compaction_score_[j];
2435 compaction_level_[i] = compaction_level_[j];
2436 compaction_score_[j] = score;
2437 compaction_level_[j] = level;
2438 }
2439 }
2440 }
2441 ComputeFilesMarkedForCompaction();
2442 ComputeBottommostFilesMarkedForCompaction();
2443 if (mutable_cf_options.ttl > 0) {
2444 ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
2445 }
2446 if (mutable_cf_options.periodic_compaction_seconds > 0) {
2447 ComputeFilesMarkedForPeriodicCompaction(
2448 immutable_cf_options, mutable_cf_options.periodic_compaction_seconds);
2449 }
2450 EstimateCompactionBytesNeeded(mutable_cf_options);
2451 }
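// Editor's note: a hypothetical scoring example for level compaction. With
// level0_file_num_compaction_trigger = 4 and six non-compacting L0 sorted
// runs, the L0 score is 6 / 4 = 1.5; if those runs also total 300 MB against
// max_bytes_for_level_base = 200 MB, the size-based score 300 / 200 = 1.5
// ties it. For other levels the score is simply (compensated bytes not being
// compacted) / MaxBytesForLevel(level), e.g. 150 MB in a 100 MB level scores
// 1.5. A score of 1.0 or more generally means the level needs compaction,
// and the sort above makes the highest-scoring level the first candidate.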
2452
2453 void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
2454 files_marked_for_compaction_.clear();
2455 int last_qualify_level = 0;
2456
2457 // Do not include files from the last level that contains data.
2458 // If the table properties collector suggests a file on the last level,
2459 // we should not move it to a new level.
2460 for (int level = num_levels() - 1; level >= 1; level--) {
2461 if (!files_[level].empty()) {
2462 last_qualify_level = level - 1;
2463 break;
2464 }
2465 }
2466
2467 for (int level = 0; level <= last_qualify_level; level++) {
2468 for (auto* f : files_[level]) {
2469 if (!f->being_compacted && f->marked_for_compaction) {
2470 files_marked_for_compaction_.emplace_back(level, f);
2471 }
2472 }
2473 }
2474 }
2475
2476 void VersionStorageInfo::ComputeExpiredTtlFiles(
2477 const ImmutableCFOptions& ioptions, const uint64_t ttl) {
2478 assert(ttl > 0);
2479
2480 expired_ttl_files_.clear();
2481
2482 int64_t _current_time;
2483 auto status = ioptions.env->GetCurrentTime(&_current_time);
2484 if (!status.ok()) {
2485 return;
2486 }
2487 const uint64_t current_time = static_cast<uint64_t>(_current_time);
2488
2489 for (int level = 0; level < num_levels() - 1; level++) {
2490 for (FileMetaData* f : files_[level]) {
2491 if (!f->being_compacted) {
2492 uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
2493 if (oldest_ancester_time > 0 &&
2494 oldest_ancester_time < (current_time - ttl)) {
2495 expired_ttl_files_.emplace_back(level, f);
2496 }
2497 }
2498 }
2499 }
2500 }
2501
2502 void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
2503 const ImmutableCFOptions& ioptions,
2504 const uint64_t periodic_compaction_seconds) {
2505 assert(periodic_compaction_seconds > 0);
2506
2507 files_marked_for_periodic_compaction_.clear();
2508
2509 int64_t temp_current_time;
2510 auto status = ioptions.env->GetCurrentTime(&temp_current_time);
2511 if (!status.ok()) {
2512 return;
2513 }
2514 const uint64_t current_time = static_cast<uint64_t>(temp_current_time);
2515
2516 // If periodic_compaction_seconds is larger than current time, periodic
2517 // compaction can't possibly be triggered.
2518 if (periodic_compaction_seconds > current_time) {
2519 return;
2520 }
2521
2522 const uint64_t allowed_time_limit =
2523 current_time - periodic_compaction_seconds;
2524
2525 for (int level = 0; level < num_levels(); level++) {
2526 for (auto f : files_[level]) {
2527 if (!f->being_compacted) {
2528 // Compute a file's modification time in the following order:
2529 // 1. Use file_creation_time table property if it is > 0.
2530 // 2. Use creation_time table property if it is > 0.
2531 // 3. Use file's mtime metadata if the above two table properties are 0.
2532 // Don't consider the file at all if the modification time cannot be
2533 // correctly determined based on the above conditions.
2534 uint64_t file_modification_time = f->TryGetFileCreationTime();
2535 if (file_modification_time == kUnknownFileCreationTime) {
2536 file_modification_time = f->TryGetOldestAncesterTime();
2537 }
2538 if (file_modification_time == kUnknownOldestAncesterTime) {
2539 auto file_path = TableFileName(ioptions.cf_paths, f->fd.GetNumber(),
2540 f->fd.GetPathId());
2541 status = ioptions.env->GetFileModificationTime(
2542 file_path, &file_modification_time);
2543 if (!status.ok()) {
2544 ROCKS_LOG_WARN(ioptions.info_log,
2545 "Can't get file modification time: %s: %s",
2546 file_path.c_str(), status.ToString().c_str());
2547 continue;
2548 }
2549 }
2550 if (file_modification_time > 0 &&
2551 file_modification_time < allowed_time_limit) {
2552 files_marked_for_periodic_compaction_.emplace_back(level, f);
2553 }
2554 }
2555 }
2556 }
2557 }
2558
2559 namespace {
2560
2561 // used to sort files by size
2562 struct Fsize {
2563 size_t index;
2564 FileMetaData* file;
2565 };
2566
2567 // Comparator that is used to sort files based on their size
2568 // In normal mode: descending compensated size
2569 bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
2570 return (first.file->compensated_file_size >
2571 second.file->compensated_file_size);
2572 }
2573 } // anonymous namespace
2574
2575 void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
2576 auto* level_files = &files_[level];
2577 // Must not overlap
2578 #ifndef NDEBUG
2579 if (level > 0 && !level_files->empty() &&
2580 internal_comparator_->Compare(
2581 (*level_files)[level_files->size() - 1]->largest, f->smallest) >= 0) {
2582 auto* f2 = (*level_files)[level_files->size() - 1];
2583 if (info_log != nullptr) {
2584 Error(info_log, "Adding new file %" PRIu64
2585 " range (%s, %s) to level %d but overlapping "
2586 "with existing file %" PRIu64 " %s %s",
2587 f->fd.GetNumber(), f->smallest.DebugString(true).c_str(),
2588 f->largest.DebugString(true).c_str(), level, f2->fd.GetNumber(),
2589 f2->smallest.DebugString(true).c_str(),
2590 f2->largest.DebugString(true).c_str());
2591 LogFlush(info_log);
2592 }
2593 assert(false);
2594 }
2595 #else
2596 (void)info_log;
2597 #endif
2598 f->refs++;
2599 level_files->push_back(f);
2600 }
2601
2602 void VersionStorageInfo::AddBlobFile(
2603 std::shared_ptr<BlobFileMetaData> blob_file_meta) {
2604 assert(blob_file_meta);
2605
2606 const uint64_t blob_file_number = blob_file_meta->GetBlobFileNumber();
2607
2608 auto it = blob_files_.lower_bound(blob_file_number);
2609 assert(it == blob_files_.end() || it->first != blob_file_number);
2610
2611 blob_files_.insert(
2612 it, BlobFiles::value_type(blob_file_number, std::move(blob_file_meta)));
2613 }
2614
2615 // Version::PrepareApply() needs to be called before calling this function,
2616 // or the following functions need to have been called:
2617 // 1. UpdateNumNonEmptyLevels();
2618 // 2. CalculateBaseBytes();
2619 // 3. UpdateFilesByCompactionPri();
2620 // 4. GenerateFileIndexer();
2621 // 5. GenerateLevelFilesBrief();
2622 // 6. GenerateLevel0NonOverlapping();
2623 // 7. GenerateBottommostFiles();
2624 void VersionStorageInfo::SetFinalized() {
2625 finalized_ = true;
2626 #ifndef NDEBUG
2627 if (compaction_style_ != kCompactionStyleLevel) {
2628 // Not level based compaction.
2629 return;
2630 }
2631 assert(base_level_ < 0 || num_levels() == 1 ||
2632 (base_level_ >= 1 && base_level_ < num_levels()));
2633 // Verify that levels 1 through base_level - 1 are empty.
2634 for (int level = 1; level < base_level(); level++) {
2635 assert(NumLevelBytes(level) == 0);
2636 }
2637 uint64_t max_bytes_prev_level = 0;
2638 for (int level = base_level(); level < num_levels() - 1; level++) {
2639 if (LevelFiles(level).size() == 0) {
2640 continue;
2641 }
2642 assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
2643 max_bytes_prev_level = MaxBytesForLevel(level);
2644 }
2645 int num_non_empty_non_l0_levels = 0;
2646 for (int level = 0; level < num_levels(); level++) {
2647 assert(LevelFiles(level).size() == 0 ||
2648 LevelFiles(level).size() == LevelFilesBrief(level).num_files);
2649 if (level > 0 && NumLevelBytes(level) > 0) {
2650 num_non_empty_non_l0_levels++;
2651 }
2652 if (LevelFiles(level).size() > 0) {
2653 assert(level < num_non_empty_levels());
2654 }
2655 }
2656 assert(compaction_level_.size() > 0);
2657 assert(compaction_level_.size() == compaction_score_.size());
2658 #endif
2659 }
2660
2661 void VersionStorageInfo::UpdateNumNonEmptyLevels() {
2662 num_non_empty_levels_ = num_levels_;
2663 for (int i = num_levels_ - 1; i >= 0; i--) {
2664 if (files_[i].size() != 0) {
2665 return;
2666 } else {
2667 num_non_empty_levels_ = i;
2668 }
2669 }
2670 }
2671
2672 namespace {
2673 // Sort `temp` based on the ratio of overlapping size to file size
2674 void SortFileByOverlappingRatio(
2675 const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
2676 const std::vector<FileMetaData*>& next_level_files,
2677 std::vector<Fsize>* temp) {
2678 std::unordered_map<uint64_t, uint64_t> file_to_order;
2679 auto next_level_it = next_level_files.begin();
2680
2681 for (auto& file : files) {
2682 uint64_t overlapping_bytes = 0;
2683 // Skip files in the next level that end before the current file begins
2684 while (next_level_it != next_level_files.end() &&
2685 icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
2686 next_level_it++;
2687 }
2688
2689 while (next_level_it != next_level_files.end() &&
2690 icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
2691 overlapping_bytes += (*next_level_it)->fd.file_size;
2692
2693 if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
2694 // The next-level file crosses the largest-key boundary of the current file.
2695 break;
2696 }
2697 next_level_it++;
2698 }
2699
2700 assert(file->compensated_file_size != 0);
2701 file_to_order[file->fd.GetNumber()] =
2702 overlapping_bytes * 1024u / file->compensated_file_size;
2703 }
2704
2705 std::sort(temp->begin(), temp->end(),
2706 [&](const Fsize& f1, const Fsize& f2) -> bool {
2707 return file_to_order[f1.file->fd.GetNumber()] <
2708 file_to_order[f2.file->fd.GetNumber()];
2709 });
2710 }
2711 } // namespace
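// Editor's note: a rough example of the ordering produced above, using
// hypothetical sizes. A file with compensated_file_size = 10 MB overlapping
// 5 MB of next-level data gets key 5 MB * 1024 / 10 MB = 512, while one
// overlapping 20 MB gets 2048. Ascending order puts the 512 file first, so
// under kMinOverlappingRatio the file with the least overlap per byte (the
// cheapest write amplification) is compacted first.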
2712
2713 void VersionStorageInfo::UpdateFilesByCompactionPri(
2714 CompactionPri compaction_pri) {
2715 if (compaction_style_ == kCompactionStyleNone ||
2716 compaction_style_ == kCompactionStyleFIFO ||
2717 compaction_style_ == kCompactionStyleUniversal) {
2718 // don't need this
2719 return;
2720 }
2721 // No need to sort the highest level because it is never compacted.
2722 for (int level = 0; level < num_levels() - 1; level++) {
2723 const std::vector<FileMetaData*>& files = files_[level];
2724 auto& files_by_compaction_pri = files_by_compaction_pri_[level];
2725 assert(files_by_compaction_pri.size() == 0);
2726
2727 // populate a temp vector for sorting based on size
2728 std::vector<Fsize> temp(files.size());
2729 for (size_t i = 0; i < files.size(); i++) {
2730 temp[i].index = i;
2731 temp[i].file = files[i];
2732 }
2733
2734 // sort the top kNumberFilesToSort files according to the compaction priority
2735 size_t num = VersionStorageInfo::kNumberFilesToSort;
2736 if (num > temp.size()) {
2737 num = temp.size();
2738 }
2739 switch (compaction_pri) {
2740 case kByCompensatedSize:
2741 std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
2742 CompareCompensatedSizeDescending);
2743 break;
2744 case kOldestLargestSeqFirst:
2745 std::sort(temp.begin(), temp.end(),
2746 [](const Fsize& f1, const Fsize& f2) -> bool {
2747 return f1.file->fd.largest_seqno <
2748 f2.file->fd.largest_seqno;
2749 });
2750 break;
2751 case kOldestSmallestSeqFirst:
2752 std::sort(temp.begin(), temp.end(),
2753 [](const Fsize& f1, const Fsize& f2) -> bool {
2754 return f1.file->fd.smallest_seqno <
2755 f2.file->fd.smallest_seqno;
2756 });
2757 break;
2758 case kMinOverlappingRatio:
2759 SortFileByOverlappingRatio(*internal_comparator_, files_[level],
2760 files_[level + 1], &temp);
2761 break;
2762 default:
2763 assert(false);
2764 }
2765 assert(temp.size() == files.size());
2766
2767 // initialize files_by_compaction_pri_
2768 for (size_t i = 0; i < temp.size(); i++) {
2769 files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
2770 }
2771 next_file_to_compact_by_size_[level] = 0;
2772 assert(files_[level].size() == files_by_compaction_pri_[level].size());
2773 }
2774 }
2775
2776 void VersionStorageInfo::GenerateLevel0NonOverlapping() {
2777 assert(!finalized_);
2778 level0_non_overlapping_ = true;
2779 if (level_files_brief_.size() == 0) {
2780 return;
2781 }
2782
2783 // A copy of L0 files sorted by smallest key
2784 std::vector<FdWithKeyRange> level0_sorted_file(
2785 level_files_brief_[0].files,
2786 level_files_brief_[0].files + level_files_brief_[0].num_files);
2787 std::sort(level0_sorted_file.begin(), level0_sorted_file.end(),
2788 [this](const FdWithKeyRange& f1, const FdWithKeyRange& f2) -> bool {
2789 return (internal_comparator_->Compare(f1.smallest_key,
2790 f2.smallest_key) < 0);
2791 });
2792
2793 for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
2794 FdWithKeyRange& f = level0_sorted_file[i];
2795 FdWithKeyRange& prev = level0_sorted_file[i - 1];
2796 if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) {
2797 level0_non_overlapping_ = false;
2798 break;
2799 }
2800 }
2801 }
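// Editor's note: a small illustration of the check above, with hypothetical
// user keys. After sorting L0 files by smallest key, a file [a, c] followed
// by [b, d] fails the check because c >= b (the previous file's largest key
// reaches into the next file's range), so level0_non_overlapping_ becomes
// false; [a, b] followed by [c, d] keeps it true.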
2802
2803 void VersionStorageInfo::GenerateBottommostFiles() {
2804 assert(!finalized_);
2805 assert(bottommost_files_.empty());
2806 for (size_t level = 0; level < level_files_brief_.size(); ++level) {
2807 for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
2808 ++file_idx) {
2809 const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
2810 int l0_file_idx;
2811 if (level == 0) {
2812 l0_file_idx = static_cast<int>(file_idx);
2813 } else {
2814 l0_file_idx = -1;
2815 }
2816 Slice smallest_user_key = ExtractUserKey(f.smallest_key);
2817 Slice largest_user_key = ExtractUserKey(f.largest_key);
2818 if (!RangeMightExistAfterSortedRun(smallest_user_key, largest_user_key,
2819 static_cast<int>(level),
2820 l0_file_idx)) {
2821 bottommost_files_.emplace_back(static_cast<int>(level),
2822 f.file_metadata);
2823 }
2824 }
2825 }
2826 }
2827
2828 void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
2829 assert(seqnum >= oldest_snapshot_seqnum_);
2830 oldest_snapshot_seqnum_ = seqnum;
2831 if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
2832 ComputeBottommostFilesMarkedForCompaction();
2833 }
2834 }
2835
2836 void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
2837 bottommost_files_marked_for_compaction_.clear();
2838 bottommost_files_mark_threshold_ = kMaxSequenceNumber;
2839 for (auto& level_and_file : bottommost_files_) {
2840 if (!level_and_file.second->being_compacted &&
2841 level_and_file.second->fd.largest_seqno != 0 &&
2842 level_and_file.second->num_deletions > 1) {
2843 // largest_seqno might be nonzero due to containing the final key in an
2844 // earlier compaction, whose seqnum we didn't zero out. Multiple deletions
2845 // ensure that the file really contains deleted or overwritten keys.
2846 if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
2847 bottommost_files_marked_for_compaction_.push_back(level_and_file);
2848 } else {
2849 bottommost_files_mark_threshold_ =
2850 std::min(bottommost_files_mark_threshold_,
2851 level_and_file.second->fd.largest_seqno);
2852 }
2853 }
2854 }
2855 }
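// Editor's note: a hypothetical walk-through of the marking above. A
// bottommost file with fd.largest_seqno = 100 and several deletions is not
// marked while the oldest snapshot sits at seqnum 90; instead
// bottommost_files_mark_threshold_ drops to 100. Once UpdateOldestSnapshot()
// reports an oldest snapshot past 100, the recomputation marks the file,
// letting a later compaction actually drop its deleted keys.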
2856
2857 void Version::Ref() {
2858 ++refs_;
2859 }
2860
2861 bool Version::Unref() {
2862 assert(refs_ >= 1);
2863 --refs_;
2864 if (refs_ == 0) {
2865 delete this;
2866 return true;
2867 }
2868 return false;
2869 }
2870
2871 bool VersionStorageInfo::OverlapInLevel(int level,
2872 const Slice* smallest_user_key,
2873 const Slice* largest_user_key) {
2874 if (level >= num_non_empty_levels_) {
2875 // empty level, no overlap
2876 return false;
2877 }
2878 return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
2879 level_files_brief_[level], smallest_user_key,
2880 largest_user_key);
2881 }
2882
2883 // Store in "*inputs" all files in "level" that overlap [begin,end]
2884 // If hint_index is specified, then it points to a file in the
2885 // overlapping range.
2886 // If file_index is non-null, it is set to the index of some file in the overlapping range.
2887 void VersionStorageInfo::GetOverlappingInputs(
2888 int level, const InternalKey* begin, const InternalKey* end,
2889 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
2890 bool expand_range, InternalKey** next_smallest) const {
2891 if (level >= num_non_empty_levels_) {
2892 // this level is empty, no overlapping inputs
2893 return;
2894 }
2895
2896 inputs->clear();
2897 if (file_index) {
2898 *file_index = -1;
2899 }
2900 const Comparator* user_cmp = user_comparator_;
2901 if (level > 0) {
2902 GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
2903 file_index, false, next_smallest);
2904 return;
2905 }
2906
2907 if (next_smallest) {
2908 // next_smallest key only makes sense for non-level 0, where files are
2909 // non-overlapping
2910 *next_smallest = nullptr;
2911 }
2912
2913 Slice user_begin, user_end;
2914 if (begin != nullptr) {
2915 user_begin = begin->user_key();
2916 }
2917 if (end != nullptr) {
2918 user_end = end->user_key();
2919 }
2920
2921 // "index" stores the indices of the files that still need to be checked.
2922 std::list<size_t> index;
2923 for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
2924 index.emplace_back(i);
2925 }
2926
2927 while (!index.empty()) {
2928 bool found_overlapping_file = false;
2929 auto iter = index.begin();
2930 while (iter != index.end()) {
2931 FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
2932 const Slice file_start = ExtractUserKey(f->smallest_key);
2933 const Slice file_limit = ExtractUserKey(f->largest_key);
2934 if (begin != nullptr &&
2935 user_cmp->CompareWithoutTimestamp(file_limit, user_begin) < 0) {
2936 // "f" is completely before specified range; skip it
2937 iter++;
2938 } else if (end != nullptr &&
2939 user_cmp->CompareWithoutTimestamp(file_start, user_end) > 0) {
2940 // "f" is completely after specified range; skip it
2941 iter++;
2942 } else {
2943 // "f" overlaps the specified range
2944 inputs->emplace_back(files_[level][*iter]);
2945 found_overlapping_file = true;
2946 // record the first file index.
2947 if (file_index && *file_index == -1) {
2948 *file_index = static_cast<int>(*iter);
2949 }
2950 // this file overlaps; erase it so it is not checked again.
2951 iter = index.erase(iter);
2952 if (expand_range) {
2953 if (begin != nullptr &&
2954 user_cmp->CompareWithoutTimestamp(file_start, user_begin) < 0) {
2955 user_begin = file_start;
2956 }
2957 if (end != nullptr &&
2958 user_cmp->CompareWithoutTimestamp(file_limit, user_end) > 0) {
2959 user_end = file_limit;
2960 }
2961 }
2962 }
2963 }
2964 // if none of the remaining files overlap, we are done
2965 if (!found_overlapping_file) {
2966 break;
2967 }
2968 }
2969 }
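// Illustrative L0 walk-through (hypothetical keys): with expand_range == true,
// L0 files f1[b..d] and f2[c..f], and query range [e..g]: on the first pass f1
// is skipped (d < e) but f2 overlaps and widens user_begin to "c"; on the
// second pass f1 now overlaps via [c..d] and is added too. The outer loop
// repeats until a pass adds no new file.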
2970
2971 // Store in "*inputs" files in "level" that within range [begin,end]
2972 // Guarantee a "clean cut" boundary between the files in inputs
2973 // and the surrounding files and the maxinum number of files.
2974 // This will ensure that no parts of a key are lost during compaction.
2975 // If hint_index is specified, then it points to a file in the range.
2976 // The file_index returns a pointer to any file in an overlapping range.
2977 void VersionStorageInfo::GetCleanInputsWithinInterval(
2978 int level, const InternalKey* begin, const InternalKey* end,
2979 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
2980 inputs->clear();
2981 if (file_index) {
2982 *file_index = -1;
2983 }
2984 if (level >= num_non_empty_levels_ || level == 0 ||
2985 level_files_brief_[level].num_files == 0) {
2986 // this level is empty, no inputs within range
2987 // also don't support clean input interval within L0
2988 return;
2989 }
2990
2991 GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
2992 hint_index, file_index,
2993 true /* within_interval */);
2994 }
2995
2996 // Store in "*inputs" all files in "level" that overlap [begin,end]
2997 // Employ binary search to find at least one file that overlaps the
2998 // specified range. From that file, iterate backwards and
2999 // forwards to find all overlapping files.
3000 // if within_interval is set, then only store the maximum set of clean
3001 // inputs within the range [begin, end]. "clean" means there is a boundary
3002 // between the files in "*inputs" and the surrounding files.
3003 void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
3004 int level, const InternalKey* begin, const InternalKey* end,
3005 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
3006 bool within_interval, InternalKey** next_smallest) const {
3007 assert(level > 0);
3008
3009 auto user_cmp = user_comparator_;
3010 const FdWithKeyRange* files = level_files_brief_[level].files;
3011 const int num_files = static_cast<int>(level_files_brief_[level].num_files);
3012
3013 // Use binary search to find the lower bound (start_index) and the
3014 // upper bound (end_index) of the overlapping file range.
3015 int start_index = 0;
3016 int end_index = num_files;
3017
3018 if (begin != nullptr) {
3019 // If within_interval is true, compare "begin" against each file's smallest
3020 // key so std::lower_bound skips files that only partially overlap the range.
3021 auto cmp = [&user_cmp, &within_interval](const FdWithKeyRange& f,
3022 const InternalKey* k) {
3023 auto& file_key = within_interval ? f.file_metadata->smallest
3024 : f.file_metadata->largest;
3025 return sstableKeyCompare(user_cmp, file_key, *k) < 0;
3026 };
3027
3028 start_index = static_cast<int>(
3029 std::lower_bound(files,
3030 files + (hint_index == -1 ? num_files : hint_index),
3031 begin, cmp) -
3032 files);
3033
3034 if (start_index > 0 && within_interval) {
3035 bool is_overlapping = true;
3036 while (is_overlapping && start_index < num_files) {
3037 auto& pre_limit = files[start_index - 1].file_metadata->largest;
3038 auto& cur_start = files[start_index].file_metadata->smallest;
3039 is_overlapping = sstableKeyCompare(user_cmp, pre_limit, cur_start) == 0;
3040 start_index += is_overlapping;
3041 }
3042 }
3043 }
3044
3045 if (end != nullptr) {
3046 // If within_interval is true, compare "end" against each file's largest
3047 // key so std::upper_bound excludes files that extend past the range.
3048 auto cmp = [&user_cmp, &within_interval](const InternalKey* k,
3049 const FdWithKeyRange& f) {
3050 auto& file_key = within_interval ? f.file_metadata->largest
3051 : f.file_metadata->smallest;
3052 return sstableKeyCompare(user_cmp, *k, file_key) < 0;
3053 };
3054
3055 end_index = static_cast<int>(
3056 std::upper_bound(files + start_index, files + num_files, end, cmp) -
3057 files);
3058
3059 if (end_index < num_files && within_interval) {
3060 bool is_overlapping = true;
3061 while (is_overlapping && end_index > start_index) {
3062 auto& next_start = files[end_index].file_metadata->smallest;
3063 auto& cur_limit = files[end_index - 1].file_metadata->largest;
3064 is_overlapping =
3065 sstableKeyCompare(user_cmp, cur_limit, next_start) == 0;
3066 end_index -= is_overlapping;
3067 }
3068 }
3069 }
3070
3071 assert(start_index <= end_index);
3072
3073 // If there were no overlapping files, return immediately.
3074 if (start_index == end_index) {
3075 if (next_smallest) {
3076 *next_smallest = nullptr;
3077 }
3078 return;
3079 }
3080
3081 assert(start_index < end_index);
3082
3083 // returns the index where an overlap is found
3084 if (file_index) {
3085 *file_index = start_index;
3086 }
3087
3088 // insert overlapping files into vector
3089 for (int i = start_index; i < end_index; i++) {
3090 inputs->push_back(files_[level][i]);
3091 }
3092
3093 if (next_smallest != nullptr) {
3094 // Provide the next key outside the range covered by inputs
3095 if (end_index < static_cast<int>(files_[level].size())) {
3096 **next_smallest = files_[level][end_index]->smallest;
3097 } else {
3098 *next_smallest = nullptr;
3099 }
3100 }
3101 }
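// Illustrative example (hypothetical keys): given L2 files f1[a..e], f2[e..h],
// f3[j..m] and range [e..m]: without within_interval the result is
// {f1, f2, f3}, since f1's largest key touches "e". With within_interval set,
// start_index initially lands on f2 but is advanced past it because
// f1.largest == f2.smallest is not a clean cut, so only {f3} is returned.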
3102
3103 uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
3104 assert(level >= 0);
3105 assert(level < num_levels());
3106 return TotalFileSize(files_[level]);
3107 }
3108
3109 const char* VersionStorageInfo::LevelSummary(
3110 LevelSummaryStorage* scratch) const {
3111 int len = 0;
3112 if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
3113 assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
3114 if (level_multiplier_ != 0.0) {
3115 len = snprintf(
3116 scratch->buffer, sizeof(scratch->buffer),
3117 "base level %d level multiplier %.2f max bytes base %" PRIu64 " ",
3118 base_level_, level_multiplier_, level_max_bytes_[base_level_]);
3119 }
3120 }
3121 len +=
3122 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
3123 for (int i = 0; i < num_levels(); i++) {
3124 int sz = sizeof(scratch->buffer) - len;
3125 int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
3126 if (ret < 0 || ret >= sz) break;
3127 len += ret;
3128 }
3129 if (len > 0) {
3130 // overwrite the last space
3131 --len;
3132 }
3133 len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3134 "] max score %.2f", compaction_score_[0]);
3135
3136 if (!files_marked_for_compaction_.empty()) {
3137 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3138 " (%" ROCKSDB_PRIszt " files need compaction)",
3139 files_marked_for_compaction_.size());
3140 }
3141
3142 return scratch->buffer;
3143 }
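// A LevelSummary string produced above looks like (illustrative values only):
//   "base level 1 level multiplier 10.00 max bytes base 268435456 "
//   "files[3 0 5 0 0 0 0] max score 1.25 (2 files need compaction)"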
3144
3145 const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
3146 int level) const {
3147 int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
3148 for (const auto& f : files_[level]) {
3149 int sz = sizeof(scratch->buffer) - len;
3150 char sztxt[16];
3151 AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
3152 int ret = snprintf(scratch->buffer + len, sz,
3153 "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
3154 f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
3155 static_cast<int>(f->being_compacted));
3156 if (ret < 0 || ret >= sz)
3157 break;
3158 len += ret;
3159 }
3160 // overwrite the last space (only if files_[level].size() is non-zero)
3161 if (files_[level].size() && len > 0) {
3162 --len;
3163 }
3164 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
3165 return scratch->buffer;
3166 }
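// A LevelFileSummary string produced above looks like (illustrative values
// only):
//   "files_size[#7(seq=42,sz=12MB,0) #9(seq=51,sz=996KB,1)]"
// where the trailing 0/1 is the file's being_compacted flag.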
3167
3168 int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
3169 uint64_t result = 0;
3170 std::vector<FileMetaData*> overlaps;
3171 for (int level = 1; level < num_levels() - 1; level++) {
3172 for (const auto& f : files_[level]) {
3173 GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
3174 const uint64_t sum = TotalFileSize(overlaps);
3175 if (sum > result) {
3176 result = sum;
3177 }
3178 }
3179 }
3180 return result;
3181 }
3182
3183 uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
3184 // Note: the result for level zero is not really used since we set
3185 // the level-0 compaction threshold based on number of files.
3186 assert(level >= 0);
3187 assert(level < static_cast<int>(level_max_bytes_.size()));
3188 return level_max_bytes_[level];
3189 }
3190
3191 void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
3192 const MutableCFOptions& options) {
3193 // Special logic to set the number of sorted runs.
3194 // It matches the previous behavior when all files were in L0.
3195 int num_l0_count = static_cast<int>(files_[0].size());
3196 if (compaction_style_ == kCompactionStyleUniversal) {
3197 // For universal compaction, we use the level0 score to indicate
3198 // the compaction score for the whole DB. Each non-empty level is
3199 // counted as if it were an extra L0 file.
3200 for (int i = 1; i < num_levels(); i++) {
3201 if (!files_[i].empty()) {
3202 num_l0_count++;
3203 }
3204 }
3205 }
3206 set_l0_delay_trigger_count(num_l0_count);
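// Illustrative example (hypothetical state): under universal compaction with
// three L0 files plus non-empty L2 and L4, num_l0_count == 3 + 2 == 5; each
// non-empty level beyond L0 counts as one extra sorted run.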
3207
3208 level_max_bytes_.resize(ioptions.num_levels);
3209 if (!ioptions.level_compaction_dynamic_level_bytes) {
3210 base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;
3211
3212 // Calculate for static bytes base case
3213 for (int i = 0; i < ioptions.num_levels; ++i) {
3214 if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
3215 level_max_bytes_[i] = options.max_bytes_for_level_base;
3216 } else if (i > 1) {
3217 level_max_bytes_[i] = MultiplyCheckOverflow(
3218 MultiplyCheckOverflow(level_max_bytes_[i - 1],
3219 options.max_bytes_for_level_multiplier),
3220 options.MaxBytesMultiplerAdditional(i - 1));
3221 } else {
3222 level_max_bytes_[i] = options.max_bytes_for_level_base;
3223 }
3224 }
3225 } else {
3226 uint64_t max_level_size = 0;
3227
3228 int first_non_empty_level = -1;
3229 // Find the size of the non-L0 level that holds the most data.
3230 // We cannot simply use the size of the last level: after compaction it can
3231 // be empty or smaller than the previous levels.
3232 for (int i = 1; i < num_levels_; i++) {
3233 uint64_t total_size = 0;
3234 for (const auto& f : files_[i]) {
3235 total_size += f->fd.GetFileSize();
3236 }
3237 if (total_size > 0 && first_non_empty_level == -1) {
3238 first_non_empty_level = i;
3239 }
3240 if (total_size > max_level_size) {
3241 max_level_size = total_size;
3242 }
3243 }
3244
3245 // Prefill every level's max bytes to disallow compaction from there.
3246 for (int i = 0; i < num_levels_; i++) {
3247 level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
3248 }
3249
3250 if (max_level_size == 0) {
3251 // No data for L1 and up. L0 compacts to last level directly.
3252 // No compaction from L1+ needs to be scheduled.
3253 base_level_ = num_levels_ - 1;
3254 } else {
3255 uint64_t l0_size = 0;
3256 for (const auto& f : files_[0]) {
3257 l0_size += f->fd.GetFileSize();
3258 }
3259
3260 uint64_t base_bytes_max =
3261 std::max(options.max_bytes_for_level_base, l0_size);
3262 uint64_t base_bytes_min = static_cast<uint64_t>(
3263 base_bytes_max / options.max_bytes_for_level_multiplier);
3264
3265 // Check whether we can make the last level's target size max_level_size.
3266 uint64_t cur_level_size = max_level_size;
3267 for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
3268 // Divide by the level multiplier (integer division; the result truncates)
3269 cur_level_size = static_cast<uint64_t>(
3270 cur_level_size / options.max_bytes_for_level_multiplier);
3271 }
3272
3273 // Calculate base level and its size.
3274 uint64_t base_level_size;
3275 if (cur_level_size <= base_bytes_min) {
3276 // Case 1. If we make the target size of the last level max_level_size,
3277 // the target size of the first non-empty level would be smaller than
3278 // base_bytes_min. We set it to base_bytes_min + 1 instead.
3279 base_level_size = base_bytes_min + 1U;
3280 base_level_ = first_non_empty_level;
3281 ROCKS_LOG_INFO(ioptions.info_log,
3282 "More existing levels in DB than needed. "
3283 "max_bytes_for_level_multiplier may not be guaranteed.");
3284 } else {
3285 // Find base level (where L0 data is compacted to).
3286 base_level_ = first_non_empty_level;
3287 while (base_level_ > 1 && cur_level_size > base_bytes_max) {
3288 --base_level_;
3289 cur_level_size = static_cast<uint64_t>(
3290 cur_level_size / options.max_bytes_for_level_multiplier);
3291 }
3292 if (cur_level_size > base_bytes_max) {
3293 // Even L1 will be too large
3294 assert(base_level_ == 1);
3295 base_level_size = base_bytes_max;
3296 } else {
3297 base_level_size = cur_level_size;
3298 }
3299 }
3300
3301 level_multiplier_ = options.max_bytes_for_level_multiplier;
3302 assert(base_level_size > 0);
3303 if (l0_size > base_level_size &&
3304 (l0_size > options.max_bytes_for_level_base ||
3305 static_cast<int>(files_[0].size() / 2) >=
3306 options.level0_file_num_compaction_trigger)) {
3307 // We adjust the base level according to actual L0 size, and adjust
3308 // the level multiplier accordingly, when:
3309 // 1. the L0 size is larger than level size base, or
3310 // 2. number of L0 files reaches twice the L0->L1 compaction trigger
3311 // We don't do this otherwise, to keep the LSM-tree structure stable
3312 // unless the L0 compaction is backlogged.
3313 base_level_size = l0_size;
3314 if (base_level_ == num_levels_ - 1) {
3315 level_multiplier_ = 1.0;
3316 } else {
3317 level_multiplier_ = std::pow(
3318 static_cast<double>(max_level_size) /
3319 static_cast<double>(base_level_size),
3320 1.0 / static_cast<double>(num_levels_ - base_level_ - 1));
3321 }
3322 }
3323
3324 uint64_t level_size = base_level_size;
3325 for (int i = base_level_; i < num_levels_; i++) {
3326 if (i > base_level_) {
3327 level_size = MultiplyCheckOverflow(level_size, level_multiplier_);
3328 }
3329 // Don't set any level below base_bytes_max. Otherwise, the LSM can
3330 // assume an hourglass shape where L1+ sizes are smaller than L0. This
3331 // causes compaction scoring, which depends on level sizes, to favor L1+
3332 // at the expense of L0, which may fill up and stall.
3333 level_max_bytes_[i] = std::max(level_size, base_bytes_max);
3334 }
3335 }
3336 }
3337 }
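// Worked example for the dynamic-level branch above (hypothetical numbers):
// num_levels_ == 7, max_bytes_for_level_multiplier == 10,
// max_bytes_for_level_base == 256MB, empty L0, and all data (100GB) in L6.
// Walking down from L6, cur_level_size shrinks 10GB -> 1GB -> 100MB and the
// loop stops at base_level_ == 3 (100MB <= 256MB). The resulting targets are
// L3 = max(100MB, 256MB) = 256MB, L4 = 1GB, L5 = 10GB, L6 = 100GB.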
3338
3339 uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
3340 // Estimate the live data size by adding up, for each key range, the size
3341 // of the bottommost file covering it. Note: the estimate depends on the
3342 // ordering of files in level 0 because files in level 0 can be overlapping.
3343 uint64_t size = 0;
3344
3345 auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
3346 return internal_comparator_->Compare(*x, *y) < 0;
3347 };
3348 // (Ordered) map of largest keys in non-overlapping files
3349 std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
3350
3351 for (int l = num_levels_ - 1; l >= 0; l--) {
3352 bool found_end = false;
3353 for (auto file : files_[l]) {
3354 // Find the first file where the largest key is larger than the smallest
3355 // key of the current file. If this file does not overlap with the
3356 // current file, none of the files in the map does. If there is
3357 // no potential overlap, we can safely insert the rest of this level
3358 // (if the level is not 0) into the map without checking again because
3359 // the elements in the level are sorted and non-overlapping.
3360 auto lb = (found_end && l != 0) ?
3361 ranges.end() : ranges.lower_bound(&file->smallest);
3362 found_end = (lb == ranges.end());
3363 if (found_end || internal_comparator_->Compare(
3364 file->largest, (*lb).second->smallest) < 0) {
3365 ranges.emplace_hint(lb, &file->largest, file);
3366 size += file->fd.file_size;
3367 }
3368 }
3369 }
3370 return size;
3371 }
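// Illustrative example (hypothetical files): with f1[a..c] (10MB) in L6 and
// f2[b..d] (1MB) in L1, the bottom-up scan counts f1 first. For f2,
// lower_bound finds f1 (largest "c" >= smallest "b") and the two overlap, so
// f2's 1MB is not added; the estimate is 10MB.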
3372
3373 bool VersionStorageInfo::RangeMightExistAfterSortedRun(
3374 const Slice& smallest_user_key, const Slice& largest_user_key,
3375 int last_level, int last_l0_idx) {
3376 assert((last_l0_idx != -1) == (last_level == 0));
3377 // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
3378 // bottommost only if it's the oldest L0 file and there are no files on older
3379 // levels. It'd be better to consider it bottommost if there's no overlap in
3380 // older levels/files.
3381 if (last_level == 0 &&
3382 last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
3383 return true;
3384 }
3385
3386 // Checks whether there are files living beyond the `last_level`. If lower
3387 // levels have files, it checks for overlap between [`smallest_user_key`,
3388 // `largest_user_key`] and those files. Bottom-level optimizations can be
3389 // made if there are no files in lower levels or if there is no overlap
3390 // with the files in the lower levels.
3391 for (int level = last_level + 1; level < num_levels(); level++) {
3392 // The range is not in the bottommost level if there are files in lower
3393 // levels when the `last_level` is 0, or if there are files in lower levels
3394 // which overlap with [`smallest_user_key`, `largest_user_key`].
3395 if (files_[level].size() > 0 &&
3396 (last_level == 0 ||
3397 OverlapInLevel(level, &smallest_user_key, &largest_user_key))) {
3398 return true;
3399 }
3400 }
3401 return false;
3402 }
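// Illustrative example (hypothetical state): with three L0 files (indexes
// 0..2), a sorted run ending at last_l0_idx == 1 is conservatively treated as
// non-bottommost (returns true); for last_l0_idx == 2 the answer depends on
// whether any lower level contains files.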
3403
3404 void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
3405 for (int level = 0; level < storage_info_.num_levels(); level++) {
3406 const std::vector<FileMetaData*>& files = storage_info_.files_[level];
3407 for (const auto& file : files) {
3408 live->push_back(file->fd);
3409 }
3410 }
3411 }
3412
3413 std::string Version::DebugString(bool hex, bool print_stats) const {
3414 std::string r;
3415 for (int level = 0; level < storage_info_.num_levels_; level++) {
3416 // E.g.,
3417 // --- level 1 ---
3418 // 17:123[1 .. 124]['a' .. 'd']
3419 // 20:43[124 .. 128]['e' .. 'g']
3420 //
3421 // if print_stats=true:
3422 // 17:123[1 .. 124]['a' .. 'd'](4096)
3423 r.append("--- level ");
3424 AppendNumberTo(&r, level);
3425 r.append(" --- version# ");
3426 AppendNumberTo(&r, version_number_);
3427 r.append(" ---\n");
3428 const std::vector<FileMetaData*>& files = storage_info_.files_[level];
3429 for (size_t i = 0; i < files.size(); i++) {
3430 r.push_back(' ');
3431 AppendNumberTo(&r, files[i]->fd.GetNumber());
3432 r.push_back(':');
3433 AppendNumberTo(&r, files[i]->fd.GetFileSize());
3434 r.append("[");
3435 AppendNumberTo(&r, files[i]->fd.smallest_seqno);
3436 r.append(" .. ");
3437 AppendNumberTo(&r, files[i]->fd.largest_seqno);
3438 r.append("]");
3439 r.append("[");
3440 r.append(files[i]->smallest.DebugString(hex));
3441 r.append(" .. ");
3442 r.append(files[i]->largest.DebugString(hex));
3443 r.append("]");
3444 if (files[i]->oldest_blob_file_number != kInvalidBlobFileNumber) {
3445 r.append(" blob_file:");
3446 AppendNumberTo(&r, files[i]->oldest_blob_file_number);
3447 }
3448 if (print_stats) {
3449 r.append("(");
3450 r.append(ToString(
3451 files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
3452 r.append(")");
3453 }
3454 r.append("\n");
3455 }
3456 }
3457
3458 const auto& blob_files = storage_info_.GetBlobFiles();
3459 if (!blob_files.empty()) {
3460 r.append("--- blob files --- version# ");
3461 AppendNumberTo(&r, version_number_);
3462 r.append(" ---\n");
3463 for (const auto& pair : blob_files) {
3464 const auto& blob_file_meta = pair.second;
3465 assert(blob_file_meta);
3466
3467 r.append(blob_file_meta->DebugString());
3468 r.push_back('\n');
3469 }
3470 }
3471
3472 return r;
3473 }
3474
3475 // this is used to batch writes to the manifest file
3476 struct VersionSet::ManifestWriter {
3477 Status status;
3478 bool done;
3479 InstrumentedCondVar cv;
3480 ColumnFamilyData* cfd;
3481 const MutableCFOptions mutable_cf_options;
3482 const autovector<VersionEdit*>& edit_list;
3483
3484 explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
3485 const MutableCFOptions& cf_options,
3486 const autovector<VersionEdit*>& e)
3487 : done(false),
3488 cv(mu),
3489 cfd(_cfd),
3490 mutable_cf_options(cf_options),
3491 edit_list(e) {}
3492 };
3493
3494 Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
3495 assert(edit);
3496 if (edit->is_in_atomic_group_) {
3497 TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup");
3498 if (replay_buffer_.empty()) {
3499 replay_buffer_.resize(edit->remaining_entries_ + 1);
3500 TEST_SYNC_POINT_CALLBACK(
3501 "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit);
3502 }
3503 read_edits_in_atomic_group_++;
3504 if (read_edits_in_atomic_group_ + edit->remaining_entries_ !=
3505 static_cast<uint32_t>(replay_buffer_.size())) {
3506 TEST_SYNC_POINT_CALLBACK(
3507 "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit);
3508 return Status::Corruption("corrupted atomic group");
3509 }
3510 replay_buffer_[read_edits_in_atomic_group_ - 1] = *edit;
3511 if (read_edits_in_atomic_group_ == replay_buffer_.size()) {
3512 TEST_SYNC_POINT_CALLBACK(
3513 "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit);
3514 return Status::OK();
3515 }
3516 return Status::OK();
3517 }
3518
3519 // A normal edit.
3520 if (!replay_buffer().empty()) {
3521 TEST_SYNC_POINT_CALLBACK(
3522 "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit);
3523 return Status::Corruption("corrupted atomic group");
3524 }
3525 return Status::OK();
3526 }
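// Illustrative example: an atomic group of three edits is written with
// remaining_entries_ == 2, 1, 0. The first edit sizes replay_buffer_ to 3,
// and each subsequent edit must satisfy
//   read_edits_in_atomic_group_ + remaining_entries_ == 3,
// otherwise the group is reported as corrupted.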
3527
3528 bool AtomicGroupReadBuffer::IsFull() const {
3529 return read_edits_in_atomic_group_ == replay_buffer_.size();
3530 }
3531
3532 bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_.empty(); }
3533
3534 void AtomicGroupReadBuffer::Clear() {
3535 read_edits_in_atomic_group_ = 0;
3536 replay_buffer_.clear();
3537 }
3538
3539 VersionSet::VersionSet(const std::string& dbname,
3540 const ImmutableDBOptions* _db_options,
3541 const FileOptions& storage_options, Cache* table_cache,
3542 WriteBufferManager* write_buffer_manager,
3543 WriteController* write_controller,
3544 BlockCacheTracer* const block_cache_tracer)
3545 : column_family_set_(new ColumnFamilySet(
3546 dbname, _db_options, storage_options, table_cache,
3547 write_buffer_manager, write_controller, block_cache_tracer)),
3548 env_(_db_options->env),
3549 fs_(_db_options->fs.get()),
3550 dbname_(dbname),
3551 db_options_(_db_options),
3552 next_file_number_(2),
3553 manifest_file_number_(0), // Filled by Recover()
3554 options_file_number_(0),
3555 pending_manifest_file_number_(0),
3556 last_sequence_(0),
3557 last_allocated_sequence_(0),
3558 last_published_sequence_(0),
3559 prev_log_number_(0),
3560 current_version_number_(0),
3561 manifest_file_size_(0),
3562 file_options_(storage_options),
3563 block_cache_tracer_(block_cache_tracer) {}
3564
3565 VersionSet::~VersionSet() {
3566 // we need to delete column_family_set_ because its destructor depends on
3567 // VersionSet
3568 Cache* table_cache = column_family_set_->get_table_cache();
3569 column_family_set_.reset();
3570 for (auto& file : obsolete_files_) {
3571 if (file.metadata->table_reader_handle) {
3572 table_cache->Release(file.metadata->table_reader_handle);
3573 TableCache::Evict(table_cache, file.metadata->fd.GetNumber());
3574 }
3575 file.DeleteMetadata();
3576 }
3577 obsolete_files_.clear();
3578 }
3579
3580 void VersionSet::Reset() {
3581 if (column_family_set_) {
3582 Cache* table_cache = column_family_set_->get_table_cache();
3583 WriteBufferManager* wbm = column_family_set_->write_buffer_manager();
3584 WriteController* wc = column_family_set_->write_controller();
3585 column_family_set_.reset(new ColumnFamilySet(dbname_, db_options_,
3586 file_options_, table_cache,
3587 wbm, wc, block_cache_tracer_));
3588 }
3589 db_id_.clear();
3590 next_file_number_.store(2);
3591 min_log_number_to_keep_2pc_.store(0);
3592 manifest_file_number_ = 0;
3593 options_file_number_ = 0;
3594 pending_manifest_file_number_ = 0;
3595 last_sequence_.store(0);
3596 last_allocated_sequence_.store(0);
3597 last_published_sequence_.store(0);
3598 prev_log_number_ = 0;
3599 descriptor_log_.reset();
3600 current_version_number_ = 0;
3601 manifest_writers_.clear();
3602 manifest_file_size_ = 0;
3603 obsolete_files_.clear();
3604 obsolete_manifests_.clear();
3605 }
3606
3607 void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
3608 Version* v) {
3609 // compute new compaction score
3610 v->storage_info()->ComputeCompactionScore(
3611 *column_family_data->ioptions(),
3612 *column_family_data->GetLatestMutableCFOptions());
3613
3614 // Mark v finalized
3615 v->storage_info_.SetFinalized();
3616
3617 // Make "v" current
3618 assert(v->refs_ == 0);
3619 Version* current = column_family_data->current();
3620 assert(v != current);
3621 if (current != nullptr) {
3622 assert(current->refs_ > 0);
3623 current->Unref();
3624 }
3625 column_family_data->SetCurrent(v);
3626 v->Ref();
3627
3628 // Append to linked list
3629 v->prev_ = column_family_data->dummy_versions()->prev_;
3630 v->next_ = column_family_data->dummy_versions();
3631 v->prev_->next_ = v;
3632 v->next_->prev_ = v;
3633 }
3634
3635 Status VersionSet::ProcessManifestWrites(
3636 std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
3637 FSDirectory* db_directory, bool new_descriptor_log,
3638 const ColumnFamilyOptions* new_cf_options) {
3639 assert(!writers.empty());
3640 ManifestWriter& first_writer = writers.front();
3641 ManifestWriter* last_writer = &first_writer;
3642
3643 assert(!manifest_writers_.empty());
3644 assert(manifest_writers_.front() == &first_writer);
3645
3646 autovector<VersionEdit*> batch_edits;
3647 autovector<Version*> versions;
3648 autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
3649 std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;
3650
3651 if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3652 // No group commits for column family add or drop
3653 LogAndApplyCFHelper(first_writer.edit_list.front());
3654 batch_edits.push_back(first_writer.edit_list.front());
3655 } else {
3656 auto it = manifest_writers_.cbegin();
3657 size_t group_start = std::numeric_limits<size_t>::max();
3658 while (it != manifest_writers_.cend()) {
3659 if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
3660 // no group commits for column family add or drop
3661 break;
3662 }
3663 last_writer = *(it++);
3664 assert(last_writer != nullptr);
3665 assert(last_writer->cfd != nullptr);
3666 if (last_writer->cfd->IsDropped()) {
3667 // If we detect a dropped CF at this point, and the corresponding
3668 // version edits belong to an atomic group, then we need to find out
3669 // the preceding version edits in the same atomic group, and update
3670 // their `remaining_entries_` member variable because we are NOT going
3671 // to write the version edits of the dropped CF to the MANIFEST. If we
3672 // don't update them, then Recover can report a corrupted atomic group
3673 // because the `remaining_entries_` counts do not match.
3674 if (!batch_edits.empty()) {
3675 if (batch_edits.back()->is_in_atomic_group_ &&
3676 batch_edits.back()->remaining_entries_ > 0) {
3677 assert(group_start < batch_edits.size());
3678 const auto& edit_list = last_writer->edit_list;
3679 size_t k = 0;
3680 while (k < edit_list.size()) {
3681 if (!edit_list[k]->is_in_atomic_group_) {
3682 break;
3683 } else if (edit_list[k]->remaining_entries_ == 0) {
3684 ++k;
3685 break;
3686 }
3687 ++k;
3688 }
3689 for (auto i = group_start; i < batch_edits.size(); ++i) {
3690 assert(static_cast<uint32_t>(k) <=
3691 batch_edits.back()->remaining_entries_);
3692 batch_edits[i]->remaining_entries_ -= static_cast<uint32_t>(k);
3693 }
3694 }
3695 }
3696 continue;
3697 }
3698 // We do a linear search on versions because versions is small.
3699 // TODO(yanqin) maybe consider unordered_map
3700 Version* version = nullptr;
3701 VersionBuilder* builder = nullptr;
3702 for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
3703 uint32_t cf_id = last_writer->cfd->GetID();
3704 if (versions[i]->cfd()->GetID() == cf_id) {
3705 version = versions[i];
3706 assert(!builder_guards.empty() &&
3707 builder_guards.size() == versions.size());
3708 builder = builder_guards[i]->version_builder();
3709 TEST_SYNC_POINT_CALLBACK(
3710 "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
3711 break;
3712 }
3713 }
3714 if (version == nullptr) {
3715 version = new Version(last_writer->cfd, this, file_options_,
3716 last_writer->mutable_cf_options,
3717 current_version_number_++);
3718 versions.push_back(version);
3719 mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
3720 builder_guards.emplace_back(
3721 new BaseReferencedVersionBuilder(last_writer->cfd));
3722 builder = builder_guards.back()->version_builder();
3723 }
3724 assert(builder != nullptr); // make checker happy
3725 for (const auto& e : last_writer->edit_list) {
3726 if (e->is_in_atomic_group_) {
3727 if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
3728 (batch_edits.back()->is_in_atomic_group_ &&
3729 batch_edits.back()->remaining_entries_ == 0)) {
3730 group_start = batch_edits.size();
3731 }
3732 } else if (group_start != std::numeric_limits<size_t>::max()) {
3733 group_start = std::numeric_limits<size_t>::max();
3734 }
3735 Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu);
3736 if (!s.ok()) {
3737 // free up the allocated memory
3738 for (auto v : versions) {
3739 delete v;
3740 }
3741 return s;
3742 }
3743 batch_edits.push_back(e);
3744 }
3745 }
3746 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3747 assert(!builder_guards.empty() &&
3748 builder_guards.size() == versions.size());
3749 auto* builder = builder_guards[i]->version_builder();
3750 Status s = builder->SaveTo(versions[i]->storage_info());
3751 if (!s.ok()) {
3752 // free up the allocated memory
3753 for (auto v : versions) {
3754 delete v;
3755 }
3756 return s;
3757 }
3758 }
3759 }
3760
3761 #ifndef NDEBUG
3762 // Verify that version edits of atomic groups have correct
3763 // remaining_entries_.
3764 size_t k = 0;
3765 while (k < batch_edits.size()) {
3766 while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) {
3767 ++k;
3768 }
3769 if (k == batch_edits.size()) {
3770 break;
3771 }
3772 size_t i = k;
3773 while (i < batch_edits.size()) {
3774 if (!batch_edits[i]->is_in_atomic_group_) {
3775 break;
3776 }
3777 assert(i - k + batch_edits[i]->remaining_entries_ ==
3778 batch_edits[k]->remaining_entries_);
3779 if (batch_edits[i]->remaining_entries_ == 0) {
3780 ++i;
3781 break;
3782 }
3783 ++i;
3784 }
3785 assert(batch_edits[i - 1]->is_in_atomic_group_);
3786 assert(0 == batch_edits[i - 1]->remaining_entries_);
3787 std::vector<VersionEdit*> tmp;
3788 for (size_t j = k; j != i; ++j) {
3789 tmp.emplace_back(batch_edits[j]);
3790 }
3791 TEST_SYNC_POINT_CALLBACK(
3792 "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
3793 k = i;
3794 }
3795 #endif // NDEBUG
3796
3797 uint64_t new_manifest_file_size = 0;
3798 Status s;
3799 IOStatus io_s;
3800
3801 assert(pending_manifest_file_number_ == 0);
3802 if (!descriptor_log_ ||
3803 manifest_file_size_ > db_options_->max_manifest_file_size) {
3804 TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
3805 new_descriptor_log = true;
3806 } else {
3807 pending_manifest_file_number_ = manifest_file_number_;
3808 }
3809
3810 // Local cached copy of state variable(s). WriteCurrentStateToManifest()
3811 // reads its content after releasing db mutex to avoid race with
3812 // SwitchMemtable().
3813 std::unordered_map<uint32_t, MutableCFState> curr_state;
3814 if (new_descriptor_log) {
3815 pending_manifest_file_number_ = NewFileNumber();
3816 batch_edits.back()->SetNextFile(next_file_number_.load());
3817
3818 // if we are writing out a new snapshot, make sure to persist the max
3819 // column family ID.
3820 if (column_family_set_->GetMaxColumnFamily() > 0) {
3821 first_writer.edit_list.front()->SetMaxColumnFamily(
3822 column_family_set_->GetMaxColumnFamily());
3823 }
3824 for (const auto* cfd : *column_family_set_) {
3825 assert(curr_state.find(cfd->GetID()) == curr_state.end());
3826 curr_state[cfd->GetID()] = {cfd->GetLogNumber()};
3827 }
3828 }
3829
3830 {
3831 FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
3832 mu->Unlock();
3833
3834 TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
3835 if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3836 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3837 assert(!builder_guards.empty() &&
3838 builder_guards.size() == versions.size());
3839 assert(!mutable_cf_options_ptrs.empty() &&
3840 builder_guards.size() == versions.size());
3841 ColumnFamilyData* cfd = versions[i]->cfd_;
3842 s = builder_guards[i]->version_builder()->LoadTableHandlers(
3843 cfd->internal_stats(), 1 /* max_threads */,
3844 true /* prefetch_index_and_filter_in_cache */,
3845 false /* is_initial_load */,
3846 mutable_cf_options_ptrs[i]->prefix_extractor.get());
3847 if (!s.ok()) {
3848 if (db_options_->paranoid_checks) {
3849 break;
3850 }
3851 s = Status::OK();
3852 }
3853 }
3854 }
3855
3856 if (s.ok() && new_descriptor_log) {
3857 // This is fine because everything inside this block is serialized --
3858 // only one thread can be here at the same time
3859 // create new manifest file
3860 ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
3861 pending_manifest_file_number_);
3862 std::string descriptor_fname =
3863 DescriptorFileName(dbname_, pending_manifest_file_number_);
3864 std::unique_ptr<FSWritableFile> descriptor_file;
3865 s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
3866 opt_file_opts);
3867 if (s.ok()) {
3868 descriptor_file->SetPreallocationBlockSize(
3869 db_options_->manifest_preallocation_size);
3870
3871 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
3872 std::move(descriptor_file), descriptor_fname, opt_file_opts, env_,
3873 nullptr, db_options_->listeners));
3874 descriptor_log_.reset(
3875 new log::Writer(std::move(file_writer), 0, false));
3876 s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
3877 }
3878 }
3879
3880 if (s.ok()) {
3881 if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3882 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3883 versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
3884 }
3885 }
3886
3887 // Write new records to MANIFEST log
3888 #ifndef NDEBUG
3889 size_t idx = 0;
3890 #endif
3891 for (auto& e : batch_edits) {
3892 std::string record;
3893 if (!e->EncodeTo(&record)) {
3894 s = Status::Corruption("Unable to encode VersionEdit:" +
3895 e->DebugString(true));
3896 break;
3897 }
3898 TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
3899 rocksdb_kill_odds * REDUCE_ODDS2);
3900 #ifndef NDEBUG
3901 if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
3902 TEST_SYNC_POINT_CALLBACK(
3903 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
3904 nullptr);
3905 TEST_SYNC_POINT(
3906 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
3907 }
3908 ++idx;
3909 #endif /* !NDEBUG */
3910 io_s = descriptor_log_->AddRecord(record);
3911 if (!io_s.ok()) {
3912 io_status_ = io_s;
3913 s = io_s;
3914 break;
3915 }
3916 }
3917 if (s.ok()) {
3918 io_s = SyncManifest(env_, db_options_, descriptor_log_->file());
3919 }
3920 if (!io_s.ok()) {
3921 io_status_ = io_s;
3922 s = io_s;
3923 ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
3924 s.ToString().c_str());
3925 }
3926 }
3927
3928 // If we just created a new descriptor file, install it by writing a
3929 // new CURRENT file that points to it.
3930 if (s.ok() && new_descriptor_log) {
3931 io_s = SetCurrentFile(fs_, dbname_, pending_manifest_file_number_,
3932 db_directory);
3933 if (!io_s.ok()) {
3934 io_status_ = io_s;
3935 s = io_s;
3936 }
3937 TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
3938 }
3939
3940 if (s.ok()) {
3941 // find offset in manifest file where this version is stored.
3942 new_manifest_file_size = descriptor_log_->file()->GetFileSize();
3943 }
3944
3945 if (first_writer.edit_list.front()->is_column_family_drop_) {
3946 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
3947 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
3948 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
3949 }
3950
3951 LogFlush(db_options_->info_log);
3952 TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
3953 mu->Lock();
3954 }
3955
3956 // Append the old manifest file to the obsolete_manifests_ list to be
3957 // deleted by PurgeObsoleteFiles later.
3958 if (s.ok() && new_descriptor_log) {
3959 obsolete_manifests_.emplace_back(
3960 DescriptorFileName("", manifest_file_number_));
3961 }
3962
3963 // Install the new versions
3964 if (s.ok()) {
3965 if (first_writer.edit_list.front()->is_column_family_add_) {
3966 assert(batch_edits.size() == 1);
3967 assert(new_cf_options != nullptr);
3968 CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
3969 } else if (first_writer.edit_list.front()->is_column_family_drop_) {
3970 assert(batch_edits.size() == 1);
3971 first_writer.cfd->SetDropped();
3972 first_writer.cfd->UnrefAndTryDelete();
3973 } else {
3974 // Each version in versions corresponds to a column family.
3975 // For each column family, update its log number indicating that logs
3976 // with number smaller than this should be ignored.
3977 for (const auto version : versions) {
3978 uint64_t max_log_number_in_batch = 0;
3979 uint32_t cf_id = version->cfd_->GetID();
3980 for (const auto& e : batch_edits) {
3981 if (e->has_log_number_ && e->column_family_ == cf_id) {
3982 max_log_number_in_batch =
3983 std::max(max_log_number_in_batch, e->log_number_);
3984 }
3985 }
3986 if (max_log_number_in_batch != 0) {
3987 assert(version->cfd_->GetLogNumber() <= max_log_number_in_batch);
3988 version->cfd_->SetLogNumber(max_log_number_in_batch);
3989 }
3990 }
3991
3992 uint64_t last_min_log_number_to_keep = 0;
3993 for (auto& e : batch_edits) {
3994 if (e->has_min_log_number_to_keep_) {
3995 last_min_log_number_to_keep =
3996 std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
3997 }
3998 }
3999
4000 if (last_min_log_number_to_keep != 0) {
4001 // Should only be set in 2PC mode.
4002 MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
4003 }
4004
4005 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4006 ColumnFamilyData* cfd = versions[i]->cfd_;
4007 AppendVersion(cfd, versions[i]);
4008 }
4009 }
4010 manifest_file_number_ = pending_manifest_file_number_;
4011 manifest_file_size_ = new_manifest_file_size;
4012 prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
4013 } else {
4014 std::string version_edits;
4015 for (auto& e : batch_edits) {
4016 version_edits += ("\n" + e->DebugString(true));
4017 }
4018 ROCKS_LOG_ERROR(db_options_->info_log,
4019 "Error in committing version edit to MANIFEST: %s",
4020 version_edits.c_str());
4021 for (auto v : versions) {
4022 delete v;
4023 }
4024 // If manifest append failed for whatever reason, the file could be
4025 // corrupted. So we need to force the next version update to start a
4026 // new manifest file.
4027 descriptor_log_.reset();
4028 if (new_descriptor_log) {
4029 ROCKS_LOG_INFO(db_options_->info_log,
4030 "Deleting manifest %" PRIu64 " current manifest %" PRIu64
4031 "\n",
4032 manifest_file_number_, pending_manifest_file_number_);
4033 env_->DeleteFile(
4034 DescriptorFileName(dbname_, pending_manifest_file_number_));
4035 }
4036 }
4037
4038 pending_manifest_file_number_ = 0;
4039
4040 // wake up all the waiting writers
4041 while (true) {
4042 ManifestWriter* ready = manifest_writers_.front();
4043 manifest_writers_.pop_front();
4044 bool need_signal = true;
4045 for (const auto& w : writers) {
4046 if (&w == ready) {
4047 need_signal = false;
4048 break;
4049 }
4050 }
4051 ready->status = s;
4052 ready->done = true;
4053 if (need_signal) {
4054 ready->cv.Signal();
4055 }
4056 if (ready == last_writer) {
4057 break;
4058 }
4059 }
4060 if (!manifest_writers_.empty()) {
4061 manifest_writers_.front()->cv.Signal();
4062 }
4063 return s;
4064 }
4065
4066 // 'datas' is grammatically incorrect. We still use this notation to indicate
4067 // that this variable represents a collection of column_family_data.
4068 Status VersionSet::LogAndApply(
4069 const autovector<ColumnFamilyData*>& column_family_datas,
4070 const autovector<const MutableCFOptions*>& mutable_cf_options_list,
4071 const autovector<autovector<VersionEdit*>>& edit_lists,
4072 InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log,
4073 const ColumnFamilyOptions* new_cf_options) {
4074 mu->AssertHeld();
4075 int num_edits = 0;
4076 for (const auto& elist : edit_lists) {
4077 num_edits += static_cast<int>(elist.size());
4078 }
4079 if (num_edits == 0) {
4080 return Status::OK();
4081 } else if (num_edits > 1) {
4082 #ifndef NDEBUG
4083 for (const auto& edit_list : edit_lists) {
4084 for (const auto& edit : edit_list) {
4085 assert(!edit->IsColumnFamilyManipulation());
4086 }
4087 }
4088 #endif /* ! NDEBUG */
4089 }
4090
4091 int num_cfds = static_cast<int>(column_family_datas.size());
4092 if (num_cfds == 1 && column_family_datas[0] == nullptr) {
4093 assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
4094 assert(edit_lists[0][0]->is_column_family_add_);
4095 assert(new_cf_options != nullptr);
4096 }
4097 std::deque<ManifestWriter> writers;
4098 if (num_cfds > 0) {
4099 assert(static_cast<size_t>(num_cfds) == mutable_cf_options_list.size());
4100 assert(static_cast<size_t>(num_cfds) == edit_lists.size());
4101 }
4102 for (int i = 0; i < num_cfds; ++i) {
4103 writers.emplace_back(mu, column_family_datas[i],
4104 *mutable_cf_options_list[i], edit_lists[i]);
4105 manifest_writers_.push_back(&writers[i]);
4106 }
4107 assert(!writers.empty());
4108 ManifestWriter& first_writer = writers.front();
4109 while (!first_writer.done && &first_writer != manifest_writers_.front()) {
4110 first_writer.cv.Wait();
4111 }
4112 if (first_writer.done) {
4113 // All non-CF-manipulation operations can be grouped together and committed
4114 // to MANIFEST. They should all have finished. The status code is stored in
4115 // the first manifest writer.
4116 #ifndef NDEBUG
4117 for (const auto& writer : writers) {
4118 assert(writer.done);
4119 }
4120 #endif /* !NDEBUG */
4121 return first_writer.status;
4122 }
4123
4124 int num_undropped_cfds = 0;
4125 for (auto cfd : column_family_datas) {
4126 // if cfd == nullptr, it is a column family add.
4127 if (cfd == nullptr || !cfd->IsDropped()) {
4128 ++num_undropped_cfds;
4129 }
4130 }
4131 if (0 == num_undropped_cfds) {
4132 for (int i = 0; i != num_cfds; ++i) {
4133 manifest_writers_.pop_front();
4134 }
4135 // Notify new head of manifest write queue.
4136 if (!manifest_writers_.empty()) {
4137 manifest_writers_.front()->cv.Signal();
4138 }
4139 return Status::ColumnFamilyDropped();
4140 }
4141
4142 return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
4143 new_cf_options);
4144 }
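// A minimal single-CF call sketch (illustrative only; assumes the db mutex
// `mu` is held and that `cfd`, `mutable_cf_options`, `edit` and `db_directory`
// already exist in the caller's scope):
//   autovector<ColumnFamilyData*> cfds = {cfd};
//   autovector<const MutableCFOptions*> opts = {&mutable_cf_options};
//   autovector<VersionEdit*> edit_list = {&edit};
//   autovector<autovector<VersionEdit*>> edit_lists = {edit_list};
//   Status s = versions->LogAndApply(cfds, opts, edit_lists, mu, db_directory);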
4145
4146 void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
4147 assert(edit->IsColumnFamilyManipulation());
4148 edit->SetNextFile(next_file_number_.load());
4149 // The log might have data that is not visible to the memtable and hence
4150 // has not updated last_sequence_ yet. It is also possible that the log is
4151 // expecting some new data that is not written yet. Since LastSequence is an
4152 // upper bound on the sequence, it is ok to record
4153 // last_allocated_sequence_ as the last sequence.
4154 edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4155 : last_sequence_);
4156 if (edit->is_column_family_drop_) {
4157 // if we drop a column family, we have to make sure to save the max column
4158 // family ID, so that we don't reuse an existing ID
4159 edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
4160 }
4161 }
4162
4163 Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
4164 VersionBuilder* builder, VersionEdit* edit,
4165 InstrumentedMutex* mu) {
4166 #ifdef NDEBUG
4167 (void)cfd;
4168 #endif
4169 mu->AssertHeld();
4170 assert(!edit->IsColumnFamilyManipulation());
4171
4172 if (edit->has_log_number_) {
4173 assert(edit->log_number_ >= cfd->GetLogNumber());
4174 assert(edit->log_number_ < next_file_number_.load());
4175 }
4176
4177 if (!edit->has_prev_log_number_) {
4178 edit->SetPrevLogNumber(prev_log_number_);
4179 }
4180 edit->SetNextFile(next_file_number_.load());
4181 // The log might have data that is not visible to the memtable and hence
4182 // has not updated last_sequence_ yet. It is also possible that the log is
4183 // expecting some new data that is not written yet. Since LastSequence is an
4184 // upper bound on the sequence, it is ok to record
4185 // last_allocated_sequence_ as the last sequence.
4186 edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4187 : last_sequence_);
4188
4189 Status s = builder->Apply(edit);
4190
4191 return s;
4192 }
4193
4194 Status VersionSet::ApplyOneVersionEditToBuilder(
4195 VersionEdit& edit,
4196 const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
4197 std::unordered_map<int, std::string>& column_families_not_found,
4198 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
4199 builders,
4200 VersionEditParams* version_edit_params) {
4201 // "Not found" means that the user didn't supply that column
4202 // family's options AND we encountered a column family add
4203 // record. Once we encounter a column family drop record,
4204 // we will delete the column family from
4205 // column_families_not_found.
4206 bool cf_in_not_found = (column_families_not_found.find(edit.column_family_) !=
4207 column_families_not_found.end());
4208 // "In builders" means that the user supplied that column family's
4209 // options AND that we encountered a column family add record
4210 bool cf_in_builders = builders.find(edit.column_family_) != builders.end();
4211
4212 // they can't both be true
4213 assert(!(cf_in_not_found && cf_in_builders));
4214
4215 ColumnFamilyData* cfd = nullptr;
4216
4217 if (edit.is_column_family_add_) {
4218 if (cf_in_builders || cf_in_not_found) {
4219 return Status::Corruption(
4220 "Manifest adding the same column family twice: " +
4221 edit.column_family_name_);
4222 }
4223 auto cf_options = name_to_options.find(edit.column_family_name_);
4224 // implicitly add the persistent_stats column family without requiring the
4225 // user to specify it
4226 bool is_persistent_stats_column_family =
4227 edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0;
4228 if (cf_options == name_to_options.end() &&
4229 !is_persistent_stats_column_family) {
4230 column_families_not_found.insert(
4231 {edit.column_family_, edit.column_family_name_});
4232 } else {
4233 // recover persistent_stats CF from a DB that already contains it
4234 if (is_persistent_stats_column_family) {
4235 ColumnFamilyOptions cfo;
4236 OptimizeForPersistentStats(&cfo);
4237 cfd = CreateColumnFamily(cfo, &edit);
4238 } else {
4239 cfd = CreateColumnFamily(cf_options->second, &edit);
4240 }
4241 cfd->set_initialized();
4242 builders.insert(std::make_pair(
4243 edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
4244 new BaseReferencedVersionBuilder(cfd))));
4245 }
4246 } else if (edit.is_column_family_drop_) {
4247 if (cf_in_builders) {
4248 auto builder = builders.find(edit.column_family_);
4249 assert(builder != builders.end());
4250 builders.erase(builder);
4251 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4252 assert(cfd != nullptr);
4253 if (cfd->UnrefAndTryDelete()) {
4254 cfd = nullptr;
4255 } else {
4256 // who else can have reference to cfd!?
4257 assert(false);
4258 }
4259 } else if (cf_in_not_found) {
4260 column_families_not_found.erase(edit.column_family_);
4261 } else {
4262 return Status::Corruption(
4263 "Manifest - dropping non-existing column family");
4264 }
4265 } else if (!cf_in_not_found) {
4266 if (!cf_in_builders) {
4267 return Status::Corruption(
4268 "Manifest record referencing unknown column family");
4269 }
4270
4271 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4272 // this should never happen since cf_in_builders is true
4273 assert(cfd != nullptr);
4274
4275 // if it is not column family add or column family drop,
4276 // then it's a file add/delete, which should be forwarded
4277 // to builder
4278 auto builder = builders.find(edit.column_family_);
4279 assert(builder != builders.end());
4280 Status s = builder->second->version_builder()->Apply(&edit);
4281 if (!s.ok()) {
4282 return s;
4283 }
4284 }
4285 return ExtractInfoFromVersionEdit(cfd, edit, version_edit_params);
4286 }
4287
4288 Status VersionSet::ExtractInfoFromVersionEdit(
4289 ColumnFamilyData* cfd, const VersionEdit& from_edit,
4290 VersionEditParams* version_edit_params) {
4291 if (cfd != nullptr) {
4292 if (from_edit.has_db_id_) {
4293 version_edit_params->SetDBId(from_edit.db_id_);
4294 }
4295 if (from_edit.has_log_number_) {
4296 if (cfd->GetLogNumber() > from_edit.log_number_) {
4297 ROCKS_LOG_WARN(
4298 db_options_->info_log,
4299 "MANIFEST corruption detected, but ignored - Log numbers in "
4300 "records NOT monotonically increasing");
4301 } else {
4302 cfd->SetLogNumber(from_edit.log_number_);
4303 version_edit_params->SetLogNumber(from_edit.log_number_);
4304 }
4305 }
4306 if (from_edit.has_comparator_ &&
4307 from_edit.comparator_ != cfd->user_comparator()->Name()) {
4308 return Status::InvalidArgument(
4309 cfd->user_comparator()->Name(),
4310 "does not match existing comparator " + from_edit.comparator_);
4311 }
4312 }
4313
4314 if (from_edit.has_prev_log_number_) {
4315 version_edit_params->SetPrevLogNumber(from_edit.prev_log_number_);
4316 }
4317
4318 if (from_edit.has_next_file_number_) {
4319 version_edit_params->SetNextFile(from_edit.next_file_number_);
4320 }
4321
4322 if (from_edit.has_max_column_family_) {
4323 version_edit_params->SetMaxColumnFamily(from_edit.max_column_family_);
4324 }
4325
4326 if (from_edit.has_min_log_number_to_keep_) {
4327 version_edit_params->min_log_number_to_keep_ =
4328 std::max(version_edit_params->min_log_number_to_keep_,
4329 from_edit.min_log_number_to_keep_);
4330 }
4331
4332 if (from_edit.has_last_sequence_) {
4333 version_edit_params->SetLastSequence(from_edit.last_sequence_);
4334 }
4335 return Status::OK();
4336 }
4337
4338 Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
4339 FileSystem* fs,
4340 std::string* manifest_path,
4341 uint64_t* manifest_file_number) {
4342 assert(fs != nullptr);
4343 assert(manifest_path != nullptr);
4344 assert(manifest_file_number != nullptr);
4345
4346 std::string fname;
4347 Status s = ReadFileToString(fs, CurrentFileName(dbname), &fname);
4348 if (!s.ok()) {
4349 return s;
4350 }
4351 if (fname.empty() || fname.back() != '\n') {
4352 return Status::Corruption("CURRENT file does not end with newline");
4353 }
4354 // remove the trailing '\n'
4355 fname.resize(fname.size() - 1);
4356 FileType type;
4357 bool parse_ok = ParseFileName(fname, manifest_file_number, &type);
4358 if (!parse_ok || type != kDescriptorFile) {
4359 return Status::Corruption("CURRENT file corrupted");
4360 }
4361 *manifest_path = dbname;
4362 if (dbname.back() != '/') {
4363 manifest_path->push_back('/');
4364 }
4365 *manifest_path += fname;
4366 return Status::OK();
4367 }
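// Illustrative example: if CURRENT contains "MANIFEST-000123\n", this sets
// *manifest_path to dbname + "/MANIFEST-000123" (adding the slash only if
// dbname lacks a trailing one) and *manifest_file_number to 123.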
4368
4369 Status VersionSet::ReadAndRecover(
4370 log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
4371 const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
4372 std::unordered_map<int, std::string>& column_families_not_found,
4373 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
4374 builders,
4375 VersionEditParams* version_edit_params, std::string* db_id) {
4376 assert(reader != nullptr);
4377 assert(read_buffer != nullptr);
4378 Status s;
4379 Slice record;
4380 std::string scratch;
4381 size_t recovered_edits = 0;
4382 while (reader->ReadRecord(&record, &scratch) && s.ok()) {
4383 VersionEdit edit;
4384 s = edit.DecodeFrom(record);
4385 if (!s.ok()) {
4386 break;
4387 }
4388 if (edit.has_db_id_) {
4389 db_id_ = edit.GetDbId();
4390 if (db_id != nullptr) {
4391 db_id->assign(edit.GetDbId());
4392 }
4393 }
4394 s = read_buffer->AddEdit(&edit);
4395 if (!s.ok()) {
4396 break;
4397 }
4398 if (edit.is_in_atomic_group_) {
4399 if (read_buffer->IsFull()) {
4400 // Apply edits in an atomic group when we have read all edits in the
4401 // group.
4402 for (auto& e : read_buffer->replay_buffer()) {
4403 s = ApplyOneVersionEditToBuilder(e, name_to_options,
4404 column_families_not_found, builders,
4405 version_edit_params);
4406 if (!s.ok()) {
4407 break;
4408 }
4409 recovered_edits++;
4410 }
4411 if (!s.ok()) {
4412 break;
4413 }
4414 read_buffer->Clear();
4415 }
4416 } else {
4417 // Apply a normal edit immediately.
4418 s = ApplyOneVersionEditToBuilder(edit, name_to_options,
4419 column_families_not_found, builders,
4420 version_edit_params);
4421 if (s.ok()) {
4422 recovered_edits++;
4423 }
4424 }
4425 }
4426 if (!s.ok()) {
4427 // Clear the buffer if we fail to decode/apply an edit.
4428 read_buffer->Clear();
4429 }
4430 TEST_SYNC_POINT_CALLBACK("VersionSet::ReadAndRecover:RecoveredEdits",
4431 &recovered_edits);
4432 return s;
4433 }
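
// Illustrative timeline of the atomic-group handling above (hypothetical
// edits): suppose E1, E2, E3 form one atomic group. E1 and E2 are buffered by
// AddEdit(); when E3 arrives, read_buffer->IsFull() becomes true and all
// three edits are applied to the builders back to back before the buffer is
// cleared. If any edit in the group fails to apply, the buffer is cleared and
// no further edits are read, so a partial group is never installed.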
4434
4435 Status VersionSet::Recover(
4436 const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4437 std::string* db_id) {
4438 std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
4439 for (const auto& cf : column_families) {
4440 cf_name_to_options.emplace(cf.name, cf.options);
4441 }
4442 // Keeps track of column families in the MANIFEST that were not found in
4443 // the column_families parameter. If those column families are not dropped
4444 // by subsequent MANIFEST records, Recover() will return a failure status.
4445 std::unordered_map<int, std::string> column_families_not_found;
4446
4447 // Read "CURRENT" file, which contains a pointer to the current manifest file
4448 std::string manifest_path;
4449 Status s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
4450 &manifest_file_number_);
4451 if (!s.ok()) {
4452 return s;
4453 }
4454
4455 ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
4456 manifest_path.c_str());
4457
4458 std::unique_ptr<SequentialFileReader> manifest_file_reader;
4459 {
4460 std::unique_ptr<FSSequentialFile> manifest_file;
4461 s = fs_->NewSequentialFile(manifest_path,
4462 fs_->OptimizeForManifestRead(file_options_),
4463 &manifest_file, nullptr);
4464 if (!s.ok()) {
4465 return s;
4466 }
4467 manifest_file_reader.reset(
4468 new SequentialFileReader(std::move(manifest_file), manifest_path,
4469 db_options_->log_readahead_size));
4470 }
4471
4472 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
4473 builders;
4474
4475 // add default column family
4476 auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
4477 if (default_cf_iter == cf_name_to_options.end()) {
4478 return Status::InvalidArgument("Default column family not specified");
4479 }
4480 VersionEdit default_cf_edit;
4481 default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
4482 default_cf_edit.SetColumnFamily(0);
4483 ColumnFamilyData* default_cfd =
4484 CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
4485 // During recovery, nobody else can access the column family, so it's
4486 // fine to mark it initialized early.
4487 default_cfd->set_initialized();
4488 builders.insert(
4489 std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
4490 new BaseReferencedVersionBuilder(default_cfd))));
4491 uint64_t current_manifest_file_size = 0;
4492 VersionEditParams version_edit_params;
4493 {
4494 VersionSet::LogReporter reporter;
4495 reporter.status = &s;
4496 log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
4497 true /* checksum */, 0 /* log_number */);
4498 AtomicGroupReadBuffer read_buffer;
4499 s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
4500 column_families_not_found, builders,
4501 &version_edit_params, db_id);
4502 current_manifest_file_size = reader.GetReadOffset();
4503 assert(current_manifest_file_size != 0);
4504 }
4505
4506 if (s.ok()) {
4507 if (!version_edit_params.has_next_file_number_) {
4508 s = Status::Corruption("no meta-nextfile entry in descriptor");
4509 } else if (!version_edit_params.has_log_number_) {
4510 s = Status::Corruption("no meta-lognumber entry in descriptor");
4511 } else if (!version_edit_params.has_last_sequence_) {
4512 s = Status::Corruption("no last-sequence-number entry in descriptor");
4513 }
4514
4515 if (!version_edit_params.has_prev_log_number_) {
4516 version_edit_params.SetPrevLogNumber(0);
4517 }
4518
4519 column_family_set_->UpdateMaxColumnFamily(
4520 version_edit_params.max_column_family_);
4521
4522 // When reading a DB generated by an old release, min_log_number_to_keep=0.
4523 // All log files will then be scanned for potential prepare entries.
4524 MarkMinLogNumberToKeep2PC(version_edit_params.min_log_number_to_keep_);
4525 MarkFileNumberUsed(version_edit_params.prev_log_number_);
4526 MarkFileNumberUsed(version_edit_params.log_number_);
4527 }
4528
4529 // There were some column families in the MANIFEST that weren't specified
4530 // in the arguments. This is OK in read-only mode.
4531 if (!read_only && !column_families_not_found.empty()) {
4532 std::string list_of_not_found;
4533 for (const auto& cf : column_families_not_found) {
4534 list_of_not_found += ", " + cf.second;
4535 }
4536 list_of_not_found = list_of_not_found.substr(2);
4537 s = Status::InvalidArgument(
4538 "You have to open all column families. Column families not opened: " +
4539 list_of_not_found);
4540 }
4541
4542 if (s.ok()) {
4543 for (auto cfd : *column_family_set_) {
4544 assert(builders.count(cfd->GetID()) > 0);
4545 auto* builder = builders[cfd->GetID()]->version_builder();
4546 if (!builder->CheckConsistencyForNumLevels()) {
4547 s = Status::InvalidArgument(
4548 "db has more levels than options.num_levels");
4549 break;
4550 }
4551 }
4552 }
4553
4554 if (s.ok()) {
4555 for (auto cfd : *column_family_set_) {
4556 if (cfd->IsDropped()) {
4557 continue;
4558 }
4559 if (read_only) {
4560 cfd->table_cache()->SetTablesAreImmortal();
4561 }
4562 assert(cfd->initialized());
4563 auto builders_iter = builders.find(cfd->GetID());
4564 assert(builders_iter != builders.end());
4565 auto builder = builders_iter->second->version_builder();
4566
4567 // Unlimited table cache: pre-load the table handles now. This needs to
4568 // be done outside of the mutex.
4569 s = builder->LoadTableHandlers(
4570 cfd->internal_stats(), db_options_->max_file_opening_threads,
4571 false /* prefetch_index_and_filter_in_cache */,
4572 true /* is_initial_load */,
4573 cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
4574 if (!s.ok()) {
4575 if (db_options_->paranoid_checks) {
4576 return s;
4577 }
4578 s = Status::OK();
4579 }
4580
4581 Version* v = new Version(cfd, this, file_options_,
4582 *cfd->GetLatestMutableCFOptions(),
4583 current_version_number_++);
4584 builder->SaveTo(v->storage_info());
4585
4586 // Install recovered version
4587 v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
4588 !(db_options_->skip_stats_update_on_db_open));
4589 AppendVersion(cfd, v);
4590 }
4591
4592 manifest_file_size_ = current_manifest_file_size;
4593 next_file_number_.store(version_edit_params.next_file_number_ + 1);
4594 last_allocated_sequence_ = version_edit_params.last_sequence_;
4595 last_published_sequence_ = version_edit_params.last_sequence_;
4596 last_sequence_ = version_edit_params.last_sequence_;
4597 prev_log_number_ = version_edit_params.prev_log_number_;
4598
4599 ROCKS_LOG_INFO(
4600 db_options_->info_log,
4601 "Recovered from manifest file:%s succeeded,"
4602 "manifest_file_number is %" PRIu64 ", next_file_number is %" PRIu64
4603 ", last_sequence is %" PRIu64 ", log_number is %" PRIu64
4604 ",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
4605 ",min_log_number_to_keep is %" PRIu64 "\n",
4606 manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
4607 last_sequence_.load(), version_edit_params.log_number_,
4608 prev_log_number_, column_family_set_->GetMaxColumnFamily(),
4609 min_log_number_to_keep_2pc());
4610
4611 for (auto cfd : *column_family_set_) {
4612 if (cfd->IsDropped()) {
4613 continue;
4614 }
4615 ROCKS_LOG_INFO(db_options_->info_log,
4616 "Column family [%s] (ID %" PRIu32
4617 "), log number is %" PRIu64 "\n",
4618 cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
4619 }
4620 }
4621
4622 return s;
4623 }
4624
4625 namespace {
4626 class ManifestPicker {
4627 public:
4628 explicit ManifestPicker(const std::string& dbname, FileSystem* fs);
4629 void SeekToFirstManifest();
4630 // REQUIRES: Valid() == true
4631 std::string GetNextManifest(uint64_t* file_number, std::string* file_name);
4632 bool Valid() const { return manifest_file_iter_ != manifest_files_.end(); }
4633 const Status& status() const { return status_; }
4634
4635 private:
4636 const std::string& dbname_;
4637 FileSystem* const fs_;
4638 // MANIFEST file name(s)
4639 std::vector<std::string> manifest_files_;
4640 std::vector<std::string>::const_iterator manifest_file_iter_;
4641 Status status_;
4642 };
4643
4644 ManifestPicker::ManifestPicker(const std::string& dbname, FileSystem* fs)
4645 : dbname_(dbname), fs_(fs) {}
4646
4647 void ManifestPicker::SeekToFirstManifest() {
4648 assert(fs_ != nullptr);
4649 std::vector<std::string> children;
4650 Status s = fs_->GetChildren(dbname_, IOOptions(), &children, /*dbg=*/nullptr);
4651 if (!s.ok()) {
4652 status_ = s;
4653 return;
4654 }
4655 for (const auto& fname : children) {
4656 uint64_t file_num = 0;
4657 FileType file_type;
4658 bool parse_ok = ParseFileName(fname, &file_num, &file_type);
4659 if (parse_ok && file_type == kDescriptorFile) {
4660 manifest_files_.push_back(fname);
4661 }
4662 }
4663 std::sort(manifest_files_.begin(), manifest_files_.end(),
4664 [](const std::string& lhs, const std::string& rhs) {
4665 uint64_t num1 = 0;
4666 uint64_t num2 = 0;
4667 FileType type1;
4668 FileType type2;
4669 bool parse_ok1 = ParseFileName(lhs, &num1, &type1);
4670 bool parse_ok2 = ParseFileName(rhs, &num2, &type2);
4671 #ifndef NDEBUG
4672 assert(parse_ok1);
4673 assert(parse_ok2);
4674 #else
4675 (void)parse_ok1;
4676 (void)parse_ok2;
4677 #endif
4678 return num1 > num2;
4679 });
4680 manifest_file_iter_ = manifest_files_.begin();
4681 }
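
// Example (hypothetical directory contents): if the db directory holds
// MANIFEST-000008 and MANIFEST-000012, the descending sort above makes
// GetNextManifest() return MANIFEST-000012 first and MANIFEST-000008 on the
// next call, so recovery tries the most recent MANIFEST before older ones.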
4682
4683 std::string ManifestPicker::GetNextManifest(uint64_t* number,
4684 std::string* file_name) {
4685 assert(status_.ok());
4686 assert(Valid());
4687 std::string ret;
4688 if (manifest_file_iter_ != manifest_files_.end()) {
4689 ret.assign(dbname_);
4690 if (ret.back() != kFilePathSeparator) {
4691 ret.push_back(kFilePathSeparator);
4692 }
4693 ret.append(*manifest_file_iter_);
4694 if (number) {
4695 FileType type;
4696 bool parse = ParseFileName(*manifest_file_iter_, number, &type);
4697 assert(type == kDescriptorFile);
4698 #ifndef NDEBUG
4699 assert(parse);
4700 #else
4701 (void)parse;
4702 #endif
4703 }
4704 if (file_name) {
4705 *file_name = *manifest_file_iter_;
4706 }
4707 ++manifest_file_iter_;
4708 }
4709 return ret;
4710 }
4711 } // namespace
4712
4713 Status VersionSet::TryRecover(
4714 const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4715 std::string* db_id, bool* has_missing_table_file) {
4716 ManifestPicker manifest_picker(dbname_, fs_);
4717 manifest_picker.SeekToFirstManifest();
4718 Status s = manifest_picker.status();
4719 if (!s.ok()) {
4720 return s;
4721 }
4722 if (!manifest_picker.Valid()) {
4723 return Status::Corruption("Cannot locate MANIFEST file in " + dbname_);
4724 }
4725 std::string manifest_path =
4726 manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
4727 while (!manifest_path.empty()) {
4728 s = TryRecoverFromOneManifest(manifest_path, column_families, read_only,
4729 db_id, has_missing_table_file);
4730 if (s.ok() || !manifest_picker.Valid()) {
4731 break;
4732 }
4733 Reset();
4734 manifest_path =
4735 manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
4736 }
4737 return s;
4738 }
4739
4740 Status VersionSet::TryRecoverFromOneManifest(
4741 const std::string& manifest_path,
4742 const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4743 std::string* db_id, bool* has_missing_table_file) {
4744 ROCKS_LOG_INFO(db_options_->info_log, "Trying to recover from manifest: %s\n",
4745 manifest_path.c_str());
4746 std::unique_ptr<SequentialFileReader> manifest_file_reader;
4747 Status s;
4748 {
4749 std::unique_ptr<FSSequentialFile> manifest_file;
4750 s = fs_->NewSequentialFile(manifest_path,
4751 fs_->OptimizeForManifestRead(file_options_),
4752 &manifest_file, nullptr);
4753 if (!s.ok()) {
4754 return s;
4755 }
4756 manifest_file_reader.reset(
4757 new SequentialFileReader(std::move(manifest_file), manifest_path,
4758 db_options_->log_readahead_size));
4759 }
4760
4761 VersionSet::LogReporter reporter;
4762 reporter.status = &s;
4763 log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
4764 /*checksum=*/true, /*log_num=*/0);
4765 {
4766 VersionEditHandlerPointInTime handler_pit(read_only, column_families,
4767 const_cast<VersionSet*>(this));
4768
4769 s = handler_pit.Iterate(reader, db_id);
4770
4771 assert(nullptr != has_missing_table_file);
4772 *has_missing_table_file = handler_pit.HasMissingFiles();
4773 }
4774
4775 return s;
4776 }
4777
4778 Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
4779 const std::string& dbname,
4780 FileSystem* fs) {
4781 // These options are just for performance, not correctness,
4782 // so we're fine using the defaults
4783 FileOptions soptions;
4784 // Read "CURRENT" file, which contains a pointer to the current manifest file
4785 std::string manifest_path;
4786 uint64_t manifest_file_number;
4787 Status s =
4788 GetCurrentManifestPath(dbname, fs, &manifest_path, &manifest_file_number);
4789 if (!s.ok()) {
4790 return s;
4791 }
4792
4793 std::unique_ptr<SequentialFileReader> file_reader;
4794 {
4795 std::unique_ptr<FSSequentialFile> file;
4796 s = fs->NewSequentialFile(manifest_path, soptions, &file, nullptr);
4797 if (!s.ok()) {
4798 return s;
4799 }
4800 file_reader.reset(new SequentialFileReader(std::move(file), manifest_path));
4801 }
4802
4803 std::map<uint32_t, std::string> column_family_names;
4804 // default column family is always implicitly there
4805 column_family_names.insert({0, kDefaultColumnFamilyName});
4806 VersionSet::LogReporter reporter;
4807 reporter.status = &s;
4808 log::Reader reader(nullptr, std::move(file_reader), &reporter,
4809 true /* checksum */, 0 /* log_number */);
4810 Slice record;
4811 std::string scratch;
4812 while (reader.ReadRecord(&record, &scratch) && s.ok()) {
4813 VersionEdit edit;
4814 s = edit.DecodeFrom(record);
4815 if (!s.ok()) {
4816 break;
4817 }
4818 if (edit.is_column_family_add_) {
4819 if (column_family_names.find(edit.column_family_) !=
4820 column_family_names.end()) {
4821 s = Status::Corruption("Manifest adding the same column family twice");
4822 break;
4823 }
4824 column_family_names.insert(
4825 {edit.column_family_, edit.column_family_name_});
4826 } else if (edit.is_column_family_drop_) {
4827 if (column_family_names.find(edit.column_family_) ==
4828 column_family_names.end()) {
4829 s = Status::Corruption(
4830 "Manifest - dropping non-existing column family");
4831 break;
4832 }
4833 column_family_names.erase(edit.column_family_);
4834 }
4835 }
4836
4837 column_families->clear();
4838 if (s.ok()) {
4839 for (const auto& iter : column_family_names) {
4840 column_families->push_back(iter.second);
4841 }
4842 }
4843
4844 return s;
4845 }
4846
4847 #ifndef ROCKSDB_LITE
4848 Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
4849 const Options* options,
4850 const FileOptions& file_options,
4851 int new_levels) {
4852 if (new_levels <= 1) {
4853 return Status::InvalidArgument(
4854 "Number of levels needs to be bigger than 1");
4855 }
4856
4857 ImmutableDBOptions db_options(*options);
4858 ColumnFamilyOptions cf_options(*options);
4859 std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
4860 options->table_cache_numshardbits));
4861 WriteController wc(options->delayed_write_rate);
4862 WriteBufferManager wb(options->db_write_buffer_size);
4863 VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
4864 /*block_cache_tracer=*/nullptr);
4865 Status status;
4866
4867 std::vector<ColumnFamilyDescriptor> dummy;
4868 ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
4869 ColumnFamilyOptions(*options));
4870 dummy.push_back(dummy_descriptor);
4871 status = versions.Recover(dummy);
4872 if (!status.ok()) {
4873 return status;
4874 }
4875
4876 Version* current_version =
4877 versions.GetColumnFamilySet()->GetDefault()->current();
4878 auto* vstorage = current_version->storage_info();
4879 int current_levels = vstorage->num_levels();
4880
4881 if (current_levels <= new_levels) {
4882 return Status::OK();
4883 }
4884
4885 // Make sure there are files on only one level from
4886 // (new_levels-1) to (current_levels-1)
4887 int first_nonempty_level = -1;
4888 int first_nonempty_level_filenum = 0;
4889 for (int i = new_levels - 1; i < current_levels; i++) {
4890 int file_num = vstorage->NumLevelFiles(i);
4891 if (file_num != 0) {
4892 if (first_nonempty_level < 0) {
4893 first_nonempty_level = i;
4894 first_nonempty_level_filenum = file_num;
4895 } else {
4896 char msg[255];
4897 snprintf(msg, sizeof(msg),
4898 "Found at least two levels containing files: "
4899 "[%d:%d],[%d:%d].\n",
4900 first_nonempty_level, first_nonempty_level_filenum, i,
4901 file_num);
4902 return Status::InvalidArgument(msg);
4903 }
4904 }
4905 }
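
// Worked example (hypothetical layout): reducing a 7-level DB to
// new_levels = 3. The check above allows files on at most one of L2..L6,
// say only L5, so first_nonempty_level = 5. The code below copies L0 and
// L1 verbatim and moves L5's files to the new last level:
//
//   before: L0, L1, L2-L4 (empty), L5 (files), L6 (empty)
//   after:  L0, L1, L2 <- files formerly on L5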
4906
4907 // We need to allocate an array sized to the old number of levels to
4908 // avoid a SIGSEGV in WriteCurrentStateToManifest();
4909 // however, all levels greater than or equal to new_levels will be empty
4910 std::vector<FileMetaData*>* new_files_list =
4911 new std::vector<FileMetaData*>[current_levels];
4912 for (int i = 0; i < new_levels - 1; i++) {
4913 new_files_list[i] = vstorage->LevelFiles(i);
4914 }
4915
4916 if (first_nonempty_level > 0) {
4917 new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level);
4918 }
4919
4920 delete[] vstorage->files_;
4921 vstorage->files_ = new_files_list;
4922 vstorage->num_levels_ = new_levels;
4923
4924 MutableCFOptions mutable_cf_options(*options);
4925 VersionEdit ve;
4926 InstrumentedMutex dummy_mutex;
4927 InstrumentedMutexLock l(&dummy_mutex);
4928 return versions.LogAndApply(
4929 versions.GetColumnFamilySet()->GetDefault(),
4930 mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
4931 }
4932
4933 // Get the checksum information including the checksum and checksum function
4934 // name of all SST files in VersionSet. Store the information in
4935 // FileChecksumList which contains a map from file number to its checksum info.
4936 // If the DB is not running, make sure to call VersionSet::Recover() to load
4937 // the file metadata from the MANIFEST into the VersionSet before calling this function.
4938 Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
4939 // Clean the previously stored checksum information if any.
4940 if (checksum_list == nullptr) {
4941 return Status::InvalidArgument("checksum_list is nullptr");
4942 }
4943 checksum_list->reset();
4944
4945 for (auto cfd : *column_family_set_) {
4946 if (cfd->IsDropped() || !cfd->initialized()) {
4947 continue;
4948 }
4949 for (int level = 0; level < cfd->NumberLevels(); level++) {
4950 for (const auto& file :
4951 cfd->current()->storage_info()->LevelFiles(level)) {
4952 checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
4953 file->file_checksum,
4954 file->file_checksum_func_name);
4955 }
4956 }
4957 }
4958 return Status::OK();
4959 }
4960
4961 Status VersionSet::DumpManifest(Options& options, std::string& dscname,
4962 bool verbose, bool hex, bool json) {
4963 // Open the specified manifest file.
4964 std::unique_ptr<SequentialFileReader> file_reader;
4965 Status s;
4966 {
4967 std::unique_ptr<FSSequentialFile> file;
4968 const std::shared_ptr<FileSystem>& fs = options.env->GetFileSystem();
4969 s = fs->NewSequentialFile(
4970 dscname,
4971 fs->OptimizeForManifestRead(file_options_), &file,
4972 nullptr);
4973 if (!s.ok()) {
4974 return s;
4975 }
4976 file_reader.reset(new SequentialFileReader(
4977 std::move(file), dscname, db_options_->log_readahead_size));
4978 }
4979
4980 bool have_prev_log_number = false;
4981 bool have_next_file = false;
4982 bool have_last_sequence = false;
4983 uint64_t next_file = 0;
4984 uint64_t last_sequence = 0;
4985 uint64_t previous_log_number = 0;
4986 int count = 0;
4987 std::unordered_map<uint32_t, std::string> comparators;
4988 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
4989 builders;
4990
4991 // add default column family
4992 VersionEdit default_cf_edit;
4993 default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
4994 default_cf_edit.SetColumnFamily(0);
4995 ColumnFamilyData* default_cfd =
4996 CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
4997 builders.insert(
4998 std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
4999 new BaseReferencedVersionBuilder(default_cfd))));
5000
5001 {
5002 VersionSet::LogReporter reporter;
5003 reporter.status = &s;
5004 log::Reader reader(nullptr, std::move(file_reader), &reporter,
5005 true /* checksum */, 0 /* log_number */);
5006 Slice record;
5007 std::string scratch;
5008 while (reader.ReadRecord(&record, &scratch) && s.ok()) {
5009 VersionEdit edit;
5010 s = edit.DecodeFrom(record);
5011 if (!s.ok()) {
5012 break;
5013 }
5014
5015 // Write out each individual edit
5016 if (verbose && !json) {
5017 printf("%s\n", edit.DebugString(hex).c_str());
5018 } else if (json) {
5019 printf("%s\n", edit.DebugJSON(count, hex).c_str());
5020 }
5021 count++;
5022
5023 bool cf_in_builders =
5024 builders.find(edit.column_family_) != builders.end();
5025
5026 if (edit.has_comparator_) {
5027 comparators.insert({edit.column_family_, edit.comparator_});
5028 }
5029
5030 ColumnFamilyData* cfd = nullptr;
5031
5032 if (edit.is_column_family_add_) {
5033 if (cf_in_builders) {
5034 s = Status::Corruption(
5035 "Manifest adding the same column family twice");
5036 break;
5037 }
5038 cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
5039 cfd->set_initialized();
5040 builders.insert(std::make_pair(
5041 edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
5042 new BaseReferencedVersionBuilder(cfd))));
5043 } else if (edit.is_column_family_drop_) {
5044 if (!cf_in_builders) {
5045 s = Status::Corruption(
5046 "Manifest - dropping non-existing column family");
5047 break;
5048 }
5049 auto builder_iter = builders.find(edit.column_family_);
5050 builders.erase(builder_iter);
5051 comparators.erase(edit.column_family_);
5052 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
5053 assert(cfd != nullptr);
5054 cfd->UnrefAndTryDelete();
5055 cfd = nullptr;
5056 } else {
5057 if (!cf_in_builders) {
5058 s = Status::Corruption(
5059 "Manifest record referencing unknown column family");
5060 break;
5061 }
5062
5063 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
5064 // this should never happen since cf_in_builders is true
5065 assert(cfd != nullptr);
5066
5067 // If it is not a column family add or drop,
5068 // then it's a file add/delete, which should be forwarded
5069 // to the builder
5070 auto builder = builders.find(edit.column_family_);
5071 assert(builder != builders.end());
5072 s = builder->second->version_builder()->Apply(&edit);
5073 if (!s.ok()) {
5074 break;
5075 }
5076 }
5077
5078 if (cfd != nullptr && edit.has_log_number_) {
5079 cfd->SetLogNumber(edit.log_number_);
5080 }
5081
5082
5083 if (edit.has_prev_log_number_) {
5084 previous_log_number = edit.prev_log_number_;
5085 have_prev_log_number = true;
5086 }
5087
5088 if (edit.has_next_file_number_) {
5089 next_file = edit.next_file_number_;
5090 have_next_file = true;
5091 }
5092
5093 if (edit.has_last_sequence_) {
5094 last_sequence = edit.last_sequence_;
5095 have_last_sequence = true;
5096 }
5097
5098 if (edit.has_max_column_family_) {
5099 column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
5100 }
5101
5102 if (edit.has_min_log_number_to_keep_) {
5103 MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_);
5104 }
5105 }
5106 }
5107 file_reader.reset();
5108
5109 if (s.ok()) {
5110 if (!have_next_file) {
5111 s = Status::Corruption("no meta-nextfile entry in descriptor");
5112 printf("no meta-nextfile entry in descriptor\n");
5113 } else if (!have_last_sequence) {
5114 s = Status::Corruption("no last-sequence-number entry in descriptor");
5115 printf("no last-sequence-number entry in descriptor\n");
5116 }
5117
5118 if (!have_prev_log_number) {
5119 previous_log_number = 0;
5120 }
5121 }
5122
5123 if (s.ok()) {
5124 for (auto cfd : *column_family_set_) {
5125 if (cfd->IsDropped()) {
5126 continue;
5127 }
5128 auto builders_iter = builders.find(cfd->GetID());
5129 assert(builders_iter != builders.end());
5130 auto builder = builders_iter->second->version_builder();
5131
5132 Version* v = new Version(cfd, this, file_options_,
5133 *cfd->GetLatestMutableCFOptions(),
5134 current_version_number_++);
5135 builder->SaveTo(v->storage_info());
5136 v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
5137
5138 printf("--------------- Column family \"%s\" (ID %" PRIu32
5139 ") --------------\n",
5140 cfd->GetName().c_str(), cfd->GetID());
5141 printf("log number: %" PRIu64 "\n", cfd->GetLogNumber());
5142 auto comparator = comparators.find(cfd->GetID());
5143 if (comparator != comparators.end()) {
5144 printf("comparator: %s\n", comparator->second.c_str());
5145 } else {
5146 printf("comparator: <NO COMPARATOR>\n");
5147 }
5148 printf("%s \n", v->DebugString(hex).c_str());
5149 delete v;
5150 }
5151
5152 next_file_number_.store(next_file + 1);
5153 last_allocated_sequence_ = last_sequence;
5154 last_published_sequence_ = last_sequence;
5155 last_sequence_ = last_sequence;
5156 prev_log_number_ = previous_log_number;
5157
5158 printf("next_file_number %" PRIu64 " last_sequence %" PRIu64
5159 " prev_log_number %" PRIu64 " max_column_family %" PRIu32
5160 " min_log_number_to_keep "
5161 "%" PRIu64 "\n",
5162 next_file_number_.load(), last_sequence, previous_log_number,
5163 column_family_set_->GetMaxColumnFamily(),
5164 min_log_number_to_keep_2pc());
5165 }
5166
5167 return s;
5168 }
5169 #endif // ROCKSDB_LITE
5170
5171 void VersionSet::MarkFileNumberUsed(uint64_t number) {
5172 // only called during recovery and repair, which are single-threaded, so this
5173 // works because there can't be concurrent calls
5174 if (next_file_number_.load(std::memory_order_relaxed) <= number) {
5175 next_file_number_.store(number + 1, std::memory_order_relaxed);
5176 }
5177 }
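
// Example: if next_file_number_ is 8, MarkFileNumberUsed(10) bumps it to 11,
// while a subsequent MarkFileNumberUsed(5) leaves it unchanged since 11 > 5.
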
5178 // Called only from ::LogAndApply, which is protected by the mutex, or
5179 // during recovery, which is single-threaded.
5180 void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
5181 if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
5182 min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
5183 }
5184 }
5185
5186 Status VersionSet::WriteCurrentStateToManifest(
5187 const std::unordered_map<uint32_t, MutableCFState>& curr_state,
5188 log::Writer* log) {
5189 // TODO: Break up into multiple records to reduce memory usage on recovery?
5190
5191 // WARNING: This method doesn't hold a mutex!!
5192
5193 // This is done without DB mutex lock held, but only within single-threaded
5194 // LogAndApply. Column family manipulations can only happen within LogAndApply
5195 // (the same single thread), so we're safe to iterate.
5196
5197 if (db_options_->write_dbid_to_manifest) {
5198 VersionEdit edit_for_db_id;
5199 assert(!db_id_.empty());
5200 edit_for_db_id.SetDBId(db_id_);
5201 std::string db_id_record;
5202 if (!edit_for_db_id.EncodeTo(&db_id_record)) {
5203 return Status::Corruption("Unable to Encode VersionEdit:" +
5204 edit_for_db_id.DebugString(true));
5205 }
5206 IOStatus io_s = log->AddRecord(db_id_record);
5207 if (!io_s.ok()) {
5208 io_status_ = io_s;
5209 return std::move(io_s);
5210 }
5211 }
5212
5213 for (auto cfd : *column_family_set_) {
5214 if (cfd->IsDropped()) {
5215 continue;
5216 }
5217 assert(cfd->initialized());
5218 {
5219 // Store column family info
5220 VersionEdit edit;
5221 if (cfd->GetID() != 0) {
5222 // default column family is always there,
5223 // no need to explicitly write it
5224 edit.AddColumnFamily(cfd->GetName());
5225 edit.SetColumnFamily(cfd->GetID());
5226 }
5227 edit.SetComparatorName(
5228 cfd->internal_comparator().user_comparator()->Name());
5229 std::string record;
5230 if (!edit.EncodeTo(&record)) {
5231 return Status::Corruption(
5232 "Unable to Encode VersionEdit:" + edit.DebugString(true));
5233 }
5234 IOStatus io_s = log->AddRecord(record);
5235 if (!io_s.ok()) {
5236 io_status_ = io_s;
5237 return std::move(io_s);
5238 }
5239 }
5240
5241 {
5242 // Save files
5243 VersionEdit edit;
5244 edit.SetColumnFamily(cfd->GetID());
5245
5246 for (int level = 0; level < cfd->NumberLevels(); level++) {
5247 for (const auto& f :
5248 cfd->current()->storage_info()->LevelFiles(level)) {
5249 edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
5250 f->fd.GetFileSize(), f->smallest, f->largest,
5251 f->fd.smallest_seqno, f->fd.largest_seqno,
5252 f->marked_for_compaction, f->oldest_blob_file_number,
5253 f->oldest_ancester_time, f->file_creation_time,
5254 f->file_checksum, f->file_checksum_func_name);
5255 }
5256 }
5257 const auto iter = curr_state.find(cfd->GetID());
5258 assert(iter != curr_state.end());
5259 uint64_t log_number = iter->second.log_number;
5260 edit.SetLogNumber(log_number);
5261 std::string record;
5262 if (!edit.EncodeTo(&record)) {
5263 return Status::Corruption(
5264 "Unable to Encode VersionEdit:" + edit.DebugString(true));
5265 }
5266 IOStatus io_s = log->AddRecord(record);
5267 if (!io_s.ok()) {
5268 io_status_ = io_s;
5269 return std::move(io_s);
5270 }
5271 }
5272 }
5273 return Status::OK();
5274 }
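
// Sketch of the records this writes for a hypothetical DB with column
// families "default" and "cf1", with write_dbid_to_manifest enabled:
//
//   1. VersionEdit { db_id }
//   2. VersionEdit { comparator name }                       // "default" info
//   3. VersionEdit { column_family: 0, AddFile..., log_number }
//   4. VersionEdit { add "cf1", column_family: 1, comparator name }
//   5. VersionEdit { column_family: 1, AddFile..., log_number }
//
// A freshly rolled MANIFEST thus starts with a self-contained snapshot of the
// current state, onto which later incremental edits are appended.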
5275
5276 // TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
5277 // function is called repeatedly with consecutive pairs of slices. For example
5278 // if the slice list is [a, b, c, d] this function is called with arguments
5279 // (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
5280 // we avoid doing binary search for the keys b and c twice and instead somehow
5281 // maintain state of where they first appear in the files.
5282 uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions& options,
5283 Version* v, const Slice& start,
5284 const Slice& end, int start_level,
5285 int end_level, TableReaderCaller caller) {
5286 const auto& icmp = v->cfd_->internal_comparator();
5287
5288 // pre-condition
5289 assert(icmp.Compare(start, end) <= 0);
5290
5291 uint64_t total_full_size = 0;
5292 const auto* vstorage = v->storage_info();
5293 const int num_non_empty_levels = vstorage->num_non_empty_levels();
5294 end_level = (end_level == -1) ? num_non_empty_levels
5295 : std::min(end_level, num_non_empty_levels);
5296
5297 assert(start_level <= end_level);
5298
5299 // Outline of the optimization that uses options.files_size_error_margin.
5300 // When approximating the total size of the files used to store a key range,
5301 // we first sum up the sizes of the files that fall fully within the range.
5302 // Then we sum up the sizes of all the files that may intersect with the range
5303 // (this includes all files in L0 as well). Then, if total_intersecting_size
5304 // is smaller than total_full_size * options.files_size_error_margin, we can
5305 // infer that the intersecting files have a sufficiently negligible
5306 // contribution to the total size, and we approximate the storage required
5307 // for the keys in the range as total_full_size plus half of
5308 // total_intersecting_size. E.g., if files_size_error_margin is 0.1, the error
5309 // of the approximation is limited to ~10% of the total size of the files that
5310 // fully fall into the key range. In that case, this helps to avoid the costly
5311 // process of binary searching within the intersecting files, which is required
5312 // only for a more precise calculation of the total size.
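//
// Worked example with hypothetical numbers: total_full_size = 100 MB,
// total_intersecting_size = 6 MB, files_size_error_margin = 0.1. Since
// 6 MB < 100 MB * 0.1 = 10 MB, we return 100 MB + 6 MB / 2 = 103 MB without
// binary-searching the boundary files. The true contribution of those files
// lies between 0 and 6 MB, so the estimate is off by at most 3 MB here.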
5313
5314 autovector<FdWithKeyRange*, 32> first_files;
5315 autovector<FdWithKeyRange*, 16> last_files;
5316
5317 // scan all the levels
5318 for (int level = start_level; level < end_level; ++level) {
5319 const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
5320 if (files_brief.num_files == 0) {
5321 // empty level, skip exploration
5322 continue;
5323 }
5324
5325 if (level == 0) {
5326 // Level-0 files are not in sorted order; we need to iterate through
5327 // the list to compute the total bytes that require scanning,
5328 // so handle this case explicitly (similarly to the first_files case)
5329 for (size_t i = 0; i < files_brief.num_files; i++) {
5330 first_files.push_back(&files_brief.files[i]);
5331 }
5332 continue;
5333 }
5334
5335 assert(level > 0);
5336 assert(files_brief.num_files > 0);
5337
5338 // identify the file position for start key
5339 const int idx_start =
5340 FindFileInRange(icmp, files_brief, start, 0,
5341 static_cast<uint32_t>(files_brief.num_files - 1));
5342 assert(static_cast<size_t>(idx_start) < files_brief.num_files);
5343
5344 // identify the file position for end key
5345 int idx_end = idx_start;
5346 if (icmp.Compare(files_brief.files[idx_end].largest_key, end) < 0) {
5347 idx_end =
5348 FindFileInRange(icmp, files_brief, end, idx_start,
5349 static_cast<uint32_t>(files_brief.num_files - 1));
5350 }
5351 assert(idx_end >= idx_start &&
5352 static_cast<size_t>(idx_end) < files_brief.num_files);
5353
5354 // scan all files from the starting index to the ending index
5355 // (inferred from the sorted order)
5356
5357 // first scan all the intermediate full files (excluding first and last)
5358 for (int i = idx_start + 1; i < idx_end; ++i) {
5359 uint64_t file_size = files_brief.files[i].fd.GetFileSize();
5360 // The entire file falls into the range, so we can just take its size.
5361 assert(file_size ==
5362 ApproximateSize(v, files_brief.files[i], start, end, caller));
5363 total_full_size += file_size;
5364 }
5365
5366 // save the first and the last files (which may be the same file), so we
5367 // can scan them later.
5368 first_files.push_back(&files_brief.files[idx_start]);
5369 if (idx_start != idx_end) {
5370 // we only need to estimate sizes for both files if they are different
5371 last_files.push_back(&files_brief.files[idx_end]);
5372 }
5373 }
5374
5375 // The sum of all file sizes that intersect the [start, end] keys range.
5376 uint64_t total_intersecting_size = 0;
5377 for (const auto* file_ptr : first_files) {
5378 total_intersecting_size += file_ptr->fd.GetFileSize();
5379 }
5380 for (const auto* file_ptr : last_files) {
5381 total_intersecting_size += file_ptr->fd.GetFileSize();
5382 }
5383
5384 // Now scan all the first & last files at each level, and estimate their size.
5385 // If total_intersecting_size is less than X% of total_full_size, we
5386 // approximate the result in order to avoid the costly binary search
5387 // inside ApproximateSize. We use half of the file size as the approximation below.
5388
5389 const double margin = options.files_size_error_margin;
5390 if (margin > 0 && total_intersecting_size <
5391 static_cast<uint64_t>(total_full_size * margin)) {
5392 total_full_size += total_intersecting_size / 2;
5393 } else {
5394 // Estimate for all the first files, at each level
5395 for (const auto file_ptr : first_files) {
5396 total_full_size += ApproximateSize(v, *file_ptr, start, end, caller);
5397 }
5398
5399 // Estimate for all the last files, at each level
5400 for (const auto file_ptr : last_files) {
5401 // We could use ApproximateSize here, but calling ApproximateOffsetOf
5402 // directly is just more efficient.
5403 total_full_size += ApproximateOffsetOf(v, *file_ptr, end, caller);
5404 }
5405 }
5406
5407 return total_full_size;
5408 }
5409
5410 uint64_t VersionSet::ApproximateOffsetOf(Version* v, const FdWithKeyRange& f,
5411 const Slice& key,
5412 TableReaderCaller caller) {
5413 // pre-condition
5414 assert(v);
5415 const auto& icmp = v->cfd_->internal_comparator();
5416
5417 uint64_t result = 0;
5418 if (icmp.Compare(f.largest_key, key) <= 0) {
5419 // Entire file is before "key", so just add the file size
5420 result = f.fd.GetFileSize();
5421 } else if (icmp.Compare(f.smallest_key, key) > 0) {
5422 // Entire file is after "key", so ignore
5423 result = 0;
5424 } else {
5425 // "key" falls in the range for this table. Add the
5426 // approximate offset of "key" within the table.
5427 TableCache* table_cache = v->cfd_->table_cache();
5428 if (table_cache != nullptr) {
5429 result = table_cache->ApproximateOffsetOf(
5430 key, f.file_metadata->fd, caller, icmp,
5431 v->GetMutableCFOptions().prefix_extractor.get());
5432 }
5433 }
5434 return result;
5435 }
5436
5437 uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
5438 const Slice& start, const Slice& end,
5439 TableReaderCaller caller) {
5440 // pre-condition
5441 assert(v);
5442 const auto& icmp = v->cfd_->internal_comparator();
5443 assert(icmp.Compare(start, end) <= 0);
5444
5445 if (icmp.Compare(f.largest_key, start) <= 0 ||
5446 icmp.Compare(f.smallest_key, end) > 0) {
5447 // Entire file is before or after the start/end keys range
5448 return 0;
5449 }
5450
5451 if (icmp.Compare(f.smallest_key, start) >= 0) {
5452 // The start of the range is at or before the file start - approximate by the end offset
5453 return ApproximateOffsetOf(v, f, end, caller);
5454 }
5455
5456 if (icmp.Compare(f.largest_key, end) < 0) {
5457 // End of the range is after the file end - approximate by subtracting
5458 // start offset from the file size
5459 uint64_t start_offset = ApproximateOffsetOf(v, f, start, caller);
5460 assert(f.fd.GetFileSize() >= start_offset);
5461 return f.fd.GetFileSize() - start_offset;
5462 }
5463
5464 // The interval falls entirely in the range for this file.
5465 TableCache* table_cache = v->cfd_->table_cache();
5466 if (table_cache == nullptr) {
5467 return 0;
5468 }
5469 return table_cache->ApproximateSize(
5470 start, end, f.file_metadata->fd, caller, icmp,
5471 v->GetMutableCFOptions().prefix_extractor.get());
5472 }
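
// Illustrative case analysis (hypothetical file spanning keys [k10, k50]):
//   range [k60, k90]: file entirely before the range       -> 0
//   range [k01, k05]: file entirely after the range        -> 0
//   range [k05, k30]: file starts at/after the range start -> offset_of(k30)
//   range [k30, k90]: file ends before the range end       -> size - offset_of(k30)
//   range [k20, k40]: range falls entirely inside the file -> TableCache::ApproximateSize(k20, k40)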
5473
5474 void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
5475 // pre-calculate space requirement
5476 int64_t total_files = 0;
5477 for (auto cfd : *column_family_set_) {
5478 if (!cfd->initialized()) {
5479 continue;
5480 }
5481 Version* dummy_versions = cfd->dummy_versions();
5482 for (Version* v = dummy_versions->next_; v != dummy_versions;
5483 v = v->next_) {
5484 const auto* vstorage = v->storage_info();
5485 for (int level = 0; level < vstorage->num_levels(); level++) {
5486 total_files += vstorage->LevelFiles(level).size();
5487 }
5488 }
5489 }
5490
5491 // just a one-time extension to the right size
5492 live_list->reserve(live_list->size() + static_cast<size_t>(total_files));
5493
5494 for (auto cfd : *column_family_set_) {
5495 if (!cfd->initialized()) {
5496 continue;
5497 }
5498 auto* current = cfd->current();
5499 bool found_current = false;
5500 Version* dummy_versions = cfd->dummy_versions();
5501 for (Version* v = dummy_versions->next_; v != dummy_versions;
5502 v = v->next_) {
5503 v->AddLiveFiles(live_list);
5504 if (v == current) {
5505 found_current = true;
5506 }
5507 }
5508 if (!found_current && current != nullptr) {
5509 // Should never happen unless it is a bug.
5510 assert(false);
5511 current->AddLiveFiles(live_list);
5512 }
5513 }
5514 }
5515
5516 InternalIterator* VersionSet::MakeInputIterator(
5517 const Compaction* c, RangeDelAggregator* range_del_agg,
5518 const FileOptions& file_options_compactions) {
5519 auto cfd = c->column_family_data();
5520 ReadOptions read_options;
5521 read_options.verify_checksums = true;
5522 read_options.fill_cache = false;
5523 // Compaction iterators shouldn't be confined to a single prefix.
5524 // Compactions use Seek() for
5525 // (a) concurrent compactions,
5526 // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
5527 read_options.total_order_seek = true;
5528
5529 // Level-0 files have to be merged together. For other levels,
5530 // we will make a concatenating iterator per level.
5531 // TODO(opt): use concatenating iterator for level-0 if there is no overlap
5532 const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
5533 c->num_input_levels() - 1
5534 : c->num_input_levels());
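// Example sizing (hypothetical): an L0 -> L1 compaction with 4 L0 input files
// and num_input_levels() == 2 gives space = 4 + 2 - 1 = 5: one iterator per
// L0 file plus one concatenating LevelIterator for L1.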
5535 InternalIterator** list = new InternalIterator*[space];
5536 size_t num = 0;
5537 for (size_t which = 0; which < c->num_input_levels(); which++) {
5538 if (c->input_levels(which)->num_files != 0) {
5539 if (c->level(which) == 0) {
5540 const LevelFilesBrief* flevel = c->input_levels(which);
5541 for (size_t i = 0; i < flevel->num_files; i++) {
5542 list[num++] = cfd->table_cache()->NewIterator(
5543 read_options, file_options_compactions,
5544 cfd->internal_comparator(),
5545 *flevel->files[i].file_metadata, range_del_agg,
5546 c->mutable_cf_options()->prefix_extractor.get(),
5547 /*table_reader_ptr=*/nullptr,
5548 /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
5549 /*arena=*/nullptr,
5550 /*skip_filters=*/false, /*level=*/static_cast<int>(which),
5551 /*smallest_compaction_key=*/nullptr,
5552 /*largest_compaction_key=*/nullptr);
5553 }
5554 } else {
5555 // Create concatenating iterator for the files from this level
5556 list[num++] = new LevelIterator(
5557 cfd->table_cache(), read_options, file_options_compactions,
5558 cfd->internal_comparator(), c->input_levels(which),
5559 c->mutable_cf_options()->prefix_extractor.get(),
5560 /*should_sample=*/false,
5561 /*no per level latency histogram=*/nullptr,
5562 TableReaderCaller::kCompaction, /*skip_filters=*/false,
5563 /*level=*/static_cast<int>(which), range_del_agg,
5564 c->boundaries(which));
5565 }
5566 }
5567 }
5568 assert(num <= space);
5569 InternalIterator* result =
5570 NewMergingIterator(&c->column_family_data()->internal_comparator(), list,
5571 static_cast<int>(num));
5572 delete[] list;
5573 return result;
5574 }
5575
5576 // verify that the files listed in this compaction are present
5577 // in the current version
5578 bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
5579 #ifndef NDEBUG
5580 Version* version = c->column_family_data()->current();
5581 const VersionStorageInfo* vstorage = version->storage_info();
5582 if (c->input_version() != version) {
5583 ROCKS_LOG_INFO(
5584 db_options_->info_log,
5585 "[%s] compaction output being applied to a different base version from"
5586 " input version",
5587 c->column_family_data()->GetName().c_str());
5588
5589 if (vstorage->compaction_style_ == kCompactionStyleLevel &&
5590 c->start_level() == 0 && c->num_input_levels() > 2U) {
5591 // We are doing an L0->base_level compaction. The assumption is that if
5592 // the base level is not L1, the levels from L1 to base_level - 1 are empty.
5593 // This is ensured by allowing only one compaction from L0 at a time in
5594 // level-based compaction, so that while it runs, no other
5595 // compaction/flush can put files into those levels.
5596 for (int l = c->start_level() + 1; l < c->output_level(); l++) {
5597 if (vstorage->NumLevelFiles(l) != 0) {
5598 return false;
5599 }
5600 }
5601 }
5602 }
5603
5604 for (size_t input = 0; input < c->num_input_levels(); ++input) {
5605 int level = c->level(input);
5606 for (size_t i = 0; i < c->num_input_files(input); ++i) {
5607 uint64_t number = c->input(input, i)->fd.GetNumber();
5608 bool found = false;
5609 for (size_t j = 0; j < vstorage->files_[level].size(); j++) {
5610 FileMetaData* f = vstorage->files_[level][j];
5611 if (f->fd.GetNumber() == number) {
5612 found = true;
5613 break;
5614 }
5615 }
5616 if (!found) {
5617 return false; // input files not present in the current version
5618 }
5619 }
5620 }
5621 #else
5622 (void)c;
5623 #endif
5624 return true; // everything good
5625 }
5626
5627 Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
5628 FileMetaData** meta,
5629 ColumnFamilyData** cfd) {
5630 for (auto cfd_iter : *column_family_set_) {
5631 if (!cfd_iter->initialized()) {
5632 continue;
5633 }
5634 Version* version = cfd_iter->current();
5635 const auto* vstorage = version->storage_info();
5636 for (int level = 0; level < vstorage->num_levels(); level++) {
5637 for (const auto& file : vstorage->LevelFiles(level)) {
5638 if (file->fd.GetNumber() == number) {
5639 *meta = file;
5640 *filelevel = level;
5641 *cfd = cfd_iter;
5642 return Status::OK();
5643 }
5644 }
5645 }
5646 }
5647 return Status::NotFound("File not present in any level");
5648 }
5649
5650 void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
5651 for (auto cfd : *column_family_set_) {
5652 if (cfd->IsDropped() || !cfd->initialized()) {
5653 continue;
5654 }
5655 for (int level = 0; level < cfd->NumberLevels(); level++) {
5656 for (const auto& file :
5657 cfd->current()->storage_info()->LevelFiles(level)) {
5658 LiveFileMetaData filemetadata;
5659 filemetadata.column_family_name = cfd->GetName();
5660 uint32_t path_id = file->fd.GetPathId();
5661 if (path_id < cfd->ioptions()->cf_paths.size()) {
5662 filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
5663 } else {
5664 assert(!cfd->ioptions()->cf_paths.empty());
5665 filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
5666 }
5667 const uint64_t file_number = file->fd.GetNumber();
5668 filemetadata.name = MakeTableFileName("", file_number);
5669 filemetadata.file_number = file_number;
5670 filemetadata.level = level;
5671 filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
5672 filemetadata.smallestkey = file->smallest.user_key().ToString();
5673 filemetadata.largestkey = file->largest.user_key().ToString();
5674 filemetadata.smallest_seqno = file->fd.smallest_seqno;
5675 filemetadata.largest_seqno = file->fd.largest_seqno;
5676 filemetadata.num_reads_sampled = file->stats.num_reads_sampled.load(
5677 std::memory_order_relaxed);
5678 filemetadata.being_compacted = file->being_compacted;
5679 filemetadata.num_entries = file->num_entries;
5680 filemetadata.num_deletions = file->num_deletions;
5681 filemetadata.oldest_blob_file_number = file->oldest_blob_file_number;
5682 filemetadata.file_checksum = file->file_checksum;
5683 filemetadata.file_checksum_func_name = file->file_checksum_func_name;
5684 metadata->push_back(filemetadata);
5685 }
5686 }
5687 }
5688 }
5689
5690 void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
5691 std::vector<std::string>* manifest_filenames,
5692 uint64_t min_pending_output) {
5693 assert(manifest_filenames->empty());
5694 obsolete_manifests_.swap(*manifest_filenames);
5695 std::vector<ObsoleteFileInfo> pending_files;
5696 for (auto& f : obsolete_files_) {
5697 if (f.metadata->fd.GetNumber() < min_pending_output) {
5698 files->push_back(std::move(f));
5699 } else {
5700 pending_files.push_back(std::move(f));
5701 }
5702 }
5703 obsolete_files_.swap(pending_files);
5704 }
5705
5706 ColumnFamilyData* VersionSet::CreateColumnFamily(
5707 const ColumnFamilyOptions& cf_options, const VersionEdit* edit) {
5708 assert(edit->is_column_family_add_);
5709
5710 MutableCFOptions dummy_cf_options;
5711 Version* dummy_versions =
5712 new Version(nullptr, this, file_options_, dummy_cf_options);
5713 // Ref() the dummy version once so that later we can call Unref() to delete it,
5714 // avoiding an explicit "delete" (~Version is private)
5715 dummy_versions->Ref();
5716 auto new_cfd = column_family_set_->CreateColumnFamily(
5717 edit->column_family_name_, edit->column_family_, dummy_versions,
5718 cf_options);
5719
5720 Version* v = new Version(new_cfd, this, file_options_,
5721 *new_cfd->GetLatestMutableCFOptions(),
5722 current_version_number_++);
5723
5724 // Fill level target base information.
5725 v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
5726 *new_cfd->GetLatestMutableCFOptions());
5727 AppendVersion(new_cfd, v);
5728 // GetLatestMutableCFOptions() is safe here without the mutex since the
5729 // cfd is not yet visible to clients
5730 new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
5731 LastSequence());
5732 new_cfd->SetLogNumber(edit->log_number_);
5733 return new_cfd;
5734 }
5735
5736 uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) {
5737 uint64_t count = 0;
5738 for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
5739 count++;
5740 }
5741 return count;
5742 }
5743
5744 uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
5745 std::unordered_set<uint64_t> unique_files;
5746 uint64_t total_files_size = 0;
5747 for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
5748 VersionStorageInfo* storage_info = v->storage_info();
5749 for (int level = 0; level < storage_info->num_levels_; level++) {
5750 for (const auto& file_meta : storage_info->LevelFiles(level)) {
5751 if (unique_files.find(file_meta->fd.packed_number_and_path_id) ==
5752 unique_files.end()) {
5753 unique_files.insert(file_meta->fd.packed_number_and_path_id);
5754 total_files_size += file_meta->fd.GetFileSize();
5755 }
5756 }
5757 }
5758 }
5759 return total_files_size;
5760 }
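
// Example (hypothetical): if two live versions both reference file #9 (e.g. a
// file carried across versions), the unique_files set above counts its size
// once, so the result reflects the on-disk footprint rather than a sum over
// versions.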
5761
5762 Status VersionSet::VerifyFileMetadata(const std::string& fpath,
5763 const FileMetaData& meta) const {
5764 uint64_t fsize = 0;
5765 Status status = fs_->GetFileSize(fpath, IOOptions(), &fsize, nullptr);
5766 if (status.ok()) {
5767 if (fsize != meta.fd.GetFileSize()) {
5768 status = Status::Corruption("File size mismatch: " + fpath);
5769 }
5770 }
5771 return status;
5772 }
5773
5774 ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
5775 const ImmutableDBOptions* _db_options,
5776 const FileOptions& _file_options,
5777 Cache* table_cache,
5778 WriteBufferManager* write_buffer_manager,
5779 WriteController* write_controller)
5780 : VersionSet(dbname, _db_options, _file_options, table_cache,
5781 write_buffer_manager, write_controller,
5782 /*block_cache_tracer=*/nullptr),
5783 number_of_edits_to_skip_(0) {}
5784
5785 ReactiveVersionSet::~ReactiveVersionSet() {}
5786
5787 Status ReactiveVersionSet::Recover(
5788 const std::vector<ColumnFamilyDescriptor>& column_families,
5789 std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
5790 std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
5791 std::unique_ptr<Status>* manifest_reader_status) {
5792 assert(manifest_reader != nullptr);
5793 assert(manifest_reporter != nullptr);
5794 assert(manifest_reader_status != nullptr);
5795
5796 std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
5797 for (const auto& cf : column_families) {
5798 cf_name_to_options.insert({cf.name, cf.options});
5799 }
5800
5801 // add default column family
5802 auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
5803 if (default_cf_iter == cf_name_to_options.end()) {
5804 return Status::InvalidArgument("Default column family not specified");
5805 }
5806 VersionEdit default_cf_edit;
5807 default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
5808 default_cf_edit.SetColumnFamily(0);
5809 ColumnFamilyData* default_cfd =
5810 CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
5811 // During recovery, nobody else can access the column family, so it's
5812 // fine to mark it initialized early.
5813 default_cfd->set_initialized();
5814 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
5815 builders;
5816 std::unordered_map<int, std::string> column_families_not_found;
5817 builders.insert(
5818 std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
5819 new BaseReferencedVersionBuilder(default_cfd))));
5820
5821 manifest_reader_status->reset(new Status());
5822 manifest_reporter->reset(new LogReporter());
5823 static_cast<LogReporter*>(manifest_reporter->get())->status =
5824 manifest_reader_status->get();
5825 Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
5826 log::Reader* reader = manifest_reader->get();
5827
5828 int retry = 0;
5829 VersionEdit version_edit;
5830 while (s.ok() && retry < 1) {
5831 assert(reader != nullptr);
5832 Slice record;
5833 std::string scratch;
5834 s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
5835 column_families_not_found, builders, &version_edit);
5836 if (s.ok()) {
5837 bool enough = version_edit.has_next_file_number_ &&
5838 version_edit.has_log_number_ &&
5839 version_edit.has_last_sequence_;
5840 if (enough) {
5841 for (const auto& cf : column_families) {
5842 auto cfd = column_family_set_->GetColumnFamily(cf.name);
5843 if (cfd == nullptr) {
5844 enough = false;
5845 break;
5846 }
5847 }
5848 }
5849 if (enough) {
5850 for (const auto& cf : column_families) {
5851 auto cfd = column_family_set_->GetColumnFamily(cf.name);
5852 assert(cfd != nullptr);
5853 if (!cfd->IsDropped()) {
5854 auto builder_iter = builders.find(cfd->GetID());
5855 assert(builder_iter != builders.end());
5856 auto builder = builder_iter->second->version_builder();
5857 assert(builder != nullptr);
5858 s = builder->LoadTableHandlers(
5859 cfd->internal_stats(), db_options_->max_file_opening_threads,
5860 false /* prefetch_index_and_filter_in_cache */,
5861 true /* is_initial_load */,
5862 cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
5863 if (!s.ok()) {
5864 enough = false;
5865 if (s.IsPathNotFound()) {
5866 s = Status::OK();
5867 }
5868 break;
5869 }
5870 }
5871 }
5872 }
5873 if (enough) {
5874 break;
5875 }
5876 }
5877 ++retry;
5878 }
5879
5880 if (s.ok()) {
5881 if (!version_edit.has_prev_log_number_) {
5882 version_edit.prev_log_number_ = 0;
5883 }
5884 column_family_set_->UpdateMaxColumnFamily(version_edit.max_column_family_);
5885
5886 MarkMinLogNumberToKeep2PC(version_edit.min_log_number_to_keep_);
5887 MarkFileNumberUsed(version_edit.prev_log_number_);
5888 MarkFileNumberUsed(version_edit.log_number_);
5889
5890 for (auto cfd : *column_family_set_) {
5891 assert(builders.count(cfd->GetID()) > 0);
5892 auto builder = builders[cfd->GetID()]->version_builder();
5893 if (!builder->CheckConsistencyForNumLevels()) {
5894 s = Status::InvalidArgument(
5895 "db has more levels than options.num_levels");
5896 break;
5897 }
5898 }
5899 }
5900
  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      assert(cfd->initialized());
      auto builders_iter = builders.find(cfd->GetID());
      assert(builders_iter != builders.end());
      auto* builder = builders_iter->second->version_builder();

      Version* v = new Version(cfd, this, file_options_,
                               *cfd->GetLatestMutableCFOptions(),
                               current_version_number_++);
      builder->SaveTo(v->storage_info());

      // Install recovered version
      v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
                      !(db_options_->skip_stats_update_on_db_open));
      AppendVersion(cfd, v);
    }
    next_file_number_.store(version_edit.next_file_number_ + 1);
    last_allocated_sequence_ = version_edit.last_sequence_;
    last_published_sequence_ = version_edit.last_sequence_;
    last_sequence_ = version_edit.last_sequence_;
    prev_log_number_ = version_edit.prev_log_number_;
    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      ROCKS_LOG_INFO(db_options_->info_log,
                     "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
                     cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
    }
  }
  return s;
}

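// Catch up with the primary by tailing its MANIFEST: read newly appended
// version edits, apply them to the secondary's in-memory state, and collect
// the set of column families whose state changed. The DB mutex must be held.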
Status ReactiveVersionSet::ReadAndApply(
    InstrumentedMutex* mu,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
    std::unordered_set<ColumnFamilyData*>* cfds_changed) {
  assert(manifest_reader != nullptr);
  assert(cfds_changed != nullptr);
  mu->AssertHeld();

  Status s;
  uint64_t applied_edits = 0;
  while (s.ok()) {
    Slice record;
    std::string scratch;
    log::Reader* reader = manifest_reader->get();
    std::string old_manifest_path = reader->file()->file_name();
    while (reader->ReadRecord(&record, &scratch)) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (!s.ok()) {
        break;
      }

      // Skip the first VersionEdits of each MANIFEST generated by
      // VersionSet::WriteCurrentStateToManifest.
      if (number_of_edits_to_skip_ > 0) {
        ColumnFamilyData* cfd =
            column_family_set_->GetColumnFamily(edit.column_family_);
        if (cfd != nullptr && !cfd->IsDropped()) {
          --number_of_edits_to_skip_;
        }
        continue;
      }

      s = read_buffer_.AddEdit(&edit);
      if (!s.ok()) {
        break;
      }
      VersionEdit temp_edit;
      if (edit.is_in_atomic_group_) {
        if (read_buffer_.IsFull()) {
          // Apply the edits in an atomic group only once all edits in the
          // group have been read.
          for (auto& e : read_buffer_.replay_buffer()) {
            s = ApplyOneVersionEditToBuilder(e, cfds_changed, &temp_edit);
            if (!s.ok()) {
              break;
            }
            applied_edits++;
          }
          if (!s.ok()) {
            break;
          }
          read_buffer_.Clear();
        }
      } else {
        // Apply a normal edit immediately.
        s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
        if (s.ok()) {
          applied_edits++;
        }
      }
    }
    if (!s.ok()) {
      // Clear the buffer if we fail to decode/apply an edit.
      read_buffer_.Clear();
    }
    // It's possible that:
    // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
    // 2) we have finished reading the current MANIFEST.
    // 3) we have encountered an IOError reading the current MANIFEST.
    // We need to look for the next MANIFEST and start from there. If we cannot
    // find the next MANIFEST, we should exit the loop.
    s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
    reader = manifest_reader->get();
    if (s.ok()) {
      if (reader->file()->file_name() == old_manifest_path) {
        // Still reading the same MANIFEST: if we reached here, no more
        // records are available, so stop the outer loop.
        break;
      } else {
        // We have switched to a new MANIFEST whose first records were
        // generated by VersionSet::WriteCurrentStateToManifest. Since the
        // secondary instance has already finished recovering upon start,
        // there is no need for the secondary to process these records.
        // Actually, if the secondary were to replay these records, it might
        // end up adding the same SST files AGAIN to each column family,
        // causing the consistency checks done by VersionBuilder to fail.
        // Therefore, we record the number of records to skip at the
        // beginning of the new MANIFEST and ignore them.
        number_of_edits_to_skip_ = 0;
        for (auto* cfd : *column_family_set_) {
          if (cfd->IsDropped()) {
            continue;
          }
          // Increase number_of_edits_to_skip_ by 2 because
          // WriteCurrentStateToManifest() writes 2 version edits for each
          // column family at the beginning of the newly-generated MANIFEST
          // (3 when write_dbid_to_manifest is true).
          // TODO(yanqin) remove hard-coded value.
          if (db_options_->write_dbid_to_manifest) {
            number_of_edits_to_skip_ += 3;
          } else {
            number_of_edits_to_skip_ += 2;
          }
        }
      }
    }
  }

  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      auto builder_iter = active_version_builders_.find(cfd->GetID());
      if (builder_iter == active_version_builders_.end()) {
        continue;
      }
      auto builder = builder_iter->second->version_builder();
      if (!builder->CheckConsistencyForNumLevels()) {
        s = Status::InvalidArgument(
            "db has more levels than options.num_levels");
        break;
      }
    }
  }
  TEST_SYNC_POINT_CALLBACK("ReactiveVersionSet::ReadAndApply:AppliedEdits",
                           &applied_edits);
  return s;
}

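// Apply one version edit on behalf of the secondary: route the edit to the
// column family's version builder, install a new Version once table files
// load successfully, and accumulate bookkeeping fields into *version_edit.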
Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
    VersionEdit& edit, std::unordered_set<ColumnFamilyData*>* cfds_changed,
    VersionEdit* version_edit) {
  ColumnFamilyData* cfd =
      column_family_set_->GetColumnFamily(edit.column_family_);

  // If we cannot find this column family in our column family set, then it
  // may be a new column family created by the primary after the secondary
  // starts. It is also possible that the secondary instance opens only a
  // subset of column families. Ignore it for now.
  if (nullptr == cfd) {
    return Status::OK();
  }
  if (active_version_builders_.find(edit.column_family_) ==
          active_version_builders_.end() &&
      !cfd->IsDropped()) {
    std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
        new BaseReferencedVersionBuilder(cfd));
    active_version_builders_.insert(
        std::make_pair(edit.column_family_, std::move(builder_guard)));
  }

  auto builder_iter = active_version_builders_.find(edit.column_family_);
  assert(builder_iter != active_version_builders_.end());
  auto builder = builder_iter->second->version_builder();
  assert(builder != nullptr);

  if (edit.is_column_family_add_) {
    // TODO (yanqin) for now the secondary ignores column families created
    // after Open. This also simplifies handling of switching to a new
    // MANIFEST and processing the snapshot of the system at the beginning
    // of the MANIFEST.
  } else if (edit.is_column_family_drop_) {
    // Drop the column family by setting it to be 'dropped' without destroying
    // the column family handle.
    // TODO (haoyu) figure out how to handle column family drop for
    // secondary instance. (Is it possible that the ref count for cfd is 0 but
    // the ref count for its versions is higher than 0?)
    cfd->SetDropped();
    if (cfd->UnrefAndTryDelete()) {
      cfd = nullptr;
    }
    active_version_builders_.erase(builder_iter);
  } else {
    Status s = builder->Apply(&edit);
    if (!s.ok()) {
      return s;
    }
  }
  Status s = ExtractInfoFromVersionEdit(cfd, edit, version_edit);
  if (!s.ok()) {
    return s;
  }

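  // For a live column family, open the table files newly referenced by this
  // edit. This is an incremental load, not the initial one done in recovery.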
  if (cfd != nullptr && !cfd->IsDropped()) {
    s = builder->LoadTableHandlers(
        cfd->internal_stats(), db_options_->max_file_opening_threads,
        false /* prefetch_index_and_filter_in_cache */,
        false /* is_initial_load */,
        cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
    TEST_SYNC_POINT_CALLBACK(
        "ReactiveVersionSet::ApplyOneVersionEditToBuilder:"
        "AfterLoadTableHandlers",
        &s);

    if (s.ok()) {
      auto version = new Version(cfd, this, file_options_,
                                 *cfd->GetLatestMutableCFOptions(),
                                 current_version_number_++);
      builder->SaveTo(version->storage_info());
      version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
      AppendVersion(cfd, version);
      active_version_builders_.erase(builder_iter);
      if (cfds_changed->count(cfd) == 0) {
        cfds_changed->insert(cfd);
      }
    } else if (s.IsPathNotFound()) {
      // The missing file may have been deleted by the primary; tolerate the
      // error and retry on a later catch-up attempt.
      s = Status::OK();
    }
    // Any other error from LoadTableHandlers is propagated to the caller.
  }
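  // Propagate the bookkeeping fields accumulated in *version_edit (file,
  // sequence, and log numbers) to the VersionSet.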
  if (version_edit->HasNextFile()) {
    next_file_number_.store(version_edit->next_file_number_ + 1);
  }
  if (version_edit->has_last_sequence_) {
    last_allocated_sequence_ = version_edit->last_sequence_;
    last_published_sequence_ = version_edit->last_sequence_;
    last_sequence_ = version_edit->last_sequence_;
  }
  if (version_edit->has_prev_log_number_) {
    prev_log_number_ = version_edit->prev_log_number_;
    MarkFileNumberUsed(version_edit->prev_log_number_);
  }
  if (version_edit->has_log_number_) {
    MarkFileNumberUsed(version_edit->log_number_);
  }
  column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
  MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
  return s;
}

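// Re-read CURRENT and, if it names a MANIFEST other than the one currently
// open (or none is open yet), switch the reader to the new MANIFEST.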
Status ReactiveVersionSet::MaybeSwitchManifest(
    log::Reader::Reporter* reporter,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader) {
  assert(manifest_reader != nullptr);
  Status s;
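  // Loop on PathNotFound: the primary may delete the MANIFEST named by
  // CURRENT between the read of CURRENT and the open below, so retry until
  // we successfully open a MANIFEST that still exists.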
  do {
    std::string manifest_path;
    s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
                               &manifest_file_number_);
    std::unique_ptr<FSSequentialFile> manifest_file;
    if (s.ok()) {
      if (nullptr == manifest_reader->get() ||
          manifest_reader->get()->file()->file_name() != manifest_path) {
        TEST_SYNC_POINT(
            "ReactiveVersionSet::MaybeSwitchManifest:"
            "AfterGetCurrentManifestPath:0");
        TEST_SYNC_POINT(
            "ReactiveVersionSet::MaybeSwitchManifest:"
            "AfterGetCurrentManifestPath:1");
        s = fs_->NewSequentialFile(manifest_path,
                                   env_->OptimizeForManifestRead(file_options_),
                                   &manifest_file, nullptr);
      } else {
        // No need to switch manifest.
        break;
      }
    }
    std::unique_ptr<SequentialFileReader> manifest_file_reader;
    if (s.ok()) {
      manifest_file_reader.reset(
          new SequentialFileReader(std::move(manifest_file), manifest_path,
                                   db_options_->log_readahead_size));
      manifest_reader->reset(new log::FragmentBufferedReader(
          nullptr, std::move(manifest_file_reader), reporter,
          true /* checksum */, 0 /* log_number */));
      ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n",
                     manifest_path.c_str());
      // TODO (yanqin) every time we switch to a new MANIFEST, we clear the
      // active_version_builders_ map because we choose to construct the
      // versions from scratch, thanks to the first part of each MANIFEST
      // written by VersionSet::WriteCurrentStateToManifest. This is not
      // strictly necessary, but we choose it for simplicity at present.
      active_version_builders_.clear();
    }
  } while (s.IsPathNotFound());
  return s;
}

}  // namespace ROCKSDB_NAMESPACE