1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/version_builder.h"
11 
12 #include <algorithm>
13 #include <atomic>
14 #include <cinttypes>
15 #include <functional>
16 #include <map>
17 #include <memory>
18 #include <set>
19 #include <sstream>
20 #include <thread>
21 #include <unordered_map>
22 #include <unordered_set>
23 #include <utility>
24 #include <vector>
25 
26 #include "db/blob/blob_file_meta.h"
27 #include "db/dbformat.h"
28 #include "db/internal_stats.h"
29 #include "db/table_cache.h"
30 #include "db/version_set.h"
31 #include "port/port.h"
32 #include "table/table_reader.h"
33 #include "util/string_util.h"
34 
35 namespace ROCKSDB_NAMESPACE {
36 
NewestFirstBySeqNo(FileMetaData * a,FileMetaData * b)37 bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
38   if (a->fd.largest_seqno != b->fd.largest_seqno) {
39     return a->fd.largest_seqno > b->fd.largest_seqno;
40   }
41   if (a->fd.smallest_seqno != b->fd.smallest_seqno) {
42     return a->fd.smallest_seqno > b->fd.smallest_seqno;
43   }
44   // Break ties by file number
45   return a->fd.GetNumber() > b->fd.GetNumber();
46 }
47 
48 namespace {
BySmallestKey(FileMetaData * a,FileMetaData * b,const InternalKeyComparator * cmp)49 bool BySmallestKey(FileMetaData* a, FileMetaData* b,
50                    const InternalKeyComparator* cmp) {
51   int r = cmp->Compare(a->smallest, b->smallest);
52   if (r != 0) {
53     return (r < 0);
54   }
55   // Break ties by file number
56   return (a->fd.GetNumber() < b->fd.GetNumber());
57 }
58 }  // namespace
59 
60 class VersionBuilder::Rep {
61  private:
62   // Helper to sort files_ in v
63   // kLevel0 -- NewestFirstBySeqNo
64   // kLevelNon0 -- BySmallestKey
65   struct FileComparator {
66     enum SortMethod { kLevel0 = 0, kLevelNon0 = 1, } sort_method;
67     const InternalKeyComparator* internal_comparator;
68 
FileComparatorROCKSDB_NAMESPACE::VersionBuilder::Rep::FileComparator69     FileComparator() : internal_comparator(nullptr) {}
70 
operator ()ROCKSDB_NAMESPACE::VersionBuilder::Rep::FileComparator71     bool operator()(FileMetaData* f1, FileMetaData* f2) const {
72       switch (sort_method) {
73         case kLevel0:
74           return NewestFirstBySeqNo(f1, f2);
75         case kLevelNon0:
76           return BySmallestKey(f1, f2, internal_comparator);
77       }
78       assert(false);
79       return false;
80     }
81   };
82 
83   struct LevelState {
84     std::unordered_set<uint64_t> deleted_files;
85     // Map from file number to file meta data.
86     std::unordered_map<uint64_t, FileMetaData*> added_files;
87   };
88 
89   const FileOptions& file_options_;
90   Logger* info_log_;
91   TableCache* table_cache_;
92   VersionStorageInfo* base_vstorage_;
93   int num_levels_;
94   LevelState* levels_;
95   // Store states of levels larger than num_levels_. We do this instead of
96   // storing them in levels_ to avoid regression in case there are no files
97   // on invalid levels. The version is not consistent if in the end the files
98   // on invalid levels don't cancel out.
99   std::map<int, std::unordered_set<uint64_t>> invalid_levels_;
100   // Whether there are invalid new files or invalid deletion on levels larger
101   // than num_levels_.
102   bool has_invalid_levels_;
103   FileComparator level_zero_cmp_;
104   FileComparator level_nonzero_cmp_;
105 
106   // Metadata for all blob files affected by the series of version edits.
107   std::map<uint64_t, std::shared_ptr<BlobFileMetaData>> changed_blob_files_;
108 
109  public:
Rep(const FileOptions & file_options,Logger * info_log,TableCache * table_cache,VersionStorageInfo * base_vstorage)110   Rep(const FileOptions& file_options, Logger* info_log,
111       TableCache* table_cache,
112       VersionStorageInfo* base_vstorage)
113       : file_options_(file_options),
114         info_log_(info_log),
115         table_cache_(table_cache),
116         base_vstorage_(base_vstorage),
117         num_levels_(base_vstorage->num_levels()),
118         has_invalid_levels_(false) {
119     levels_ = new LevelState[num_levels_];
120     level_zero_cmp_.sort_method = FileComparator::kLevel0;
121     level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
122     level_nonzero_cmp_.internal_comparator =
123         base_vstorage_->InternalComparator();
124   }
125 
~Rep()126   ~Rep() {
127     for (int level = 0; level < num_levels_; level++) {
128       const auto& added = levels_[level].added_files;
129       for (auto& pair : added) {
130         UnrefFile(pair.second);
131       }
132     }
133 
134     delete[] levels_;
135   }
136 
UnrefFile(FileMetaData * f)137   void UnrefFile(FileMetaData* f) {
138     f->refs--;
139     if (f->refs <= 0) {
140       if (f->table_reader_handle) {
141         assert(table_cache_ != nullptr);
142         table_cache_->ReleaseHandle(f->table_reader_handle);
143         f->table_reader_handle = nullptr;
144       }
145       delete f;
146     }
147   }
148 
GetBlobFileMetaData(uint64_t blob_file_number) const149   std::shared_ptr<BlobFileMetaData> GetBlobFileMetaData(
150       uint64_t blob_file_number) const {
151     auto changed_it = changed_blob_files_.find(blob_file_number);
152     if (changed_it != changed_blob_files_.end()) {
153       const auto& meta = changed_it->second;
154       assert(meta);
155 
156       return meta;
157     }
158 
159     assert(base_vstorage_);
160 
161     const auto& base_blob_files = base_vstorage_->GetBlobFiles();
162 
163     auto base_it = base_blob_files.find(blob_file_number);
164     if (base_it != base_blob_files.end()) {
165       const auto& meta = base_it->second;
166       assert(meta);
167 
168       return meta;
169     }
170 
171     return std::shared_ptr<BlobFileMetaData>();
172   }
173 
CheckConsistencyOfOldestBlobFileReference(const VersionStorageInfo * vstorage,uint64_t blob_file_number) const174   Status CheckConsistencyOfOldestBlobFileReference(
175       const VersionStorageInfo* vstorage, uint64_t blob_file_number) const {
176     assert(vstorage);
177 
178     // TODO: remove this check once we actually start recoding metadata for
179     // blob files in the MANIFEST.
180     if (vstorage->GetBlobFiles().empty()) {
181       return Status::OK();
182     }
183 
184     if (blob_file_number == kInvalidBlobFileNumber) {
185       return Status::OK();
186     }
187 
188     const auto meta = GetBlobFileMetaData(blob_file_number);
189     if (!meta) {
190       std::ostringstream oss;
191       oss << "Blob file #" << blob_file_number
192           << " is not part of this version";
193 
194       return Status::Corruption("VersionBuilder", oss.str());
195     }
196 
197     return Status::OK();
198   }
199 
CheckConsistency(VersionStorageInfo * vstorage)200   Status CheckConsistency(VersionStorageInfo* vstorage) {
201 #ifdef NDEBUG
202     if (!vstorage->force_consistency_checks()) {
203       // Dont run consistency checks in release mode except if
204       // explicitly asked to
205       return Status::OK();
206     }
207 #endif
208     // Make sure the files are sorted correctly and that the oldest blob file
209     // reference for each table file points to a valid blob file in this
210     // version.
211     for (int level = 0; level < num_levels_; level++) {
212       auto& level_files = vstorage->LevelFiles(level);
213 
214       if (level_files.empty()) {
215         continue;
216       }
217 
218       assert(level_files[0]);
219       Status s = CheckConsistencyOfOldestBlobFileReference(
220           vstorage, level_files[0]->oldest_blob_file_number);
221       if (!s.ok()) {
222         return s;
223       }
224 
225       for (size_t i = 1; i < level_files.size(); i++) {
226         assert(level_files[i]);
227         s = CheckConsistencyOfOldestBlobFileReference(
228             vstorage, level_files[i]->oldest_blob_file_number);
229         if (!s.ok()) {
230           return s;
231         }
232 
233         auto f1 = level_files[i - 1];
234         auto f2 = level_files[i];
235 #ifndef NDEBUG
236         auto pair = std::make_pair(&f1, &f2);
237         TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency", &pair);
238 #endif
239         if (level == 0) {
240           if (!level_zero_cmp_(f1, f2)) {
241             fprintf(stderr, "L0 files are not sorted properly");
242             return Status::Corruption("L0 files are not sorted properly");
243           }
244 
245           if (f2->fd.smallest_seqno == f2->fd.largest_seqno) {
246             // This is an external file that we ingested
247             SequenceNumber external_file_seqno = f2->fd.smallest_seqno;
248             if (!(external_file_seqno < f1->fd.largest_seqno ||
249                   external_file_seqno == 0)) {
250               fprintf(stderr,
251                       "L0 file with seqno %" PRIu64 " %" PRIu64
252                       " vs. file with global_seqno %" PRIu64 "\n",
253                       f1->fd.smallest_seqno, f1->fd.largest_seqno,
254                       external_file_seqno);
255               return Status::Corruption(
256                   "L0 file with seqno " +
257                   NumberToString(f1->fd.smallest_seqno) + " " +
258                   NumberToString(f1->fd.largest_seqno) +
259                   " vs. file with global_seqno" +
260                   NumberToString(external_file_seqno) + " with fileNumber " +
261                   NumberToString(f1->fd.GetNumber()));
262             }
263           } else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) {
264             fprintf(stderr,
265                     "L0 files seqno %" PRIu64 " %" PRIu64 " vs. %" PRIu64
266                     " %" PRIu64 "\n",
267                     f1->fd.smallest_seqno, f1->fd.largest_seqno,
268                     f2->fd.smallest_seqno, f2->fd.largest_seqno);
269             return Status::Corruption(
270                 "L0 files seqno " + NumberToString(f1->fd.smallest_seqno) +
271                 " " + NumberToString(f1->fd.largest_seqno) + " " +
272                 NumberToString(f1->fd.GetNumber()) + " vs. " +
273                 NumberToString(f2->fd.smallest_seqno) + " " +
274                 NumberToString(f2->fd.largest_seqno) + " " +
275                 NumberToString(f2->fd.GetNumber()));
276           }
277         } else {
278           if (!level_nonzero_cmp_(f1, f2)) {
279             fprintf(stderr, "L%d files are not sorted properly", level);
280             return Status::Corruption("L" + NumberToString(level) +
281                                       " files are not sorted properly");
282           }
283 
284           // Make sure there is no overlap in levels > 0
285           if (vstorage->InternalComparator()->Compare(f1->largest,
286                                                       f2->smallest) >= 0) {
287             fprintf(stderr, "L%d have overlapping ranges %s vs. %s\n", level,
288                     (f1->largest).DebugString(true).c_str(),
289                     (f2->smallest).DebugString(true).c_str());
290             return Status::Corruption(
291                 "L" + NumberToString(level) + " have overlapping ranges " +
292                 (f1->largest).DebugString(true) + " vs. " +
293                 (f2->smallest).DebugString(true));
294           }
295         }
296       }
297     }
298 
299     // Make sure that all blob files in the version have non-garbage data.
300     const auto& blob_files = vstorage->GetBlobFiles();
301     for (const auto& pair : blob_files) {
302       const auto& blob_file_meta = pair.second;
303       assert(blob_file_meta);
304 
305       if (blob_file_meta->GetGarbageBlobCount() >=
306           blob_file_meta->GetTotalBlobCount()) {
307         std::ostringstream oss;
308         oss << "Blob file #" << blob_file_meta->GetBlobFileNumber()
309             << " consists entirely of garbage";
310 
311         return Status::Corruption("VersionBuilder", oss.str());
312       }
313     }
314 
315     return Status::OK();
316   }
317 
CheckConsistencyForDeletes(VersionEdit *,uint64_t number,int level)318   Status CheckConsistencyForDeletes(VersionEdit* /*edit*/, uint64_t number,
319                                     int level) {
320 #ifdef NDEBUG
321     if (!base_vstorage_->force_consistency_checks()) {
322       // Dont run consistency checks in release mode except if
323       // explicitly asked to
324       return Status::OK();
325     }
326 #endif
327     // a file to be deleted better exist in the previous version
328     bool found = false;
329     for (int l = 0; !found && l < num_levels_; l++) {
330       const std::vector<FileMetaData*>& base_files =
331           base_vstorage_->LevelFiles(l);
332       for (size_t i = 0; i < base_files.size(); i++) {
333         FileMetaData* f = base_files[i];
334         if (f->fd.GetNumber() == number) {
335           found = true;
336           break;
337         }
338       }
339     }
340     // if the file did not exist in the previous version, then it
341     // is possibly moved from lower level to higher level in current
342     // version
343     for (int l = level + 1; !found && l < num_levels_; l++) {
344       auto& level_added = levels_[l].added_files;
345       auto got = level_added.find(number);
346       if (got != level_added.end()) {
347         found = true;
348         break;
349       }
350     }
351 
352     // maybe this file was added in a previous edit that was Applied
353     if (!found) {
354       auto& level_added = levels_[level].added_files;
355       auto got = level_added.find(number);
356       if (got != level_added.end()) {
357         found = true;
358       }
359     }
360     if (!found) {
361       fprintf(stderr, "not found %" PRIu64 "\n", number);
362       return Status::Corruption("not found " + NumberToString(number));
363     }
364     return Status::OK();
365   }
366 
CheckConsistencyForNumLevels()367   bool CheckConsistencyForNumLevels() {
368     // Make sure there are no files on or beyond num_levels().
369     if (has_invalid_levels_) {
370       return false;
371     }
372     for (auto& level : invalid_levels_) {
373       if (level.second.size() > 0) {
374         return false;
375       }
376     }
377     return true;
378   }
379 
ApplyBlobFileAddition(const BlobFileAddition & blob_file_addition)380   Status ApplyBlobFileAddition(const BlobFileAddition& blob_file_addition) {
381     const uint64_t blob_file_number = blob_file_addition.GetBlobFileNumber();
382 
383     auto meta = GetBlobFileMetaData(blob_file_number);
384     if (meta) {
385       std::ostringstream oss;
386       oss << "Blob file #" << blob_file_number << " already added";
387 
388       return Status::Corruption("VersionBuilder", oss.str());
389     }
390 
391     auto shared_meta = std::make_shared<SharedBlobFileMetaData>(
392         blob_file_number, blob_file_addition.GetTotalBlobCount(),
393         blob_file_addition.GetTotalBlobBytes(),
394         blob_file_addition.GetChecksumMethod(),
395         blob_file_addition.GetChecksumValue());
396 
397     constexpr uint64_t garbage_blob_count = 0;
398     constexpr uint64_t garbage_blob_bytes = 0;
399     auto new_meta = std::make_shared<BlobFileMetaData>(
400         std::move(shared_meta), garbage_blob_count, garbage_blob_bytes);
401 
402     changed_blob_files_.emplace(blob_file_number, std::move(new_meta));
403 
404     return Status::OK();
405   }
406 
ApplyBlobFileGarbage(const BlobFileGarbage & blob_file_garbage)407   Status ApplyBlobFileGarbage(const BlobFileGarbage& blob_file_garbage) {
408     const uint64_t blob_file_number = blob_file_garbage.GetBlobFileNumber();
409 
410     auto meta = GetBlobFileMetaData(blob_file_number);
411     if (!meta) {
412       std::ostringstream oss;
413       oss << "Blob file #" << blob_file_number << " not found";
414 
415       return Status::Corruption("VersionBuilder", oss.str());
416     }
417 
418     assert(meta->GetBlobFileNumber() == blob_file_number);
419 
420     auto new_meta = std::make_shared<BlobFileMetaData>(
421         meta->GetSharedMeta(),
422         meta->GetGarbageBlobCount() + blob_file_garbage.GetGarbageBlobCount(),
423         meta->GetGarbageBlobBytes() + blob_file_garbage.GetGarbageBlobBytes());
424 
425     changed_blob_files_[blob_file_number] = std::move(new_meta);
426 
427     return Status::OK();
428   }
429 
430   // Apply all of the edits in *edit to the current state.
Apply(VersionEdit * edit)431   Status Apply(VersionEdit* edit) {
432     Status s = CheckConsistency(base_vstorage_);
433     if (!s.ok()) {
434       return s;
435     }
436 
437     // Delete files
438     const auto& del = edit->GetDeletedFiles();
439     for (const auto& del_file : del) {
440       const auto level = del_file.first;
441       const auto number = del_file.second;
442       if (level < num_levels_) {
443         levels_[level].deleted_files.insert(number);
444         CheckConsistencyForDeletes(edit, number, level);
445 
446         auto exising = levels_[level].added_files.find(number);
447         if (exising != levels_[level].added_files.end()) {
448           UnrefFile(exising->second);
449           levels_[level].added_files.erase(exising);
450         }
451       } else {
452         if (invalid_levels_[level].erase(number) == 0) {
453           // Deleting an non-existing file on invalid level.
454           has_invalid_levels_ = true;
455         }
456       }
457     }
458 
459     // Add new files
460     for (const auto& new_file : edit->GetNewFiles()) {
461       const int level = new_file.first;
462       if (level < num_levels_) {
463         FileMetaData* f = new FileMetaData(new_file.second);
464         f->refs = 1;
465 
466         assert(levels_[level].added_files.find(f->fd.GetNumber()) ==
467                levels_[level].added_files.end());
468         levels_[level].deleted_files.erase(f->fd.GetNumber());
469         levels_[level].added_files[f->fd.GetNumber()] = f;
470       } else {
471         uint64_t number = new_file.second.fd.GetNumber();
472         auto& lvls = invalid_levels_[level];
473         if (lvls.count(number) == 0) {
474           lvls.insert(number);
475         } else {
476           // Creating an already existing file on invalid level.
477           has_invalid_levels_ = true;
478         }
479       }
480     }
481 
482     // Add new blob files
483     for (const auto& blob_file_addition : edit->GetBlobFileAdditions()) {
484       s = ApplyBlobFileAddition(blob_file_addition);
485       if (!s.ok()) {
486         return s;
487       }
488     }
489 
490     // Increase the amount of garbage for blob files affected by GC
491     for (const auto& blob_file_garbage : edit->GetBlobFileGarbages()) {
492       s = ApplyBlobFileGarbage(blob_file_garbage);
493       if (!s.ok()) {
494         return s;
495       }
496     }
497 
498     return s;
499   }
500 
AddBlobFileIfNeeded(VersionStorageInfo * vstorage,const std::shared_ptr<BlobFileMetaData> & meta) const501   void AddBlobFileIfNeeded(
502       VersionStorageInfo* vstorage,
503       const std::shared_ptr<BlobFileMetaData>& meta) const {
504     assert(vstorage);
505     assert(meta);
506 
507     if (meta->GetGarbageBlobCount() < meta->GetTotalBlobCount()) {
508       vstorage->AddBlobFile(meta);
509     }
510   }
511 
512   // Merge the blob file metadata from the base version with the changes (edits)
513   // applied, and save the result into *vstorage.
SaveBlobFilesTo(VersionStorageInfo * vstorage) const514   void SaveBlobFilesTo(VersionStorageInfo* vstorage) const {
515     assert(base_vstorage_);
516     assert(vstorage);
517 
518     const auto& base_blob_files = base_vstorage_->GetBlobFiles();
519     auto base_it = base_blob_files.begin();
520     const auto base_it_end = base_blob_files.end();
521 
522     auto changed_it = changed_blob_files_.begin();
523     const auto changed_it_end = changed_blob_files_.end();
524 
525     while (base_it != base_it_end && changed_it != changed_it_end) {
526       const uint64_t base_blob_file_number = base_it->first;
527       const uint64_t changed_blob_file_number = changed_it->first;
528 
529       const auto& base_meta = base_it->second;
530       const auto& changed_meta = changed_it->second;
531 
532       assert(base_meta);
533       assert(changed_meta);
534 
535       if (base_blob_file_number < changed_blob_file_number) {
536         assert(base_meta->GetGarbageBlobCount() <
537                base_meta->GetTotalBlobCount());
538 
539         vstorage->AddBlobFile(base_meta);
540 
541         ++base_it;
542       } else if (changed_blob_file_number < base_blob_file_number) {
543         AddBlobFileIfNeeded(vstorage, changed_meta);
544 
545         ++changed_it;
546       } else {
547         assert(base_blob_file_number == changed_blob_file_number);
548         assert(base_meta->GetSharedMeta() == changed_meta->GetSharedMeta());
549         assert(base_meta->GetGarbageBlobCount() <=
550                changed_meta->GetGarbageBlobCount());
551         assert(base_meta->GetGarbageBlobBytes() <=
552                changed_meta->GetGarbageBlobBytes());
553 
554         AddBlobFileIfNeeded(vstorage, changed_meta);
555 
556         ++base_it;
557         ++changed_it;
558       }
559     }
560 
561     while (base_it != base_it_end) {
562       const auto& base_meta = base_it->second;
563       assert(base_meta);
564       assert(base_meta->GetGarbageBlobCount() < base_meta->GetTotalBlobCount());
565 
566       vstorage->AddBlobFile(base_meta);
567       ++base_it;
568     }
569 
570     while (changed_it != changed_it_end) {
571       const auto& changed_meta = changed_it->second;
572       assert(changed_meta);
573 
574       AddBlobFileIfNeeded(vstorage, changed_meta);
575       ++changed_it;
576     }
577   }
578 
579   // Save the current state in *v.
SaveTo(VersionStorageInfo * vstorage)580   Status SaveTo(VersionStorageInfo* vstorage) {
581     Status s = CheckConsistency(base_vstorage_);
582     if (!s.ok()) {
583       return s;
584     }
585 
586     s = CheckConsistency(vstorage);
587     if (!s.ok()) {
588       return s;
589     }
590 
591     for (int level = 0; level < num_levels_; level++) {
592       const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
593       // Merge the set of added files with the set of pre-existing files.
594       // Drop any deleted files.  Store the result in *v.
595       const auto& base_files = base_vstorage_->LevelFiles(level);
596       const auto& unordered_added_files = levels_[level].added_files;
597       vstorage->Reserve(level,
598                         base_files.size() + unordered_added_files.size());
599 
600       // Sort added files for the level.
601       std::vector<FileMetaData*> added_files;
602       added_files.reserve(unordered_added_files.size());
603       for (const auto& pair : unordered_added_files) {
604         added_files.push_back(pair.second);
605       }
606       std::sort(added_files.begin(), added_files.end(), cmp);
607 
608 #ifndef NDEBUG
609       FileMetaData* prev_added_file = nullptr;
610       for (const auto& added : added_files) {
611         if (level > 0 && prev_added_file != nullptr) {
612           assert(base_vstorage_->InternalComparator()->Compare(
613                      prev_added_file->smallest, added->smallest) <= 0);
614         }
615         prev_added_file = added;
616       }
617 #endif
618 
619       auto base_iter = base_files.begin();
620       auto base_end = base_files.end();
621       auto added_iter = added_files.begin();
622       auto added_end = added_files.end();
623       while (added_iter != added_end || base_iter != base_end) {
624         if (base_iter == base_end ||
625                 (added_iter != added_end && cmp(*added_iter, *base_iter))) {
626           MaybeAddFile(vstorage, level, *added_iter++);
627         } else {
628           MaybeAddFile(vstorage, level, *base_iter++);
629         }
630       }
631     }
632 
633     SaveBlobFilesTo(vstorage);
634 
635     s = CheckConsistency(vstorage);
636     return s;
637   }
638 
LoadTableHandlers(InternalStats * internal_stats,int max_threads,bool prefetch_index_and_filter_in_cache,bool is_initial_load,const SliceTransform * prefix_extractor)639   Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
640                            bool prefetch_index_and_filter_in_cache,
641                            bool is_initial_load,
642                            const SliceTransform* prefix_extractor) {
643     assert(table_cache_ != nullptr);
644 
645     size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity();
646     bool always_load = (table_cache_capacity == TableCache::kInfiniteCapacity);
647     size_t max_load = port::kMaxSizet;
648 
649     if (!always_load) {
650       // If it is initial loading and not set to always loading all the
651       // files, we only load up to kInitialLoadLimit files, to limit the
652       // time reopening the DB.
653       const size_t kInitialLoadLimit = 16;
654       size_t load_limit;
655       // If the table cache is not 1/4 full, we pin the table handle to
656       // file metadata to avoid the cache read costs when reading the file.
657       // The downside of pinning those files is that LRU won't be followed
658       // for those files. This doesn't matter much because if number of files
659       // of the DB excceeds table cache capacity, eventually no table reader
660       // will be pinned and LRU will be followed.
661       if (is_initial_load) {
662         load_limit = std::min(kInitialLoadLimit, table_cache_capacity / 4);
663       } else {
664         load_limit = table_cache_capacity / 4;
665       }
666 
667       size_t table_cache_usage = table_cache_->get_cache()->GetUsage();
668       if (table_cache_usage >= load_limit) {
669         // TODO (yanqin) find a suitable status code.
670         return Status::OK();
671       } else {
672         max_load = load_limit - table_cache_usage;
673       }
674     }
675 
676     // <file metadata, level>
677     std::vector<std::pair<FileMetaData*, int>> files_meta;
678     std::vector<Status> statuses;
679     for (int level = 0; level < num_levels_; level++) {
680       for (auto& file_meta_pair : levels_[level].added_files) {
681         auto* file_meta = file_meta_pair.second;
682         // If the file has been opened before, just skip it.
683         if (!file_meta->table_reader_handle) {
684           files_meta.emplace_back(file_meta, level);
685           statuses.emplace_back(Status::OK());
686         }
687         if (files_meta.size() >= max_load) {
688           break;
689         }
690       }
691       if (files_meta.size() >= max_load) {
692         break;
693       }
694     }
695 
696     std::atomic<size_t> next_file_meta_idx(0);
697     std::function<void()> load_handlers_func([&]() {
698       while (true) {
699         size_t file_idx = next_file_meta_idx.fetch_add(1);
700         if (file_idx >= files_meta.size()) {
701           break;
702         }
703 
704         auto* file_meta = files_meta[file_idx].first;
705         int level = files_meta[file_idx].second;
706         statuses[file_idx] = table_cache_->FindTable(
707             file_options_, *(base_vstorage_->InternalComparator()),
708             file_meta->fd, &file_meta->table_reader_handle, prefix_extractor,
709             false /*no_io */, true /* record_read_stats */,
710             internal_stats->GetFileReadHist(level), false, level,
711             prefetch_index_and_filter_in_cache);
712         if (file_meta->table_reader_handle != nullptr) {
713           // Load table_reader
714           file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle(
715               file_meta->table_reader_handle);
716         }
717       }
718     });
719 
720     std::vector<port::Thread> threads;
721     for (int i = 1; i < max_threads; i++) {
722       threads.emplace_back(load_handlers_func);
723     }
724     load_handlers_func();
725     for (auto& t : threads) {
726       t.join();
727     }
728     for (const auto& s : statuses) {
729       if (!s.ok()) {
730         return s;
731       }
732     }
733     return Status::OK();
734   }
735 
MaybeAddFile(VersionStorageInfo * vstorage,int level,FileMetaData * f)736   void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
737     if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) {
738       // f is to-be-deleted table file
739       vstorage->RemoveCurrentStats(f);
740     } else {
741       vstorage->AddFile(level, f, info_log_);
742     }
743   }
744 };
745 
// Constructs a builder on top of base_vstorage. All state lives in the Rep
// object allocated here. Note that Rep's constructor takes info_log as its
// second argument, so the argument order differs from this signature.
VersionBuilder::VersionBuilder(const FileOptions& file_options,
                               TableCache* table_cache,
                               VersionStorageInfo* base_vstorage,
                               Logger* info_log)
    : rep_(new Rep(file_options, info_log, table_cache, base_vstorage)) {}
751 
// Defaulted here, in the translation unit where Rep is a complete type.
// NOTE(review): presumably rep_ is a smart pointer declared in the header
// that needs the complete type to destroy Rep -- confirm against the header.
VersionBuilder::~VersionBuilder() = default;
753 
// Forwards to Rep::CheckConsistencyForNumLevels(): returns true iff no
// inconsistency was recorded for levels at or beyond the base version's
// number of levels.
bool VersionBuilder::CheckConsistencyForNumLevels() {
  return rep_->CheckConsistencyForNumLevels();
}
757 
// Forwards to Rep::Apply(): applies the changes in *edit to the builder's
// accumulated state and returns the resulting status.
Status VersionBuilder::Apply(VersionEdit* edit) { return rep_->Apply(edit); }
759 
// Forwards to Rep::SaveTo(): merges the base version with the accumulated
// changes into *vstorage and returns the resulting status.
Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
  return rep_->SaveTo(vstorage);
}
763 
// Forwards to Rep::LoadTableHandlers() with the arguments unchanged and in
// the same order; returns its status.
Status VersionBuilder::LoadTableHandlers(
    InternalStats* internal_stats, int max_threads,
    bool prefetch_index_and_filter_in_cache, bool is_initial_load,
    const SliceTransform* prefix_extractor) {
  return rep_->LoadTableHandlers(internal_stats, max_threads,
                                 prefetch_index_and_filter_in_cache,
                                 is_initial_load, prefix_extractor);
}
772 
// Builds on top of the column family's current version. The builder takes
// its own reference on that version (Ref() below), which the destructor
// drops again with Unref().
BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
    ColumnFamilyData* cfd)
    : version_builder_(new VersionBuilder(
          cfd->current()->version_set()->file_options(), cfd->table_cache(),
          cfd->current()->storage_info(), cfd->ioptions()->info_log)),
      version_(cfd->current()) {
  version_->Ref();
}
781 
// Builds on top of the explicitly supplied version v, which must not be the
// column family's current version (asserted). File options, table cache and
// info log are still taken from cfd.
// NOTE(review): unlike the single-argument constructor, no Ref() is taken
// here, yet the destructor calls Unref() -- presumably the caller hands over
// a reference it already holds; confirm against the call sites.
BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
    ColumnFamilyData* cfd, Version* v)
    : version_builder_(new VersionBuilder(
          cfd->current()->version_set()->file_options(), cfd->table_cache(),
          v->storage_info(), cfd->ioptions()->info_log)),
      version_(v) {
  assert(version_ != cfd->current());
}
790 
// Drops one reference on the version this builder was constructed with.
BaseReferencedVersionBuilder::~BaseReferencedVersionBuilder() {
  version_->Unref();
}
794 
795 }  // namespace ROCKSDB_NAMESPACE
796