1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/version_builder.h"
11
12 #include <algorithm>
13 #include <atomic>
14 #include <cinttypes>
15 #include <functional>
16 #include <map>
17 #include <memory>
18 #include <set>
19 #include <sstream>
20 #include <thread>
21 #include <unordered_map>
22 #include <unordered_set>
23 #include <utility>
24 #include <vector>
25
26 #include "db/blob/blob_file_meta.h"
27 #include "db/dbformat.h"
28 #include "db/internal_stats.h"
29 #include "db/table_cache.h"
30 #include "db/version_set.h"
31 #include "port/port.h"
32 #include "table/table_reader.h"
33 #include "util/string_util.h"
34
35 namespace ROCKSDB_NAMESPACE {
36
NewestFirstBySeqNo(FileMetaData * a,FileMetaData * b)37 bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
38 if (a->fd.largest_seqno != b->fd.largest_seqno) {
39 return a->fd.largest_seqno > b->fd.largest_seqno;
40 }
41 if (a->fd.smallest_seqno != b->fd.smallest_seqno) {
42 return a->fd.smallest_seqno > b->fd.smallest_seqno;
43 }
44 // Break ties by file number
45 return a->fd.GetNumber() > b->fd.GetNumber();
46 }
47
48 namespace {
BySmallestKey(FileMetaData * a,FileMetaData * b,const InternalKeyComparator * cmp)49 bool BySmallestKey(FileMetaData* a, FileMetaData* b,
50 const InternalKeyComparator* cmp) {
51 int r = cmp->Compare(a->smallest, b->smallest);
52 if (r != 0) {
53 return (r < 0);
54 }
55 // Break ties by file number
56 return (a->fd.GetNumber() < b->fd.GetNumber());
57 }
58 } // namespace
59
60 class VersionBuilder::Rep {
61 private:
62 // Helper to sort files_ in v
63 // kLevel0 -- NewestFirstBySeqNo
64 // kLevelNon0 -- BySmallestKey
65 struct FileComparator {
66 enum SortMethod { kLevel0 = 0, kLevelNon0 = 1, } sort_method;
67 const InternalKeyComparator* internal_comparator;
68
FileComparatorROCKSDB_NAMESPACE::VersionBuilder::Rep::FileComparator69 FileComparator() : internal_comparator(nullptr) {}
70
operator ()ROCKSDB_NAMESPACE::VersionBuilder::Rep::FileComparator71 bool operator()(FileMetaData* f1, FileMetaData* f2) const {
72 switch (sort_method) {
73 case kLevel0:
74 return NewestFirstBySeqNo(f1, f2);
75 case kLevelNon0:
76 return BySmallestKey(f1, f2, internal_comparator);
77 }
78 assert(false);
79 return false;
80 }
81 };
82
83 struct LevelState {
84 std::unordered_set<uint64_t> deleted_files;
85 // Map from file number to file meta data.
86 std::unordered_map<uint64_t, FileMetaData*> added_files;
87 };
88
89 const FileOptions& file_options_;
90 Logger* info_log_;
91 TableCache* table_cache_;
92 VersionStorageInfo* base_vstorage_;
93 int num_levels_;
94 LevelState* levels_;
95 // Store states of levels larger than num_levels_. We do this instead of
96 // storing them in levels_ to avoid regression in case there are no files
97 // on invalid levels. The version is not consistent if in the end the files
98 // on invalid levels don't cancel out.
99 std::map<int, std::unordered_set<uint64_t>> invalid_levels_;
100 // Whether there are invalid new files or invalid deletion on levels larger
101 // than num_levels_.
102 bool has_invalid_levels_;
103 FileComparator level_zero_cmp_;
104 FileComparator level_nonzero_cmp_;
105
106 // Metadata for all blob files affected by the series of version edits.
107 std::map<uint64_t, std::shared_ptr<BlobFileMetaData>> changed_blob_files_;
108
109 public:
Rep(const FileOptions & file_options,Logger * info_log,TableCache * table_cache,VersionStorageInfo * base_vstorage)110 Rep(const FileOptions& file_options, Logger* info_log,
111 TableCache* table_cache,
112 VersionStorageInfo* base_vstorage)
113 : file_options_(file_options),
114 info_log_(info_log),
115 table_cache_(table_cache),
116 base_vstorage_(base_vstorage),
117 num_levels_(base_vstorage->num_levels()),
118 has_invalid_levels_(false) {
119 levels_ = new LevelState[num_levels_];
120 level_zero_cmp_.sort_method = FileComparator::kLevel0;
121 level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
122 level_nonzero_cmp_.internal_comparator =
123 base_vstorage_->InternalComparator();
124 }
125
// Drops the builder's reference on every file still recorded as added, then
// frees the per-level state array.
~Rep() {
  for (int level = 0; level < num_levels_; level++) {
    const auto& added_files = levels_[level].added_files;
    for (auto it = added_files.begin(); it != added_files.end(); ++it) {
      UnrefFile(it->second);
    }
  }

  delete[] levels_;
}
136
UnrefFile(FileMetaData * f)137 void UnrefFile(FileMetaData* f) {
138 f->refs--;
139 if (f->refs <= 0) {
140 if (f->table_reader_handle) {
141 assert(table_cache_ != nullptr);
142 table_cache_->ReleaseHandle(f->table_reader_handle);
143 f->table_reader_handle = nullptr;
144 }
145 delete f;
146 }
147 }
148
GetBlobFileMetaData(uint64_t blob_file_number) const149 std::shared_ptr<BlobFileMetaData> GetBlobFileMetaData(
150 uint64_t blob_file_number) const {
151 auto changed_it = changed_blob_files_.find(blob_file_number);
152 if (changed_it != changed_blob_files_.end()) {
153 const auto& meta = changed_it->second;
154 assert(meta);
155
156 return meta;
157 }
158
159 assert(base_vstorage_);
160
161 const auto& base_blob_files = base_vstorage_->GetBlobFiles();
162
163 auto base_it = base_blob_files.find(blob_file_number);
164 if (base_it != base_blob_files.end()) {
165 const auto& meta = base_it->second;
166 assert(meta);
167
168 return meta;
169 }
170
171 return std::shared_ptr<BlobFileMetaData>();
172 }
173
CheckConsistencyOfOldestBlobFileReference(const VersionStorageInfo * vstorage,uint64_t blob_file_number) const174 Status CheckConsistencyOfOldestBlobFileReference(
175 const VersionStorageInfo* vstorage, uint64_t blob_file_number) const {
176 assert(vstorage);
177
178 // TODO: remove this check once we actually start recoding metadata for
179 // blob files in the MANIFEST.
180 if (vstorage->GetBlobFiles().empty()) {
181 return Status::OK();
182 }
183
184 if (blob_file_number == kInvalidBlobFileNumber) {
185 return Status::OK();
186 }
187
188 const auto meta = GetBlobFileMetaData(blob_file_number);
189 if (!meta) {
190 std::ostringstream oss;
191 oss << "Blob file #" << blob_file_number
192 << " is not part of this version";
193
194 return Status::Corruption("VersionBuilder", oss.str());
195 }
196
197 return Status::OK();
198 }
199
CheckConsistency(VersionStorageInfo * vstorage)200 Status CheckConsistency(VersionStorageInfo* vstorage) {
201 #ifdef NDEBUG
202 if (!vstorage->force_consistency_checks()) {
203 // Dont run consistency checks in release mode except if
204 // explicitly asked to
205 return Status::OK();
206 }
207 #endif
208 // Make sure the files are sorted correctly and that the oldest blob file
209 // reference for each table file points to a valid blob file in this
210 // version.
211 for (int level = 0; level < num_levels_; level++) {
212 auto& level_files = vstorage->LevelFiles(level);
213
214 if (level_files.empty()) {
215 continue;
216 }
217
218 assert(level_files[0]);
219 Status s = CheckConsistencyOfOldestBlobFileReference(
220 vstorage, level_files[0]->oldest_blob_file_number);
221 if (!s.ok()) {
222 return s;
223 }
224
225 for (size_t i = 1; i < level_files.size(); i++) {
226 assert(level_files[i]);
227 s = CheckConsistencyOfOldestBlobFileReference(
228 vstorage, level_files[i]->oldest_blob_file_number);
229 if (!s.ok()) {
230 return s;
231 }
232
233 auto f1 = level_files[i - 1];
234 auto f2 = level_files[i];
235 #ifndef NDEBUG
236 auto pair = std::make_pair(&f1, &f2);
237 TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency", &pair);
238 #endif
239 if (level == 0) {
240 if (!level_zero_cmp_(f1, f2)) {
241 fprintf(stderr, "L0 files are not sorted properly");
242 return Status::Corruption("L0 files are not sorted properly");
243 }
244
245 if (f2->fd.smallest_seqno == f2->fd.largest_seqno) {
246 // This is an external file that we ingested
247 SequenceNumber external_file_seqno = f2->fd.smallest_seqno;
248 if (!(external_file_seqno < f1->fd.largest_seqno ||
249 external_file_seqno == 0)) {
250 fprintf(stderr,
251 "L0 file with seqno %" PRIu64 " %" PRIu64
252 " vs. file with global_seqno %" PRIu64 "\n",
253 f1->fd.smallest_seqno, f1->fd.largest_seqno,
254 external_file_seqno);
255 return Status::Corruption(
256 "L0 file with seqno " +
257 NumberToString(f1->fd.smallest_seqno) + " " +
258 NumberToString(f1->fd.largest_seqno) +
259 " vs. file with global_seqno" +
260 NumberToString(external_file_seqno) + " with fileNumber " +
261 NumberToString(f1->fd.GetNumber()));
262 }
263 } else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) {
264 fprintf(stderr,
265 "L0 files seqno %" PRIu64 " %" PRIu64 " vs. %" PRIu64
266 " %" PRIu64 "\n",
267 f1->fd.smallest_seqno, f1->fd.largest_seqno,
268 f2->fd.smallest_seqno, f2->fd.largest_seqno);
269 return Status::Corruption(
270 "L0 files seqno " + NumberToString(f1->fd.smallest_seqno) +
271 " " + NumberToString(f1->fd.largest_seqno) + " " +
272 NumberToString(f1->fd.GetNumber()) + " vs. " +
273 NumberToString(f2->fd.smallest_seqno) + " " +
274 NumberToString(f2->fd.largest_seqno) + " " +
275 NumberToString(f2->fd.GetNumber()));
276 }
277 } else {
278 if (!level_nonzero_cmp_(f1, f2)) {
279 fprintf(stderr, "L%d files are not sorted properly", level);
280 return Status::Corruption("L" + NumberToString(level) +
281 " files are not sorted properly");
282 }
283
284 // Make sure there is no overlap in levels > 0
285 if (vstorage->InternalComparator()->Compare(f1->largest,
286 f2->smallest) >= 0) {
287 fprintf(stderr, "L%d have overlapping ranges %s vs. %s\n", level,
288 (f1->largest).DebugString(true).c_str(),
289 (f2->smallest).DebugString(true).c_str());
290 return Status::Corruption(
291 "L" + NumberToString(level) + " have overlapping ranges " +
292 (f1->largest).DebugString(true) + " vs. " +
293 (f2->smallest).DebugString(true));
294 }
295 }
296 }
297 }
298
299 // Make sure that all blob files in the version have non-garbage data.
300 const auto& blob_files = vstorage->GetBlobFiles();
301 for (const auto& pair : blob_files) {
302 const auto& blob_file_meta = pair.second;
303 assert(blob_file_meta);
304
305 if (blob_file_meta->GetGarbageBlobCount() >=
306 blob_file_meta->GetTotalBlobCount()) {
307 std::ostringstream oss;
308 oss << "Blob file #" << blob_file_meta->GetBlobFileNumber()
309 << " consists entirely of garbage";
310
311 return Status::Corruption("VersionBuilder", oss.str());
312 }
313 }
314
315 return Status::OK();
316 }
317
CheckConsistencyForDeletes(VersionEdit *,uint64_t number,int level)318 Status CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number,
319 int level) {
320 #ifdef NDEBUG
321 if (!base_vstorage_->force_consistency_checks()) {
322 // Dont run consistency checks in release mode except if
323 // explicitly asked to
324 return Status::OK();
325 }
326 #endif
327 // a file to be deleted better exist in the previous version
328 bool found = false;
329 for (int l = 0; !found && l < num_levels_; l++) {
330 const std::vector<FileMetaData*>& base_files =
331 base_vstorage_->LevelFiles(l);
332 for (size_t i = 0; i < base_files.size(); i++) {
333 FileMetaData* f = base_files[i];
334 if (f->fd.GetNumber() == number) {
335 found = true;
336 break;
337 }
338 }
339 }
340 // if the file did not exist in the previous version, then it
341 // is possibly moved from lower level to higher level in current
342 // version
343 for (int l = level + 1; !found && l < num_levels_; l++) {
344 auto& level_added = levels_[l].added_files;
345 auto got = level_added.find(number);
346 if (got != level_added.end()) {
347 found = true;
348 break;
349 }
350 }
351
352 // maybe this file was added in a previous edit that was Applied
353 if (!found) {
354 auto& level_added = levels_[level].added_files;
355 auto got = level_added.find(number);
356 if (got != level_added.end()) {
357 found = true;
358 }
359 }
360 if (!found) {
361 fprintf(stderr, "not found %" PRIu64 "\n", number);
362 return Status::Corruption("not found " + NumberToString(number));
363 }
364 return Status::OK();
365 }
366
CheckConsistencyForNumLevels()367 bool CheckConsistencyForNumLevels() {
368 // Make sure there are no files on or beyond num_levels().
369 if (has_invalid_levels_) {
370 return false;
371 }
372 for (auto& level : invalid_levels_) {
373 if (level.second.size() > 0) {
374 return false;
375 }
376 }
377 return true;
378 }
379
ApplyBlobFileAddition(const BlobFileAddition & blob_file_addition)380 Status ApplyBlobFileAddition(const BlobFileAddition& blob_file_addition) {
381 const uint64_t blob_file_number = blob_file_addition.GetBlobFileNumber();
382
383 auto meta = GetBlobFileMetaData(blob_file_number);
384 if (meta) {
385 std::ostringstream oss;
386 oss << "Blob file #" << blob_file_number << " already added";
387
388 return Status::Corruption("VersionBuilder", oss.str());
389 }
390
391 auto shared_meta = std::make_shared<SharedBlobFileMetaData>(
392 blob_file_number, blob_file_addition.GetTotalBlobCount(),
393 blob_file_addition.GetTotalBlobBytes(),
394 blob_file_addition.GetChecksumMethod(),
395 blob_file_addition.GetChecksumValue());
396
397 constexpr uint64_t garbage_blob_count = 0;
398 constexpr uint64_t garbage_blob_bytes = 0;
399 auto new_meta = std::make_shared<BlobFileMetaData>(
400 std::move(shared_meta), garbage_blob_count, garbage_blob_bytes);
401
402 changed_blob_files_.emplace(blob_file_number, std::move(new_meta));
403
404 return Status::OK();
405 }
406
ApplyBlobFileGarbage(const BlobFileGarbage & blob_file_garbage)407 Status ApplyBlobFileGarbage(const BlobFileGarbage& blob_file_garbage) {
408 const uint64_t blob_file_number = blob_file_garbage.GetBlobFileNumber();
409
410 auto meta = GetBlobFileMetaData(blob_file_number);
411 if (!meta) {
412 std::ostringstream oss;
413 oss << "Blob file #" << blob_file_number << " not found";
414
415 return Status::Corruption("VersionBuilder", oss.str());
416 }
417
418 assert(meta->GetBlobFileNumber() == blob_file_number);
419
420 auto new_meta = std::make_shared<BlobFileMetaData>(
421 meta->GetSharedMeta(),
422 meta->GetGarbageBlobCount() + blob_file_garbage.GetGarbageBlobCount(),
423 meta->GetGarbageBlobBytes() + blob_file_garbage.GetGarbageBlobBytes());
424
425 changed_blob_files_[blob_file_number] = std::move(new_meta);
426
427 return Status::OK();
428 }
429
430 // Apply all of the edits in *edit to the current state.
Apply(VersionEdit * edit)431 Status Apply(VersionEdit* edit) {
432 Status s = CheckConsistency(base_vstorage_);
433 if (!s.ok()) {
434 return s;
435 }
436
437 // Delete files
438 const auto& del = edit->GetDeletedFiles();
439 for (const auto& del_file : del) {
440 const auto level = del_file.first;
441 const auto number = del_file.second;
442 if (level < num_levels_) {
443 levels_[level].deleted_files.insert(number);
444 CheckConsistencyForDeletes(edit, number, level);
445
446 auto exising = levels_[level].added_files.find(number);
447 if (exising != levels_[level].added_files.end()) {
448 UnrefFile(exising->second);
449 levels_[level].added_files.erase(exising);
450 }
451 } else {
452 if (invalid_levels_[level].erase(number) == 0) {
453 // Deleting an non-existing file on invalid level.
454 has_invalid_levels_ = true;
455 }
456 }
457 }
458
459 // Add new files
460 for (const auto& new_file : edit->GetNewFiles()) {
461 const int level = new_file.first;
462 if (level < num_levels_) {
463 FileMetaData* f = new FileMetaData(new_file.second);
464 f->refs = 1;
465
466 assert(levels_[level].added_files.find(f->fd.GetNumber()) ==
467 levels_[level].added_files.end());
468 levels_[level].deleted_files.erase(f->fd.GetNumber());
469 levels_[level].added_files[f->fd.GetNumber()] = f;
470 } else {
471 uint64_t number = new_file.second.fd.GetNumber();
472 auto& lvls = invalid_levels_[level];
473 if (lvls.count(number) == 0) {
474 lvls.insert(number);
475 } else {
476 // Creating an already existing file on invalid level.
477 has_invalid_levels_ = true;
478 }
479 }
480 }
481
482 // Add new blob files
483 for (const auto& blob_file_addition : edit->GetBlobFileAdditions()) {
484 s = ApplyBlobFileAddition(blob_file_addition);
485 if (!s.ok()) {
486 return s;
487 }
488 }
489
490 // Increase the amount of garbage for blob files affected by GC
491 for (const auto& blob_file_garbage : edit->GetBlobFileGarbages()) {
492 s = ApplyBlobFileGarbage(blob_file_garbage);
493 if (!s.ok()) {
494 return s;
495 }
496 }
497
498 return s;
499 }
500
AddBlobFileIfNeeded(VersionStorageInfo * vstorage,const std::shared_ptr<BlobFileMetaData> & meta) const501 void AddBlobFileIfNeeded(
502 VersionStorageInfo* vstorage,
503 const std::shared_ptr<BlobFileMetaData>& meta) const {
504 assert(vstorage);
505 assert(meta);
506
507 if (meta->GetGarbageBlobCount() < meta->GetTotalBlobCount()) {
508 vstorage->AddBlobFile(meta);
509 }
510 }
511
512 // Merge the blob file metadata from the base version with the changes (edits)
513 // applied, and save the result into *vstorage.
SaveBlobFilesTo(VersionStorageInfo * vstorage) const514 void SaveBlobFilesTo(VersionStorageInfo* vstorage) const {
515 assert(base_vstorage_);
516 assert(vstorage);
517
518 const auto& base_blob_files = base_vstorage_->GetBlobFiles();
519 auto base_it = base_blob_files.begin();
520 const auto base_it_end = base_blob_files.end();
521
522 auto changed_it = changed_blob_files_.begin();
523 const auto changed_it_end = changed_blob_files_.end();
524
525 while (base_it != base_it_end && changed_it != changed_it_end) {
526 const uint64_t base_blob_file_number = base_it->first;
527 const uint64_t changed_blob_file_number = changed_it->first;
528
529 const auto& base_meta = base_it->second;
530 const auto& changed_meta = changed_it->second;
531
532 assert(base_meta);
533 assert(changed_meta);
534
535 if (base_blob_file_number < changed_blob_file_number) {
536 assert(base_meta->GetGarbageBlobCount() <
537 base_meta->GetTotalBlobCount());
538
539 vstorage->AddBlobFile(base_meta);
540
541 ++base_it;
542 } else if (changed_blob_file_number < base_blob_file_number) {
543 AddBlobFileIfNeeded(vstorage, changed_meta);
544
545 ++changed_it;
546 } else {
547 assert(base_blob_file_number == changed_blob_file_number);
548 assert(base_meta->GetSharedMeta() == changed_meta->GetSharedMeta());
549 assert(base_meta->GetGarbageBlobCount() <=
550 changed_meta->GetGarbageBlobCount());
551 assert(base_meta->GetGarbageBlobBytes() <=
552 changed_meta->GetGarbageBlobBytes());
553
554 AddBlobFileIfNeeded(vstorage, changed_meta);
555
556 ++base_it;
557 ++changed_it;
558 }
559 }
560
561 while (base_it != base_it_end) {
562 const auto& base_meta = base_it->second;
563 assert(base_meta);
564 assert(base_meta->GetGarbageBlobCount() < base_meta->GetTotalBlobCount());
565
566 vstorage->AddBlobFile(base_meta);
567 ++base_it;
568 }
569
570 while (changed_it != changed_it_end) {
571 const auto& changed_meta = changed_it->second;
572 assert(changed_meta);
573
574 AddBlobFileIfNeeded(vstorage, changed_meta);
575 ++changed_it;
576 }
577 }
578
579 // Save the current state in *v.
SaveTo(VersionStorageInfo * vstorage)580 Status SaveTo(VersionStorageInfo* vstorage) {
581 Status s = CheckConsistency(base_vstorage_);
582 if (!s.ok()) {
583 return s;
584 }
585
586 s = CheckConsistency(vstorage);
587 if (!s.ok()) {
588 return s;
589 }
590
591 for (int level = 0; level < num_levels_; level++) {
592 const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
593 // Merge the set of added files with the set of pre-existing files.
594 // Drop any deleted files. Store the result in *v.
595 const auto& base_files = base_vstorage_->LevelFiles(level);
596 const auto& unordered_added_files = levels_[level].added_files;
597 vstorage->Reserve(level,
598 base_files.size() + unordered_added_files.size());
599
600 // Sort added files for the level.
601 std::vector<FileMetaData*> added_files;
602 added_files.reserve(unordered_added_files.size());
603 for (const auto& pair : unordered_added_files) {
604 added_files.push_back(pair.second);
605 }
606 std::sort(added_files.begin(), added_files.end(), cmp);
607
608 #ifndef NDEBUG
609 FileMetaData* prev_added_file = nullptr;
610 for (const auto& added : added_files) {
611 if (level > 0 && prev_added_file != nullptr) {
612 assert(base_vstorage_->InternalComparator()->Compare(
613 prev_added_file->smallest, added->smallest) <= 0);
614 }
615 prev_added_file = added;
616 }
617 #endif
618
619 auto base_iter = base_files.begin();
620 auto base_end = base_files.end();
621 auto added_iter = added_files.begin();
622 auto added_end = added_files.end();
623 while (added_iter != added_end || base_iter != base_end) {
624 if (base_iter == base_end ||
625 (added_iter != added_end && cmp(*added_iter, *base_iter))) {
626 MaybeAddFile(vstorage, level, *added_iter++);
627 } else {
628 MaybeAddFile(vstorage, level, *base_iter++);
629 }
630 }
631 }
632
633 SaveBlobFilesTo(vstorage);
634
635 s = CheckConsistency(vstorage);
636 return s;
637 }
638
LoadTableHandlers(InternalStats * internal_stats,int max_threads,bool prefetch_index_and_filter_in_cache,bool is_initial_load,const SliceTransform * prefix_extractor)639 Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
640 bool prefetch_index_and_filter_in_cache,
641 bool is_initial_load,
642 const SliceTransform* prefix_extractor) {
643 assert(table_cache_ != nullptr);
644
645 size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity();
646 bool always_load = (table_cache_capacity == TableCache::kInfiniteCapacity);
647 size_t max_load = port::kMaxSizet;
648
649 if (!always_load) {
650 // If it is initial loading and not set to always loading all the
651 // files, we only load up to kInitialLoadLimit files, to limit the
652 // time reopening the DB.
653 const size_t kInitialLoadLimit = 16;
654 size_t load_limit;
655 // If the table cache is not 1/4 full, we pin the table handle to
656 // file metadata to avoid the cache read costs when reading the file.
657 // The downside of pinning those files is that LRU won't be followed
658 // for those files. This doesn't matter much because if number of files
659 // of the DB excceeds table cache capacity, eventually no table reader
660 // will be pinned and LRU will be followed.
661 if (is_initial_load) {
662 load_limit = std::min(kInitialLoadLimit, table_cache_capacity / 4);
663 } else {
664 load_limit = table_cache_capacity / 4;
665 }
666
667 size_t table_cache_usage = table_cache_->get_cache()->GetUsage();
668 if (table_cache_usage >= load_limit) {
669 // TODO (yanqin) find a suitable status code.
670 return Status::OK();
671 } else {
672 max_load = load_limit - table_cache_usage;
673 }
674 }
675
676 // <file metadata, level>
677 std::vector<std::pair<FileMetaData*, int>> files_meta;
678 std::vector<Status> statuses;
679 for (int level = 0; level < num_levels_; level++) {
680 for (auto& file_meta_pair : levels_[level].added_files) {
681 auto* file_meta = file_meta_pair.second;
682 // If the file has been opened before, just skip it.
683 if (!file_meta->table_reader_handle) {
684 files_meta.emplace_back(file_meta, level);
685 statuses.emplace_back(Status::OK());
686 }
687 if (files_meta.size() >= max_load) {
688 break;
689 }
690 }
691 if (files_meta.size() >= max_load) {
692 break;
693 }
694 }
695
696 std::atomic<size_t> next_file_meta_idx(0);
697 std::function<void()> load_handlers_func([&]() {
698 while (true) {
699 size_t file_idx = next_file_meta_idx.fetch_add(1);
700 if (file_idx >= files_meta.size()) {
701 break;
702 }
703
704 auto* file_meta = files_meta[file_idx].first;
705 int level = files_meta[file_idx].second;
706 statuses[file_idx] = table_cache_->FindTable(
707 file_options_, *(base_vstorage_->InternalComparator()),
708 file_meta->fd, &file_meta->table_reader_handle, prefix_extractor,
709 false /*no_io */, true /* record_read_stats */,
710 internal_stats->GetFileReadHist(level), false, level,
711 prefetch_index_and_filter_in_cache);
712 if (file_meta->table_reader_handle != nullptr) {
713 // Load table_reader
714 file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle(
715 file_meta->table_reader_handle);
716 }
717 }
718 });
719
720 std::vector<port::Thread> threads;
721 for (int i = 1; i < max_threads; i++) {
722 threads.emplace_back(load_handlers_func);
723 }
724 load_handlers_func();
725 for (auto& t : threads) {
726 t.join();
727 }
728 for (const auto& s : statuses) {
729 if (!s.ok()) {
730 return s;
731 }
732 }
733 return Status::OK();
734 }
735
MaybeAddFile(VersionStorageInfo * vstorage,int level,FileMetaData * f)736 void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
737 if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) {
738 // f is to-be-deleted table file
739 vstorage->RemoveCurrentStats(f);
740 } else {
741 vstorage->AddFile(level, f, info_log_);
742 }
743 }
744 };
745
VersionBuilder(const FileOptions & file_options,TableCache * table_cache,VersionStorageInfo * base_vstorage,Logger * info_log)746 VersionBuilder::VersionBuilder(const FileOptions& file_options,
747 TableCache* table_cache,
748 VersionStorageInfo* base_vstorage,
749 Logger* info_log)
750 : rep_(new Rep(file_options, info_log, table_cache, base_vstorage)) {}
751
752 VersionBuilder::~VersionBuilder() = default;
753
CheckConsistencyForNumLevels()754 bool VersionBuilder::CheckConsistencyForNumLevels() {
755 return rep_->CheckConsistencyForNumLevels();
756 }
757
Apply(VersionEdit * edit)758 Status VersionBuilder::Apply(VersionEdit* edit) { return rep_->Apply(edit); }
759
SaveTo(VersionStorageInfo * vstorage)760 Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
761 return rep_->SaveTo(vstorage);
762 }
763
LoadTableHandlers(InternalStats * internal_stats,int max_threads,bool prefetch_index_and_filter_in_cache,bool is_initial_load,const SliceTransform * prefix_extractor)764 Status VersionBuilder::LoadTableHandlers(
765 InternalStats* internal_stats, int max_threads,
766 bool prefetch_index_and_filter_in_cache, bool is_initial_load,
767 const SliceTransform* prefix_extractor) {
768 return rep_->LoadTableHandlers(internal_stats, max_threads,
769 prefetch_index_and_filter_in_cache,
770 is_initial_load, prefix_extractor);
771 }
772
BaseReferencedVersionBuilder(ColumnFamilyData * cfd)773 BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
774 ColumnFamilyData* cfd)
775 : version_builder_(new VersionBuilder(
776 cfd->current()->version_set()->file_options(), cfd->table_cache(),
777 cfd->current()->storage_info(), cfd->ioptions()->info_log)),
778 version_(cfd->current()) {
779 version_->Ref();
780 }
781
BaseReferencedVersionBuilder(ColumnFamilyData * cfd,Version * v)782 BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
783 ColumnFamilyData* cfd, Version* v)
784 : version_builder_(new VersionBuilder(
785 cfd->current()->version_set()->file_options(), cfd->table_cache(),
786 v->storage_info(), cfd->ioptions()->info_log)),
787 version_(v) {
788 assert(version_ != cfd->current());
789 }
790
~BaseReferencedVersionBuilder()791 BaseReferencedVersionBuilder::~BaseReferencedVersionBuilder() {
792 version_->Unref();
793 }
794
795 } // namespace ROCKSDB_NAMESPACE
796