1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
#include <memory>

#include "db/db_test_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/concurrent_task_limiter.h"
#include "rocksdb/experimental.h"
#include "rocksdb/sst_file_writer.h"
#include "rocksdb/utilities/convenience.h"
#include "test_util/fault_injection_test_env.h"
#include "test_util/sync_point.h"
#include "util/concurrent_task_limiter_impl.h"
20 
21 namespace ROCKSDB_NAMESPACE {
22 
23 // SYNC_POINT is not supported in released Windows mode.
24 #if !defined(ROCKSDB_LITE)
25 
26 class DBCompactionTest : public DBTestBase {
27  public:
DBCompactionTest()28   DBCompactionTest() : DBTestBase("/db_compaction_test") {}
29 };
30 
31 class DBCompactionTestWithParam
32     : public DBTestBase,
33       public testing::WithParamInterface<std::tuple<uint32_t, bool>> {
34  public:
DBCompactionTestWithParam()35   DBCompactionTestWithParam() : DBTestBase("/db_compaction_test") {
36     max_subcompactions_ = std::get<0>(GetParam());
37     exclusive_manual_compaction_ = std::get<1>(GetParam());
38   }
39 
40   // Required if inheriting from testing::WithParamInterface<>
SetUpTestCase()41   static void SetUpTestCase() {}
TearDownTestCase()42   static void TearDownTestCase() {}
43 
44   uint32_t max_subcompactions_;
45   bool exclusive_manual_compaction_;
46 };
47 
48 class DBCompactionDirectIOTest : public DBCompactionTest,
49                                  public ::testing::WithParamInterface<bool> {
50  public:
DBCompactionDirectIOTest()51   DBCompactionDirectIOTest() : DBCompactionTest() {}
52 };
53 
54 namespace {
55 
56 class FlushedFileCollector : public EventListener {
57  public:
FlushedFileCollector()58   FlushedFileCollector() {}
~FlushedFileCollector()59   ~FlushedFileCollector() override {}
60 
OnFlushCompleted(DB *,const FlushJobInfo & info)61   void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
62     std::lock_guard<std::mutex> lock(mutex_);
63     flushed_files_.push_back(info.file_path);
64   }
65 
GetFlushedFiles()66   std::vector<std::string> GetFlushedFiles() {
67     std::lock_guard<std::mutex> lock(mutex_);
68     std::vector<std::string> result;
69     for (auto fname : flushed_files_) {
70       result.push_back(fname);
71     }
72     return result;
73   }
74 
ClearFlushedFiles()75   void ClearFlushedFiles() { flushed_files_.clear(); }
76 
77  private:
78   std::vector<std::string> flushed_files_;
79   std::mutex mutex_;
80 };
81 
82 class CompactionStatsCollector : public EventListener {
83 public:
CompactionStatsCollector()84   CompactionStatsCollector()
85       : compaction_completed_(static_cast<int>(CompactionReason::kNumOfReasons)) {
86     for (auto& v : compaction_completed_) {
87       v.store(0);
88     }
89   }
90 
~CompactionStatsCollector()91   ~CompactionStatsCollector() override {}
92 
OnCompactionCompleted(DB *,const CompactionJobInfo & info)93   void OnCompactionCompleted(DB* /* db */,
94                              const CompactionJobInfo& info) override {
95     int k = static_cast<int>(info.compaction_reason);
96     int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
97     assert(k >= 0 && k < num_of_reasons);
98     compaction_completed_[k]++;
99   }
100 
OnExternalFileIngested(DB *,const ExternalFileIngestionInfo &)101   void OnExternalFileIngested(
102       DB* /* db */, const ExternalFileIngestionInfo& /* info */) override {
103     int k = static_cast<int>(CompactionReason::kExternalSstIngestion);
104     compaction_completed_[k]++;
105   }
106 
OnFlushCompleted(DB *,const FlushJobInfo &)107   void OnFlushCompleted(DB* /* db */, const FlushJobInfo& /* info */) override {
108     int k = static_cast<int>(CompactionReason::kFlush);
109     compaction_completed_[k]++;
110   }
111 
NumberOfCompactions(CompactionReason reason) const112   int NumberOfCompactions(CompactionReason reason) const {
113     int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
114     int k = static_cast<int>(reason);
115     assert(k >= 0 && k < num_of_reasons);
116     return compaction_completed_.at(k).load();
117   }
118 
119 private:
120   std::vector<std::atomic<int>> compaction_completed_;
121 };
122 
123 class SstStatsCollector : public EventListener {
124  public:
SstStatsCollector()125   SstStatsCollector() : num_ssts_creation_started_(0) {}
126 
OnTableFileCreationStarted(const TableFileCreationBriefInfo &)127   void OnTableFileCreationStarted(
128       const TableFileCreationBriefInfo& /* info */) override {
129     ++num_ssts_creation_started_;
130   }
131 
num_ssts_creation_started()132   int num_ssts_creation_started() { return num_ssts_creation_started_; }
133 
134  private:
135   std::atomic<int> num_ssts_creation_started_;
136 };
137 
138 static const int kCDTValueSize = 1000;
139 static const int kCDTKeysPerBuffer = 4;
140 static const int kCDTNumLevels = 8;
DeletionTriggerOptions(Options options)141 Options DeletionTriggerOptions(Options options) {
142   options.compression = kNoCompression;
143   options.write_buffer_size = kCDTKeysPerBuffer * (kCDTValueSize + 24);
144   options.min_write_buffer_number_to_merge = 1;
145   options.max_write_buffer_size_to_maintain = 0;
146   options.num_levels = kCDTNumLevels;
147   options.level0_file_num_compaction_trigger = 1;
148   options.target_file_size_base = options.write_buffer_size * 2;
149   options.target_file_size_multiplier = 2;
150   options.max_bytes_for_level_base =
151       options.target_file_size_base * options.target_file_size_multiplier;
152   options.max_bytes_for_level_multiplier = 2;
153   options.disable_auto_compactions = false;
154   return options;
155 }
156 
HaveOverlappingKeyRanges(const Comparator * c,const SstFileMetaData & a,const SstFileMetaData & b)157 bool HaveOverlappingKeyRanges(
158     const Comparator* c,
159     const SstFileMetaData& a, const SstFileMetaData& b) {
160   if (c->Compare(a.smallestkey, b.smallestkey) >= 0) {
161     if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
162       // b.smallestkey <= a.smallestkey <= b.largestkey
163       return true;
164     }
165   } else if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
166     // a.smallestkey < b.smallestkey <= a.largestkey
167     return true;
168   }
169   if (c->Compare(a.largestkey, b.largestkey) <= 0) {
170     if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
171       // b.smallestkey <= a.largestkey <= b.largestkey
172       return true;
173     }
174   } else if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
175     // a.smallestkey <= b.largestkey < a.largestkey
176     return true;
177   }
178   return false;
179 }
180 
181 // Identifies all files between level "min_level" and "max_level"
182 // which has overlapping key range with "input_file_meta".
GetOverlappingFileNumbersForLevelCompaction(const ColumnFamilyMetaData & cf_meta,const Comparator * comparator,int min_level,int max_level,const SstFileMetaData * input_file_meta,std::set<std::string> * overlapping_file_names)183 void GetOverlappingFileNumbersForLevelCompaction(
184     const ColumnFamilyMetaData& cf_meta,
185     const Comparator* comparator,
186     int min_level, int max_level,
187     const SstFileMetaData* input_file_meta,
188     std::set<std::string>* overlapping_file_names) {
189   std::set<const SstFileMetaData*> overlapping_files;
190   overlapping_files.insert(input_file_meta);
191   for (int m = min_level; m <= max_level; ++m) {
192     for (auto& file : cf_meta.levels[m].files) {
193       for (auto* included_file : overlapping_files) {
194         if (HaveOverlappingKeyRanges(
195                 comparator, *included_file, file)) {
196           overlapping_files.insert(&file);
197           overlapping_file_names->insert(file.name);
198           break;
199         }
200       }
201     }
202   }
203 }
204 
VerifyCompactionResult(const ColumnFamilyMetaData & cf_meta,const std::set<std::string> & overlapping_file_numbers)205 void VerifyCompactionResult(
206     const ColumnFamilyMetaData& cf_meta,
207     const std::set<std::string>& overlapping_file_numbers) {
208 #ifndef NDEBUG
209   for (auto& level : cf_meta.levels) {
210     for (auto& file : level.files) {
211       assert(overlapping_file_numbers.find(file.name) ==
212              overlapping_file_numbers.end());
213     }
214   }
215 #endif
216 }
217 
218 /*
219  * Verifies compaction stats of cfd are valid.
220  *
221  * For each level of cfd, its compaction stats are valid if
222  * 1) sum(stat.counts) == stat.count, and
223  * 2) stat.counts[i] == collector.NumberOfCompactions(i)
224  */
VerifyCompactionStats(ColumnFamilyData & cfd,const CompactionStatsCollector & collector)225 void VerifyCompactionStats(ColumnFamilyData& cfd,
226     const CompactionStatsCollector& collector) {
227 #ifndef NDEBUG
228   InternalStats* internal_stats_ptr = cfd.internal_stats();
229   ASSERT_TRUE(internal_stats_ptr != nullptr);
230   const std::vector<InternalStats::CompactionStats>& comp_stats =
231       internal_stats_ptr->TEST_GetCompactionStats();
232   const int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
233   std::vector<int> counts(num_of_reasons, 0);
234   // Count the number of compactions caused by each CompactionReason across
235   // all levels.
236   for (const auto& stat : comp_stats) {
237     int sum = 0;
238     for (int i = 0; i < num_of_reasons; i++) {
239       counts[i] += stat.counts[i];
240       sum += stat.counts[i];
241     }
242     ASSERT_EQ(sum, stat.count);
243   }
244   // Verify InternalStats bookkeeping matches that of CompactionStatsCollector,
245   // assuming that all compactions complete.
246   for (int i = 0; i < num_of_reasons; i++) {
247     ASSERT_EQ(collector.NumberOfCompactions(static_cast<CompactionReason>(i)), counts[i]);
248   }
249 #endif /* NDEBUG */
250 }
251 
PickFileRandomly(const ColumnFamilyMetaData & cf_meta,Random * rand,int * level=nullptr)252 const SstFileMetaData* PickFileRandomly(
253     const ColumnFamilyMetaData& cf_meta,
254     Random* rand,
255     int* level = nullptr) {
256   auto file_id = rand->Uniform(static_cast<int>(
257       cf_meta.file_count)) + 1;
258   for (auto& level_meta : cf_meta.levels) {
259     if (file_id <= level_meta.files.size()) {
260       if (level != nullptr) {
261         *level = level_meta.level;
262       }
263       auto result = rand->Uniform(file_id);
264       return &(level_meta.files[result]);
265     }
266     file_id -= static_cast<uint32_t>(level_meta.files.size());
267   }
268   assert(false);
269   return nullptr;
270 }
271 }  // anonymous namespace
272 
#ifndef ROCKSDB_VALGRIND_RUN
// All the TEST_P tests run once with sub_compactions disabled (i.e.
// options.max_subcompactions = 1) and once with it enabled
//
// Writes kTestSize keys, waits for flushes and compactions, and records Size()
// of the key range. It then deletes every key, waits again, and asserts the
// new size is less than a third of the original. The expected mechanism is
// that the deletions alone drive compactions that reclaim space.
// Three passes: default options, skip_stats_update_on_db_open, and universal
// compaction with a single level.
TEST_P(DBCompactionTestWithParam, CompactionDeletionTrigger) {
  for (int tid = 0; tid < 3; ++tid) {
    // [0]: size after inserts, [1]: size after deleting every key.
    uint64_t db_size[2];
    Options options = DeletionTriggerOptions(CurrentOptions());
    options.max_subcompactions = max_subcompactions_;

    if (tid == 1) {
      // the following only disable stats update in DB::Open()
      // and should not affect the result of this test.
      options.skip_stats_update_on_db_open = true;
    } else if (tid == 2) {
      // third pass with universal compaction
      options.compaction_style = kCompactionStyleUniversal;
      options.num_levels = 1;
    }

    DestroyAndReopen(options);
    Random rnd(301);

    const int kTestSize = kCDTKeysPerBuffer * 1024;
    std::vector<std::string> values;
    for (int k = 0; k < kTestSize; ++k) {
      values.push_back(RandomString(&rnd, kCDTValueSize));
      ASSERT_OK(Put(Key(k), values[k]));
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
    db_size[0] = Size(Key(0), Key(kTestSize - 1));

    for (int k = 0; k < kTestSize; ++k) {
      ASSERT_OK(Delete(Key(k)));
    }
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
    db_size[1] = Size(Key(0), Key(kTestSize - 1));

    // must have much smaller db size.
    ASSERT_GT(db_size[0] / 3, db_size[1]);
  }
}
#endif  // ROCKSDB_VALGRIND_RUN
317 
TEST_P(DBCompactionTestWithParam,CompactionsPreserveDeletes)318 TEST_P(DBCompactionTestWithParam, CompactionsPreserveDeletes) {
319   //  For each options type we test following
320   //  - Enable preserve_deletes
321   //  - write bunch of keys and deletes
322   //  - Set start_seqnum to the beginning; compact; check that keys are present
323   //  - rewind start_seqnum way forward; compact; check that keys are gone
324 
325   for (int tid = 0; tid < 3; ++tid) {
326     Options options = DeletionTriggerOptions(CurrentOptions());
327     options.max_subcompactions = max_subcompactions_;
328     options.preserve_deletes=true;
329     options.num_levels = 2;
330 
331     if (tid == 1) {
332       options.skip_stats_update_on_db_open = true;
333     } else if (tid == 2) {
334       // third pass with universal compaction
335       options.compaction_style = kCompactionStyleUniversal;
336     }
337 
338     DestroyAndReopen(options);
339     Random rnd(301);
340     // highlight the default; all deletes should be preserved
341     SetPreserveDeletesSequenceNumber(0);
342 
343     const int kTestSize = kCDTKeysPerBuffer;
344     std::vector<std::string> values;
345     for (int k = 0; k < kTestSize; ++k) {
346       values.push_back(RandomString(&rnd, kCDTValueSize));
347       ASSERT_OK(Put(Key(k), values[k]));
348     }
349 
350     for (int k = 0; k < kTestSize; ++k) {
351       ASSERT_OK(Delete(Key(k)));
352     }
353     // to ensure we tackle all tombstones
354     CompactRangeOptions cro;
355     cro.change_level = true;
356     cro.target_level = 2;
357     cro.bottommost_level_compaction =
358         BottommostLevelCompaction::kForceOptimized;
359 
360     dbfull()->TEST_WaitForFlushMemTable();
361     dbfull()->CompactRange(cro, nullptr, nullptr);
362 
363     // check that normal user iterator doesn't see anything
364     Iterator* db_iter = dbfull()->NewIterator(ReadOptions());
365     int i = 0;
366     for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
367       i++;
368     }
369     ASSERT_EQ(i, 0);
370     delete db_iter;
371 
372     // check that iterator that sees internal keys sees tombstones
373     ReadOptions ro;
374     ro.iter_start_seqnum=1;
375     db_iter = dbfull()->NewIterator(ro);
376     i = 0;
377     for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
378       i++;
379     }
380     ASSERT_EQ(i, 4);
381     delete db_iter;
382 
383     // now all deletes should be gone
384     SetPreserveDeletesSequenceNumber(100000000);
385     dbfull()->CompactRange(cro, nullptr, nullptr);
386 
387     db_iter = dbfull()->NewIterator(ro);
388     i = 0;
389     for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
390       i++;
391     }
392     ASSERT_EQ(i, 0);
393     delete db_iter;
394   }
395 }
396 
TEST_F(DBCompactionTest,SkipStatsUpdateTest)397 TEST_F(DBCompactionTest, SkipStatsUpdateTest) {
398   // This test verify UpdateAccumulatedStats is not on
399   // if options.skip_stats_update_on_db_open = true
400   // The test will need to be updated if the internal behavior changes.
401 
402   Options options = DeletionTriggerOptions(CurrentOptions());
403   options.disable_auto_compactions = true;
404   options.env = env_;
405   DestroyAndReopen(options);
406   Random rnd(301);
407 
408   const int kTestSize = kCDTKeysPerBuffer * 512;
409   std::vector<std::string> values;
410   for (int k = 0; k < kTestSize; ++k) {
411     values.push_back(RandomString(&rnd, kCDTValueSize));
412     ASSERT_OK(Put(Key(k), values[k]));
413   }
414 
415   ASSERT_OK(Flush());
416 
417   Close();
418 
419   int update_acc_stats_called = 0;
420   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
421       "VersionStorageInfo::UpdateAccumulatedStats",
422       [&](void* /* arg */) { ++update_acc_stats_called; });
423   SyncPoint::GetInstance()->EnableProcessing();
424 
425   // Reopen the DB with stats-update disabled
426   options.skip_stats_update_on_db_open = true;
427   options.max_open_files = 20;
428   Reopen(options);
429 
430   ASSERT_EQ(update_acc_stats_called, 0);
431 
432   // Repeat the reopen process, but this time we enable
433   // stats-update.
434   options.skip_stats_update_on_db_open = false;
435   Reopen(options);
436 
437   ASSERT_GT(update_acc_stats_called, 0);
438 
439   SyncPoint::GetInstance()->ClearAllCallBacks();
440   SyncPoint::GetInstance()->DisableProcessing();
441 }
442 
TEST_F(DBCompactionTest,TestTableReaderForCompaction)443 TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
444   Options options = CurrentOptions();
445   options.env = env_;
446   options.new_table_reader_for_compaction_inputs = true;
447   options.max_open_files = 20;
448   options.level0_file_num_compaction_trigger = 3;
449   DestroyAndReopen(options);
450   Random rnd(301);
451 
452   int num_table_cache_lookup = 0;
453   int num_new_table_reader = 0;
454   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
455       "TableCache::FindTable:0", [&](void* arg) {
456         assert(arg != nullptr);
457         bool no_io = *(reinterpret_cast<bool*>(arg));
458         if (!no_io) {
459           // filter out cases for table properties queries.
460           num_table_cache_lookup++;
461         }
462       });
463   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
464       "TableCache::GetTableReader:0",
465       [&](void* /*arg*/) { num_new_table_reader++; });
466   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
467 
468   for (int k = 0; k < options.level0_file_num_compaction_trigger; ++k) {
469     ASSERT_OK(Put(Key(k), Key(k)));
470     ASSERT_OK(Put(Key(10 - k), "bar"));
471     if (k < options.level0_file_num_compaction_trigger - 1) {
472       num_table_cache_lookup = 0;
473       Flush();
474       dbfull()->TEST_WaitForCompact();
475       // preloading iterator issues one table cache lookup and create
476       // a new table reader, if not preloaded.
477       int old_num_table_cache_lookup = num_table_cache_lookup;
478       ASSERT_GE(num_table_cache_lookup, 1);
479       ASSERT_EQ(num_new_table_reader, 1);
480 
481       num_table_cache_lookup = 0;
482       num_new_table_reader = 0;
483       ASSERT_EQ(Key(k), Get(Key(k)));
484       // lookup iterator from table cache and no need to create a new one.
485       ASSERT_EQ(old_num_table_cache_lookup + num_table_cache_lookup, 2);
486       ASSERT_EQ(num_new_table_reader, 0);
487     }
488   }
489 
490   num_table_cache_lookup = 0;
491   num_new_table_reader = 0;
492   Flush();
493   dbfull()->TEST_WaitForCompact();
494   // Preloading iterator issues one table cache lookup and creates
495   // a new table reader. One file is created for flush and one for compaction.
496   // Compaction inputs make no table cache look-up for data/range deletion
497   // iterators
498   // May preload table cache too.
499   ASSERT_GE(num_table_cache_lookup, 2);
500   int old_num_table_cache_lookup2 = num_table_cache_lookup;
501 
502   // Create new iterator for:
503   // (1) 1 for verifying flush results
504   // (2) 1 for verifying compaction results.
505   // (3) New TableReaders will not be created for compaction inputs
506   ASSERT_EQ(num_new_table_reader, 2);
507 
508   num_table_cache_lookup = 0;
509   num_new_table_reader = 0;
510   ASSERT_EQ(Key(1), Get(Key(1)));
511   ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 5);
512   ASSERT_EQ(num_new_table_reader, 0);
513 
514   num_table_cache_lookup = 0;
515   num_new_table_reader = 0;
516   CompactRangeOptions cro;
517   cro.change_level = true;
518   cro.target_level = 2;
519   cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
520   db_->CompactRange(cro, nullptr, nullptr);
521   // Only verifying compaction outputs issues one table cache lookup
522   // for both data block and range deletion block).
523   // May preload table cache too.
524   ASSERT_GE(num_table_cache_lookup, 1);
525   old_num_table_cache_lookup2 = num_table_cache_lookup;
526   // One for verifying compaction results.
527   // No new iterator created for compaction.
528   ASSERT_EQ(num_new_table_reader, 1);
529 
530   num_table_cache_lookup = 0;
531   num_new_table_reader = 0;
532   ASSERT_EQ(Key(1), Get(Key(1)));
533   ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 3);
534   ASSERT_EQ(num_new_table_reader, 0);
535 
536   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
537 }
538 
TEST_P(DBCompactionTestWithParam,CompactionDeletionTriggerReopen)539 TEST_P(DBCompactionTestWithParam, CompactionDeletionTriggerReopen) {
540   for (int tid = 0; tid < 2; ++tid) {
541     uint64_t db_size[3];
542     Options options = DeletionTriggerOptions(CurrentOptions());
543     options.max_subcompactions = max_subcompactions_;
544 
545     if (tid == 1) {
546       // second pass with universal compaction
547       options.compaction_style = kCompactionStyleUniversal;
548       options.num_levels = 1;
549     }
550 
551     DestroyAndReopen(options);
552     Random rnd(301);
553 
554     // round 1 --- insert key/value pairs.
555     const int kTestSize = kCDTKeysPerBuffer * 512;
556     std::vector<std::string> values;
557     for (int k = 0; k < kTestSize; ++k) {
558       values.push_back(RandomString(&rnd, kCDTValueSize));
559       ASSERT_OK(Put(Key(k), values[k]));
560     }
561     dbfull()->TEST_WaitForFlushMemTable();
562     dbfull()->TEST_WaitForCompact();
563     db_size[0] = Size(Key(0), Key(kTestSize - 1));
564     Close();
565 
566     // round 2 --- disable auto-compactions and issue deletions.
567     options.create_if_missing = false;
568     options.disable_auto_compactions = true;
569     Reopen(options);
570 
571     for (int k = 0; k < kTestSize; ++k) {
572       ASSERT_OK(Delete(Key(k)));
573     }
574     db_size[1] = Size(Key(0), Key(kTestSize - 1));
575     Close();
576     // as auto_compaction is off, we shouldn't see too much reduce
577     // in db size.
578     ASSERT_LT(db_size[0] / 3, db_size[1]);
579 
580     // round 3 --- reopen db with auto_compaction on and see if
581     // deletion compensation still work.
582     options.disable_auto_compactions = false;
583     Reopen(options);
584     // insert relatively small amount of data to trigger auto compaction.
585     for (int k = 0; k < kTestSize / 10; ++k) {
586       ASSERT_OK(Put(Key(k), values[k]));
587     }
588     dbfull()->TEST_WaitForFlushMemTable();
589     dbfull()->TEST_WaitForCompact();
590     db_size[2] = Size(Key(0), Key(kTestSize - 1));
591     // this time we're expecting significant drop in size.
592     ASSERT_GT(db_size[0] / 3, db_size[2]);
593   }
594 }
595 
TEST_F(DBCompactionTest,CompactRangeBottomPri)596 TEST_F(DBCompactionTest, CompactRangeBottomPri) {
597   ASSERT_OK(Put(Key(50), ""));
598   ASSERT_OK(Flush());
599   ASSERT_OK(Put(Key(100), ""));
600   ASSERT_OK(Flush());
601   ASSERT_OK(Put(Key(200), ""));
602   ASSERT_OK(Flush());
603 
604   {
605     CompactRangeOptions cro;
606     cro.change_level = true;
607     cro.target_level = 2;
608     dbfull()->CompactRange(cro, nullptr, nullptr);
609   }
610   ASSERT_EQ("0,0,3", FilesPerLevel(0));
611 
612   ASSERT_OK(Put(Key(1), ""));
613   ASSERT_OK(Put(Key(199), ""));
614   ASSERT_OK(Flush());
615   ASSERT_OK(Put(Key(2), ""));
616   ASSERT_OK(Put(Key(199), ""));
617   ASSERT_OK(Flush());
618   ASSERT_EQ("2,0,3", FilesPerLevel(0));
619 
620   // Now we have 2 L0 files, and 3 L2 files, and a manual compaction will
621   // be triggered.
622   // Two compaction jobs will run. One compacts 2 L0 files in Low Pri Pool
623   // and one compact to L2 in bottom pri pool.
624   int low_pri_count = 0;
625   int bottom_pri_count = 0;
626   SyncPoint::GetInstance()->SetCallBack(
627       "ThreadPoolImpl::Impl::BGThread:BeforeRun", [&](void* arg) {
628         Env::Priority* pri = reinterpret_cast<Env::Priority*>(arg);
629         // First time is low pri pool in the test case.
630         if (low_pri_count == 0 && bottom_pri_count == 0) {
631           ASSERT_EQ(Env::Priority::LOW, *pri);
632         }
633         if (*pri == Env::Priority::LOW) {
634           low_pri_count++;
635         } else {
636           bottom_pri_count++;
637         }
638       });
639   SyncPoint::GetInstance()->EnableProcessing();
640   env_->SetBackgroundThreads(1, Env::Priority::BOTTOM);
641   dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
642   ASSERT_EQ(1, low_pri_count);
643   ASSERT_EQ(1, bottom_pri_count);
644   ASSERT_EQ("0,0,2", FilesPerLevel(0));
645 
646   // Recompact bottom most level uses bottom pool
647   CompactRangeOptions cro;
648   cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
649   dbfull()->CompactRange(cro, nullptr, nullptr);
650   ASSERT_EQ(1, low_pri_count);
651   ASSERT_EQ(2, bottom_pri_count);
652 
653   env_->SetBackgroundThreads(0, Env::Priority::BOTTOM);
654   dbfull()->CompactRange(cro, nullptr, nullptr);
655   // Low pri pool is used if bottom pool has size 0.
656   ASSERT_EQ(2, low_pri_count);
657   ASSERT_EQ(2, bottom_pri_count);
658 
659   SyncPoint::GetInstance()->DisableProcessing();
660 }
661 
TEST_F(DBCompactionTest,DisableStatsUpdateReopen)662 TEST_F(DBCompactionTest, DisableStatsUpdateReopen) {
663   uint64_t db_size[3];
664   for (int test = 0; test < 2; ++test) {
665     Options options = DeletionTriggerOptions(CurrentOptions());
666     options.skip_stats_update_on_db_open = (test == 0);
667 
668     env_->random_read_counter_.Reset();
669     DestroyAndReopen(options);
670     Random rnd(301);
671 
672     // round 1 --- insert key/value pairs.
673     const int kTestSize = kCDTKeysPerBuffer * 512;
674     std::vector<std::string> values;
675     for (int k = 0; k < kTestSize; ++k) {
676       values.push_back(RandomString(&rnd, kCDTValueSize));
677       ASSERT_OK(Put(Key(k), values[k]));
678     }
679     dbfull()->TEST_WaitForFlushMemTable();
680     dbfull()->TEST_WaitForCompact();
681     db_size[0] = Size(Key(0), Key(kTestSize - 1));
682     Close();
683 
684     // round 2 --- disable auto-compactions and issue deletions.
685     options.create_if_missing = false;
686     options.disable_auto_compactions = true;
687 
688     env_->random_read_counter_.Reset();
689     Reopen(options);
690 
691     for (int k = 0; k < kTestSize; ++k) {
692       ASSERT_OK(Delete(Key(k)));
693     }
694     db_size[1] = Size(Key(0), Key(kTestSize - 1));
695     Close();
696     // as auto_compaction is off, we shouldn't see too much reduce
697     // in db size.
698     ASSERT_LT(db_size[0] / 3, db_size[1]);
699 
700     // round 3 --- reopen db with auto_compaction on and see if
701     // deletion compensation still work.
702     options.disable_auto_compactions = false;
703     Reopen(options);
704     dbfull()->TEST_WaitForFlushMemTable();
705     dbfull()->TEST_WaitForCompact();
706     db_size[2] = Size(Key(0), Key(kTestSize - 1));
707 
708     if (options.skip_stats_update_on_db_open) {
709       // If update stats on DB::Open is disable, we don't expect
710       // deletion entries taking effect.
711       ASSERT_LT(db_size[0] / 3, db_size[2]);
712     } else {
713       // Otherwise, we should see a significant drop in db size.
714       ASSERT_GT(db_size[0] / 3, db_size[2]);
715     }
716   }
717 }
718 
719 
TEST_P(DBCompactionTestWithParam,CompactionTrigger)720 TEST_P(DBCompactionTestWithParam, CompactionTrigger) {
721   const int kNumKeysPerFile = 100;
722 
723   Options options = CurrentOptions();
724   options.write_buffer_size = 110 << 10;  // 110KB
725   options.arena_block_size = 4 << 10;
726   options.num_levels = 3;
727   options.level0_file_num_compaction_trigger = 3;
728   options.max_subcompactions = max_subcompactions_;
729   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
730   CreateAndReopenWithCF({"pikachu"}, options);
731 
732   Random rnd(301);
733 
734   for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
735        num++) {
736     std::vector<std::string> values;
737     // Write 100KB (100 values, each 1K)
738     for (int i = 0; i < kNumKeysPerFile; i++) {
739       values.push_back(RandomString(&rnd, 990));
740       ASSERT_OK(Put(1, Key(i), values[i]));
741     }
742     // put extra key to trigger flush
743     ASSERT_OK(Put(1, "", ""));
744     dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
745     ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 1);
746   }
747 
748   // generate one more file in level-0, and should trigger level-0 compaction
749   std::vector<std::string> values;
750   for (int i = 0; i < kNumKeysPerFile; i++) {
751     values.push_back(RandomString(&rnd, 990));
752     ASSERT_OK(Put(1, Key(i), values[i]));
753   }
754   // put extra key to trigger flush
755   ASSERT_OK(Put(1, "", ""));
756   dbfull()->TEST_WaitForCompact();
757 
758   ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
759   ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1);
760 }
761 
TEST_F(DBCompactionTest,BGCompactionsAllowed)762 TEST_F(DBCompactionTest, BGCompactionsAllowed) {
763   // Create several column families. Make compaction triggers in all of them
764   // and see number of compactions scheduled to be less than allowed.
765   const int kNumKeysPerFile = 100;
766 
767   Options options = CurrentOptions();
768   options.write_buffer_size = 110 << 10;  // 110KB
769   options.arena_block_size = 4 << 10;
770   options.num_levels = 3;
771   // Should speed up compaction when there are 4 files.
772   options.level0_file_num_compaction_trigger = 2;
773   options.level0_slowdown_writes_trigger = 20;
774   options.soft_pending_compaction_bytes_limit = 1 << 30;  // Infinitely large
775   options.max_background_compactions = 3;
776   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
777 
778   // Block all threads in thread pool.
779   const size_t kTotalTasks = 4;
780   env_->SetBackgroundThreads(4, Env::LOW);
781   test::SleepingBackgroundTask sleeping_tasks[kTotalTasks];
782   for (size_t i = 0; i < kTotalTasks; i++) {
783     env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
784                    &sleeping_tasks[i], Env::Priority::LOW);
785     sleeping_tasks[i].WaitUntilSleeping();
786   }
787 
788   CreateAndReopenWithCF({"one", "two", "three"}, options);
789 
790   Random rnd(301);
791   for (int cf = 0; cf < 4; cf++) {
792     for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
793       for (int i = 0; i < kNumKeysPerFile; i++) {
794         ASSERT_OK(Put(cf, Key(i), ""));
795       }
796       // put extra key to trigger flush
797       ASSERT_OK(Put(cf, "", ""));
798       dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
799       ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
800     }
801   }
802 
803   // Now all column families qualify compaction but only one should be
804   // scheduled, because no column family hits speed up condition.
805   ASSERT_EQ(1u, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
806 
807   // Create two more files for one column family, which triggers speed up
808   // condition, three compactions will be scheduled.
809   for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
810     for (int i = 0; i < kNumKeysPerFile; i++) {
811       ASSERT_OK(Put(2, Key(i), ""));
812     }
813     // put extra key to trigger flush
814     ASSERT_OK(Put(2, "", ""));
815     dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
816     ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
817               NumTableFilesAtLevel(0, 2));
818   }
819   ASSERT_EQ(3U, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
820 
821   // Unblock all threads to unblock all compactions.
822   for (size_t i = 0; i < kTotalTasks; i++) {
823     sleeping_tasks[i].WakeUp();
824     sleeping_tasks[i].WaitUntilDone();
825   }
826   dbfull()->TEST_WaitForCompact();
827 
828   // Verify number of compactions allowed will come back to 1.
829 
830   for (size_t i = 0; i < kTotalTasks; i++) {
831     sleeping_tasks[i].Reset();
832     env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
833                    &sleeping_tasks[i], Env::Priority::LOW);
834     sleeping_tasks[i].WaitUntilSleeping();
835   }
836   for (int cf = 0; cf < 4; cf++) {
837     for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
838       for (int i = 0; i < kNumKeysPerFile; i++) {
839         ASSERT_OK(Put(cf, Key(i), ""));
840       }
841       // put extra key to trigger flush
842       ASSERT_OK(Put(cf, "", ""));
843       dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
844       ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
845     }
846   }
847 
848   // Now all column families qualify compaction but only one should be
849   // scheduled, because no column family hits speed up condition.
850   ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
851 
852   for (size_t i = 0; i < kTotalTasks; i++) {
853     sleeping_tasks[i].WakeUp();
854     sleeping_tasks[i].WaitUntilDone();
855   }
856 }
857 
// Writes 80 values of 100KB each into one large memtable, reopens so the data
// is flushed to L0, then runs a non-trivial L0->L1 compaction on "pikachu".
// Checks that L0 is emptied, that more than one L1 file was produced, and that
// every value is still readable.
TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) {
  Options options = CurrentOptions();
  options.write_buffer_size = 100000000;  // Large write buffer
  options.max_subcompactions = max_subcompactions_;
  CreateAndReopenWithCF({"pikachu"}, options);

  Random rnd(301);

  // Write 8MB (80 values, each 100K)
  const int kNumKeys = 80;
  ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
  std::vector<std::string> values;
  values.reserve(kNumKeys);
  for (int i = 0; i < kNumKeys; i++) {
    values.push_back(RandomString(&rnd, 100000));
    ASSERT_OK(Put(1, Key(i), values[i]));
  }

  // Reopening moves updates to level-0
  ReopenWithColumnFamilies({"default", "pikachu"}, options);
  // Check the compaction status directly instead of relying on the file-count
  // assertions below to notice a failed compaction.
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
                                        true /* disallow trivial move */));

  ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
  ASSERT_GT(NumTableFilesAtLevel(1, 1), 1);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_EQ(Get(1, Key(i)), values[i]);
  }
}
885 
// With a 10000-byte write buffer, writing N values of roughly 1KB each to the
// "pikachu" column family must produce additional table files. All values must
// be readable both before and after a reopen. The scenario is repeated for
// every configuration produced by ChangeCompactOptions().
TEST_F(DBCompactionTest, MinorCompactionsHappen) {
  do {
    Options options = CurrentOptions();
    options.write_buffer_size = 10000;
    CreateAndReopenWithCF({"pikachu"}, options);

    const int N = 500;
    // Value stored under Key(idx): the key itself followed by 1000 'v' bytes.
    auto value_for = [&](int idx) { return Key(idx) + std::string(1000, 'v'); };

    const int tables_before = TotalTableFiles(1);
    for (int idx = 0; idx < N; ++idx) {
      ASSERT_OK(Put(1, Key(idx), value_for(idx)));
    }
    const int tables_after = TotalTableFiles(1);
    ASSERT_GT(tables_after, tables_before);

    for (int idx = 0; idx < N; ++idx) {
      ASSERT_EQ(value_for(idx), Get(1, Key(idx)));
    }

    ReopenWithColumnFamilies({"default", "pikachu"}, options);

    for (int idx = 0; idx < N; ++idx) {
      ASSERT_EQ(value_for(idx), Get(1, Key(idx)));
    }
  } while (ChangeCompactOptions());
}
912 
// Key "3" is written in one L0 file and deleted (Delete) in a second. It must
// read as NOT_FOUND after the flushes, after a full manual compaction into L1,
// and after a later automatic L0 compaction triggered by more flushes.
TEST_F(DBCompactionTest, UserKeyCrossFile1) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;

  DestroyAndReopen(options);

  // create first file and flush to l0
  ASSERT_OK(Put("4", "A"));
  ASSERT_OK(Put("3", "A"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  // create second file holding the deletion of "3"
  ASSERT_OK(Put("2", "A"));
  ASSERT_OK(Delete("3"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // move both files down to l1
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // Flush enough L0 files to reach the compaction trigger.
  for (int i = 0; i < options.level0_file_num_compaction_trigger; i++) {
    ASSERT_OK(Put("2", "B"));
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_EQ("NOT_FOUND", Get("3"));
}
945 
// Same scenario as UserKeyCrossFile1, but key "3" is removed with
// SingleDelete. It must read as NOT_FOUND after the flushes, after a full
// manual compaction into L1, and after a later automatic L0 compaction.
TEST_F(DBCompactionTest, UserKeyCrossFile2) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;

  DestroyAndReopen(options);

  // create first file and flush to l0
  ASSERT_OK(Put("4", "A"));
  ASSERT_OK(Put("3", "A"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  // create second file holding the single-deletion of "3"
  ASSERT_OK(Put("2", "A"));
  ASSERT_OK(SingleDelete("3"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // move both files down to l1
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // Flush enough L0 files to reach the compaction trigger.
  for (int i = 0; i < options.level0_file_num_compaction_trigger; i++) {
    ASSERT_OK(Put("2", "B"));
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_EQ("NOT_FOUND", Get("3"));
}
978 
// Writes several versions of key "3" while holding a snapshot after every
// write. The two flushed L0 files are compacted into L1 with CompactFiles, and
// then the snapshots are released. As the release comment notes, this lets the
// first instance of "3" get sequence number 0. An automatic L0 compaction is
// then triggered, and the final Put checks that the DB still accepts writes.
TEST_F(DBCompactionTest, ZeroSeqIdCompaction) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;

  // Collects the flushed files that are handed to CompactFiles() below.
  // options.listeners shares ownership of the collector.
  auto collector = std::make_shared<FlushedFileCollector>();
  options.listeners.push_back(collector);

  // compaction options
  CompactionOptions compact_opt;
  compact_opt.compression = kNoCompression;
  compact_opt.output_file_size_limit = 4096;
  // Length of each written *value*; five of them fill one output file.
  const size_t value_len =
      static_cast<size_t>(compact_opt.output_file_size_limit) / 5;

  DestroyAndReopen(options);

  std::vector<const Snapshot*> snaps;

  // create first file and flush to l0
  for (const char* key : {"1", "2", "3", "3", "3", "3"}) {
    ASSERT_OK(Put(key, std::string(value_len, 'A')));
    snaps.push_back(dbfull()->GetSnapshot());
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  // create second file and flush to l0
  for (const char* key : {"3", "4", "5", "6", "7", "8"}) {
    ASSERT_OK(Put(key, std::string(value_len, 'A')));
    snaps.push_back(dbfull()->GetSnapshot());
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  // move both files down to l1
  const Status compact_status =
      dbfull()->CompactFiles(compact_opt, collector->GetFlushedFiles(), 1);

  // release snap so that first instance of key(3) can have seqId=0.
  // Snapshots are released before asserting so that a failed CompactFiles
  // does not leave them held.
  for (const Snapshot* snap : snaps) {
    dbfull()->ReleaseSnapshot(snap);
  }
  ASSERT_OK(compact_status);

  // create 3 files in l0 so to trigger compaction
  for (int i = 0; i < options.level0_file_num_compaction_trigger; i++) {
    ASSERT_OK(Put("2", std::string(1, 'A')));
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_OK(Put("", ""));
}
1032 
// Builds two L1 files and compacts them with CompactFiles while
// CompactionOptions::output_file_size_limit is left unset.
TEST_F(DBCompactionTest, ManualCompactionUnknownOutputSize) {
  // github issue #2249
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;
  DestroyAndReopen(options);

  // create two files in l1 that we can compact
  for (int i = 0; i < 2; ++i) {
    for (int j = 0; j < options.level0_file_num_compaction_trigger; j++) {
      // make l0 files' ranges overlap to avoid trivial move
      ASSERT_OK(Put(std::to_string(2 * i), std::string(1, 'A')));
      ASSERT_OK(Put(std::to_string(2 * i + 1), std::string(1, 'A')));
      ASSERT_OK(Flush());
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    }
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
    ASSERT_EQ(NumTableFilesAtLevel(1, 0), i + 1);
  }

  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
  // Unsigned literal: files.size() is a size_t.
  ASSERT_EQ(2U, cf_meta.levels[1].files.size());
  std::vector<std::string> input_filenames;
  input_filenames.reserve(cf_meta.levels[1].files.size());
  for (const auto& sst_file : cf_meta.levels[1].files) {
    input_filenames.push_back(sst_file.name);
  }

  // note CompactionOptions::output_file_size_limit is unset.
  CompactionOptions compact_opt;
  compact_opt.compression = kNoCompression;
  // The status was previously discarded, so a failing compaction went
  // unnoticed.
  ASSERT_OK(dbfull()->CompactFiles(compact_opt, input_filenames, 1));
}
1067 
1068 // Check that writes done during a memtable compaction are recovered
1069 // if the database is shutdown during the memtable compaction.
TEST_F(DBCompactionTest, RecoverDuringMemtableCompaction) {
  do {
    Options options = CurrentOptions();
    options.env = env_;
    CreateAndReopenWithCF({"pikachu"}, options);

    // Payloads for the two large writes; the same strings are compared
    // against the recovered values after the reopen.
    const std::string big1_value(10000000, 'x');
    const std::string big2_value(1000, 'y');

    // Trigger a long memtable compaction and reopen the database during it
    ASSERT_OK(Put(1, "foo", "v1"));         // Goes to 1st log file
    ASSERT_OK(Put(1, "big1", big1_value));  // Fills memtable
    ASSERT_OK(Put(1, "big2", big2_value));  // Triggers compaction
    ASSERT_OK(Put(1, "bar", "v2"));         // Goes to new log file

    ReopenWithColumnFamilies({"default", "pikachu"}, options);

    // Every write, on either side of the memtable switch, must be recovered.
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v2", Get(1, "bar"));
    ASSERT_EQ(big1_value, Get(1, "big1"));
    ASSERT_EQ(big2_value, Get(1, "big2"));
  } while (ChangeOptions());
}
1089 
// With a single L0 file, a manual CompactRange must trivially move that file
// to L1. The file keeps its name and size, exactly one trivial move is
// recorded, and every value remains readable.
TEST_P(DBCompactionTestWithParam, TrivialMoveOneFile) {
  int32_t trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.write_buffer_size = 100000000;
  options.max_subcompactions = max_subcompactions_;
  DestroyAndReopen(options);

  const int32_t num_keys = 80;
  const int32_t value_size = 100 * 1024;  // 100 KB

  Random rnd(301);
  std::vector<std::string> values;
  values.reserve(num_keys);
  for (int i = 0; i < num_keys; i++) {
    values.push_back(RandomString(&rnd, value_size));
    ASSERT_OK(Put(Key(i), values[i]));
  }

  // Reopening moves updates to L0
  Reopen(options);
  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 1);  // 1 file in L0
  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0);  // 0 files in L1

  std::vector<LiveFileMetaData> metadata;
  db_->GetLiveFilesMetaData(&metadata);
  ASSERT_EQ(metadata.size(), 1U);
  // Copied, not referenced: `metadata` is cleared and refilled below.
  LiveFileMetaData level0_file = metadata[0];  // L0 file meta

  CompactRangeOptions cro;
  cro.exclusive_manual_compaction = exclusive_manual_compaction_;

  // Compaction will initiate a trivial move from L0 to L1
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));

  // File moved From L0 to L1
  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);  // 0 files in L0
  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 1);  // 1 file in L1

  metadata.clear();
  db_->GetLiveFilesMetaData(&metadata);
  ASSERT_EQ(metadata.size(), 1U);
  ASSERT_EQ(metadata[0].name /* level1_file.name */, level0_file.name);
  ASSERT_EQ(metadata[0].size /* level1_file.size */, level0_file.size);

  for (int i = 0; i < num_keys; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }

  ASSERT_EQ(trivial_move, 1);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
1145 
TEST_P(DBCompactionTestWithParam,TrivialMoveNonOverlappingFiles)1146 TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) {
1147   int32_t trivial_move = 0;
1148   int32_t non_trivial_move = 0;
1149   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1150       "DBImpl::BackgroundCompaction:TrivialMove",
1151       [&](void* /*arg*/) { trivial_move++; });
1152   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1153       "DBImpl::BackgroundCompaction:NonTrivial",
1154       [&](void* /*arg*/) { non_trivial_move++; });
1155   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1156 
1157   Options options = CurrentOptions();
1158   options.disable_auto_compactions = true;
1159   options.write_buffer_size = 10 * 1024 * 1024;
1160   options.max_subcompactions = max_subcompactions_;
1161 
1162   DestroyAndReopen(options);
1163   // non overlapping ranges
1164   std::vector<std::pair<int32_t, int32_t>> ranges = {
1165     {100, 199},
1166     {300, 399},
1167     {0, 99},
1168     {200, 299},
1169     {600, 699},
1170     {400, 499},
1171     {500, 550},
1172     {551, 599},
1173   };
1174   int32_t value_size = 10 * 1024;  // 10 KB
1175 
1176   Random rnd(301);
1177   std::map<int32_t, std::string> values;
1178   for (size_t i = 0; i < ranges.size(); i++) {
1179     for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
1180       values[j] = RandomString(&rnd, value_size);
1181       ASSERT_OK(Put(Key(j), values[j]));
1182     }
1183     ASSERT_OK(Flush());
1184   }
1185 
1186   int32_t level0_files = NumTableFilesAtLevel(0, 0);
1187   ASSERT_EQ(level0_files, ranges.size());    // Multiple files in L0
1188   ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0);  // No files in L1
1189 
1190   CompactRangeOptions cro;
1191   cro.exclusive_manual_compaction = exclusive_manual_compaction_;
1192 
1193   // Since data is non-overlapping we expect compaction to initiate
1194   // a trivial move
1195   db_->CompactRange(cro, nullptr, nullptr);
1196   // We expect that all the files were trivially moved from L0 to L1
1197   ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
1198   ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files);
1199 
1200   for (size_t i = 0; i < ranges.size(); i++) {
1201     for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
1202       ASSERT_EQ(Get(Key(j)), values[j]);
1203     }
1204   }
1205 
1206   ASSERT_EQ(trivial_move, 1);
1207   ASSERT_EQ(non_trivial_move, 0);
1208 
1209   trivial_move = 0;
1210   non_trivial_move = 0;
1211   values.clear();
1212   DestroyAndReopen(options);
1213   // Same ranges as above but overlapping
1214   ranges = {
1215     {100, 199},
1216     {300, 399},
1217     {0, 99},
1218     {200, 299},
1219     {600, 699},
1220     {400, 499},
1221     {500, 560},  // this range overlap with the next one
1222     {551, 599},
1223   };
1224   for (size_t i = 0; i < ranges.size(); i++) {
1225     for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
1226       values[j] = RandomString(&rnd, value_size);
1227       ASSERT_OK(Put(Key(j), values[j]));
1228     }
1229     ASSERT_OK(Flush());
1230   }
1231 
1232   db_->CompactRange(cro, nullptr, nullptr);
1233 
1234   for (size_t i = 0; i < ranges.size(); i++) {
1235     for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
1236       ASSERT_EQ(Get(Key(j)), values[j]);
1237     }
1238   }
1239   ASSERT_EQ(trivial_move, 0);
1240   ASSERT_EQ(non_trivial_move, 1);
1241 
1242   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1243 }
1244 
// Two disjoint L0 files are moved straight to L6 by a CompactRange with
// change_level/target_level = 6. Exactly one trivial move and no non-trivial
// compaction is expected, and all values must stay readable.
TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  auto* sync_point = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
  sync_point->SetCallBack("DBImpl::BackgroundCompaction:TrivialMove",
                          [&](void* /*arg*/) { trivial_move++; });
  sync_point->SetCallBack("DBImpl::BackgroundCompaction:NonTrivial",
                          [&](void* /*arg*/) { non_trivial_move++; });
  sync_point->EnableProcessing();

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.write_buffer_size = 10 * 1024 * 1024;
  options.num_levels = 7;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  const int32_t value_size = 10 * 1024;  // 10 KB

  // Two non-overlapping files with inclusive key bounds:
  // file 1 [0 => 300], file 2 [600 => 700].
  const std::vector<std::pair<int32_t, int32_t>> key_ranges = {{0, 300},
                                                               {600, 700}};
  Random rnd(301);
  std::map<int32_t, std::string> values;
  for (const auto& range : key_ranges) {
    for (int32_t k = range.first; k <= range.second; k++) {
      values[k] = RandomString(&rnd, value_size);
      ASSERT_OK(Put(Key(k), values[k]));
    }
    ASSERT_OK(Flush());
  }

  // 2 files in L0
  ASSERT_EQ("2", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 6;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  // 2 files in L6
  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  for (const auto& range : key_ranges) {
    for (int32_t k = range.first; k <= range.second; k++) {
      ASSERT_EQ(Get(Key(k)), values[k]);
    }
  }
}
1303 
// A non-exclusive manual CompactRange over [Key(0), Key(199)] runs on a
// separate thread. Meanwhile the main thread flushes three more L0 files, which
// trigger a second non-trivial compaction. Sync points hold both non-trivial
// compactions after they run, so the test can check that L0 still has three
// files before either result is installed.
TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_move++; });
  // Distinguishes the first "NonTrivial:AfterRun" callback from later ones.
  bool first = true;
  // Purpose of dependencies:
  // 4 -> 1: ensure the order of two non-trivial compactions (the main thread
  //         blocks at :1 until the first non-trivial compaction reaches :4)
  // 5 -> 2 and 5 -> 3: ensure we do a check before two non-trivial compactions
  // are installed (both callbacks block until the main thread passes :5)
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBCompaction::ManualPartial:4", "DBCompaction::ManualPartial:1"},
       {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:2"},
       {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:3"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
        if (first) {
          first = false;
          TEST_SYNC_POINT("DBCompaction::ManualPartial:4");
          TEST_SYNC_POINT("DBCompaction::ManualPartial:3");
        } else {  // second non-trivial compaction
          TEST_SYNC_POINT("DBCompaction::ManualPartial:2");
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1024 * 1024;
  options.num_levels = 7;
  options.max_subcompactions = max_subcompactions_;
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 3;
  options.target_file_size_base = 1 << 23;  // 8 MB

  DestroyAndReopen(options);
  int32_t value_size = 10 * 1024;  // 10 KB

  // Add 2 non-overlapping files
  Random rnd(301);
  std::map<int32_t, std::string> values;

  // file 1 [0 => 100)
  for (int32_t i = 0; i < 100; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 2 [100 => 300)
  for (int32_t i = 100; i < 300; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L0
  ASSERT_EQ("2", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 6;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  // Trivial move the two non-overlapping files to level 6
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  // 2 files in L6
  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  // file 3 [0 => 200)
  for (int32_t i = 0; i < 200; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 1 file in L0
  ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel(0));
  // Push file 3 down one level at a time, from L0 to L5.
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(4, nullptr, nullptr, nullptr, false));
  // 2 files in L6, 1 file in L5
  ASSERT_EQ("0,0,0,0,0,1,2", FilesPerLevel(0));

  // The earlier move plus one trivial move per TEST_CompactRange call above.
  ASSERT_EQ(trivial_move, 6);
  ASSERT_EQ(non_trivial_move, 0);

  // NOTE(review): from here until threads.join(), a failing ASSERT returns
  // from the test while `threads` is still joinable. If port::Thread is
  // std::thread (presumably), its destructor would then call std::terminate.
  ROCKSDB_NAMESPACE::port::Thread threads([&] {
    compact_options.change_level = false;
    compact_options.exclusive_manual_compaction = false;
    std::string begin_string = Key(0);
    std::string end_string = Key(199);
    Slice begin(begin_string);
    Slice end(end_string);
    // First non-trivial compaction is triggered
    ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
  });

  TEST_SYNC_POINT("DBCompaction::ManualPartial:1");
  // file 4 [300 => 400]
  for (int32_t i = 300; i <= 400; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 5 [400 => 500] (rewrites key 400 from file 4)
  for (int32_t i = 400; i <= 500; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 6 [500 => 600] (rewrites key 500 from file 5)
  for (int32_t i = 500; i <= 600; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  // Second non-trivial compaction is triggered
  ASSERT_OK(Flush());

  // Before two non-trivial compactions are installed, there are 3 files in L0
  ASSERT_EQ("3,0,0,0,0,1,2", FilesPerLevel(0));
  TEST_SYNC_POINT("DBCompaction::ManualPartial:5");

  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_WaitForCompact();
  // After two non-trivial compactions are installed, there is 1 file in L6, and
  // 1 file in L1
  ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel(0));
  threads.join();

  // NOTE(review): the bound is exclusive, so key 600 (written in file 6) is
  // not verified here.
  for (int32_t i = 0; i < 600; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }
}
1447 
1448 // Disable as the test is flaky.
// A non-exclusive manual CompactRange over [Key(0), Key(199)] runs on a
// separate thread while the main thread writes many more keys. The test then
// checks that each level's size stays within
// 4 * max_bytes_for_level_base * multiplier^(level-1).
TEST_F(DBCompactionTest, DISABLED_ManualPartialFill) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_move++; });
  // Distinguishes the first "NonTrivial:AfterRun" callback from later ones.
  bool first = true;
  // NOTE(review): `second` is never modified and its branch below is empty;
  // it has no effect on the test.
  bool second = true;
  // 4 -> 1: the main thread blocks at :1 until the first non-trivial
  //         compaction has run and reached :4.
  // 2 -> 3: that compaction's callback then blocks at :3 until the main thread
  //         has checked the level sizes and passed :2.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBCompaction::PartialFill:4", "DBCompaction::PartialFill:1"},
       {"DBCompaction::PartialFill:2", "DBCompaction::PartialFill:3"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
        if (first) {
          TEST_SYNC_POINT("DBCompaction::PartialFill:4");
          first = false;
          TEST_SYNC_POINT("DBCompaction::PartialFill:3");
        } else if (second) {
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1024 * 1024;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 4;
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 3;

  DestroyAndReopen(options);
  // make sure all background compaction jobs can be scheduled
  auto stop_token =
      dbfull()->TEST_write_controler().GetCompactionPressureToken();
  int32_t value_size = 10 * 1024;  // 10 KB

  // Add 2 non-overlapping files
  Random rnd(301);
  std::map<int32_t, std::string> values;

  // file 1 [0 => 100)
  for (int32_t i = 0; i < 100; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 2 [100 => 300)
  for (int32_t i = 100; i < 300; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L0
  ASSERT_EQ("2", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  // 2 files in L2
  ASSERT_EQ("0,0,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  // file 3 [0 => 200)
  for (int32_t i = 0; i < 200; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L2, 1 in L0
  ASSERT_EQ("1,0,2", FilesPerLevel(0));
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false));
  // 2 files in L2, 1 in L1
  ASSERT_EQ("0,1,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 2);
  ASSERT_EQ(non_trivial_move, 0);

  ROCKSDB_NAMESPACE::port::Thread threads([&] {
    compact_options.change_level = false;
    compact_options.exclusive_manual_compaction = false;
    std::string begin_string = Key(0);
    std::string end_string = Key(199);
    Slice begin(begin_string);
    Slice end(end_string);
    ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
  });

  TEST_SYNC_POINT("DBCompaction::PartialFill:1");
  // Many files 4 [300 => 4300): six passes over keys [300, 4300), each pass
  // flushing once when it reaches key 2300.
  for (int32_t i = 0; i <= 5; i++) {
    for (int32_t j = 300; j < 4300; j++) {
      if (j == 2300) {
        ASSERT_OK(Flush());
        dbfull()->TEST_WaitForFlushMemTable();
      }
      values[j] = RandomString(&rnd, value_size);
      ASSERT_OK(Put(Key(j), values[j]));
    }
  }

  // Verify level sizes: L1 <= 4 * max_bytes_for_level_base, and each deeper
  // level's bound is multiplied by max_bytes_for_level_multiplier.
  uint64_t target_size = 4 * options.max_bytes_for_level_base;
  for (int32_t i = 1; i < options.num_levels; i++) {
    ASSERT_LE(SizeAtLevel(i), target_size);
    target_size = static_cast<uint64_t>(target_size *
                                        options.max_bytes_for_level_multiplier);
  }

  TEST_SYNC_POINT("DBCompaction::PartialFill:2");
  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_WaitForCompact();
  threads.join();

  for (int32_t i = 0; i < 4300; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }
}
1574 
// With unordered_write enabled, a manual CompactRange runs while a Put of
// "foo" has written its WAL record but not yet its memtable entry. The new
// value must be visible after the compaction and after a reopen.
TEST_F(DBCompactionTest, ManualCompactionWithUnorderedWrite) {
  // - The test thread starts CompactRange only after the writer passes
  //   "UnorderedWriteAfterWriteWAL".
  // - The writer inserts into the memtable only after some DB call reaches
  //   "WaitForPendingWrites:BeforeBlock" (presumably the flush done by
  //   CompactRange).
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL",
        "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL"},
       {"DBImpl::WaitForPendingWrites:BeforeBlock",
        "DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"}});

  Options options = CurrentOptions();
  options.unordered_write = true;
  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Flush());

  ASSERT_OK(Put("bar", "v1"));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  port::Thread writer([&]() { ASSERT_OK(Put("foo", "v2")); });

  TEST_SYNC_POINT(
      "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL");
  // Join the writer before asserting. Returning early from the test with a
  // joinable thread would abort the whole process.
  const Status compact_status =
      db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  writer.join();
  ASSERT_OK(compact_status);
  ASSERT_EQ(Get("foo"), "v2");

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  Reopen(options);
  ASSERT_EQ(Get("foo"), "v2");
}
1605 
TEST_F(DBCompactionTest,DeleteFileRange)1606 TEST_F(DBCompactionTest, DeleteFileRange) {
1607   Options options = CurrentOptions();
1608   options.write_buffer_size = 10 * 1024 * 1024;
1609   options.max_bytes_for_level_multiplier = 2;
1610   options.num_levels = 4;
1611   options.level0_file_num_compaction_trigger = 3;
1612   options.max_background_compactions = 3;
1613 
1614   DestroyAndReopen(options);
1615   int32_t value_size = 10 * 1024;  // 10 KB
1616 
1617   // Add 2 non-overlapping files
1618   Random rnd(301);
1619   std::map<int32_t, std::string> values;
1620 
1621   // file 1 [0 => 100]
1622   for (int32_t i = 0; i < 100; i++) {
1623     values[i] = RandomString(&rnd, value_size);
1624     ASSERT_OK(Put(Key(i), values[i]));
1625   }
1626   ASSERT_OK(Flush());
1627 
1628   // file 2 [100 => 300]
1629   for (int32_t i = 100; i < 300; i++) {
1630     values[i] = RandomString(&rnd, value_size);
1631     ASSERT_OK(Put(Key(i), values[i]));
1632   }
1633   ASSERT_OK(Flush());
1634 
1635   // 2 files in L0
1636   ASSERT_EQ("2", FilesPerLevel(0));
1637   CompactRangeOptions compact_options;
1638   compact_options.change_level = true;
1639   compact_options.target_level = 2;
1640   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
1641   // 2 files in L2
1642   ASSERT_EQ("0,0,2", FilesPerLevel(0));
1643 
1644   // file 3 [ 0 => 200]
1645   for (int32_t i = 0; i < 200; i++) {
1646     values[i] = RandomString(&rnd, value_size);
1647     ASSERT_OK(Put(Key(i), values[i]));
1648   }
1649   ASSERT_OK(Flush());
1650 
1651   // Many files 4 [300 => 4300)
1652   for (int32_t i = 0; i <= 5; i++) {
1653     for (int32_t j = 300; j < 4300; j++) {
1654       if (j == 2300) {
1655         ASSERT_OK(Flush());
1656         dbfull()->TEST_WaitForFlushMemTable();
1657       }
1658       values[j] = RandomString(&rnd, value_size);
1659       ASSERT_OK(Put(Key(j), values[j]));
1660     }
1661   }
1662   ASSERT_OK(Flush());
1663   dbfull()->TEST_WaitForFlushMemTable();
1664   dbfull()->TEST_WaitForCompact();
1665 
1666   // Verify level sizes
1667   uint64_t target_size = 4 * options.max_bytes_for_level_base;
1668   for (int32_t i = 1; i < options.num_levels; i++) {
1669     ASSERT_LE(SizeAtLevel(i), target_size);
1670     target_size = static_cast<uint64_t>(target_size *
1671                                         options.max_bytes_for_level_multiplier);
1672   }
1673 
1674   size_t old_num_files = CountFiles();
1675   std::string begin_string = Key(1000);
1676   std::string end_string = Key(2000);
1677   Slice begin(begin_string);
1678   Slice end(end_string);
1679   ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));
1680 
1681   int32_t deleted_count = 0;
1682   for (int32_t i = 0; i < 4300; i++) {
1683     if (i < 1000 || i > 2000) {
1684       ASSERT_EQ(Get(Key(i)), values[i]);
1685     } else {
1686       ReadOptions roptions;
1687       std::string result;
1688       Status s = db_->Get(roptions, Key(i), &result);
1689       ASSERT_TRUE(s.IsNotFound() || s.ok());
1690       if (s.IsNotFound()) {
1691         deleted_count++;
1692       }
1693     }
1694   }
1695   ASSERT_GT(deleted_count, 0);
1696   begin_string = Key(5000);
1697   end_string = Key(6000);
1698   Slice begin1(begin_string);
1699   Slice end1(end_string);
1700   // Try deleting files in range which contain no keys
1701   ASSERT_OK(
1702       DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin1, &end1));
1703 
1704   // Push data from level 0 to level 1 to force all data to be deleted
1705   // Note that we don't delete level 0 files
1706   compact_options.change_level = true;
1707   compact_options.target_level = 1;
1708   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
1709   dbfull()->TEST_WaitForCompact();
1710 
1711   ASSERT_OK(
1712       DeleteFilesInRange(db_, db_->DefaultColumnFamily(), nullptr, nullptr));
1713 
1714   int32_t deleted_count2 = 0;
1715   for (int32_t i = 0; i < 4300; i++) {
1716     ReadOptions roptions;
1717     std::string result;
1718     Status s = db_->Get(roptions, Key(i), &result);
1719     ASSERT_TRUE(s.IsNotFound());
1720     deleted_count2++;
1721   }
1722   ASSERT_GT(deleted_count2, deleted_count);
1723   size_t new_num_files = CountFiles();
1724   ASSERT_GT(old_num_files, new_num_files);
1725 }
1726 
TEST_F(DBCompactionTest,DeleteFilesInRanges)1727 TEST_F(DBCompactionTest, DeleteFilesInRanges) {
1728   Options options = CurrentOptions();
1729   options.write_buffer_size = 10 * 1024 * 1024;
1730   options.max_bytes_for_level_multiplier = 2;
1731   options.num_levels = 4;
1732   options.max_background_compactions = 3;
1733   options.disable_auto_compactions = true;
1734 
1735   DestroyAndReopen(options);
1736   int32_t value_size = 10 * 1024;  // 10 KB
1737 
1738   Random rnd(301);
1739   std::map<int32_t, std::string> values;
1740 
1741   // file [0 => 100), [100 => 200), ... [900, 1000)
1742   for (auto i = 0; i < 10; i++) {
1743     for (auto j = 0; j < 100; j++) {
1744       auto k = i * 100 + j;
1745       values[k] = RandomString(&rnd, value_size);
1746       ASSERT_OK(Put(Key(k), values[k]));
1747     }
1748     ASSERT_OK(Flush());
1749   }
1750   ASSERT_EQ("10", FilesPerLevel(0));
1751   CompactRangeOptions compact_options;
1752   compact_options.change_level = true;
1753   compact_options.target_level = 2;
1754   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
1755   ASSERT_EQ("0,0,10", FilesPerLevel(0));
1756 
1757   // file [0 => 100), [200 => 300), ... [800, 900)
1758   for (auto i = 0; i < 10; i+=2) {
1759     for (auto j = 0; j < 100; j++) {
1760       auto k = i * 100 + j;
1761       ASSERT_OK(Put(Key(k), values[k]));
1762     }
1763     ASSERT_OK(Flush());
1764   }
1765   ASSERT_EQ("5,0,10", FilesPerLevel(0));
1766   ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr));
1767   ASSERT_EQ("0,5,10", FilesPerLevel(0));
1768 
1769   // Delete files in range [0, 299] (inclusive)
1770   {
1771     auto begin_str1 = Key(0), end_str1 = Key(100);
1772     auto begin_str2 = Key(100), end_str2 = Key(200);
1773     auto begin_str3 = Key(200), end_str3 = Key(299);
1774     Slice begin1(begin_str1), end1(end_str1);
1775     Slice begin2(begin_str2), end2(end_str2);
1776     Slice begin3(begin_str3), end3(end_str3);
1777     std::vector<RangePtr> ranges;
1778     ranges.push_back(RangePtr(&begin1, &end1));
1779     ranges.push_back(RangePtr(&begin2, &end2));
1780     ranges.push_back(RangePtr(&begin3, &end3));
1781     ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(),
1782                                   ranges.data(), ranges.size()));
1783     ASSERT_EQ("0,3,7", FilesPerLevel(0));
1784 
1785     // Keys [0, 300) should not exist.
1786     for (auto i = 0; i < 300; i++) {
1787       ReadOptions ropts;
1788       std::string result;
1789       auto s = db_->Get(ropts, Key(i), &result);
1790       ASSERT_TRUE(s.IsNotFound());
1791     }
1792     for (auto i = 300; i < 1000; i++) {
1793       ASSERT_EQ(Get(Key(i)), values[i]);
1794     }
1795   }
1796 
1797   // Delete files in range [600, 999) (exclusive)
1798   {
1799     auto begin_str1 = Key(600), end_str1 = Key(800);
1800     auto begin_str2 = Key(700), end_str2 = Key(900);
1801     auto begin_str3 = Key(800), end_str3 = Key(999);
1802     Slice begin1(begin_str1), end1(end_str1);
1803     Slice begin2(begin_str2), end2(end_str2);
1804     Slice begin3(begin_str3), end3(end_str3);
1805     std::vector<RangePtr> ranges;
1806     ranges.push_back(RangePtr(&begin1, &end1));
1807     ranges.push_back(RangePtr(&begin2, &end2));
1808     ranges.push_back(RangePtr(&begin3, &end3));
1809     ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(),
1810                                   ranges.data(), ranges.size(), false));
1811     ASSERT_EQ("0,1,4", FilesPerLevel(0));
1812 
1813     // Keys [600, 900) should not exist.
1814     for (auto i = 600; i < 900; i++) {
1815       ReadOptions ropts;
1816       std::string result;
1817       auto s = db_->Get(ropts, Key(i), &result);
1818       ASSERT_TRUE(s.IsNotFound());
1819     }
1820     for (auto i = 300; i < 600; i++) {
1821       ASSERT_EQ(Get(Key(i)), values[i]);
1822     }
1823     for (auto i = 900; i < 1000; i++) {
1824       ASSERT_EQ(Get(Key(i)), values[i]);
1825     }
1826   }
1827 
1828   // Delete all files.
1829   {
1830     RangePtr range;
1831     ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(), &range, 1));
1832     ASSERT_EQ("", FilesPerLevel(0));
1833 
1834     for (auto i = 0; i < 1000; i++) {
1835       ReadOptions ropts;
1836       std::string result;
1837       auto s = db_->Get(ropts, Key(i), &result);
1838       ASSERT_TRUE(s.IsNotFound());
1839     }
1840   }
1841 }
1842 
TEST_F(DBCompactionTest, DeleteFileRangeFileEndpointsOverlapBug) {
  // regression test for #2833: groups of files whose user-keys overlap at the
  // endpoints could be split by `DeleteFilesInRange`. This caused old data to
  // reappear, either because a new version of the key was removed, or a range
  // deletion was partially dropped. It could also cause non-overlapping
  // invariant to be violated if the files dropped by DeleteFilesInRange were
  // a subset of files that a range deletion spans.
  const int kNumL0Files = 2;
  const int kValSize = 8 << 10;  // 8KB
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.target_file_size_base = 1 << 10;  // 1KB
  DestroyAndReopen(options);

  // The snapshot prevents key 1 from having its old version dropped. The low
  // `target_file_size_base` ensures two keys will be in each output file.
  const Snapshot* snapshot = nullptr;
  Random rnd(301);
  // The value indicates which flush the key belonged to, which is enough
  // for us to determine the keys' relative ages. After L0 flushes finish,
  // files look like:
  //
  // File 0: 0 -> vals[0], 1 -> vals[0]
  // File 1:               1 -> vals[1], 2 -> vals[1]
  //
  // Then L0->L1 compaction happens, which outputs keys as follows:
  //
  // File 0: 0 -> vals[0], 1 -> vals[1]
  // File 1:               1 -> vals[0], 2 -> vals[1]
  //
  // DeleteFilesInRange shouldn't be allowed to drop just file 0, as that
  // would cause `1 -> vals[0]` (an older key) to reappear.
  std::string vals[kNumL0Files];
  for (int i = 0; i < kNumL0Files; ++i) {
    vals[i] = RandomString(&rnd, kValSize);
    // A silently failed write or flush would invalidate the file layout
    // described above, so check every status.
    ASSERT_OK(Put(Key(i), vals[i]));
    ASSERT_OK(Put(Key(i + 1), vals[i]));
    ASSERT_OK(Flush());
    if (i == 0) {
      snapshot = db_->GetSnapshot();
    }
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Verify `DeleteFilesInRange` can't drop only file 0 which would cause
  // "1 -> vals[0]" to reappear.
  std::string begin_str = Key(0), end_str = Key(1);
  Slice begin = begin_str, end = end_str;
  ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));
  ASSERT_EQ(vals[1], Get(Key(1)));

  db_->ReleaseSnapshot(snapshot);
}
1896 
TEST_P(DBCompactionTestWithParam,TrivialMoveToLastLevelWithFiles)1897 TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) {
1898   int32_t trivial_move = 0;
1899   int32_t non_trivial_move = 0;
1900   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1901       "DBImpl::BackgroundCompaction:TrivialMove",
1902       [&](void* /*arg*/) { trivial_move++; });
1903   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1904       "DBImpl::BackgroundCompaction:NonTrivial",
1905       [&](void* /*arg*/) { non_trivial_move++; });
1906   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1907 
1908   Options options = CurrentOptions();
1909   options.write_buffer_size = 100000000;
1910   options.max_subcompactions = max_subcompactions_;
1911   DestroyAndReopen(options);
1912 
1913   int32_t value_size = 10 * 1024;  // 10 KB
1914 
1915   Random rnd(301);
1916   std::vector<std::string> values;
1917   // File with keys [ 0 => 99 ]
1918   for (int i = 0; i < 100; i++) {
1919     values.push_back(RandomString(&rnd, value_size));
1920     ASSERT_OK(Put(Key(i), values[i]));
1921   }
1922   ASSERT_OK(Flush());
1923 
1924   ASSERT_EQ("1", FilesPerLevel(0));
1925   // Compaction will do L0=>L1 (trivial move) then move L1 files to L3
1926   CompactRangeOptions compact_options;
1927   compact_options.change_level = true;
1928   compact_options.target_level = 3;
1929   compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
1930   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
1931   ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
1932   ASSERT_EQ(trivial_move, 1);
1933   ASSERT_EQ(non_trivial_move, 0);
1934 
1935   // File with keys [ 100 => 199 ]
1936   for (int i = 100; i < 200; i++) {
1937     values.push_back(RandomString(&rnd, value_size));
1938     ASSERT_OK(Put(Key(i), values[i]));
1939   }
1940   ASSERT_OK(Flush());
1941 
1942   ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
1943   CompactRangeOptions cro;
1944   cro.exclusive_manual_compaction = exclusive_manual_compaction_;
1945   // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
1946   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
1947   ASSERT_EQ("0,0,0,2", FilesPerLevel(0));
1948   ASSERT_EQ(trivial_move, 4);
1949   ASSERT_EQ(non_trivial_move, 0);
1950 
1951   for (int i = 0; i < 200; i++) {
1952     ASSERT_EQ(Get(Key(i)), values[i]);
1953   }
1954 
1955   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1956 }
1957 
TEST_P(DBCompactionTestWithParam,LevelCompactionThirdPath)1958 TEST_P(DBCompactionTestWithParam, LevelCompactionThirdPath) {
1959   Options options = CurrentOptions();
1960   options.db_paths.emplace_back(dbname_, 500 * 1024);
1961   options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
1962   options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
1963   options.memtable_factory.reset(
1964       new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
1965   options.compaction_style = kCompactionStyleLevel;
1966   options.write_buffer_size = 110 << 10;  // 110KB
1967   options.arena_block_size = 4 << 10;
1968   options.level0_file_num_compaction_trigger = 2;
1969   options.num_levels = 4;
1970   options.max_bytes_for_level_base = 400 * 1024;
1971   options.max_subcompactions = max_subcompactions_;
1972   //  options = CurrentOptions(options);
1973 
1974   std::vector<std::string> filenames;
1975   env_->GetChildren(options.db_paths[1].path, &filenames);
1976   // Delete archival files.
1977   for (size_t i = 0; i < filenames.size(); ++i) {
1978     env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
1979   }
1980   env_->DeleteDir(options.db_paths[1].path);
1981   Reopen(options);
1982 
1983   Random rnd(301);
1984   int key_idx = 0;
1985 
1986   // First three 110KB files are not going to second path.
1987   // After that, (100K, 200K)
1988   for (int num = 0; num < 3; num++) {
1989     GenerateNewFile(&rnd, &key_idx);
1990   }
1991 
1992   // Another 110KB triggers a compaction to 400K file to fill up first path
1993   GenerateNewFile(&rnd, &key_idx);
1994   ASSERT_EQ(3, GetSstFileCount(options.db_paths[1].path));
1995 
1996   // (1, 4)
1997   GenerateNewFile(&rnd, &key_idx);
1998   ASSERT_EQ("1,4", FilesPerLevel(0));
1999   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
2000   ASSERT_EQ(1, GetSstFileCount(dbname_));
2001 
2002   // (1, 4, 1)
2003   GenerateNewFile(&rnd, &key_idx);
2004   ASSERT_EQ("1,4,1", FilesPerLevel(0));
2005   ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
2006   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
2007   ASSERT_EQ(1, GetSstFileCount(dbname_));
2008 
2009   // (1, 4, 2)
2010   GenerateNewFile(&rnd, &key_idx);
2011   ASSERT_EQ("1,4,2", FilesPerLevel(0));
2012   ASSERT_EQ(2, GetSstFileCount(options.db_paths[2].path));
2013   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
2014   ASSERT_EQ(1, GetSstFileCount(dbname_));
2015 
2016   // (1, 4, 3)
2017   GenerateNewFile(&rnd, &key_idx);
2018   ASSERT_EQ("1,4,3", FilesPerLevel(0));
2019   ASSERT_EQ(3, GetSstFileCount(options.db_paths[2].path));
2020   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
2021   ASSERT_EQ(1, GetSstFileCount(dbname_));
2022 
2023   // (1, 4, 4)
2024   GenerateNewFile(&rnd, &key_idx);
2025   ASSERT_EQ("1,4,4", FilesPerLevel(0));
2026   ASSERT_EQ(4, GetSstFileCount(options.db_paths[2].path));
2027   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
2028   ASSERT_EQ(1, GetSstFileCount(dbname_));
2029 
2030   // (1, 4, 5)
2031   GenerateNewFile(&rnd, &key_idx);
2032   ASSERT_EQ("1,4,5", FilesPerLevel(0));
2033   ASSERT_EQ(5, GetSstFileCount(options.db_paths[2].path));
2034   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
2035   ASSERT_EQ(1, GetSstFileCount(dbname_));
2036 
2037   // (1, 4, 6)
2038   GenerateNewFile(&rnd, &key_idx);
2039   ASSERT_EQ("1,4,6", FilesPerLevel(0));
2040   ASSERT_EQ(6, GetSstFileCount(options.db_paths[2].path));
2041   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
2042   ASSERT_EQ(1, GetSstFileCount(dbname_));
2043 
2044   // (1, 4, 7)
2045   GenerateNewFile(&rnd, &key_idx);
2046   ASSERT_EQ("1,4,7", FilesPerLevel(0));
2047   ASSERT_EQ(7, GetSstFileCount(options.db_paths[2].path));
2048   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
2049   ASSERT_EQ(1, GetSstFileCount(dbname_));
2050 
2051   // (1, 4, 8)
2052   GenerateNewFile(&rnd, &key_idx);
2053   ASSERT_EQ("1,4,8", FilesPerLevel(0));
2054   ASSERT_EQ(8, GetSstFileCount(options.db_paths[2].path));
2055   ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
2056   ASSERT_EQ(1, GetSstFileCount(dbname_));
2057 
2058   for (int i = 0; i < key_idx; i++) {
2059     auto v = Get(Key(i));
2060     ASSERT_NE(v, "NOT_FOUND");
2061     ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2062   }
2063 
2064   Reopen(options);
2065 
2066   for (int i = 0; i < key_idx; i++) {
2067     auto v = Get(Key(i));
2068     ASSERT_NE(v, "NOT_FOUND");
2069     ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2070   }
2071 
2072   Destroy(options);
2073 }
2074 
TEST_P(DBCompactionTestWithParam,LevelCompactionPathUse)2075 TEST_P(DBCompactionTestWithParam, LevelCompactionPathUse) {
2076   Options options = CurrentOptions();
2077   options.db_paths.emplace_back(dbname_, 500 * 1024);
2078   options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
2079   options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
2080   options.memtable_factory.reset(
2081       new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
2082   options.compaction_style = kCompactionStyleLevel;
2083   options.write_buffer_size = 110 << 10;  // 110KB
2084   options.arena_block_size = 4 << 10;
2085   options.level0_file_num_compaction_trigger = 2;
2086   options.num_levels = 4;
2087   options.max_bytes_for_level_base = 400 * 1024;
2088   options.max_subcompactions = max_subcompactions_;
2089   //  options = CurrentOptions(options);
2090 
2091   std::vector<std::string> filenames;
2092   env_->GetChildren(options.db_paths[1].path, &filenames);
2093   // Delete archival files.
2094   for (size_t i = 0; i < filenames.size(); ++i) {
2095     env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
2096   }
2097   env_->DeleteDir(options.db_paths[1].path);
2098   Reopen(options);
2099 
2100   Random rnd(301);
2101   int key_idx = 0;
2102 
2103   // Always gets compacted into 1 Level1 file,
2104   // 0/1 Level 0 file
2105   for (int num = 0; num < 3; num++) {
2106     key_idx = 0;
2107     GenerateNewFile(&rnd, &key_idx);
2108   }
2109 
2110   key_idx = 0;
2111   GenerateNewFile(&rnd, &key_idx);
2112   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2113 
2114   key_idx = 0;
2115   GenerateNewFile(&rnd, &key_idx);
2116   ASSERT_EQ("1,1", FilesPerLevel(0));
2117   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2118   ASSERT_EQ(1, GetSstFileCount(dbname_));
2119 
2120   key_idx = 0;
2121   GenerateNewFile(&rnd, &key_idx);
2122   ASSERT_EQ("0,1", FilesPerLevel(0));
2123   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2124   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2125   ASSERT_EQ(0, GetSstFileCount(dbname_));
2126 
2127   key_idx = 0;
2128   GenerateNewFile(&rnd, &key_idx);
2129   ASSERT_EQ("1,1", FilesPerLevel(0));
2130   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2131   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2132   ASSERT_EQ(1, GetSstFileCount(dbname_));
2133 
2134   key_idx = 0;
2135   GenerateNewFile(&rnd, &key_idx);
2136   ASSERT_EQ("0,1", FilesPerLevel(0));
2137   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2138   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2139   ASSERT_EQ(0, GetSstFileCount(dbname_));
2140 
2141   key_idx = 0;
2142   GenerateNewFile(&rnd, &key_idx);
2143   ASSERT_EQ("1,1", FilesPerLevel(0));
2144   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2145   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2146   ASSERT_EQ(1, GetSstFileCount(dbname_));
2147 
2148   key_idx = 0;
2149   GenerateNewFile(&rnd, &key_idx);
2150   ASSERT_EQ("0,1", FilesPerLevel(0));
2151   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2152   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2153   ASSERT_EQ(0, GetSstFileCount(dbname_));
2154 
2155   key_idx = 0;
2156   GenerateNewFile(&rnd, &key_idx);
2157   ASSERT_EQ("1,1", FilesPerLevel(0));
2158   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2159   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2160   ASSERT_EQ(1, GetSstFileCount(dbname_));
2161 
2162   key_idx = 0;
2163   GenerateNewFile(&rnd, &key_idx);
2164   ASSERT_EQ("0,1", FilesPerLevel(0));
2165   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2166   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2167   ASSERT_EQ(0, GetSstFileCount(dbname_));
2168 
2169   key_idx = 0;
2170   GenerateNewFile(&rnd, &key_idx);
2171   ASSERT_EQ("1,1", FilesPerLevel(0));
2172   ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2173   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2174   ASSERT_EQ(1, GetSstFileCount(dbname_));
2175 
2176   for (int i = 0; i < key_idx; i++) {
2177     auto v = Get(Key(i));
2178     ASSERT_NE(v, "NOT_FOUND");
2179     ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2180   }
2181 
2182   Reopen(options);
2183 
2184   for (int i = 0; i < key_idx; i++) {
2185     auto v = Get(Key(i));
2186     ASSERT_NE(v, "NOT_FOUND");
2187     ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2188   }
2189 
2190   Destroy(options);
2191 }
2192 
TEST_P(DBCompactionTestWithParam,LevelCompactionCFPathUse)2193 TEST_P(DBCompactionTestWithParam, LevelCompactionCFPathUse) {
2194   Options options = CurrentOptions();
2195   options.db_paths.emplace_back(dbname_, 500 * 1024);
2196   options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
2197   options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
2198   options.memtable_factory.reset(
2199     new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
2200   options.compaction_style = kCompactionStyleLevel;
2201   options.write_buffer_size = 110 << 10;  // 110KB
2202   options.arena_block_size = 4 << 10;
2203   options.level0_file_num_compaction_trigger = 2;
2204   options.num_levels = 4;
2205   options.max_bytes_for_level_base = 400 * 1024;
2206   options.max_subcompactions = max_subcompactions_;
2207 
2208   std::vector<Options> option_vector;
2209   option_vector.emplace_back(options);
2210   ColumnFamilyOptions cf_opt1(options), cf_opt2(options);
2211   // Configure CF1 specific paths.
2212   cf_opt1.cf_paths.emplace_back(dbname_ + "cf1", 500 * 1024);
2213   cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 4 * 1024 * 1024);
2214   cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 1024 * 1024 * 1024);
2215   option_vector.emplace_back(DBOptions(options), cf_opt1);
2216   CreateColumnFamilies({"one"},option_vector[1]);
2217 
2218   // Configura CF2 specific paths.
2219   cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 500 * 1024);
2220   cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 4 * 1024 * 1024);
2221   cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 1024 * 1024 * 1024);
2222   option_vector.emplace_back(DBOptions(options), cf_opt2);
2223   CreateColumnFamilies({"two"},option_vector[2]);
2224 
2225   ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
2226 
2227   Random rnd(301);
2228   int key_idx = 0;
2229   int key_idx1 = 0;
2230   int key_idx2 = 0;
2231 
2232   auto generate_file = [&]() {
2233     GenerateNewFile(0, &rnd, &key_idx);
2234     GenerateNewFile(1, &rnd, &key_idx1);
2235     GenerateNewFile(2, &rnd, &key_idx2);
2236   };
2237 
2238   auto check_sstfilecount = [&](int path_id, int expected) {
2239     ASSERT_EQ(expected, GetSstFileCount(options.db_paths[path_id].path));
2240     ASSERT_EQ(expected, GetSstFileCount(cf_opt1.cf_paths[path_id].path));
2241     ASSERT_EQ(expected, GetSstFileCount(cf_opt2.cf_paths[path_id].path));
2242   };
2243 
2244   auto check_filesperlevel = [&](const std::string& expected) {
2245     ASSERT_EQ(expected, FilesPerLevel(0));
2246     ASSERT_EQ(expected, FilesPerLevel(1));
2247     ASSERT_EQ(expected, FilesPerLevel(2));
2248   };
2249 
2250   auto check_getvalues = [&]() {
2251     for (int i = 0; i < key_idx; i++) {
2252       auto v = Get(0, Key(i));
2253       ASSERT_NE(v, "NOT_FOUND");
2254       ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2255     }
2256 
2257     for (int i = 0; i < key_idx1; i++) {
2258       auto v = Get(1, Key(i));
2259       ASSERT_NE(v, "NOT_FOUND");
2260       ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2261     }
2262 
2263     for (int i = 0; i < key_idx2; i++) {
2264       auto v = Get(2, Key(i));
2265       ASSERT_NE(v, "NOT_FOUND");
2266       ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2267     }
2268   };
2269 
2270   // Check that default column family uses db_paths.
2271   // And Column family "one" uses cf_paths.
2272 
2273   // First three 110KB files are not going to second path.
2274   // After that, (100K, 200K)
2275   for (int num = 0; num < 3; num++) {
2276     generate_file();
2277   }
2278 
2279   // Another 110KB triggers a compaction to 400K file to fill up first path
2280   generate_file();
2281   check_sstfilecount(1, 3);
2282 
2283   // (1, 4)
2284   generate_file();
2285   check_filesperlevel("1,4");
2286   check_sstfilecount(1, 4);
2287   check_sstfilecount(0, 1);
2288 
2289   // (1, 4, 1)
2290   generate_file();
2291   check_filesperlevel("1,4,1");
2292   check_sstfilecount(2, 1);
2293   check_sstfilecount(1, 4);
2294   check_sstfilecount(0, 1);
2295 
2296   // (1, 4, 2)
2297   generate_file();
2298   check_filesperlevel("1,4,2");
2299   check_sstfilecount(2, 2);
2300   check_sstfilecount(1, 4);
2301   check_sstfilecount(0, 1);
2302 
2303   check_getvalues();
2304 
2305   ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
2306 
2307   check_getvalues();
2308 
2309   Destroy(options, true);
2310 }
2311 
TEST_P(DBCompactionTestWithParam,ConvertCompactionStyle)2312 TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) {
2313   Random rnd(301);
2314   int max_key_level_insert = 200;
2315   int max_key_universal_insert = 600;
2316 
2317   // Stage 1: generate a db with level compaction
2318   Options options = CurrentOptions();
2319   options.write_buffer_size = 110 << 10;  // 110KB
2320   options.arena_block_size = 4 << 10;
2321   options.num_levels = 4;
2322   options.level0_file_num_compaction_trigger = 3;
2323   options.max_bytes_for_level_base = 500 << 10;  // 500KB
2324   options.max_bytes_for_level_multiplier = 1;
2325   options.target_file_size_base = 200 << 10;  // 200KB
2326   options.target_file_size_multiplier = 1;
2327   options.max_subcompactions = max_subcompactions_;
2328   CreateAndReopenWithCF({"pikachu"}, options);
2329 
2330   for (int i = 0; i <= max_key_level_insert; i++) {
2331     // each value is 10K
2332     ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
2333   }
2334   ASSERT_OK(Flush(1));
2335   dbfull()->TEST_WaitForCompact();
2336 
2337   ASSERT_GT(TotalTableFiles(1, 4), 1);
2338   int non_level0_num_files = 0;
2339   for (int i = 1; i < options.num_levels; i++) {
2340     non_level0_num_files += NumTableFilesAtLevel(i, 1);
2341   }
2342   ASSERT_GT(non_level0_num_files, 0);
2343 
2344   // Stage 2: reopen with universal compaction - should fail
2345   options = CurrentOptions();
2346   options.compaction_style = kCompactionStyleUniversal;
2347   options.num_levels = 1;
2348   options = CurrentOptions(options);
2349   Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
2350   ASSERT_TRUE(s.IsInvalidArgument());
2351 
2352   // Stage 3: compact into a single file and move the file to level 0
2353   options = CurrentOptions();
2354   options.disable_auto_compactions = true;
2355   options.target_file_size_base = INT_MAX;
2356   options.target_file_size_multiplier = 1;
2357   options.max_bytes_for_level_base = INT_MAX;
2358   options.max_bytes_for_level_multiplier = 1;
2359   options.num_levels = 4;
2360   options = CurrentOptions(options);
2361   ReopenWithColumnFamilies({"default", "pikachu"}, options);
2362 
2363   CompactRangeOptions compact_options;
2364   compact_options.change_level = true;
2365   compact_options.target_level = 0;
2366   // cannot use kForceOptimized here because the compaction here is expected
2367   // to generate one output file
2368   compact_options.bottommost_level_compaction =
2369       BottommostLevelCompaction::kForce;
2370   compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
2371   dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);
2372 
2373   // Only 1 file in L0
2374   ASSERT_EQ("1", FilesPerLevel(1));
2375 
2376   // Stage 4: re-open in universal compaction style and do some db operations
2377   options = CurrentOptions();
2378   options.compaction_style = kCompactionStyleUniversal;
2379   options.num_levels = 4;
2380   options.write_buffer_size = 110 << 10;  // 110KB
2381   options.arena_block_size = 4 << 10;
2382   options.level0_file_num_compaction_trigger = 3;
2383   options = CurrentOptions(options);
2384   ReopenWithColumnFamilies({"default", "pikachu"}, options);
2385 
2386   options.num_levels = 1;
2387   ReopenWithColumnFamilies({"default", "pikachu"}, options);
2388 
2389   for (int i = max_key_level_insert / 2; i <= max_key_universal_insert; i++) {
2390     ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
2391   }
2392   dbfull()->Flush(FlushOptions());
2393   ASSERT_OK(Flush(1));
2394   dbfull()->TEST_WaitForCompact();
2395 
2396   for (int i = 1; i < options.num_levels; i++) {
2397     ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0);
2398   }
2399 
2400   // verify keys inserted in both level compaction style and universal
2401   // compaction style
2402   std::string keys_in_db;
2403   Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]);
2404   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
2405     keys_in_db.append(iter->key().ToString());
2406     keys_in_db.push_back(',');
2407   }
2408   delete iter;
2409 
2410   std::string expected_keys;
2411   for (int i = 0; i <= max_key_universal_insert; i++) {
2412     expected_keys.append(Key(i));
2413     expected_keys.push_back(',');
2414   }
2415 
2416   ASSERT_EQ(keys_in_db, expected_keys);
2417 }
2418 
// Regression scenario (issue 44, variant "a", per the test name): a series of
// puts and deletes on CF "pikachu", separated by reopens, must leave exactly
// "(a->v)". This must hold both immediately and after a one-second pause that
// gives background compaction time to run. It is repeated for every
// ChangeCompactOptions() configuration.
TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_a) {
  const std::vector<std::string> cf_names = {"default", "pikachu"};
  // Reopens with the options of the configuration currently being tested.
  auto reopen = [&]() { ReopenWithColumnFamilies(cf_names, CurrentOptions()); };

  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "b", "v"));
    reopen();
    ASSERT_OK(Delete(1, "b"));
    ASSERT_OK(Delete(1, "a"));
    reopen();
    ASSERT_OK(Delete(1, "a"));
    reopen();
    ASSERT_OK(Put(1, "a", "v"));
    reopen();
    reopen();
    ASSERT_EQ("(a->v)", Contents(1));
    env_->SleepForMicroseconds(1000000);  // Wait for compaction to finish
    ASSERT_EQ("(a->v)", Contents(1));
  } while (ChangeCompactOptions());
}
2437 
// Regression scenario (issue 44, variant "b", per the test name): interleaved
// puts and deletes on CF "pikachu", including the empty key, across many
// reopens must leave exactly "(->)(c->cv)". This must hold both immediately
// and after a one-second pause for background compaction. It is repeated for
// every ChangeCompactOptions() configuration.
TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_b) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    // Every write is checked so that a failed write cannot masquerade as
    // the compaction bug this test guards against.
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Delete(1, "e"));
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "c", "cv"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "", ""));
    env_->SleepForMicroseconds(1000000);  // Wait for compaction to finish
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "d", "dv"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Delete(1, "d"));
    ASSERT_OK(Delete(1, "b"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_EQ("(->)(c->cv)", Contents(1));
    env_->SleepForMicroseconds(1000000);  // Wait for compaction to finish
    ASSERT_EQ("(->)(c->cv)", Contents(1));
  } while (ChangeCompactOptions());
}
2465 
// Races an automatic compaction of the default column family against a manual
// CompactRange() on "pikachu". Sync-point dependencies hold the auto compaction
// until the test reaches "ManualAutoRace:1", and make the background compaction
// wait until the manual compaction is waiting to be scheduled. The test then
// checks the manual compaction's result and that the default column family is
// eventually compacted as well.
TEST_F(DBCompactionTest, ManualAutoRace) {
  CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BGWorkCompaction", "DBCompactionTest::ManualAutoRace:1"},
       {"DBImpl::RunManualCompaction:WaitScheduled",
        "BackgroundCallCompaction:0"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put(1, "foo", ""));
  ASSERT_OK(Put(1, "bar", ""));
  ASSERT_OK(Flush(1));
  ASSERT_OK(Put(1, "foo", ""));
  ASSERT_OK(Put(1, "bar", ""));
  // Generate four files in the default column family, which should trigger an
  // auto compaction.
  for (int i = 0; i < 4; ++i) {
    ASSERT_OK(Put("foo", ""));
    ASSERT_OK(Put("bar", ""));
    ASSERT_OK(Flush());
  }

  // The auto compaction is scheduled but blocked until here.
  TEST_SYNC_POINT("DBCompactionTest::ManualAutoRace:1");
  // The auto compaction waits until the manual compaction is registered
  // before processing, so that it gets cancelled.
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                                   nullptr));
  ASSERT_EQ("0,1", FilesPerLevel(1));

  // Eventually the cancelled compaction will be rescheduled and executed.
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("0,1", FilesPerLevel(0));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
2506 
// Manual compaction on the "pikachu" column family over ranges that fall
// before, after, partially over, or entirely over its files. Runs once with
// the default 7 levels (iter 0) and once with 3 levels (iter 1). Also checks
// that a full CompactRange() adds nothing to the block cache.
TEST_P(DBCompactionTestWithParam, ManualCompaction) {
  Options options = CurrentOptions();
  options.max_subcompactions = max_subcompactions_;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  CreateAndReopenWithCF({"pikachu"}, options);

  // iter - 0 with 7 levels
  // iter - 1 with 3 levels
  for (int iter = 0; iter < 2; ++iter) {
    MakeTables(3, "p", "q", 1);
    ASSERT_EQ("1,1,1", FilesPerLevel(1));

    // Compaction range falls before files
    Compact(1, "", "c");
    ASSERT_EQ("1,1,1", FilesPerLevel(1));

    // Compaction range falls after files
    Compact(1, "r", "z");
    ASSERT_EQ("1,1,1", FilesPerLevel(1));

    // Compaction range overlaps files
    Compact(1, "p", "q");
    ASSERT_EQ("0,0,1", FilesPerLevel(1));

    // Populate a different range
    MakeTables(3, "c", "e", 1);
    ASSERT_EQ("1,1,2", FilesPerLevel(1));

    // Compact just the new range
    Compact(1, "b", "f");
    ASSERT_EQ("0,0,2", FilesPerLevel(1));

    // Compact all
    MakeTables(1, "a", "z", 1);
    ASSERT_EQ("1,0,2", FilesPerLevel(1));

    uint64_t prev_block_cache_add =
        options.statistics->getTickerCount(BLOCK_CACHE_ADD);
    CompactRangeOptions cro;
    cro.exclusive_manual_compaction = exclusive_manual_compaction_;
    ASSERT_OK(db_->CompactRange(cro, handles_[1], nullptr, nullptr));
    // Verify manual compaction doesn't fill block cache
    ASSERT_EQ(prev_block_cache_add,
              options.statistics->getTickerCount(BLOCK_CACHE_ADD));

    ASSERT_EQ("0,0,1", FilesPerLevel(1));

    if (iter == 0) {
      options = CurrentOptions();
      options.num_levels = 3;
      options.create_if_missing = true;
      options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
      // CurrentOptions() does not know about this fixture's parameter, so
      // re-apply it; otherwise the 3-level pass silently ignores it.
      options.max_subcompactions = max_subcompactions_;
      DestroyAndReopen(options);
      CreateAndReopenWithCF({"pikachu"}, options);
    }
  }
}
2564 
2565 
TEST_P(DBCompactionTestWithParam,ManualLevelCompactionOutputPathId)2566 TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) {
2567   Options options = CurrentOptions();
2568   options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
2569   options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
2570   options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
2571   options.max_subcompactions = max_subcompactions_;
2572   CreateAndReopenWithCF({"pikachu"}, options);
2573 
2574   // iter - 0 with 7 levels
2575   // iter - 1 with 3 levels
2576   for (int iter = 0; iter < 2; ++iter) {
2577     for (int i = 0; i < 3; ++i) {
2578       ASSERT_OK(Put(1, "p", "begin"));
2579       ASSERT_OK(Put(1, "q", "end"));
2580       ASSERT_OK(Flush(1));
2581     }
2582     ASSERT_EQ("3", FilesPerLevel(1));
2583     ASSERT_EQ(3, GetSstFileCount(options.db_paths[0].path));
2584     ASSERT_EQ(0, GetSstFileCount(dbname_));
2585 
2586     // Compaction range falls before files
2587     Compact(1, "", "c");
2588     ASSERT_EQ("3", FilesPerLevel(1));
2589 
2590     // Compaction range falls after files
2591     Compact(1, "r", "z");
2592     ASSERT_EQ("3", FilesPerLevel(1));
2593 
2594     // Compaction range overlaps files
2595     Compact(1, "p", "q", 1);
2596     ASSERT_OK(dbfull()->TEST_WaitForCompact());
2597     ASSERT_EQ("0,1", FilesPerLevel(1));
2598     ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2599     ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
2600     ASSERT_EQ(0, GetSstFileCount(dbname_));
2601 
2602     // Populate a different range
2603     for (int i = 0; i < 3; ++i) {
2604       ASSERT_OK(Put(1, "c", "begin"));
2605       ASSERT_OK(Put(1, "e", "end"));
2606       ASSERT_OK(Flush(1));
2607     }
2608     ASSERT_EQ("3,1", FilesPerLevel(1));
2609 
2610     // Compact just the new range
2611     Compact(1, "b", "f", 1);
2612     ASSERT_OK(dbfull()->TEST_WaitForCompact());
2613     ASSERT_EQ("0,2", FilesPerLevel(1));
2614     ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
2615     ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
2616     ASSERT_EQ(0, GetSstFileCount(dbname_));
2617 
2618     // Compact all
2619     ASSERT_OK(Put(1, "a", "begin"));
2620     ASSERT_OK(Put(1, "z", "end"));
2621     ASSERT_OK(Flush(1));
2622     ASSERT_EQ("1,2", FilesPerLevel(1));
2623     ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
2624     ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
2625     CompactRangeOptions compact_options;
2626     compact_options.target_path_id = 1;
2627     compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
2628     db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
2629     ASSERT_OK(dbfull()->TEST_WaitForCompact());
2630 
2631     ASSERT_EQ("0,1", FilesPerLevel(1));
2632     ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2633     ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
2634     ASSERT_EQ(0, GetSstFileCount(dbname_));
2635 
2636     if (iter == 0) {
2637       DestroyAndReopen(options);
2638       options = CurrentOptions();
2639       options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
2640       options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
2641       options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
2642       options.max_background_flushes = 1;
2643       options.num_levels = 3;
2644       options.create_if_missing = true;
2645       CreateAndReopenWithCF({"pikachu"}, options);
2646     }
2647   }
2648 }
2649 
// Overwriting the same key and compacting repeatedly must not accumulate live
// files: after one overwrite-and-compact round the live-file count is taken as
// a baseline, and ten further rounds must end at that same count. Repeated for
// every option configuration yielded by ChangeCompactOptions().
TEST_F(DBCompactionTest, FilesDeletedAfterCompaction) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    size_t baseline_files = 0;
    // Round 0 establishes the baseline; rounds 1..10 must not grow it.
    for (int round = 0; round <= 10; ++round) {
      ASSERT_OK(Put(1, "foo", "v2"));
      Compact(1, "a", "z");
      if (round == 0) {
        baseline_files = CountLiveFiles();
      }
    }
    ASSERT_EQ(CountLiveFiles(), baseline_files);
  } while (ChangeCompactOptions());
}
2663 
2664 // Check level comapction with compact files
TEST_P(DBCompactionTestWithParam,DISABLED_CompactFilesOnLevelCompaction)2665 TEST_P(DBCompactionTestWithParam, DISABLED_CompactFilesOnLevelCompaction) {
2666   const int kTestKeySize = 16;
2667   const int kTestValueSize = 984;
2668   const int kEntrySize = kTestKeySize + kTestValueSize;
2669   const int kEntriesPerBuffer = 100;
2670   Options options;
2671   options.create_if_missing = true;
2672   options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
2673   options.compaction_style = kCompactionStyleLevel;
2674   options.target_file_size_base = options.write_buffer_size;
2675   options.max_bytes_for_level_base = options.target_file_size_base * 2;
2676   options.level0_stop_writes_trigger = 2;
2677   options.max_bytes_for_level_multiplier = 2;
2678   options.compression = kNoCompression;
2679   options.max_subcompactions = max_subcompactions_;
2680   options = CurrentOptions(options);
2681   CreateAndReopenWithCF({"pikachu"}, options);
2682 
2683   Random rnd(301);
2684   for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
2685     ASSERT_OK(Put(1, ToString(key), RandomString(&rnd, kTestValueSize)));
2686   }
2687   dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
2688   dbfull()->TEST_WaitForCompact();
2689 
2690   ColumnFamilyMetaData cf_meta;
2691   dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
2692   int output_level = static_cast<int>(cf_meta.levels.size()) - 1;
2693   for (int file_picked = 5; file_picked > 0; --file_picked) {
2694     std::set<std::string> overlapping_file_names;
2695     std::vector<std::string> compaction_input_file_names;
2696     for (int f = 0; f < file_picked; ++f) {
2697       int level = 0;
2698       auto file_meta = PickFileRandomly(cf_meta, &rnd, &level);
2699       compaction_input_file_names.push_back(file_meta->name);
2700       GetOverlappingFileNumbersForLevelCompaction(
2701           cf_meta, options.comparator, level, output_level,
2702           file_meta, &overlapping_file_names);
2703     }
2704 
2705     ASSERT_OK(dbfull()->CompactFiles(
2706         CompactionOptions(), handles_[1],
2707         compaction_input_file_names,
2708         output_level));
2709 
2710     // Make sure all overlapping files do not exist after compaction
2711     dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
2712     VerifyCompactionResult(cf_meta, overlapping_file_names);
2713   }
2714 
2715   // make sure all key-values are still there.
2716   for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
2717     ASSERT_NE(Get(1, ToString(key)), "NOT_FOUND");
2718   }
2719 }
2720 
TEST_P(DBCompactionTestWithParam,PartialCompactionFailure)2721 TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) {
2722   Options options;
2723   const int kKeySize = 16;
2724   const int kKvSize = 1000;
2725   const int kKeysPerBuffer = 100;
2726   const int kNumL1Files = 5;
2727   options.create_if_missing = true;
2728   options.write_buffer_size = kKeysPerBuffer * kKvSize;
2729   options.max_write_buffer_number = 2;
2730   options.target_file_size_base =
2731       options.write_buffer_size *
2732       (options.max_write_buffer_number - 1);
2733   options.level0_file_num_compaction_trigger = kNumL1Files;
2734   options.max_bytes_for_level_base =
2735       options.level0_file_num_compaction_trigger *
2736       options.target_file_size_base;
2737   options.max_bytes_for_level_multiplier = 2;
2738   options.compression = kNoCompression;
2739   options.max_subcompactions = max_subcompactions_;
2740 
2741   env_->SetBackgroundThreads(1, Env::HIGH);
2742   env_->SetBackgroundThreads(1, Env::LOW);
2743   // stop the compaction thread until we simulate the file creation failure.
2744   test::SleepingBackgroundTask sleeping_task_low;
2745   env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
2746                  Env::Priority::LOW);
2747 
2748   options.env = env_;
2749 
2750   DestroyAndReopen(options);
2751 
2752   const int kNumInsertedKeys =
2753       options.level0_file_num_compaction_trigger *
2754       (options.max_write_buffer_number - 1) *
2755       kKeysPerBuffer;
2756 
2757   Random rnd(301);
2758   std::vector<std::string> keys;
2759   std::vector<std::string> values;
2760   for (int k = 0; k < kNumInsertedKeys; ++k) {
2761     keys.emplace_back(RandomString(&rnd, kKeySize));
2762     values.emplace_back(RandomString(&rnd, kKvSize - kKeySize));
2763     ASSERT_OK(Put(Slice(keys[k]), Slice(values[k])));
2764     dbfull()->TEST_WaitForFlushMemTable();
2765   }
2766 
2767   dbfull()->TEST_FlushMemTable(true);
2768   // Make sure the number of L0 files can trigger compaction.
2769   ASSERT_GE(NumTableFilesAtLevel(0),
2770             options.level0_file_num_compaction_trigger);
2771 
2772   auto previous_num_level0_files = NumTableFilesAtLevel(0);
2773 
2774   // Fail the first file creation.
2775   env_->non_writable_count_ = 1;
2776   sleeping_task_low.WakeUp();
2777   sleeping_task_low.WaitUntilDone();
2778 
2779   // Expect compaction to fail here as one file will fail its
2780   // creation.
2781   ASSERT_TRUE(!dbfull()->TEST_WaitForCompact().ok());
2782 
2783   // Verify L0 -> L1 compaction does fail.
2784   ASSERT_EQ(NumTableFilesAtLevel(1), 0);
2785 
2786   // Verify all L0 files are still there.
2787   ASSERT_EQ(NumTableFilesAtLevel(0), previous_num_level0_files);
2788 
2789   // All key-values must exist after compaction fails.
2790   for (int k = 0; k < kNumInsertedKeys; ++k) {
2791     ASSERT_EQ(values[k], Get(keys[k]));
2792   }
2793 
2794   env_->non_writable_count_ = 0;
2795 
2796   // Make sure RocksDB will not get into corrupted state.
2797   Reopen(options);
2798 
2799   // Verify again after reopen.
2800   for (int k = 0; k < kNumInsertedKeys; ++k) {
2801     ASSERT_EQ(values[k], Get(keys[k]));
2802   }
2803 }
2804 
TEST_P(DBCompactionTestWithParam,DeleteMovedFileAfterCompaction)2805 TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) {
2806   // iter 1 -- delete_obsolete_files_period_micros == 0
2807   for (int iter = 0; iter < 2; ++iter) {
2808     // This test triggers move compaction and verifies that the file is not
2809     // deleted when it's part of move compaction
2810     Options options = CurrentOptions();
2811     options.env = env_;
2812     if (iter == 1) {
2813       options.delete_obsolete_files_period_micros = 0;
2814     }
2815     options.create_if_missing = true;
2816     options.level0_file_num_compaction_trigger =
2817         2;  // trigger compaction when we have 2 files
2818     OnFileDeletionListener* listener = new OnFileDeletionListener();
2819     options.listeners.emplace_back(listener);
2820     options.max_subcompactions = max_subcompactions_;
2821     DestroyAndReopen(options);
2822 
2823     Random rnd(301);
2824     // Create two 1MB sst files
2825     for (int i = 0; i < 2; ++i) {
2826       // Create 1MB sst file
2827       for (int j = 0; j < 100; ++j) {
2828         ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
2829       }
2830       ASSERT_OK(Flush());
2831     }
2832     // this should execute L0->L1
2833     dbfull()->TEST_WaitForCompact();
2834     ASSERT_EQ("0,1", FilesPerLevel(0));
2835 
2836     // block compactions
2837     test::SleepingBackgroundTask sleeping_task;
2838     env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
2839                    Env::Priority::LOW);
2840 
2841     options.max_bytes_for_level_base = 1024 * 1024;  // 1 MB
2842     Reopen(options);
2843     std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
2844     ASSERT_EQ("0,1", FilesPerLevel(0));
2845     // let compactions go
2846     sleeping_task.WakeUp();
2847     sleeping_task.WaitUntilDone();
2848 
2849     // this should execute L1->L2 (move)
2850     dbfull()->TEST_WaitForCompact();
2851 
2852     ASSERT_EQ("0,0,1", FilesPerLevel(0));
2853 
2854     std::vector<LiveFileMetaData> metadata;
2855     db_->GetLiveFilesMetaData(&metadata);
2856     ASSERT_EQ(metadata.size(), 1U);
2857     auto moved_file_name = metadata[0].name;
2858 
2859     // Create two more 1MB sst files
2860     for (int i = 0; i < 2; ++i) {
2861       // Create 1MB sst file
2862       for (int j = 0; j < 100; ++j) {
2863         ASSERT_OK(Put(Key(i * 50 + j + 100), RandomString(&rnd, 10 * 1024)));
2864       }
2865       ASSERT_OK(Flush());
2866     }
2867     // this should execute both L0->L1 and L1->L2 (merge with previous file)
2868     dbfull()->TEST_WaitForCompact();
2869 
2870     ASSERT_EQ("0,0,2", FilesPerLevel(0));
2871 
2872     // iterator is holding the file
2873     ASSERT_OK(env_->FileExists(dbname_ + moved_file_name));
2874 
2875     listener->SetExpectedFileName(dbname_ + moved_file_name);
2876     iterator.reset();
2877 
2878     // this file should have been compacted away
2879     ASSERT_NOK(env_->FileExists(dbname_ + moved_file_name));
2880     listener->VerifyMatchedCount(1);
2881   }
2882 }
2883 
TEST_P(DBCompactionTestWithParam,CompressLevelCompaction)2884 TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) {
2885   if (!Zlib_Supported()) {
2886     return;
2887   }
2888   Options options = CurrentOptions();
2889   options.memtable_factory.reset(
2890       new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
2891   options.compaction_style = kCompactionStyleLevel;
2892   options.write_buffer_size = 110 << 10;  // 110KB
2893   options.arena_block_size = 4 << 10;
2894   options.level0_file_num_compaction_trigger = 2;
2895   options.num_levels = 4;
2896   options.max_bytes_for_level_base = 400 * 1024;
2897   options.max_subcompactions = max_subcompactions_;
2898   // First two levels have no compression, so that a trivial move between
2899   // them will be allowed. Level 2 has Zlib compression so that a trivial
2900   // move to level 3 will not be allowed
2901   options.compression_per_level = {kNoCompression, kNoCompression,
2902                                    kZlibCompression};
2903   int matches = 0, didnt_match = 0, trivial_move = 0, non_trivial = 0;
2904 
2905   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2906       "Compaction::InputCompressionMatchesOutput:Matches",
2907       [&](void* /*arg*/) { matches++; });
2908   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2909       "Compaction::InputCompressionMatchesOutput:DidntMatch",
2910       [&](void* /*arg*/) { didnt_match++; });
2911   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2912       "DBImpl::BackgroundCompaction:NonTrivial",
2913       [&](void* /*arg*/) { non_trivial++; });
2914   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2915       "DBImpl::BackgroundCompaction:TrivialMove",
2916       [&](void* /*arg*/) { trivial_move++; });
2917   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2918 
2919   Reopen(options);
2920 
2921   Random rnd(301);
2922   int key_idx = 0;
2923 
2924   // First three 110KB files are going to level 0
2925   // After that, (100K, 200K)
2926   for (int num = 0; num < 3; num++) {
2927     GenerateNewFile(&rnd, &key_idx);
2928   }
2929 
2930   // Another 110KB triggers a compaction to 400K file to fill up level 0
2931   GenerateNewFile(&rnd, &key_idx);
2932   ASSERT_EQ(4, GetSstFileCount(dbname_));
2933 
2934   // (1, 4)
2935   GenerateNewFile(&rnd, &key_idx);
2936   ASSERT_EQ("1,4", FilesPerLevel(0));
2937 
2938   // (1, 4, 1)
2939   GenerateNewFile(&rnd, &key_idx);
2940   ASSERT_EQ("1,4,1", FilesPerLevel(0));
2941 
2942   // (1, 4, 2)
2943   GenerateNewFile(&rnd, &key_idx);
2944   ASSERT_EQ("1,4,2", FilesPerLevel(0));
2945 
2946   // (1, 4, 3)
2947   GenerateNewFile(&rnd, &key_idx);
2948   ASSERT_EQ("1,4,3", FilesPerLevel(0));
2949 
2950   // (1, 4, 4)
2951   GenerateNewFile(&rnd, &key_idx);
2952   ASSERT_EQ("1,4,4", FilesPerLevel(0));
2953 
2954   // (1, 4, 5)
2955   GenerateNewFile(&rnd, &key_idx);
2956   ASSERT_EQ("1,4,5", FilesPerLevel(0));
2957 
2958   // (1, 4, 6)
2959   GenerateNewFile(&rnd, &key_idx);
2960   ASSERT_EQ("1,4,6", FilesPerLevel(0));
2961 
2962   // (1, 4, 7)
2963   GenerateNewFile(&rnd, &key_idx);
2964   ASSERT_EQ("1,4,7", FilesPerLevel(0));
2965 
2966   // (1, 4, 8)
2967   GenerateNewFile(&rnd, &key_idx);
2968   ASSERT_EQ("1,4,8", FilesPerLevel(0));
2969 
2970   ASSERT_EQ(matches, 12);
2971   // Currently, the test relies on the number of calls to
2972   // InputCompressionMatchesOutput() per compaction.
2973   const int kCallsToInputCompressionMatch = 2;
2974   ASSERT_EQ(didnt_match, 8 * kCallsToInputCompressionMatch);
2975   ASSERT_EQ(trivial_move, 12);
2976   ASSERT_EQ(non_trivial, 8);
2977 
2978   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
2979 
2980   for (int i = 0; i < key_idx; i++) {
2981     auto v = Get(Key(i));
2982     ASSERT_NE(v, "NOT_FOUND");
2983     ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2984   }
2985 
2986   Reopen(options);
2987 
2988   for (int i = 0; i < key_idx; i++) {
2989     auto v = Get(Key(i));
2990     ASSERT_NE(v, "NOT_FOUND");
2991     ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2992   }
2993 
2994   Destroy(options);
2995 }
2996 
// Opening the DB sanitizes soft_pending_compaction_bytes_limit. The test
// expects it to end up equal to hard_pending_compaction_bytes_limit in two
// cases: when the soft limit is 0, and when it is larger than the hard limit.
TEST_F(DBCompactionTest, SanitizeCompactionOptionsTest) {
  struct SanitizeCase {
    int max_background_compactions;
    uint64_t soft_limit;
    uint64_t hard_limit;
    uint64_t expected_soft_limit;
  };
  const SanitizeCase kCases[] = {
      {5, 0, 100, 100},    // soft limit unset -> takes the hard limit
      {3, 200, 150, 150},  // soft limit above hard limit -> clamped
  };

  Options options = CurrentOptions();
  options.create_if_missing = true;
  for (const SanitizeCase& c : kCases) {
    options.max_background_compactions = c.max_background_compactions;
    options.soft_pending_compaction_bytes_limit = c.soft_limit;
    options.hard_pending_compaction_bytes_limit = c.hard_limit;
    DestroyAndReopen(options);
    ASSERT_EQ(c.expected_soft_limit,
              db_->GetOptions().soft_pending_compaction_bytes_limit);
  }
}
3012 
3013 // This tests for a bug that could cause two level0 compactions running
3014 // concurrently
3015 // TODO(aekmekji): Make sure that the reason this fails when run with
3016 // max_subcompactions > 1 is not a correctness issue but just inherent to
3017 // running parallel L0-L1 compactions
TEST_F(DBCompactionTest,SuggestCompactRangeNoTwoLevel0Compactions)3018 TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) {
3019   Options options = CurrentOptions();
3020   options.compaction_style = kCompactionStyleLevel;
3021   options.write_buffer_size = 110 << 10;
3022   options.arena_block_size = 4 << 10;
3023   options.level0_file_num_compaction_trigger = 4;
3024   options.num_levels = 4;
3025   options.compression = kNoCompression;
3026   options.max_bytes_for_level_base = 450 << 10;
3027   options.target_file_size_base = 98 << 10;
3028   options.max_write_buffer_number = 2;
3029   options.max_background_compactions = 2;
3030 
3031   DestroyAndReopen(options);
3032 
3033   // fill up the DB
3034   Random rnd(301);
3035   for (int num = 0; num < 10; num++) {
3036     GenerateNewRandomFile(&rnd);
3037   }
3038   db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
3039 
3040   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3041       {{"CompactionJob::Run():Start",
3042         "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:1"},
3043        {"DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2",
3044         "CompactionJob::Run():End"}});
3045 
3046   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3047 
3048   // trigger L0 compaction
3049   for (int num = 0; num < options.level0_file_num_compaction_trigger + 1;
3050        num++) {
3051     GenerateNewRandomFile(&rnd, /* nowait */ true);
3052     ASSERT_OK(Flush());
3053   }
3054 
3055   TEST_SYNC_POINT(
3056       "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:1");
3057 
3058   GenerateNewRandomFile(&rnd, /* nowait */ true);
3059   dbfull()->TEST_WaitForFlushMemTable();
3060   ASSERT_OK(experimental::SuggestCompactRange(db_, nullptr, nullptr));
3061   for (int num = 0; num < options.level0_file_num_compaction_trigger + 1;
3062        num++) {
3063     GenerateNewRandomFile(&rnd, /* nowait */ true);
3064     ASSERT_OK(Flush());
3065   }
3066 
3067   TEST_SYNC_POINT(
3068       "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2");
3069   dbfull()->TEST_WaitForCompact();
3070 }
3071 
// Returns "key" followed by `i` formatted with printf's "%04d", e.g.
// ShortKey(7) == "key0007". Asserts i < 10000, so non-negative inputs always
// produce 7-byte keys.
static std::string ShortKey(int i) {
  assert(i < 10000);
  // 16 bytes is enough for "key" + any int's "%04d" form + the terminator.
  std::string key(16, '\0');
  const int written = snprintf(&key[0], key.size(), "key%04d", i);
  key.resize(static_cast<size_t>(written));
  return key;
}
3078 
TEST_P(DBCompactionTestWithParam,ForceBottommostLevelCompaction)3079 TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
3080   int32_t trivial_move = 0;
3081   int32_t non_trivial_move = 0;
3082   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3083       "DBImpl::BackgroundCompaction:TrivialMove",
3084       [&](void* /*arg*/) { trivial_move++; });
3085   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3086       "DBImpl::BackgroundCompaction:NonTrivial",
3087       [&](void* /*arg*/) { non_trivial_move++; });
3088   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3089 
3090   // The key size is guaranteed to be <= 8
3091   class ShortKeyComparator : public Comparator {
3092     int Compare(const ROCKSDB_NAMESPACE::Slice& a,
3093                 const ROCKSDB_NAMESPACE::Slice& b) const override {
3094       assert(a.size() <= 8);
3095       assert(b.size() <= 8);
3096       return BytewiseComparator()->Compare(a, b);
3097     }
3098     const char* Name() const override { return "ShortKeyComparator"; }
3099     void FindShortestSeparator(
3100         std::string* start,
3101         const ROCKSDB_NAMESPACE::Slice& limit) const override {
3102       return BytewiseComparator()->FindShortestSeparator(start, limit);
3103     }
3104     void FindShortSuccessor(std::string* key) const override {
3105       return BytewiseComparator()->FindShortSuccessor(key);
3106     }
3107   } short_key_cmp;
3108   Options options = CurrentOptions();
3109   options.target_file_size_base = 100000000;
3110   options.write_buffer_size = 100000000;
3111   options.max_subcompactions = max_subcompactions_;
3112   options.comparator = &short_key_cmp;
3113   DestroyAndReopen(options);
3114 
3115   int32_t value_size = 10 * 1024;  // 10 KB
3116 
3117   Random rnd(301);
3118   std::vector<std::string> values;
3119   // File with keys [ 0 => 99 ]
3120   for (int i = 0; i < 100; i++) {
3121     values.push_back(RandomString(&rnd, value_size));
3122     ASSERT_OK(Put(ShortKey(i), values[i]));
3123   }
3124   ASSERT_OK(Flush());
3125 
3126   ASSERT_EQ("1", FilesPerLevel(0));
3127   // Compaction will do L0=>L1 (trivial move) then move L1 files to L3
3128   CompactRangeOptions compact_options;
3129   compact_options.change_level = true;
3130   compact_options.target_level = 3;
3131   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
3132   ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
3133   ASSERT_EQ(trivial_move, 1);
3134   ASSERT_EQ(non_trivial_move, 0);
3135 
3136   // File with keys [ 100 => 199 ]
3137   for (int i = 100; i < 200; i++) {
3138     values.push_back(RandomString(&rnd, value_size));
3139     ASSERT_OK(Put(ShortKey(i), values[i]));
3140   }
3141   ASSERT_OK(Flush());
3142 
3143   ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
3144   // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
3145   // then compacte the bottommost level L3=>L3 (non trivial move)
3146   compact_options = CompactRangeOptions();
3147   compact_options.bottommost_level_compaction =
3148       BottommostLevelCompaction::kForceOptimized;
3149   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
3150   ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
3151   ASSERT_EQ(trivial_move, 4);
3152   ASSERT_EQ(non_trivial_move, 1);
3153 
3154   // File with keys [ 200 => 299 ]
3155   for (int i = 200; i < 300; i++) {
3156     values.push_back(RandomString(&rnd, value_size));
3157     ASSERT_OK(Put(ShortKey(i), values[i]));
3158   }
3159   ASSERT_OK(Flush());
3160 
3161   ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
3162   trivial_move = 0;
3163   non_trivial_move = 0;
3164   compact_options = CompactRangeOptions();
3165   compact_options.bottommost_level_compaction =
3166       BottommostLevelCompaction::kSkip;
3167   // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
3168   // and will skip bottommost level compaction
3169   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
3170   ASSERT_EQ("0,0,0,2", FilesPerLevel(0));
3171   ASSERT_EQ(trivial_move, 3);
3172   ASSERT_EQ(non_trivial_move, 0);
3173 
3174   for (int i = 0; i < 300; i++) {
3175     ASSERT_EQ(Get(ShortKey(i)), values[i]);
3176   }
3177 
3178   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3179 }
3180 
// Verifies that an L0->L0 (intra-L0) compaction is picked while an L0->L1
// compaction is blocked by a sync-point dependency. Afterwards L0 must hold
// exactly two files of at least 2MB each, and L1 must be non-empty.
TEST_P(DBCompactionTestWithParam, IntraL0Compaction) {
  Options options = CurrentOptions();
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 5;
  options.max_background_compactions = 2;
  options.max_subcompactions = max_subcompactions_;
  DestroyAndReopen(options);

  const size_t kValueSize = 1 << 20;
  Random rnd(301);
  std::string value(RandomString(&rnd, kValueSize));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"LevelCompactionPicker::PickCompactionBySize:0",
        "CompactionJob::Run():Start"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // index:   0   1   2   3   4   5   6   7   8   9
  // size:  1MB 1MB 1MB 1MB 1MB 2MB 1MB 1MB 1MB 1MB
  // score:                     1.5 1.3 1.5 2.0 inf
  //
  // Files 0-4 will be included in an L0->L1 compaction.
  //
  // L0->L0 will be triggered since the sync points guarantee compaction to base
  // level is still blocked when files 5-9 trigger another compaction.
  //
  // Files 6-9 are the longest span of available files for which
  // work-per-deleted-file decreases (see "score" row above).
  for (int i = 0; i < 10; ++i) {
    ASSERT_OK(Put(Key(0), ""));  // prevents trivial move
    if (i == 5) {
      ASSERT_OK(Put(Key(i + 1), value + value));
    } else {
      ASSERT_OK(Put(Key(i + 1), value));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  std::vector<std::vector<FileMetaData>> level_to_files;
  dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                  &level_to_files);
  // Unsigned literals keep the comparisons against size()/file_size free of
  // signed/unsigned mismatches.
  ASSERT_GE(level_to_files.size(), 2U);  // at least L0 and L1
  // L0 has the 2MB file (not compacted) and 4MB file (output of L0->L0)
  ASSERT_EQ(2U, level_to_files[0].size());
  ASSERT_GT(level_to_files[1].size(), 0U);
  for (size_t i = 0; i < level_to_files[0].size(); ++i) {
    ASSERT_GE(level_to_files[0][i].fd.file_size, uint64_t{1} << 21);
  }
}
3232 
TEST_P(DBCompactionTestWithParam,IntraL0CompactionDoesNotObsoleteDeletions)3233 TEST_P(DBCompactionTestWithParam, IntraL0CompactionDoesNotObsoleteDeletions) {
3234   // regression test for issue #2722: L0->L0 compaction can resurrect deleted
3235   // keys from older L0 files if L1+ files' key-ranges do not include the key.
3236   Options options = CurrentOptions();
3237   options.compression = kNoCompression;
3238   options.level0_file_num_compaction_trigger = 5;
3239   options.max_background_compactions = 2;
3240   options.max_subcompactions = max_subcompactions_;
3241   DestroyAndReopen(options);
3242 
3243   const size_t kValueSize = 1 << 20;
3244   Random rnd(301);
3245   std::string value(RandomString(&rnd, kValueSize));
3246 
3247   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
3248       {{"LevelCompactionPicker::PickCompactionBySize:0",
3249         "CompactionJob::Run():Start"}});
3250   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3251 
3252   // index:   0   1   2   3   4    5    6   7   8   9
3253   // size:  1MB 1MB 1MB 1MB 1MB  1MB  1MB 1MB 1MB 1MB
3254   // score:                     1.25 1.33 1.5 2.0 inf
3255   //
3256   // Files 0-4 will be included in an L0->L1 compaction.
3257   //
3258   // L0->L0 will be triggered since the sync points guarantee compaction to base
3259   // level is still blocked when files 5-9 trigger another compaction. All files
3260   // 5-9 are included in the L0->L0 due to work-per-deleted file decreasing.
3261   //
3262   // Put a key-value in files 0-4. Delete that key in files 5-9. Verify the
3263   // L0->L0 preserves the deletion such that the key remains deleted.
3264   for (int i = 0; i < 10; ++i) {
3265     // key 0 serves both to prevent trivial move and as the key we want to
3266     // verify is not resurrected by L0->L0 compaction.
3267     if (i < 5) {
3268       ASSERT_OK(Put(Key(0), ""));
3269     } else {
3270       ASSERT_OK(Delete(Key(0)));
3271     }
3272     ASSERT_OK(Put(Key(i + 1), value));
3273     ASSERT_OK(Flush());
3274   }
3275   dbfull()->TEST_WaitForCompact();
3276   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3277 
3278   std::vector<std::vector<FileMetaData>> level_to_files;
3279   dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
3280                                   &level_to_files);
3281   ASSERT_GE(level_to_files.size(), 2);  // at least L0 and L1
3282   // L0 has a single output file from L0->L0
3283   ASSERT_EQ(1, level_to_files[0].size());
3284   ASSERT_GT(level_to_files[1].size(), 0);
3285   ASSERT_GE(level_to_files[0][0].fd.file_size, 1 << 22);
3286 
3287   ReadOptions roptions;
3288   std::string result;
3289   ASSERT_TRUE(db_->Get(roptions, Key(0), &result).IsNotFound());
3290 }
3291 
// For both leveled (dynamic level bytes) and universal compaction, writing
// kNumFilesTrigger files should lead to exactly one compaction dispatched via
// DBImpl::BGWorkBottomCompaction, i.e. run in the BOTTOM-priority thread pool,
// leaving a single sorted run.
TEST_P(DBCompactionTestWithParam, FullCompactionInBottomPriThreadPool) {
  const int kNumFilesTrigger = 3;
  Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
  for (bool use_universal_compaction : {false, true}) {
    Options options = CurrentOptions();
    if (use_universal_compaction) {
      options.compaction_style = kCompactionStyleUniversal;
    } else {
      options.compaction_style = kCompactionStyleLevel;
      options.level_compaction_dynamic_level_bytes = true;
    }
    options.num_levels = 4;
    options.write_buffer_size = 100 << 10;     // 100KB
    options.target_file_size_base = 32 << 10;  // 32KB
    options.level0_file_num_compaction_trigger = kNumFilesTrigger;
    // Trigger compaction if size amplification exceeds 110%
    options.compaction_options_universal.max_size_amplification_percent = 110;
    DestroyAndReopen(options);

    int num_bottom_pri_compactions = 0;
    SyncPoint::GetInstance()->SetCallBack(
        "DBImpl::BGWorkBottomCompaction",
        [&](void* /*arg*/) { ++num_bottom_pri_compactions; });
    SyncPoint::GetInstance()->EnableProcessing();

    Random rnd(301);
    for (int num = 0; num < kNumFilesTrigger; num++) {
      ASSERT_EQ(NumSortedRuns(), num);
      int key_idx = 0;
      GenerateNewFile(&rnd, &key_idx);
    }
    ASSERT_OK(dbfull()->TEST_WaitForCompact());

    ASSERT_EQ(1, num_bottom_pri_compactions);

    // Verify that all files were compacted into a single sorted run.
    ASSERT_EQ(NumSortedRuns(), 1);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    // The callback above captures the loop-local counter by reference; drop
    // it so it cannot outlive this iteration.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  }
  Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
}
3333 
TEST_F(DBCompactionTest, OptimizedDeletionObsoleting) {
  // Deletions can be dropped when compacted to non-last level if they fall
  // outside the lower-level files' key-ranges.
  const int kNumL0Files = 4;
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  DestroyAndReopen(options);

  // put key 1 and 3 in separate L1, L2 files.
  // So key 0, 2, and 4+ fall outside these levels' key-ranges.
  for (int level = 2; level >= 1; --level) {
    for (int i = 0; i < 2; ++i) {
      ASSERT_OK(Put(Key(2 * i + 1), "val"));
      ASSERT_OK(Flush());
    }
    MoveFilesToLevel(level);
    ASSERT_EQ(2, NumTableFilesAtLevel(level));
  }

  // Delete keys in range [1, 4]. These L0 files will be compacted with L1:
  // - Tombstones for keys 2 and 4 can be dropped early.
  // - Tombstones for keys 1 and 3 must be kept due to L2 files' key-ranges.
  for (int i = 0; i < kNumL0Files; ++i) {
    ASSERT_OK(Put(Key(0), "val"));  // sentinel to prevent trivial move
    ASSERT_OK(Delete(Key(i + 1)));
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  for (int i = 0; i < kNumL0Files; ++i) {
    std::string value;
    ASSERT_TRUE(db_->Get(ReadOptions(), Key(i + 1), &value).IsNotFound());
  }
  ASSERT_EQ(2, options.statistics->getTickerCount(
                   COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE));
  ASSERT_EQ(2,
            options.statistics->getTickerCount(COMPACTION_KEY_DROP_OBSOLETE));
}
3373 
TEST_F(DBCompactionTest, CompactFilesPendingL0Bug) {
  // https://www.facebook.com/groups/rocksdb.dev/permalink/1389452781153232/
  // Regression test: CompactFiles() used to fail to pick a compaction while an
  // L0 compaction existed, yet still mark it as scheduled. That mark was never
  // cleared, so future compactions or DB close could hang.
  const int kNumL0Files = 5;
  Options options = CurrentOptions();
  // One below kNumL0Files, so the automatic compaction is picked before the
  // last file gets flushed.
  options.level0_file_num_compaction_trigger = kNumL0Files - 1;
  options.max_background_compactions = 2;
  DestroyAndReopen(options);

  auto* sync_points = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
  // - The test thread passes ":Picked" only after the picker has returned.
  // - The automatic compaction is held at "NonTrivial:AfterRun" until the
  //   manual CompactFiles() call below has completed.
  sync_points->LoadDependency(
      {{"LevelCompactionPicker::PickCompaction:Return",
        "DBCompactionTest::CompactFilesPendingL0Bug:Picked"},
       {"DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted",
        "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}});
  sync_points->EnableProcessing();

  // Kept alive for the whole test; presumably lets more than one background
  // compaction be scheduled at a time (TODO confirm).
  auto compaction_pressure_token =
      dbfull()->TEST_write_controler().GetCompactionPressureToken();

  // Files 0-3 will be included in an L0->L1 compaction.
  for (int file = 0; file + 1 < kNumL0Files; ++file) {
    ASSERT_OK(Put(Key(0), "val"));  // sentinel to prevent trivial move
    ASSERT_OK(Put(Key(file + 1), "val"));
    ASSERT_OK(Flush());
  }
  TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:Picked");

  // File 4 is flushed after files 0-3 were picked. It is then handed to
  // CompactFiles() while the first compaction is still running.
  ASSERT_OK(Put(Key(kNumL0Files), "val"));
  ASSERT_OK(Flush());

  // Previously DB close would hang forever, because this situation left the
  // scheduled-compactions count unable to drop back to zero.
  ColumnFamilyMetaData meta;
  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &meta);
  ASSERT_EQ(kNumL0Files, meta.levels[0].files.size());
  const std::vector<std::string> files_to_compact{
      meta.levels[0].files.front().name};
  ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), files_to_compact,
                                   0 /* output_level */));
  TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted");
  sync_points->DisableProcessing();
}
3422 
TEST_F(DBCompactionTest, CompactFilesOverlapInL0Bug) {
  // Regression test for bug of not pulling in L0 files that overlap the user-
  // specified input files in time- and key-ranges.
  ASSERT_OK(Put(Key(0), "old_val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(0), "new_val"));
  ASSERT_OK(Flush());

  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
  ASSERT_GE(cf_meta.levels.size(), 2);
  ASSERT_EQ(2, cf_meta.levels[0].files.size());

  // Only one L0 file is passed as input. levels[0].files.front() is expected
  // to be the newer L0 file. Compacting it into L1 must also pull in the older
  // L0 file, which overlaps in key-range and time-range. Otherwise "old_val"
  // would remain in L0 above "new_val" and the Get() below would see it.
  std::vector<std::string> input_filenames;
  input_filenames.push_back(cf_meta.levels[0].files.front().name);
  ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), input_filenames,
                                   1 /* output_level */));
  ASSERT_EQ("new_val", Get(Key(0)));
}
3444 
TEST_F(DBCompactionTest,CompactBottomLevelFilesWithDeletions)3445 TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
3446   // bottom-level files may contain deletions due to snapshots protecting the
3447   // deleted keys. Once the snapshot is released, we should see files with many
3448   // such deletions undergo single-file compactions.
3449   const int kNumKeysPerFile = 1024;
3450   const int kNumLevelFiles = 4;
3451   const int kValueSize = 128;
3452   Options options = CurrentOptions();
3453   options.compression = kNoCompression;
3454   options.level0_file_num_compaction_trigger = kNumLevelFiles;
3455   // inflate it a bit to account for key/metadata overhead
3456   options.target_file_size_base = 120 * kNumKeysPerFile * kValueSize / 100;
3457   CreateAndReopenWithCF({"one"}, options);
3458 
3459   Random rnd(301);
3460   const Snapshot* snapshot = nullptr;
3461   for (int i = 0; i < kNumLevelFiles; ++i) {
3462     for (int j = 0; j < kNumKeysPerFile; ++j) {
3463       ASSERT_OK(
3464           Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
3465     }
3466     if (i == kNumLevelFiles - 1) {
3467       snapshot = db_->GetSnapshot();
3468       // delete every other key after grabbing a snapshot, so these deletions
3469       // and the keys they cover can't be dropped until after the snapshot is
3470       // released.
3471       for (int j = 0; j < kNumLevelFiles * kNumKeysPerFile; j += 2) {
3472         ASSERT_OK(Delete(Key(j)));
3473       }
3474     }
3475     Flush();
3476     if (i < kNumLevelFiles - 1) {
3477       ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
3478     }
3479   }
3480   dbfull()->TEST_WaitForCompact();
3481   ASSERT_EQ(kNumLevelFiles, NumTableFilesAtLevel(1));
3482 
3483   std::vector<LiveFileMetaData> pre_release_metadata, post_release_metadata;
3484   db_->GetLiveFilesMetaData(&pre_release_metadata);
3485   // just need to bump seqnum so ReleaseSnapshot knows the newest key in the SST
3486   // files does not need to be preserved in case of a future snapshot.
3487   ASSERT_OK(Put(Key(0), "val"));
3488   ASSERT_NE(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
3489   // release snapshot and wait for compactions to finish. Single-file
3490   // compactions should be triggered, which reduce the size of each bottom-level
3491   // file without changing file count.
3492   db_->ReleaseSnapshot(snapshot);
3493   ASSERT_EQ(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
3494   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3495       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3496         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3497         ASSERT_TRUE(compaction->compaction_reason() ==
3498                     CompactionReason::kBottommostFiles);
3499       });
3500   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3501   dbfull()->TEST_WaitForCompact();
3502   db_->GetLiveFilesMetaData(&post_release_metadata);
3503   ASSERT_EQ(pre_release_metadata.size(), post_release_metadata.size());
3504 
3505   for (size_t i = 0; i < pre_release_metadata.size(); ++i) {
3506     const auto& pre_file = pre_release_metadata[i];
3507     const auto& post_file = post_release_metadata[i];
3508     ASSERT_EQ(1, pre_file.level);
3509     ASSERT_EQ(1, post_file.level);
3510     // each file is smaller than it was before as it was rewritten without
3511     // deletion markers/deleted keys.
3512     ASSERT_LT(post_file.size, pre_file.size);
3513   }
3514   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3515 }
3516 
TEST_F(DBCompactionTest,LevelCompactExpiredTtlFiles)3517 TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
3518   const int kNumKeysPerFile = 32;
3519   const int kNumLevelFiles = 2;
3520   const int kValueSize = 1024;
3521 
3522   Options options = CurrentOptions();
3523   options.compression = kNoCompression;
3524   options.ttl = 24 * 60 * 60;  // 24 hours
3525   options.max_open_files = -1;
3526   env_->time_elapse_only_sleep_ = false;
3527   options.env = env_;
3528 
3529   env_->addon_time_.store(0);
3530   DestroyAndReopen(options);
3531 
3532   Random rnd(301);
3533   for (int i = 0; i < kNumLevelFiles; ++i) {
3534     for (int j = 0; j < kNumKeysPerFile; ++j) {
3535       ASSERT_OK(
3536           Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
3537     }
3538     Flush();
3539   }
3540   dbfull()->TEST_WaitForCompact();
3541   MoveFilesToLevel(3);
3542   ASSERT_EQ("0,0,0,2", FilesPerLevel());
3543 
3544   // Delete previously written keys.
3545   for (int i = 0; i < kNumLevelFiles; ++i) {
3546     for (int j = 0; j < kNumKeysPerFile; ++j) {
3547       ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
3548     }
3549     Flush();
3550   }
3551   dbfull()->TEST_WaitForCompact();
3552   ASSERT_EQ("2,0,0,2", FilesPerLevel());
3553   MoveFilesToLevel(1);
3554   ASSERT_EQ("0,2,0,2", FilesPerLevel());
3555 
3556   env_->addon_time_.fetch_add(36 * 60 * 60);  // 36 hours
3557   ASSERT_EQ("0,2,0,2", FilesPerLevel());
3558 
3559   // Just do a simple write + flush so that the Ttl expired files get
3560   // compacted.
3561   ASSERT_OK(Put("a", "1"));
3562   Flush();
3563   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3564       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3565         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3566         ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl);
3567       });
3568   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3569   dbfull()->TEST_WaitForCompact();
3570   // All non-L0 files are deleted, as they contained only deleted data.
3571   ASSERT_EQ("1", FilesPerLevel());
3572   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3573 
3574   // Test dynamically changing ttl.
3575 
3576   env_->addon_time_.store(0);
3577   DestroyAndReopen(options);
3578 
3579   for (int i = 0; i < kNumLevelFiles; ++i) {
3580     for (int j = 0; j < kNumKeysPerFile; ++j) {
3581       ASSERT_OK(
3582           Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
3583     }
3584     Flush();
3585   }
3586   dbfull()->TEST_WaitForCompact();
3587   MoveFilesToLevel(3);
3588   ASSERT_EQ("0,0,0,2", FilesPerLevel());
3589 
3590   // Delete previously written keys.
3591   for (int i = 0; i < kNumLevelFiles; ++i) {
3592     for (int j = 0; j < kNumKeysPerFile; ++j) {
3593       ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
3594     }
3595     Flush();
3596   }
3597   dbfull()->TEST_WaitForCompact();
3598   ASSERT_EQ("2,0,0,2", FilesPerLevel());
3599   MoveFilesToLevel(1);
3600   ASSERT_EQ("0,2,0,2", FilesPerLevel());
3601 
3602   // Move time forward by 12 hours, and make sure that compaction still doesn't
3603   // trigger as ttl is set to 24 hours.
3604   env_->addon_time_.fetch_add(12 * 60 * 60);
3605   ASSERT_OK(Put("a", "1"));
3606   Flush();
3607   dbfull()->TEST_WaitForCompact();
3608   ASSERT_EQ("1,2,0,2", FilesPerLevel());
3609 
3610   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3611       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3612         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3613         ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl);
3614       });
3615   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3616 
3617   // Dynamically change ttl to 10 hours.
3618   // This should trigger a ttl compaction, as 12 hours have already passed.
3619   ASSERT_OK(dbfull()->SetOptions({{"ttl", "36000"}}));
3620   dbfull()->TEST_WaitForCompact();
3621   // All non-L0 files are deleted, as they contained only deleted data.
3622   ASSERT_EQ("1", FilesPerLevel());
3623   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3624 }
3625 
TEST_F(DBCompactionTest,LevelTtlCascadingCompactions)3626 TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
3627   const int kValueSize = 100;
3628 
3629   for (bool if_restart : {false, true}) {
3630     for (bool if_open_all_files : {false, true}) {
3631       Options options = CurrentOptions();
3632       options.compression = kNoCompression;
3633       options.ttl = 24 * 60 * 60;  // 24 hours
3634       if (if_open_all_files) {
3635         options.max_open_files = -1;
3636       } else {
3637         options.max_open_files = 20;
3638       }
3639       // RocksDB sanitize max open files to at least 20. Modify it back.
3640       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3641           "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
3642             int* max_open_files = static_cast<int*>(arg);
3643             *max_open_files = 2;
3644           });
3645       // In the case where all files are opened and doing DB restart
3646       // forcing the oldest ancester time in manifest file to be 0 to
3647       // simulate the case of reading from an old version.
3648       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3649           "VersionEdit::EncodeTo:VarintOldestAncesterTime", [&](void* arg) {
3650             if (if_restart && if_open_all_files) {
3651               std::string* encoded_fieled = static_cast<std::string*>(arg);
3652               *encoded_fieled = "";
3653               PutVarint64(encoded_fieled, 0);
3654             }
3655           });
3656 
3657       env_->time_elapse_only_sleep_ = false;
3658       options.env = env_;
3659 
3660       env_->addon_time_.store(0);
3661       DestroyAndReopen(options);
3662 
3663       int ttl_compactions = 0;
3664       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3665           "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3666             Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3667             auto compaction_reason = compaction->compaction_reason();
3668             if (compaction_reason == CompactionReason::kTtl) {
3669               ttl_compactions++;
3670             }
3671           });
3672       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3673 
3674       // Add two L6 files with key ranges: [1 .. 100], [101 .. 200].
3675       Random rnd(301);
3676       for (int i = 1; i <= 100; ++i) {
3677         ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
3678       }
3679       Flush();
3680       // Get the first file's creation time. This will be the oldest file in the
3681       // DB. Compactions inolving this file's descendents should keep getting
3682       // this time.
3683       std::vector<std::vector<FileMetaData>> level_to_files;
3684       dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
3685                                       &level_to_files);
3686       uint64_t oldest_time = level_to_files[0][0].oldest_ancester_time;
3687       // Add 1 hour and do another flush.
3688       env_->addon_time_.fetch_add(1 * 60 * 60);
3689       for (int i = 101; i <= 200; ++i) {
3690         ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
3691       }
3692       Flush();
3693       MoveFilesToLevel(6);
3694       ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
3695 
3696       env_->addon_time_.fetch_add(1 * 60 * 60);
3697       // Add two L4 files with key ranges: [1 .. 50], [51 .. 150].
3698       for (int i = 1; i <= 50; ++i) {
3699         ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
3700       }
3701       Flush();
3702       env_->addon_time_.fetch_add(1 * 60 * 60);
3703       for (int i = 51; i <= 150; ++i) {
3704         ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
3705       }
3706       Flush();
3707       MoveFilesToLevel(4);
3708       ASSERT_EQ("0,0,0,0,2,0,2", FilesPerLevel());
3709 
3710       env_->addon_time_.fetch_add(1 * 60 * 60);
3711       // Add one L1 file with key range: [26, 75].
3712       for (int i = 26; i <= 75; ++i) {
3713         ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
3714       }
3715       Flush();
3716       dbfull()->TEST_WaitForCompact();
3717       MoveFilesToLevel(1);
3718       ASSERT_EQ("0,1,0,0,2,0,2", FilesPerLevel());
3719 
3720       // LSM tree:
3721       // L1:         [26 .. 75]
3722       // L4:     [1 .. 50][51 ..... 150]
3723       // L6:     [1 ........ 100][101 .... 200]
3724       //
3725       // On TTL expiry, TTL compaction should be initiated on L1 file, and the
3726       // compactions should keep going on until the key range hits bottom level.
3727       // In other words: the compaction on this data range "cascasdes" until
3728       // reaching the bottom level.
3729       //
3730       // Order of events on TTL expiry:
3731       // 1. L1 file falls to L3 via 2 trivial moves which are initiated by the
3732       // ttl
3733       //    compaction.
3734       // 2. A TTL compaction happens between L3 and L4 files. Output file in L4.
3735       // 3. The new output file from L4 falls to L5 via 1 trival move initiated
3736       //    by the ttl compaction.
3737       // 4. A TTL compaction happens between L5 and L6 files. Ouptut in L6.
3738 
3739       // Add 25 hours and do a write
3740       env_->addon_time_.fetch_add(25 * 60 * 60);
3741 
3742       ASSERT_OK(Put(Key(1), "1"));
3743       if (if_restart) {
3744         Reopen(options);
3745       } else {
3746         Flush();
3747       }
3748       dbfull()->TEST_WaitForCompact();
3749       ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
3750       ASSERT_EQ(5, ttl_compactions);
3751 
3752       dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
3753                                       &level_to_files);
3754       ASSERT_EQ(oldest_time, level_to_files[6][0].oldest_ancester_time);
3755 
3756       env_->addon_time_.fetch_add(25 * 60 * 60);
3757       ASSERT_OK(Put(Key(2), "1"));
3758       if (if_restart) {
3759         Reopen(options);
3760       } else {
3761         Flush();
3762       }
3763       dbfull()->TEST_WaitForCompact();
3764       ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
3765       ASSERT_GE(ttl_compactions, 6);
3766 
3767       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3768     }
3769   }
3770 }
3771 
TEST_F(DBCompactionTest,LevelPeriodicCompaction)3772 TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
3773   const int kNumKeysPerFile = 32;
3774   const int kNumLevelFiles = 2;
3775   const int kValueSize = 100;
3776 
3777   for (bool if_restart : {false, true}) {
3778     for (bool if_open_all_files : {false, true}) {
3779       Options options = CurrentOptions();
3780       options.periodic_compaction_seconds = 48 * 60 * 60;  // 2 days
3781       if (if_open_all_files) {
3782         options.max_open_files = -1;  // needed for ttl compaction
3783       } else {
3784         options.max_open_files = 20;
3785       }
3786       // RocksDB sanitize max open files to at least 20. Modify it back.
3787       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3788           "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
3789             int* max_open_files = static_cast<int*>(arg);
3790             *max_open_files = 0;
3791           });
3792       // In the case where all files are opened and doing DB restart
3793       // forcing the file creation time in manifest file to be 0 to
3794       // simulate the case of reading from an old version.
3795       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3796           "VersionEdit::EncodeTo:VarintFileCreationTime", [&](void* arg) {
3797             if (if_restart && if_open_all_files) {
3798               std::string* encoded_fieled = static_cast<std::string*>(arg);
3799               *encoded_fieled = "";
3800               PutVarint64(encoded_fieled, 0);
3801             }
3802           });
3803 
3804       env_->time_elapse_only_sleep_ = false;
3805       options.env = env_;
3806 
3807       env_->addon_time_.store(0);
3808       DestroyAndReopen(options);
3809 
3810       int periodic_compactions = 0;
3811       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3812           "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3813             Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3814             auto compaction_reason = compaction->compaction_reason();
3815             if (compaction_reason == CompactionReason::kPeriodicCompaction) {
3816               periodic_compactions++;
3817             }
3818           });
3819       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3820 
3821       Random rnd(301);
3822       for (int i = 0; i < kNumLevelFiles; ++i) {
3823         for (int j = 0; j < kNumKeysPerFile; ++j) {
3824           ASSERT_OK(Put(Key(i * kNumKeysPerFile + j),
3825                         RandomString(&rnd, kValueSize)));
3826         }
3827         Flush();
3828       }
3829       dbfull()->TEST_WaitForCompact();
3830 
3831       ASSERT_EQ("2", FilesPerLevel());
3832       ASSERT_EQ(0, periodic_compactions);
3833 
3834       // Add 50 hours and do a write
3835       env_->addon_time_.fetch_add(50 * 60 * 60);
3836       ASSERT_OK(Put("a", "1"));
3837       Flush();
3838       dbfull()->TEST_WaitForCompact();
3839       // Assert that the files stay in the same level
3840       ASSERT_EQ("3", FilesPerLevel());
3841       // The two old files go through the periodic compaction process
3842       ASSERT_EQ(2, periodic_compactions);
3843 
3844       MoveFilesToLevel(1);
3845       ASSERT_EQ("0,3", FilesPerLevel());
3846 
3847       // Add another 50 hours and do another write
3848       env_->addon_time_.fetch_add(50 * 60 * 60);
3849       ASSERT_OK(Put("b", "2"));
3850       if (if_restart) {
3851         Reopen(options);
3852       } else {
3853         Flush();
3854       }
3855       dbfull()->TEST_WaitForCompact();
3856       ASSERT_EQ("1,3", FilesPerLevel());
3857       // The three old files now go through the periodic compaction process. 2
3858       // + 3.
3859       ASSERT_EQ(5, periodic_compactions);
3860 
3861       // Add another 50 hours and do another write
3862       env_->addon_time_.fetch_add(50 * 60 * 60);
3863       ASSERT_OK(Put("c", "3"));
3864       Flush();
3865       dbfull()->TEST_WaitForCompact();
3866       ASSERT_EQ("2,3", FilesPerLevel());
3867       // The four old files now go through the periodic compaction process. 5
3868       // + 4.
3869       ASSERT_EQ(9, periodic_compactions);
3870 
3871       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3872     }
3873   }
3874 }
3875 
TEST_F(DBCompactionTest,LevelPeriodicCompactionWithOldDB)3876 TEST_F(DBCompactionTest, LevelPeriodicCompactionWithOldDB) {
3877   // This test makes sure that periodic compactions are working with a DB
3878   // where file_creation_time of some files is 0.
3879   // After compactions the new files are created with a valid file_creation_time
3880 
3881   const int kNumKeysPerFile = 32;
3882   const int kNumFiles = 4;
3883   const int kValueSize = 100;
3884 
3885   Options options = CurrentOptions();
3886   env_->time_elapse_only_sleep_ = false;
3887   options.env = env_;
3888 
3889   env_->addon_time_.store(0);
3890   DestroyAndReopen(options);
3891 
3892   int periodic_compactions = 0;
3893   bool set_file_creation_time_to_zero = true;
3894   bool set_creation_time_to_zero = true;
3895   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3896       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3897         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3898         auto compaction_reason = compaction->compaction_reason();
3899         if (compaction_reason == CompactionReason::kPeriodicCompaction) {
3900           periodic_compactions++;
3901         }
3902       });
3903   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3904       "PropertyBlockBuilder::AddTableProperty:Start", [&](void* arg) {
3905         TableProperties* props = reinterpret_cast<TableProperties*>(arg);
3906         if (set_file_creation_time_to_zero) {
3907           props->file_creation_time = 0;
3908         }
3909         if (set_creation_time_to_zero) {
3910           props->creation_time = 0;
3911         }
3912       });
3913   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3914 
3915   Random rnd(301);
3916   for (int i = 0; i < kNumFiles; ++i) {
3917     for (int j = 0; j < kNumKeysPerFile; ++j) {
3918       ASSERT_OK(
3919           Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
3920     }
3921     Flush();
3922     // Move the first two files to L2.
3923     if (i == 1) {
3924       MoveFilesToLevel(2);
3925       set_creation_time_to_zero = false;
3926     }
3927   }
3928   ASSERT_OK(dbfull()->TEST_WaitForCompact());
3929 
3930   ASSERT_EQ("2,0,2", FilesPerLevel());
3931   ASSERT_EQ(0, periodic_compactions);
3932 
3933   Close();
3934 
3935   set_file_creation_time_to_zero = false;
3936   // Forward the clock by 2 days.
3937   env_->addon_time_.fetch_add(2 * 24 * 60 * 60);
3938   options.periodic_compaction_seconds = 1 * 24 * 60 * 60;  // 1 day
3939 
3940   Reopen(options);
3941   ASSERT_OK(dbfull()->TEST_WaitForCompact());
3942   ASSERT_EQ("2,0,2", FilesPerLevel());
3943   // Make sure that all files go through periodic compaction.
3944   ASSERT_EQ(kNumFiles, periodic_compactions);
3945 
3946   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
3947 }
3948 
TEST_F(DBCompactionTest,LevelPeriodicAndTtlCompaction)3949 TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
3950   const int kNumKeysPerFile = 32;
3951   const int kNumLevelFiles = 2;
3952   const int kValueSize = 100;
3953 
3954   Options options = CurrentOptions();
3955   options.ttl = 10 * 60 * 60;  // 10 hours
3956   options.periodic_compaction_seconds = 48 * 60 * 60;  // 2 days
3957   options.max_open_files = -1;   // needed for both periodic and ttl compactions
3958   env_->time_elapse_only_sleep_ = false;
3959   options.env = env_;
3960 
3961   env_->addon_time_.store(0);
3962   DestroyAndReopen(options);
3963 
3964   int periodic_compactions = 0;
3965   int ttl_compactions = 0;
3966   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
3967       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3968         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3969         auto compaction_reason = compaction->compaction_reason();
3970         if (compaction_reason == CompactionReason::kPeriodicCompaction) {
3971           periodic_compactions++;
3972         } else if (compaction_reason == CompactionReason::kTtl) {
3973           ttl_compactions++;
3974         }
3975       });
3976   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
3977 
3978   Random rnd(301);
3979   for (int i = 0; i < kNumLevelFiles; ++i) {
3980     for (int j = 0; j < kNumKeysPerFile; ++j) {
3981       ASSERT_OK(
3982           Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
3983     }
3984     Flush();
3985   }
3986   dbfull()->TEST_WaitForCompact();
3987 
3988   MoveFilesToLevel(3);
3989 
3990   ASSERT_EQ("0,0,0,2", FilesPerLevel());
3991   ASSERT_EQ(0, periodic_compactions);
3992   ASSERT_EQ(0, ttl_compactions);
3993 
3994   // Add some time greater than periodic_compaction_time.
3995   env_->addon_time_.fetch_add(50 * 60 * 60);
3996   ASSERT_OK(Put("a", "1"));
3997   Flush();
3998   dbfull()->TEST_WaitForCompact();
3999   // Files in the bottom level go through periodic compactions.
4000   ASSERT_EQ("1,0,0,2", FilesPerLevel());
4001   ASSERT_EQ(2, periodic_compactions);
4002   ASSERT_EQ(0, ttl_compactions);
4003 
4004   // Add a little more time than ttl
4005   env_->addon_time_.fetch_add(11 * 60 * 60);
4006   ASSERT_OK(Put("b", "1"));
4007   Flush();
4008   dbfull()->TEST_WaitForCompact();
4009   // Notice that the previous file in level 1 falls down to the bottom level
4010   // due to ttl compactions, one level at a time.
4011   // And bottom level files don't get picked up for ttl compactions.
4012   ASSERT_EQ("1,0,0,3", FilesPerLevel());
4013   ASSERT_EQ(2, periodic_compactions);
4014   ASSERT_EQ(3, ttl_compactions);
4015 
4016   // Add some time greater than periodic_compaction_time.
4017   env_->addon_time_.fetch_add(50 * 60 * 60);
4018   ASSERT_OK(Put("c", "1"));
4019   Flush();
4020   dbfull()->TEST_WaitForCompact();
4021   // Previous L0 file falls one level at a time to bottom level due to ttl.
4022   // And all 4 bottom files go through periodic compactions.
4023   ASSERT_EQ("1,0,0,4", FilesPerLevel());
4024   ASSERT_EQ(6, periodic_compactions);
4025   ASSERT_EQ(6, ttl_compactions);
4026 
4027   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4028 }
4029 
TEST_F(DBCompactionTest,LevelPeriodicCompactionWithCompactionFilters)4030 TEST_F(DBCompactionTest, LevelPeriodicCompactionWithCompactionFilters) {
4031   class TestCompactionFilter : public CompactionFilter {
4032     const char* Name() const override { return "TestCompactionFilter"; }
4033   };
4034   class TestCompactionFilterFactory : public CompactionFilterFactory {
4035     const char* Name() const override { return "TestCompactionFilterFactory"; }
4036     std::unique_ptr<CompactionFilter> CreateCompactionFilter(
4037         const CompactionFilter::Context& /*context*/) override {
4038       return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
4039     }
4040   };
4041 
4042   const int kNumKeysPerFile = 32;
4043   const int kNumLevelFiles = 2;
4044   const int kValueSize = 100;
4045 
4046   Random rnd(301);
4047 
4048   Options options = CurrentOptions();
4049   TestCompactionFilter test_compaction_filter;
4050   env_->time_elapse_only_sleep_ = false;
4051   options.env = env_;
4052   env_->addon_time_.store(0);
4053 
4054   enum CompactionFilterType {
4055     kUseCompactionFilter,
4056     kUseCompactionFilterFactory
4057   };
4058 
4059   for (CompactionFilterType comp_filter_type :
4060        {kUseCompactionFilter, kUseCompactionFilterFactory}) {
4061     // Assert that periodic compactions are not enabled.
4062     ASSERT_EQ(port::kMaxUint64 - 1, options.periodic_compaction_seconds);
4063 
4064     if (comp_filter_type == kUseCompactionFilter) {
4065       options.compaction_filter = &test_compaction_filter;
4066       options.compaction_filter_factory.reset();
4067     } else if (comp_filter_type == kUseCompactionFilterFactory) {
4068       options.compaction_filter = nullptr;
4069       options.compaction_filter_factory.reset(
4070           new TestCompactionFilterFactory());
4071     }
4072     DestroyAndReopen(options);
4073 
4074     // periodic_compaction_seconds should be set to the sanitized value when
4075     // a compaction filter or a compaction filter factory is used.
4076     ASSERT_EQ(30 * 24 * 60 * 60,
4077               dbfull()->GetOptions().periodic_compaction_seconds);
4078 
4079     int periodic_compactions = 0;
4080     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4081         "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
4082           Compaction* compaction = reinterpret_cast<Compaction*>(arg);
4083           auto compaction_reason = compaction->compaction_reason();
4084           if (compaction_reason == CompactionReason::kPeriodicCompaction) {
4085             periodic_compactions++;
4086           }
4087         });
4088     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4089 
4090     for (int i = 0; i < kNumLevelFiles; ++i) {
4091       for (int j = 0; j < kNumKeysPerFile; ++j) {
4092         ASSERT_OK(
4093             Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
4094       }
4095       Flush();
4096     }
4097     dbfull()->TEST_WaitForCompact();
4098 
4099     ASSERT_EQ("2", FilesPerLevel());
4100     ASSERT_EQ(0, periodic_compactions);
4101 
4102     // Add 31 days and do a write
4103     env_->addon_time_.fetch_add(31 * 24 * 60 * 60);
4104     ASSERT_OK(Put("a", "1"));
4105     Flush();
4106     dbfull()->TEST_WaitForCompact();
4107     // Assert that the files stay in the same level
4108     ASSERT_EQ("3", FilesPerLevel());
4109     // The two old files go through the periodic compaction process
4110     ASSERT_EQ(2, periodic_compactions);
4111 
4112     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4113   }
4114 }
4115 
TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
  // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
  // compaction only triggers flush after it's sure stall won't be triggered for
  // L0 file count going too high.
  const int kNumL0FilesTrigger = 4;
  const int kNumL0FilesLimit = 8;
  // i == 0: verifies normal case where stall is avoided by delay
  // i == 1: verifies no delay in edge case where stall trigger is same as
  //         compaction trigger, so stall can't be avoided
  for (int i = 0; i < 2; ++i) {
    Options options = CurrentOptions();
    options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
    if (i == 0) {
      options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
    } else {
      options.level0_file_num_compaction_trigger = kNumL0FilesLimit;
    }
    Reopen(options);

    if (i == 0) {
      // ensure the auto compaction doesn't finish until manual compaction has
      // had a chance to be delayed.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
            "CompactionJob::Run():End"}});
    } else {
      // ensure the auto-compaction doesn't finish until manual compaction has
      // continued without delay.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"DBImpl::FlushMemTable:StallWaitDone",
            "CompactionJob::Run():End"}});
    }
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    Random rnd(301);
    for (int j = 0; j < kNumL0FilesLimit - 1; ++j) {
      for (int k = 0; k < 2; ++k) {
        ASSERT_OK(Put(Key(k), RandomString(&rnd, 1024)));
      }
      ASSERT_OK(Flush());
    }
    auto manual_compaction_thread = port::Thread([this]() {
      CompactRangeOptions cro;
      cro.allow_write_stall = false;
      // A delayed-but-successful manual compaction must still report OK.
      ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
    });

    manual_compaction_thread.join();
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    ASSERT_EQ(0, NumTableFilesAtLevel(0));
    ASSERT_GT(NumTableFilesAtLevel(1), 0);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
4170 
TEST_F(DBCompactionTest,CompactRangeDelayedByImmMemTableCount)4171 TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
4172   // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
4173   // compaction only triggers flush after it's sure stall won't be triggered for
4174   // immutable memtable count going too high.
4175   const int kNumImmMemTableLimit = 8;
4176   // i == 0: verifies normal case where stall is avoided by delay
4177   // i == 1: verifies no delay in edge case where stall trigger is same as flush
4178   //         trigger, so stall can't be avoided
4179   for (int i = 0; i < 2; ++i) {
4180     Options options = CurrentOptions();
4181     options.disable_auto_compactions = true;
4182     // the delay limit is one less than the stop limit. This test focuses on
4183     // avoiding delay limit, but this option sets stop limit, so add one.
4184     options.max_write_buffer_number = kNumImmMemTableLimit + 1;
4185     if (i == 1) {
4186       options.min_write_buffer_number_to_merge = kNumImmMemTableLimit;
4187     }
4188     Reopen(options);
4189 
4190     if (i == 0) {
4191       // ensure the flush doesn't finish until manual compaction has had a
4192       // chance to be delayed.
4193       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
4194           {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
4195             "FlushJob::WriteLevel0Table"}});
4196     } else {
4197       // ensure the flush doesn't finish until manual compaction has continued
4198       // without delay.
4199       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
4200           {{"DBImpl::FlushMemTable:StallWaitDone",
4201             "FlushJob::WriteLevel0Table"}});
4202     }
4203     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4204 
4205     Random rnd(301);
4206     for (int j = 0; j < kNumImmMemTableLimit - 1; ++j) {
4207       ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024)));
4208       FlushOptions flush_opts;
4209       flush_opts.wait = false;
4210       flush_opts.allow_write_stall = true;
4211       dbfull()->Flush(flush_opts);
4212     }
4213 
4214     auto manual_compaction_thread = port::Thread([this]() {
4215       CompactRangeOptions cro;
4216       cro.allow_write_stall = false;
4217       db_->CompactRange(cro, nullptr, nullptr);
4218     });
4219 
4220     manual_compaction_thread.join();
4221     dbfull()->TEST_WaitForFlushMemTable();
4222     ASSERT_EQ(0, NumTableFilesAtLevel(0));
4223     ASSERT_GT(NumTableFilesAtLevel(1), 0);
4224     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4225   }
4226 }
4227 
TEST_F(DBCompactionTest,CompactRangeShutdownWhileDelayed)4228 TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
4229   // Verify that, when `CompactRangeOptions::allow_write_stall == false`, delay
4230   // does not hang if CF is dropped or DB is closed
4231   const int kNumL0FilesTrigger = 4;
4232   const int kNumL0FilesLimit = 8;
4233   Options options = CurrentOptions();
4234   options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
4235   options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
4236   // i == 0: DB::DropColumnFamily() on CompactRange's target CF unblocks it
4237   // i == 1: DB::CancelAllBackgroundWork() unblocks CompactRange. This is to
4238   //         simulate what happens during Close as we can't call Close (it
4239   //         blocks on the auto-compaction, making a cycle).
4240   for (int i = 0; i < 2; ++i) {
4241     CreateAndReopenWithCF({"one"}, options);
4242     // The calls to close CF/DB wait until the manual compaction stalls.
4243     // The auto-compaction waits until the manual compaction finishes to ensure
4244     // the signal comes from closing CF/DB, not from compaction making progress.
4245     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
4246         {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
4247           "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown"},
4248          {"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual",
4249           "CompactionJob::Run():End"}});
4250     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4251 
4252     Random rnd(301);
4253     for (int j = 0; j < kNumL0FilesLimit - 1; ++j) {
4254       for (int k = 0; k < 2; ++k) {
4255         ASSERT_OK(Put(1, Key(k), RandomString(&rnd, 1024)));
4256       }
4257       Flush(1);
4258     }
4259     auto manual_compaction_thread = port::Thread([this, i]() {
4260       CompactRangeOptions cro;
4261       cro.allow_write_stall = false;
4262       Status s = db_->CompactRange(cro, handles_[1], nullptr, nullptr);
4263       if (i == 0) {
4264         ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr)
4265                         .IsColumnFamilyDropped());
4266       } else {
4267         ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr)
4268                         .IsShutdownInProgress());
4269       }
4270     });
4271 
4272     TEST_SYNC_POINT(
4273         "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown");
4274     if (i == 0) {
4275       ASSERT_OK(db_->DropColumnFamily(handles_[1]));
4276     } else {
4277       dbfull()->CancelAllBackgroundWork(false /* wait */);
4278     }
4279     manual_compaction_thread.join();
4280     TEST_SYNC_POINT(
4281         "DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual");
4282     dbfull()->TEST_WaitForCompact();
4283     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
4284   }
4285 }
4286 
TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
  // Verify that, when `CompactRangeOptions::allow_write_stall == false`,
  // CompactRange skips its flush if the delay is long enough that the memtables
  // existing at the beginning of the call have already been flushed.
  const int kNumL0FilesTrigger = 4;
  const int kNumL0FilesLimit = 8;
  Options options = CurrentOptions();
  options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
  options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
  Reopen(options);

  Random rnd(301);
  // The manual flush includes the memtable that was active when CompactRange
  // began. So it unblocks CompactRange and precludes its flush. Throughout the
  // test, stall conditions are upheld via high L0 file count.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
        "DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"},
       {"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush",
        "DBImpl::FlushMemTable:StallWaitDone"},
       {"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // used for the delayable flushes
  FlushOptions flush_opts;
  flush_opts.allow_write_stall = true;
  for (int i = 0; i < kNumL0FilesLimit - 1; ++i) {
    for (int j = 0; j < 2; ++j) {
      ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
    }
    ASSERT_OK(dbfull()->Flush(flush_opts));
  }
  auto manual_compaction_thread = port::Thread([this]() {
    CompactRangeOptions cro;
    cro.allow_write_stall = false;
    ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  });

  TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush");
  ASSERT_OK(Put(ToString(0), RandomString(&rnd, 1024)));
  ASSERT_OK(dbfull()->Flush(flush_opts));
  ASSERT_OK(Put(ToString(0), RandomString(&rnd, 1024)));
  TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
  manual_compaction_thread.join();

  // If CompactRange's flush was skipped, the final Put above will still be
  // in the active memtable.
  std::string num_keys_in_memtable;
  ASSERT_TRUE(db_->GetProperty(DB::Properties::kNumEntriesActiveMemTable,
                               &num_keys_in_memtable));
  ASSERT_EQ(ToString(1), num_keys_in_memtable);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
4340 
TEST_F(DBCompactionTest,CompactRangeFlushOverlappingMemtable)4341 TEST_F(DBCompactionTest, CompactRangeFlushOverlappingMemtable) {
4342   // Verify memtable only gets flushed if it contains data overlapping the range
4343   // provided to `CompactRange`. Tests all kinds of overlap/non-overlap.
4344   const int kNumEndpointKeys = 5;
4345   std::string keys[kNumEndpointKeys] = {"a", "b", "c", "d", "e"};
4346   Options options = CurrentOptions();
4347   options.disable_auto_compactions = true;
4348   Reopen(options);
4349 
4350   // One extra iteration for nullptr, which means left side of interval is
4351   // unbounded.
4352   for (int i = 0; i <= kNumEndpointKeys; ++i) {
4353     Slice begin;
4354     Slice* begin_ptr;
4355     if (i == 0) {
4356       begin_ptr = nullptr;
4357     } else {
4358       begin = keys[i - 1];
4359       begin_ptr = &begin;
4360     }
4361     // Start at `i` so right endpoint comes after left endpoint. One extra
4362     // iteration for nullptr, which means right side of interval is unbounded.
4363     for (int j = std::max(0, i - 1); j <= kNumEndpointKeys; ++j) {
4364       Slice end;
4365       Slice* end_ptr;
4366       if (j == kNumEndpointKeys) {
4367         end_ptr = nullptr;
4368       } else {
4369         end = keys[j];
4370         end_ptr = &end;
4371       }
4372       ASSERT_OK(Put("b", "val"));
4373       ASSERT_OK(Put("d", "val"));
4374       CompactRangeOptions compact_range_opts;
4375       ASSERT_OK(db_->CompactRange(compact_range_opts, begin_ptr, end_ptr));
4376 
4377       uint64_t get_prop_tmp, num_memtable_entries = 0;
4378       ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesImmMemTables,
4379                                       &get_prop_tmp));
4380       num_memtable_entries += get_prop_tmp;
4381       ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
4382                                       &get_prop_tmp));
4383       num_memtable_entries += get_prop_tmp;
4384       if (begin_ptr == nullptr || end_ptr == nullptr ||
4385           (i <= 4 && j >= 1 && (begin != "c" || end != "c"))) {
4386         // In this case `CompactRange`'s range overlapped in some way with the
4387         // memtable's range, so flush should've happened. Then "b" and "d" won't
4388         // be in the memtable.
4389         ASSERT_EQ(0, num_memtable_entries);
4390       } else {
4391         ASSERT_EQ(2, num_memtable_entries);
4392         // flush anyways to prepare for next iteration
4393         db_->Flush(FlushOptions());
4394       }
4395     }
4396   }
4397 }
4398 
// Writes enough overlapping L0 files to trigger compactions, then checks that
// the column family's internal compaction stats agree with what the
// CompactionStatsCollector listener observed.
TEST_F(DBCompactionTest, CompactionStatsTest) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  CompactionStatsCollector* collector = new CompactionStatsCollector();
  options.listeners.emplace_back(collector);
  DestroyAndReopen(options);

  for (int i = 0; i < 32; i++) {
    for (int j = 0; j < 5000; j++) {
      ASSERT_OK(Put(std::to_string(j), std::string(1, 'A')));
    }
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ColumnFamilyHandleImpl* cfh =
      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
  ColumnFamilyData* cfd = cfh->cfd();

  VerifyCompactionStats(*cfd, *collector);
}
4420 
TEST_F(DBCompactionTest,CompactFilesOutputRangeConflict)4421 TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) {
4422   // LSM setup:
4423   // L1:      [ba bz]
4424   // L2: [a b]       [c d]
4425   // L3: [a b]       [c d]
4426   //
4427   // Thread 1:                        Thread 2:
4428   // Begin compacting all L2->L3
4429   //                                  Compact [ba bz] L1->L3
4430   // End compacting all L2->L3
4431   //
4432   // The compaction operation in thread 2 should be disallowed because the range
4433   // overlaps with the compaction in thread 1, which also covers that range in
4434   // L3.
4435   Options options = CurrentOptions();
4436   FlushedFileCollector* collector = new FlushedFileCollector();
4437   options.listeners.emplace_back(collector);
4438   Reopen(options);
4439 
4440   for (int level = 3; level >= 2; --level) {
4441     ASSERT_OK(Put("a", "val"));
4442     ASSERT_OK(Put("b", "val"));
4443     ASSERT_OK(Flush());
4444     ASSERT_OK(Put("c", "val"));
4445     ASSERT_OK(Put("d", "val"));
4446     ASSERT_OK(Flush());
4447     MoveFilesToLevel(level);
4448   }
4449   ASSERT_OK(Put("ba", "val"));
4450   ASSERT_OK(Put("bz", "val"));
4451   ASSERT_OK(Flush());
4452   MoveFilesToLevel(1);
4453 
4454   SyncPoint::GetInstance()->LoadDependency({
4455       {"CompactFilesImpl:0",
4456        "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin"},
4457       {"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End",
4458        "CompactFilesImpl:1"},
4459   });
4460   SyncPoint::GetInstance()->EnableProcessing();
4461 
4462   auto bg_thread = port::Thread([&]() {
4463     // Thread 1
4464     std::vector<std::string> filenames = collector->GetFlushedFiles();
4465     filenames.pop_back();
4466     ASSERT_OK(db_->CompactFiles(CompactionOptions(), filenames,
4467                                 3 /* output_level */));
4468   });
4469 
4470   // Thread 2
4471   TEST_SYNC_POINT(
4472       "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin");
4473   std::string filename = collector->GetFlushedFiles().back();
4474   ASSERT_FALSE(
4475       db_->CompactFiles(CompactionOptions(), {filename}, 3 /* output_level */)
4476           .ok());
4477   TEST_SYNC_POINT(
4478       "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End");
4479 
4480   bg_thread.join();
4481 }
4482 
// Two overlapping L0 files whose second file deletes every key of the first
// trigger a compaction that produces no output: all files disappear from
// L0/L1 and the listener sees SST creation start only for the two flushes.
TEST_F(DBCompactionTest, CompactionHasEmptyOutput) {
  Options options = CurrentOptions();
  SstStatsCollector* collector = new SstStatsCollector();
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(collector);
  Reopen(options);

  // Make sure the L0 files overlap to prevent trivial move.
  ASSERT_OK(Put("a", "val"));
  ASSERT_OK(Put("b", "val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Delete("a"));
  ASSERT_OK(Delete("b"));
  ASSERT_OK(Flush());

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // gtest convention: expected value first.
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_EQ(0, NumTableFilesAtLevel(1));

  // Expect one file creation to start for each flush, and zero for compaction
  // since no keys are written.
  ASSERT_EQ(2, collector->num_ssts_creation_started());
}
4506 
TEST_F(DBCompactionTest,CompactionLimiter)4507 TEST_F(DBCompactionTest, CompactionLimiter) {
4508   const int kNumKeysPerFile = 10;
4509   const int kMaxBackgroundThreads = 64;
4510 
4511   struct CompactionLimiter {
4512     std::string name;
4513     int limit_tasks;
4514     int max_tasks;
4515     int tasks;
4516     std::shared_ptr<ConcurrentTaskLimiter> limiter;
4517   };
4518 
4519   std::vector<CompactionLimiter> limiter_settings;
4520   limiter_settings.push_back({"limiter_1", 1, 0, 0, nullptr});
4521   limiter_settings.push_back({"limiter_2", 2, 0, 0, nullptr});
4522   limiter_settings.push_back({"limiter_3", 3, 0, 0, nullptr});
4523 
4524   for (auto& ls : limiter_settings) {
4525     ls.limiter.reset(NewConcurrentTaskLimiter(ls.name, ls.limit_tasks));
4526   }
4527 
4528   std::shared_ptr<ConcurrentTaskLimiter> unique_limiter(
4529     NewConcurrentTaskLimiter("unique_limiter", -1));
4530 
4531   const char* cf_names[] = {"default", "0", "1", "2", "3", "4", "5",
4532     "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };
4533   const unsigned int cf_count = sizeof cf_names / sizeof cf_names[0];
4534 
4535   std::unordered_map<std::string, CompactionLimiter*> cf_to_limiter;
4536 
4537   Options options = CurrentOptions();
4538   options.write_buffer_size = 110 * 1024;  // 110KB
4539   options.arena_block_size = 4096;
4540   options.num_levels = 3;
4541   options.level0_file_num_compaction_trigger = 4;
4542   options.level0_slowdown_writes_trigger = 64;
4543   options.level0_stop_writes_trigger = 64;
4544   options.max_background_jobs = kMaxBackgroundThreads; // Enough threads
4545   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
4546   options.max_write_buffer_number = 10; // Enough memtables
4547   DestroyAndReopen(options);
4548 
4549   std::vector<Options> option_vector;
4550   option_vector.reserve(cf_count);
4551 
4552   for (unsigned int cf = 0; cf < cf_count; cf++) {
4553     ColumnFamilyOptions cf_opt(options);
4554     if (cf == 0) {
4555       // "Default" CF does't use compaction limiter
4556       cf_opt.compaction_thread_limiter = nullptr;
4557     } else if (cf == 1) {
4558       // "1" CF uses bypass compaction limiter
4559       unique_limiter->SetMaxOutstandingTask(-1);
4560       cf_opt.compaction_thread_limiter = unique_limiter;
4561     } else {
4562       // Assign limiter by mod
4563       auto& ls = limiter_settings[cf % 3];
4564       cf_opt.compaction_thread_limiter = ls.limiter;
4565       cf_to_limiter[cf_names[cf]] = &ls;
4566     }
4567     option_vector.emplace_back(DBOptions(options), cf_opt);
4568   }
4569 
4570   for (unsigned int cf = 1; cf < cf_count; cf++) {
4571     CreateColumnFamilies({cf_names[cf]}, option_vector[cf]);
4572   }
4573 
4574   ReopenWithColumnFamilies(std::vector<std::string>(cf_names,
4575                                                     cf_names + cf_count),
4576                            option_vector);
4577 
4578   port::Mutex mutex;
4579 
4580   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4581       "DBImpl::BackgroundCompaction:BeforeCompaction", [&](void* arg) {
4582         const auto& cf_name = static_cast<ColumnFamilyData*>(arg)->GetName();
4583         auto iter = cf_to_limiter.find(cf_name);
4584         if (iter != cf_to_limiter.end()) {
4585           MutexLock l(&mutex);
4586           ASSERT_GE(iter->second->limit_tasks, ++iter->second->tasks);
4587           iter->second->max_tasks =
4588               std::max(iter->second->max_tasks, iter->second->limit_tasks);
4589         }
4590       });
4591 
4592   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
4593       "DBImpl::BackgroundCompaction:AfterCompaction", [&](void* arg) {
4594         const auto& cf_name = static_cast<ColumnFamilyData*>(arg)->GetName();
4595         auto iter = cf_to_limiter.find(cf_name);
4596         if (iter != cf_to_limiter.end()) {
4597           MutexLock l(&mutex);
4598           ASSERT_GE(--iter->second->tasks, 0);
4599         }
4600       });
4601 
4602   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
4603 
4604   // Block all compact threads in thread pool.
4605   const size_t kTotalFlushTasks = kMaxBackgroundThreads / 4;
4606   const size_t kTotalCompactTasks = kMaxBackgroundThreads - kTotalFlushTasks;
4607   env_->SetBackgroundThreads((int)kTotalFlushTasks, Env::HIGH);
4608   env_->SetBackgroundThreads((int)kTotalCompactTasks, Env::LOW);
4609 
4610   test::SleepingBackgroundTask sleeping_compact_tasks[kTotalCompactTasks];
4611 
4612   // Block all compaction threads in thread pool.
4613   for (size_t i = 0; i < kTotalCompactTasks; i++) {
4614     env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
4615                    &sleeping_compact_tasks[i], Env::LOW);
4616     sleeping_compact_tasks[i].WaitUntilSleeping();
4617   }
4618 
4619   int keyIndex = 0;
4620 
4621   for (int n = 0; n < options.level0_file_num_compaction_trigger; n++) {
4622     for (unsigned int cf = 0; cf < cf_count; cf++) {
4623       for (int i = 0; i < kNumKeysPerFile; i++) {
4624         ASSERT_OK(Put(cf, Key(keyIndex++), ""));
4625       }
4626       // put extra key to trigger flush
4627       ASSERT_OK(Put(cf, "", ""));
4628     }
4629 
4630     for (unsigned int cf = 0; cf < cf_count; cf++) {
4631       dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
4632     }
4633   }
4634 
4635   // Enough L0 files to trigger compaction
4636   for (unsigned int cf = 0; cf < cf_count; cf++) {
4637     ASSERT_EQ(NumTableFilesAtLevel(0, cf),
4638       options.level0_file_num_compaction_trigger);
4639   }
4640 
4641   // Create more files for one column family, which triggers speed up
4642   // condition, all compactions will be scheduled.
4643   for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
4644     for (int i = 0; i < kNumKeysPerFile; i++) {
4645       ASSERT_OK(Put(0, Key(i), ""));
4646     }
4647     // put extra key to trigger flush
4648     ASSERT_OK(Put(0, "", ""));
4649     dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
4650     ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
4651               NumTableFilesAtLevel(0, 0));
4652   }
4653 
4654   // All CFs are pending compaction
4655   ASSERT_EQ(cf_count, env_->GetThreadPoolQueueLen(Env::LOW));
4656 
4657   // Unblock all compaction threads
4658   for (size_t i = 0; i < kTotalCompactTasks; i++) {
4659     sleeping_compact_tasks[i].WakeUp();
4660     sleeping_compact_tasks[i].WaitUntilDone();
4661   }
4662 
4663   for (unsigned int cf = 0; cf < cf_count; cf++) {
4664     dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
4665   }
4666 
4667   ASSERT_OK(dbfull()->TEST_WaitForCompact());
4668 
4669   // Max outstanding compact tasks reached limit
4670   for (auto& ls : limiter_settings) {
4671     ASSERT_EQ(ls.limit_tasks, ls.max_tasks);
4672     ASSERT_EQ(0, ls.limiter->GetOutstandingTask());
4673   }
4674 
4675   // test manual compaction under a fully throttled limiter
4676   int cf_test = 1;
4677   unique_limiter->SetMaxOutstandingTask(0);
4678 
4679   // flush one more file to cf 1
4680   for (int i = 0; i < kNumKeysPerFile; i++) {
4681       ASSERT_OK(Put(cf_test, Key(keyIndex++), ""));
4682   }
4683   // put extra key to trigger flush
4684   ASSERT_OK(Put(cf_test, "", ""));
4685 
4686   dbfull()->TEST_WaitForFlushMemTable(handles_[cf_test]);
4687   ASSERT_EQ(1, NumTableFilesAtLevel(0, cf_test));
4688 
4689   Compact(cf_test, Key(0), Key(keyIndex));
4690   ASSERT_OK(dbfull()->TEST_WaitForCompact());
4691 }
4692 
4693 INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
4694                         ::testing::Values(std::make_tuple(1, true),
4695                                           std::make_tuple(1, false),
4696                                           std::make_tuple(4, true),
4697                                           std::make_tuple(4, false)));
4698 
// Verifies that compaction output files are opened with direct writes exactly
// when use_direct_io_for_flush_and_compaction (the test parameter) is set, and
// that a manual compaction of the three overlapping files in column family
// "pikachu" leaves a single file at level 2.
TEST_P(DBCompactionDirectIOTest, DirectIO) {
  Options options = CurrentOptions();
  Destroy(options);
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.use_direct_io_for_flush_and_compaction = GetParam();
  // Owned by this test and deleted at the very end, after Destroy().
  // NOTE(review): leaked if an ASSERT below returns early.
  options.env = new MockEnv(Env::Default());
  Reopen(options);
  bool readahead = false;
  // The sync point hands over a bool* saying whether the compaction output
  // file uses direct writes; it must agree with the option under test.
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::OpenCompactionOutputFile", [&](void* arg) {
        bool* use_direct_writes = static_cast<bool*>(arg);
        ASSERT_EQ(*use_direct_writes,
                  options.use_direct_io_for_flush_and_compaction);
      });
  if (options.use_direct_io_for_flush_and_compaction) {
    // Records whether the "SanitizeOptions:direct_io" sync point fired.
    SyncPoint::GetInstance()->SetCallBack(
        "SanitizeOptions:direct_io", [&](void* /*arg*/) {
          readahead = true;
        });
  }
  SyncPoint::GetInstance()->EnableProcessing();
  CreateAndReopenWithCF({"pikachu"}, options);
  MakeTables(3, "p", "q", 1);
  ASSERT_EQ("1,1,1", FilesPerLevel(1));
  Compact(1, "p", "q");
  // NOTE(review): this test does not set use_direct_reads itself, so this
  // expects the "SanitizeOptions:direct_io" callback not to have fired;
  // confirm that is the intended check.
  ASSERT_EQ(readahead, options.use_direct_reads);
  ASSERT_EQ("0,0,1", FilesPerLevel(1));
  // Destroy the DB before freeing the env it was opened with.
  Destroy(options);
  delete options.env;
}
4730 
4731 INSTANTIATE_TEST_CASE_P(DBCompactionDirectIOTest, DBCompactionDirectIOTest,
4732                         testing::Bool());
4733 
4734 class CompactionPriTest : public DBTestBase,
4735                           public testing::WithParamInterface<uint32_t> {
4736  public:
CompactionPriTest()4737   CompactionPriTest() : DBTestBase("/compaction_pri_test") {
4738     compaction_pri_ = GetParam();
4739   }
4740 
4741   // Required if inheriting from testing::WithParamInterface<>
SetUpTestCase()4742   static void SetUpTestCase() {}
TearDownTestCase()4743   static void TearDownTestCase() {}
4744 
4745   uint32_t compaction_pri_;
4746 };
4747 
// Inserts kNKeys keys in shuffled order under the parameterized compaction
// priority, waits for background compactions to finish, and verifies that
// every key can still be read.
TEST_P(CompactionPriTest, Test) {
  Options options = CurrentOptions();
  options.write_buffer_size = 16 * 1024;
  options.compaction_pri = static_cast<CompactionPri>(compaction_pri_);
  options.hard_pending_compaction_bytes_limit = 256 * 1024;
  options.max_bytes_for_level_base = 64 * 1024;
  options.max_bytes_for_level_multiplier = 4;
  options.compression = kNoCompression;

  DestroyAndReopen(options);

  Random rnd(301);
  const int kNKeys = 5000;
  int keys[kNKeys];
  for (int i = 0; i < kNKeys; i++) {
    keys[i] = i;
  }
  // std::random_shuffle was deprecated in C++14 and removed in C++17. Use an
  // in-place Fisher-Yates shuffle driven by the test's seeded Random instead,
  // which also keeps the insertion order identical across standard libraries.
  for (int i = kNKeys - 1; i > 0; i--) {
    std::swap(keys[i], keys[rnd.Uniform(i + 1)]);
  }

  for (int i = 0; i < kNKeys; i++) {
    ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 102)));
  }

  // A failed background compaction must fail the test, not be ignored.
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  for (int i = 0; i < kNKeys; i++) {
    ASSERT_NE("NOT_FOUND", Get(Key(i)));
  }
}
4776 
// Run CompactionPriTest once per listed compaction priority. Each enumerator
// is converted to the fixture's uint32_t parameter.
INSTANTIATE_TEST_CASE_P(
    CompactionPriTest, CompactionPriTest,
    ::testing::Values(CompactionPri::kByCompensatedSize,
                      CompactionPri::kOldestLargestSeqFirst,
                      CompactionPri::kOldestSmallestSeqFirst,
                      CompactionPri::kMinOverlappingRatio));
4783 
4784 class NoopMergeOperator : public MergeOperator {
4785  public:
NoopMergeOperator()4786   NoopMergeOperator() {}
4787 
FullMergeV2(const MergeOperationInput &,MergeOperationOutput * merge_out) const4788   bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
4789                    MergeOperationOutput* merge_out) const override {
4790     std::string val("bar");
4791     merge_out->new_value = val;
4792     return true;
4793   }
4794 
Name() const4795   const char* Name() const override { return "Noop"; }
4796 };
4797 
// Builds eight level-2 files of merge operands for one key, sets
// max_compaction_bytes to half of the live SST size (presumably so the manual
// compaction has to be split into partial compactions), and checks that a
// bottommost kForceOptimized CompactRange over everything succeeds.
TEST_F(DBCompactionTest, PartialManualCompaction) {
  Options opts = CurrentOptions();
  opts.num_levels = 3;
  opts.level0_file_num_compaction_trigger = 10;
  opts.compression = kNoCompression;
  opts.merge_operator.reset(new NoopMergeOperator());
  opts.target_file_size_base = 10240;
  DestroyAndReopen(opts);

  Random rnd(301);
  for (auto i = 0; i < 8; ++i) {
    for (auto j = 0; j < 10; ++j) {
      ASSERT_OK(Merge("foo", RandomString(&rnd, 1024)));
    }
    ASSERT_OK(Flush());
  }

  MoveFilesToLevel(2);

  std::string prop;
  // Fail fast: without the property, max_compaction_bytes would become 0.
  ASSERT_TRUE(dbfull()->GetProperty(DB::Properties::kLiveSstFilesSize, &prop));
  // Parse as 64-bit; atoi() into an int would overflow above 2 GiB.
  uint64_t max_compaction_bytes = std::stoull(prop) / 2;
  ASSERT_OK(dbfull()->SetOptions(
      {{"max_compaction_bytes", std::to_string(max_compaction_bytes)}}));

  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
}
4827 
TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) {
  // Regression test for bug where manual compaction hangs forever when the DB
  // is in read-only mode. Verify it now at least returns, despite failing.
  const int kNumL0Files = 4;
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(Env::Default()));
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  opts.env = mock_env.get();
  DestroyAndReopen(opts);

  Random rnd(301);
  for (int i = 0; i < kNumL0Files; ++i) {
    // Make sure files are overlapping in key-range to prevent trivial move.
    // The filesystem is still active here, so setup writes and flushes must
    // succeed; otherwise the read-only scenario below is not what we test.
    ASSERT_OK(Put("key1", RandomString(&rnd, 1024)));
    ASSERT_OK(Put("key2", RandomString(&rnd, 1024)));
    ASSERT_OK(Flush());
  }
  ASSERT_EQ(kNumL0Files, NumTableFilesAtLevel(0));

  // Enter read-only mode by failing a write.
  mock_env->SetFilesystemActive(false);
  // Make sure this is outside `CompactRange`'s range so that it doesn't fail
  // early trying to flush memtable.
  ASSERT_NOK(Put("key3", RandomString(&rnd, 1024)));

  // In the bug scenario, the first manual compaction would fail and forget to
  // unregister itself, causing the second one to hang forever due to conflict
  // with a non-running compaction.
  CompactRangeOptions cro;
  cro.exclusive_manual_compaction = false;
  Slice begin_key("key1");
  Slice end_key("key2");
  ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
  ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));

  // Close before mock_env destruct.
  Close();
}
4867 
4868 // ManualCompactionBottomLevelOptimization tests the bottom level manual
4869 // compaction optimization to skip recompacting files created by Ln-1 to Ln
4870 // compaction
TEST_F(DBCompactionTest,ManualCompactionBottomLevelOptimized)4871 TEST_F(DBCompactionTest, ManualCompactionBottomLevelOptimized) {
4872   Options opts = CurrentOptions();
4873   opts.num_levels = 3;
4874   opts.level0_file_num_compaction_trigger = 5;
4875   opts.compression = kNoCompression;
4876   opts.merge_operator.reset(new NoopMergeOperator());
4877   opts.target_file_size_base = 1024;
4878   opts.max_bytes_for_level_multiplier = 2;
4879   opts.disable_auto_compactions = true;
4880   DestroyAndReopen(opts);
4881   ColumnFamilyHandleImpl* cfh =
4882       static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
4883   ColumnFamilyData* cfd = cfh->cfd();
4884   InternalStats* internal_stats_ptr = cfd->internal_stats();
4885   ASSERT_NE(internal_stats_ptr, nullptr);
4886 
4887   Random rnd(301);
4888   for (auto i = 0; i < 8; ++i) {
4889     for (auto j = 0; j < 10; ++j) {
4890       ASSERT_OK(
4891           Put("foo" + std::to_string(i * 10 + j), RandomString(&rnd, 1024)));
4892     }
4893     Flush();
4894   }
4895 
4896   MoveFilesToLevel(2);
4897 
4898   for (auto i = 0; i < 8; ++i) {
4899     for (auto j = 0; j < 10; ++j) {
4900       ASSERT_OK(
4901           Put("bar" + std::to_string(i * 10 + j), RandomString(&rnd, 1024)));
4902     }
4903     Flush();
4904   }
4905   const std::vector<InternalStats::CompactionStats>& comp_stats =
4906       internal_stats_ptr->TEST_GetCompactionStats();
4907   int num = comp_stats[2].num_input_files_in_output_level;
4908   ASSERT_EQ(num, 0);
4909 
4910   CompactRangeOptions cro;
4911   cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
4912   dbfull()->CompactRange(cro, nullptr, nullptr);
4913 
4914   const std::vector<InternalStats::CompactionStats>& comp_stats2 =
4915       internal_stats_ptr->TEST_GetCompactionStats();
4916   num = comp_stats2[2].num_input_files_in_output_level;
4917   ASSERT_EQ(num, 0);
4918 }
4919 
// Sets shutting_down_ just before a non-trivial manual compaction runs and
// checks that the interrupted compaction leaves no background error behind.
TEST_F(DBCompactionTest, CompactionDuringShutdown) {
  Options opts = CurrentOptions();
  opts.level0_file_num_compaction_trigger = 2;
  opts.disable_auto_compactions = true;
  DestroyAndReopen(opts);
  ColumnFamilyHandleImpl* cfh =
      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
  ColumnFamilyData* cfd = cfh->cfd();
  InternalStats* internal_stats_ptr = cfd->internal_stats();
  ASSERT_NE(internal_stats_ptr, nullptr);

  Random rnd(301);
  for (auto i = 0; i < 2; ++i) {
    for (auto j = 0; j < 10; ++j) {
      ASSERT_OK(
          Put("foo" + std::to_string(i * 10 + j), RandomString(&rnd, 1024)));
    }
    ASSERT_OK(Flush());
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun",
      [&](void* /*arg*/) { dbfull()->shutting_down_.store(true); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  // The compaction may be cut short by the simulated shutdown, so
  // ShutdownInProgress is an acceptable result. Any other error is a failure.
  Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  ASSERT_TRUE(s.ok() || s.IsShutdownInProgress());
  ASSERT_OK(dbfull()->error_handler_.GetBGError());
}
4947 
4948 // FixFileIngestionCompactionDeadlock tests and verifies that compaction and
4949 // file ingestion do not cause deadlock in the event of write stall triggered
4950 // by number of L0 files reaching level0_stop_writes_trigger.
TEST_P(DBCompactionTestWithParam,FixFileIngestionCompactionDeadlock)4951 TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
4952   const int kNumKeysPerFile = 100;
4953   // Generate SST files.
4954   Options options = CurrentOptions();
4955 
4956   // Generate an external SST file containing a single key, i.e. 99
4957   std::string sst_files_dir = dbname_ + "/sst_files/";
4958   test::DestroyDir(env_, sst_files_dir);
4959   ASSERT_OK(env_->CreateDir(sst_files_dir));
4960   SstFileWriter sst_writer(EnvOptions(), options);
4961   const std::string sst_file_path = sst_files_dir + "test.sst";
4962   ASSERT_OK(sst_writer.Open(sst_file_path));
4963   ASSERT_OK(sst_writer.Put(Key(kNumKeysPerFile - 1), "value"));
4964   ASSERT_OK(sst_writer.Finish());
4965 
4966   SyncPoint::GetInstance()->DisableProcessing();
4967   SyncPoint::GetInstance()->ClearAllCallBacks();
4968   SyncPoint::GetInstance()->LoadDependency({
4969       {"DBImpl::IngestExternalFile:AfterIncIngestFileCounter",
4970        "BackgroundCallCompaction:0"},
4971   });
4972   SyncPoint::GetInstance()->EnableProcessing();
4973 
4974   options.write_buffer_size = 110 << 10;  // 110KB
4975   options.level0_file_num_compaction_trigger =
4976       options.level0_stop_writes_trigger;
4977   options.max_subcompactions = max_subcompactions_;
4978   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
4979   DestroyAndReopen(options);
4980   Random rnd(301);
4981 
4982   // Generate level0_stop_writes_trigger L0 files to trigger write stop
4983   for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
4984     for (int j = 0; j != kNumKeysPerFile; ++j) {
4985       ASSERT_OK(Put(Key(j), RandomString(&rnd, 990)));
4986     }
4987     if (0 == i) {
4988       // When we reach here, the memtables have kNumKeysPerFile keys. Note that
4989       // flush is not yet triggered. We need to write an extra key so that the
4990       // write path will call PreprocessWrite and flush the previous key-value
4991       // pairs to e flushed. After that, there will be the newest key in the
4992       // memtable, and a bunch of L0 files. Since there is already one key in
4993       // the memtable, then for i = 1, 2, ..., we do not have to write this
4994       // extra key to trigger flush.
4995       ASSERT_OK(Put("", ""));
4996     }
4997     dbfull()->TEST_WaitForFlushMemTable();
4998     ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1);
4999   }
5000   // When we reach this point, there will be level0_stop_writes_trigger L0
5001   // files and one extra key (99) in memory, which overlaps with the external
5002   // SST file. Write stall triggers, and can be cleared only after compaction
5003   // reduces the number of L0 files.
5004 
5005   // Compaction will also be triggered since we have reached the threshold for
5006   // auto compaction. Note that compaction may begin after the following file
5007   // ingestion thread and waits for ingestion to finish.
5008 
5009   // Thread to ingest file with overlapping key range with the current
5010   // memtable. Consequently ingestion will trigger a flush. The flush MUST
5011   // proceed without waiting for the write stall condition to clear, otherwise
5012   // deadlock can happen.
5013   port::Thread ingestion_thr([&]() {
5014     IngestExternalFileOptions ifo;
5015     Status s = db_->IngestExternalFile({sst_file_path}, ifo);
5016     ASSERT_OK(s);
5017   });
5018 
5019   // More write to trigger write stop
5020   ingestion_thr.join();
5021   ASSERT_OK(dbfull()->TEST_WaitForCompact());
5022   Close();
5023 }
5024 
// Corrupts the file ordering seen by VersionBuilder::CheckConsistency and
// checks that the resulting failure surfaces: the second flush fails and later
// writes are rejected.
TEST_F(DBCompactionTest, ConsistencyFailTest) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "VersionBuilder::CheckConsistency", [&](void* arg) {
        auto p = static_cast<std::pair<FileMetaData**, FileMetaData**>*>(arg);
        // Swap the two FileMetaData so that we hit an error in the
        // CheckConsistency function.
        std::swap(*(p->first), *(p->second));
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  for (int k = 0; k < 2; ++k) {
    ASSERT_OK(Put("foo", "bar"));
    Status s = Flush();
    if (k == 0) {
      // A single L0 file has no neighbour to be swapped with.
      ASSERT_OK(s);
    } else {
      // The second file gives the callback a pair to swap, so the flush
      // must fail the consistency check.
      ASSERT_NOK(s);
    }
  }

  ASSERT_NOK(Put("foo", "bar"));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
5050 
IngestOneKeyValue(DBImpl * db,const std::string & key,const std::string & value,const Options & options)5051 void IngestOneKeyValue(DBImpl* db, const std::string& key,
5052                        const std::string& value, const Options& options) {
5053   ExternalSstFileInfo info;
5054   std::string f = test::PerThreadDBPath("sst_file" + key);
5055   EnvOptions env;
5056   ROCKSDB_NAMESPACE::SstFileWriter writer(env, options);
5057   auto s = writer.Open(f);
5058   ASSERT_OK(s);
5059   // ASSERT_OK(writer.Put(Key(), ""));
5060   ASSERT_OK(writer.Put(key, value));
5061 
5062   ASSERT_OK(writer.Finish(&info));
5063   IngestExternalFileOptions ingest_opt;
5064 
5065   ASSERT_OK(db->IngestExternalFile({info.file_path}, ingest_opt));
5066 }
5067 
TEST_P(DBCompactionTestWithParam,FlushAfterIntraL0CompactionCheckConsistencyFail)5068 TEST_P(DBCompactionTestWithParam,
5069        FlushAfterIntraL0CompactionCheckConsistencyFail) {
5070   Options options = CurrentOptions();
5071   options.force_consistency_checks = true;
5072   options.compression = kNoCompression;
5073   options.level0_file_num_compaction_trigger = 5;
5074   options.max_background_compactions = 2;
5075   options.max_subcompactions = max_subcompactions_;
5076   DestroyAndReopen(options);
5077 
5078   const size_t kValueSize = 1 << 20;
5079   Random rnd(301);
5080   std::atomic<int> pick_intra_l0_count(0);
5081   std::string value(RandomString(&rnd, kValueSize));
5082 
5083   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
5084       {{"DBCompactionTestWithParam::FlushAfterIntraL0:1",
5085         "CompactionJob::Run():Start"}});
5086   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
5087       "FindIntraL0Compaction",
5088       [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
5089 
5090   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
5091 
5092   // prevents trivial move
5093   for (int i = 0; i < 10; ++i) {
5094     ASSERT_OK(Put(Key(i), ""));  // prevents trivial move
5095   }
5096   ASSERT_OK(Flush());
5097   Compact("", Key(99));
5098   ASSERT_EQ(0, NumTableFilesAtLevel(0));
5099 
5100   // Flush 5 L0 sst.
5101   for (int i = 0; i < 5; ++i) {
5102     ASSERT_OK(Put(Key(i + 1), value));
5103     ASSERT_OK(Flush());
5104   }
5105   ASSERT_EQ(5, NumTableFilesAtLevel(0));
5106 
5107   // Put one key, to make smallest log sequence number in this memtable is less
5108   // than sst which would be ingested in next step.
5109   ASSERT_OK(Put(Key(0), "a"));
5110 
5111   ASSERT_EQ(5, NumTableFilesAtLevel(0));
5112 
5113   // Ingest 5 L0 sst. And this files would trigger PickIntraL0Compaction.
5114   for (int i = 5; i < 10; i++) {
5115     IngestOneKeyValue(dbfull(), Key(i), value, options);
5116     ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
5117   }
5118 
5119   TEST_SYNC_POINT("DBCompactionTestWithParam::FlushAfterIntraL0:1");
5120   // Put one key, to make biggest log sequence number in this memtable is bigger
5121   // than sst which would be ingested in next step.
5122   ASSERT_OK(Put(Key(2), "b"));
5123   ASSERT_EQ(10, NumTableFilesAtLevel(0));
5124   dbfull()->TEST_WaitForCompact();
5125   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
5126   std::vector<std::vector<FileMetaData>> level_to_files;
5127   dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
5128                                   &level_to_files);
5129   ASSERT_GT(level_to_files[0].size(), 0);
5130   ASSERT_GT(pick_intra_l0_count.load(), 0);
5131 
5132   ASSERT_OK(Flush());
5133 }
5134 
TEST_P(DBCompactionTestWithParam,IntraL0CompactionAfterFlushCheckConsistencyFail)5135 TEST_P(DBCompactionTestWithParam,
5136        IntraL0CompactionAfterFlushCheckConsistencyFail) {
5137   Options options = CurrentOptions();
5138   options.force_consistency_checks = true;
5139   options.compression = kNoCompression;
5140   options.level0_file_num_compaction_trigger = 5;
5141   options.max_background_compactions = 2;
5142   options.max_subcompactions = max_subcompactions_;
5143   options.write_buffer_size = 2 << 20;
5144   options.max_write_buffer_number = 6;
5145   DestroyAndReopen(options);
5146 
5147   const size_t kValueSize = 1 << 20;
5148   Random rnd(301);
5149   std::string value(RandomString(&rnd, kValueSize));
5150   std::string value2(RandomString(&rnd, kValueSize));
5151   std::string bigvalue = value + value;
5152 
5153   // prevents trivial move
5154   for (int i = 0; i < 10; ++i) {
5155     ASSERT_OK(Put(Key(i), ""));  // prevents trivial move
5156   }
5157   ASSERT_OK(Flush());
5158   Compact("", Key(99));
5159   ASSERT_EQ(0, NumTableFilesAtLevel(0));
5160 
5161   std::atomic<int> pick_intra_l0_count(0);
5162   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
5163       {{"DBCompactionTestWithParam::IntraL0CompactionAfterFlush:1",
5164         "CompactionJob::Run():Start"}});
5165   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
5166       "FindIntraL0Compaction",
5167       [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
5168   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
5169   // Make 6 L0 sst.
5170   for (int i = 0; i < 6; ++i) {
5171     if (i % 2 == 0) {
5172       IngestOneKeyValue(dbfull(), Key(i), value, options);
5173     } else {
5174       ASSERT_OK(Put(Key(i), value));
5175       ASSERT_OK(Flush());
5176     }
5177   }
5178 
5179   ASSERT_EQ(6, NumTableFilesAtLevel(0));
5180 
5181   // Stop run flush job
5182   env_->SetBackgroundThreads(1, Env::HIGH);
5183   test::SleepingBackgroundTask sleeping_tasks;
5184   env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_tasks,
5185                  Env::Priority::HIGH);
5186   sleeping_tasks.WaitUntilSleeping();
5187 
5188   // Put many keys to make memtable request to flush
5189   for (int i = 0; i < 6; ++i) {
5190     ASSERT_OK(Put(Key(i), bigvalue));
5191   }
5192 
5193   ASSERT_EQ(6, NumTableFilesAtLevel(0));
5194   // ingest file to trigger IntraL0Compaction
5195   for (int i = 6; i < 10; ++i) {
5196     ASSERT_EQ(i, NumTableFilesAtLevel(0));
5197     IngestOneKeyValue(dbfull(), Key(i), value2, options);
5198   }
5199   ASSERT_EQ(10, NumTableFilesAtLevel(0));
5200 
5201   // Wake up flush job
5202   sleeping_tasks.WakeUp();
5203   sleeping_tasks.WaitUntilDone();
5204   TEST_SYNC_POINT("DBCompactionTestWithParam::IntraL0CompactionAfterFlush:1");
5205   dbfull()->TEST_WaitForCompact();
5206   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
5207 
5208   uint64_t error_count = 0;
5209   db_->GetIntProperty("rocksdb.background-errors", &error_count);
5210   ASSERT_EQ(error_count, 0);
5211   ASSERT_GT(pick_intra_l0_count.load(), 0);
5212   for (int i = 0; i < 6; ++i) {
5213     ASSERT_EQ(bigvalue, Get(Key(i)));
5214   }
5215   for (int i = 6; i < 10; ++i) {
5216     ASSERT_EQ(value2, Get(Key(i)));
5217   }
5218 }
5219 
// For every option configuration, runs FIFO compaction with a 24-hour TTL on a
// mock clock. It writes kNumFiles L0 files while max_open_files (20) is
// smaller than that count, advances the clock one second past the TTL, and
// then writes and flushes once more.
TEST_F(DBCompactionTest, FifoCompactionGetFileCreationTime) {
  MockEnv mock_env(env_);
  do {
    constexpr size_t kNumFiles = 24;
    constexpr size_t kNumKeysPerFile = 10;

    Options options = CurrentOptions();
    options.table_factory.reset(new BlockBasedTableFactory());
    options.env = &mock_env;
    options.compaction_style = kCompactionStyleFIFO;
    options.ttl = static_cast<uint64_t>(24) * 3600;
    options.max_open_files = 20;
    DestroyAndReopen(options);

    for (size_t file = 0; file < kNumFiles; ++file) {
      const std::string file_value = "value_" + std::to_string(file);
      for (size_t key = 0; key < kNumKeysPerFile; ++key) {
        ASSERT_OK(Put(std::to_string(key), file_value));
      }
      ASSERT_OK(Flush());
    }

    // Jump the mock clock (microseconds) one second beyond the TTL.
    const uint64_t sleep_micros =
        static_cast<uint64_t>(1000 * 1000 * (1 + options.ttl));
    mock_env.FakeSleepForMicroseconds(sleep_micros);
    ASSERT_OK(Put("foo", "value"));
    ASSERT_OK(Flush());
  } while (ChangeOptions());
}
5244 
5245 #endif // !defined(ROCKSDB_LITE)
5246 }  // namespace ROCKSDB_NAMESPACE
5247 
main(int argc,char ** argv)5248 int main(int argc, char** argv) {
5249 #if !defined(ROCKSDB_LITE)
5250   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
5251   ::testing::InitGoogleTest(&argc, argv);
5252   return RUN_ALL_TESTS();
5253 #else
5254   (void) argc;
5255   (void) argv;
5256   return 0;
5257 #endif
5258 }
5259