xref: /leveldb-1.20/db/version_set.cc (revision a2fb086d)
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 
5 #include "db/version_set.h"
6 
7 #include <algorithm>
8 #include <stdio.h>
9 #include "db/filename.h"
10 #include "db/log_reader.h"
11 #include "db/log_writer.h"
12 #include "db/memtable.h"
13 #include "db/table_cache.h"
14 #include "leveldb/env.h"
15 #include "leveldb/table_builder.h"
16 #include "table/merger.h"
17 #include "table/two_level_iterator.h"
18 #include "util/coding.h"
19 #include "util/logging.h"
20 
21 namespace leveldb {
22 
23 static int TargetFileSize(const Options* options) {
24   return options->max_file_size;
25 }
26 
27 // Maximum bytes of overlaps in grandparent (i.e., level+2) before we
28 // stop building a single file in a level->level+1 compaction.
29 static int64_t MaxGrandParentOverlapBytes(const Options* options) {
30   return 10 * TargetFileSize(options);
31 }
32 
33 // Maximum number of bytes in all compacted files.  We avoid expanding
34 // the lower level file set of a compaction if it would make the
35 // total compaction cover more than this many bytes.
36 static int64_t ExpandedCompactionByteSizeLimit(const Options* options) {
37   return 25 * TargetFileSize(options);
38 }
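// Illustrative note, assuming the default Options::max_file_size of 2MB in
// this release: the two limits above then work out to roughly 20MB of
// allowed grandparent overlap and a 50MB cap on an expanded compaction.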
39 
40 static double MaxBytesForLevel(const Options* options, int level) {
41   // Note: the result for level zero is not really used since we set
42   // the level-0 compaction threshold based on number of files.
43 
44   // Result for both level-0 and level-1
45   double result = 10. * 1048576.0;
46   while (level > 1) {
47     result *= 10;
48     level--;
49   }
50   return result;
51 }
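// With the 10x growth above, the approximate limits are: level-1 = 10MB,
// level-2 = 100MB, level-3 = 1GB, level-4 = 10GB, level-5 = 100GB,
// level-6 = 1TB.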
52 
53 static uint64_t MaxFileSizeForLevel(const Options* options, int level) {
54   // We could vary per level to reduce number of files?
55   return TargetFileSize(options);
56 }
57 
58 static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
59   int64_t sum = 0;
60   for (size_t i = 0; i < files.size(); i++) {
61     sum += files[i]->file_size;
62   }
63   return sum;
64 }
65 
66 Version::~Version() {
67   assert(refs_ == 0);
68 
69   // Remove from linked list
70   prev_->next_ = next_;
71   next_->prev_ = prev_;
72 
73   // Drop references to files
74   for (int level = 0; level < config::kNumLevels; level++) {
75     for (size_t i = 0; i < files_[level].size(); i++) {
76       FileMetaData* f = files_[level][i];
77       assert(f->refs > 0);
78       f->refs--;
79       if (f->refs <= 0) {
80         delete f;
81       }
82     }
83   }
84 }
85 
86 int FindFile(const InternalKeyComparator& icmp,
87              const std::vector<FileMetaData*>& files,
88              const Slice& key) {
89   uint32_t left = 0;
90   uint32_t right = files.size();
91   while (left < right) {
92     uint32_t mid = (left + right) / 2;
93     const FileMetaData* f = files[mid];
94     if (icmp.InternalKeyComparator::Compare(f->largest.Encode(), key) < 0) {
95       // Key at "mid.largest" is < "target".  Therefore all
96       // files at or before "mid" are uninteresting.
97       left = mid + 1;
98     } else {
99       // Key at "mid.largest" is >= "target".  Therefore all files
100       // after "mid" are uninteresting.
101       right = mid;
102     }
103   }
104   return right;
105 }
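// Rough usage sketch (hypothetical keys, simplified to plain strings rather
// than encoded internal keys): if the files' largest keys are ["b", "d", "f"],
// FindFile(icmp, files, "c") returns index 1, the first file whose largest
// key is >= "c"; a key beyond every file returns files.size().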
106 
107 static bool AfterFile(const Comparator* ucmp,
108                       const Slice* user_key, const FileMetaData* f) {
109   // NULL user_key occurs before all keys and is therefore never after *f
110   return (user_key != NULL &&
111           ucmp->Compare(*user_key, f->largest.user_key()) > 0);
112 }
113 
114 static bool BeforeFile(const Comparator* ucmp,
115                        const Slice* user_key, const FileMetaData* f) {
116   // NULL user_key occurs after all keys and is therefore never before *f
117   return (user_key != NULL &&
118           ucmp->Compare(*user_key, f->smallest.user_key()) < 0);
119 }
120 
121 bool SomeFileOverlapsRange(
122     const InternalKeyComparator& icmp,
123     bool disjoint_sorted_files,
124     const std::vector<FileMetaData*>& files,
125     const Slice* smallest_user_key,
126     const Slice* largest_user_key) {
127   const Comparator* ucmp = icmp.user_comparator();
128   if (!disjoint_sorted_files) {
129     // Need to check against all files
130     for (size_t i = 0; i < files.size(); i++) {
131       const FileMetaData* f = files[i];
132       if (AfterFile(ucmp, smallest_user_key, f) ||
133           BeforeFile(ucmp, largest_user_key, f)) {
134         // No overlap
135       } else {
136         return true;  // Overlap
137       }
138     }
139     return false;
140   }
141 
142   // Binary search over file list
143   uint32_t index = 0;
144   if (smallest_user_key != NULL) {
145     // Find the earliest possible internal key for smallest_user_key
146     InternalKey small(*smallest_user_key, kMaxSequenceNumber,kValueTypeForSeek);
147     index = FindFile(icmp, files, small.Encode());
148   }
149 
150   if (index >= files.size()) {
151     // beginning of range is after all files, so no overlap.
152     return false;
153   }
154 
155   return !BeforeFile(ucmp, largest_user_key, files[index]);
156 }
157 
158 // An internal iterator.  For a given version/level pair, yields
159 // information about the files in the level.  For a given entry, key()
160 // is the largest key that occurs in the file, and value() is a
161 // 16-byte value containing the file number and file size, both
162 // encoded using EncodeFixed64.
163 class Version::LevelFileNumIterator : public Iterator {
164  public:
165   LevelFileNumIterator(const InternalKeyComparator& icmp,
166                        const std::vector<FileMetaData*>* flist)
167       : icmp_(icmp),
168         flist_(flist),
169         index_(flist->size()) {        // Marks as invalid
170   }
171   virtual bool Valid() const {
172     return index_ < flist_->size();
173   }
174   virtual void Seek(const Slice& target) {
175     index_ = FindFile(icmp_, *flist_, target);
176   }
177   virtual void SeekToFirst() { index_ = 0; }
178   virtual void SeekToLast() {
179     index_ = flist_->empty() ? 0 : flist_->size() - 1;
180   }
181   virtual void Next() {
182     assert(Valid());
183     index_++;
184   }
185   virtual void Prev() {
186     assert(Valid());
187     if (index_ == 0) {
188       index_ = flist_->size();  // Marks as invalid
189     } else {
190       index_--;
191     }
192   }
193   Slice key() const {
194     assert(Valid());
195     return (*flist_)[index_]->largest.Encode();
196   }
197   Slice value() const {
198     assert(Valid());
199     EncodeFixed64(value_buf_, (*flist_)[index_]->number);
200     EncodeFixed64(value_buf_+8, (*flist_)[index_]->file_size);
201     return Slice(value_buf_, sizeof(value_buf_));
202   }
203   virtual Status status() const { return Status::OK(); }
204  private:
205   const InternalKeyComparator icmp_;
206   const std::vector<FileMetaData*>* const flist_;
207   uint32_t index_;
208 
209   // Backing store for value().  Holds the file number and size.
210   mutable char value_buf_[16];
211 };
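// The 16-byte value built in value() above (file number then file size, each
// encoded with EncodeFixed64) is consumed by GetFileIterator() below, which
// decodes the pair and opens the corresponding table through the TableCache.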
212 
213 static Iterator* GetFileIterator(void* arg,
214                                  const ReadOptions& options,
215                                  const Slice& file_value) {
216   TableCache* cache = reinterpret_cast<TableCache*>(arg);
217   if (file_value.size() != 16) {
218     return NewErrorIterator(
219         Status::Corruption("FileReader invoked with unexpected value"));
220   } else {
221     return cache->NewIterator(options,
222                               DecodeFixed64(file_value.data()),
223                               DecodeFixed64(file_value.data() + 8));
224   }
225 }
226 
227 Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
228                                             int level) const {
229   return NewTwoLevelIterator(
230       new LevelFileNumIterator(vset_->icmp_, &files_[level]),
231       &GetFileIterator, vset_->table_cache_, options);
232 }
233 
234 void Version::AddIterators(const ReadOptions& options,
235                            std::vector<Iterator*>* iters) {
236   // Merge all level zero files together since they may overlap
237   for (size_t i = 0; i < files_[0].size(); i++) {
238     iters->push_back(
239         vset_->table_cache_->NewIterator(
240             options, files_[0][i]->number, files_[0][i]->file_size));
241   }
242 
243   // For levels > 0, we can use a concatenating iterator that sequentially
244   // walks through the non-overlapping files in the level, opening them
245   // lazily.
246   for (int level = 1; level < config::kNumLevels; level++) {
247     if (!files_[level].empty()) {
248       iters->push_back(NewConcatenatingIterator(options, level));
249     }
250   }
251 }
252 
253 // Callback from TableCache::Get()
254 namespace {
255 enum SaverState {
256   kNotFound,
257   kFound,
258   kDeleted,
259   kCorrupt,
260 };
261 struct Saver {
262   SaverState state;
263   const Comparator* ucmp;
264   Slice user_key;
265   std::string* value;
266 };
267 }
268 static void SaveValue(void* arg, const Slice& ikey, const Slice& v) {
269   Saver* s = reinterpret_cast<Saver*>(arg);
270   ParsedInternalKey parsed_key;
271   if (!ParseInternalKey(ikey, &parsed_key)) {
272     s->state = kCorrupt;
273   } else {
274     if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {
275       s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted;
276       if (s->state == kFound) {
277         s->value->assign(v.data(), v.size());
278       }
279     }
280   }
281 }
282 
283 static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
284   return a->number > b->number;
285 }
286 
287 void Version::ForEachOverlapping(Slice user_key, Slice internal_key,
288                                  void* arg,
289                                  bool (*func)(void*, int, FileMetaData*)) {
290   // TODO(sanjay): Change Version::Get() to use this function.
291   const Comparator* ucmp = vset_->icmp_.user_comparator();
292 
293   // Search level-0 in order from newest to oldest.
294   std::vector<FileMetaData*> tmp;
295   tmp.reserve(files_[0].size());
296   for (uint32_t i = 0; i < files_[0].size(); i++) {
297     FileMetaData* f = files_[0][i];
298     if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
299         ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
300       tmp.push_back(f);
301     }
302   }
303   if (!tmp.empty()) {
304     std::sort(tmp.begin(), tmp.end(), NewestFirst);
305     for (uint32_t i = 0; i < tmp.size(); i++) {
306       if (!(*func)(arg, 0, tmp[i])) {
307         return;
308       }
309     }
310   }
311 
312   // Search other levels.
313   for (int level = 1; level < config::kNumLevels; level++) {
314     size_t num_files = files_[level].size();
315     if (num_files == 0) continue;
316 
317     // Binary search to find earliest index whose largest key >= internal_key.
318     uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
319     if (index < num_files) {
320       FileMetaData* f = files_[level][index];
321       if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
322         // All of "f" is past any data for user_key
323       } else {
324         if (!(*func)(arg, level, f)) {
325           return;
326         }
327       }
328     }
329   }
330 }
331 
332 Status Version::Get(const ReadOptions& options,
333                     const LookupKey& k,
334                     std::string* value,
335                     GetStats* stats) {
336   Slice ikey = k.internal_key();
337   Slice user_key = k.user_key();
338   const Comparator* ucmp = vset_->icmp_.user_comparator();
339   Status s;
340 
341   stats->seek_file = NULL;
342   stats->seek_file_level = -1;
343   FileMetaData* last_file_read = NULL;
344   int last_file_read_level = -1;
345 
346   // We can search level-by-level since entries never hop across
347   // levels.  Therefore we are guaranteed that if we find data
348 // in a smaller level, later levels are irrelevant.
349   std::vector<FileMetaData*> tmp;
350   FileMetaData* tmp2;
351   for (int level = 0; level < config::kNumLevels; level++) {
352     size_t num_files = files_[level].size();
353     if (num_files == 0) continue;
354 
355     // Get the list of files to search in this level
356     FileMetaData* const* files = &files_[level][0];
357     if (level == 0) {
358       // Level-0 files may overlap each other.  Find all files that
359       // overlap user_key and process them in order from newest to oldest.
360       tmp.reserve(num_files);
361       for (uint32_t i = 0; i < num_files; i++) {
362         FileMetaData* f = files[i];
363         if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
364             ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
365           tmp.push_back(f);
366         }
367       }
368       if (tmp.empty()) continue;
369 
370       std::sort(tmp.begin(), tmp.end(), NewestFirst);
371       files = &tmp[0];
372       num_files = tmp.size();
373     } else {
374       // Binary search to find earliest index whose largest key >= ikey.
375       uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
376       if (index >= num_files) {
377         files = NULL;
378         num_files = 0;
379       } else {
380         tmp2 = files[index];
381         if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
382           // All of "tmp2" is past any data for user_key
383           files = NULL;
384           num_files = 0;
385         } else {
386           files = &tmp2;
387           num_files = 1;
388         }
389       }
390     }
391 
392     for (uint32_t i = 0; i < num_files; ++i) {
393       if (last_file_read != NULL && stats->seek_file == NULL) {
394         // We have had more than one seek for this read.  Charge the 1st file.
395         stats->seek_file = last_file_read;
396         stats->seek_file_level = last_file_read_level;
397       }
398 
399       FileMetaData* f = files[i];
400       last_file_read = f;
401       last_file_read_level = level;
402 
403       Saver saver;
404       saver.state = kNotFound;
405       saver.ucmp = ucmp;
406       saver.user_key = user_key;
407       saver.value = value;
408       s = vset_->table_cache_->Get(options, f->number, f->file_size,
409                                    ikey, &saver, SaveValue);
410       if (!s.ok()) {
411         return s;
412       }
413       switch (saver.state) {
414         case kNotFound:
415           break;      // Keep searching in other files
416         case kFound:
417           return s;
418         case kDeleted:
419           s = Status::NotFound(Slice());  // Use empty error message for speed
420           return s;
421         case kCorrupt:
422           s = Status::Corruption("corrupted key for ", user_key);
423           return s;
424       }
425     }
426   }
427 
428   return Status::NotFound(Slice());  // Use an empty error message for speed
429 }
430 
431 bool Version::UpdateStats(const GetStats& stats) {
432   FileMetaData* f = stats.seek_file;
433   if (f != NULL) {
434     f->allowed_seeks--;
435     if (f->allowed_seeks <= 0 && file_to_compact_ == NULL) {
436       file_to_compact_ = f;
437       file_to_compact_level_ = stats.seek_file_level;
438       return true;
439     }
440   }
441   return false;
442 }
443 
444 bool Version::RecordReadSample(Slice internal_key) {
445   ParsedInternalKey ikey;
446   if (!ParseInternalKey(internal_key, &ikey)) {
447     return false;
448   }
449 
450   struct State {
451     GetStats stats;  // Holds first matching file
452     int matches;
453 
454     static bool Match(void* arg, int level, FileMetaData* f) {
455       State* state = reinterpret_cast<State*>(arg);
456       state->matches++;
457       if (state->matches == 1) {
458         // Remember first match.
459         state->stats.seek_file = f;
460         state->stats.seek_file_level = level;
461       }
462       // We can stop iterating once we have a second match.
463       return state->matches < 2;
464     }
465   };
466 
467   State state;
468   state.matches = 0;
469   ForEachOverlapping(ikey.user_key, internal_key, &state, &State::Match);
470 
471   // Must have at least two matches since we want to merge across
472   // files. But what if we have a single file that contains many
473   // overwrites and deletions?  Should we have another mechanism for
474   // finding such files?
475   if (state.matches >= 2) {
476     // 1MB cost is about 1 seek (see comment in Builder::Apply).
477     return UpdateStats(state.stats);
478   }
479   return false;
480 }
481 
482 void Version::Ref() {
483   ++refs_;
484 }
485 
486 void Version::Unref() {
487   assert(this != &vset_->dummy_versions_);
488   assert(refs_ >= 1);
489   --refs_;
490   if (refs_ == 0) {
491     delete this;
492   }
493 }
494 
495 bool Version::OverlapInLevel(int level,
496                              const Slice* smallest_user_key,
497                              const Slice* largest_user_key) {
498   return SomeFileOverlapsRange(vset_->icmp_, (level > 0), files_[level],
499                                smallest_user_key, largest_user_key);
500 }
501 
502 int Version::PickLevelForMemTableOutput(
503     const Slice& smallest_user_key,
504     const Slice& largest_user_key) {
505   int level = 0;
506   if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
507     // Push to next level if there is no overlap in next level,
508     // and the #bytes overlapping in the level after that are limited.
509     InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
510     InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
511     std::vector<FileMetaData*> overlaps;
512     while (level < config::kMaxMemCompactLevel) {
513       if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
514         break;
515       }
516       if (level + 2 < config::kNumLevels) {
517         // Check that file does not overlap too many grandparent bytes.
518         GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
519         const int64_t sum = TotalFileSize(overlaps);
520         if (sum > MaxGrandParentOverlapBytes(vset_->options_)) {
521           break;
522         }
523       }
524       level++;
525     }
526   }
527   return level;
528 }
529 
530 // Store in "*inputs" all files in "level" that overlap [begin,end]
531 void Version::GetOverlappingInputs(
532     int level,
533     const InternalKey* begin,
534     const InternalKey* end,
535     std::vector<FileMetaData*>* inputs) {
536   assert(level >= 0);
537   assert(level < config::kNumLevels);
538   inputs->clear();
539   Slice user_begin, user_end;
540   if (begin != NULL) {
541     user_begin = begin->user_key();
542   }
543   if (end != NULL) {
544     user_end = end->user_key();
545   }
546   const Comparator* user_cmp = vset_->icmp_.user_comparator();
547   for (size_t i = 0; i < files_[level].size(); ) {
548     FileMetaData* f = files_[level][i++];
549     const Slice file_start = f->smallest.user_key();
550     const Slice file_limit = f->largest.user_key();
551     if (begin != NULL && user_cmp->Compare(file_limit, user_begin) < 0) {
552       // "f" is completely before specified range; skip it
553     } else if (end != NULL && user_cmp->Compare(file_start, user_end) > 0) {
554       // "f" is completely after specified range; skip it
555     } else {
556       inputs->push_back(f);
557       if (level == 0) {
558         // Level-0 files may overlap each other.  So check if the newly
559         // added file has expanded the range.  If so, restart search.
560         if (begin != NULL && user_cmp->Compare(file_start, user_begin) < 0) {
561           user_begin = file_start;
562           inputs->clear();
563           i = 0;
564         } else if (end != NULL && user_cmp->Compare(file_limit, user_end) > 0) {
565           user_end = file_limit;
566           inputs->clear();
567           i = 0;
568         }
569       }
570     }
571   }
572 }
573 
574 std::string Version::DebugString() const {
575   std::string r;
576   for (int level = 0; level < config::kNumLevels; level++) {
577     // E.g.,
578     //   --- level 1 ---
579     //   17:123['a' .. 'd']
580     //   20:43['e' .. 'g']
581     r.append("--- level ");
582     AppendNumberTo(&r, level);
583     r.append(" ---\n");
584     const std::vector<FileMetaData*>& files = files_[level];
585     for (size_t i = 0; i < files.size(); i++) {
586       r.push_back(' ');
587       AppendNumberTo(&r, files[i]->number);
588       r.push_back(':');
589       AppendNumberTo(&r, files[i]->file_size);
590       r.append("[");
591       r.append(files[i]->smallest.DebugString());
592       r.append(" .. ");
593       r.append(files[i]->largest.DebugString());
594       r.append("]\n");
595     }
596   }
597   return r;
598 }
599 
600 // A helper class so we can efficiently apply a whole sequence
601 // of edits to a particular state without creating intermediate
602 // Versions that contain full copies of the intermediate state.
603 class VersionSet::Builder {
604  private:
605   // Helper to sort by v->files_[file_number].smallest
606   struct BySmallestKey {
607     const InternalKeyComparator* internal_comparator;
608 
609     bool operator()(FileMetaData* f1, FileMetaData* f2) const {
610       int r = internal_comparator->Compare(f1->smallest, f2->smallest);
611       if (r != 0) {
612         return (r < 0);
613       } else {
614         // Break ties by file number
615         return (f1->number < f2->number);
616       }
617     }
618   };
619 
620   typedef std::set<FileMetaData*, BySmallestKey> FileSet;
621   struct LevelState {
622     std::set<uint64_t> deleted_files;
623     FileSet* added_files;
624   };
625 
626   VersionSet* vset_;
627   Version* base_;
628   LevelState levels_[config::kNumLevels];
629 
630  public:
631   // Initialize a builder with the files from *base and other info from *vset
632   Builder(VersionSet* vset, Version* base)
633       : vset_(vset),
634         base_(base) {
635     base_->Ref();
636     BySmallestKey cmp;
637     cmp.internal_comparator = &vset_->icmp_;
638     for (int level = 0; level < config::kNumLevels; level++) {
639       levels_[level].added_files = new FileSet(cmp);
640     }
641   }
642 
643   ~Builder() {
644     for (int level = 0; level < config::kNumLevels; level++) {
645       const FileSet* added = levels_[level].added_files;
646       std::vector<FileMetaData*> to_unref;
647       to_unref.reserve(added->size());
648       for (FileSet::const_iterator it = added->begin();
649           it != added->end(); ++it) {
650         to_unref.push_back(*it);
651       }
652       delete added;
653       for (uint32_t i = 0; i < to_unref.size(); i++) {
654         FileMetaData* f = to_unref[i];
655         f->refs--;
656         if (f->refs <= 0) {
657           delete f;
658         }
659       }
660     }
661     base_->Unref();
662   }
663 
664   // Apply all of the edits in *edit to the current state.
665   void Apply(VersionEdit* edit) {
666     // Update compaction pointers
667     for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
668       const int level = edit->compact_pointers_[i].first;
669       vset_->compact_pointer_[level] =
670           edit->compact_pointers_[i].second.Encode().ToString();
671     }
672 
673     // Delete files
674     const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
675     for (VersionEdit::DeletedFileSet::const_iterator iter = del.begin();
676          iter != del.end();
677          ++iter) {
678       const int level = iter->first;
679       const uint64_t number = iter->second;
680       levels_[level].deleted_files.insert(number);
681     }
682 
683     // Add new files
684     for (size_t i = 0; i < edit->new_files_.size(); i++) {
685       const int level = edit->new_files_[i].first;
686       FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
687       f->refs = 1;
688 
689       // We arrange to automatically compact this file after
690       // a certain number of seeks.  Let's assume:
691       //   (1) One seek costs 10ms
692       //   (2) Writing or reading 1MB costs 10ms (100MB/s)
693       //   (3) A compaction of 1MB does 25MB of IO:
694       //         1MB read from this level
695       //         10-12MB read from next level (boundaries may be misaligned)
696       //         10-12MB written to next level
697       // This implies that 25 seeks cost the same as the compaction
698       // of 1MB of data.  I.e., one seek costs approximately the
699       // same as the compaction of 40KB of data.  We are a little
700       // conservative and allow approximately one seek for every 16KB
701       // of data before triggering a compaction.
702       f->allowed_seeks = (f->file_size / 16384);
703       if (f->allowed_seeks < 100) f->allowed_seeks = 100;
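      // Worked example: a 2MB table yields 2097152 / 16384 = 128 allowed
      // seeks, while any file smaller than ~1.6MB is clamped up to the
      // 100-seek floor.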
704 
705       levels_[level].deleted_files.erase(f->number);
706       levels_[level].added_files->insert(f);
707     }
708   }
709 
710   // Save the current state in *v.
711   void SaveTo(Version* v) {
712     BySmallestKey cmp;
713     cmp.internal_comparator = &vset_->icmp_;
714     for (int level = 0; level < config::kNumLevels; level++) {
715       // Merge the set of added files with the set of pre-existing files.
716       // Drop any deleted files.  Store the result in *v.
717       const std::vector<FileMetaData*>& base_files = base_->files_[level];
718       std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
719       std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
720       const FileSet* added = levels_[level].added_files;
721       v->files_[level].reserve(base_files.size() + added->size());
722       for (FileSet::const_iterator added_iter = added->begin();
723            added_iter != added->end();
724            ++added_iter) {
725         // Add all smaller files listed in base_
726         for (std::vector<FileMetaData*>::const_iterator bpos
727                  = std::upper_bound(base_iter, base_end, *added_iter, cmp);
728              base_iter != bpos;
729              ++base_iter) {
730           MaybeAddFile(v, level, *base_iter);
731         }
732 
733         MaybeAddFile(v, level, *added_iter);
734       }
735 
736       // Add remaining base files
737       for (; base_iter != base_end; ++base_iter) {
738         MaybeAddFile(v, level, *base_iter);
739       }
740 
741 #ifndef NDEBUG
742       // Make sure there is no overlap in levels > 0
743       if (level > 0) {
744         for (uint32_t i = 1; i < v->files_[level].size(); i++) {
745           const InternalKey& prev_end = v->files_[level][i-1]->largest;
746           const InternalKey& this_begin = v->files_[level][i]->smallest;
747           if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
748             fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
749                     prev_end.DebugString().c_str(),
750                     this_begin.DebugString().c_str());
751             abort();
752           }
753         }
754       }
755 #endif
756     }
757   }
758 
759   void MaybeAddFile(Version* v, int level, FileMetaData* f) {
760     if (levels_[level].deleted_files.count(f->number) > 0) {
761       // File is deleted: do nothing
762     } else {
763       std::vector<FileMetaData*>* files = &v->files_[level];
764       if (level > 0 && !files->empty()) {
765         // Must not overlap
766         assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest,
767                                     f->smallest) < 0);
768       }
769       f->refs++;
770       files->push_back(f);
771     }
772   }
773 };
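// Typical usage, as in LogAndApply() and Recover() below: construct a Builder
// against the current version, call Apply() once per VersionEdit, then
// SaveTo() a freshly allocated Version that is handed to AppendVersion().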
774 
775 VersionSet::VersionSet(const std::string& dbname,
776                        const Options* options,
777                        TableCache* table_cache,
778                        const InternalKeyComparator* cmp)
779     : env_(options->env),
780       dbname_(dbname),
781       options_(options),
782       table_cache_(table_cache),
783       icmp_(*cmp),
784       next_file_number_(2),
785       manifest_file_number_(0),  // Filled by Recover()
786       last_sequence_(0),
787       log_number_(0),
788       prev_log_number_(0),
789       descriptor_file_(NULL),
790       descriptor_log_(NULL),
791       dummy_versions_(this),
792       current_(NULL) {
793   AppendVersion(new Version(this));
794 }
795 
796 VersionSet::~VersionSet() {
797   current_->Unref();
798   assert(dummy_versions_.next_ == &dummy_versions_);  // List must be empty
799   delete descriptor_log_;
800   delete descriptor_file_;
801 }
802 
803 void VersionSet::AppendVersion(Version* v) {
804   // Make "v" current
805   assert(v->refs_ == 0);
806   assert(v != current_);
807   if (current_ != NULL) {
808     current_->Unref();
809   }
810   current_ = v;
811   v->Ref();
812 
813   // Append to linked list
814   v->prev_ = dummy_versions_.prev_;
815   v->next_ = &dummy_versions_;
816   v->prev_->next_ = v;
817   v->next_->prev_ = v;
818 }
819 
820 Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
821   if (edit->has_log_number_) {
822     assert(edit->log_number_ >= log_number_);
823     assert(edit->log_number_ < next_file_number_);
824   } else {
825     edit->SetLogNumber(log_number_);
826   }
827 
828   if (!edit->has_prev_log_number_) {
829     edit->SetPrevLogNumber(prev_log_number_);
830   }
831 
832   edit->SetNextFile(next_file_number_);
833   edit->SetLastSequence(last_sequence_);
834 
835   Version* v = new Version(this);
836   {
837     Builder builder(this, current_);
838     builder.Apply(edit);
839     builder.SaveTo(v);
840   }
841   Finalize(v);
842 
843   // Initialize new descriptor log file if necessary by creating
844   // a temporary file that contains a snapshot of the current version.
845   std::string new_manifest_file;
846   Status s;
847   if (descriptor_log_ == NULL) {
848     // No reason to unlock *mu here since we only hit this path in the
849     // first call to LogAndApply (when opening the database).
850     assert(descriptor_file_ == NULL);
851     new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
852     edit->SetNextFile(next_file_number_);
853     s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
854     if (s.ok()) {
855       descriptor_log_ = new log::Writer(descriptor_file_);
856       s = WriteSnapshot(descriptor_log_);
857     }
858   }
859 
860   // Unlock during expensive MANIFEST log write
861   {
862     mu->Unlock();
863 
864     // Write new record to MANIFEST log
865     if (s.ok()) {
866       std::string record;
867       edit->EncodeTo(&record);
868       s = descriptor_log_->AddRecord(record);
869       if (s.ok()) {
870         s = descriptor_file_->Sync();
871       }
872       if (!s.ok()) {
873         Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
874       }
875     }
876 
877     // If we just created a new descriptor file, install it by writing a
878     // new CURRENT file that points to it.
879     if (s.ok() && !new_manifest_file.empty()) {
880       s = SetCurrentFile(env_, dbname_, manifest_file_number_);
881     }
882 
883     mu->Lock();
884   }
885 
886   // Install the new version
887   if (s.ok()) {
888     AppendVersion(v);
889     log_number_ = edit->log_number_;
890     prev_log_number_ = edit->prev_log_number_;
891   } else {
892     delete v;
893     if (!new_manifest_file.empty()) {
894       delete descriptor_log_;
895       delete descriptor_file_;
896       descriptor_log_ = NULL;
897       descriptor_file_ = NULL;
898       env_->DeleteFile(new_manifest_file);
899     }
900   }
901 
902   return s;
903 }
904 
905 Status VersionSet::Recover(bool *save_manifest) {
906   struct LogReporter : public log::Reader::Reporter {
907     Status* status;
908     virtual void Corruption(size_t bytes, const Status& s) {
909       if (this->status->ok()) *this->status = s;
910     }
911   };
912 
913   // Read "CURRENT" file, which contains a pointer to the current manifest file
914   std::string current;
915   Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
916   if (!s.ok()) {
917     return s;
918   }
919   if (current.empty() || current[current.size()-1] != '\n') {
920     return Status::Corruption("CURRENT file does not end with newline");
921   }
922   current.resize(current.size() - 1);
923 
924   std::string dscname = dbname_ + "/" + current;
925   SequentialFile* file;
926   s = env_->NewSequentialFile(dscname, &file);
927   if (!s.ok()) {
928     return s;
929   }
930 
931   bool have_log_number = false;
932   bool have_prev_log_number = false;
933   bool have_next_file = false;
934   bool have_last_sequence = false;
935   uint64_t next_file = 0;
936   uint64_t last_sequence = 0;
937   uint64_t log_number = 0;
938   uint64_t prev_log_number = 0;
939   Builder builder(this, current_);
940 
941   {
942     LogReporter reporter;
943     reporter.status = &s;
944     log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
945     Slice record;
946     std::string scratch;
947     while (reader.ReadRecord(&record, &scratch) && s.ok()) {
948       VersionEdit edit;
949       s = edit.DecodeFrom(record);
950       if (s.ok()) {
951         if (edit.has_comparator_ &&
952             edit.comparator_ != icmp_.user_comparator()->Name()) {
953           s = Status::InvalidArgument(
954               edit.comparator_ + " does not match existing comparator ",
955               icmp_.user_comparator()->Name());
956         }
957       }
958 
959       if (s.ok()) {
960         builder.Apply(&edit);
961       }
962 
963       if (edit.has_log_number_) {
964         log_number = edit.log_number_;
965         have_log_number = true;
966       }
967 
968       if (edit.has_prev_log_number_) {
969         prev_log_number = edit.prev_log_number_;
970         have_prev_log_number = true;
971       }
972 
973       if (edit.has_next_file_number_) {
974         next_file = edit.next_file_number_;
975         have_next_file = true;
976       }
977 
978       if (edit.has_last_sequence_) {
979         last_sequence = edit.last_sequence_;
980         have_last_sequence = true;
981       }
982     }
983   }
984   delete file;
985   file = NULL;
986 
987   if (s.ok()) {
988     if (!have_next_file) {
989       s = Status::Corruption("no meta-nextfile entry in descriptor");
990     } else if (!have_log_number) {
991       s = Status::Corruption("no meta-lognumber entry in descriptor");
992     } else if (!have_last_sequence) {
993       s = Status::Corruption("no last-sequence-number entry in descriptor");
994     }
995 
996     if (!have_prev_log_number) {
997       prev_log_number = 0;
998     }
999 
1000     MarkFileNumberUsed(prev_log_number);
1001     MarkFileNumberUsed(log_number);
1002   }
1003 
1004   if (s.ok()) {
1005     Version* v = new Version(this);
1006     builder.SaveTo(v);
1007     // Install recovered version
1008     Finalize(v);
1009     AppendVersion(v);
1010     manifest_file_number_ = next_file;
1011     next_file_number_ = next_file + 1;
1012     last_sequence_ = last_sequence;
1013     log_number_ = log_number;
1014     prev_log_number_ = prev_log_number;
1015 
1016     // See if we can reuse the existing MANIFEST file.
1017     if (ReuseManifest(dscname, current)) {
1018       // No need to save new manifest
1019     } else {
1020       *save_manifest = true;
1021     }
1022   }
1023 
1024   return s;
1025 }
1026 
1027 bool VersionSet::ReuseManifest(const std::string& dscname,
1028                                const std::string& dscbase) {
1029   if (!options_->reuse_logs) {
1030     return false;
1031   }
1032   FileType manifest_type;
1033   uint64_t manifest_number;
1034   uint64_t manifest_size;
1035   if (!ParseFileName(dscbase, &manifest_number, &manifest_type) ||
1036       manifest_type != kDescriptorFile ||
1037       !env_->GetFileSize(dscname, &manifest_size).ok() ||
1038       // Make new compacted MANIFEST if old one is too big
1039       manifest_size >= TargetFileSize(options_)) {
1040     return false;
1041   }
1042 
1043   assert(descriptor_file_ == NULL);
1044   assert(descriptor_log_ == NULL);
1045   Status r = env_->NewAppendableFile(dscname, &descriptor_file_);
1046   if (!r.ok()) {
1047     Log(options_->info_log, "Reuse MANIFEST: %s\n", r.ToString().c_str());
1048     assert(descriptor_file_ == NULL);
1049     return false;
1050   }
1051 
1052   Log(options_->info_log, "Reusing MANIFEST %s\n", dscname.c_str());
1053   descriptor_log_ = new log::Writer(descriptor_file_, manifest_size);
1054   manifest_file_number_ = manifest_number;
1055   return true;
1056 }
1057 
1058 void VersionSet::MarkFileNumberUsed(uint64_t number) {
1059   if (next_file_number_ <= number) {
1060     next_file_number_ = number + 1;
1061   }
1062 }
1063 
1064 void VersionSet::Finalize(Version* v) {
1065   // Precomputed best level for next compaction
1066   int best_level = -1;
1067   double best_score = -1;
1068 
1069   for (int level = 0; level < config::kNumLevels-1; level++) {
1070     double score;
1071     if (level == 0) {
1072       // We treat level-0 specially by bounding the number of files
1073       // instead of number of bytes for two reasons:
1074       //
1075       // (1) With larger write-buffer sizes, it is nice not to do too
1076       // many level-0 compactions.
1077       //
1078       // (2) The files in level-0 are merged on every read and
1079       // therefore we wish to avoid too many files when the individual
1080       // file size is small (perhaps because of a small write-buffer
1081       // setting, or very high compression ratios, or lots of
1082       // overwrites/deletions).
1083       score = v->files_[level].size() /
1084           static_cast<double>(config::kL0_CompactionTrigger);
1085     } else {
1086       // Compute the ratio of current size to size limit.
1087       const uint64_t level_bytes = TotalFileSize(v->files_[level]);
1088       score =
1089           static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
1090     }
1091 
1092     if (score > best_score) {
1093       best_level = level;
1094       best_score = score;
1095     }
1096   }
1097 
1098   v->compaction_level_ = best_level;
1099   v->compaction_score_ = best_score;
1100 }
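// Illustrative scores, assuming the default config constants: six level-0
// files against kL0_CompactionTrigger = 4 score 1.5, and a level-2 holding
// 150MB against its ~100MB MaxBytesForLevel limit also scores 1.5; the level
// with the largest score becomes compaction_level_.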
1101 
1102 Status VersionSet::WriteSnapshot(log::Writer* log) {
1103   // TODO: Break up into multiple records to reduce memory usage on recovery?
1104 
1105   // Save metadata
1106   VersionEdit edit;
1107   edit.SetComparatorName(icmp_.user_comparator()->Name());
1108 
1109   // Save compaction pointers
1110   for (int level = 0; level < config::kNumLevels; level++) {
1111     if (!compact_pointer_[level].empty()) {
1112       InternalKey key;
1113       key.DecodeFrom(compact_pointer_[level]);
1114       edit.SetCompactPointer(level, key);
1115     }
1116   }
1117 
1118   // Save files
1119   for (int level = 0; level < config::kNumLevels; level++) {
1120     const std::vector<FileMetaData*>& files = current_->files_[level];
1121     for (size_t i = 0; i < files.size(); i++) {
1122       const FileMetaData* f = files[i];
1123       edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest);
1124     }
1125   }
1126 
1127   std::string record;
1128   edit.EncodeTo(&record);
1129   return log->AddRecord(record);
1130 }
1131 
1132 int VersionSet::NumLevelFiles(int level) const {
1133   assert(level >= 0);
1134   assert(level < config::kNumLevels);
1135   return current_->files_[level].size();
1136 }
1137 
1138 const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
1139   // Update code if kNumLevels changes
1140   assert(config::kNumLevels == 7);
1141   snprintf(scratch->buffer, sizeof(scratch->buffer),
1142            "files[ %d %d %d %d %d %d %d ]",
1143            int(current_->files_[0].size()),
1144            int(current_->files_[1].size()),
1145            int(current_->files_[2].size()),
1146            int(current_->files_[3].size()),
1147            int(current_->files_[4].size()),
1148            int(current_->files_[5].size()),
1149            int(current_->files_[6].size()));
1150   return scratch->buffer;
1151 }
1152 
1153 uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
1154   uint64_t result = 0;
1155   for (int level = 0; level < config::kNumLevels; level++) {
1156     const std::vector<FileMetaData*>& files = v->files_[level];
1157     for (size_t i = 0; i < files.size(); i++) {
1158       if (icmp_.Compare(files[i]->largest, ikey) <= 0) {
1159         // Entire file is before "ikey", so just add the file size
1160         result += files[i]->file_size;
1161       } else if (icmp_.Compare(files[i]->smallest, ikey) > 0) {
1162         // Entire file is after "ikey", so ignore
1163         if (level > 0) {
1164           // Files other than level 0 are sorted by meta->smallest, so
1165           // no further files in this level will contain data for
1166           // "ikey".
1167           break;
1168         }
1169       } else {
1170         // "ikey" falls in the range for this table.  Add the
1171         // approximate offset of "ikey" within the table.
1172         Table* tableptr;
1173         Iterator* iter = table_cache_->NewIterator(
1174             ReadOptions(), files[i]->number, files[i]->file_size, &tableptr);
1175         if (tableptr != NULL) {
1176           result += tableptr->ApproximateOffsetOf(ikey.Encode());
1177         }
1178         delete iter;
1179       }
1180     }
1181   }
1182   return result;
1183 }
1184 
1185 void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
1186   for (Version* v = dummy_versions_.next_;
1187        v != &dummy_versions_;
1188        v = v->next_) {
1189     for (int level = 0; level < config::kNumLevels; level++) {
1190       const std::vector<FileMetaData*>& files = v->files_[level];
1191       for (size_t i = 0; i < files.size(); i++) {
1192         live->insert(files[i]->number);
1193       }
1194     }
1195   }
1196 }
1197 
1198 int64_t VersionSet::NumLevelBytes(int level) const {
1199   assert(level >= 0);
1200   assert(level < config::kNumLevels);
1201   return TotalFileSize(current_->files_[level]);
1202 }
1203 
1204 int64_t VersionSet::MaxNextLevelOverlappingBytes() {
1205   int64_t result = 0;
1206   std::vector<FileMetaData*> overlaps;
1207   for (int level = 1; level < config::kNumLevels - 1; level++) {
1208     for (size_t i = 0; i < current_->files_[level].size(); i++) {
1209       const FileMetaData* f = current_->files_[level][i];
1210       current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest,
1211                                      &overlaps);
1212       const int64_t sum = TotalFileSize(overlaps);
1213       if (sum > result) {
1214         result = sum;
1215       }
1216     }
1217   }
1218   return result;
1219 }
1220 
1221 // Stores the minimal range that covers all entries in inputs in
1222 // *smallest, *largest.
1223 // REQUIRES: inputs is not empty
1224 void VersionSet::GetRange(const std::vector<FileMetaData*>& inputs,
1225                           InternalKey* smallest,
1226                           InternalKey* largest) {
1227   assert(!inputs.empty());
1228   smallest->Clear();
1229   largest->Clear();
1230   for (size_t i = 0; i < inputs.size(); i++) {
1231     FileMetaData* f = inputs[i];
1232     if (i == 0) {
1233       *smallest = f->smallest;
1234       *largest = f->largest;
1235     } else {
1236       if (icmp_.Compare(f->smallest, *smallest) < 0) {
1237         *smallest = f->smallest;
1238       }
1239       if (icmp_.Compare(f->largest, *largest) > 0) {
1240         *largest = f->largest;
1241       }
1242     }
1243   }
1244 }
1245 
1246 // Stores the minimal range that covers all entries in inputs1 and inputs2
1247 // in *smallest, *largest.
1248 // REQUIRES: inputs1 and inputs2 are not both empty
1249 void VersionSet::GetRange2(const std::vector<FileMetaData*>& inputs1,
1250                            const std::vector<FileMetaData*>& inputs2,
1251                            InternalKey* smallest,
1252                            InternalKey* largest) {
1253   std::vector<FileMetaData*> all = inputs1;
1254   all.insert(all.end(), inputs2.begin(), inputs2.end());
1255   GetRange(all, smallest, largest);
1256 }
1257 
1258 Iterator* VersionSet::MakeInputIterator(Compaction* c) {
1259   ReadOptions options;
1260   options.verify_checksums = options_->paranoid_checks;
1261   options.fill_cache = false;
1262 
1263   // Level-0 files have to be merged together.  For other levels,
1264   // we will make a concatenating iterator per level.
1265   // TODO(opt): use concatenating iterator for level-0 if there is no overlap
1266   const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2);
1267   Iterator** list = new Iterator*[space];
1268   int num = 0;
1269   for (int which = 0; which < 2; which++) {
1270     if (!c->inputs_[which].empty()) {
1271       if (c->level() + which == 0) {
1272         const std::vector<FileMetaData*>& files = c->inputs_[which];
1273         for (size_t i = 0; i < files.size(); i++) {
1274           list[num++] = table_cache_->NewIterator(
1275               options, files[i]->number, files[i]->file_size);
1276         }
1277       } else {
1278         // Create concatenating iterator for the files from this level
1279         list[num++] = NewTwoLevelIterator(
1280             new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
1281             &GetFileIterator, table_cache_, options);
1282       }
1283     }
1284   }
1285   assert(num <= space);
1286   Iterator* result = NewMergingIterator(&icmp_, list, num);
1287   delete[] list;
1288   return result;
1289 }
1290 
1291 Compaction* VersionSet::PickCompaction() {
1292   Compaction* c;
1293   int level;
1294 
1295   // We prefer compactions triggered by too much data in a level over
1296   // the compactions triggered by seeks.
1297   const bool size_compaction = (current_->compaction_score_ >= 1);
1298   const bool seek_compaction = (current_->file_to_compact_ != NULL);
1299   if (size_compaction) {
1300     level = current_->compaction_level_;
1301     assert(level >= 0);
1302     assert(level+1 < config::kNumLevels);
1303     c = new Compaction(options_, level);
1304 
1305     // Pick the first file that comes after compact_pointer_[level]
1306     for (size_t i = 0; i < current_->files_[level].size(); i++) {
1307       FileMetaData* f = current_->files_[level][i];
1308       if (compact_pointer_[level].empty() ||
1309           icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
1310         c->inputs_[0].push_back(f);
1311         break;
1312       }
1313     }
1314     if (c->inputs_[0].empty()) {
1315       // Wrap-around to the beginning of the key space
1316       c->inputs_[0].push_back(current_->files_[level][0]);
1317     }
1318   } else if (seek_compaction) {
1319     level = current_->file_to_compact_level_;
1320     c = new Compaction(options_, level);
1321     c->inputs_[0].push_back(current_->file_to_compact_);
1322   } else {
1323     return NULL;
1324   }
1325 
1326   c->input_version_ = current_;
1327   c->input_version_->Ref();
1328 
1329   // Files in level 0 may overlap each other, so pick up all overlapping ones
1330   if (level == 0) {
1331     InternalKey smallest, largest;
1332     GetRange(c->inputs_[0], &smallest, &largest);
1333     // Note that the next call will discard the file we placed in
1334     // c->inputs_[0] earlier and replace it with an overlapping set
1335     // which will include the picked file.
1336     current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
1337     assert(!c->inputs_[0].empty());
1338   }
1339 
1340   SetupOtherInputs(c);
1341 
1342   return c;
1343 }
1344 
1345 void VersionSet::SetupOtherInputs(Compaction* c) {
1346   const int level = c->level();
1347   InternalKey smallest, largest;
1348   GetRange(c->inputs_[0], &smallest, &largest);
1349 
1350   current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);
1351 
1352   // Get entire range covered by compaction
1353   InternalKey all_start, all_limit;
1354   GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
1355 
1356   // See if we can grow the number of inputs in "level" without
1357   // changing the number of "level+1" files we pick up.
1358   if (!c->inputs_[1].empty()) {
1359     std::vector<FileMetaData*> expanded0;
1360     current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
1361     const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
1362     const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
1363     const int64_t expanded0_size = TotalFileSize(expanded0);
1364     if (expanded0.size() > c->inputs_[0].size() &&
1365         inputs1_size + expanded0_size <
1366             ExpandedCompactionByteSizeLimit(options_)) {
1367       InternalKey new_start, new_limit;
1368       GetRange(expanded0, &new_start, &new_limit);
1369       std::vector<FileMetaData*> expanded1;
1370       current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
1371                                      &expanded1);
1372       if (expanded1.size() == c->inputs_[1].size()) {
1373         Log(options_->info_log,
1374             "Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
1375             level,
1376             int(c->inputs_[0].size()),
1377             int(c->inputs_[1].size()),
1378             long(inputs0_size), long(inputs1_size),
1379             int(expanded0.size()),
1380             int(expanded1.size()),
1381             long(expanded0_size), long(inputs1_size));
1382         smallest = new_start;
1383         largest = new_limit;
1384         c->inputs_[0] = expanded0;
1385         c->inputs_[1] = expanded1;
1386         GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
1387       }
1388     }
1389   }
1390 
1391   // Compute the set of grandparent files that overlap this compaction
1392   // (parent == level+1; grandparent == level+2)
1393   if (level + 2 < config::kNumLevels) {
1394     current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
1395                                    &c->grandparents_);
1396   }
1397 
1398   if (false) {
1399     Log(options_->info_log, "Compacting %d '%s' .. '%s'",
1400         level,
1401         smallest.DebugString().c_str(),
1402         largest.DebugString().c_str());
1403   }
1404 
1405   // Update the place where we will do the next compaction for this level.
1406   // We update this immediately instead of waiting for the VersionEdit
1407   // to be applied so that if the compaction fails, we will try a different
1408   // key range next time.
1409   compact_pointer_[level] = largest.Encode().ToString();
1410   c->edit_.SetCompactPointer(level, largest);
1411 }
1412 
1413 Compaction* VersionSet::CompactRange(
1414     int level,
1415     const InternalKey* begin,
1416     const InternalKey* end) {
1417   std::vector<FileMetaData*> inputs;
1418   current_->GetOverlappingInputs(level, begin, end, &inputs);
1419   if (inputs.empty()) {
1420     return NULL;
1421   }
1422 
1423   // Avoid compacting too much in one shot in case the range is large.
1424   // But we cannot do this for level-0 since level-0 files can overlap
1425   // and we must not pick one file and drop another older file if the
1426   // two files overlap.
1427   if (level > 0) {
1428     const uint64_t limit = MaxFileSizeForLevel(options_, level);
1429     uint64_t total = 0;
1430     for (size_t i = 0; i < inputs.size(); i++) {
1431       uint64_t s = inputs[i]->file_size;
1432       total += s;
1433       if (total >= limit) {
1434         inputs.resize(i + 1);
1435         break;
1436       }
1437     }
1438   }
1439 
1440   Compaction* c = new Compaction(options_, level);
1441   c->input_version_ = current_;
1442   c->input_version_->Ref();
1443   c->inputs_[0] = inputs;
1444   SetupOtherInputs(c);
1445   return c;
1446 }
1447 
1448 Compaction::Compaction(const Options* options, int level)
1449     : level_(level),
1450       max_output_file_size_(MaxFileSizeForLevel(options, level)),
1451       input_version_(NULL),
1452       grandparent_index_(0),
1453       seen_key_(false),
1454       overlapped_bytes_(0) {
1455   for (int i = 0; i < config::kNumLevels; i++) {
1456     level_ptrs_[i] = 0;
1457   }
1458 }
1459 
1460 Compaction::~Compaction() {
1461   if (input_version_ != NULL) {
1462     input_version_->Unref();
1463   }
1464 }
1465 
1466 bool Compaction::IsTrivialMove() const {
1467   const VersionSet* vset = input_version_->vset_;
1468   // Avoid a move if there is lots of overlapping grandparent data.
1469   // Otherwise, the move could create a parent file that will require
1470   // a very expensive merge later on.
1471   return (num_input_files(0) == 1 && num_input_files(1) == 0 &&
1472           TotalFileSize(grandparents_) <=
1473               MaxGrandParentOverlapBytes(vset->options_));
1474 }
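// In other words, a lone input file with no level+1 overlap is simply moved
// down, but only while its grandparent overlap stays within
// MaxGrandParentOverlapBytes; past that threshold a full merging compaction
// is performed so the moved file cannot make a later compaction
// disproportionately expensive.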
1475 
1476 void Compaction::AddInputDeletions(VersionEdit* edit) {
1477   for (int which = 0; which < 2; which++) {
1478     for (size_t i = 0; i < inputs_[which].size(); i++) {
1479       edit->DeleteFile(level_ + which, inputs_[which][i]->number);
1480     }
1481   }
1482 }
1483 
1484 bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
1485   // Maybe use binary search to find right entry instead of linear search?
1486   const Comparator* user_cmp = input_version_->vset_->icmp_.user_comparator();
1487   for (int lvl = level_ + 2; lvl < config::kNumLevels; lvl++) {
1488     const std::vector<FileMetaData*>& files = input_version_->files_[lvl];
1489     for (; level_ptrs_[lvl] < files.size(); ) {
1490       FileMetaData* f = files[level_ptrs_[lvl]];
1491       if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
1492         // We've advanced far enough
1493         if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
1494           // Key falls in this file's range, so definitely not base level
1495           return false;
1496         }
1497         break;
1498       }
1499       level_ptrs_[lvl]++;
1500     }
1501   }
1502   return true;
1503 }
1504 
1505 bool Compaction::ShouldStopBefore(const Slice& internal_key) {
1506   const VersionSet* vset = input_version_->vset_;
1507   // Scan to find earliest grandparent file that contains key.
1508   const InternalKeyComparator* icmp = &vset->icmp_;
1509   while (grandparent_index_ < grandparents_.size() &&
1510       icmp->Compare(internal_key,
1511                     grandparents_[grandparent_index_]->largest.Encode()) > 0) {
1512     if (seen_key_) {
1513       overlapped_bytes_ += grandparents_[grandparent_index_]->file_size;
1514     }
1515     grandparent_index_++;
1516   }
1517   seen_key_ = true;
1518 
1519   if (overlapped_bytes_ > MaxGrandParentOverlapBytes(vset->options_)) {
1520     // Too much overlap for current output; start new output
1521     overlapped_bytes_ = 0;
1522     return true;
1523   } else {
1524     return false;
1525   }
1526 }
1527 
1528 void Compaction::ReleaseInputs() {
1529   if (input_version_ != NULL) {
1530     input_version_->Unref();
1531     input_version_ = NULL;
1532   }
1533 }
1534 
1535 }  // namespace leveldb
1536