1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include <string>
11 #include <utility>
12 #include <vector>
13 
14 #include "db/compaction/compaction_picker_level.h"
15 #include "logging/log_buffer.h"
16 #include "test_util/sync_point.h"
17 
18 namespace ROCKSDB_NAMESPACE {
19 
NeedsCompaction(const VersionStorageInfo * vstorage) const20 bool LevelCompactionPicker::NeedsCompaction(
21     const VersionStorageInfo* vstorage) const {
22   if (!vstorage->ExpiredTtlFiles().empty()) {
23     return true;
24   }
25   if (!vstorage->FilesMarkedForPeriodicCompaction().empty()) {
26     return true;
27   }
28   if (!vstorage->BottommostFilesMarkedForCompaction().empty()) {
29     return true;
30   }
31   if (!vstorage->FilesMarkedForCompaction().empty()) {
32     return true;
33   }
34   for (int i = 0; i <= vstorage->MaxInputLevel(); i++) {
35     if (vstorage->CompactionScore(i) >= 1) {
36       return true;
37     }
38   }
39   return false;
40 }
41 
42 namespace {
43 // A class to build a leveled compaction step-by-step.
44 class LevelCompactionBuilder {
45  public:
LevelCompactionBuilder(const std::string & cf_name,VersionStorageInfo * vstorage,SequenceNumber earliest_mem_seqno,CompactionPicker * compaction_picker,LogBuffer * log_buffer,const MutableCFOptions & mutable_cf_options,const ImmutableCFOptions & ioptions)46   LevelCompactionBuilder(const std::string& cf_name,
47                          VersionStorageInfo* vstorage,
48                          SequenceNumber earliest_mem_seqno,
49                          CompactionPicker* compaction_picker,
50                          LogBuffer* log_buffer,
51                          const MutableCFOptions& mutable_cf_options,
52                          const ImmutableCFOptions& ioptions)
53       : cf_name_(cf_name),
54         vstorage_(vstorage),
55         earliest_mem_seqno_(earliest_mem_seqno),
56         compaction_picker_(compaction_picker),
57         log_buffer_(log_buffer),
58         mutable_cf_options_(mutable_cf_options),
59         ioptions_(ioptions) {}
60 
61   // Pick and return a compaction.
62   Compaction* PickCompaction();
63 
64   // Pick the initial files to compact to the next level. (or together
65   // in Intra-L0 compactions)
66   void SetupInitialFiles();
67 
68   // If the initial files are from L0 level, pick other L0
69   // files if needed.
70   bool SetupOtherL0FilesIfNeeded();
71 
72   // Based on initial files, setup other files need to be compacted
73   // in this compaction, accordingly.
74   bool SetupOtherInputsIfNeeded();
75 
76   Compaction* GetCompaction();
77 
78   // For the specfied level, pick a file that we want to compact.
79   // Returns false if there is no file to compact.
80   // If it returns true, inputs->files.size() will be exactly one.
81   // If level is 0 and there is already a compaction on that level, this
82   // function will return false.
83   bool PickFileToCompact();
84 
85   // For L0->L0, picks the longest span of files that aren't currently
86   // undergoing compaction for which work-per-deleted-file decreases. The span
87   // always starts from the newest L0 file.
88   //
89   // Intra-L0 compaction is independent of all other files, so it can be
90   // performed even when L0->base_level compactions are blocked.
91   //
92   // Returns true if `inputs` is populated with a span of files to be compacted;
93   // otherwise, returns false.
94   bool PickIntraL0Compaction();
95 
96   void PickExpiredTtlFiles();
97 
98   void PickFilesMarkedForPeriodicCompaction();
99 
100   const std::string& cf_name_;
101   VersionStorageInfo* vstorage_;
102   SequenceNumber earliest_mem_seqno_;
103   CompactionPicker* compaction_picker_;
104   LogBuffer* log_buffer_;
105   int start_level_ = -1;
106   int output_level_ = -1;
107   int parent_index_ = -1;
108   int base_index_ = -1;
109   double start_level_score_ = 0;
110   bool is_manual_ = false;
111   CompactionInputFiles start_level_inputs_;
112   std::vector<CompactionInputFiles> compaction_inputs_;
113   CompactionInputFiles output_level_inputs_;
114   std::vector<FileMetaData*> grandparents_;
115   CompactionReason compaction_reason_ = CompactionReason::kUnknown;
116 
117   const MutableCFOptions& mutable_cf_options_;
118   const ImmutableCFOptions& ioptions_;
119   // Pick a path ID to place a newly generated file, with its level
120   static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
121                             const MutableCFOptions& mutable_cf_options,
122                             int level);
123 
124   static const int kMinFilesForIntraL0Compaction = 4;
125 };
126 
PickExpiredTtlFiles()127 void LevelCompactionBuilder::PickExpiredTtlFiles() {
128   if (vstorage_->ExpiredTtlFiles().empty()) {
129     return;
130   }
131 
132   auto continuation = [&](std::pair<int, FileMetaData*> level_file) {
133     // If it's being compacted it has nothing to do here.
134     // If this assert() fails that means that some function marked some
135     // files as being_compacted, but didn't call ComputeCompactionScore()
136     assert(!level_file.second->being_compacted);
137     start_level_ = level_file.first;
138     output_level_ =
139         (start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1;
140 
141     if ((start_level_ == vstorage_->num_non_empty_levels() - 1) ||
142         (start_level_ == 0 &&
143          !compaction_picker_->level0_compactions_in_progress()->empty())) {
144       return false;
145     }
146 
147     start_level_inputs_.files = {level_file.second};
148     start_level_inputs_.level = start_level_;
149     return compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
150                                                       &start_level_inputs_);
151   };
152 
153   for (auto& level_file : vstorage_->ExpiredTtlFiles()) {
154     if (continuation(level_file)) {
155       // found the compaction!
156       return;
157     }
158   }
159 
160   start_level_inputs_.files.clear();
161 }
162 
PickFilesMarkedForPeriodicCompaction()163 void LevelCompactionBuilder::PickFilesMarkedForPeriodicCompaction() {
164   if (vstorage_->FilesMarkedForPeriodicCompaction().empty()) {
165     return;
166   }
167 
168   auto continuation = [&](std::pair<int, FileMetaData*> level_file) {
169     // If it's being compacted it has nothing to do here.
170     // If this assert() fails that means that some function marked some
171     // files as being_compacted, but didn't call ComputeCompactionScore()
172     assert(!level_file.second->being_compacted);
173     output_level_ = start_level_ = level_file.first;
174 
175     if (start_level_ == 0 &&
176         !compaction_picker_->level0_compactions_in_progress()->empty()) {
177       return false;
178     }
179 
180     start_level_inputs_.files = {level_file.second};
181     start_level_inputs_.level = start_level_;
182     return compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
183                                                       &start_level_inputs_);
184   };
185 
186   for (auto& level_file : vstorage_->FilesMarkedForPeriodicCompaction()) {
187     if (continuation(level_file)) {
188       // found the compaction!
189       return;
190     }
191   }
192 
193   start_level_inputs_.files.clear();
194 }
195 
SetupInitialFiles()196 void LevelCompactionBuilder::SetupInitialFiles() {
197   // Find the compactions by size on all levels.
198   bool skipped_l0_to_base = false;
199   for (int i = 0; i < compaction_picker_->NumberLevels() - 1; i++) {
200     start_level_score_ = vstorage_->CompactionScore(i);
201     start_level_ = vstorage_->CompactionScoreLevel(i);
202     assert(i == 0 || start_level_score_ <= vstorage_->CompactionScore(i - 1));
203     if (start_level_score_ >= 1) {
204       if (skipped_l0_to_base && start_level_ == vstorage_->base_level()) {
205         // If L0->base_level compaction is pending, don't schedule further
206         // compaction from base level. Otherwise L0->base_level compaction
207         // may starve.
208         continue;
209       }
210       output_level_ =
211           (start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1;
212       if (PickFileToCompact()) {
213         // found the compaction!
214         if (start_level_ == 0) {
215           // L0 score = `num L0 files` / `level0_file_num_compaction_trigger`
216           compaction_reason_ = CompactionReason::kLevelL0FilesNum;
217         } else {
218           // L1+ score = `Level files size` / `MaxBytesForLevel`
219           compaction_reason_ = CompactionReason::kLevelMaxLevelSize;
220         }
221         break;
222       } else {
223         // didn't find the compaction, clear the inputs
224         start_level_inputs_.clear();
225         if (start_level_ == 0) {
226           skipped_l0_to_base = true;
227           // L0->base_level may be blocked due to ongoing L0->base_level
228           // compactions. It may also be blocked by an ongoing compaction from
229           // base_level downwards.
230           //
231           // In these cases, to reduce L0 file count and thus reduce likelihood
232           // of write stalls, we can attempt compacting a span of files within
233           // L0.
234           if (PickIntraL0Compaction()) {
235             output_level_ = 0;
236             compaction_reason_ = CompactionReason::kLevelL0FilesNum;
237             break;
238           }
239         }
240       }
241     }
242   }
243 
244   // if we didn't find a compaction, check if there are any files marked for
245   // compaction
246   if (start_level_inputs_.empty()) {
247     parent_index_ = base_index_ = -1;
248 
249     compaction_picker_->PickFilesMarkedForCompaction(
250         cf_name_, vstorage_, &start_level_, &output_level_,
251         &start_level_inputs_);
252     if (!start_level_inputs_.empty()) {
253       is_manual_ = true;
254       compaction_reason_ = CompactionReason::kFilesMarkedForCompaction;
255       return;
256     }
257   }
258 
259   // Bottommost Files Compaction on deleting tombstones
260   if (start_level_inputs_.empty()) {
261     size_t i;
262     for (i = 0; i < vstorage_->BottommostFilesMarkedForCompaction().size();
263          ++i) {
264       auto& level_and_file = vstorage_->BottommostFilesMarkedForCompaction()[i];
265       assert(!level_and_file.second->being_compacted);
266       start_level_inputs_.level = output_level_ = start_level_ =
267           level_and_file.first;
268       start_level_inputs_.files = {level_and_file.second};
269       if (compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
270                                                      &start_level_inputs_)) {
271         break;
272       }
273     }
274     if (i == vstorage_->BottommostFilesMarkedForCompaction().size()) {
275       start_level_inputs_.clear();
276     } else {
277       assert(!start_level_inputs_.empty());
278       compaction_reason_ = CompactionReason::kBottommostFiles;
279       return;
280     }
281   }
282 
283   // TTL Compaction
284   if (start_level_inputs_.empty()) {
285     PickExpiredTtlFiles();
286     if (!start_level_inputs_.empty()) {
287       compaction_reason_ = CompactionReason::kTtl;
288       return;
289     }
290   }
291 
292   // Periodic Compaction
293   if (start_level_inputs_.empty()) {
294     PickFilesMarkedForPeriodicCompaction();
295     if (!start_level_inputs_.empty()) {
296       compaction_reason_ = CompactionReason::kPeriodicCompaction;
297       return;
298     }
299   }
300 }
301 
SetupOtherL0FilesIfNeeded()302 bool LevelCompactionBuilder::SetupOtherL0FilesIfNeeded() {
303   if (start_level_ == 0 && output_level_ != 0) {
304     return compaction_picker_->GetOverlappingL0Files(
305         vstorage_, &start_level_inputs_, output_level_, &parent_index_);
306   }
307   return true;
308 }
309 
SetupOtherInputsIfNeeded()310 bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
311   // Setup input files from output level. For output to L0, we only compact
312   // spans of files that do not interact with any pending compactions, so don't
313   // need to consider other levels.
314   if (output_level_ != 0) {
315     output_level_inputs_.level = output_level_;
316     if (!compaction_picker_->SetupOtherInputs(
317             cf_name_, mutable_cf_options_, vstorage_, &start_level_inputs_,
318             &output_level_inputs_, &parent_index_, base_index_)) {
319       return false;
320     }
321 
322     compaction_inputs_.push_back(start_level_inputs_);
323     if (!output_level_inputs_.empty()) {
324       compaction_inputs_.push_back(output_level_inputs_);
325     }
326 
327     // In some edge cases we could pick a compaction that will be compacting
328     // a key range that overlap with another running compaction, and both
329     // of them have the same output level. This could happen if
330     // (1) we are running a non-exclusive manual compaction
331     // (2) AddFile ingest a new file into the LSM tree
332     // We need to disallow this from happening.
333     if (compaction_picker_->FilesRangeOverlapWithCompaction(compaction_inputs_,
334                                                             output_level_)) {
335       // This compaction output could potentially conflict with the output
336       // of a currently running compaction, we cannot run it.
337       return false;
338     }
339     compaction_picker_->GetGrandparents(vstorage_, start_level_inputs_,
340                                         output_level_inputs_, &grandparents_);
341   } else {
342     compaction_inputs_.push_back(start_level_inputs_);
343   }
344   return true;
345 }
346 
PickCompaction()347 Compaction* LevelCompactionBuilder::PickCompaction() {
348   // Pick up the first file to start compaction. It may have been extended
349   // to a clean cut.
350   SetupInitialFiles();
351   if (start_level_inputs_.empty()) {
352     return nullptr;
353   }
354   assert(start_level_ >= 0 && output_level_ >= 0);
355 
356   // If it is a L0 -> base level compaction, we need to set up other L0
357   // files if needed.
358   if (!SetupOtherL0FilesIfNeeded()) {
359     return nullptr;
360   }
361 
362   // Pick files in the output level and expand more files in the start level
363   // if needed.
364   if (!SetupOtherInputsIfNeeded()) {
365     return nullptr;
366   }
367 
368   // Form a compaction object containing the files we picked.
369   Compaction* c = GetCompaction();
370 
371   TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c);
372 
373   return c;
374 }
375 
GetCompaction()376 Compaction* LevelCompactionBuilder::GetCompaction() {
377   auto c = new Compaction(
378       vstorage_, ioptions_, mutable_cf_options_, std::move(compaction_inputs_),
379       output_level_,
380       MaxFileSizeForLevel(mutable_cf_options_, output_level_,
381                           ioptions_.compaction_style, vstorage_->base_level(),
382                           ioptions_.level_compaction_dynamic_level_bytes),
383       mutable_cf_options_.max_compaction_bytes,
384       GetPathId(ioptions_, mutable_cf_options_, output_level_),
385       GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
386                          output_level_, vstorage_->base_level()),
387       GetCompressionOptions(ioptions_, vstorage_, output_level_),
388       /* max_subcompactions */ 0, std::move(grandparents_), is_manual_,
389       start_level_score_, false /* deletion_compaction */, compaction_reason_);
390 
391   // If it's level 0 compaction, make sure we don't execute any other level 0
392   // compactions in parallel
393   compaction_picker_->RegisterCompaction(c);
394 
395   // Creating a compaction influences the compaction score because the score
396   // takes running compactions into account (by skipping files that are already
397   // being compacted). Since we just changed compaction score, we recalculate it
398   // here
399   vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);
400   return c;
401 }
402 
403 /*
404  * Find the optimal path to place a file
405  * Given a level, finds the path where levels up to it will fit in levels
406  * up to and including this path
407  */
GetPathId(const ImmutableCFOptions & ioptions,const MutableCFOptions & mutable_cf_options,int level)408 uint32_t LevelCompactionBuilder::GetPathId(
409     const ImmutableCFOptions& ioptions,
410     const MutableCFOptions& mutable_cf_options, int level) {
411   uint32_t p = 0;
412   assert(!ioptions.cf_paths.empty());
413 
414   // size remaining in the most recent path
415   uint64_t current_path_size = ioptions.cf_paths[0].target_size;
416 
417   uint64_t level_size;
418   int cur_level = 0;
419 
420   // max_bytes_for_level_base denotes L1 size.
421   // We estimate L0 size to be the same as L1.
422   level_size = mutable_cf_options.max_bytes_for_level_base;
423 
424   // Last path is the fallback
425   while (p < ioptions.cf_paths.size() - 1) {
426     if (level_size <= current_path_size) {
427       if (cur_level == level) {
428         // Does desired level fit in this path?
429         return p;
430       } else {
431         current_path_size -= level_size;
432         if (cur_level > 0) {
433           if (ioptions.level_compaction_dynamic_level_bytes) {
434             // Currently, level_compaction_dynamic_level_bytes is ignored when
435             // multiple db paths are specified. https://github.com/facebook/
436             // rocksdb/blob/master/db/column_family.cc.
437             // Still, adding this check to avoid accidentally using
438             // max_bytes_for_level_multiplier_additional
439             level_size = static_cast<uint64_t>(
440                 level_size * mutable_cf_options.max_bytes_for_level_multiplier);
441           } else {
442             level_size = static_cast<uint64_t>(
443                 level_size * mutable_cf_options.max_bytes_for_level_multiplier *
444                 mutable_cf_options.MaxBytesMultiplerAdditional(cur_level));
445           }
446         }
447         cur_level++;
448         continue;
449       }
450     }
451     p++;
452     current_path_size = ioptions.cf_paths[p].target_size;
453   }
454   return p;
455 }
456 
PickFileToCompact()457 bool LevelCompactionBuilder::PickFileToCompact() {
458   // level 0 files are overlapping. So we cannot pick more
459   // than one concurrent compactions at this level. This
460   // could be made better by looking at key-ranges that are
461   // being compacted at level 0.
462   if (start_level_ == 0 &&
463       !compaction_picker_->level0_compactions_in_progress()->empty()) {
464     TEST_SYNC_POINT("LevelCompactionPicker::PickCompactionBySize:0");
465     return false;
466   }
467 
468   start_level_inputs_.clear();
469 
470   assert(start_level_ >= 0);
471 
472   // Pick the largest file in this level that is not already
473   // being compacted
474   const std::vector<int>& file_size =
475       vstorage_->FilesByCompactionPri(start_level_);
476   const std::vector<FileMetaData*>& level_files =
477       vstorage_->LevelFiles(start_level_);
478 
479   unsigned int cmp_idx;
480   for (cmp_idx = vstorage_->NextCompactionIndex(start_level_);
481        cmp_idx < file_size.size(); cmp_idx++) {
482     int index = file_size[cmp_idx];
483     auto* f = level_files[index];
484 
485     // do not pick a file to compact if it is being compacted
486     // from n-1 level.
487     if (f->being_compacted) {
488       continue;
489     }
490 
491     start_level_inputs_.files.push_back(f);
492     start_level_inputs_.level = start_level_;
493     if (!compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
494                                                     &start_level_inputs_) ||
495         compaction_picker_->FilesRangeOverlapWithCompaction(
496             {start_level_inputs_}, output_level_)) {
497       // A locked (pending compaction) input-level file was pulled in due to
498       // user-key overlap.
499       start_level_inputs_.clear();
500       continue;
501     }
502 
503     // Now that input level is fully expanded, we check whether any output files
504     // are locked due to pending compaction.
505     //
506     // Note we rely on ExpandInputsToCleanCut() to tell us whether any output-
507     // level files are locked, not just the extra ones pulled in for user-key
508     // overlap.
509     InternalKey smallest, largest;
510     compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest);
511     CompactionInputFiles output_level_inputs;
512     output_level_inputs.level = output_level_;
513     vstorage_->GetOverlappingInputs(output_level_, &smallest, &largest,
514                                     &output_level_inputs.files);
515     if (!output_level_inputs.empty() &&
516         !compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
517                                                     &output_level_inputs)) {
518       start_level_inputs_.clear();
519       continue;
520     }
521     base_index_ = index;
522     break;
523   }
524 
525   // store where to start the iteration in the next call to PickCompaction
526   vstorage_->SetNextCompactionIndex(start_level_, cmp_idx);
527 
528   return start_level_inputs_.size() > 0;
529 }
530 
PickIntraL0Compaction()531 bool LevelCompactionBuilder::PickIntraL0Compaction() {
532   start_level_inputs_.clear();
533   const std::vector<FileMetaData*>& level_files =
534       vstorage_->LevelFiles(0 /* level */);
535   if (level_files.size() <
536           static_cast<size_t>(
537               mutable_cf_options_.level0_file_num_compaction_trigger + 2) ||
538       level_files[0]->being_compacted) {
539     // If L0 isn't accumulating much files beyond the regular trigger, don't
540     // resort to L0->L0 compaction yet.
541     return false;
542   }
543   return FindIntraL0Compaction(level_files, kMinFilesForIntraL0Compaction,
544                                port::kMaxUint64,
545                                mutable_cf_options_.max_compaction_bytes,
546                                &start_level_inputs_, earliest_mem_seqno_);
547 }
548 }  // namespace
549 
PickCompaction(const std::string & cf_name,const MutableCFOptions & mutable_cf_options,VersionStorageInfo * vstorage,LogBuffer * log_buffer,SequenceNumber earliest_mem_seqno)550 Compaction* LevelCompactionPicker::PickCompaction(
551     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
552     VersionStorageInfo* vstorage, LogBuffer* log_buffer,
553     SequenceNumber earliest_mem_seqno) {
554   LevelCompactionBuilder builder(cf_name, vstorage, earliest_mem_seqno, this,
555                                  log_buffer, mutable_cf_options, ioptions_);
556   return builder.PickCompaction();
557 }
558 }  // namespace ROCKSDB_NAMESPACE
559