//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_test_util.h"
#include "port/stack_trace.h"
#if !defined(ROCKSDB_LITE)
#include "rocksdb/utilities/table_properties_collectors.h"
#include "test_util/sync_point.h"

namespace ROCKSDB_NAMESPACE {

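// Helper that fills a string with data compressible to roughly 80% of its
// original size (per the helper's compressed_fraction parameter); used by the
// compression-ratio tests below.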
static std::string CompressibleString(Random* rnd, int len) {
  std::string r;
  test::CompressibleString(rnd, 0.8, len, &r);
  return r;
}

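// Base fixture: each test runs once per (num_levels,
// exclusive_manual_compaction) tuple supplied by INSTANTIATE_TEST_CASE_P.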
class DBTestUniversalCompactionBase
    : public DBTestBase,
      public ::testing::WithParamInterface<std::tuple<int, bool>> {
 public:
  explicit DBTestUniversalCompactionBase(
      const std::string& path) : DBTestBase(path) {}
  void SetUp() override {
    num_levels_ = std::get<0>(GetParam());
    exclusive_manual_compaction_ = std::get<1>(GetParam());
  }
  int num_levels_;
  bool exclusive_manual_compaction_;
};

class DBTestUniversalCompaction : public DBTestUniversalCompactionBase {
 public:
  DBTestUniversalCompaction() :
      DBTestUniversalCompactionBase("/db_universal_compaction_test") {}
};

class DBTestUniversalCompaction2 : public DBTestBase {
 public:
  DBTestUniversalCompaction2() : DBTestBase("/db_universal_compaction_test2") {}
};

namespace {
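// Asserts that none of the given input file names survived the compaction,
// i.e. they no longer appear anywhere in the column family metadata.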
void VerifyCompactionResult(
    const ColumnFamilyMetaData& cf_meta,
    const std::set<std::string>& overlapping_file_numbers) {
#ifndef NDEBUG
  for (auto& level : cf_meta.levels) {
    for (auto& file : level.files) {
      assert(overlapping_file_numbers.find(file.name) ==
             overlapping_file_numbers.end());
    }
  }
#endif
}

class KeepFilter : public CompactionFilter {
 public:
  bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
              std::string* /*new_value*/,
              bool* /*value_changed*/) const override {
    return false;
  }

  const char* Name() const override { return "KeepFilter"; }
};

class KeepFilterFactory : public CompactionFilterFactory {
 public:
  explicit KeepFilterFactory(bool check_context = false)
      : check_context_(check_context) {}

  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    if (check_context_) {
      EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
      EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
    }
    return std::unique_ptr<CompactionFilter>(new KeepFilter());
  }

  const char* Name() const override { return "KeepFilterFactory"; }
  bool check_context_;
  std::atomic_bool expect_full_compaction_;
  std::atomic_bool expect_manual_compaction_;
};

class DelayFilter : public CompactionFilter {
 public:
  explicit DelayFilter(DBTestBase* d) : db_test(d) {}
  bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
              std::string* /*new_value*/,
              bool* /*value_changed*/) const override {
    db_test->env_->addon_time_.fetch_add(1000);
    return true;
  }

  const char* Name() const override { return "DelayFilter"; }

 private:
  DBTestBase* db_test;
};

class DelayFilterFactory : public CompactionFilterFactory {
 public:
  explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& /*context*/) override {
    return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test));
  }

  const char* Name() const override { return "DelayFilterFactory"; }

 private:
  DBTestBase* db_test;
};
}  // namespace

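// A rough sketch of universal compaction picking, which the tests below rely
// on (the picker itself is authoritative): once the number of sorted runs
// reaches level0_file_num_compaction_trigger, the picker first checks size
// amplification (max_size_amplification_percent), then size-ratio-based read
// amplification (size_ratio with min/max_merge_width), and finally falls back
// to reducing the number of sorted runs.
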
// Make sure nothing breaks if the compaction trigger condition is given as 0,
// which is invalid; it is expected to be sanitized to 1 on open (checked
// below).
TEST_P(DBTestUniversalCompaction, UniversalCompactionSingleSortedRun) {
  Options options = CurrentOptions();

  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  // Configure universal compaction to always compact to a single sorted run.
  options.level0_file_num_compaction_trigger = 0;
  options.compaction_options_universal.size_ratio = 10;
  options.compaction_options_universal.min_merge_width = 2;
  options.compaction_options_universal.max_size_amplification_percent = 0;

  options.write_buffer_size = 105 << 10;  // 105KB
  options.arena_block_size = 4 << 10;
  options.target_file_size_base = 32 << 10;  // 32KB
  KeepFilterFactory* filter = new KeepFilterFactory(true);
  filter->expect_manual_compaction_.store(false);
  options.compaction_filter_factory.reset(filter);

  DestroyAndReopen(options);
  ASSERT_EQ(1, db_->GetOptions().level0_file_num_compaction_trigger);

  Random rnd(301);
  int key_idx = 0;

  filter->expect_full_compaction_.store(true);

  for (int num = 0; num < 16; num++) {
    // Write a 100KB file; it should immediately be compacted into one file.
    GenerateNewFile(&rnd, &key_idx);
    dbfull()->TEST_WaitForCompact();
    ASSERT_EQ(NumSortedRuns(0), 1);
  }
  ASSERT_OK(Put(Key(key_idx), ""));
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(NumSortedRuns(0), 1);
}

TEST_P(DBTestUniversalCompaction, OptimizeFiltersForHits) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 5;
  options.num_levels = num_levels_;
  options.write_buffer_size = 105 << 10;  // 105KB
  options.arena_block_size = 4 << 10;
  options.target_file_size_base = 32 << 10;  // 32KB
  // trigger compaction if there are >= 4 files
  options.level0_file_num_compaction_trigger = 4;
  BlockBasedTableOptions bbto;
  bbto.cache_index_and_filter_blocks = true;
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  bbto.whole_key_filtering = true;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  options.optimize_filters_for_hits = true;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
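  // SpecialSkipListFactory(3) is a test-only memtable that reports itself as
  // full after three entries, forcing small, predictable flushes.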
  options.memtable_factory.reset(new SpecialSkipListFactory(3));

  DestroyAndReopen(options);

  // block compaction from happening
  env_->SetBackgroundThreads(1, Env::LOW);
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);

  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
    Put(Key(num * 10), "val");
    if (num) {
      dbfull()->TEST_WaitForFlushMemTable();
    }
    Put(Key(30 + num * 10), "val");
    Put(Key(60 + num * 10), "val");
  }
  Put("", "");
  dbfull()->TEST_WaitForFlushMemTable();

  // Query a set of non-existent keys.
  for (int i = 5; i < 90; i += 10) {
    ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
  }

  // Make sure the bloom filter is used at least once.
  ASSERT_GT(TestGetTickerCount(options, BLOOM_FILTER_USEFUL), 0);
  auto prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL);

  // Make sure the bloom filter is used for all but the last L0 file when
  // looking up a non-existent key that's in the range of all L0 files.
  ASSERT_EQ(Get(Key(35)), "NOT_FOUND");
  ASSERT_EQ(prev_counter + NumTableFilesAtLevel(0) - 1,
            TestGetTickerCount(options, BLOOM_FILTER_USEFUL));
  prev_counter = TestGetTickerCount(options, BLOOM_FILTER_USEFUL);

  // Unblock compaction and wait for it to happen.
  sleeping_task_low.WakeUp();
  dbfull()->TEST_WaitForCompact();

  // The same queries will not trigger the bloom filter.
  for (int i = 5; i < 90; i += 10) {
    ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
  }
  ASSERT_EQ(prev_counter, TestGetTickerCount(options, BLOOM_FILTER_USEFUL));
}

// TODO(kailiu) The tests on UniversalCompaction have some issues:
//  1. A lot of magic numbers ("11" or "12").
//  2. They make assumptions about the memtable flush conditions, which may
//     change from time to time.
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrigger) {
  Options options;
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 5;
  options.num_levels = num_levels_;
  options.write_buffer_size = 105 << 10;  // 105KB
  options.arena_block_size = 4 << 10;
  options.target_file_size_base = 32 << 10;  // 32KB
  // trigger compaction if there are >= 4 files
  options.level0_file_num_compaction_trigger = 4;
  KeepFilterFactory* filter = new KeepFilterFactory(true);
  filter->expect_manual_compaction_.store(false);
  options.compaction_filter_factory.reset(filter);

  options = CurrentOptions(options);
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBTestWritableFile.GetPreallocationStatus", [&](void* arg) {
        ASSERT_TRUE(arg != nullptr);
        size_t preallocation_size = *(static_cast<size_t*>(arg));
        if (num_levels_ > 3) {
          ASSERT_LE(preallocation_size, options.target_file_size_base * 1.1);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  int key_idx = 0;

  filter->expect_full_compaction_.store(true);
  // Stage 1:
  //   Generate a set of files at level 0, but don't trigger level-0
  //   compaction.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    // Write 100KB
    GenerateNewFile(1, &rnd, &key_idx);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  GenerateNewFile(1, &rnd, &key_idx);
  // Suppose each file flushed from the memtable has size 1. Now we compact
  // level0_file_num_compaction_trigger (= 4) files and should end up with one
  // big file of size 4.
  ASSERT_EQ(NumSortedRuns(1), 1);

  // Stage 2:
  //   Now we have one file at level 0, with size 4. We also have some data in
  //   mem table. Let's continue generating new files at level 0, but don't
  //   trigger level-0 compaction.
  //   First, clean up memtable before inserting new data. This will generate
  //   a level-0 file, with size around 0.4 (according to previously written
  //   data amount).
  filter->expect_full_compaction_.store(false);
  ASSERT_OK(Flush(1));
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
       num++) {
    GenerateNewFile(1, &rnd, &key_idx);
    ASSERT_EQ(NumSortedRuns(1), num + 3);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  GenerateNewFile(1, &rnd, &key_idx);
  // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1.
  // After compaction, we should have 2 files, with size 4, 2.4.
  ASSERT_EQ(NumSortedRuns(1), 2);

  // Stage 3:
  //   Now we have 2 files at level 0, with size 4 and 2.4. Continue
  //   generating new files at level 0.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
       num++) {
    GenerateNewFile(1, &rnd, &key_idx);
    ASSERT_EQ(NumSortedRuns(1), num + 3);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  GenerateNewFile(1, &rnd, &key_idx);
  // Before compaction, we have 4 files at level 0, with size 4, 2.4, 1, 1.
  // After compaction, we should have 3 files, with size 4, 2.4, 2.
  ASSERT_EQ(NumSortedRuns(1), 3);

  // Stage 4:
  //   Now we have 3 files at level 0, with size 4, 2.4, 2. Let's generate a
  //   new file of size 1.
  GenerateNewFile(1, &rnd, &key_idx);
  dbfull()->TEST_WaitForCompact();
  // Level-0 compaction is triggered, but no file will be picked up.
  ASSERT_EQ(NumSortedRuns(1), 4);

  // Stage 5:
  //   Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate
  //   a new file of size 1.
  filter->expect_full_compaction_.store(true);
  GenerateNewFile(1, &rnd, &key_idx);
  dbfull()->TEST_WaitForCompact();
  // All files at level 0 will be compacted into a single one.
  ASSERT_EQ(NumSortedRuns(1), 1);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionSizeAmplification) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 3;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int key_idx = 0;

  //   Generate two files in Level 0. Both files are approx the same size.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    ASSERT_EQ(NumSortedRuns(1), num + 1);
  }
  ASSERT_EQ(NumSortedRuns(1), 2);

  // Flush whatever is remaining in memtable. This is typically
  // small, which should not trigger size ratio based compaction
  // but will instead trigger size amplification.
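  // As a rough worked example: the runs are now approximately
  // (110KB, 110KB, small). Size amplification is estimated as
  // size(all runs except the oldest) / size(oldest run), i.e.
  // (110KB + small) / 110KB here; the leftover flush is big enough to push
  // that past the 110% threshold and force a full compaction to one run.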
  ASSERT_OK(Flush(1));

  dbfull()->TEST_WaitForCompact();

  // Verify that size amplification did occur
  ASSERT_EQ(NumSortedRuns(1), 1);
}

TEST_P(DBTestUniversalCompaction, DynamicUniversalCompactionSizeAmplification) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 1;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 3;
  // Initial setup of compaction_options_universal will prevent universal
  // compaction from happening
  options.compaction_options_universal.size_ratio = 100;
  options.compaction_options_universal.min_merge_width = 100;
  DestroyAndReopen(options);

  int total_picked_compactions = 0;
  int total_size_amp_compactions = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
        if (arg) {
          total_picked_compactions++;
          Compaction* c = static_cast<Compaction*>(arg);
          if (c->compaction_reason() ==
              CompactionReason::kUniversalSizeAmplification) {
            total_size_amp_compactions++;
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  MutableCFOptions mutable_cf_options;
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);
  int key_idx = 0;

  //   Generate two files in Level 0. Both files are approx the same size.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    ASSERT_EQ(NumSortedRuns(1), num + 1);
  }
  ASSERT_EQ(NumSortedRuns(1), 2);

  // Flush whatever is remaining in memtable. This is typically
  // small, which should not trigger size ratio based compaction
  // but could instead trigger size amplification if it's set
  // to 110.
  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();
  // Verify compaction did not happen
  ASSERT_EQ(NumSortedRuns(1), 3);

  // Trigger compaction if size amplification exceeds 110% without reopening DB
  ASSERT_EQ(dbfull()
                ->GetOptions(handles_[1])
                .compaction_options_universal.max_size_amplification_percent,
            200U);
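  // Lower the threshold dynamically via SetOptions(); the nested
  // "{key=value;}" string form updates compaction_options_universal without
  // reopening the DB.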
  ASSERT_OK(dbfull()->SetOptions(handles_[1],
                                 {{"compaction_options_universal",
                                   "{max_size_amplification_percent=110;}"}}));
  ASSERT_EQ(dbfull()
                ->GetOptions(handles_[1])
                .compaction_options_universal.max_size_amplification_percent,
            110u);
  ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
                                                     &mutable_cf_options));
  ASSERT_EQ(110u, mutable_cf_options.compaction_options_universal
                      .max_size_amplification_percent);

  dbfull()->TEST_WaitForCompact();
  // Verify that size amplification did happen
  ASSERT_EQ(NumSortedRuns(1), 1);
  ASSERT_EQ(total_picked_compactions, 1);
  ASSERT_EQ(total_size_amp_compactions, 1);
}

TEST_P(DBTestUniversalCompaction, DynamicUniversalCompactionReadAmplification) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 1;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 3;
  // Initial setup of compaction_options_universal will prevent universal
  // compaction from happening
  options.compaction_options_universal.max_size_amplification_percent = 2000;
  options.compaction_options_universal.size_ratio = 0;
  options.compaction_options_universal.min_merge_width = 100;
  DestroyAndReopen(options);

  int total_picked_compactions = 0;
  int total_size_ratio_compactions = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
        if (arg) {
          total_picked_compactions++;
          Compaction* c = static_cast<Compaction*>(arg);
          if (c->compaction_reason() == CompactionReason::kUniversalSizeRatio) {
            total_size_ratio_compactions++;
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  MutableCFOptions mutable_cf_options;
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);
  int key_idx = 0;

  // Generate three files in Level 0. All files are approx the same size.
  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    ASSERT_EQ(NumSortedRuns(1), num + 1);
  }
  ASSERT_EQ(NumSortedRuns(1), options.level0_file_num_compaction_trigger);

  // Flush whatever is remaining in memtable. This is typically small, about
  // 30KB.
  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();
  // Verify compaction did not happen
  ASSERT_EQ(NumSortedRuns(1), options.level0_file_num_compaction_trigger + 1);
  ASSERT_EQ(total_picked_compactions, 0);

  ASSERT_OK(dbfull()->SetOptions(
      handles_[1],
      {{"compaction_options_universal",
        "{min_merge_width=2;max_merge_width=2;size_ratio=100;}"}}));
  ASSERT_EQ(dbfull()
                ->GetOptions(handles_[1])
                .compaction_options_universal.min_merge_width,
            2u);
  ASSERT_EQ(dbfull()
                ->GetOptions(handles_[1])
                .compaction_options_universal.max_merge_width,
            2u);
  ASSERT_EQ(
      dbfull()->GetOptions(handles_[1]).compaction_options_universal.size_ratio,
      100u);

  ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
                                                     &mutable_cf_options));
  ASSERT_EQ(mutable_cf_options.compaction_options_universal.size_ratio, 100u);
  ASSERT_EQ(mutable_cf_options.compaction_options_universal.min_merge_width,
            2u);
  ASSERT_EQ(mutable_cf_options.compaction_options_universal.max_merge_width,
            2u);

  dbfull()->TEST_WaitForCompact();

  // Files in L0 are approx: 0.3 (30KB), 1, 1, 1.
  // On compaction: the files are below the size amp threshold, so we fall
  // through to checking the read amp conditions. The configured size ratio is
  // not big enough to take 0.3 into consideration. So the next files 1 and 1
  // are compacted together first as they satisfy the size ratio condition and
  // the (min_merge_width, max_merge_width) condition, giving a file of size 2.
  // Next, the newly generated 2 and the last file 1 are compacted together. So
  // at the end: #sortedRuns = 2, #picked_compactions = 2, and all the picked
  // ones are size ratio based compactions.
  ASSERT_EQ(NumSortedRuns(1), 2);
  // If max_merge_width had not been changed dynamically above, and had it
  // kept its default value of UINT_MAX, total_picked_compactions would have
  // been 1.
  ASSERT_EQ(total_picked_compactions, 2);
  ASSERT_EQ(total_size_ratio_compactions, 2);
}

TEST_P(DBTestUniversalCompaction, CompactFilesOnUniversalCompaction) {
  const int kTestKeySize = 16;
  const int kTestValueSize = 984;
  const int kEntrySize = kTestKeySize + kTestValueSize;
  const int kEntriesPerBuffer = 10;

  ChangeCompactOptions();
  Options options;
  options.create_if_missing = true;
  options.compaction_style = kCompactionStyleLevel;
  options.num_levels = 1;
  options.target_file_size_base = options.write_buffer_size;
  options.compression = kNoCompression;
  options = CurrentOptions(options);
  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(options.compaction_style, kCompactionStyleUniversal);
  Random rnd(301);
  for (int key = 1024 * kEntriesPerBuffer; key >= 0; --key) {
    ASSERT_OK(Put(1, ToString(key), RandomString(&rnd, kTestValueSize)));
  }
  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
  dbfull()->TEST_WaitForCompact();
  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  std::vector<std::string> compaction_input_file_names;
  for (auto file : cf_meta.levels[0].files) {
    if (rnd.OneIn(2)) {
      compaction_input_file_names.push_back(file.name);
    }
  }

  if (compaction_input_file_names.size() == 0) {
    compaction_input_file_names.push_back(
        cf_meta.levels[0].files[0].name);
  }

  // Expect failure, since universal compaction only allows L0 as the output
  // level.
  ASSERT_FALSE(dbfull()
                   ->CompactFiles(CompactionOptions(), handles_[1],
                                  compaction_input_file_names, 1)
                   .ok());

  // expect ok and verify the compacted files no longer exist.
  ASSERT_OK(dbfull()->CompactFiles(
      CompactionOptions(), handles_[1],
      compaction_input_file_names, 0));

  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  VerifyCompactionResult(
      cf_meta,
      std::set<std::string>(compaction_input_file_names.begin(),
          compaction_input_file_names.end()));

  compaction_input_file_names.clear();

  // Pick the first and the last file, expect everything is
  // compacted into one single file.
  compaction_input_file_names.push_back(
      cf_meta.levels[0].files[0].name);
  compaction_input_file_names.push_back(
      cf_meta.levels[0].files[
          cf_meta.levels[0].files.size() - 1].name);
  ASSERT_OK(dbfull()->CompactFiles(
      CompactionOptions(), handles_[1],
      compaction_input_file_names, 0));

  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  ASSERT_EQ(cf_meta.levels[0].files.size(), 1U);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionTargetLevel) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.num_levels = 7;
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  // Generate 3 overlapping files
  Random rnd(301);
  for (int i = 0; i < 210; i++) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
  }
  ASSERT_OK(Flush());

  for (int i = 200; i < 300; i++) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
  }
  ASSERT_OK(Flush());

  for (int i = 250; i < 260; i++) {
    ASSERT_OK(Put(Key(i), RandomString(&rnd, 100)));
  }
  ASSERT_OK(Flush());

  ASSERT_EQ("3", FilesPerLevel(0));
  // Compact all files into 1 file and put it in L4
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 4;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
}

#ifndef ROCKSDB_VALGRIND_RUN
class DBTestUniversalCompactionMultiLevels
    : public DBTestUniversalCompactionBase {
 public:
  DBTestUniversalCompactionMultiLevels() :
      DBTestUniversalCompactionBase(
          "/db_universal_compaction_multi_levels_test") {}
};

TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionMultiLevels) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 8;
  options.max_background_compactions = 3;
  options.target_file_size_base = 32 * 1024;
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 100000;
  for (int i = 0; i < num_keys * 2; i++) {
    ASSERT_OK(Put(1, Key(i % num_keys), Key(i)));
  }

  dbfull()->TEST_WaitForCompact();

  for (int i = num_keys; i < num_keys * 2; i++) {
    ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
  }
}

// Tests universal compaction with trivial move enabled
TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) {
        non_trivial_move++;
        ASSERT_TRUE(arg != nullptr);
        int output_level = *(static_cast<int*>(arg));
        ASSERT_EQ(output_level, 0);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.allow_trivial_move = true;
  options.num_levels = 3;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 2;
  options.target_file_size_base = 32 * 1024;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 150000;
  for (int i = 0; i < num_keys; i++) {
    ASSERT_OK(Put(1, Key(i), Key(i)));
  }
  std::vector<std::string> values;

  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();

  ASSERT_GT(trivial_move, 0);
  ASSERT_GT(non_trivial_move, 0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

INSTANTIATE_TEST_CASE_P(MultiLevels, DBTestUniversalCompactionMultiLevels,
                        ::testing::Combine(::testing::Values(3, 20),
                                           ::testing::Bool()));

class DBTestUniversalCompactionParallel :
    public DBTestUniversalCompactionBase {
 public:
  DBTestUniversalCompactionParallel() :
      DBTestUniversalCompactionBase(
          "/db_universal_compaction_prallel_test") {}
};

TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 1 << 10;  // 1KB
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 3;
  options.max_background_flushes = 3;
  options.target_file_size_base = 1 * 1024;
  options.compaction_options_universal.max_size_amplification_percent = 110;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Delay every compaction so multiple compactions will happen.
  std::atomic<int> num_compactions_running(0);
  std::atomic<bool> has_parallel(false);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():Start", [&](void* /*arg*/) {
        if (num_compactions_running.fetch_add(1) > 0) {
          has_parallel.store(true);
          return;
        }
        for (int nwait = 0; nwait < 20000; nwait++) {
          if (has_parallel.load() || num_compactions_running.load() > 1) {
            has_parallel.store(true);
            break;
          }
          env_->SleepForMicroseconds(1000);
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::Run():End",
      [&](void* /*arg*/) { num_compactions_running.fetch_add(-1); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 30000;
  for (int i = 0; i < num_keys * 2; i++) {
    ASSERT_OK(Put(1, Key(i % num_keys), Key(i)));
  }
  dbfull()->TEST_WaitForCompact();

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ASSERT_EQ(num_compactions_running.load(), 0);
  ASSERT_TRUE(has_parallel.load());

  for (int i = num_keys; i < num_keys * 2; i++) {
    ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
  }

  // Reopen and check.
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  for (int i = num_keys; i < num_keys * 2; i++) {
    ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i));
  }
}

TEST_P(DBTestUniversalCompactionParallel, PickByFileNumberBug) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 1 * 1024;  // 1KB
  options.level0_file_num_compaction_trigger = 7;
  options.max_background_compactions = 2;
  options.target_file_size_base = 1024 * 1024;  // 1MB

  // Disable size-amplification-based compaction.
  options.compaction_options_universal.max_size_amplification_percent =
      UINT_MAX;
  DestroyAndReopen(options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBTestUniversalCompactionParallel::PickByFileNumberBug:0",
        "BackgroundCallCompaction:0"},
       {"UniversalCompactionBuilder::PickCompaction:Return",
        "DBTestUniversalCompactionParallel::PickByFileNumberBug:1"},
       {"DBTestUniversalCompactionParallel::PickByFileNumberBug:2",
        "CompactionJob::Run():Start"}});

  int total_picked_compactions = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
        if (arg) {
          total_picked_compactions++;
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Write 7 files to trigger compaction
  int key_idx = 1;
  for (int i = 1; i <= 70; i++) {
    std::string k = Key(key_idx++);
    ASSERT_OK(Put(k, k));
    if (i % 10 == 0) {
      ASSERT_OK(Flush());
    }
  }

  // Wait for the 1st background compaction process to start
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();

  // Write 3 files while 1st compaction is held
  // These 3 files have different sizes to avoid compacting based on size_ratio
  int num_keys = 1000;
  for (int i = 0; i < 3; i++) {
    for (int j = 1; j <= num_keys; j++) {
      std::string k = Key(key_idx++);
      ASSERT_OK(Put(k, k));
    }
    ASSERT_OK(Flush());
    num_keys -= 100;
  }

  // Hold the 1st compaction from finishing
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2");
  dbfull()->TEST_WaitForCompact();

  // There should only be one picked compaction as the score drops below one
  // after the first one is picked.
  EXPECT_EQ(total_picked_compactions, 1);
  EXPECT_EQ(TotalTableFiles(), 4);

  // Stop SyncPoint and destroy the DB and reopen it again
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  key_idx = 1;
  total_picked_compactions = 0;
  DestroyAndReopen(options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Write 7 files to trigger compaction
  for (int i = 1; i <= 70; i++) {
    std::string k = Key(key_idx++);
    ASSERT_OK(Put(k, k));
    if (i % 10 == 0) {
      ASSERT_OK(Flush());
    }
  }

  // Wait for the 1st background compaction process to start
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();

  // Write 8 files while 1st compaction is held
  // These 8 files have different sizes to avoid compacting based on size_ratio
  num_keys = 1000;
  for (int i = 0; i < 8; i++) {
    for (int j = 1; j <= num_keys; j++) {
      std::string k = Key(key_idx++);
      ASSERT_OK(Put(k, k));
    }
    ASSERT_OK(Flush());
    num_keys -= 100;
  }

  // Wait for the 2nd background compaction process to start
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:0");
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:1");

  // Hold the 1st and 2nd compaction from finishing
  TEST_SYNC_POINT("DBTestUniversalCompactionParallel::PickByFileNumberBug:2");
  dbfull()->TEST_WaitForCompact();

  // This time we will trigger one compaction because of the size ratio and
  // another because the number of files that have not been compacted exceeds
  // 7.
  EXPECT_GE(total_picked_compactions, 2);
}

INSTANTIATE_TEST_CASE_P(Parallel, DBTestUniversalCompactionParallel,
                        ::testing::Combine(::testing::Values(1, 10),
                                           ::testing::Values(false)));
#endif  // ROCKSDB_VALGRIND_RUN

TEST_P(DBTestUniversalCompaction, UniversalCompactionOptions) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;        // 4KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 4;
  options.num_levels = num_levels_;
  options.compaction_options_universal.compression_size_percent = -1;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);
  int key_idx = 0;

  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
    // Write 100KB (100 values, each 1K)
    for (int i = 0; i < 100; i++) {
      ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 990)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);

    if (num < options.level0_file_num_compaction_trigger - 1) {
      ASSERT_EQ(NumSortedRuns(1), num + 1);
    }
  }

  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(NumSortedRuns(1), 1);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionStopStyleSimilarSize) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 105 << 10;     // 105KB
  options.arena_block_size = 4 << 10;        // 4KB
  options.target_file_size_base = 32 << 10;  // 32KB
  // trigger compaction if there are >= 4 files
  options.level0_file_num_compaction_trigger = 4;
  options.compaction_options_universal.size_ratio = 10;
  options.compaction_options_universal.stop_style =
      kCompactionStopStyleSimilarSize;
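  // With kCompactionStopStyleSimilarSize, each additional run is admitted to
  // the candidate set only if its size is within size_ratio of the run most
  // recently added, rather than of the candidates' running total (a sketch of
  // the intent; the picker is authoritative).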
  options.num_levels = num_levels_;
  DestroyAndReopen(options);

  Random rnd(301);
  int key_idx = 0;

  // Stage 1:
  //   Generate a set of files at level 0, but don't trigger level-0
  //   compaction.
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
       num++) {
    // Write 100KB (100 values, each 1K)
    for (int i = 0; i < 100; i++) {
      ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    ASSERT_EQ(NumSortedRuns(), num + 1);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990)));
    key_idx++;
  }
  dbfull()->TEST_WaitForCompact();
  // Suppose each file flushed from the memtable has size 1. Now we compact
  // level0_file_num_compaction_trigger (= 4) files and should end up with one
  // big file of size 4.
  ASSERT_EQ(NumSortedRuns(), 1);

  // Stage 2:
  //   Now we have one file at level 0, with size 4. We also have some data in
  //   mem table. Let's continue generating new files at level 0, but don't
  //   trigger level-0 compaction.
  //   First, clean up memtable before inserting new data. This will generate
  //   a level-0 file, with size around 0.4 (according to previously written
  //   data amount).
  dbfull()->Flush(FlushOptions());
  for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
       num++) {
    // Write 100KB (100 values, each 1K)
    for (int i = 0; i < 100; i++) {
      ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    ASSERT_EQ(NumSortedRuns(), num + 3);
  }

  // Generate one more file at level-0, which should trigger level-0
  // compaction.
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990)));
    key_idx++;
  }
  dbfull()->TEST_WaitForCompact();
  // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1.
  // After compaction, we should have 3 files, with size 4, 0.4, 2.
  ASSERT_EQ(NumSortedRuns(), 3);
  // Stage 3:
  //   Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one
  //   more file at level-0, which should trigger level-0 compaction.
  for (int i = 0; i < 100; i++) {
    ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 990)));
    key_idx++;
  }
  dbfull()->TEST_WaitForCompact();
  // Level-0 compaction is triggered, but no file will be picked up.
  ASSERT_EQ(NumSortedRuns(), 4);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionCompressRatio1) {
  if (!Snappy_Supported()) {
    return;
  }

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = num_levels_;
  options.compaction_options_universal.compression_size_percent = 70;
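  // compression_size_percent = 70 asks that roughly the oldest 70% of the
  // data be stored compressed; outputs covering the newest ~30% are left
  // uncompressed.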
  DestroyAndReopen(options);

  Random rnd(301);
  int key_idx = 0;

  // The first compaction (2) is compressed.
  for (int num = 0; num < 2; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_LT(TotalSize(), 110000U * 2 * 0.9);

  // The second compaction (4) is compressed
  for (int num = 0; num < 2; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_LT(TotalSize(), 110000 * 4 * 0.9);

  // The third compaction (2 4) is compressed as well: at this point the runs
  // are (1 1 3.2), and 3.2/5.2 does not reach the configured ratio.
  for (int num = 0; num < 2; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_LT(TotalSize(), 110000 * 6 * 0.9);

  // By the time the compaction grows to (2 4 8), the newest data is no
  // longer compressed, as it falls outside the configured 70%.
  for (int num = 0; num < 8; num++) {
    // Write 110KB (11 values, each 10K)
    for (int i = 0; i < 11; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_GT(TotalSize(), 110000 * 11 * 0.8 + 110000 * 2);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionCompressRatio2) {
  if (!Snappy_Supported()) {
    return;
  }
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = num_levels_;
  options.compaction_options_universal.compression_size_percent = 95;
  DestroyAndReopen(options);

  Random rnd(301);
  int key_idx = 0;

  // With compression_size_percent = 95, even when the compaction grows to
  // (2 4 8) the newest data still falls within the ratio and stays compressed.
  for (int num = 0; num < 14; num++) {
    // Write 120KB (12 values, each 10K)
    for (int i = 0; i < 12; i++) {
      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
      key_idx++;
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_LT(TotalSize(), 120000U * 12 * 0.82 + 120000 * 2);
}

#ifndef ROCKSDB_VALGRIND_RUN
// Test that checks trivial move in universal compaction
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrivialMoveTest1) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) {
        non_trivial_move++;
        ASSERT_TRUE(arg != nullptr);
        int output_level = *(static_cast<int*>(arg));
        ASSERT_EQ(output_level, 0);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.allow_trivial_move = true;
  options.num_levels = 2;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 1;
  options.target_file_size_base = 32 * 1024;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 250000;
  for (int i = 0; i < num_keys; i++) {
    ASSERT_OK(Put(1, Key(i), Key(i)));
  }
  std::vector<std::string> values;

  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();

  ASSERT_GT(trivial_move, 0);
  ASSERT_GT(non_trivial_move, 0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

// Test that checks trivial move in universal compaction
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrivialMoveTest2) {
  int32_t trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) {
        ASSERT_TRUE(arg != nullptr);
        int output_level = *(static_cast<int*>(arg));
        ASSERT_EQ(output_level, 0);
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.allow_trivial_move = true;
  options.num_levels = 15;
  options.write_buffer_size = 100 << 10;  // 100KB
  options.level0_file_num_compaction_trigger = 8;
  options.max_background_compactions = 2;
  options.target_file_size_base = 64 * 1024;
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Trigger compaction if size amplification exceeds 110%
  options.compaction_options_universal.max_size_amplification_percent = 110;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  Random rnd(301);
  int num_keys = 500000;
  for (int i = 0; i < num_keys; i++) {
    ASSERT_OK(Put(1, Key(i), Key(i)));
  }
  std::vector<std::string> values;

  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();

  ASSERT_GT(trivial_move, 0);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
#endif  // ROCKSDB_VALGRIND_RUN

TEST_P(DBTestUniversalCompaction, UniversalCompactionFourPaths) {
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_, 300 * 1024);
  options.db_paths.emplace_back(dbname_ + "_2", 300 * 1024);
  options.db_paths.emplace_back(dbname_ + "_3", 500 * 1024);
  options.db_paths.emplace_back(dbname_ + "_4", 1024 * 1024 * 1024);
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 5;
  options.write_buffer_size = 111 << 10;  // 111KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 1;
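  // With multiple db_paths, each newly written SST goes to the first path
  // whose cumulative size budget can still hold it, so larger compaction
  // outputs progressively spill over to the later, larger paths.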

  std::vector<std::string> filenames;
  env_->GetChildren(options.db_paths[1].path, &filenames);
  // Delete archival files.
  for (size_t i = 0; i < filenames.size(); ++i) {
    env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
  }
  env_->DeleteDir(options.db_paths[1].path);
  Reopen(options);

  Random rnd(301);
  int key_idx = 0;

  // First three 110KB files are not going to second path.
  // After that, (100K, 200K)
  for (int num = 0; num < 3; num++) {
    GenerateNewFile(&rnd, &key_idx);
  }

  // Another 110KB triggers a compaction to 400K file to second path
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));

  // (1, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1,1,4) -> (2, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 2, 4) -> (3, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 3, 4) -> (8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));

  // (1, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 1, 8) -> (2, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  // (1, 2, 8) -> (3, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 3, 8) -> (4, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));

  // (1, 4, 8) -> (5, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  Reopen(options);

  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  Destroy(options);
}

TEST_P(DBTestUniversalCompaction, UniversalCompactionCFPathUse) {
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_, 300 * 1024);
  options.db_paths.emplace_back(dbname_ + "_2", 300 * 1024);
  options.db_paths.emplace_back(dbname_ + "_3", 500 * 1024);
  options.db_paths.emplace_back(dbname_ + "_4", 1024 * 1024 * 1024);
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 10;
1366   options.write_buffer_size = 111 << 10;  // 114KB
1367   options.arena_block_size = 4 << 10;
1368   options.level0_file_num_compaction_trigger = 2;
1369   options.num_levels = 1;
1370 
1371   std::vector<Options> option_vector;
1372   option_vector.emplace_back(options);
1373   ColumnFamilyOptions cf_opt1(options), cf_opt2(options);
1374   // Configure CF1 specific paths.
1375   cf_opt1.cf_paths.emplace_back(dbname_ + "cf1", 300 * 1024);
1376   cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 300 * 1024);
1377   cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 500 * 1024);
1378   cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_4", 1024 * 1024 * 1024);
1379   option_vector.emplace_back(DBOptions(options), cf_opt1);
1380   CreateColumnFamilies({"one"},option_vector[1]);
1381 
1382   // Configura CF2 specific paths.
1383   cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 300 * 1024);
1384   cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 300 * 1024);
1385   cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 500 * 1024);
1386   cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_4", 1024 * 1024 * 1024);
1387   option_vector.emplace_back(DBOptions(options), cf_opt2);
1388   CreateColumnFamilies({"two"},option_vector[2]);
1389 
1390   ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
1391 
1392   Random rnd(301);
1393   int key_idx = 0;
1394   int key_idx1 = 0;
1395   int key_idx2 = 0;
1396 
1397   auto generate_file = [&]() {
1398     GenerateNewFile(0, &rnd, &key_idx);
1399     GenerateNewFile(1, &rnd, &key_idx1);
1400     GenerateNewFile(2, &rnd, &key_idx2);
1401   };
1402 
1403   auto check_sstfilecount = [&](int path_id, int expected) {
1404     ASSERT_EQ(expected, GetSstFileCount(options.db_paths[path_id].path));
1405     ASSERT_EQ(expected, GetSstFileCount(cf_opt1.cf_paths[path_id].path));
1406     ASSERT_EQ(expected, GetSstFileCount(cf_opt2.cf_paths[path_id].path));
1407   };
1408 
1409   auto check_getvalues = [&]() {
1410     for (int i = 0; i < key_idx; i++) {
1411       auto v = Get(0, Key(i));
1412       ASSERT_NE(v, "NOT_FOUND");
1413       ASSERT_TRUE(v.size() == 1 || v.size() == 990);
1414     }
1415 
1416     for (int i = 0; i < key_idx1; i++) {
1417       auto v = Get(1, Key(i));
1418       ASSERT_NE(v, "NOT_FOUND");
1419       ASSERT_TRUE(v.size() == 1 || v.size() == 990);
1420     }
1421 
1422     for (int i = 0; i < key_idx2; i++) {
1423       auto v = Get(2, Key(i));
1424       ASSERT_NE(v, "NOT_FOUND");
1425       ASSERT_TRUE(v.size() == 1 || v.size() == 990);
1426     }
1427   };
1428 
1429   // First three 110KB files are not going to second path.
1430   // After that, (100K, 200K)
1431   for (int num = 0; num < 3; num++) {
1432     generate_file();
1433   }
1434 
1435   // Another 110KB triggers a compaction to 400K file to second path
1436   generate_file();
1437   check_sstfilecount(2, 1);
1438 
1439   // (1, 4)
1440   generate_file();
1441   check_sstfilecount(2, 1);
1442   check_sstfilecount(0, 1);
1443 
1444   // (1,1,4) -> (2, 4)
1445   generate_file();
1446   check_sstfilecount(2, 1);
1447   check_sstfilecount(1, 1);
1448   check_sstfilecount(0, 0);
1449 
1450   // (1, 2, 4) -> (3, 4)
1451   generate_file();
1452   check_sstfilecount(2, 1);
1453   check_sstfilecount(1, 1);
1454   check_sstfilecount(0, 0);
1455 
1456   // (1, 3, 4) -> (8)
1457   generate_file();
1458   check_sstfilecount(3, 1);
1459 
1460   // (1, 8)
1461   generate_file();
1462   check_sstfilecount(3, 1);
1463   check_sstfilecount(0, 1);
1464 
1465   // (1, 1, 8) -> (2, 8)
1466   generate_file();
1467   check_sstfilecount(3, 1);
1468   check_sstfilecount(1, 1);
1469 
1470   // (1, 2, 8) -> (3, 8)
1471   generate_file();
1472   check_sstfilecount(3, 1);
1473   check_sstfilecount(1, 1);
1474   check_sstfilecount(0, 0);
1475 
1476   // (1, 3, 8) -> (4, 8)
1477   generate_file();
1478   check_sstfilecount(2, 1);
1479   check_sstfilecount(3, 1);
1480 
1481   // (1, 4, 8) -> (5, 8)
1482   generate_file();
1483   check_sstfilecount(3, 1);
1484   check_sstfilecount(2, 1);
1485   check_sstfilecount(0, 0);
1486 
1487   check_getvalues();
1488 
1489   ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
1490 
1491   check_getvalues();
1492 
1493   Destroy(options, true);
1494 }
1495 
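// IncreaseUniversalCompactionNumLevels verifies that a universal-compaction
// DB can be migrated between num_levels settings: open with num_levels=1,
// reopen with num_levels=4, then move everything back to L0 (via a
// change_level CompactRange) so the DB can be reopened with num_levels=1
// again, checking that all keys survive each transition.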
TEST_P(DBTestUniversalCompaction, IncreaseUniversalCompactionNumLevels) {
  std::function<void(int)> verify_func = [&](int num_keys_in_db) {
    std::string keys_in_db;
    Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]);
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      keys_in_db.append(iter->key().ToString());
      keys_in_db.push_back(',');
    }
    delete iter;

    std::string expected_keys;
    for (int i = 0; i <= num_keys_in_db; i++) {
      expected_keys.append(Key(i));
      expected_keys.push_back(',');
    }

    ASSERT_EQ(keys_in_db, expected_keys);
  };

  Random rnd(301);
  int max_key1 = 200;
  int max_key2 = 600;
  int max_key3 = 800;
  const int KNumKeysPerFile = 10;

  // Stage 1: open a DB with universal compaction, num_levels=1
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 1;
  options.write_buffer_size = 200 << 10;  // 200KB
  options.level0_file_num_compaction_trigger = 3;
  options.memtable_factory.reset(new SpecialSkipListFactory(KNumKeysPerFile));
  options = CurrentOptions(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  for (int i = 0; i <= max_key1; i++) {
    // each value is 10K
    ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();

  // Stage 2: reopen with universal compaction, num_levels=4
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 4;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  verify_func(max_key1);

  // Insert more keys
  for (int i = max_key1 + 1; i <= max_key2; i++) {
    // each value is 10K
    ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();

  verify_func(max_key2);
  // Compaction to non-L0 has happened.
  ASSERT_GT(NumTableFilesAtLevel(options.num_levels - 1, 1), 0);

  // Stage 3: compact all files back to level 0, then reopen with
  // num_levels=1. Keep num_levels=4 for the change_level compaction below.
  options.num_levels = 4;
  options.target_file_size_base = INT_MAX;
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  // Compact all to level 0
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 0;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);
  // Need to restart once to remove the higher-level records from the
  // manifest.
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  // Final reopen with num_levels=1.
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = 1;
  options = CurrentOptions(options);
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  // Insert more keys
  for (int i = max_key2 + 1; i <= max_key3; i++) {
    // each value is 10K
    ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
    dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
    dbfull()->TEST_WaitForCompact();
  }
  ASSERT_OK(Flush(1));
  dbfull()->TEST_WaitForCompact();
  verify_func(max_key3);
}

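// UniversalCompactionSecondPathRatio repeats the placement exercise with only
// two paths. A minimal sketch of the placement rule being exercised (an
// assumption inferred from the assertions below, not a spec; the real picker
// also reserves room for expected future compactions):
//
//   // hypothetical helper, for illustration only
//   size_t PickPathId(const std::vector<DbPath>& paths, uint64_t out_size) {
//     for (size_t i = 0; i + 1 < paths.size(); ++i) {
//       if (out_size <= paths[i].target_size) return i;  // first that fits
//     }
//     return paths.size() - 1;  // largest path is the fallback
//   }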
TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) {
  if (!Snappy_Supported()) {
    return;
  }
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_, 500 * 1024);
  options.db_paths.emplace_back(dbname_ + "_2", 1024 * 1024 * 1024);
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 5;
  options.write_buffer_size = 111 << 10;  // 114KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 1;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));

  std::vector<std::string> filenames;
  env_->GetChildren(options.db_paths[1].path, &filenames);
  // Delete archival files.
  for (size_t i = 0; i < filenames.size(); ++i) {
    env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
  }
  env_->DeleteDir(options.db_paths[1].path);
  Reopen(options);

  Random rnd(301);
  int key_idx = 0;

  // The first three ~110KB files do not go to the second path.
  // After that, the sorted runs are (100K, 200K).
  for (int num = 0; num < 3; num++) {
    GenerateNewFile(&rnd, &key_idx);
  }

  // Another ~110KB file triggers a compaction into a ~400K file on the
  // second path.
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  // (1, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 1, 4) -> (2, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 2, 4) -> (3, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 3, 4) -> (8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 1, 8) -> (2, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // (1, 2, 8) -> (3, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 3, 8) -> (4, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  // (1, 4, 8) -> (5, 8)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(0, GetSstFileCount(dbname_));

  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  Reopen(options);

  for (int i = 0; i < key_idx; i++) {
    auto v = Get(Key(i));
    ASSERT_NE(v, "NOT_FOUND");
    ASSERT_TRUE(v.size() == 1 || v.size() == 990);
  }

  Destroy(options);
}

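// ConcurrentBottomPriLowPriCompactions checks that a full (bottom-priority)
// compaction and a partial (low-priority) compaction can run in parallel on
// the same column family when a bottom-pri background thread is available.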
TEST_P(DBTestUniversalCompaction, ConcurrentBottomPriLowPriCompactions) {
  if (num_levels_ == 1) {
    // For single-level universal, everything is bottom level, so nothing
    // would be executed in the bottom-pri thread pool.
    return;
  }
  const int kNumFilesTrigger = 3;
  Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.write_buffer_size = 100 << 10;     // 100KB
  options.target_file_size_base = 32 << 10;  // 32KB
  options.level0_file_num_compaction_trigger = kNumFilesTrigger;
  // Trigger compaction if size amplification exceeds 110%
  options.compaction_options_universal.max_size_amplification_percent = 110;
  DestroyAndReopen(options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {// Wait for the full compaction to be picked before adding files
       // intended for the second one.
       {"DBImpl::BackgroundCompaction:ForwardToBottomPriPool",
        "DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0"},
       // The full (bottom-pri) compaction waits until a partial (low-pri)
       // compaction has started, to verify they can run in parallel.
       {"DBImpl::BackgroundCompaction:NonTrivial",
        "DBImpl::BGWorkBottomCompaction"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  for (int i = 0; i < 2; ++i) {
    for (int num = 0; num < kNumFilesTrigger; num++) {
      int key_idx = 0;
      GenerateNewFile(&rnd, &key_idx, true /* no_wait */);
      // Use `no_wait` above because GenerateNewFile would otherwise wait for
      // both flush and compaction. We must not wait for compaction here
      // because the full compaction is intentionally blocked while more
      // files are flushed.
      dbfull()->TEST_WaitForFlushMemTable();
    }
    if (i == 0) {
      TEST_SYNC_POINT(
          "DBTestUniversalCompaction:ConcurrentBottomPriLowPriCompactions:0");
    }
  }
  dbfull()->TEST_WaitForCompact();

  // First compaction should output to the bottom level. Second should output
  // to L0 since older L0 files pending compaction prevent it from being
  // placed lower.
  ASSERT_EQ(NumSortedRuns(), 2);
  ASSERT_GT(NumTableFilesAtLevel(0), 0);
  ASSERT_GT(NumTableFilesAtLevel(num_levels_ - 1), 0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
}

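// A rough sketch of the score check the next test exercises (hedged; the
// exact formula lives in the compaction-score computation): with trigger T
// and N sorted runs not under compaction, the universal score is roughly
// N / T. Here N drops from 8 to 5 after one merge of max_merge_width = 4
// runs, so 5/8 < 1 and no second compaction should be scheduled.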
TEST_P(DBTestUniversalCompaction, RecalculateScoreAfterPicking) {
  // Regression test for extra compactions scheduled. Once enough compactions
  // have been scheduled to bring the score below one, we should stop
  // scheduling more; otherwise, other CFs/DBs may be delayed unnecessarily.
  const int kNumFilesTrigger = 8;
  Options options = CurrentOptions();
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.compaction_options_universal.max_merge_width = kNumFilesTrigger / 2;
  // Disable the size-amplification trigger.
  options.compaction_options_universal.max_size_amplification_percent =
      static_cast<unsigned int>(-1);
  options.compaction_style = kCompactionStyleUniversal;
  options.level0_file_num_compaction_trigger = kNumFilesTrigger;
  options.num_levels = num_levels_;
  Reopen(options);

  std::atomic<int> num_compactions_attempted(0);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:Start",
      [&](void* /*arg*/) { ++num_compactions_attempted; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  for (int num = 0; num < kNumFilesTrigger; num++) {
    ASSERT_EQ(NumSortedRuns(), num);
    int key_idx = 0;
    GenerateNewFile(&rnd, &key_idx);
  }
  dbfull()->TEST_WaitForCompact();
  // Compacting the first four files was enough to bring the score below one,
  // so there is no need to schedule any more compactions.
  ASSERT_EQ(1, num_compactions_attempted);
  ASSERT_EQ(NumSortedRuns(), 5);
}

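// The sync points in the next test force the race: CompactFiles registers
// the final sorted run's file, then the automatic size-amp compaction picker
// runs and must not pick that same (already busy) file.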
TEST_P(DBTestUniversalCompaction, FinalSortedRunCompactFilesConflict) {
  // Regression test for conflict between:
  // (1) Running CompactFiles including a file in the final sorted run; and
  // (2) Picking universal size-amp-triggered compaction, which always
  //     includes the final sorted run.
  if (exclusive_manual_compaction_) {
    return;
  }

  Options opts = CurrentOptions();
  opts.compaction_style = kCompactionStyleUniversal;
  opts.compaction_options_universal.max_size_amplification_percent = 50;
  opts.compaction_options_universal.min_merge_width = 2;
  opts.compression = kNoCompression;
  opts.level0_file_num_compaction_trigger = 2;
  opts.max_background_compactions = 2;
  opts.num_levels = num_levels_;
  Reopen(opts);

  // Make sure compaction jobs can be parallelized.
  auto stop_token =
      dbfull()->TEST_write_controler().GetCompactionPressureToken();

  Put("key", "val");
  Flush();
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  ASSERT_EQ(NumTableFilesAtLevel(num_levels_ - 1), 1);
  ColumnFamilyMetaData cf_meta;
  ColumnFamilyHandle* default_cfh = db_->DefaultColumnFamily();
  dbfull()->GetColumnFamilyMetaData(default_cfh, &cf_meta);
  ASSERT_EQ(1, cf_meta.levels[num_levels_ - 1].files.size());
  std::string first_sst_filename =
      cf_meta.levels[num_levels_ - 1].files[0].name;

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"CompactFilesImpl:0",
        "DBTestUniversalCompaction:FinalSortedRunCompactFilesConflict:0"},
       {"DBImpl::BackgroundCompaction():AfterPickCompaction",
        "CompactFilesImpl:1"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  port::Thread compact_files_thread([&]() {
    ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), default_cfh,
                                     {first_sst_filename}, num_levels_ - 1));
  });

  TEST_SYNC_POINT(
      "DBTestUniversalCompaction:FinalSortedRunCompactFilesConflict:0");
  for (int i = 0; i < 2; ++i) {
    Put("key", "val");
    Flush();
  }
  dbfull()->TEST_WaitForCompact();

  compact_files_thread.join();
}

INSTANTIATE_TEST_CASE_P(NumLevels, DBTestUniversalCompaction,
                        ::testing::Combine(::testing::Values(1, 3, 5),
                                           ::testing::Bool()));

class DBTestUniversalManualCompactionOutputPathId
    : public DBTestUniversalCompactionBase {
 public:
  DBTestUniversalManualCompactionOutputPathId() :
      DBTestUniversalCompactionBase(
          "/db_universal_compaction_manual_pid_test") {}
};

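// ManualCompactionOutputPathId verifies that
// CompactRangeOptions::target_path_id steers full manual compactions between
// db_paths entries, and that an out-of-range path id is rejected with
// InvalidArgument.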
TEST_P(DBTestUniversalManualCompactionOutputPathId,
       ManualCompactionOutputPathId) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.db_paths.emplace_back(dbname_, 1000000000);
  options.db_paths.emplace_back(dbname_ + "_2", 1000000000);
  options.compaction_style = kCompactionStyleUniversal;
  options.num_levels = num_levels_;
  options.target_file_size_base = 1 << 30;  // Big size
  options.level0_file_num_compaction_trigger = 10;
  Destroy(options);
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);
  MakeTables(3, "p", "q", 1);
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(2, TotalLiveFiles(1));
  ASSERT_EQ(2, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path));

  // Full compaction to DB path 1
  CompactRangeOptions compact_options;
  compact_options.target_path_id = 1;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
  ASSERT_EQ(1, TotalLiveFiles(1));
  ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
  ASSERT_EQ(1, TotalLiveFiles(1));
  ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  MakeTables(1, "p", "q", 1);
  ASSERT_EQ(2, TotalLiveFiles(1));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
  ASSERT_EQ(2, TotalLiveFiles(1));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  // Full compaction to DB path 0
  compact_options.target_path_id = 0;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
  ASSERT_EQ(1, TotalLiveFiles(1));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
  ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path));

  // Fail when compacting to an invalid path ID.
  compact_options.target_path_id = 2;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)
                  .IsInvalidArgument());
}

INSTANTIATE_TEST_CASE_P(OutputPathId,
                        DBTestUniversalManualCompactionOutputPathId,
                        ::testing::Combine(::testing::Values(1, 8),
                                           ::testing::Bool()));

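// The DBTestUniversalCompaction2 tests below exercise deletion-triggered
// compaction via NewCompactOnDeletionCollectorFactory(kWindowSize,
// kNumDelsTrigger): a table that sees at least kNumDelsTrigger deletes
// within some sliding window of kWindowSize consecutive entries is marked
// for compaction. A rough sketch of that trigger (an assumption for
// illustration only; the real collector lives under
// utilities/table_properties_collectors):
//
//   // hypothetical: count deletes over a sliding window of entries
//   bool NeedCompact(const std::vector<bool>& is_delete, int window, int k) {
//     int dels = 0;
//     for (size_t i = 0; i < is_delete.size(); ++i) {
//       if (is_delete[i]) ++dels;  // entry i enters the window
//       if (i >= static_cast<size_t>(window) && is_delete[i - window]) {
//         --dels;  // entry i - window leaves the window
//       }
//       if (dels >= k) return true;
//     }
//     return false;
//   }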
TEST_F(DBTestUniversalCompaction2, BasicL0toL1) {
  const int kNumKeys = 3000;
  const int kWindowSize = 100;
  const int kNumDelsTrigger = 90;

  Options opts = CurrentOptions();
  opts.table_properties_collector_factories.emplace_back(
      NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
  opts.compaction_style = kCompactionStyleUniversal;
  opts.level0_file_num_compaction_trigger = 2;
  opts.compression = kNoCompression;
  opts.compaction_options_universal.size_ratio = 10;
  opts.compaction_options_universal.min_merge_width = 2;
  opts.compaction_options_universal.max_size_amplification_percent = 200;
  Reopen(opts);

  // Add a file at the bottom level so the tombstones are not dropped as
  // obsolete during flush (they must still cover the keys below them).
  int i;
  for (i = 0; i < 2000; ++i) {
    Put(Key(i), "val");
  }
  Flush();
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);

  for (i = 1999; i < kNumKeys; ++i) {
    if (i >= kNumKeys - kWindowSize &&
        i < kNumKeys - kWindowSize + kNumDelsTrigger) {
      Delete(Key(i));
    } else {
      Put(Key(i), "val");
    }
  }
  Flush();

  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_GT(NumTableFilesAtLevel(6), 0);
}

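// SingleLevel repeats the deletion-trigger scenario with num_levels=1, where
// the deletion-marked file can only be compacted back into L0.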
TEST_F(DBTestUniversalCompaction2, SingleLevel) {
  const int kNumKeys = 3000;
  const int kWindowSize = 100;
  const int kNumDelsTrigger = 90;

  Options opts = CurrentOptions();
  opts.table_properties_collector_factories.emplace_back(
      NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
  opts.compaction_style = kCompactionStyleUniversal;
  opts.level0_file_num_compaction_trigger = 2;
  opts.compression = kNoCompression;
  opts.num_levels = 1;
  opts.compaction_options_universal.size_ratio = 10;
  opts.compaction_options_universal.min_merge_width = 2;
  opts.compaction_options_universal.max_size_amplification_percent = 200;
  Reopen(opts);

  // Add an initial file so the tombstones are not dropped as obsolete during
  // flush (with num_levels=1 there is no lower level to hold covered keys).
  int i;
  for (i = 0; i < 2000; ++i) {
    Put(Key(i), "val");
  }
  Flush();

  for (i = 1999; i < kNumKeys; ++i) {
    if (i >= kNumKeys - kWindowSize &&
        i < kNumKeys - kWindowSize + kNumDelsTrigger) {
      Delete(Key(i));
    } else {
      Put(Key(i), "val");
    }
  }
  Flush();

  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
}

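// MultipleLevels builds sorted runs across several levels, then checks that
// a deletion-heavy flush triggers a compaction that collapses everything
// into the bottom level.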
TEST_F(DBTestUniversalCompaction2, MultipleLevels) {
  const int kWindowSize = 100;
  const int kNumDelsTrigger = 90;

  Options opts = CurrentOptions();
  opts.table_properties_collector_factories.emplace_back(
      NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
  opts.compaction_style = kCompactionStyleUniversal;
  opts.level0_file_num_compaction_trigger = 4;
  opts.compression = kNoCompression;
  opts.compaction_options_universal.size_ratio = 10;
  opts.compaction_options_universal.min_merge_width = 2;
  opts.compaction_options_universal.max_size_amplification_percent = 200;
  Reopen(opts);

  // Fill four L0 files to reach the compaction trigger.
  int i;
  for (i = 0; i < 500; ++i) {
    Put(Key(i), "val");
  }
  Flush();
  for (i = 500; i < 1000; ++i) {
    Put(Key(i), "val");
  }
  Flush();
  for (i = 1000; i < 1500; ++i) {
    Put(Key(i), "val");
  }
  Flush();
  for (i = 1500; i < 2000; ++i) {
    Put(Key(i), "val");
  }
  Flush();

  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_GT(NumTableFilesAtLevel(6), 0);

  for (i = 1999; i < 2333; ++i) {
    Put(Key(i), "val");
  }
  Flush();
  for (i = 2333; i < 2666; ++i) {
    Put(Key(i), "val");
  }
  Flush();
  for (i = 2666; i < 2999; ++i) {
    Put(Key(i), "val");
  }
  Flush();

  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_GT(NumTableFilesAtLevel(6), 0);
  ASSERT_GT(NumTableFilesAtLevel(5), 0);

  // A deletion-heavy flush marks its file for compaction; everything should
  // collapse into the bottom level.
  for (i = 1900; i < 2100; ++i) {
    Delete(Key(i));
  }
  Flush();

  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_EQ(0, NumTableFilesAtLevel(1));
  ASSERT_EQ(0, NumTableFilesAtLevel(2));
  ASSERT_EQ(0, NumTableFilesAtLevel(3));
  ASSERT_EQ(0, NumTableFilesAtLevel(4));
  ASSERT_EQ(0, NumTableFilesAtLevel(5));
  ASSERT_GT(NumTableFilesAtLevel(6), 0);
}

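// OverlappingL0 exercises deletion-triggered compaction when the L0 files
// overlap in key range, verifying that only part of L0 is compacted to the
// bottom level while the newer L0 files remain.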
TEST_F(DBTestUniversalCompaction2, OverlappingL0) {
  const int kWindowSize = 100;
  const int kNumDelsTrigger = 90;

  Options opts = CurrentOptions();
  opts.table_properties_collector_factories.emplace_back(
      NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
  opts.compaction_style = kCompactionStyleUniversal;
  opts.level0_file_num_compaction_trigger = 5;
  opts.compression = kNoCompression;
  opts.compaction_options_universal.size_ratio = 10;
  opts.compaction_options_universal.min_merge_width = 2;
  opts.compaction_options_universal.max_size_amplification_percent = 200;
  Reopen(opts);

  // Build overlapping L0 files; the last one is deletion-heavy and marks
  // itself for compaction.
  int i;
  for (i = 0; i < 2000; ++i) {
    Put(Key(i), "val");
  }
  Flush();
  for (i = 2000; i < 3000; ++i) {
    Put(Key(i), "val");
  }
  Flush();
  for (i = 3500; i < 4000; ++i) {
    Put(Key(i), "val");
  }
  Flush();
  for (i = 2900; i < 3100; ++i) {
    Delete(Key(i));
  }
  Flush();

  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(2, NumTableFilesAtLevel(0));
  ASSERT_GT(NumTableFilesAtLevel(6), 0);
}

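// IngestBehind: with allow_ingest_behind=true the bottommost level is
// reserved for ingested files, so automatic compactions may only output to
// the second-to-last level (level 5 here), never to level 6.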
TEST_F(DBTestUniversalCompaction2, IngestBehind) {
  const int kNumKeys = 3000;
  const int kWindowSize = 100;
  const int kNumDelsTrigger = 90;

  Options opts = CurrentOptions();
  opts.table_properties_collector_factories.emplace_back(
      NewCompactOnDeletionCollectorFactory(kWindowSize, kNumDelsTrigger));
  opts.compaction_style = kCompactionStyleUniversal;
  opts.level0_file_num_compaction_trigger = 2;
  opts.compression = kNoCompression;
  opts.allow_ingest_behind = true;
  opts.compaction_options_universal.size_ratio = 10;
  opts.compaction_options_universal.min_merge_width = 2;
  opts.compaction_options_universal.max_size_amplification_percent = 200;
  Reopen(opts);

  // Add a base file so the tombstones are not dropped as obsolete during
  // flush.
  int i;
  for (i = 0; i < 2000; ++i) {
    Put(Key(i), "val");
  }
  Flush();
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);

  for (i = 1999; i < kNumKeys; ++i) {
    if (i >= kNumKeys - kWindowSize &&
        i < kNumKeys - kWindowSize + kNumDelsTrigger) {
      Delete(Key(i));
    } else {
      Put(Key(i), "val");
    }
  }
  Flush();

  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  // The last level is reserved for ingest-behind, so output lands on level 5.
  ASSERT_EQ(0, NumTableFilesAtLevel(6));
  ASSERT_GT(NumTableFilesAtLevel(5), 0);
}

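// PeriodicCompactionDefault: when a compaction filter (or filter factory) is
// set and periodic_compaction_seconds is left at its default, universal
// compaction sanitizes it to 30 days; an explicitly configured ttl is used
// as the periodic interval instead.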
TEST_F(DBTestUniversalCompaction2, PeriodicCompactionDefault) {
  Options options;
  options.compaction_style = kCompactionStyleUniversal;

  KeepFilterFactory* filter = new KeepFilterFactory(true);
  options.compaction_filter_factory.reset(filter);
  Reopen(options);
  ASSERT_EQ(30 * 24 * 60 * 60,
            dbfull()->GetOptions().periodic_compaction_seconds);

  KeepFilter df;
  options.compaction_filter_factory.reset();
  options.compaction_filter = &df;
  Reopen(options);
  ASSERT_EQ(30 * 24 * 60 * 60,
            dbfull()->GetOptions().periodic_compaction_seconds);

  options.ttl = 60 * 24 * 60 * 60;
  options.compaction_filter = nullptr;
  Reopen(options);
  ASSERT_EQ(60 * 24 * 60 * 60,
            dbfull()->GetOptions().periodic_compaction_seconds);
}

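// PeriodicCompaction uses the mock clock (env_->addon_time_) to age files
// past periodic_compaction_seconds, then verifies via the
// PickPeriodicCompaction sync point that the aged file is compacted from L0
// all the way to the last level.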
TEST_F(DBTestUniversalCompaction2, PeriodicCompaction) {
  Options opts = CurrentOptions();
  opts.env = env_;
  opts.compaction_style = kCompactionStyleUniversal;
  opts.level0_file_num_compaction_trigger = 10;
  opts.max_open_files = -1;
  opts.compaction_options_universal.size_ratio = 10;
  opts.compaction_options_universal.min_merge_width = 2;
  opts.compaction_options_universal.max_size_amplification_percent = 200;
  opts.periodic_compaction_seconds = 48 * 60 * 60;  // 2 days
  opts.num_levels = 5;
  env_->addon_time_.store(0);
  Reopen(opts);

  int periodic_compactions = 0;
  int start_level = -1;
  int output_level = -1;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionPicker::PickPeriodicCompaction:Return",
      [&](void* arg) {
        ASSERT_TRUE(arg != nullptr);
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        ASSERT_TRUE(compaction->compaction_reason() ==
                    CompactionReason::kPeriodicCompaction);
        start_level = compaction->start_level();
        output_level = compaction->output_level();
        periodic_compactions++;
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Case 1: the oldest flushed file exceeds the periodic compaction
  // threshold.
  ASSERT_OK(Put("foo", "bar"));
  Flush();
  ASSERT_EQ(0, periodic_compactions);
  // Move the clock forward so that the flushed file qualifies for periodic
  // compaction.
  env_->addon_time_.store(48 * 60 * 60 + 100);

  // Another flush triggers a compaction of the oldest file.
  ASSERT_OK(Put("foo", "bar2"));
  Flush();
  dbfull()->TEST_WaitForCompact();

  ASSERT_EQ(1, periodic_compactions);
  ASSERT_EQ(0, start_level);
  ASSERT_EQ(4, output_level);

  // Case 2: the oldest compacted file exceeds the periodic compaction
  // threshold.
  periodic_compactions = 0;
  // A flush doesn't trigger a periodic compaction when the threshold has not
  // been hit.
  ASSERT_OK(Put("foo", "bar2"));
  Flush();
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(0, periodic_compactions);

  // Once the periodic compaction threshold is hit, a flush triggers
  // a compaction.
  ASSERT_OK(Put("foo", "bar2"));
  env_->addon_time_.fetch_add(48 * 60 * 60 + 100);
  Flush();
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(1, periodic_compactions);
  ASSERT_EQ(0, start_level);
  ASSERT_EQ(4, output_level);
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // !defined(ROCKSDB_LITE)

int main(int argc, char** argv) {
#if !defined(ROCKSDB_LITE)
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
#else
  (void) argc;
  (void) argv;
  return 0;
#endif
}