//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/compaction/compaction_picker_universal.h"
#ifndef ROCKSDB_LITE

#include <cinttypes>
#include <limits>
#include <queue>
#include <string>
#include <utility>
#include "db/column_family.h"
#include "file/filename.h"
#include "logging/log_buffer.h"
#include "monitoring/statistics.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {
namespace {
// A helper class that forms universal compactions. The class is used by
// UniversalCompactionPicker::PickCompaction().
// The usage is to create the class, and get the compaction object by calling
// PickCompaction().
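//
// A minimal usage sketch (illustrative; it mirrors how
// UniversalCompactionPicker::PickCompaction() below drives the builder):
//
//   UniversalCompactionBuilder builder(ioptions, icmp, cf_name,
//                                      mutable_cf_options, vstorage, picker,
//                                      log_buffer);
//   Compaction* c = builder.PickCompaction();  // caller owns `c`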
class UniversalCompactionBuilder {
 public:
  UniversalCompactionBuilder(const ImmutableCFOptions& ioptions,
                             const InternalKeyComparator* icmp,
                             const std::string& cf_name,
                             const MutableCFOptions& mutable_cf_options,
                             VersionStorageInfo* vstorage,
                             UniversalCompactionPicker* picker,
                             LogBuffer* log_buffer)
      : ioptions_(ioptions),
        icmp_(icmp),
        cf_name_(cf_name),
        mutable_cf_options_(mutable_cf_options),
        vstorage_(vstorage),
        picker_(picker),
        log_buffer_(log_buffer) {}

  // Form and return the compaction object. The caller owns the returned
  // object.
  Compaction* PickCompaction();

 private:
  struct SortedRun {
    SortedRun(int _level, FileMetaData* _file, uint64_t _size,
              uint64_t _compensated_file_size, bool _being_compacted)
        : level(_level),
          file(_file),
          size(_size),
          compensated_file_size(_compensated_file_size),
          being_compacted(_being_compacted) {
      assert(compensated_file_size > 0);
      assert(level != 0 || file != nullptr);
    }

    void Dump(char* out_buf, size_t out_buf_size,
              bool print_path = false) const;

    // sorted_run_count is included in the string to print
    void DumpSizeInfo(char* out_buf, size_t out_buf_size,
                      size_t sorted_run_count) const;

    int level;
    // `file` will be nullptr for level > 0. For level = 0, the sorted run
    // consists of just this one file.
    FileMetaData* file;
    // For level > 0, `size` and `compensated_file_size` are the sums of the
    // sizes of all files in the level. `being_compacted` should be the same
    // for all files in a non-zero level. Use the value here.
    uint64_t size;
    uint64_t compensated_file_size;
    bool being_compacted;
  };

  // Pick Universal compaction to limit read amplification
  Compaction* PickCompactionToReduceSortedRuns(
      unsigned int ratio, unsigned int max_number_of_files_to_compact);

  // Pick Universal compaction to limit space amplification.
  Compaction* PickCompactionToReduceSizeAmp();

  Compaction* PickDeleteTriggeredCompaction();

  // Form a compaction from the sorted run indicated by start_index to the
  // oldest sorted run.
  // The caller is responsible for making sure that those files are not in
  // compaction.
  Compaction* PickCompactionToOldest(size_t start_index,
                                     CompactionReason compaction_reason);

  // Try to pick a periodic compaction. The caller should only call it
  // if there is at least one file marked for periodic compaction.
  // nullptr is returned if no such compaction can be formed
  // because some files are being compacted.
  Compaction* PickPeriodicCompaction();

  // Used in universal compaction when the allow_trivial_move
  // option is set. Checks whether there are any overlapping files
  // in the input. Returns true if the input files are
  // non-overlapping.
  bool IsInputFilesNonOverlapping(Compaction* c);
111 
112   const ImmutableCFOptions& ioptions_;
113   const InternalKeyComparator* icmp_;
114   double score_;
115   std::vector<SortedRun> sorted_runs_;
116   const std::string& cf_name_;
117   const MutableCFOptions& mutable_cf_options_;
118   VersionStorageInfo* vstorage_;
119   UniversalCompactionPicker* picker_;
120   LogBuffer* log_buffer_;
121 
122   static std::vector<SortedRun> CalculateSortedRuns(
123       const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions,
124       const MutableCFOptions& mutable_cf_options);
125 
126   // Pick a path ID to place a newly generated file, with its estimated file
127   // size.
128   static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
129                             const MutableCFOptions& mutable_cf_options,
130                             uint64_t file_size);
131 };
132 
// Used in universal compaction when trivial move is enabled.
// This structure is used for the construction of a min heap
// that contains the file metadata, the level of the file,
// and the index of the file within that level.
struct InputFileInfo {
  InputFileInfo() : f(nullptr), level(0), index(0) {}

  FileMetaData* f;
  size_t level;
  size_t index;
};

// Used in universal compaction when trivial move is enabled.
// This comparator is used for the construction of a min heap
// ordered by the smallest key of each file.
struct SmallestKeyHeapComparator {
  explicit SmallestKeyHeapComparator(const Comparator* ucmp) { ucmp_ = ucmp; }

  bool operator()(InputFileInfo i1, InputFileInfo i2) const {
    return (ucmp_->Compare(i1.f->smallest.user_key(),
                           i2.f->smallest.user_key()) > 0);
  }

 private:
  const Comparator* ucmp_;
};

typedef std::priority_queue<InputFileInfo, std::vector<InputFileInfo>,
                            SmallestKeyHeapComparator>
    SmallestKeyHeap;
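
// Note: std::priority_queue is a max heap with respect to its comparator, so
// the reversed comparison above (returning true when i1's smallest key is
// greater) makes the top of the queue the file with the smallest "smallest
// key", i.e. a min heap.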

// This function creates the heap that is used to determine whether the files
// are overlapping during universal compaction when allow_trivial_move
// is set.
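// Only the first file of each non-L0 input level is seeded here;
// IsInputFilesNonOverlapping() pushes each level's next file as it pops the
// current one. All L0 files are pushed up front since they may overlap each
// other.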
SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) {
  SmallestKeyHeap smallest_key_priority_q =
      SmallestKeyHeap(SmallestKeyHeapComparator(ucmp));

  InputFileInfo input_file;

  for (size_t l = 0; l < c->num_input_levels(); l++) {
    if (c->num_input_files(l) != 0) {
      if (l == 0 && c->start_level() == 0) {
        for (size_t i = 0; i < c->num_input_files(0); i++) {
          input_file.f = c->input(0, i);
          input_file.level = 0;
          input_file.index = i;
          smallest_key_priority_q.push(std::move(input_file));
        }
      } else {
        input_file.f = c->input(l, 0);
        input_file.level = l;
        input_file.index = 0;
        smallest_key_priority_q.push(std::move(input_file));
      }
    }
  }
  return smallest_key_priority_q;
}

#ifndef NDEBUG
// smallest_seqno and largest_seqno are set iff `files` is not empty.
void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
                             SequenceNumber* smallest_seqno,
                             SequenceNumber* largest_seqno) {
  bool is_first = true;
  for (FileMetaData* f : files) {
    assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
    if (is_first) {
      is_first = false;
      *smallest_seqno = f->fd.smallest_seqno;
      *largest_seqno = f->fd.largest_seqno;
    } else {
      if (f->fd.smallest_seqno < *smallest_seqno) {
        *smallest_seqno = f->fd.smallest_seqno;
      }
      if (f->fd.largest_seqno > *largest_seqno) {
        *largest_seqno = f->fd.largest_seqno;
      }
    }
  }
}
#endif
}  // namespace

// Checks whether there are any overlapping files in the input.
bool UniversalCompactionBuilder::IsInputFilesNonOverlapping(Compaction* c) {
  auto comparator = icmp_->user_comparator();
  int first_iter = 1;

  InputFileInfo prev, curr, next;

  SmallestKeyHeap smallest_key_priority_q =
      create_level_heap(c, icmp_->user_comparator());

  while (!smallest_key_priority_q.empty()) {
    curr = smallest_key_priority_q.top();
    smallest_key_priority_q.pop();

    if (first_iter) {
      prev = curr;
      first_iter = 0;
    } else {
      if (comparator->Compare(prev.f->largest.user_key(),
                              curr.f->smallest.user_key()) >= 0) {
        // found overlapping files, return false
        return false;
      }
      assert(comparator->Compare(curr.f->largest.user_key(),
                                 prev.f->largest.user_key()) > 0);
      prev = curr;
    }

    next.f = nullptr;

    if (c->level(curr.level) != 0 &&
        curr.index < c->num_input_files(curr.level) - 1) {
      next.f = c->input(curr.level, curr.index + 1);
      next.level = curr.level;
      next.index = curr.index + 1;
    }

    if (next.f) {
      smallest_key_priority_q.push(std::move(next));
    }
  }
  return true;
}

bool UniversalCompactionPicker::NeedsCompaction(
    const VersionStorageInfo* vstorage) const {
  const int kLevel0 = 0;
  if (vstorage->CompactionScore(kLevel0) >= 1) {
    return true;
  }
  if (!vstorage->FilesMarkedForPeriodicCompaction().empty()) {
    return true;
  }
  if (!vstorage->FilesMarkedForCompaction().empty()) {
    return true;
  }
  return false;
}

Compaction* UniversalCompactionPicker::PickCompaction(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    VersionStorageInfo* vstorage, LogBuffer* log_buffer,
    SequenceNumber /* earliest_memtable_seqno */) {
  UniversalCompactionBuilder builder(ioptions_, icmp_, cf_name,
                                     mutable_cf_options, vstorage, this,
                                     log_buffer);
  return builder.PickCompaction();
}

void UniversalCompactionBuilder::SortedRun::Dump(char* out_buf,
                                                 size_t out_buf_size,
                                                 bool print_path) const {
  if (level == 0) {
    assert(file != nullptr);
    if (file->fd.GetPathId() == 0 || !print_path) {
      snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber());
    } else {
      snprintf(out_buf, out_buf_size, "file %" PRIu64
                                      "(path "
                                      "%" PRIu32 ")",
               file->fd.GetNumber(), file->fd.GetPathId());
    }
  } else {
    snprintf(out_buf, out_buf_size, "level %d", level);
  }
}

void UniversalCompactionBuilder::SortedRun::DumpSizeInfo(
    char* out_buf, size_t out_buf_size, size_t sorted_run_count) const {
  if (level == 0) {
    assert(file != nullptr);
    snprintf(out_buf, out_buf_size,
             "file %" PRIu64 "[%" ROCKSDB_PRIszt
             "] "
             "with size %" PRIu64 " (compensated size %" PRIu64 ")",
             file->fd.GetNumber(), sorted_run_count, file->fd.GetFileSize(),
             file->compensated_file_size);
  } else {
    snprintf(out_buf, out_buf_size,
             "level %d[%" ROCKSDB_PRIszt
             "] "
             "with size %" PRIu64 " (compensated size %" PRIu64 ")",
             level, sorted_run_count, size, compensated_file_size);
  }
}

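// Builds one SortedRun per L0 file plus one per non-empty lower level: every
// L0 file is its own sorted run, while each non-empty level below L0 forms a
// single run covering all of its files.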
std::vector<UniversalCompactionBuilder::SortedRun>
UniversalCompactionBuilder::CalculateSortedRuns(
    const VersionStorageInfo& vstorage, const ImmutableCFOptions& /*ioptions*/,
    const MutableCFOptions& mutable_cf_options) {
  std::vector<UniversalCompactionBuilder::SortedRun> ret;
  for (FileMetaData* f : vstorage.LevelFiles(0)) {
    ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
                     f->being_compacted);
  }
  for (int level = 1; level < vstorage.num_levels(); level++) {
    uint64_t total_compensated_size = 0U;
    uint64_t total_size = 0U;
    bool being_compacted = false;
    bool is_first = true;
    for (FileMetaData* f : vstorage.LevelFiles(level)) {
      total_compensated_size += f->compensated_file_size;
      total_size += f->fd.GetFileSize();
      if (mutable_cf_options.compaction_options_universal.allow_trivial_move ==
          true) {
        if (f->being_compacted) {
          being_compacted = f->being_compacted;
        }
      } else {
        // Compaction always includes all files for a non-zero level, so for a
        // non-zero level, all the files should share the same being_compacted
        // value.
        // This assumption is only valid when
        // mutable_cf_options.compaction_options_universal.allow_trivial_move
        // is false
        assert(is_first || f->being_compacted == being_compacted);
      }
      if (is_first) {
        being_compacted = f->being_compacted;
        is_first = false;
      }
    }
    if (total_compensated_size > 0) {
      ret.emplace_back(level, nullptr, total_size, total_compensated_size,
                       being_compacted);
    }
  }
  return ret;
}

// Universal style of compaction. Pick files that are contiguous in
// time-range to compact.
Compaction* UniversalCompactionBuilder::PickCompaction() {
  const int kLevel0 = 0;
  score_ = vstorage_->CompactionScore(kLevel0);
  sorted_runs_ =
      CalculateSortedRuns(*vstorage_, ioptions_, mutable_cf_options_);

  if (sorted_runs_.size() == 0 ||
      (vstorage_->FilesMarkedForPeriodicCompaction().empty() &&
       vstorage_->FilesMarkedForCompaction().empty() &&
       sorted_runs_.size() < (unsigned int)mutable_cf_options_
                                 .level0_file_num_compaction_trigger)) {
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: nothing to do\n",
                     cf_name_.c_str());
    TEST_SYNC_POINT_CALLBACK(
        "UniversalCompactionBuilder::PickCompaction:Return", nullptr);
    return nullptr;
  }
  VersionStorageInfo::LevelSummaryStorage tmp;
  ROCKS_LOG_BUFFER_MAX_SZ(
      log_buffer_, 3072,
      "[%s] Universal: sorted runs files(%" ROCKSDB_PRIszt "): %s\n",
      cf_name_.c_str(), sorted_runs_.size(), vstorage_->LevelSummary(&tmp));

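  // Strategies are tried in strict priority order: periodic compaction first
  // (a hard requirement), then size amplification, then size ratio, then
  // sorted-run count, and finally delete-triggered compaction as a fallback.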
  Compaction* c = nullptr;
  // Periodic compaction has higher priority than other types of compaction
  // because it's a hard requirement.
  if (!vstorage_->FilesMarkedForPeriodicCompaction().empty()) {
    // Always need to do a full compaction for periodic compaction.
    c = PickPeriodicCompaction();
  }

  // Check for size amplification.
  if (c == nullptr &&
      sorted_runs_.size() >=
          static_cast<size_t>(
              mutable_cf_options_.level0_file_num_compaction_trigger)) {
    if ((c = PickCompactionToReduceSizeAmp()) != nullptr) {
      ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: compacting for size amp\n",
                       cf_name_.c_str());
    } else {
      // Size amplification is within limits. Try reducing read
      // amplification while maintaining file size ratios.
      unsigned int ratio =
          mutable_cf_options_.compaction_options_universal.size_ratio;

      if ((c = PickCompactionToReduceSortedRuns(ratio, UINT_MAX)) != nullptr) {
        ROCKS_LOG_BUFFER(log_buffer_,
                         "[%s] Universal: compacting for size ratio\n",
                         cf_name_.c_str());
      } else {
        // Size amplification and file size ratios are within configured
        // limits. If max read amplification exceeds the configured limits,
        // then force a compaction without looking at file size ratios and
        // try to reduce the number of files to fewer than
        // level0_file_num_compaction_trigger.
        // This is guaranteed by NeedsCompaction().
        assert(sorted_runs_.size() >=
               static_cast<size_t>(
                   mutable_cf_options_.level0_file_num_compaction_trigger));
        // Get the total number of sorted runs that are not being compacted
        int num_sr_not_compacted = 0;
        for (size_t i = 0; i < sorted_runs_.size(); i++) {
          if (sorted_runs_[i].being_compacted == false) {
            num_sr_not_compacted++;
          }
        }

        // The number of sorted runs that are not being compacted is greater
        // than the maximum allowed number of sorted runs
        if (num_sr_not_compacted >
            mutable_cf_options_.level0_file_num_compaction_trigger) {
          unsigned int num_files =
              num_sr_not_compacted -
              mutable_cf_options_.level0_file_num_compaction_trigger + 1;
          if ((c = PickCompactionToReduceSortedRuns(UINT_MAX, num_files)) !=
              nullptr) {
            ROCKS_LOG_BUFFER(log_buffer_,
                             "[%s] Universal: compacting for file num -- %u\n",
                             cf_name_.c_str(), num_files);
          }
        }
      }
    }
  }

  if (c == nullptr) {
    if ((c = PickDeleteTriggeredCompaction()) != nullptr) {
      ROCKS_LOG_BUFFER(log_buffer_,
                       "[%s] Universal: delete triggered compaction\n",
                       cf_name_.c_str());
    }
  }

  if (c == nullptr) {
    TEST_SYNC_POINT_CALLBACK(
        "UniversalCompactionBuilder::PickCompaction:Return", nullptr);
    return nullptr;
  }

  if (mutable_cf_options_.compaction_options_universal.allow_trivial_move ==
          true &&
      c->compaction_reason() != CompactionReason::kPeriodicCompaction) {
    c->set_is_trivial_move(IsInputFilesNonOverlapping(c));
  }

// validate that all the chosen files of L0 are non-overlapping in time
#ifndef NDEBUG
  SequenceNumber prev_smallest_seqno = 0U;
  bool is_first = true;

  size_t level_index = 0U;
  if (c->start_level() == 0) {
    for (auto f : *c->inputs(0)) {
      assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
      if (is_first) {
        is_first = false;
      }
      prev_smallest_seqno = f->fd.smallest_seqno;
    }
    level_index = 1U;
  }
  for (; level_index < c->num_input_levels(); level_index++) {
    if (c->num_input_files(level_index) != 0) {
      SequenceNumber smallest_seqno = 0U;
      SequenceNumber largest_seqno = 0U;
      GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno,
                              &largest_seqno);
      if (is_first) {
        is_first = false;
      } else if (prev_smallest_seqno > 0) {
        // A level is considered as the bottommost level if there are
        // no files in higher levels or if files in higher levels do
        // not overlap with the files being compacted. Sequence numbers
        // of files in bottommost level can be set to 0 to help
        // compression. As a result, the following assert may not hold
        // if the prev_smallest_seqno is 0.
        assert(prev_smallest_seqno > largest_seqno);
      }
      prev_smallest_seqno = smallest_seqno;
    }
  }
#endif
  // update statistics
  RecordInHistogram(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION,
                    c->inputs(0)->size());

  picker_->RegisterCompaction(c);
  vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_);

  TEST_SYNC_POINT_CALLBACK("UniversalCompactionBuilder::PickCompaction:Return",
                           c);
  return c;
}

uint32_t UniversalCompactionBuilder::GetPathId(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options, uint64_t file_size) {
  // Two conditions need to be satisfied:
  // (1) the target path needs to be able to hold the file's size
  // (2) the total size left in this and previous paths must be no smaller
  //     than the expected future file size before this new file is
  //     compacted, which is estimated based on size_ratio.
  // For example, if now we are compacting files of size (1, 1, 2, 4, 8),
  // we will make sure the target file, probably with size of 16, will be
  // placed in a path so that eventually when new files are generated and
  // compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or
  // before the path we chose.
  //
  // TODO(sdong): now the case of multiple column families is not
  // considered in this algorithm. So the target size can be violated in
  // that case. We need to improve it.
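  //
  // A numeric sketch of the loop below, with assumed cf_paths target sizes
  // (20, 40, 100), size_ratio = 0 and file_size = 16 (so future_size = 16):
  // path 0 fails because 0 + (20 - 16) = 4 < 16, but path 1 satisfies both
  // conditions (40 > 16 and 20 + (40 - 16) = 44 > 16), so path 1 is chosen.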
  uint64_t accumulated_size = 0;
  uint64_t future_size =
      file_size *
      (100 - mutable_cf_options.compaction_options_universal.size_ratio) / 100;
  uint32_t p = 0;
  assert(!ioptions.cf_paths.empty());
  for (; p < ioptions.cf_paths.size() - 1; p++) {
    uint64_t target_size = ioptions.cf_paths[p].target_size;
    if (target_size > file_size &&
        accumulated_size + (target_size - file_size) > future_size) {
      return p;
    }
    accumulated_size += target_size;
  }
  return p;
}

//
// Consider compacting files based on their size differences with
// the next file in time order.
//
Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
    unsigned int ratio, unsigned int max_number_of_files_to_compact) {
  unsigned int min_merge_width =
      mutable_cf_options_.compaction_options_universal.min_merge_width;
  unsigned int max_merge_width =
      mutable_cf_options_.compaction_options_universal.max_merge_width;

  const SortedRun* sr = nullptr;
  bool done = false;
  size_t start_index = 0;
  unsigned int candidate_count = 0;

  unsigned int max_files_to_compact =
      std::min(max_merge_width, max_number_of_files_to_compact);
  min_merge_width = std::max(min_merge_width, 2U);

  // Caller checks the size before executing this function. This invariant is
  // important because otherwise we may have a possible integer underflow when
  // dealing with unsigned types.
  assert(sorted_runs_.size() > 0);

  // Considers a candidate file only if it is smaller than the
  // total size accumulated so far.
  for (size_t loop = 0; loop < sorted_runs_.size(); loop++) {
    candidate_count = 0;

    // Skip files that are already being compacted
    for (sr = nullptr; loop < sorted_runs_.size(); loop++) {
      sr = &sorted_runs_[loop];

      if (!sr->being_compacted) {
        candidate_count = 1;
        break;
      }
      char file_num_buf[kFormatFileNumberBufSize];
      sr->Dump(file_num_buf, sizeof(file_num_buf));
      ROCKS_LOG_BUFFER(log_buffer_,
                       "[%s] Universal: %s"
                       "[%" ROCKSDB_PRIszt "] being compacted, skipping",
                       cf_name_.c_str(), file_num_buf, loop);

      sr = nullptr;
    }

    // This file is not being compacted. Consider it as the
    // first candidate to be compacted.
    uint64_t candidate_size = sr != nullptr ? sr->compensated_file_size : 0;
    if (sr != nullptr) {
      char file_num_buf[kFormatFileNumberBufSize];
      sr->Dump(file_num_buf, sizeof(file_num_buf), true);
      ROCKS_LOG_BUFFER(log_buffer_,
                       "[%s] Universal: Possible candidate %s[%" ROCKSDB_PRIszt
                       "].",
                       cf_name_.c_str(), file_num_buf, loop);
    }

    // Check if the succeeding files need compaction.
    for (size_t i = loop + 1;
         candidate_count < max_files_to_compact && i < sorted_runs_.size();
         i++) {
      const SortedRun* succeeding_sr = &sorted_runs_[i];
      if (succeeding_sr->being_compacted) {
        break;
      }
      // Pick files if the total/last candidate file size (increased by the
      // specified ratio) is still larger than the next candidate file.
      // candidate_size is the total size of files picked so far with the
      // default kCompactionStopStyleTotalSize; with
      // kCompactionStopStyleSimilarSize, it's simply the size of the last
      // picked file.
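      // Illustrative arithmetic (assumed numbers): with ratio = 1 (i.e. 1%)
      // and candidate_size = 100, sz below is 101, so a succeeding run of
      // size <= 101 extends the candidate set while a larger one stops it.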
      double sz = candidate_size * (100.0 + ratio) / 100.0;
      if (sz < static_cast<double>(succeeding_sr->size)) {
        break;
      }
      if (mutable_cf_options_.compaction_options_universal.stop_style ==
          kCompactionStopStyleSimilarSize) {
        // Similar-size stopping rule: also check the last picked file isn't
        // far larger than the next candidate file.
        sz = (succeeding_sr->size * (100.0 + ratio)) / 100.0;
        if (sz < static_cast<double>(candidate_size)) {
          // If the small file we've encountered begins a run of similar-size
          // files, we'll pick them up on a future iteration of the outer
          // loop. If it's some lonely straggler, it'll eventually get picked
          // by the last-resort read amp strategy which disregards size ratios.
          break;
        }
        candidate_size = succeeding_sr->compensated_file_size;
      } else {  // default kCompactionStopStyleTotalSize
        candidate_size += succeeding_sr->compensated_file_size;
      }
      candidate_count++;
    }

    // Found a series of consecutive files that need compaction.
    if (candidate_count >= (unsigned int)min_merge_width) {
      start_index = loop;
      done = true;
      break;
    } else {
      for (size_t i = loop;
           i < loop + candidate_count && i < sorted_runs_.size(); i++) {
        const SortedRun* skipping_sr = &sorted_runs_[i];
        char file_num_buf[256];
        skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
        ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Skipping %s",
                         cf_name_.c_str(), file_num_buf);
      }
    }
  }
  if (!done || candidate_count <= 1) {
    return nullptr;
  }
  size_t first_index_after = start_index + candidate_count;
  // Compression is disabled if the sorted runs older than this compaction's
  // output already make up at least compression_size_percent of the total
  // data, i.e. the output would fall within the newest portion of the data
  // that is meant to stay uncompressed.
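  // Illustrative check (assumed numbers): with compression_size_percent = 50
  // and 100 GB of total data, the output is left uncompressed once the runs
  // older than it sum to >= 50 GB.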
  bool enable_compression = true;
  int ratio_to_compress =
      mutable_cf_options_.compaction_options_universal.compression_size_percent;
  if (ratio_to_compress >= 0) {
    uint64_t total_size = 0;
    for (auto& sorted_run : sorted_runs_) {
      total_size += sorted_run.compensated_file_size;
    }

    uint64_t older_file_size = 0;
    for (size_t i = sorted_runs_.size() - 1; i >= first_index_after; i--) {
      older_file_size += sorted_runs_[i].size;
      if (older_file_size * 100L >= total_size * (long)ratio_to_compress) {
        enable_compression = false;
        break;
      }
    }
  }

  uint64_t estimated_total_size = 0;
  for (unsigned int i = 0; i < first_index_after; i++) {
    estimated_total_size += sorted_runs_[i].size;
  }
  uint32_t path_id =
      GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
  int start_level = sorted_runs_[start_index].level;
  int output_level;
  if (first_index_after == sorted_runs_.size()) {
    output_level = vstorage_->num_levels() - 1;
  } else if (sorted_runs_[first_index_after].level == 0) {
    output_level = 0;
  } else {
    output_level = sorted_runs_[first_index_after].level - 1;
  }
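  // For example, if the sorted runs sit at levels (0, 0, 4, 5) and the two L0
  // runs were picked, first_index_after points at the L4 run, so the output
  // goes to level 3, just above the first run that was not picked.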

  // last level is reserved for the files ingested behind
  if (ioptions_.allow_ingest_behind &&
      (output_level == vstorage_->num_levels() - 1)) {
    assert(output_level > 1);
    output_level--;
  }

  std::vector<CompactionInputFiles> inputs(vstorage_->num_levels());
  for (size_t i = 0; i < inputs.size(); ++i) {
    inputs[i].level = start_level + static_cast<int>(i);
  }
  for (size_t i = start_index; i < first_index_after; i++) {
    auto& picking_sr = sorted_runs_[i];
    if (picking_sr.level == 0) {
      FileMetaData* picking_file = picking_sr.file;
      inputs[0].files.push_back(picking_file);
    } else {
      auto& files = inputs[picking_sr.level - start_level].files;
      for (auto* f : vstorage_->LevelFiles(picking_sr.level)) {
        files.push_back(f);
      }
    }
    char file_num_buf[256];
    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i);
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Picking %s",
                     cf_name_.c_str(), file_num_buf);
  }

  CompactionReason compaction_reason;
  if (max_number_of_files_to_compact == UINT_MAX) {
    compaction_reason = CompactionReason::kUniversalSizeRatio;
  } else {
    compaction_reason = CompactionReason::kUniversalSortedRunNum;
  }
  return new Compaction(
      vstorage_, ioptions_, mutable_cf_options_, std::move(inputs),
      output_level,
      MaxFileSizeForLevel(mutable_cf_options_, output_level,
                          kCompactionStyleUniversal),
      LLONG_MAX, path_id,
      GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, start_level,
                         1, enable_compression),
      GetCompressionOptions(ioptions_, vstorage_, start_level,
                            enable_compression),
      /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
      score_, false /* deletion_compaction */, compaction_reason);
}
759 
760 // Look at overall size amplification. If size amplification
761 // exceeeds the configured value, then do a compaction
762 // of the candidate files all the way upto the earliest
763 // base file (overrides configured values of file-size ratios,
764 // min_merge_width and max_merge_width).
765 //
Compaction* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() {
  // percentage flexibility while reducing size amplification
  uint64_t ratio = mutable_cf_options_.compaction_options_universal
                       .max_size_amplification_percent;

  unsigned int candidate_count = 0;
  uint64_t candidate_size = 0;
  size_t start_index = 0;
  const SortedRun* sr = nullptr;

  assert(!sorted_runs_.empty());
  if (sorted_runs_.back().being_compacted) {
    return nullptr;
  }

  // Skip files that are already being compacted
  for (size_t loop = 0; loop < sorted_runs_.size() - 1; loop++) {
    sr = &sorted_runs_[loop];
    if (!sr->being_compacted) {
      start_index = loop;  // Consider this as the first candidate.
      break;
    }
    char file_num_buf[kFormatFileNumberBufSize];
    sr->Dump(file_num_buf, sizeof(file_num_buf), true);
    ROCKS_LOG_BUFFER(log_buffer_,
                     "[%s] Universal: skipping %s[%" ROCKSDB_PRIszt
                     "] compacted %s",
                     cf_name_.c_str(), file_num_buf, loop,
                     " cannot be a candidate to reduce size amp.\n");
    sr = nullptr;
  }

  if (sr == nullptr) {
    return nullptr;  // no candidate files
  }
  {
    char file_num_buf[kFormatFileNumberBufSize];
    sr->Dump(file_num_buf, sizeof(file_num_buf), true);
    ROCKS_LOG_BUFFER(
        log_buffer_,
        "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt "] %s",
        cf_name_.c_str(), file_num_buf, start_index, " to reduce size amp.\n");
  }

  // keep adding up all the remaining files
  for (size_t loop = start_index; loop < sorted_runs_.size() - 1; loop++) {
    sr = &sorted_runs_[loop];
    if (sr->being_compacted) {
      char file_num_buf[kFormatFileNumberBufSize];
      sr->Dump(file_num_buf, sizeof(file_num_buf), true);
      ROCKS_LOG_BUFFER(
          log_buffer_,
          "[%s] Universal: Possible candidate %s[%" ROCKSDB_PRIszt "] %s",
          cf_name_.c_str(), file_num_buf, start_index,
          " is already being compacted. No size amp reduction possible.\n");
      return nullptr;
    }
    candidate_size += sr->compensated_file_size;
    candidate_count++;
  }
  if (candidate_count == 0) {
    return nullptr;
  }

  // size of earliest file
  uint64_t earliest_file_size = sorted_runs_.back().size;

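  // Illustrative check (assumed numbers): with max_size_amplification_percent
  // = 200 and earliest_file_size = 100, a candidate_size of 150 gives
  // 15000 < 20000, so no compaction; a candidate_size of 250 gives
  // 25000 >= 20000 and triggers a full compaction to the oldest run.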
  // size amplification = percentage of additional size
  if (candidate_size * 100 < ratio * earliest_file_size) {
    ROCKS_LOG_BUFFER(
        log_buffer_,
        "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64
        " earliest-file-size %" PRIu64,
        cf_name_.c_str(), candidate_size, earliest_file_size);
    return nullptr;
  } else {
    ROCKS_LOG_BUFFER(
        log_buffer_,
        "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64
        " earliest-file-size %" PRIu64,
        cf_name_.c_str(), candidate_size, earliest_file_size);
  }
  return PickCompactionToOldest(start_index,
                                CompactionReason::kUniversalSizeAmplification);
}

// Pick files marked for compaction. Typically, files are marked by
// CompactOnDeleteCollector due to the presence of tombstones.
Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
  CompactionInputFiles start_level_inputs;
  int output_level;
  std::vector<CompactionInputFiles> inputs;

  if (vstorage_->num_levels() == 1) {
    // This is single level universal. Since we're basically trying to reclaim
    // space by processing files marked for compaction due to high tombstone
    // density, let's do the same thing as compaction to reduce size amp which
    // has the same goals.
    bool compact = false;

    start_level_inputs.level = 0;
    start_level_inputs.files.clear();
    output_level = 0;
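    // Include every file from the first one marked for compaction onward, so
    // the picked L0 files stay contiguous in time order.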
    for (FileMetaData* f : vstorage_->LevelFiles(0)) {
      if (f->marked_for_compaction) {
        compact = true;
      }
      if (compact) {
        start_level_inputs.files.push_back(f);
      }
    }
    if (start_level_inputs.size() <= 1) {
      // If only the last file in L0 is marked for compaction, ignore it
      return nullptr;
    }
    inputs.push_back(start_level_inputs);
  } else {
    int start_level;

    // For multi-level universal, the strategy is to make this look more like
    // leveled. We pick one of the files marked for compaction and compact with
    // overlapping files in the adjacent level.
    picker_->PickFilesMarkedForCompaction(cf_name_, vstorage_, &start_level,
                                          &output_level, &start_level_inputs);
    if (start_level_inputs.empty()) {
      return nullptr;
    }

    // Pick the first non-empty level after the start_level
    for (output_level = start_level + 1; output_level < vstorage_->num_levels();
         output_level++) {
      if (vstorage_->NumLevelFiles(output_level) != 0) {
        break;
      }
    }

    // If all higher levels are empty, pick the highest level as output level
    if (output_level == vstorage_->num_levels()) {
      if (start_level == 0) {
        output_level = vstorage_->num_levels() - 1;
      } else {
        // If start level is non-zero and all higher levels are empty, this
        // compaction will translate into a trivial move. Since the idea is
        // to reclaim space and trivial move doesn't help with that, we
        // skip compaction in this case and return nullptr
        return nullptr;
      }
    }
    if (ioptions_.allow_ingest_behind &&
        output_level == vstorage_->num_levels() - 1) {
      assert(output_level > 1);
      output_level--;
    }

    if (output_level != 0) {
      if (start_level == 0) {
        if (!picker_->GetOverlappingL0Files(vstorage_, &start_level_inputs,
                                            output_level, nullptr)) {
          return nullptr;
        }
      }

      CompactionInputFiles output_level_inputs;
      int parent_index = -1;

      output_level_inputs.level = output_level;
      if (!picker_->SetupOtherInputs(cf_name_, mutable_cf_options_, vstorage_,
                                     &start_level_inputs, &output_level_inputs,
                                     &parent_index, -1)) {
        return nullptr;
      }
      inputs.push_back(start_level_inputs);
      if (!output_level_inputs.empty()) {
        inputs.push_back(output_level_inputs);
      }
      if (picker_->FilesRangeOverlapWithCompaction(inputs, output_level)) {
        return nullptr;
      }
    } else {
      inputs.push_back(start_level_inputs);
    }
  }

  uint64_t estimated_total_size = 0;
  // Use size of the output level as estimated file size
  for (FileMetaData* f : vstorage_->LevelFiles(output_level)) {
    estimated_total_size += f->fd.GetFileSize();
  }
  uint32_t path_id =
      GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
  return new Compaction(
      vstorage_, ioptions_, mutable_cf_options_, std::move(inputs),
      output_level,
      MaxFileSizeForLevel(mutable_cf_options_, output_level,
                          kCompactionStyleUniversal),
      /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
      GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
                         output_level, 1),
      GetCompressionOptions(ioptions_, vstorage_, output_level),
      /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ true,
      score_, false /* deletion_compaction */,
      CompactionReason::kFilesMarkedForCompaction);
}

Compaction* UniversalCompactionBuilder::PickCompactionToOldest(
    size_t start_index, CompactionReason compaction_reason) {
  assert(start_index < sorted_runs_.size());

  // Estimate total file size
  uint64_t estimated_total_size = 0;
  for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) {
    estimated_total_size += sorted_runs_[loop].size;
  }
  uint32_t path_id =
      GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
  int start_level = sorted_runs_[start_index].level;

  std::vector<CompactionInputFiles> inputs(vstorage_->num_levels());
  for (size_t i = 0; i < inputs.size(); ++i) {
    inputs[i].level = start_level + static_cast<int>(i);
  }
  for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) {
    auto& picking_sr = sorted_runs_[loop];
    if (picking_sr.level == 0) {
      FileMetaData* f = picking_sr.file;
      inputs[0].files.push_back(f);
    } else {
      auto& files = inputs[picking_sr.level - start_level].files;
      for (auto* f : vstorage_->LevelFiles(picking_sr.level)) {
        files.push_back(f);
      }
    }
    std::string comp_reason_print_string;
    if (compaction_reason == CompactionReason::kPeriodicCompaction) {
      comp_reason_print_string = "periodic compaction";
    } else if (compaction_reason ==
               CompactionReason::kUniversalSizeAmplification) {
      comp_reason_print_string = "size amp";
    } else {
      assert(false);
    }

    char file_num_buf[256];
    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: %s picking %s",
                     cf_name_.c_str(), comp_reason_print_string.c_str(),
                     file_num_buf);
  }

  // output files at the bottommost level, unless it's reserved
  int output_level = vstorage_->num_levels() - 1;
  // last level is reserved for the files ingested behind
  if (ioptions_.allow_ingest_behind) {
    assert(output_level > 1);
    output_level--;
  }

  // We never check size for
  // compaction_options_universal.compression_size_percent,
  // because we always compact all the files, so always compress.
  return new Compaction(
      vstorage_, ioptions_, mutable_cf_options_, std::move(inputs),
      output_level,
      MaxFileSizeForLevel(mutable_cf_options_, output_level,
                          kCompactionStyleUniversal),
      LLONG_MAX, path_id,
      GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, start_level,
                         1, true /* enable_compression */),
      GetCompressionOptions(ioptions_, vstorage_, start_level,
                            true /* enable_compression */),
      /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
      score_, false /* deletion_compaction */, compaction_reason);
}

Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() {
  ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Periodic Compaction",
                   cf_name_.c_str());

  // In universal compaction, sorted runs containing older data are almost
  // always generated earlier too. To simplify the problem, we just try to
  // trigger a full compaction. We start from the oldest sorted run and
  // include all sorted runs, until we hit a sorted run that is already being
  // compacted. Since usually the largest (which is usually the oldest) sorted
  // run is included anyway, doing a full compaction won't increase write
  // amplification much.

  // Get some information from marked files to check whether a file is
  // included in the compaction.

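  // Walk backward from the oldest sorted run, extending the pick toward newer
  // runs until a run that is already being compacted blocks it.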
  size_t start_index = sorted_runs_.size();
  while (start_index > 0 && !sorted_runs_[start_index - 1].being_compacted) {
    start_index--;
  }
  if (start_index == sorted_runs_.size()) {
    return nullptr;
  }

  // There is a rare corner case where we can't pick up all the files because
  // some files are being compacted, so the files we do pick may include none
  // that actually need periodic compaction. To keep the logic simple, we just
  // execute the compaction anyway, except when the pick is limited to the
  // last sorted run (either the last level or the last L0 file); that case is
  // verified below.
  if (start_index == sorted_runs_.size() - 1) {
    bool included_file_marked = false;
    int start_level = sorted_runs_[start_index].level;
    FileMetaData* start_file = sorted_runs_[start_index].file;
    for (const std::pair<int, FileMetaData*>& level_file_pair :
         vstorage_->FilesMarkedForPeriodicCompaction()) {
      if (start_level != 0) {
        // Last sorted run is a level
        if (start_level == level_file_pair.first) {
          included_file_marked = true;
          break;
        }
      } else {
        // Last sorted run is an L0 file.
        if (start_file == level_file_pair.second) {
          included_file_marked = true;
          break;
        }
      }
    }
    if (!included_file_marked) {
      ROCKS_LOG_BUFFER(log_buffer_,
                       "[%s] Universal: Cannot form a compaction covering file "
                       "marked for periodic compaction",
                       cf_name_.c_str());
      return nullptr;
    }
  }

  Compaction* c = PickCompactionToOldest(start_index,
                                         CompactionReason::kPeriodicCompaction);

  TEST_SYNC_POINT_CALLBACK(
      "UniversalCompactionPicker::PickPeriodicCompaction:Return", c);

  return c;
}
}  // namespace ROCKSDB_NAMESPACE

#endif  // !ROCKSDB_LITE