1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/db_test_util.h"
11 #include "port/port.h"
12 #include "port/stack_trace.h"
13 #include "rocksdb/concurrent_task_limiter.h"
14 #include "rocksdb/experimental.h"
15 #include "rocksdb/sst_file_writer.h"
16 #include "rocksdb/utilities/convenience.h"
17 #include "test_util/fault_injection_test_env.h"
18 #include "test_util/sync_point.h"
19 #include "util/concurrent_task_limiter_impl.h"
20
21 namespace ROCKSDB_NAMESPACE {
22
23 // SYNC_POINT is not supported in released Windows mode.
24 #if !defined(ROCKSDB_LITE)
25
// Basic fixture for compaction tests: each test gets a fresh DB rooted at
// "/db_compaction_test" via DBTestBase.
class DBCompactionTest : public DBTestBase {
 public:
  DBCompactionTest() : DBTestBase("/db_compaction_test") {}
};
30
31 class DBCompactionTestWithParam
32 : public DBTestBase,
33 public testing::WithParamInterface<std::tuple<uint32_t, bool>> {
34 public:
DBCompactionTestWithParam()35 DBCompactionTestWithParam() : DBTestBase("/db_compaction_test") {
36 max_subcompactions_ = std::get<0>(GetParam());
37 exclusive_manual_compaction_ = std::get<1>(GetParam());
38 }
39
40 // Required if inheriting from testing::WithParamInterface<>
SetUpTestCase()41 static void SetUpTestCase() {}
TearDownTestCase()42 static void TearDownTestCase() {}
43
44 uint32_t max_subcompactions_;
45 bool exclusive_manual_compaction_;
46 };
47
// Fixture parameterized on a single bool (direct-I/O on/off); the parameter
// is read by the tests via GetParam().
class DBCompactionDirectIOTest : public DBCompactionTest,
                                 public ::testing::WithParamInterface<bool> {
 public:
  DBCompactionDirectIOTest() : DBCompactionTest() {}
};
53
54 namespace {
55
56 class FlushedFileCollector : public EventListener {
57 public:
FlushedFileCollector()58 FlushedFileCollector() {}
~FlushedFileCollector()59 ~FlushedFileCollector() override {}
60
OnFlushCompleted(DB *,const FlushJobInfo & info)61 void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
62 std::lock_guard<std::mutex> lock(mutex_);
63 flushed_files_.push_back(info.file_path);
64 }
65
GetFlushedFiles()66 std::vector<std::string> GetFlushedFiles() {
67 std::lock_guard<std::mutex> lock(mutex_);
68 std::vector<std::string> result;
69 for (auto fname : flushed_files_) {
70 result.push_back(fname);
71 }
72 return result;
73 }
74
ClearFlushedFiles()75 void ClearFlushedFiles() { flushed_files_.clear(); }
76
77 private:
78 std::vector<std::string> flushed_files_;
79 std::mutex mutex_;
80 };
81
// Tallies completed compactions per CompactionReason, additionally counting
// flushes (as kFlush) and external-file ingestions (as kExternalSstIngestion)
// so tests can cross-check InternalStats bookkeeping.
class CompactionStatsCollector : public EventListener {
 public:
  CompactionStatsCollector()
      : compaction_completed_(static_cast<int>(CompactionReason::kNumOfReasons)) {
    // Explicitly zero every counter; one slot per CompactionReason.
    for (auto& v : compaction_completed_) {
      v.store(0);
    }
  }

  ~CompactionStatsCollector() override {}

  // Invoked on a background thread after each compaction finishes.
  void OnCompactionCompleted(DB* /* db */,
                             const CompactionJobInfo& info) override {
    int k = static_cast<int>(info.compaction_reason);
    int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
    assert(k >= 0 && k < num_of_reasons);
    compaction_completed_[k]++;
  }

  // Ingestions are counted under the dedicated kExternalSstIngestion slot.
  void OnExternalFileIngested(
      DB* /* db */, const ExternalFileIngestionInfo& /* info */) override {
    int k = static_cast<int>(CompactionReason::kExternalSstIngestion);
    compaction_completed_[k]++;
  }

  // Flushes are counted under the dedicated kFlush slot.
  void OnFlushCompleted(DB* /* db */, const FlushJobInfo& /* info */) override {
    int k = static_cast<int>(CompactionReason::kFlush);
    compaction_completed_[k]++;
  }

  // Returns how many events were recorded for the given reason.
  int NumberOfCompactions(CompactionReason reason) const {
    int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
    int k = static_cast<int>(reason);
    assert(k >= 0 && k < num_of_reasons);
    return compaction_completed_.at(k).load();
  }

 private:
  // Index = static_cast<int>(CompactionReason); atomics because callbacks
  // fire from multiple background threads.
  std::vector<std::atomic<int>> compaction_completed_;
};
122
123 class SstStatsCollector : public EventListener {
124 public:
SstStatsCollector()125 SstStatsCollector() : num_ssts_creation_started_(0) {}
126
OnTableFileCreationStarted(const TableFileCreationBriefInfo &)127 void OnTableFileCreationStarted(
128 const TableFileCreationBriefInfo& /* info */) override {
129 ++num_ssts_creation_started_;
130 }
131
num_ssts_creation_started()132 int num_ssts_creation_started() { return num_ssts_creation_started_; }
133
134 private:
135 std::atomic<int> num_ssts_creation_started_;
136 };
137
// Shared parameters for the deletion-trigger tests below (CDT = compaction
// deletion trigger).
static const int kCDTValueSize = 1000;
static const int kCDTKeysPerBuffer = 4;
static const int kCDTNumLevels = 8;
// Tunes "options" so that tiny write volumes produce many small files and
// deletions quickly trigger automatic compactions.
Options DeletionTriggerOptions(Options options) {
  options.compression = kNoCompression;
  // Memtable sized to hold roughly kCDTKeysPerBuffer entries
  // (24 bytes of assumed per-key overhead).
  options.write_buffer_size = kCDTKeysPerBuffer * (kCDTValueSize + 24);
  options.min_write_buffer_number_to_merge = 1;
  options.max_write_buffer_size_to_maintain = 0;
  options.num_levels = kCDTNumLevels;
  // Compact as soon as a single L0 file appears.
  options.level0_file_num_compaction_trigger = 1;
  options.target_file_size_base = options.write_buffer_size * 2;
  options.target_file_size_multiplier = 2;
  options.max_bytes_for_level_base =
      options.target_file_size_base * options.target_file_size_multiplier;
  options.max_bytes_for_level_multiplier = 2;
  options.disable_auto_compactions = false;
  return options;
}
156
HaveOverlappingKeyRanges(const Comparator * c,const SstFileMetaData & a,const SstFileMetaData & b)157 bool HaveOverlappingKeyRanges(
158 const Comparator* c,
159 const SstFileMetaData& a, const SstFileMetaData& b) {
160 if (c->Compare(a.smallestkey, b.smallestkey) >= 0) {
161 if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
162 // b.smallestkey <= a.smallestkey <= b.largestkey
163 return true;
164 }
165 } else if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
166 // a.smallestkey < b.smallestkey <= a.largestkey
167 return true;
168 }
169 if (c->Compare(a.largestkey, b.largestkey) <= 0) {
170 if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
171 // b.smallestkey <= a.largestkey <= b.largestkey
172 return true;
173 }
174 } else if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
175 // a.smallestkey <= b.largestkey < a.largestkey
176 return true;
177 }
178 return false;
179 }
180
// Identifies all files between level "min_level" and "max_level"
// which has overlapping key range with "input_file_meta".
// Computes a transitive closure: a file that overlaps any already-collected
// file is itself collected and can pull in further files at deeper levels.
void GetOverlappingFileNumbersForLevelCompaction(
    const ColumnFamilyMetaData& cf_meta,
    const Comparator* comparator,
    int min_level, int max_level,
    const SstFileMetaData* input_file_meta,
    std::set<std::string>* overlapping_file_names) {
  // Working set seeded with the compaction input file itself.
  std::set<const SstFileMetaData*> overlapping_files;
  overlapping_files.insert(input_file_meta);
  for (int m = min_level; m <= max_level; ++m) {
    for (auto& file : cf_meta.levels[m].files) {
      for (auto* included_file : overlapping_files) {
        if (HaveOverlappingKeyRanges(
                comparator, *included_file, file)) {
          // NOTE: inserting into "overlapping_files" while range-iterating
          // it is valid for std::set (insertion does not invalidate
          // iterators), and the break exits the inner loop immediately.
          overlapping_files.insert(&file);
          overlapping_file_names->insert(file.name);
          break;
        }
      }
    }
  }
}
204
VerifyCompactionResult(const ColumnFamilyMetaData & cf_meta,const std::set<std::string> & overlapping_file_numbers)205 void VerifyCompactionResult(
206 const ColumnFamilyMetaData& cf_meta,
207 const std::set<std::string>& overlapping_file_numbers) {
208 #ifndef NDEBUG
209 for (auto& level : cf_meta.levels) {
210 for (auto& file : level.files) {
211 assert(overlapping_file_numbers.find(file.name) ==
212 overlapping_file_numbers.end());
213 }
214 }
215 #endif
216 }
217
218 /*
219 * Verifies compaction stats of cfd are valid.
220 *
221 * For each level of cfd, its compaction stats are valid if
222 * 1) sum(stat.counts) == stat.count, and
223 * 2) stat.counts[i] == collector.NumberOfCompactions(i)
224 */
VerifyCompactionStats(ColumnFamilyData & cfd,const CompactionStatsCollector & collector)225 void VerifyCompactionStats(ColumnFamilyData& cfd,
226 const CompactionStatsCollector& collector) {
227 #ifndef NDEBUG
228 InternalStats* internal_stats_ptr = cfd.internal_stats();
229 ASSERT_TRUE(internal_stats_ptr != nullptr);
230 const std::vector<InternalStats::CompactionStats>& comp_stats =
231 internal_stats_ptr->TEST_GetCompactionStats();
232 const int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
233 std::vector<int> counts(num_of_reasons, 0);
234 // Count the number of compactions caused by each CompactionReason across
235 // all levels.
236 for (const auto& stat : comp_stats) {
237 int sum = 0;
238 for (int i = 0; i < num_of_reasons; i++) {
239 counts[i] += stat.counts[i];
240 sum += stat.counts[i];
241 }
242 ASSERT_EQ(sum, stat.count);
243 }
244 // Verify InternalStats bookkeeping matches that of CompactionStatsCollector,
245 // assuming that all compactions complete.
246 for (int i = 0; i < num_of_reasons; i++) {
247 ASSERT_EQ(collector.NumberOfCompactions(static_cast<CompactionReason>(i)), counts[i]);
248 }
249 #endif /* NDEBUG */
250 }
251
PickFileRandomly(const ColumnFamilyMetaData & cf_meta,Random * rand,int * level=nullptr)252 const SstFileMetaData* PickFileRandomly(
253 const ColumnFamilyMetaData& cf_meta,
254 Random* rand,
255 int* level = nullptr) {
256 auto file_id = rand->Uniform(static_cast<int>(
257 cf_meta.file_count)) + 1;
258 for (auto& level_meta : cf_meta.levels) {
259 if (file_id <= level_meta.files.size()) {
260 if (level != nullptr) {
261 *level = level_meta.level;
262 }
263 auto result = rand->Uniform(file_id);
264 return &(level_meta.files[result]);
265 }
266 file_id -= static_cast<uint32_t>(level_meta.files.size());
267 }
268 assert(false);
269 return nullptr;
270 }
271 } // anonymous namespace
272
273 #ifndef ROCKSDB_VALGRIND_RUN
274 // All the TEST_P tests run once with sub_compactions disabled (i.e.
275 // options.max_subcompactions = 1) and once with it enabled
TEST_P(DBCompactionTestWithParam,CompactionDeletionTrigger)276 TEST_P(DBCompactionTestWithParam, CompactionDeletionTrigger) {
277 for (int tid = 0; tid < 3; ++tid) {
278 uint64_t db_size[2];
279 Options options = DeletionTriggerOptions(CurrentOptions());
280 options.max_subcompactions = max_subcompactions_;
281
282 if (tid == 1) {
283 // the following only disable stats update in DB::Open()
284 // and should not affect the result of this test.
285 options.skip_stats_update_on_db_open = true;
286 } else if (tid == 2) {
287 // third pass with universal compaction
288 options.compaction_style = kCompactionStyleUniversal;
289 options.num_levels = 1;
290 }
291
292 DestroyAndReopen(options);
293 Random rnd(301);
294
295 const int kTestSize = kCDTKeysPerBuffer * 1024;
296 std::vector<std::string> values;
297 for (int k = 0; k < kTestSize; ++k) {
298 values.push_back(RandomString(&rnd, kCDTValueSize));
299 ASSERT_OK(Put(Key(k), values[k]));
300 }
301 dbfull()->TEST_WaitForFlushMemTable();
302 dbfull()->TEST_WaitForCompact();
303 db_size[0] = Size(Key(0), Key(kTestSize - 1));
304
305 for (int k = 0; k < kTestSize; ++k) {
306 ASSERT_OK(Delete(Key(k)));
307 }
308 dbfull()->TEST_WaitForFlushMemTable();
309 dbfull()->TEST_WaitForCompact();
310 db_size[1] = Size(Key(0), Key(kTestSize - 1));
311
312 // must have much smaller db size.
313 ASSERT_GT(db_size[0] / 3, db_size[1]);
314 }
315 }
316 #endif // ROCKSDB_VALGRIND_RUN
317
TEST_P(DBCompactionTestWithParam,CompactionsPreserveDeletes)318 TEST_P(DBCompactionTestWithParam, CompactionsPreserveDeletes) {
319 // For each options type we test following
320 // - Enable preserve_deletes
321 // - write bunch of keys and deletes
322 // - Set start_seqnum to the beginning; compact; check that keys are present
323 // - rewind start_seqnum way forward; compact; check that keys are gone
324
325 for (int tid = 0; tid < 3; ++tid) {
326 Options options = DeletionTriggerOptions(CurrentOptions());
327 options.max_subcompactions = max_subcompactions_;
328 options.preserve_deletes=true;
329 options.num_levels = 2;
330
331 if (tid == 1) {
332 options.skip_stats_update_on_db_open = true;
333 } else if (tid == 2) {
334 // third pass with universal compaction
335 options.compaction_style = kCompactionStyleUniversal;
336 }
337
338 DestroyAndReopen(options);
339 Random rnd(301);
340 // highlight the default; all deletes should be preserved
341 SetPreserveDeletesSequenceNumber(0);
342
343 const int kTestSize = kCDTKeysPerBuffer;
344 std::vector<std::string> values;
345 for (int k = 0; k < kTestSize; ++k) {
346 values.push_back(RandomString(&rnd, kCDTValueSize));
347 ASSERT_OK(Put(Key(k), values[k]));
348 }
349
350 for (int k = 0; k < kTestSize; ++k) {
351 ASSERT_OK(Delete(Key(k)));
352 }
353 // to ensure we tackle all tombstones
354 CompactRangeOptions cro;
355 cro.change_level = true;
356 cro.target_level = 2;
357 cro.bottommost_level_compaction =
358 BottommostLevelCompaction::kForceOptimized;
359
360 dbfull()->TEST_WaitForFlushMemTable();
361 dbfull()->CompactRange(cro, nullptr, nullptr);
362
363 // check that normal user iterator doesn't see anything
364 Iterator* db_iter = dbfull()->NewIterator(ReadOptions());
365 int i = 0;
366 for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
367 i++;
368 }
369 ASSERT_EQ(i, 0);
370 delete db_iter;
371
372 // check that iterator that sees internal keys sees tombstones
373 ReadOptions ro;
374 ro.iter_start_seqnum=1;
375 db_iter = dbfull()->NewIterator(ro);
376 i = 0;
377 for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
378 i++;
379 }
380 ASSERT_EQ(i, 4);
381 delete db_iter;
382
383 // now all deletes should be gone
384 SetPreserveDeletesSequenceNumber(100000000);
385 dbfull()->CompactRange(cro, nullptr, nullptr);
386
387 db_iter = dbfull()->NewIterator(ro);
388 i = 0;
389 for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
390 i++;
391 }
392 ASSERT_EQ(i, 0);
393 delete db_iter;
394 }
395 }
396
TEST_F(DBCompactionTest,SkipStatsUpdateTest)397 TEST_F(DBCompactionTest, SkipStatsUpdateTest) {
398 // This test verify UpdateAccumulatedStats is not on
399 // if options.skip_stats_update_on_db_open = true
400 // The test will need to be updated if the internal behavior changes.
401
402 Options options = DeletionTriggerOptions(CurrentOptions());
403 options.disable_auto_compactions = true;
404 options.env = env_;
405 DestroyAndReopen(options);
406 Random rnd(301);
407
408 const int kTestSize = kCDTKeysPerBuffer * 512;
409 std::vector<std::string> values;
410 for (int k = 0; k < kTestSize; ++k) {
411 values.push_back(RandomString(&rnd, kCDTValueSize));
412 ASSERT_OK(Put(Key(k), values[k]));
413 }
414
415 ASSERT_OK(Flush());
416
417 Close();
418
419 int update_acc_stats_called = 0;
420 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
421 "VersionStorageInfo::UpdateAccumulatedStats",
422 [&](void* /* arg */) { ++update_acc_stats_called; });
423 SyncPoint::GetInstance()->EnableProcessing();
424
425 // Reopen the DB with stats-update disabled
426 options.skip_stats_update_on_db_open = true;
427 options.max_open_files = 20;
428 Reopen(options);
429
430 ASSERT_EQ(update_acc_stats_called, 0);
431
432 // Repeat the reopen process, but this time we enable
433 // stats-update.
434 options.skip_stats_update_on_db_open = false;
435 Reopen(options);
436
437 ASSERT_GT(update_acc_stats_called, 0);
438
439 SyncPoint::GetInstance()->ClearAllCallBacks();
440 SyncPoint::GetInstance()->DisableProcessing();
441 }
442
TEST_F(DBCompactionTest,TestTableReaderForCompaction)443 TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
444 Options options = CurrentOptions();
445 options.env = env_;
446 options.new_table_reader_for_compaction_inputs = true;
447 options.max_open_files = 20;
448 options.level0_file_num_compaction_trigger = 3;
449 DestroyAndReopen(options);
450 Random rnd(301);
451
452 int num_table_cache_lookup = 0;
453 int num_new_table_reader = 0;
454 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
455 "TableCache::FindTable:0", [&](void* arg) {
456 assert(arg != nullptr);
457 bool no_io = *(reinterpret_cast<bool*>(arg));
458 if (!no_io) {
459 // filter out cases for table properties queries.
460 num_table_cache_lookup++;
461 }
462 });
463 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
464 "TableCache::GetTableReader:0",
465 [&](void* /*arg*/) { num_new_table_reader++; });
466 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
467
468 for (int k = 0; k < options.level0_file_num_compaction_trigger; ++k) {
469 ASSERT_OK(Put(Key(k), Key(k)));
470 ASSERT_OK(Put(Key(10 - k), "bar"));
471 if (k < options.level0_file_num_compaction_trigger - 1) {
472 num_table_cache_lookup = 0;
473 Flush();
474 dbfull()->TEST_WaitForCompact();
475 // preloading iterator issues one table cache lookup and create
476 // a new table reader, if not preloaded.
477 int old_num_table_cache_lookup = num_table_cache_lookup;
478 ASSERT_GE(num_table_cache_lookup, 1);
479 ASSERT_EQ(num_new_table_reader, 1);
480
481 num_table_cache_lookup = 0;
482 num_new_table_reader = 0;
483 ASSERT_EQ(Key(k), Get(Key(k)));
484 // lookup iterator from table cache and no need to create a new one.
485 ASSERT_EQ(old_num_table_cache_lookup + num_table_cache_lookup, 2);
486 ASSERT_EQ(num_new_table_reader, 0);
487 }
488 }
489
490 num_table_cache_lookup = 0;
491 num_new_table_reader = 0;
492 Flush();
493 dbfull()->TEST_WaitForCompact();
494 // Preloading iterator issues one table cache lookup and creates
495 // a new table reader. One file is created for flush and one for compaction.
496 // Compaction inputs make no table cache look-up for data/range deletion
497 // iterators
498 // May preload table cache too.
499 ASSERT_GE(num_table_cache_lookup, 2);
500 int old_num_table_cache_lookup2 = num_table_cache_lookup;
501
502 // Create new iterator for:
503 // (1) 1 for verifying flush results
504 // (2) 1 for verifying compaction results.
505 // (3) New TableReaders will not be created for compaction inputs
506 ASSERT_EQ(num_new_table_reader, 2);
507
508 num_table_cache_lookup = 0;
509 num_new_table_reader = 0;
510 ASSERT_EQ(Key(1), Get(Key(1)));
511 ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 5);
512 ASSERT_EQ(num_new_table_reader, 0);
513
514 num_table_cache_lookup = 0;
515 num_new_table_reader = 0;
516 CompactRangeOptions cro;
517 cro.change_level = true;
518 cro.target_level = 2;
519 cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
520 db_->CompactRange(cro, nullptr, nullptr);
521 // Only verifying compaction outputs issues one table cache lookup
522 // for both data block and range deletion block).
523 // May preload table cache too.
524 ASSERT_GE(num_table_cache_lookup, 1);
525 old_num_table_cache_lookup2 = num_table_cache_lookup;
526 // One for verifying compaction results.
527 // No new iterator created for compaction.
528 ASSERT_EQ(num_new_table_reader, 1);
529
530 num_table_cache_lookup = 0;
531 num_new_table_reader = 0;
532 ASSERT_EQ(Key(1), Get(Key(1)));
533 ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 3);
534 ASSERT_EQ(num_new_table_reader, 0);
535
536 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
537 }
538
TEST_P(DBCompactionTestWithParam,CompactionDeletionTriggerReopen)539 TEST_P(DBCompactionTestWithParam, CompactionDeletionTriggerReopen) {
540 for (int tid = 0; tid < 2; ++tid) {
541 uint64_t db_size[3];
542 Options options = DeletionTriggerOptions(CurrentOptions());
543 options.max_subcompactions = max_subcompactions_;
544
545 if (tid == 1) {
546 // second pass with universal compaction
547 options.compaction_style = kCompactionStyleUniversal;
548 options.num_levels = 1;
549 }
550
551 DestroyAndReopen(options);
552 Random rnd(301);
553
554 // round 1 --- insert key/value pairs.
555 const int kTestSize = kCDTKeysPerBuffer * 512;
556 std::vector<std::string> values;
557 for (int k = 0; k < kTestSize; ++k) {
558 values.push_back(RandomString(&rnd, kCDTValueSize));
559 ASSERT_OK(Put(Key(k), values[k]));
560 }
561 dbfull()->TEST_WaitForFlushMemTable();
562 dbfull()->TEST_WaitForCompact();
563 db_size[0] = Size(Key(0), Key(kTestSize - 1));
564 Close();
565
566 // round 2 --- disable auto-compactions and issue deletions.
567 options.create_if_missing = false;
568 options.disable_auto_compactions = true;
569 Reopen(options);
570
571 for (int k = 0; k < kTestSize; ++k) {
572 ASSERT_OK(Delete(Key(k)));
573 }
574 db_size[1] = Size(Key(0), Key(kTestSize - 1));
575 Close();
576 // as auto_compaction is off, we shouldn't see too much reduce
577 // in db size.
578 ASSERT_LT(db_size[0] / 3, db_size[1]);
579
580 // round 3 --- reopen db with auto_compaction on and see if
581 // deletion compensation still work.
582 options.disable_auto_compactions = false;
583 Reopen(options);
584 // insert relatively small amount of data to trigger auto compaction.
585 for (int k = 0; k < kTestSize / 10; ++k) {
586 ASSERT_OK(Put(Key(k), values[k]));
587 }
588 dbfull()->TEST_WaitForFlushMemTable();
589 dbfull()->TEST_WaitForCompact();
590 db_size[2] = Size(Key(0), Key(kTestSize - 1));
591 // this time we're expecting significant drop in size.
592 ASSERT_GT(db_size[0] / 3, db_size[2]);
593 }
594 }
595
TEST_F(DBCompactionTest,CompactRangeBottomPri)596 TEST_F(DBCompactionTest, CompactRangeBottomPri) {
597 ASSERT_OK(Put(Key(50), ""));
598 ASSERT_OK(Flush());
599 ASSERT_OK(Put(Key(100), ""));
600 ASSERT_OK(Flush());
601 ASSERT_OK(Put(Key(200), ""));
602 ASSERT_OK(Flush());
603
604 {
605 CompactRangeOptions cro;
606 cro.change_level = true;
607 cro.target_level = 2;
608 dbfull()->CompactRange(cro, nullptr, nullptr);
609 }
610 ASSERT_EQ("0,0,3", FilesPerLevel(0));
611
612 ASSERT_OK(Put(Key(1), ""));
613 ASSERT_OK(Put(Key(199), ""));
614 ASSERT_OK(Flush());
615 ASSERT_OK(Put(Key(2), ""));
616 ASSERT_OK(Put(Key(199), ""));
617 ASSERT_OK(Flush());
618 ASSERT_EQ("2,0,3", FilesPerLevel(0));
619
620 // Now we have 2 L0 files, and 3 L2 files, and a manual compaction will
621 // be triggered.
622 // Two compaction jobs will run. One compacts 2 L0 files in Low Pri Pool
623 // and one compact to L2 in bottom pri pool.
624 int low_pri_count = 0;
625 int bottom_pri_count = 0;
626 SyncPoint::GetInstance()->SetCallBack(
627 "ThreadPoolImpl::Impl::BGThread:BeforeRun", [&](void* arg) {
628 Env::Priority* pri = reinterpret_cast<Env::Priority*>(arg);
629 // First time is low pri pool in the test case.
630 if (low_pri_count == 0 && bottom_pri_count == 0) {
631 ASSERT_EQ(Env::Priority::LOW, *pri);
632 }
633 if (*pri == Env::Priority::LOW) {
634 low_pri_count++;
635 } else {
636 bottom_pri_count++;
637 }
638 });
639 SyncPoint::GetInstance()->EnableProcessing();
640 env_->SetBackgroundThreads(1, Env::Priority::BOTTOM);
641 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
642 ASSERT_EQ(1, low_pri_count);
643 ASSERT_EQ(1, bottom_pri_count);
644 ASSERT_EQ("0,0,2", FilesPerLevel(0));
645
646 // Recompact bottom most level uses bottom pool
647 CompactRangeOptions cro;
648 cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
649 dbfull()->CompactRange(cro, nullptr, nullptr);
650 ASSERT_EQ(1, low_pri_count);
651 ASSERT_EQ(2, bottom_pri_count);
652
653 env_->SetBackgroundThreads(0, Env::Priority::BOTTOM);
654 dbfull()->CompactRange(cro, nullptr, nullptr);
655 // Low pri pool is used if bottom pool has size 0.
656 ASSERT_EQ(2, low_pri_count);
657 ASSERT_EQ(2, bottom_pri_count);
658
659 SyncPoint::GetInstance()->DisableProcessing();
660 }
661
TEST_F(DBCompactionTest,DisableStatsUpdateReopen)662 TEST_F(DBCompactionTest, DisableStatsUpdateReopen) {
663 uint64_t db_size[3];
664 for (int test = 0; test < 2; ++test) {
665 Options options = DeletionTriggerOptions(CurrentOptions());
666 options.skip_stats_update_on_db_open = (test == 0);
667
668 env_->random_read_counter_.Reset();
669 DestroyAndReopen(options);
670 Random rnd(301);
671
672 // round 1 --- insert key/value pairs.
673 const int kTestSize = kCDTKeysPerBuffer * 512;
674 std::vector<std::string> values;
675 for (int k = 0; k < kTestSize; ++k) {
676 values.push_back(RandomString(&rnd, kCDTValueSize));
677 ASSERT_OK(Put(Key(k), values[k]));
678 }
679 dbfull()->TEST_WaitForFlushMemTable();
680 dbfull()->TEST_WaitForCompact();
681 db_size[0] = Size(Key(0), Key(kTestSize - 1));
682 Close();
683
684 // round 2 --- disable auto-compactions and issue deletions.
685 options.create_if_missing = false;
686 options.disable_auto_compactions = true;
687
688 env_->random_read_counter_.Reset();
689 Reopen(options);
690
691 for (int k = 0; k < kTestSize; ++k) {
692 ASSERT_OK(Delete(Key(k)));
693 }
694 db_size[1] = Size(Key(0), Key(kTestSize - 1));
695 Close();
696 // as auto_compaction is off, we shouldn't see too much reduce
697 // in db size.
698 ASSERT_LT(db_size[0] / 3, db_size[1]);
699
700 // round 3 --- reopen db with auto_compaction on and see if
701 // deletion compensation still work.
702 options.disable_auto_compactions = false;
703 Reopen(options);
704 dbfull()->TEST_WaitForFlushMemTable();
705 dbfull()->TEST_WaitForCompact();
706 db_size[2] = Size(Key(0), Key(kTestSize - 1));
707
708 if (options.skip_stats_update_on_db_open) {
709 // If update stats on DB::Open is disable, we don't expect
710 // deletion entries taking effect.
711 ASSERT_LT(db_size[0] / 3, db_size[2]);
712 } else {
713 // Otherwise, we should see a significant drop in db size.
714 ASSERT_GT(db_size[0] / 3, db_size[2]);
715 }
716 }
717 }
718
719
TEST_P(DBCompactionTestWithParam,CompactionTrigger)720 TEST_P(DBCompactionTestWithParam, CompactionTrigger) {
721 const int kNumKeysPerFile = 100;
722
723 Options options = CurrentOptions();
724 options.write_buffer_size = 110 << 10; // 110KB
725 options.arena_block_size = 4 << 10;
726 options.num_levels = 3;
727 options.level0_file_num_compaction_trigger = 3;
728 options.max_subcompactions = max_subcompactions_;
729 options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
730 CreateAndReopenWithCF({"pikachu"}, options);
731
732 Random rnd(301);
733
734 for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
735 num++) {
736 std::vector<std::string> values;
737 // Write 100KB (100 values, each 1K)
738 for (int i = 0; i < kNumKeysPerFile; i++) {
739 values.push_back(RandomString(&rnd, 990));
740 ASSERT_OK(Put(1, Key(i), values[i]));
741 }
742 // put extra key to trigger flush
743 ASSERT_OK(Put(1, "", ""));
744 dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
745 ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 1);
746 }
747
748 // generate one more file in level-0, and should trigger level-0 compaction
749 std::vector<std::string> values;
750 for (int i = 0; i < kNumKeysPerFile; i++) {
751 values.push_back(RandomString(&rnd, 990));
752 ASSERT_OK(Put(1, Key(i), values[i]));
753 }
754 // put extra key to trigger flush
755 ASSERT_OK(Put(1, "", ""));
756 dbfull()->TEST_WaitForCompact();
757
758 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
759 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1);
760 }
761
TEST_F(DBCompactionTest,BGCompactionsAllowed)762 TEST_F(DBCompactionTest, BGCompactionsAllowed) {
763 // Create several column families. Make compaction triggers in all of them
764 // and see number of compactions scheduled to be less than allowed.
765 const int kNumKeysPerFile = 100;
766
767 Options options = CurrentOptions();
768 options.write_buffer_size = 110 << 10; // 110KB
769 options.arena_block_size = 4 << 10;
770 options.num_levels = 3;
771 // Should speed up compaction when there are 4 files.
772 options.level0_file_num_compaction_trigger = 2;
773 options.level0_slowdown_writes_trigger = 20;
774 options.soft_pending_compaction_bytes_limit = 1 << 30; // Infinitely large
775 options.max_background_compactions = 3;
776 options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
777
778 // Block all threads in thread pool.
779 const size_t kTotalTasks = 4;
780 env_->SetBackgroundThreads(4, Env::LOW);
781 test::SleepingBackgroundTask sleeping_tasks[kTotalTasks];
782 for (size_t i = 0; i < kTotalTasks; i++) {
783 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
784 &sleeping_tasks[i], Env::Priority::LOW);
785 sleeping_tasks[i].WaitUntilSleeping();
786 }
787
788 CreateAndReopenWithCF({"one", "two", "three"}, options);
789
790 Random rnd(301);
791 for (int cf = 0; cf < 4; cf++) {
792 for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
793 for (int i = 0; i < kNumKeysPerFile; i++) {
794 ASSERT_OK(Put(cf, Key(i), ""));
795 }
796 // put extra key to trigger flush
797 ASSERT_OK(Put(cf, "", ""));
798 dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
799 ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
800 }
801 }
802
803 // Now all column families qualify compaction but only one should be
804 // scheduled, because no column family hits speed up condition.
805 ASSERT_EQ(1u, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
806
807 // Create two more files for one column family, which triggers speed up
808 // condition, three compactions will be scheduled.
809 for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
810 for (int i = 0; i < kNumKeysPerFile; i++) {
811 ASSERT_OK(Put(2, Key(i), ""));
812 }
813 // put extra key to trigger flush
814 ASSERT_OK(Put(2, "", ""));
815 dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
816 ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
817 NumTableFilesAtLevel(0, 2));
818 }
819 ASSERT_EQ(3U, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
820
821 // Unblock all threads to unblock all compactions.
822 for (size_t i = 0; i < kTotalTasks; i++) {
823 sleeping_tasks[i].WakeUp();
824 sleeping_tasks[i].WaitUntilDone();
825 }
826 dbfull()->TEST_WaitForCompact();
827
828 // Verify number of compactions allowed will come back to 1.
829
830 for (size_t i = 0; i < kTotalTasks; i++) {
831 sleeping_tasks[i].Reset();
832 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
833 &sleeping_tasks[i], Env::Priority::LOW);
834 sleeping_tasks[i].WaitUntilSleeping();
835 }
836 for (int cf = 0; cf < 4; cf++) {
837 for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
838 for (int i = 0; i < kNumKeysPerFile; i++) {
839 ASSERT_OK(Put(cf, Key(i), ""));
840 }
841 // put extra key to trigger flush
842 ASSERT_OK(Put(cf, "", ""));
843 dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
844 ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
845 }
846 }
847
848 // Now all column families qualify compaction but only one should be
849 // scheduled, because no column family hits speed up condition.
850 ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
851
852 for (size_t i = 0; i < kTotalTasks; i++) {
853 sleeping_tasks[i].WakeUp();
854 sleeping_tasks[i].WaitUntilDone();
855 }
856 }
857
TEST_P(DBCompactionTestWithParam,CompactionsGenerateMultipleFiles)858 TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) {
859 Options options = CurrentOptions();
860 options.write_buffer_size = 100000000; // Large write buffer
861 options.max_subcompactions = max_subcompactions_;
862 CreateAndReopenWithCF({"pikachu"}, options);
863
864 Random rnd(301);
865
866 // Write 8MB (80 values, each 100K)
867 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
868 std::vector<std::string> values;
869 for (int i = 0; i < 80; i++) {
870 values.push_back(RandomString(&rnd, 100000));
871 ASSERT_OK(Put(1, Key(i), values[i]));
872 }
873
874 // Reopening moves updates to level-0
875 ReopenWithColumnFamilies({"default", "pikachu"}, options);
876 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
877 true /* disallow trivial move */);
878
879 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
880 ASSERT_GT(NumTableFilesAtLevel(1, 1), 1);
881 for (int i = 0; i < 80; i++) {
882 ASSERT_EQ(Get(1, Key(i)), values[i]);
883 }
884 }
885
TEST_F(DBCompactionTest, MinorCompactionsHappen) {
  // Writing many ~1KB values into a 10KB write buffer must spill memtables
  // into new table files; the data stays readable before and after a reopen.
  do {
    Options options = CurrentOptions();
    options.write_buffer_size = 10000;
    CreateAndReopenWithCF({"pikachu"}, options);

    const int kNumKeys = 500;
    const std::string kValueSuffix(1000, 'v');

    const int tables_before = TotalTableFiles(1);
    for (int k = 0; k < kNumKeys; k++) {
      ASSERT_OK(Put(1, Key(k), Key(k) + kValueSuffix));
    }
    // The tiny write buffer forces at least one flush, so the table-file
    // count must have grown.
    ASSERT_GT(TotalTableFiles(1), tables_before);

    for (int k = 0; k < kNumKeys; k++) {
      ASSERT_EQ(Key(k) + kValueSuffix, Get(1, Key(k)));
    }

    ReopenWithColumnFamilies({"default", "pikachu"}, options);

    for (int k = 0; k < kNumKeys; k++) {
      ASSERT_EQ(Key(k) + kValueSuffix, Get(1, Key(k)));
    }
  } while (ChangeCompactOptions());
}
912
TEST_F(DBCompactionTest, UserKeyCrossFile1) {
  // Verify a Delete of key "3" keeps masking the older Put even after the
  // tombstone and the value end up in different files/levels and further
  // compactions run.
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;

  DestroyAndReopen(options);

  // create first file and flush to l0
  // Fix: every Put/Delete/Flush/compaction Status was previously ignored.
  ASSERT_OK(Put("4", "A"));
  ASSERT_OK(Put("3", "A"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  // second file puts the tombstone for "3" in a different L0 file
  ASSERT_OK(Put("2", "A"));
  ASSERT_OK(Delete("3"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // move both files down to l1
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // Trigger an automatic L0->L1 compaction with unrelated updates and make
  // sure the deletion is still honored afterwards.
  for (int i = 0; i < 3; i++) {
    ASSERT_OK(Put("2", "B"));
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_EQ("NOT_FOUND", Get("3"));
}
945
TEST_F(DBCompactionTest, UserKeyCrossFile2) {
  // Same as UserKeyCrossFile1 but the tombstone is a SingleDelete; it must
  // keep masking the older Put across files, levels, and compactions.
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;

  DestroyAndReopen(options);

  // create first file and flush to l0
  // Fix: every Put/SingleDelete/Flush/compaction Status was previously
  // ignored.
  ASSERT_OK(Put("4", "A"));
  ASSERT_OK(Put("3", "A"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  // second file puts the single-delete for "3" in a different L0 file
  ASSERT_OK(Put("2", "A"));
  ASSERT_OK(SingleDelete("3"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // move both files down to l1
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("NOT_FOUND", Get("3"));

  // Trigger an automatic L0->L1 compaction with unrelated updates and make
  // sure the single-delete is still honored afterwards.
  for (int i = 0; i < 3; i++) {
    ASSERT_OK(Put("2", "B"));
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_EQ("NOT_FOUND", Get("3"));
}
978
TEST_F(DBCompactionTest, ZeroSeqIdCompaction) {
  // Regression test: compaction must stay correct when the earliest
  // duplicate of a key can have its sequence number zeroed out after all
  // snapshots pinning it are released.
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;

  // Collect flushed file names so we can hand them to CompactFiles() below.
  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);

  // compaction options
  CompactionOptions compact_opt;
  compact_opt.compression = kNoCompression;
  compact_opt.output_file_size_limit = 4096;
  // Keys sized so a handful of entries cross the output file size limit.
  const size_t key_len =
      static_cast<size_t>(compact_opt.output_file_size_limit) / 5;

  DestroyAndReopen(options);

  std::vector<const Snapshot*> snaps;

  // create first file and flush to l0
  // Fix: Put/Flush/CompactFiles/wait Statuses were previously ignored.
  for (auto& key : {"1", "2", "3", "3", "3", "3"}) {
    ASSERT_OK(Put(key, std::string(key_len, 'A')));
    snaps.push_back(dbfull()->GetSnapshot());
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  // create second file and flush to l0
  for (auto& key : {"3", "4", "5", "6", "7", "8"}) {
    ASSERT_OK(Put(key, std::string(key_len, 'A')));
    snaps.push_back(dbfull()->GetSnapshot());
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  // move both files down to l1
  ASSERT_OK(
      dbfull()->CompactFiles(compact_opt, collector->GetFlushedFiles(), 1));

  // release snap so that first instance of key(3) can have seqId=0
  for (auto snap : snaps) {
    dbfull()->ReleaseSnapshot(snap);
  }

  // create 3 files in l0 so to trigger compaction
  for (int i = 0; i < options.level0_file_num_compaction_trigger; i++) {
    ASSERT_OK(Put("2", std::string(1, 'A')));
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // A final write proves the DB is still usable after the compaction.
  ASSERT_OK(Put("", ""));
}
1032
TEST_F(DBCompactionTest, ManualCompactionUnknownOutputSize) {
  // github issue #2249: CompactFiles() with CompactionOptions whose
  // output_file_size_limit is left unset must still succeed.
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.level0_file_num_compaction_trigger = 3;
  DestroyAndReopen(options);

  // create two files in l1 that we can compact
  for (int i = 0; i < 2; ++i) {
    for (int j = 0; j < options.level0_file_num_compaction_trigger; j++) {
      // make l0 files' ranges overlap to avoid trivial move
      // Fix: Put/Flush/wait Statuses were previously ignored.
      ASSERT_OK(Put(std::to_string(2 * i), std::string(1, 'A')));
      ASSERT_OK(Put(std::to_string(2 * i + 1), std::string(1, 'A')));
      ASSERT_OK(Flush());
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    }
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
    ASSERT_EQ(NumTableFilesAtLevel(1, 0), i + 1);
  }

  // Gather the two L1 file names to feed into CompactFiles().
  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
  ASSERT_EQ(2, cf_meta.levels[1].files.size());
  std::vector<std::string> input_filenames;
  for (const auto& sst_file : cf_meta.levels[1].files) {
    input_filenames.push_back(sst_file.name);
  }

  // note CompactionOptions::output_file_size_limit is unset.
  CompactionOptions compact_opt;
  compact_opt.compression = kNoCompression;
  // Fix: assert the very call this regression test exists for succeeds.
  ASSERT_OK(dbfull()->CompactFiles(compact_opt, input_filenames, 1));
}
1067
// Check that writes done during a memtable compaction are recovered
// if the database is shutdown during the memtable compaction.
TEST_F(DBCompactionTest, RecoverDuringMemtableCompaction) {
  do {
    Options options = CurrentOptions();
    options.env = env_;
    CreateAndReopenWithCF({"pikachu"}, options);

    // Trigger a long memtable compaction and reopen the database during it
    ASSERT_OK(Put(1, "foo", "v1"));  // Goes to 1st log file
    ASSERT_OK(Put(1, "big1", std::string(10000000, 'x')));  // Fills memtable
    ASSERT_OK(Put(1, "big2", std::string(1000, 'y')));  // Triggers compaction
    ASSERT_OK(Put(1, "bar", "v2"));                     // Goes to new log file

    // Reopen while the flush of "big1" may still be in flight; recovery must
    // replay the log files so none of the four writes are lost.
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v2", Get(1, "bar"));
    ASSERT_EQ(std::string(10000000, 'x'), Get(1, "big1"));
    ASSERT_EQ(std::string(1000, 'y'), Get(1, "big2"));
  } while (ChangeOptions());
}
1089
TEST_P(DBCompactionTestWithParam, TrivialMoveOneFile) {
  // A manual compaction of a single L0 file with nothing below it must move
  // the file to L1 without rewriting it (same name, same size).
  int32_t trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.write_buffer_size = 100000000;
  options.max_subcompactions = max_subcompactions_;
  DestroyAndReopen(options);

  int32_t num_keys = 80;
  int32_t value_size = 100 * 1024;  // 100 KB

  Random rnd(301);
  std::vector<std::string> values;
  for (int i = 0; i < num_keys; i++) {
    values.push_back(RandomString(&rnd, value_size));
    ASSERT_OK(Put(Key(i), values[i]));
  }

  // Reopening moves updates to L0
  Reopen(options);
  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 1);  // 1 file in L0
  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0);  // 0 files in L1

  std::vector<LiveFileMetaData> metadata;
  db_->GetLiveFilesMetaData(&metadata);
  ASSERT_EQ(metadata.size(), 1U);
  LiveFileMetaData level0_file = metadata[0];  // L0 file meta

  CompactRangeOptions cro;
  cro.exclusive_manual_compaction = exclusive_manual_compaction_;

  // Compaction will initiate a trivial move from L0 to L1
  // Fix: the compaction Status was previously ignored.
  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));

  // File moved From L0 to L1
  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);  // 0 files in L0
  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 1);  // 1 file in L1

  // The L1 file must be the same physical file as the old L0 file.
  metadata.clear();
  db_->GetLiveFilesMetaData(&metadata);
  ASSERT_EQ(metadata.size(), 1U);
  ASSERT_EQ(metadata[0].name /* level1_file.name */, level0_file.name);
  ASSERT_EQ(metadata[0].size /* level1_file.size */, level0_file.size);

  for (int i = 0; i < num_keys; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }

  ASSERT_EQ(trivial_move, 1);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  // Fix: unregister the callback — it captures a stack local by reference
  // and must not outlive this test.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
1145
TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) {
  // When the L0 files cover disjoint key ranges, a manual compaction should
  // trivially move all of them to L1; once two files overlap, it must do a
  // real (non-trivial) compaction instead.
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.write_buffer_size = 10 * 1024 * 1024;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  // non overlapping ranges
  std::vector<std::pair<int32_t, int32_t>> ranges = {
      {100, 199},
      {300, 399},
      {0, 99},
      {200, 299},
      {600, 699},
      {400, 499},
      {500, 550},
      {551, 599},
  };
  int32_t value_size = 10 * 1024;  // 10 KB

  Random rnd(301);
  std::map<int32_t, std::string> values;
  for (size_t i = 0; i < ranges.size(); i++) {
    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
      values[j] = RandomString(&rnd, value_size);
      ASSERT_OK(Put(Key(j), values[j]));
    }
    ASSERT_OK(Flush());
  }

  int32_t level0_files = NumTableFilesAtLevel(0, 0);
  ASSERT_EQ(level0_files, ranges.size());    // Multiple files in L0
  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0);  // No files in L1

  CompactRangeOptions cro;
  cro.exclusive_manual_compaction = exclusive_manual_compaction_;

  // Since data is non-overlapping we expect compaction to initiate
  // a trivial move
  // Fix: the compaction Status was previously ignored.
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  // We expect that all the files were trivially moved from L0 to L1
  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
  ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files);

  for (size_t i = 0; i < ranges.size(); i++) {
    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
      ASSERT_EQ(Get(Key(j)), values[j]);
    }
  }

  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  trivial_move = 0;
  non_trivial_move = 0;
  values.clear();
  DestroyAndReopen(options);
  // Same ranges as above but overlapping
  ranges = {
      {100, 199},
      {300, 399},
      {0, 99},
      {200, 299},
      {600, 699},
      {400, 499},
      {500, 560},  // this range overlap with the next one
      {551, 599},
  };
  for (size_t i = 0; i < ranges.size(); i++) {
    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
      values[j] = RandomString(&rnd, value_size);
      ASSERT_OK(Put(Key(j), values[j]));
    }
    ASSERT_OK(Flush());
  }

  // Fix: the compaction Status was previously ignored.
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  for (size_t i = 0; i < ranges.size(); i++) {
    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
      ASSERT_EQ(Get(Key(j)), values[j]);
    }
  }
  // The overlap forces a real compaction this time.
  ASSERT_EQ(trivial_move, 0);
  ASSERT_EQ(non_trivial_move, 1);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  // Fix: unregister the callbacks — they capture stack locals by reference
  // and must not outlive this test.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
1244
TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) {
  // A change_level manual compaction over two non-overlapping L0 files must
  // trivially move both files straight to the requested target level (L6).
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.write_buffer_size = 10 * 1024 * 1024;
  options.num_levels = 7;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  int32_t value_size = 10 * 1024;  // 10 KB

  // Add 2 non-overlapping files
  Random rnd(301);
  std::map<int32_t, std::string> values;

  // file 1 [0 => 300]
  for (int32_t i = 0; i <= 300; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 2 [600 => 700]
  for (int32_t i = 600; i <= 700; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L0
  ASSERT_EQ("2", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 6;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  // 2 files in L6
  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  for (int32_t i = 0; i <= 300; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }
  for (int32_t i = 600; i <= 700; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }

  // Fix: this test enabled sync-point processing with callbacks capturing
  // stack locals by reference but never tore that state down, leaving
  // dangling captures for subsequent tests. Clean up like the sibling tests.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
1303
TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
  // Runs a partial manual compaction of keys [0, 199] on a background thread
  // concurrently with flushes that trigger automatic compactions, using sync
  // points to pin the interleaving, then checks the final LSM shape.
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_move++; });
  bool first = true;
  // Purpose of dependencies:
  // 4 -> 1: ensure the order of two non-trivial compactions
  // 5 -> 2 and 5 -> 3: ensure we do a check before two non-trivial compactions
  // are installed
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBCompaction::ManualPartial:4", "DBCompaction::ManualPartial:1"},
       {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:2"},
       {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:3"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
        if (first) {
          first = false;
          TEST_SYNC_POINT("DBCompaction::ManualPartial:4");
          TEST_SYNC_POINT("DBCompaction::ManualPartial:3");
        } else {  // second non-trivial compaction
          TEST_SYNC_POINT("DBCompaction::ManualPartial:2");
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1024 * 1024;
  options.num_levels = 7;
  options.max_subcompactions = max_subcompactions_;
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 3;
  options.target_file_size_base = 1 << 23;  // 8 MB

  DestroyAndReopen(options);
  int32_t value_size = 10 * 1024;  // 10 KB

  // Add 2 non-overlapping files
  Random rnd(301);
  std::map<int32_t, std::string> values;

  // file 1 [0 => 100)
  for (int32_t i = 0; i < 100; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 2 [100 => 300)
  for (int32_t i = 100; i < 300; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L0
  ASSERT_EQ("2", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 6;
  compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
  // Trivial move the two non-overlapping files to level 6
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  // 2 files in L6
  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  // file 3 [0 => 200)
  for (int32_t i = 0; i < 200; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 1 files in L0
  ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel(0));
  // Push the new file down one level at a time until it sits in L5, right
  // above the two files in L6; each step counts as a trivial move.
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr, nullptr, false));
  ASSERT_OK(dbfull()->TEST_CompactRange(4, nullptr, nullptr, nullptr, false));
  // 2 files in L6, 1 file in L5
  ASSERT_EQ("0,0,0,0,0,1,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 6);
  ASSERT_EQ(non_trivial_move, 0);

  // Background thread: non-exclusive manual compaction of [0, 199], which
  // overlaps the L5 and L6 files — the first non-trivial compaction.
  ROCKSDB_NAMESPACE::port::Thread threads([&] {
    compact_options.change_level = false;
    compact_options.exclusive_manual_compaction = false;
    std::string begin_string = Key(0);
    std::string end_string = Key(199);
    Slice begin(begin_string);
    Slice end(end_string);
    // First non-trivial compaction is triggered
    ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
  });

  // Blocks here until the first non-trivial compaction has run (4 -> 1).
  TEST_SYNC_POINT("DBCompaction::ManualPartial:1");
  // file 4 [300 => 400]
  for (int32_t i = 300; i <= 400; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 5 [400 => 500]
  for (int32_t i = 400; i <= 500; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 6 [500 => 600]
  for (int32_t i = 500; i <= 600; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  // Second non-trivial compaction is triggered
  ASSERT_OK(Flush());

  // Before two non-trivial compactions are installed, there are 3 files in L0
  ASSERT_EQ("3,0,0,0,0,1,2", FilesPerLevel(0));
  // Release the compactions blocked on the 5 -> 2 and 5 -> 3 dependencies.
  TEST_SYNC_POINT("DBCompaction::ManualPartial:5");

  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_WaitForCompact();
  // After two non-trivial compactions are installed, there is 1 file in L6, and
  // 1 file in L1
  ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel(0));
  threads.join();

  for (int32_t i = 0; i < 600; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }
}
1447
// Disable as the test is flaky.
TEST_F(DBCompactionTest, DISABLED_ManualPartialFill) {
  // Like ManualCompactionPartial, but keeps filling the DB while a partial
  // manual compaction of [0, 199] runs on another thread, then verifies the
  // per-level size limits still hold.
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_move++; });
  bool first = true;
  bool second = true;
  // 4 -> 1: the fill below starts only after the first non-trivial compaction
  // has run. 2 -> 3: the first compaction's AfterRun callback waits until the
  // main thread has finished the level-size check.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBCompaction::PartialFill:4", "DBCompaction::PartialFill:1"},
       {"DBCompaction::PartialFill:2", "DBCompaction::PartialFill:3"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
        if (first) {
          TEST_SYNC_POINT("DBCompaction::PartialFill:4");
          first = false;
          TEST_SYNC_POINT("DBCompaction::PartialFill:3");
        } else if (second) {
          // NOTE(review): empty branch and 'second' never flips — apparently
          // leftover scaffolding from when this test was marked flaky.
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1024 * 1024;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 4;
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 3;

  DestroyAndReopen(options);
  // make sure all background compaction jobs can be scheduled
  auto stop_token =
      dbfull()->TEST_write_controler().GetCompactionPressureToken();
  int32_t value_size = 10 * 1024;  // 10 KB

  // Add 2 non-overlapping files
  Random rnd(301);
  std::map<int32_t, std::string> values;

  // file 1 [0 => 100)
  for (int32_t i = 0; i < 100; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 2 [100 => 300)
  for (int32_t i = 100; i < 300; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L0
  ASSERT_EQ("2", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  // 2 files in L2
  ASSERT_EQ("0,0,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  // file 3 [0 => 200)
  for (int32_t i = 0; i < 200; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L2, 1 in L0
  ASSERT_EQ("1,0,2", FilesPerLevel(0));
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false));
  // 2 files in L2, 1 in L1
  ASSERT_EQ("0,1,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 2);
  ASSERT_EQ(non_trivial_move, 0);

  // Background thread: non-exclusive manual compaction of [0, 199] — the
  // first non-trivial compaction.
  ROCKSDB_NAMESPACE::port::Thread threads([&] {
    compact_options.change_level = false;
    compact_options.exclusive_manual_compaction = false;
    std::string begin_string = Key(0);
    std::string end_string = Key(199);
    Slice begin(begin_string);
    Slice end(end_string);
    ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
  });

  // Blocks here until the first non-trivial compaction has run (4 -> 1).
  TEST_SYNC_POINT("DBCompaction::PartialFill:1");
  // Many files 4 [300 => 4300)
  for (int32_t i = 0; i <= 5; i++) {
    for (int32_t j = 300; j < 4300; j++) {
      if (j == 2300) {
        ASSERT_OK(Flush());
        dbfull()->TEST_WaitForFlushMemTable();
      }
      values[j] = RandomString(&rnd, value_size);
      ASSERT_OK(Put(Key(j), values[j]));
    }
  }

  // Verify level sizes
  uint64_t target_size = 4 * options.max_bytes_for_level_base;
  for (int32_t i = 1; i < options.num_levels; i++) {
    ASSERT_LE(SizeAtLevel(i), target_size);
    target_size = static_cast<uint64_t>(target_size *
                                        options.max_bytes_for_level_multiplier);
  }

  // Let the first compaction's AfterRun callback proceed (2 -> 3).
  TEST_SYNC_POINT("DBCompaction::PartialFill:2");
  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_WaitForCompact();
  threads.join();

  for (int32_t i = 0; i < 4300; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }
}
1574
TEST_F(DBCompactionTest, ManualCompactionWithUnorderedWrite) {
  // With options.unordered_write, a write hits the WAL before its memtable
  // insert. The dependencies below park the "foo"->"v2" writer between those
  // two steps while a manual compaction starts; CompactRange's
  // WaitForPendingWrites must let the write finish, so both the live DB and
  // a reopened DB observe v2.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL",
        "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL"},
       {"DBImpl::WaitForPendingWrites:BeforeBlock",
        "DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"}});

  Options options = CurrentOptions();
  options.unordered_write = true;
  DestroyAndReopen(options);
  Put("foo", "v1");
  ASSERT_OK(Flush());

  Put("bar", "v1");
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  port::Thread writer([&]() { Put("foo", "v2"); });

  // Wait until the writer thread has logged "foo"->"v2" to the WAL before
  // kicking off the manual compaction.
  TEST_SYNC_POINT(
      "DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL");
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  writer.join();
  ASSERT_EQ(Get("foo"), "v2");

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // Reopen to confirm v2 also survived on disk, not only in memory.
  Reopen(options);
  ASSERT_EQ(Get("foo"), "v2");
}
1605
TEST_F(DBCompactionTest, DeleteFileRange) {
  // Exercise DeleteFilesInRange(): delete whole files inside a key range,
  // then an empty range, then everything, checking reads and file counts.
  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1024 * 1024;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 4;
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 3;

  DestroyAndReopen(options);
  int32_t value_size = 10 * 1024;  // 10 KB

  // Add 2 non-overlapping files
  Random rnd(301);
  std::map<int32_t, std::string> values;

  // file 1 [0 => 100)
  for (int32_t i = 0; i < 100; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 2 [100 => 300)
  for (int32_t i = 100; i < 300; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L0
  ASSERT_EQ("2", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  // 2 files in L2
  ASSERT_EQ("0,0,2", FilesPerLevel(0));

  // file 3 [0 => 200)
  for (int32_t i = 0; i < 200; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // Many files 4 [300 => 4300)
  for (int32_t i = 0; i <= 5; i++) {
    for (int32_t j = 300; j < 4300; j++) {
      if (j == 2300) {
        ASSERT_OK(Flush());
        // Fix: Status from the flush/compaction waits was previously ignored.
        ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
      }
      values[j] = RandomString(&rnd, value_size);
      ASSERT_OK(Put(Key(j), values[j]));
    }
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Verify level sizes
  uint64_t target_size = 4 * options.max_bytes_for_level_base;
  for (int32_t i = 1; i < options.num_levels; i++) {
    ASSERT_LE(SizeAtLevel(i), target_size);
    target_size = static_cast<uint64_t>(target_size *
                                        options.max_bytes_for_level_multiplier);
  }

  size_t old_num_files = CountFiles();
  std::string begin_string = Key(1000);
  std::string end_string = Key(2000);
  Slice begin(begin_string);
  Slice end(end_string);
  ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));

  // Only keys in files fully contained in the range disappear; keys in files
  // straddling a boundary may survive, so count deletions rather than assert
  // each one.
  int32_t deleted_count = 0;
  for (int32_t i = 0; i < 4300; i++) {
    if (i < 1000 || i > 2000) {
      ASSERT_EQ(Get(Key(i)), values[i]);
    } else {
      ReadOptions roptions;
      std::string result;
      Status s = db_->Get(roptions, Key(i), &result);
      ASSERT_TRUE(s.IsNotFound() || s.ok());
      if (s.IsNotFound()) {
        deleted_count++;
      }
    }
  }
  ASSERT_GT(deleted_count, 0);
  begin_string = Key(5000);
  end_string = Key(6000);
  Slice begin1(begin_string);
  Slice end1(end_string);
  // Try deleting files in range which contain no keys
  ASSERT_OK(
      DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin1, &end1));

  // Push data from level 0 to level 1 to force all data to be deleted
  // Note that we don't delete level 0 files
  compact_options.change_level = true;
  compact_options.target_level = 1;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // nullptr bounds cover every file, so nothing should remain readable.
  ASSERT_OK(
      DeleteFilesInRange(db_, db_->DefaultColumnFamily(), nullptr, nullptr));

  int32_t deleted_count2 = 0;
  for (int32_t i = 0; i < 4300; i++) {
    ReadOptions roptions;
    std::string result;
    Status s = db_->Get(roptions, Key(i), &result);
    ASSERT_TRUE(s.IsNotFound());
    deleted_count2++;
  }
  ASSERT_GT(deleted_count2, deleted_count);
  size_t new_num_files = CountFiles();
  ASSERT_GT(old_num_files, new_num_files);
}
1726
TEST_F(DBCompactionTest, DeleteFilesInRanges) {
  // Exercise the multi-range DeleteFilesInRanges() API: with range ends
  // included (default), with the final argument false (ends excluded), and
  // finally with a default RangePtr covering all files.
  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1024 * 1024;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 4;
  options.max_background_compactions = 3;
  options.disable_auto_compactions = true;

  DestroyAndReopen(options);
  int32_t value_size = 10 * 1024;  // 10 KB

  Random rnd(301);
  std::map<int32_t, std::string> values;

  // file [0 => 100), [100 => 200), ... [900, 1000)
  for (auto i = 0; i < 10; i++) {
    for (auto j = 0; j < 100; j++) {
      auto k = i * 100 + j;
      values[k] = RandomString(&rnd, value_size);
      ASSERT_OK(Put(Key(k), values[k]));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_EQ("10", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  ASSERT_EQ("0,0,10", FilesPerLevel(0));

  // Rewrite every other 100-key chunk so L1 ends up holding 5 files:
  // file [0 => 100), [200 => 300), ... [800, 900)
  for (auto i = 0; i < 10; i+=2) {
    for (auto j = 0; j < 100; j++) {
      auto k = i * 100 + j;
      ASSERT_OK(Put(Key(k), values[k]));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_EQ("5,0,10", FilesPerLevel(0));
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr));
  ASSERT_EQ("0,5,10", FilesPerLevel(0));

  // Delete files in range [0, 299] (inclusive)
  {
    auto begin_str1 = Key(0), end_str1 = Key(100);
    auto begin_str2 = Key(100), end_str2 = Key(200);
    auto begin_str3 = Key(200), end_str3 = Key(299);
    Slice begin1(begin_str1), end1(end_str1);
    Slice begin2(begin_str2), end2(end_str2);
    Slice begin3(begin_str3), end3(end_str3);
    std::vector<RangePtr> ranges;
    ranges.push_back(RangePtr(&begin1, &end1));
    ranges.push_back(RangePtr(&begin2, &end2));
    ranges.push_back(RangePtr(&begin3, &end3));
    ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(),
                                  ranges.data(), ranges.size()));
    // 2 L1 files and 3 L2 files fell entirely inside the ranges.
    ASSERT_EQ("0,3,7", FilesPerLevel(0));

    // Keys [0, 300) should not exist.
    for (auto i = 0; i < 300; i++) {
      ReadOptions ropts;
      std::string result;
      auto s = db_->Get(ropts, Key(i), &result);
      ASSERT_TRUE(s.IsNotFound());
    }
    for (auto i = 300; i < 1000; i++) {
      ASSERT_EQ(Get(Key(i)), values[i]);
    }
  }

  // Delete files in range [600, 999) (exclusive)
  {
    auto begin_str1 = Key(600), end_str1 = Key(800);
    auto begin_str2 = Key(700), end_str2 = Key(900);
    auto begin_str3 = Key(800), end_str3 = Key(999);
    Slice begin1(begin_str1), end1(end_str1);
    Slice begin2(begin_str2), end2(end_str2);
    Slice begin3(begin_str3), end3(end_str3);
    std::vector<RangePtr> ranges;
    ranges.push_back(RangePtr(&begin1, &end1));
    ranges.push_back(RangePtr(&begin2, &end2));
    ranges.push_back(RangePtr(&begin3, &end3));
    // The trailing 'false' makes the range ends exclusive.
    ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(),
                                  ranges.data(), ranges.size(), false));
    ASSERT_EQ("0,1,4", FilesPerLevel(0));

    // Keys [600, 900) should not exist.
    for (auto i = 600; i < 900; i++) {
      ReadOptions ropts;
      std::string result;
      auto s = db_->Get(ropts, Key(i), &result);
      ASSERT_TRUE(s.IsNotFound());
    }
    for (auto i = 300; i < 600; i++) {
      ASSERT_EQ(Get(Key(i)), values[i]);
    }
    for (auto i = 900; i < 1000; i++) {
      ASSERT_EQ(Get(Key(i)), values[i]);
    }
  }

  // Delete all files.
  {
    // A default-constructed RangePtr has null bounds, i.e. the whole keyspace.
    RangePtr range;
    ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(), &range, 1));
    ASSERT_EQ("", FilesPerLevel(0));

    for (auto i = 0; i < 1000; i++) {
      ReadOptions ropts;
      std::string result;
      auto s = db_->Get(ropts, Key(i), &result);
      ASSERT_TRUE(s.IsNotFound());
    }
  }
}
1842
TEST_F(DBCompactionTest, DeleteFileRangeFileEndpointsOverlapBug) {
  // regression test for #2833: groups of files whose user-keys overlap at the
  // endpoints could be split by `DeleteFilesInRange`. This caused old data to
  // reappear, either because a new version of the key was removed, or a range
  // deletion was partially dropped. It could also cause non-overlapping
  // invariant to be violated if the files dropped by DeleteFilesInRange were
  // a subset of files that a range deletion spans.
  const int kNumL0Files = 2;
  const int kValSize = 8 << 10;  // 8KB
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.target_file_size_base = 1 << 10;  // 1KB
  DestroyAndReopen(options);

  // The snapshot prevents key 1 from having its old version dropped. The low
  // `target_file_size_base` ensures two keys will be in each output file.
  const Snapshot* snapshot = nullptr;
  Random rnd(301);
  // The value indicates which flush the key belonged to, which is enough
  // for us to determine the keys' relative ages. After L0 flushes finish,
  // files look like:
  //
  // File 0: 0 -> vals[0], 1 -> vals[0]
  // File 1: 1 -> vals[1], 2 -> vals[1]
  //
  // Then L0->L1 compaction happens, which outputs keys as follows:
  //
  // File 0: 0 -> vals[0], 1 -> vals[1]
  // File 1: 1 -> vals[0], 2 -> vals[1]
  //
  // DeleteFilesInRange shouldn't be allowed to drop just file 0, as that
  // would cause `1 -> vals[0]` (an older key) to reappear.
  std::string vals[kNumL0Files];
  for (int i = 0; i < kNumL0Files; ++i) {
    vals[i] = RandomString(&rnd, kValSize);
    // Check write/flush statuses so a failed write surfaces here instead of
    // as a confusing value mismatch at the end of the test.
    ASSERT_OK(Put(Key(i), vals[i]));
    ASSERT_OK(Put(Key(i + 1), vals[i]));
    ASSERT_OK(Flush());
    if (i == 0) {
      snapshot = db_->GetSnapshot();
    }
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Verify `DeleteFilesInRange` can't drop only file 0 which would cause
  // "1 -> vals[0]" to reappear.
  std::string begin_str = Key(0), end_str = Key(1);
  Slice begin = begin_str, end = end_str;
  ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));
  ASSERT_EQ(vals[1], Get(Key(1)));

  db_->ReleaseSnapshot(snapshot);
}
1896
// Exercises trivial-move compactions: non-overlapping files should be moved
// between levels by relinking rather than being rewritten. Sync-point
// callbacks count how many compactions were trivial moves vs. real rewrites.
TEST_P(DBCompactionTestWithParam,TrivialMoveToLastLevelWithFiles)1897 TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) {
1898 int32_t trivial_move = 0;
1899 int32_t non_trivial_move = 0;
1900 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1901 "DBImpl::BackgroundCompaction:TrivialMove",
1902 [&](void* /*arg*/) { trivial_move++; });
1903 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1904 "DBImpl::BackgroundCompaction:NonTrivial",
1905 [&](void* /*arg*/) { non_trivial_move++; });
1906 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1907
// Large write buffer so all 100 keys end up in one memtable/one L0 file.
1908 Options options = CurrentOptions();
1909 options.write_buffer_size = 100000000;
1910 options.max_subcompactions = max_subcompactions_;
1911 DestroyAndReopen(options);
1912
1913 int32_t value_size = 10 * 1024; // 10 KB
1914
1915 Random rnd(301);
1916 std::vector<std::string> values;
1917 // File with keys [ 0 => 99 ]
1918 for (int i = 0; i < 100; i++) {
1919 values.push_back(RandomString(&rnd, value_size));
1920 ASSERT_OK(Put(Key(i), values[i]));
1921 }
1922 ASSERT_OK(Flush());
1923
1924 ASSERT_EQ("1", FilesPerLevel(0));
1925 // Compaction will do L0=>L1 (trivial move) then move L1 files to L3
1926 CompactRangeOptions compact_options;
1927 compact_options.change_level = true;
1928 compact_options.target_level = 3;
1929 compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
1930 ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
1931 ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
// The whole move was a single trivial move; nothing was rewritten.
1932 ASSERT_EQ(trivial_move, 1);
1933 ASSERT_EQ(non_trivial_move, 0);
1934
1935 // File with keys [ 100 => 199 ]
1936 for (int i = 100; i < 200; i++) {
1937 values.push_back(RandomString(&rnd, value_size));
1938 ASSERT_OK(Put(Key(i), values[i]));
1939 }
1940 ASSERT_OK(Flush());
1941
1942 ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
1943 CompactRangeOptions cro;
1944 cro.exclusive_manual_compaction = exclusive_manual_compaction_;
1945 // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
1946 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
1947 ASSERT_EQ("0,0,0,2", FilesPerLevel(0));
1948 ASSERT_EQ(trivial_move, 4);
1949 ASSERT_EQ(non_trivial_move, 0);
1950
// All 200 keys must still be readable after the moves.
1951 for (int i = 0; i < 200; i++) {
1952 ASSERT_EQ(Get(Key(i)), values[i]);
1953 }
1954
1955 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1956 }
1957
TEST_P(DBCompactionTestWithParam, LevelCompactionThirdPath) {
  // Level compaction with three db_paths of increasing capacity: the first
  // path (500KB) keeps only the freshest L0 file, L1 output fills the second
  // path (4MB), and L2 files spill over to the third path.
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_, 500 * 1024);
  options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
  options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.compaction_style = kCompactionStyleLevel;
  options.write_buffer_size = 110 << 10;  // 110KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 4;
  options.max_bytes_for_level_base = 400 * 1024;
  options.max_subcompactions = max_subcompactions_;

  // Clean out any leftover files in the second path from earlier runs.
  std::vector<std::string> filenames;
  env_->GetChildren(options.db_paths[1].path, &filenames);
  for (size_t i = 0; i < filenames.size(); ++i) {
    env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
  }
  env_->DeleteDir(options.db_paths[1].path);
  Reopen(options);

  Random rnd(301);
  int key_idx = 0;

  // First three 110KB files are not going to second path.
  // After that, (100K, 200K)
  for (int num = 0; num < 3; num++) {
    GenerateNewFile(&rnd, &key_idx);
  }

  // Another 110KB triggers a compaction to 400K file to fill up first path
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(3, GetSstFileCount(options.db_paths[1].path));

  // (1, 4): one L0 file in the first path, four L1 files in the second.
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1,4", FilesPerLevel(0));
  ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // Each further file pushes one more L2 file into the third path, giving
  // shapes (1, 4, 1) through (1, 4, 8), while the second path stays pinned
  // at 4 files and the first path at 1.
  for (int num_l2_files = 1; num_l2_files <= 8; num_l2_files++) {
    GenerateNewFile(&rnd, &key_idx);
    ASSERT_EQ("1,4," + std::to_string(num_l2_files), FilesPerLevel(0));
    ASSERT_EQ(num_l2_files, GetSstFileCount(options.db_paths[2].path));
    ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
    ASSERT_EQ(1, GetSstFileCount(dbname_));
  }

  // All written keys must be readable, both before and after a reopen.
  // Values are either 1 or 990 bytes, per GenerateNewFile's fill pattern.
  auto verify_all_keys = [&]() {
    for (int i = 0; i < key_idx; i++) {
      auto v = Get(Key(i));
      ASSERT_NE(v, "NOT_FOUND");
      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
    }
  };
  verify_all_keys();
  Reopen(options);
  verify_all_keys();

  Destroy(options);
}
2074
TEST_P(DBCompactionTestWithParam, LevelCompactionPathUse) {
  // Same three-path setup as LevelCompactionThirdPath, but every batch
  // rewrites the same key range (key_idx is reset each time), so the data
  // keeps collapsing into a single L1 file in the second path and the third
  // path is never needed.
  Options options = CurrentOptions();
  options.db_paths.emplace_back(dbname_, 500 * 1024);
  options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
  options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.compaction_style = kCompactionStyleLevel;
  options.write_buffer_size = 110 << 10;  // 110KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 4;
  options.max_bytes_for_level_base = 400 * 1024;
  options.max_subcompactions = max_subcompactions_;

  // Clean out any leftover files in the second path from earlier runs.
  std::vector<std::string> filenames;
  env_->GetChildren(options.db_paths[1].path, &filenames);
  for (size_t i = 0; i < filenames.size(); ++i) {
    env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
  }
  env_->DeleteDir(options.db_paths[1].path);
  Reopen(options);

  Random rnd(301);
  int key_idx = 0;

  // Always gets compacted into 1 Level1 file,
  // 0/1 Level 0 file
  for (int num = 0; num < 3; num++) {
    key_idx = 0;
    GenerateNewFile(&rnd, &key_idx);
  }

  key_idx = 0;
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

  key_idx = 0;
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1,1", FilesPerLevel(0));
  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  ASSERT_EQ(1, GetSstFileCount(dbname_));

  // From here the LSM shape alternates each batch: an L0->L1 compaction
  // empties L0 ("0,1", no file in the DB path), then the next flush leaves
  // one L0 file ("1,1", one file in the DB path). Throughout, exactly one
  // L1 file sits in the second path and the third path stays empty.
  for (int round = 0; round < 8; round++) {
    key_idx = 0;
    GenerateNewFile(&rnd, &key_idx);
    if (round % 2 == 0) {
      ASSERT_EQ("0,1", FilesPerLevel(0));
      ASSERT_EQ(0, GetSstFileCount(dbname_));
    } else {
      ASSERT_EQ("1,1", FilesPerLevel(0));
      ASSERT_EQ(1, GetSstFileCount(dbname_));
    }
    ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
    ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
  }

  // All keys must be readable, both before and after a reopen. Values are
  // either 1 or 990 bytes, per GenerateNewFile's fill pattern.
  auto verify_all_keys = [&]() {
    for (int i = 0; i < key_idx; i++) {
      auto v = Get(Key(i));
      ASSERT_NE(v, "NOT_FOUND");
      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
    }
  };
  verify_all_keys();
  Reopen(options);
  verify_all_keys();

  Destroy(options);
}
2192
// Verifies that per-column-family cf_paths override the DB-wide db_paths:
// the default CF places its files across db_paths while CFs "one" and "two"
// each use their own three-path list, all following the same level-placement
// pattern as LevelCompactionThirdPath.
TEST_P(DBCompactionTestWithParam,LevelCompactionCFPathUse)2193 TEST_P(DBCompactionTestWithParam, LevelCompactionCFPathUse) {
2194 Options options = CurrentOptions();
2195 options.db_paths.emplace_back(dbname_, 500 * 1024);
2196 options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
2197 options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
2198 options.memtable_factory.reset(
2199 new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
2200 options.compaction_style = kCompactionStyleLevel;
2201 options.write_buffer_size = 110 << 10; // 110KB
2202 options.arena_block_size = 4 << 10;
2203 options.level0_file_num_compaction_trigger = 2;
2204 options.num_levels = 4;
2205 options.max_bytes_for_level_base = 400 * 1024;
2206 options.max_subcompactions = max_subcompactions_;
2207
2208 std::vector<Options> option_vector;
2209 option_vector.emplace_back(options);
2210 ColumnFamilyOptions cf_opt1(options), cf_opt2(options);
2211 // Configure CF1 specific paths.
2212 cf_opt1.cf_paths.emplace_back(dbname_ + "cf1", 500 * 1024);
2213 cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 4 * 1024 * 1024);
2214 cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 1024 * 1024 * 1024);
2215 option_vector.emplace_back(DBOptions(options), cf_opt1);
2216 CreateColumnFamilies({"one"},option_vector[1]);
2217
2218 // Configure CF2 specific paths.
2219 cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 500 * 1024);
2220 cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 4 * 1024 * 1024);
2221 cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 1024 * 1024 * 1024);
2222 option_vector.emplace_back(DBOptions(options), cf_opt2);
2223 CreateColumnFamilies({"two"},option_vector[2]);
2224
2225 ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
2226
2227 Random rnd(301);
2228 int key_idx = 0;
2229 int key_idx1 = 0;
2230 int key_idx2 = 0;
2231
// Writes one batch of the same size to each of the three column families.
2232 auto generate_file = [&]() {
2233 GenerateNewFile(0, &rnd, &key_idx);
2234 GenerateNewFile(1, &rnd, &key_idx1);
2235 GenerateNewFile(2, &rnd, &key_idx2);
2236 };
2237
// Asserts every CF has `expected` SST files at the same index of its own
// path list (db_paths for the default CF, cf_paths for "one" and "two").
2238 auto check_sstfilecount = [&](int path_id, int expected) {
2239 ASSERT_EQ(expected, GetSstFileCount(options.db_paths[path_id].path));
2240 ASSERT_EQ(expected, GetSstFileCount(cf_opt1.cf_paths[path_id].path));
2241 ASSERT_EQ(expected, GetSstFileCount(cf_opt2.cf_paths[path_id].path));
2242 };
2243
// Asserts all three CFs share the same per-level file-count shape.
2244 auto check_filesperlevel = [&](const std::string& expected) {
2245 ASSERT_EQ(expected, FilesPerLevel(0));
2246 ASSERT_EQ(expected, FilesPerLevel(1));
2247 ASSERT_EQ(expected, FilesPerLevel(2));
2248 };
2249
// Every key written to each CF must be readable; values are either 1 or
// 990 bytes, per GenerateNewFile's fill pattern.
2250 auto check_getvalues = [&]() {
2251 for (int i = 0; i < key_idx; i++) {
2252 auto v = Get(0, Key(i));
2253 ASSERT_NE(v, "NOT_FOUND");
2254 ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2255 }
2256
2257 for (int i = 0; i < key_idx1; i++) {
2258 auto v = Get(1, Key(i));
2259 ASSERT_NE(v, "NOT_FOUND");
2260 ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2261 }
2262
2263 for (int i = 0; i < key_idx2; i++) {
2264 auto v = Get(2, Key(i));
2265 ASSERT_NE(v, "NOT_FOUND");
2266 ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2267 }
2268 };
2269
2270 // Check that default column family uses db_paths.
2271 // And Column family "one" uses cf_paths.
2272
2273 // First three 110KB files are not going to second path.
2274 // After that, (100K, 200K)
2275 for (int num = 0; num < 3; num++) {
2276 generate_file();
2277 }
2278
2279 // Another 110KB triggers a compaction to 400K file to fill up first path
2280 generate_file();
2281 check_sstfilecount(1, 3);
2282
2283 // (1, 4)
2284 generate_file();
2285 check_filesperlevel("1,4");
2286 check_sstfilecount(1, 4);
2287 check_sstfilecount(0, 1);
2288
2289 // (1, 4, 1)
2290 generate_file();
2291 check_filesperlevel("1,4,1");
2292 check_sstfilecount(2, 1);
2293 check_sstfilecount(1, 4);
2294 check_sstfilecount(0, 1);
2295
2296 // (1, 4, 2)
2297 generate_file();
2298 check_filesperlevel("1,4,2");
2299 check_sstfilecount(2, 2);
2300 check_sstfilecount(1, 4);
2301 check_sstfilecount(0, 1);
2302
2303 check_getvalues();
2304
// All data must survive a reopen with the same per-CF options.
2305 ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
2306
2307 check_getvalues();
2308
2309 Destroy(options, true);
2310 }
2311
// End-to-end check of migrating a DB between compaction styles:
// (1) build a multi-level DB with level compaction, (2) confirm reopening
// with universal compaction and a single level is rejected while files sit
// above L0, (3) compact everything into one file moved to L0, (4) reopen
// with universal compaction and verify old and new writes are all readable.
TEST_P(DBCompactionTestWithParam,ConvertCompactionStyle)2312 TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) {
2313 Random rnd(301);
2314 int max_key_level_insert = 200;
2315 int max_key_universal_insert = 600;
2316
2317 // Stage 1: generate a db with level compaction
2318 Options options = CurrentOptions();
2319 options.write_buffer_size = 110 << 10; // 110KB
2320 options.arena_block_size = 4 << 10;
2321 options.num_levels = 4;
2322 options.level0_file_num_compaction_trigger = 3;
2323 options.max_bytes_for_level_base = 500 << 10; // 500KB
2324 options.max_bytes_for_level_multiplier = 1;
2325 options.target_file_size_base = 200 << 10; // 200KB
2326 options.target_file_size_multiplier = 1;
2327 options.max_subcompactions = max_subcompactions_;
2328 CreateAndReopenWithCF({"pikachu"}, options);
2329
2330 for (int i = 0; i <= max_key_level_insert; i++) {
2331 // each value is 10K
2332 ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
2333 }
2334 ASSERT_OK(Flush(1));
2335 dbfull()->TEST_WaitForCompact();
2336
// Data must have spread beyond L0 for the conversion rejection below to be
// meaningful.
2337 ASSERT_GT(TotalTableFiles(1, 4), 1);
2338 int non_level0_num_files = 0;
2339 for (int i = 1; i < options.num_levels; i++) {
2340 non_level0_num_files += NumTableFilesAtLevel(i, 1);
2341 }
2342 ASSERT_GT(non_level0_num_files, 0);
2343
2344 // Stage 2: reopen with universal compaction - should fail
2345 options = CurrentOptions();
2346 options.compaction_style = kCompactionStyleUniversal;
2347 options.num_levels = 1;
2348 options = CurrentOptions(options);
2349 Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
2350 ASSERT_TRUE(s.IsInvalidArgument());
2351
2352 // Stage 3: compact into a single file and move the file to level 0
2353 options = CurrentOptions();
2354 options.disable_auto_compactions = true;
2355 options.target_file_size_base = INT_MAX;
2356 options.target_file_size_multiplier = 1;
2357 options.max_bytes_for_level_base = INT_MAX;
2358 options.max_bytes_for_level_multiplier = 1;
2359 options.num_levels = 4;
2360 options = CurrentOptions(options);
2361 ReopenWithColumnFamilies({"default", "pikachu"}, options);
2362
2363 CompactRangeOptions compact_options;
2364 compact_options.change_level = true;
2365 compact_options.target_level = 0;
2366 // cannot use kForceOptimized here because the compaction here is expected
2367 // to generate one output file
2368 compact_options.bottommost_level_compaction =
2369 BottommostLevelCompaction::kForce;
2370 compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
// NOTE(review): the returned Status is ignored here — consider ASSERT_OK.
2371 dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);
2372
2373 // Only 1 file in L0
2374 ASSERT_EQ("1", FilesPerLevel(1));
2375
2376 // Stage 4: re-open in universal compaction style and do some db operations
2377 options = CurrentOptions();
2378 options.compaction_style = kCompactionStyleUniversal;
2379 options.num_levels = 4;
2380 options.write_buffer_size = 110 << 10; // 110KB
2381 options.arena_block_size = 4 << 10;
2382 options.level0_file_num_compaction_trigger = 3;
2383 options = CurrentOptions(options);
2384 ReopenWithColumnFamilies({"default", "pikachu"}, options);
2385
// Shrinking to a single level must now succeed since all data is in L0.
2386 options.num_levels = 1;
2387 ReopenWithColumnFamilies({"default", "pikachu"}, options);
2388
2389 for (int i = max_key_level_insert / 2; i <= max_key_universal_insert; i++) {
2390 ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
2391 }
2392 dbfull()->Flush(FlushOptions());
2393 ASSERT_OK(Flush(1));
2394 dbfull()->TEST_WaitForCompact();
2395
// No files should exist above L0 (vacuously true when num_levels == 1).
2396 for (int i = 1; i < options.num_levels; i++) {
2397 ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0);
2398 }
2399
2400 // verify keys inserted in both level compaction style and universal
2401 // compaction style
2402 std::string keys_in_db;
2403 Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]);
2404 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
2405 keys_in_db.append(iter->key().ToString());
2406 keys_in_db.push_back(',');
2407 }
2408 delete iter;
2409
2410 std::string expected_keys;
2411 for (int i = 0; i <= max_key_universal_insert; i++) {
2412 expected_keys.append(Key(i));
2413 expected_keys.push_back(',');
2414 }
2415
2416 ASSERT_EQ(keys_in_db, expected_keys);
2417 }
2418
// Regression test ported from LevelDB issue #44 (part a): an interleaving of
// puts, deletes, and reopens must not resurrect deleted keys once compaction
// runs. The exact order of operations is the point of the test.
TEST_F(DBCompactionTest,L0_CompactionBug_Issue44_a)2419 TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_a) {
2420 do {
2421 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
2422 ASSERT_OK(Put(1, "b", "v"));
2423 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2424 ASSERT_OK(Delete(1, "b"));
2425 ASSERT_OK(Delete(1, "a"));
2426 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2427 ASSERT_OK(Delete(1, "a"));
2428 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2429 ASSERT_OK(Put(1, "a", "v"));
2430 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2431 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
// Only "a" should remain, both before and after compaction settles.
2432 ASSERT_EQ("(a->v)", Contents(1));
2433 env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
2434 ASSERT_EQ("(a->v)", Contents(1));
2435 } while (ChangeCompactOptions());
2436 }
2437
TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_b) {
  // Regression test ported from LevelDB issue #44 (part b): a longer
  // interleaving of empty-key puts, deletes, and reopens. After compaction,
  // only the empty key and "c" must remain. Operation statuses are checked
  // with ASSERT_OK, matching sibling test L0_CompactionBug_Issue44_a, so a
  // failed write can't silently skew the final Contents() check.
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Delete(1, "e"));
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "c", "cv"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "", ""));
    env_->SleepForMicroseconds(1000000);  // Wait for compaction to finish
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "d", "dv"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "", ""));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_OK(Delete(1, "d"));
    ASSERT_OK(Delete(1, "b"));
    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    // Only the empty key and "c" survive, before and after compaction.
    ASSERT_EQ("(->)(c->cv)", Contents(1));
    env_->SleepForMicroseconds(1000000);  // Wait for compaction to finish
    ASSERT_EQ("(->)(c->cv)", Contents(1));
  } while (ChangeCompactOptions());
}
2465
// Races a manual compaction against a scheduled automatic compaction using
// sync-point dependencies: the auto compaction is held until the manual one
// registers, forcing it to be cancelled and later rescheduled, and both
// column families must still end up fully compacted.
TEST_F(DBCompactionTest,ManualAutoRace)2466 TEST_F(DBCompactionTest, ManualAutoRace) {
2467 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
2468 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2469 {{"DBImpl::BGWorkCompaction", "DBCompactionTest::ManualAutoRace:1"},
2470 {"DBImpl::RunManualCompaction:WaitScheduled",
2471 "BackgroundCallCompaction:0"}});
2472
2473 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2474
// Some data in CF 1 for the manual compaction to work on.
2475 Put(1, "foo", "");
2476 Put(1, "bar", "");
2477 Flush(1);
2478 Put(1, "foo", "");
2479 Put(1, "bar", "");
2480 // Generate four files in CF 0, which should trigger an auto compaction
2481 Put("foo", "");
2482 Put("bar", "");
2483 Flush();
2484 Put("foo", "");
2485 Put("bar", "");
2486 Flush();
2487 Put("foo", "");
2488 Put("bar", "");
2489 Flush();
2490 Put("foo", "");
2491 Put("bar", "");
2492 Flush();
2493
2494 // The auto compaction is scheduled but waited until here
2495 TEST_SYNC_POINT("DBCompactionTest::ManualAutoRace:1");
2496 // The auto compaction will wait until the manual compaction is registered
2497 // before processing so that it will be cancelled.
2498 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
2499 ASSERT_EQ("0,1", FilesPerLevel(1));
2500
2501 // Eventually the cancelled compaction will be rescheduled and executed.
2502 dbfull()->TEST_WaitForCompact();
2503 ASSERT_EQ("0,1", FilesPerLevel(0));
2504 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
2505 }
2506
// Manual compaction over explicit key ranges, run twice: once with the
// default level count and once after rebuilding the DB with 3 levels.
// Also verifies that a manual compaction does not populate the block cache.
TEST_P(DBCompactionTestWithParam,ManualCompaction)2507 TEST_P(DBCompactionTestWithParam, ManualCompaction) {
2508 Options options = CurrentOptions();
2509 options.max_subcompactions = max_subcompactions_;
2510 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
2511 CreateAndReopenWithCF({"pikachu"}, options);
2512
2513 // iter - 0 with 7 levels
2514 // iter - 1 with 3 levels
2515 for (int iter = 0; iter < 2; ++iter) {
2516 MakeTables(3, "p", "q", 1);
2517 ASSERT_EQ("1,1,1", FilesPerLevel(1));
2518
// Ranges that don't overlap any file must leave the LSM shape untouched.
2519 // Compaction range falls before files
2520 Compact(1, "", "c");
2521 ASSERT_EQ("1,1,1", FilesPerLevel(1));
2522
2523 // Compaction range falls after files
2524 Compact(1, "r", "z");
2525 ASSERT_EQ("1,1,1", FilesPerLevel(1));
2526
2527 // Compaction range overlaps files
2528 Compact(1, "p", "q");
2529 ASSERT_EQ("0,0,1", FilesPerLevel(1));
2530
2531 // Populate a different range
2532 MakeTables(3, "c", "e", 1);
2533 ASSERT_EQ("1,1,2", FilesPerLevel(1));
2534
2535 // Compact just the new range
2536 Compact(1, "b", "f");
2537 ASSERT_EQ("0,0,2", FilesPerLevel(1));
2538
2539 // Compact all
2540 MakeTables(1, "a", "z", 1);
2541 ASSERT_EQ("1,0,2", FilesPerLevel(1));
2542
// Snapshot the block-cache-add counter before the full compaction.
2543 uint64_t prev_block_cache_add =
2544 options.statistics->getTickerCount(BLOCK_CACHE_ADD);
2545 CompactRangeOptions cro;
2546 cro.exclusive_manual_compaction = exclusive_manual_compaction_;
2547 db_->CompactRange(cro, handles_[1], nullptr, nullptr);
2548 // Verify manual compaction doesn't fill block cache
2549 ASSERT_EQ(prev_block_cache_add,
2550 options.statistics->getTickerCount(BLOCK_CACHE_ADD));
2551
2552 ASSERT_EQ("0,0,1", FilesPerLevel(1));
2553
// Second pass: rebuild the DB with only 3 levels and repeat.
2554 if (iter == 0) {
2555 options = CurrentOptions();
2556 options.num_levels = 3;
2557 options.create_if_missing = true;
2558 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
2559 DestroyAndReopen(options);
2560 CreateAndReopenWithCF({"pikachu"}, options);
2561 }
2562 }
2563 }
2564
2565
// Manual compactions that direct output to a specific db_path via the
// Compact() path-id argument or CompactRangeOptions::target_path_id;
// verifies SST files land in (and vacate) the expected paths. Runs with
// the default level count first, then with 3 levels.
TEST_P(DBCompactionTestWithParam,ManualLevelCompactionOutputPathId)2566 TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) {
2567 Options options = CurrentOptions();
2568 options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
2569 options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
2570 options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
2571 options.max_subcompactions = max_subcompactions_;
2572 CreateAndReopenWithCF({"pikachu"}, options);
2573
2574 // iter - 0 with 7 levels
2575 // iter - 1 with 3 levels
2576 for (int iter = 0; iter < 2; ++iter) {
// Three flushes of the same range give three L0 files in path 0.
2577 for (int i = 0; i < 3; ++i) {
2578 ASSERT_OK(Put(1, "p", "begin"));
2579 ASSERT_OK(Put(1, "q", "end"));
2580 ASSERT_OK(Flush(1));
2581 }
2582 ASSERT_EQ("3", FilesPerLevel(1));
2583 ASSERT_EQ(3, GetSstFileCount(options.db_paths[0].path));
2584 ASSERT_EQ(0, GetSstFileCount(dbname_));
2585
2586 // Compaction range falls before files
2587 Compact(1, "", "c");
2588 ASSERT_EQ("3", FilesPerLevel(1));
2589
2590 // Compaction range falls after files
2591 Compact(1, "r", "z");
2592 ASSERT_EQ("3", FilesPerLevel(1));
2593
// Overlapping compaction with output directed to path id 1.
2594 // Compaction range overlaps files
2595 Compact(1, "p", "q", 1);
2596 ASSERT_OK(dbfull()->TEST_WaitForCompact());
2597 ASSERT_EQ("0,1", FilesPerLevel(1));
2598 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2599 ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
2600 ASSERT_EQ(0, GetSstFileCount(dbname_));
2601
2602 // Populate a different range
2603 for (int i = 0; i < 3; ++i) {
2604 ASSERT_OK(Put(1, "c", "begin"));
2605 ASSERT_OK(Put(1, "e", "end"));
2606 ASSERT_OK(Flush(1));
2607 }
2608 ASSERT_EQ("3,1", FilesPerLevel(1));
2609
2610 // Compact just the new range
2611 Compact(1, "b", "f", 1);
2612 ASSERT_OK(dbfull()->TEST_WaitForCompact());
2613 ASSERT_EQ("0,2", FilesPerLevel(1));
2614 ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
2615 ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
2616 ASSERT_EQ(0, GetSstFileCount(dbname_));
2617
2618 // Compact all
2619 ASSERT_OK(Put(1, "a", "begin"))
2620 ASSERT_OK(Put(1, "z", "end"));
2621 ASSERT_OK(Flush(1));
2622 ASSERT_EQ("1,2", FilesPerLevel(1));
2623 ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
2624 ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
2625 CompactRangeOptions compact_options;
2626 compact_options.target_path_id = 1;
2627 compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
// NOTE(review): CompactRange's Status is ignored here — consider ASSERT_OK.
2628 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
2629 ASSERT_OK(dbfull()->TEST_WaitForCompact());
2630
2631 ASSERT_EQ("0,1", FilesPerLevel(1));
2632 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2633 ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
2634 ASSERT_EQ(0, GetSstFileCount(dbname_));
2635
// Second pass: rebuild with 3 levels and the same path configuration.
2636 if (iter == 0) {
2637 DestroyAndReopen(options);
2638 options = CurrentOptions();
2639 options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
2640 options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
2641 options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
2642 options.max_background_flushes = 1;
2643 options.num_levels = 3;
2644 options.create_if_missing = true;
2645 CreateAndReopenWithCF({"pikachu"}, options);
2646 }
2647 }
2648 }
2649
// Repeated full-range manual compactions must not leave obsolete files
// behind: the number of live files should stay constant no matter how many
// times the same data is rewritten.
TEST_F(DBCompactionTest, FilesDeletedAfterCompaction) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v2"));
    Compact(1, "a", "z");
    // Baseline: live-file count right after the first compaction.
    const size_t baseline_live_files = CountLiveFiles();
    for (int round = 0; round < 10; round++) {
      ASSERT_OK(Put(1, "foo", "v2"));
      Compact(1, "a", "z");
    }
    // Each round rewrites the same key, so no files may accumulate.
    ASSERT_EQ(CountLiveFiles(), baseline_live_files);
  } while (ChangeCompactOptions());
}
2663
2664 // Check level comapction with compact files
TEST_P(DBCompactionTestWithParam,DISABLED_CompactFilesOnLevelCompaction)2665 TEST_P(DBCompactionTestWithParam, DISABLED_CompactFilesOnLevelCompaction) {
2666 const int kTestKeySize = 16;
2667 const int kTestValueSize = 984;
2668 const int kEntrySize = kTestKeySize + kTestValueSize;
2669 const int kEntriesPerBuffer = 100;
2670 Options options;
2671 options.create_if_missing = true;
2672 options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
2673 options.compaction_style = kCompactionStyleLevel;
2674 options.target_file_size_base = options.write_buffer_size;
2675 options.max_bytes_for_level_base = options.target_file_size_base * 2;
2676 options.level0_stop_writes_trigger = 2;
2677 options.max_bytes_for_level_multiplier = 2;
2678 options.compression = kNoCompression;
2679 options.max_subcompactions = max_subcompactions_;
2680 options = CurrentOptions(options);
2681 CreateAndReopenWithCF({"pikachu"}, options);
2682
2683 Random rnd(301);
2684 for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
2685 ASSERT_OK(Put(1, ToString(key), RandomString(&rnd, kTestValueSize)));
2686 }
2687 dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
2688 dbfull()->TEST_WaitForCompact();
2689
2690 ColumnFamilyMetaData cf_meta;
2691 dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
2692 int output_level = static_cast<int>(cf_meta.levels.size()) - 1;
2693 for (int file_picked = 5; file_picked > 0; --file_picked) {
2694 std::set<std::string> overlapping_file_names;
2695 std::vector<std::string> compaction_input_file_names;
2696 for (int f = 0; f < file_picked; ++f) {
2697 int level = 0;
2698 auto file_meta = PickFileRandomly(cf_meta, &rnd, &level);
2699 compaction_input_file_names.push_back(file_meta->name);
2700 GetOverlappingFileNumbersForLevelCompaction(
2701 cf_meta, options.comparator, level, output_level,
2702 file_meta, &overlapping_file_names);
2703 }
2704
2705 ASSERT_OK(dbfull()->CompactFiles(
2706 CompactionOptions(), handles_[1],
2707 compaction_input_file_names,
2708 output_level));
2709
2710 // Make sure all overlapping files do not exist after compaction
2711 dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
2712 VerifyCompactionResult(cf_meta, overlapping_file_names);
2713 }
2714
2715 // make sure all key-values are still there.
2716 for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
2717 ASSERT_NE(Get(1, ToString(key)), "NOT_FOUND");
2718 }
2719 }
2720
// Simulates a file-creation failure in the middle of an L0->L1 compaction
// and verifies the DB stays intact: the compaction fails, L1 stays empty,
// all L0 files survive, every key remains readable, and the DB reopens
// cleanly afterwards.
TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) {
  Options options;
  const int kKeySize = 16;
  const int kKvSize = 1000;
  const int kKeysPerBuffer = 100;
  const int kNumL1Files = 5;
  options.create_if_missing = true;
  options.write_buffer_size = kKeysPerBuffer * kKvSize;
  options.max_write_buffer_number = 2;
  options.target_file_size_base =
      options.write_buffer_size *
      (options.max_write_buffer_number - 1);
  options.level0_file_num_compaction_trigger = kNumL1Files;
  options.max_bytes_for_level_base =
      options.level0_file_num_compaction_trigger *
      options.target_file_size_base;
  options.max_bytes_for_level_multiplier = 2;
  options.compression = kNoCompression;
  options.max_subcompactions = max_subcompactions_;

  env_->SetBackgroundThreads(1, Env::HIGH);
  env_->SetBackgroundThreads(1, Env::LOW);
  // stop the compaction thread until we simulate the file creation failure.
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);

  options.env = env_;

  DestroyAndReopen(options);

  const int kNumInsertedKeys =
      options.level0_file_num_compaction_trigger *
      (options.max_write_buffer_number - 1) *
      kKeysPerBuffer;

  Random rnd(301);
  std::vector<std::string> keys;
  std::vector<std::string> values;
  for (int k = 0; k < kNumInsertedKeys; ++k) {
    keys.emplace_back(RandomString(&rnd, kKeySize));
    values.emplace_back(RandomString(&rnd, kKvSize - kKeySize));
    ASSERT_OK(Put(Slice(keys[k]), Slice(values[k])));
    // Fix: check the flush-wait Status instead of silently dropping it.
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }

  ASSERT_OK(dbfull()->TEST_FlushMemTable(true));
  // Make sure the number of L0 files can trigger compaction.
  ASSERT_GE(NumTableFilesAtLevel(0),
            options.level0_file_num_compaction_trigger);

  auto previous_num_level0_files = NumTableFilesAtLevel(0);

  // Fail the first file creation.
  env_->non_writable_count_ = 1;
  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();

  // Expect compaction to fail here as one file will fail its
  // creation. (ASSERT_NOK for consistency with the rest of the file.)
  ASSERT_NOK(dbfull()->TEST_WaitForCompact());

  // Verify L0 -> L1 compaction does fail.
  ASSERT_EQ(NumTableFilesAtLevel(1), 0);

  // Verify all L0 files are still there.
  ASSERT_EQ(NumTableFilesAtLevel(0), previous_num_level0_files);

  // All key-values must exist after compaction fails.
  for (int k = 0; k < kNumInsertedKeys; ++k) {
    ASSERT_EQ(values[k], Get(keys[k]));
  }

  env_->non_writable_count_ = 0;

  // Make sure RocksDB will not get into corrupted state.
  Reopen(options);

  // Verify again after reopen.
  for (int k = 0; k < kNumInsertedKeys; ++k) {
    ASSERT_EQ(values[k], Get(keys[k]));
  }
}
2804
// Triggers a trivial-move compaction and verifies the moved file is not
// deleted while an open iterator still pins it; once the iterator is
// released the file must be removed (checked via OnFileDeletionListener).
TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) {
  // iter 1 -- delete_obsolete_files_period_micros == 0
  for (int iter = 0; iter < 2; ++iter) {
    // This test triggers move compaction and verifies that the file is not
    // deleted when it's part of move compaction
    Options options = CurrentOptions();
    options.env = env_;
    if (iter == 1) {
      options.delete_obsolete_files_period_micros = 0;
    }
    options.create_if_missing = true;
    options.level0_file_num_compaction_trigger =
        2;  // trigger compaction when we have 2 files
    OnFileDeletionListener* listener = new OnFileDeletionListener();
    options.listeners.emplace_back(listener);
    options.max_subcompactions = max_subcompactions_;
    DestroyAndReopen(options);

    Random rnd(301);
    // Create two 1MB sst files
    for (int i = 0; i < 2; ++i) {
      // Create 1MB sst file
      for (int j = 0; j < 100; ++j) {
        ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
      }
      ASSERT_OK(Flush());
    }
    // this should execute L0->L1
    // Fix: the Status of the three compaction waits below was previously
    // ignored; a failed compaction would have surfaced only indirectly.
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    ASSERT_EQ("0,1", FilesPerLevel(0));

    // block compactions
    test::SleepingBackgroundTask sleeping_task;
    env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
                   Env::Priority::LOW);

    options.max_bytes_for_level_base = 1024 * 1024;  // 1 MB
    Reopen(options);
    std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
    ASSERT_EQ("0,1", FilesPerLevel(0));
    // let compactions go
    sleeping_task.WakeUp();
    sleeping_task.WaitUntilDone();

    // this should execute L1->L2 (move)
    ASSERT_OK(dbfull()->TEST_WaitForCompact());

    ASSERT_EQ("0,0,1", FilesPerLevel(0));

    std::vector<LiveFileMetaData> metadata;
    db_->GetLiveFilesMetaData(&metadata);
    ASSERT_EQ(metadata.size(), 1U);
    auto moved_file_name = metadata[0].name;

    // Create two more 1MB sst files
    for (int i = 0; i < 2; ++i) {
      // Create 1MB sst file
      for (int j = 0; j < 100; ++j) {
        ASSERT_OK(Put(Key(i * 50 + j + 100), RandomString(&rnd, 10 * 1024)));
      }
      ASSERT_OK(Flush());
    }
    // this should execute both L0->L1 and L1->L2 (merge with previous file)
    ASSERT_OK(dbfull()->TEST_WaitForCompact());

    ASSERT_EQ("0,0,2", FilesPerLevel(0));

    // iterator is holding the file
    ASSERT_OK(env_->FileExists(dbname_ + moved_file_name));

    listener->SetExpectedFileName(dbname_ + moved_file_name);
    iterator.reset();

    // this file should have been compacted away
    ASSERT_NOK(env_->FileExists(dbname_ + moved_file_name));
    listener->VerifyMatchedCount(1);
  }
}
2883
// Levels 0-1 are uncompressed while level 2 uses Zlib, so moves between the
// uncompressed levels stay trivial but any compaction that writes into the
// compressed level must be a full (non-trivial) rewrite. Verifies the
// trivial/non-trivial counters and that all data survives a reopen.
TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) {
  if (!Zlib_Supported()) {
    return;
  }
  Options options = CurrentOptions();
  options.memtable_factory.reset(
      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.compaction_style = kCompactionStyleLevel;
  options.write_buffer_size = 110 << 10;  // 110KB
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 4;
  options.max_bytes_for_level_base = 400 * 1024;
  options.max_subcompactions = max_subcompactions_;
  // First two levels have no compression, so that a trivial move between
  // them will be allowed. Level 2 has Zlib compression so that a trivial
  // move to level 3 will not be allowed
  options.compression_per_level = {kNoCompression, kNoCompression,
                                   kZlibCompression};

  // Counters bumped by the sync-point callbacks below.
  int compression_matches = 0;
  int compression_mismatches = 0;
  int trivial_moves = 0;
  int non_trivial_compactions = 0;

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "Compaction::InputCompressionMatchesOutput:Matches",
      [&](void* /*arg*/) { compression_matches++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "Compaction::InputCompressionMatchesOutput:DidntMatch",
      [&](void* /*arg*/) { compression_mismatches++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_compactions++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_moves++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Reopen(options);

  Random rnd(301);
  int key_idx = 0;

  // First three 110KB files are going to level 0
  // After that, (100K, 200K)
  for (int num = 0; num < 3; num++) {
    GenerateNewFile(&rnd, &key_idx);
  }

  // Another 110KB triggers a compaction to 400K file to fill up level 0
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ(4, GetSstFileCount(dbname_));

  // (1, 4)
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1,4", FilesPerLevel(0));

  // Each additional file lands one more file in level 2:
  // (1, 4, 1) through (1, 4, 8).
  for (int l2_files = 1; l2_files <= 8; l2_files++) {
    GenerateNewFile(&rnd, &key_idx);
    ASSERT_EQ("1,4," + ToString(l2_files), FilesPerLevel(0));
  }

  ASSERT_EQ(compression_matches, 12);
  // Currently, the test relies on the number of calls to
  // InputCompressionMatchesOutput() per compaction.
  const int kCallsToInputCompressionMatch = 2;
  ASSERT_EQ(compression_mismatches, 8 * kCallsToInputCompressionMatch);
  ASSERT_EQ(trivial_moves, 12);
  ASSERT_EQ(non_trivial_compactions, 8);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  // Every key must still be readable, both before and after a reopen.
  auto verify_all_values = [&]() {
    for (int i = 0; i < key_idx; i++) {
      auto v = Get(Key(i));
      ASSERT_NE(v, "NOT_FOUND");
      ASSERT_TRUE(v.size() == 1 || v.size() == 990);
    }
  };
  verify_all_values();

  Reopen(options);
  verify_all_values();

  Destroy(options);
}
2996
// Pending-compaction byte limits are sanitized when the DB is opened:
//  - a zero soft limit is raised to the hard limit (0 -> 100 below);
//  - a soft limit above the hard limit is clamped down to it (200 -> 150).
TEST_F(DBCompactionTest, SanitizeCompactionOptionsTest) {
  Options options = CurrentOptions();
  options.max_background_compactions = 5;
  options.soft_pending_compaction_bytes_limit = 0;
  options.hard_pending_compaction_bytes_limit = 100;
  options.create_if_missing = true;
  DestroyAndReopen(options);
  // Soft limit of 0 was replaced by the hard limit.
  ASSERT_EQ(100, db_->GetOptions().soft_pending_compaction_bytes_limit);

  options.max_background_compactions = 3;
  options.soft_pending_compaction_bytes_limit = 200;
  options.hard_pending_compaction_bytes_limit = 150;
  DestroyAndReopen(options);
  // Soft limit larger than the hard limit was clamped to the hard limit.
  ASSERT_EQ(150, db_->GetOptions().soft_pending_compaction_bytes_limit);
}
3012
// This tests for a bug that could cause two level0 compactions to run
// concurrently
3015 // TODO(aekmekji): Make sure that the reason this fails when run with
3016 // max_subcompactions > 1 is not a correctness issue but just inherent to
3017 // running parallel L0-L1 compactions
// Uses SuggestCompactRange() while an L0 compaction is held open by the
// sync-point dependency, then verifies the DB drains cleanly. Fix: the
// statuses of CompactRange(), the flush wait, and the final compaction wait
// were previously dropped on the floor.
TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) {
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleLevel;
  options.write_buffer_size = 110 << 10;
  options.arena_block_size = 4 << 10;
  options.level0_file_num_compaction_trigger = 4;
  options.num_levels = 4;
  options.compression = kNoCompression;
  options.max_bytes_for_level_base = 450 << 10;
  options.target_file_size_base = 98 << 10;
  options.max_write_buffer_number = 2;
  options.max_background_compactions = 2;

  DestroyAndReopen(options);

  // fill up the DB
  Random rnd(301);
  for (int num = 0; num < 10; num++) {
    GenerateNewRandomFile(&rnd);
  }
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // Hold CompactionJob::Run() open between sync points :1 and :2 so the
  // first L0 compaction is still running while more files pile up.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"CompactionJob::Run():Start",
        "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:1"},
       {"DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2",
        "CompactionJob::Run():End"}});

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // trigger L0 compaction
  for (int num = 0; num < options.level0_file_num_compaction_trigger + 1;
       num++) {
    GenerateNewRandomFile(&rnd, /* nowait */ true);
    ASSERT_OK(Flush());
  }

  TEST_SYNC_POINT(
      "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:1");

  GenerateNewRandomFile(&rnd, /* nowait */ true);
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_OK(experimental::SuggestCompactRange(db_, nullptr, nullptr));
  for (int num = 0; num < options.level0_file_num_compaction_trigger + 1;
       num++) {
    GenerateNewRandomFile(&rnd, /* nowait */ true);
    ASSERT_OK(Flush());
  }

  TEST_SYNC_POINT(
      "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2");
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
3071
// Formats `i` (must be < 10000) as a fixed-width key: "key0000".."key9999".
static std::string ShortKey(int i) {
  assert(i < 10000);
  char key_buf[100];
  snprintf(key_buf, sizeof(key_buf), "key%04d", i);
  return key_buf;
}
3078
// Exercises CompactRangeOptions::bottommost_level_compaction:
//  - a change_level compaction reaches L3 via trivial moves only;
//  - kForceOptimized additionally rewrites the bottommost level
//    (one non-trivial move);
//  - kSkip leaves the bottommost level untouched (trivial moves only).
TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
  // Counters bumped by the sync-point callbacks below to classify each
  // background compaction as a trivial move or a real rewrite.
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_move++; });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // The key size is guaranteed to be <= 8
  class ShortKeyComparator : public Comparator {
    int Compare(const ROCKSDB_NAMESPACE::Slice& a,
                const ROCKSDB_NAMESPACE::Slice& b) const override {
      assert(a.size() <= 8);
      assert(b.size() <= 8);
      return BytewiseComparator()->Compare(a, b);
    }
    const char* Name() const override { return "ShortKeyComparator"; }
    void FindShortestSeparator(
        std::string* start,
        const ROCKSDB_NAMESPACE::Slice& limit) const override {
      return BytewiseComparator()->FindShortestSeparator(start, limit);
    }
    void FindShortSuccessor(std::string* key) const override {
      return BytewiseComparator()->FindShortSuccessor(key);
    }
  } short_key_cmp;
  Options options = CurrentOptions();
  // Large file/buffer sizes so each Flush() below yields exactly one file.
  options.target_file_size_base = 100000000;
  options.write_buffer_size = 100000000;
  options.max_subcompactions = max_subcompactions_;
  options.comparator = &short_key_cmp;
  DestroyAndReopen(options);

  int32_t value_size = 10 * 1024;  // 10 KB

  Random rnd(301);
  std::vector<std::string> values;
  // File with keys [ 0 => 99 ]
  for (int i = 0; i < 100; i++) {
    values.push_back(RandomString(&rnd, value_size));
    ASSERT_OK(Put(ShortKey(i), values[i]));
  }
  ASSERT_OK(Flush());

  ASSERT_EQ("1", FilesPerLevel(0));
  // Compaction will do L0=>L1 (trivial move) then move L1 files to L3
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 3;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  // File with keys [ 100 => 199 ]
  for (int i = 100; i < 200; i++) {
    values.push_back(RandomString(&rnd, value_size));
    ASSERT_OK(Put(ShortKey(i), values[i]));
  }
  ASSERT_OK(Flush());

  ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
  // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
  // then compact the bottommost level L3=>L3 (non trivial move)
  compact_options = CompactRangeOptions();
  compact_options.bottommost_level_compaction =
      BottommostLevelCompaction::kForceOptimized;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
  ASSERT_EQ(trivial_move, 4);
  ASSERT_EQ(non_trivial_move, 1);

  // File with keys [ 200 => 299 ]
  for (int i = 200; i < 300; i++) {
    values.push_back(RandomString(&rnd, value_size));
    ASSERT_OK(Put(ShortKey(i), values[i]));
  }
  ASSERT_OK(Flush());

  ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
  trivial_move = 0;
  non_trivial_move = 0;
  compact_options = CompactRangeOptions();
  compact_options.bottommost_level_compaction =
      BottommostLevelCompaction::kSkip;
  // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
  // and will skip bottommost level compaction
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  ASSERT_EQ("0,0,0,2", FilesPerLevel(0));
  ASSERT_EQ(trivial_move, 3);
  ASSERT_EQ(non_trivial_move, 0);

  // All 300 keys must still map to their original values.
  for (int i = 0; i < 300; i++) {
    ASSERT_EQ(Get(ShortKey(i)), values[i]);
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
3180
// Verifies intra-L0 (L0->L0) compaction: while the L0->L1 compaction of
// files 0-4 is held open by the sync-point dependency, files 5-9 must be
// compacted within L0 instead of waiting for the base-level compaction.
TEST_P(DBCompactionTestWithParam, IntraL0Compaction) {
  Options options = CurrentOptions();
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 5;
  options.max_background_compactions = 2;
  options.max_subcompactions = max_subcompactions_;
  DestroyAndReopen(options);

  const size_t kValueSize = 1 << 20;
  Random rnd(301);
  std::string value(RandomString(&rnd, kValueSize));

  // Block CompactionJob::Run() until the picker has run, guaranteeing the
  // L0->L1 compaction is still in flight when files 5-9 arrive.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"LevelCompactionPicker::PickCompactionBySize:0",
        "CompactionJob::Run():Start"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // index:   0   1   2   3   4   5   6   7   8   9
  // size:  1MB 1MB 1MB 1MB 1MB 2MB 1MB 1MB 1MB 1MB
  // score:                     1.5 1.3 1.5 2.0 inf
  //
  // Files 0-4 will be included in an L0->L1 compaction.
  //
  // L0->L0 will be triggered since the sync points guarantee compaction to base
  // level is still blocked when files 5-9 trigger another compaction.
  //
  // Files 6-9 are the longest span of available files for which
  // work-per-deleted-file decreases (see "score" row above).
  for (int i = 0; i < 10; ++i) {
    ASSERT_OK(Put(Key(0), ""));  // prevents trivial move
    if (i == 5) {
      ASSERT_OK(Put(Key(i + 1), value + value));
    } else {
      ASSERT_OK(Put(Key(i + 1), value));
    }
    ASSERT_OK(Flush());
  }
  // Fix: check the wait's Status instead of silently dropping it.
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  std::vector<std::vector<FileMetaData>> level_to_files;
  dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                  &level_to_files);
  ASSERT_GE(level_to_files.size(), 2);  // at least L0 and L1
  // L0 has the 2MB file (not compacted) and 4MB file (output of L0->L0)
  ASSERT_EQ(2, level_to_files[0].size());
  ASSERT_GT(level_to_files[1].size(), 0);
  for (int i = 0; i < 2; ++i) {
    ASSERT_GE(level_to_files[0][i].fd.file_size, 1 << 21);
  }
}
3232
TEST_P(DBCompactionTestWithParam, IntraL0CompactionDoesNotObsoleteDeletions) {
  // regression test for issue #2722: L0->L0 compaction can resurrect deleted
  // keys from older L0 files if L1+ files' key-ranges do not include the key.
  Options options = CurrentOptions();
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 5;
  options.max_background_compactions = 2;
  options.max_subcompactions = max_subcompactions_;
  DestroyAndReopen(options);

  const size_t kValueSize = 1 << 20;
  Random rnd(301);
  std::string value(RandomString(&rnd, kValueSize));

  // Block CompactionJob::Run() until the picker has run, guaranteeing the
  // L0->L1 compaction is still in flight when files 5-9 arrive.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"LevelCompactionPicker::PickCompactionBySize:0",
        "CompactionJob::Run():Start"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // index:   0   1   2   3   4    5    6   7   8   9
  // size:  1MB 1MB 1MB 1MB 1MB  1MB  1MB 1MB 1MB 1MB
  // score:                     1.25 1.33 1.5 2.0 inf
  //
  // Files 0-4 will be included in an L0->L1 compaction.
  //
  // L0->L0 will be triggered since the sync points guarantee compaction to base
  // level is still blocked when files 5-9 trigger another compaction. All files
  // 5-9 are included in the L0->L0 due to work-per-deleted file decreasing.
  //
  // Put a key-value in files 0-4. Delete that key in files 5-9. Verify the
  // L0->L0 preserves the deletion such that the key remains deleted.
  for (int i = 0; i < 10; ++i) {
    // key 0 serves both to prevent trivial move and as the key we want to
    // verify is not resurrected by L0->L0 compaction.
    if (i < 5) {
      ASSERT_OK(Put(Key(0), ""));
    } else {
      ASSERT_OK(Delete(Key(0)));
    }
    ASSERT_OK(Put(Key(i + 1), value));
    ASSERT_OK(Flush());
  }
  // Fix: check the wait's Status instead of silently dropping it.
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  std::vector<std::vector<FileMetaData>> level_to_files;
  dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                  &level_to_files);
  ASSERT_GE(level_to_files.size(), 2);  // at least L0 and L1
  // L0 has a single output file from L0->L0
  ASSERT_EQ(1, level_to_files[0].size());
  ASSERT_GT(level_to_files[1].size(), 0);
  ASSERT_GE(level_to_files[0][0].fd.file_size, 1 << 22);

  ReadOptions roptions;
  std::string result;
  // The deletion written by files 5-9 must still hide key 0.
  ASSERT_TRUE(db_->Get(roptions, Key(0), &result).IsNotFound());
}
3291
// With a thread available in the BOTTOM priority pool, the full compaction
// (universal size-amp, or level style with dynamic level bytes) must run
// there: exactly one BGWorkBottomCompaction per round is expected.
TEST_P(DBCompactionTestWithParam, FullCompactionInBottomPriThreadPool) {
  const int kNumFilesTrigger = 3;
  Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
  for (bool use_universal_compaction : {false, true}) {
    Options options = CurrentOptions();
    if (use_universal_compaction) {
      options.compaction_style = kCompactionStyleUniversal;
    } else {
      options.compaction_style = kCompactionStyleLevel;
      options.level_compaction_dynamic_level_bytes = true;
    }
    options.num_levels = 4;
    options.write_buffer_size = 100 << 10;     // 100KB
    options.target_file_size_base = 32 << 10;  // 32KB
    options.level0_file_num_compaction_trigger = kNumFilesTrigger;
    // Trigger compaction if size amplification exceeds 110%
    options.compaction_options_universal.max_size_amplification_percent = 110;
    DestroyAndReopen(options);

    int num_bottom_pri_compactions = 0;
    SyncPoint::GetInstance()->SetCallBack(
        "DBImpl::BGWorkBottomCompaction",
        [&](void* /*arg*/) { ++num_bottom_pri_compactions; });
    SyncPoint::GetInstance()->EnableProcessing();

    Random rnd(301);
    for (int num = 0; num < kNumFilesTrigger; num++) {
      ASSERT_EQ(NumSortedRuns(), num);
      int key_idx = 0;
      GenerateNewFile(&rnd, &key_idx);
    }
    // Fix: check the wait's Status instead of silently dropping it.
    ASSERT_OK(dbfull()->TEST_WaitForCompact());

    ASSERT_EQ(1, num_bottom_pri_compactions);

    // Verify that size amplification did occur
    ASSERT_EQ(NumSortedRuns(), 1);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
  // Restore the BOTTOM pool to its default of zero threads.
  Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
}
3333
TEST_F(DBCompactionTest, OptimizedDeletionObsoleting) {
  // Deletions can be dropped when compacted to non-last level if they fall
  // outside the lower-level files' key-ranges.
  const int kNumL0Files = 4;
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  DestroyAndReopen(options);

  // put key 1 and 3 in separate L1, L2 files.
  // So key 0, 2, and 4+ fall outside these levels' key-ranges.
  for (int level = 2; level >= 1; --level) {
    for (int i = 0; i < 2; ++i) {
      // Fix: Put/Flush/Delete statuses in this test were unchecked.
      ASSERT_OK(Put(Key(2 * i + 1), "val"));
      ASSERT_OK(Flush());
    }
    MoveFilesToLevel(level);
    ASSERT_EQ(2, NumTableFilesAtLevel(level));
  }

  // Delete keys in range [1, 4]. These L0 files will be compacted with L1:
  // - Tombstones for keys 2 and 4 can be dropped early.
  // - Tombstones for keys 1 and 3 must be kept due to L2 files' key-ranges.
  for (int i = 0; i < kNumL0Files; ++i) {
    ASSERT_OK(Put(Key(0), "val"));  // sentinel to prevent trivial move
    ASSERT_OK(Delete(Key(i + 1)));
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // All four deleted keys must stay deleted.
  for (int i = 0; i < kNumL0Files; ++i) {
    std::string value;
    ASSERT_TRUE(db_->Get(ReadOptions(), Key(i + 1), &value).IsNotFound());
  }
  // Two tombstones dropped by the optimized path, two by the regular one.
  ASSERT_EQ(2, options.statistics->getTickerCount(
                   COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE));
  ASSERT_EQ(2,
            options.statistics->getTickerCount(COMPACTION_KEY_DROP_OBSOLETE));
}
3373
TEST_F(DBCompactionTest, CompactFilesPendingL0Bug) {
  // https://www.facebook.com/groups/rocksdb.dev/permalink/1389452781153232/
  // CompactFiles() had a bug where it failed to pick a compaction when an L0
  // compaction existed, but marked it as scheduled anyways. It'd never be
  // unmarked as scheduled, so future compactions or DB close could hang.
  const int kNumL0Files = 5;
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kNumL0Files - 1;
  options.max_background_compactions = 2;
  DestroyAndReopen(options);

  // Ordering enforced by the dependency pairs (predecessor -> successor):
  //  1. the automatic compaction must have been picked before we pass the
  //     ":Picked" marker (i.e. before file 4 is flushed);
  //  2. the background compaction may not finish its run until our manual
  //     CompactFiles() call has returned (":ManualCompacted").
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"LevelCompactionPicker::PickCompaction:Return",
        "DBCompactionTest::CompactFilesPendingL0Bug:Picked"},
       {"DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted",
        "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // NOTE(review): the pressure token appears to put the write controller in
  // compaction-pressure mode so two compactions can be scheduled at once --
  // confirm against WriteController.
  auto schedule_multi_compaction_token =
      dbfull()->TEST_write_controler().GetCompactionPressureToken();

  // Files 0-3 will be included in an L0->L1 compaction.
  //
  // File 4 will be included in a call to CompactFiles() while the first
  // compaction is running.
  for (int i = 0; i < kNumL0Files - 1; ++i) {
    ASSERT_OK(Put(Key(0), "val"));  // sentinel to prevent trivial move
    ASSERT_OK(Put(Key(i + 1), "val"));
    ASSERT_OK(Flush());
  }
  TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:Picked");
  // file 4 flushed after 0-3 picked
  ASSERT_OK(Put(Key(kNumL0Files), "val"));
  ASSERT_OK(Flush());

  // previously DB close would hang forever as this situation caused scheduled
  // compactions count to never decrement to zero.
  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
  ASSERT_EQ(kNumL0Files, cf_meta.levels[0].files.size());
  std::vector<std::string> input_filenames;
  input_filenames.push_back(cf_meta.levels[0].files.front().name);
  // Manually compact a single L0 file back into L0 while the picked L0->L1
  // compaction is still in flight; this must succeed rather than leave a
  // phantom "scheduled" compaction behind.
  ASSERT_OK(dbfull()
                ->CompactFiles(CompactionOptions(), input_filenames,
                               0 /* output_level */));
  TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted");
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
3422
TEST_F(DBCompactionTest, CompactFilesOverlapInL0Bug) {
  // Regression test for bug of not pulling in L0 files that overlap the user-
  // specified input files in time- and key-ranges.
  // Fix: the Put/Flush statuses here were previously unchecked.
  ASSERT_OK(Put(Key(0), "old_val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put(Key(0), "new_val"));
  ASSERT_OK(Flush());

  ColumnFamilyMetaData cf_meta;
  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
  ASSERT_GE(cf_meta.levels.size(), 2);
  ASSERT_EQ(2, cf_meta.levels[0].files.size());

  // Compacting {new L0 file, L1 file} should pull in the old L0 file since it
  // overlaps in key-range and time-range.
  std::vector<std::string> input_filenames;
  input_filenames.push_back(cf_meta.levels[0].files.front().name);
  ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), input_filenames,
                                   1 /* output_level */));
  // Seeing the latest value proves the older overlapping L0 file was pulled
  // into the compaction instead of shadowing the output.
  ASSERT_EQ("new_val", Get(Key(0)));
}
3444
TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
  // bottom-level files may contain deletions due to snapshots protecting the
  // deleted keys. Once the snapshot is released, we should see files with many
  // such deletions undergo single-file compactions.
  const int kNumKeysPerFile = 1024;
  const int kNumLevelFiles = 4;
  const int kValueSize = 128;
  Options options = CurrentOptions();
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = kNumLevelFiles;
  // inflate it a bit to account for key/metadata overhead
  options.target_file_size_base = 120 * kNumKeysPerFile * kValueSize / 100;
  CreateAndReopenWithCF({"one"}, options);

  Random rnd(301);
  const Snapshot* snapshot = nullptr;
  // Write kNumLevelFiles files' worth of data; before flushing the last file,
  // take a snapshot and write deletions that the snapshot pins.
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
    }
    if (i == kNumLevelFiles - 1) {
      snapshot = db_->GetSnapshot();
      // delete every other key after grabbing a snapshot, so these deletions
      // and the keys they cover can't be dropped until after the snapshot is
      // released.
      for (int j = 0; j < kNumLevelFiles * kNumKeysPerFile; j += 2) {
        ASSERT_OK(Delete(Key(j)));
      }
    }
    Flush();
    if (i < kNumLevelFiles - 1) {
      ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
    }
  }
  // The L0->L1 compaction (triggered at kNumLevelFiles L0 files) moves
  // everything to L1, carrying the pinned deletion markers along.
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(kNumLevelFiles, NumTableFilesAtLevel(1));

  std::vector<LiveFileMetaData> pre_release_metadata, post_release_metadata;
  db_->GetLiveFilesMetaData(&pre_release_metadata);
  // just need to bump seqnum so ReleaseSnapshot knows the newest key in the SST
  // files does not need to be preserved in case of a future snapshot.
  ASSERT_OK(Put(Key(0), "val"));
  // While the snapshot is live, bottommost files are marked for compaction,
  // so the threshold is below kMaxSequenceNumber.
  ASSERT_NE(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
  // release snapshot and wait for compactions to finish. Single-file
  // compactions should be triggered, which reduce the size of each bottom-level
  // file without changing file count.
  db_->ReleaseSnapshot(snapshot);
  ASSERT_EQ(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
  // Every compaction picked from here on must be for the bottommost-files
  // reason; anything else would indicate the marking logic misfired.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        ASSERT_TRUE(compaction->compaction_reason() ==
                    CompactionReason::kBottommostFiles);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  dbfull()->TEST_WaitForCompact();
  db_->GetLiveFilesMetaData(&post_release_metadata);
  ASSERT_EQ(pre_release_metadata.size(), post_release_metadata.size());

  // NOTE(review): this pairwise comparison assumes GetLiveFilesMetaData
  // returns files in a stable order across the two calls — appears to hold
  // here since file count and levels are unchanged.
  for (size_t i = 0; i < pre_release_metadata.size(); ++i) {
    const auto& pre_file = pre_release_metadata[i];
    const auto& post_file = post_release_metadata[i];
    ASSERT_EQ(1, pre_file.level);
    ASSERT_EQ(1, post_file.level);
    // each file is smaller than it was before as it was rewritten without
    // deletion markers/deleted keys.
    ASSERT_LT(post_file.size, pre_file.size);
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
3516
TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
  // Verifies that files older than `options.ttl` get picked up by TTL
  // compactions, both with a fixed ttl and when ttl is lowered dynamically
  // via SetOptions(). Time is simulated through env_->addon_time_.
  const int kNumKeysPerFile = 32;
  const int kNumLevelFiles = 2;
  const int kValueSize = 1024;

  Options options = CurrentOptions();
  options.compression = kNoCompression;
  options.ttl = 24 * 60 * 60;  // 24 hours
  options.max_open_files = -1;
  // Use the mock clock additively rather than sleeping in real time.
  env_->time_elapse_only_sleep_ = false;
  options.env = env_;

  env_->addon_time_.store(0);
  DestroyAndReopen(options);

  Random rnd(301);
  // Create two files of fresh data and move them to L3.
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
    }
    Flush();
  }
  dbfull()->TEST_WaitForCompact();
  MoveFilesToLevel(3);
  ASSERT_EQ("0,0,0,2", FilesPerLevel());

  // Delete previously written keys.
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
    }
    Flush();
  }
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("2,0,0,2", FilesPerLevel());
  MoveFilesToLevel(1);
  ASSERT_EQ("0,2,0,2", FilesPerLevel());

  // Advance past the 24h ttl; nothing happens until a write/flush gives the
  // compaction picker a chance to run.
  env_->addon_time_.fetch_add(36 * 60 * 60);  // 36 hours
  ASSERT_EQ("0,2,0,2", FilesPerLevel());

  // Just do a simple write + flush so that the Ttl expired files get
  // compacted.
  ASSERT_OK(Put("a", "1"));
  Flush();
  // Every compaction picked from here must be for the TTL reason.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  dbfull()->TEST_WaitForCompact();
  // All non-L0 files are deleted, as they contained only deleted data.
  ASSERT_EQ("1", FilesPerLevel());
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  // Test dynamically changing ttl.

  // Rebuild the same LSM shape from scratch with the clock reset.
  env_->addon_time_.store(0);
  DestroyAndReopen(options);

  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
    }
    Flush();
  }
  dbfull()->TEST_WaitForCompact();
  MoveFilesToLevel(3);
  ASSERT_EQ("0,0,0,2", FilesPerLevel());

  // Delete previously written keys.
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
    }
    Flush();
  }
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("2,0,0,2", FilesPerLevel());
  MoveFilesToLevel(1);
  ASSERT_EQ("0,2,0,2", FilesPerLevel());

  // Move time forward by 12 hours, and make sure that compaction still doesn't
  // trigger as ttl is set to 24 hours.
  env_->addon_time_.fetch_add(12 * 60 * 60);
  ASSERT_OK(Put("a", "1"));
  Flush();
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("1,2,0,2", FilesPerLevel());

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl);
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Dynamically change ttl to 10 hours.
  // This should trigger a ttl compaction, as 12 hours have already passed.
  ASSERT_OK(dbfull()->SetOptions({{"ttl", "36000"}}));
  dbfull()->TEST_WaitForCompact();
  // All non-L0 files are deleted, as they contained only deleted data.
  ASSERT_EQ("1", FilesPerLevel());
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
3625
TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
  // Builds a multi-level LSM with overlapping key ranges and verifies that a
  // TTL expiry on an upper-level file cascades compactions all the way to the
  // bottom level. Exercises all four combinations of {restart, open-all-files}.
  const int kValueSize = 100;

  for (bool if_restart : {false, true}) {
    for (bool if_open_all_files : {false, true}) {
      Options options = CurrentOptions();
      options.compression = kNoCompression;
      options.ttl = 24 * 60 * 60;  // 24 hours
      if (if_open_all_files) {
        options.max_open_files = -1;
      } else {
        options.max_open_files = 20;
      }
      // RocksDB sanitizes max_open_files to at least 20. Modify it back down
      // via the sync point so the table cache actually evicts entries.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
            int* max_open_files = static_cast<int*>(arg);
            *max_open_files = 2;
          });
      // In the case where all files are opened and doing DB restart
      // forcing the oldest ancester time in manifest file to be 0 to
      // simulate the case of reading from an old version.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "VersionEdit::EncodeTo:VarintOldestAncesterTime", [&](void* arg) {
            if (if_restart && if_open_all_files) {
              std::string* encoded_fieled = static_cast<std::string*>(arg);
              *encoded_fieled = "";
              PutVarint64(encoded_fieled, 0);
            }
          });

      env_->time_elapse_only_sleep_ = false;
      options.env = env_;

      env_->addon_time_.store(0);
      DestroyAndReopen(options);

      // Count how many compactions are picked for the TTL reason.
      int ttl_compactions = 0;
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
            Compaction* compaction = reinterpret_cast<Compaction*>(arg);
            auto compaction_reason = compaction->compaction_reason();
            if (compaction_reason == CompactionReason::kTtl) {
              ttl_compactions++;
            }
          });
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

      // Add two L6 files with key ranges: [1 .. 100], [101 .. 200].
      Random rnd(301);
      for (int i = 1; i <= 100; ++i) {
        ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
      }
      Flush();
      // Get the first file's creation time. This will be the oldest file in
      // the DB. Compactions involving this file's descendants should keep
      // getting this time.
      std::vector<std::vector<FileMetaData>> level_to_files;
      dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                      &level_to_files);
      uint64_t oldest_time = level_to_files[0][0].oldest_ancester_time;
      // Add 1 hour and do another flush.
      env_->addon_time_.fetch_add(1 * 60 * 60);
      for (int i = 101; i <= 200; ++i) {
        ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
      }
      Flush();
      MoveFilesToLevel(6);
      ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());

      env_->addon_time_.fetch_add(1 * 60 * 60);
      // Add two L4 files with key ranges: [1 .. 50], [51 .. 150].
      for (int i = 1; i <= 50; ++i) {
        ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
      }
      Flush();
      env_->addon_time_.fetch_add(1 * 60 * 60);
      for (int i = 51; i <= 150; ++i) {
        ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
      }
      Flush();
      MoveFilesToLevel(4);
      ASSERT_EQ("0,0,0,0,2,0,2", FilesPerLevel());

      env_->addon_time_.fetch_add(1 * 60 * 60);
      // Add one L1 file with key range: [26, 75].
      for (int i = 26; i <= 75; ++i) {
        ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
      }
      Flush();
      dbfull()->TEST_WaitForCompact();
      MoveFilesToLevel(1);
      ASSERT_EQ("0,1,0,0,2,0,2", FilesPerLevel());

      // LSM tree:
      // L1:         [26 .. 75]
      // L4:     [1 .. 50][51 ..... 150]
      // L6:     [1 ........ 100][101 .... 200]
      //
      // On TTL expiry, TTL compaction should be initiated on L1 file, and the
      // compactions should keep going on until the key range hits bottom level.
      // In other words: the compaction on this data range "cascades" until
      // reaching the bottom level.
      //
      // Order of events on TTL expiry:
      // 1. L1 file falls to L3 via 2 trivial moves which are initiated by the
      //    ttl compaction.
      // 2. A TTL compaction happens between L3 and L4 files. Output file in L4.
      // 3. The new output file from L4 falls to L5 via 1 trivial move initiated
      //    by the ttl compaction.
      // 4. A TTL compaction happens between L5 and L6 files. Output in L6.

      // Add 25 hours and do a write
      env_->addon_time_.fetch_add(25 * 60 * 60);

      ASSERT_OK(Put(Key(1), "1"));
      if (if_restart) {
        Reopen(options);
      } else {
        Flush();
      }
      dbfull()->TEST_WaitForCompact();
      ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
      // Steps 1-4 above: 2 trivial moves + L3/L4 + 1 trivial move + L5/L6.
      ASSERT_EQ(5, ttl_compactions);

      // The merged bottom-level file must carry forward the oldest ancestor
      // time captured from the very first flush.
      dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                      &level_to_files);
      ASSERT_EQ(oldest_time, level_to_files[6][0].oldest_ancester_time);

      // A second expiry round should produce at least one more ttl compaction.
      env_->addon_time_.fetch_add(25 * 60 * 60);
      ASSERT_OK(Put(Key(2), "1"));
      if (if_restart) {
        Reopen(options);
      } else {
        Flush();
      }
      dbfull()->TEST_WaitForCompact();
      ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
      ASSERT_GE(ttl_compactions, 6);

      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    }
  }
}
3771
TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
  // Verifies that files older than `periodic_compaction_seconds` are rewritten
  // in place (same level) by periodic compactions, across all combinations of
  // {restart, open-all-files}.
  const int kNumKeysPerFile = 32;
  const int kNumLevelFiles = 2;
  const int kValueSize = 100;

  for (bool if_restart : {false, true}) {
    for (bool if_open_all_files : {false, true}) {
      Options options = CurrentOptions();
      options.periodic_compaction_seconds = 48 * 60 * 60;  // 2 days
      if (if_open_all_files) {
        options.max_open_files = -1;  // needed for ttl compaction
      } else {
        options.max_open_files = 20;
      }
      // RocksDB sanitizes max_open_files to at least 20. Modify it back down
      // via the sync point so the table cache actually evicts entries.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
            int* max_open_files = static_cast<int*>(arg);
            *max_open_files = 0;
          });
      // In the case where all files are opened and doing DB restart
      // forcing the file creation time in manifest file to be 0 to
      // simulate the case of reading from an old version.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "VersionEdit::EncodeTo:VarintFileCreationTime", [&](void* arg) {
            if (if_restart && if_open_all_files) {
              std::string* encoded_fieled = static_cast<std::string*>(arg);
              *encoded_fieled = "";
              PutVarint64(encoded_fieled, 0);
            }
          });

      env_->time_elapse_only_sleep_ = false;
      options.env = env_;

      env_->addon_time_.store(0);
      DestroyAndReopen(options);

      // Count how many compactions are picked for the periodic reason.
      int periodic_compactions = 0;
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
          "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
            Compaction* compaction = reinterpret_cast<Compaction*>(arg);
            auto compaction_reason = compaction->compaction_reason();
            if (compaction_reason == CompactionReason::kPeriodicCompaction) {
              periodic_compactions++;
            }
          });
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

      Random rnd(301);
      for (int i = 0; i < kNumLevelFiles; ++i) {
        for (int j = 0; j < kNumKeysPerFile; ++j) {
          ASSERT_OK(Put(Key(i * kNumKeysPerFile + j),
                        RandomString(&rnd, kValueSize)));
        }
        Flush();
      }
      dbfull()->TEST_WaitForCompact();

      ASSERT_EQ("2", FilesPerLevel());
      ASSERT_EQ(0, periodic_compactions);

      // Add 50 hours and do a write
      env_->addon_time_.fetch_add(50 * 60 * 60);
      ASSERT_OK(Put("a", "1"));
      Flush();
      dbfull()->TEST_WaitForCompact();
      // Assert that the files stay in the same level
      ASSERT_EQ("3", FilesPerLevel());
      // The two old files go through the periodic compaction process
      ASSERT_EQ(2, periodic_compactions);

      MoveFilesToLevel(1);
      ASSERT_EQ("0,3", FilesPerLevel());

      // Add another 50 hours and do another write
      env_->addon_time_.fetch_add(50 * 60 * 60);
      ASSERT_OK(Put("b", "2"));
      if (if_restart) {
        Reopen(options);
      } else {
        Flush();
      }
      dbfull()->TEST_WaitForCompact();
      ASSERT_EQ("1,3", FilesPerLevel());
      // The three old files now go through the periodic compaction process. 2
      // + 3.
      ASSERT_EQ(5, periodic_compactions);

      // Add another 50 hours and do another write
      env_->addon_time_.fetch_add(50 * 60 * 60);
      ASSERT_OK(Put("c", "3"));
      Flush();
      dbfull()->TEST_WaitForCompact();
      ASSERT_EQ("2,3", FilesPerLevel());
      // The four old files now go through the periodic compaction process. 5
      // + 4.
      ASSERT_EQ(9, periodic_compactions);

      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    }
  }
}
3875
TEST_F(DBCompactionTest, LevelPeriodicCompactionWithOldDB) {
  // This test makes sure that periodic compactions are working with a DB
  // where file_creation_time of some files is 0.
  // After compactions the new files are created with a valid
  // file_creation_time.

  const int kNumKeysPerFile = 32;
  const int kNumFiles = 4;
  const int kValueSize = 100;

  Options options = CurrentOptions();
  env_->time_elapse_only_sleep_ = false;
  options.env = env_;

  env_->addon_time_.store(0);
  DestroyAndReopen(options);

  int periodic_compactions = 0;
  bool set_file_creation_time_to_zero = true;
  bool set_creation_time_to_zero = true;
  // Count compactions picked for the periodic-compaction reason.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        auto compaction_reason = compaction->compaction_reason();
        if (compaction_reason == CompactionReason::kPeriodicCompaction) {
          periodic_compactions++;
        }
      });
  // Zero out creation timestamps in newly written table properties to
  // simulate SST files produced by an old RocksDB version.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "PropertyBlockBuilder::AddTableProperty:Start", [&](void* arg) {
        TableProperties* props = reinterpret_cast<TableProperties*>(arg);
        if (set_file_creation_time_to_zero) {
          props->file_creation_time = 0;
        }
        if (set_creation_time_to_zero) {
          props->creation_time = 0;
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  for (int i = 0; i < kNumFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
    }
    // Assert the flush status (previously ignored), consistent with the
    // ASSERT_OK(dbfull()->TEST_WaitForCompact()) checks below.
    ASSERT_OK(Flush());
    // Move the first two files to L2.
    if (i == 1) {
      MoveFilesToLevel(2);
      set_creation_time_to_zero = false;
    }
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_EQ("2,0,2", FilesPerLevel());
  ASSERT_EQ(0, periodic_compactions);

  Close();

  set_file_creation_time_to_zero = false;
  // Forward the clock by 2 days.
  env_->addon_time_.fetch_add(2 * 24 * 60 * 60);
  options.periodic_compaction_seconds = 1 * 24 * 60 * 60;  // 1 day

  Reopen(options);
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Files are rewritten in place: level shape unchanged...
  ASSERT_EQ("2,0,2", FilesPerLevel());
  // ...but make sure that all files went through periodic compaction.
  ASSERT_EQ(kNumFiles, periodic_compactions);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
3948
TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
  // Exercises the interplay of ttl (10h) and periodic (48h) compactions:
  // ttl moves expired files down toward the bottom level, while periodic
  // compaction rewrites bottom-level files in place.
  const int kNumKeysPerFile = 32;
  const int kNumLevelFiles = 2;
  const int kValueSize = 100;

  Options options = CurrentOptions();
  options.ttl = 10 * 60 * 60;  // 10 hours
  options.periodic_compaction_seconds = 48 * 60 * 60;  // 2 days
  options.max_open_files = -1;  // needed for both periodic and ttl compactions
  env_->time_elapse_only_sleep_ = false;
  options.env = env_;

  env_->addon_time_.store(0);
  DestroyAndReopen(options);

  // Count compactions picked for each reason separately.
  int periodic_compactions = 0;
  int ttl_compactions = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        auto compaction_reason = compaction->compaction_reason();
        if (compaction_reason == CompactionReason::kPeriodicCompaction) {
          periodic_compactions++;
        } else if (compaction_reason == CompactionReason::kTtl) {
          ttl_compactions++;
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
    }
    Flush();
  }
  dbfull()->TEST_WaitForCompact();

  MoveFilesToLevel(3);

  ASSERT_EQ("0,0,0,2", FilesPerLevel());
  ASSERT_EQ(0, periodic_compactions);
  ASSERT_EQ(0, ttl_compactions);

  // Add some time greater than periodic_compaction_time.
  env_->addon_time_.fetch_add(50 * 60 * 60);
  ASSERT_OK(Put("a", "1"));
  Flush();
  dbfull()->TEST_WaitForCompact();
  // Files in the bottom level go through periodic compactions.
  ASSERT_EQ("1,0,0,2", FilesPerLevel());
  ASSERT_EQ(2, periodic_compactions);
  ASSERT_EQ(0, ttl_compactions);

  // Add a little more time than ttl
  env_->addon_time_.fetch_add(11 * 60 * 60);
  ASSERT_OK(Put("b", "1"));
  Flush();
  dbfull()->TEST_WaitForCompact();
  // Notice that the previous file in level 1 falls down to the bottom level
  // due to ttl compactions, one level at a time.
  // And bottom level files don't get picked up for ttl compactions.
  ASSERT_EQ("1,0,0,3", FilesPerLevel());
  ASSERT_EQ(2, periodic_compactions);
  ASSERT_EQ(3, ttl_compactions);

  // Add some time greater than periodic_compaction_time.
  env_->addon_time_.fetch_add(50 * 60 * 60);
  ASSERT_OK(Put("c", "1"));
  Flush();
  dbfull()->TEST_WaitForCompact();
  // Previous L0 file falls one level at a time to bottom level due to ttl.
  // And all 4 bottom files go through periodic compactions.
  ASSERT_EQ("1,0,0,4", FilesPerLevel());
  ASSERT_EQ(6, periodic_compactions);
  ASSERT_EQ(6, ttl_compactions);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
4029
TEST_F(DBCompactionTest, LevelPeriodicCompactionWithCompactionFilters) {
  // When a compaction filter (or filter factory) is configured but
  // periodic_compaction_seconds is left at its default, RocksDB sanitizes
  // the option to 30 days so the filter is guaranteed to see old data.
  // Verify the sanitized value and that periodic compactions then fire.
  class TestCompactionFilter : public CompactionFilter {
    const char* Name() const override { return "TestCompactionFilter"; }
  };
  class TestCompactionFilterFactory : public CompactionFilterFactory {
    const char* Name() const override { return "TestCompactionFilterFactory"; }
    std::unique_ptr<CompactionFilter> CreateCompactionFilter(
        const CompactionFilter::Context& /*context*/) override {
      return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
    }
  };

  const int kNumKeysPerFile = 32;
  const int kNumLevelFiles = 2;
  const int kValueSize = 100;

  Random rnd(301);

  Options options = CurrentOptions();
  TestCompactionFilter test_compaction_filter;
  env_->time_elapse_only_sleep_ = false;
  options.env = env_;
  env_->addon_time_.store(0);

  enum CompactionFilterType {
    kUseCompactionFilter,
    kUseCompactionFilterFactory
  };

  for (CompactionFilterType comp_filter_type :
       {kUseCompactionFilter, kUseCompactionFilterFactory}) {
    // Assert that periodic compactions are not enabled.
    ASSERT_EQ(port::kMaxUint64 - 1, options.periodic_compaction_seconds);

    if (comp_filter_type == kUseCompactionFilter) {
      options.compaction_filter = &test_compaction_filter;
      options.compaction_filter_factory.reset();
    } else if (comp_filter_type == kUseCompactionFilterFactory) {
      options.compaction_filter = nullptr;
      options.compaction_filter_factory.reset(
          new TestCompactionFilterFactory());
    }
    DestroyAndReopen(options);

    // periodic_compaction_seconds should be set to the sanitized value when
    // a compaction filter or a compaction filter factory is used.
    ASSERT_EQ(30 * 24 * 60 * 60,
              dbfull()->GetOptions().periodic_compaction_seconds);

    // Count compactions picked for the periodic reason.
    int periodic_compactions = 0;
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
          Compaction* compaction = reinterpret_cast<Compaction*>(arg);
          auto compaction_reason = compaction->compaction_reason();
          if (compaction_reason == CompactionReason::kPeriodicCompaction) {
            periodic_compactions++;
          }
        });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    for (int i = 0; i < kNumLevelFiles; ++i) {
      for (int j = 0; j < kNumKeysPerFile; ++j) {
        ASSERT_OK(
            Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
      }
      Flush();
    }
    dbfull()->TEST_WaitForCompact();

    ASSERT_EQ("2", FilesPerLevel());
    ASSERT_EQ(0, periodic_compactions);

    // Add 31 days and do a write
    env_->addon_time_.fetch_add(31 * 24 * 60 * 60);
    ASSERT_OK(Put("a", "1"));
    Flush();
    dbfull()->TEST_WaitForCompact();
    // Assert that the files stay in the same level
    ASSERT_EQ("3", FilesPerLevel());
    // The two old files go through the periodic compaction process
    ASSERT_EQ(2, periodic_compactions);

    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
4115
TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
  // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
  // compaction only triggers flush after it's sure stall won't be triggered for
  // L0 file count going too high.
  const int kNumL0FilesTrigger = 4;
  const int kNumL0FilesLimit = 8;
  // i == 0: verifies normal case where stall is avoided by delay
  // i == 1: verifies no delay in edge case where stall trigger is same as
  //         compaction trigger, so stall can't be avoided
  for (int i = 0; i < 2; ++i) {
    Options options = CurrentOptions();
    options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
    if (i == 0) {
      options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
    } else {
      options.level0_file_num_compaction_trigger = kNumL0FilesLimit;
    }
    Reopen(options);

    if (i == 0) {
      // ensure the auto compaction doesn't finish until manual compaction has
      // had a chance to be delayed.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
            "CompactionJob::Run():End"}});
    } else {
      // ensure the auto-compaction doesn't finish until manual compaction has
      // continued without delay.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"DBImpl::FlushMemTable:StallWaitDone",
            "CompactionJob::Run():End"}});
    }
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    // Build up L0 to one file below the slowdown limit so the next flush
    // (from the manual compaction) would otherwise cause a stall.
    Random rnd(301);
    for (int j = 0; j < kNumL0FilesLimit - 1; ++j) {
      for (int k = 0; k < 2; ++k) {
        ASSERT_OK(Put(Key(k), RandomString(&rnd, 1024)));
      }
      Flush();
    }
    // Run CompactRange on a separate thread; the sync-point dependencies
    // above order it relative to the background compaction.
    auto manual_compaction_thread = port::Thread([this]() {
      CompactRangeOptions cro;
      cro.allow_write_stall = false;
      db_->CompactRange(cro, nullptr, nullptr);
    });

    manual_compaction_thread.join();
    dbfull()->TEST_WaitForCompact();
    // Everything ends up compacted out of L0 into L1.
    ASSERT_EQ(0, NumTableFilesAtLevel(0));
    ASSERT_GT(NumTableFilesAtLevel(1), 0);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
4170
TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
  // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
  // compaction only triggers flush after it's sure stall won't be triggered for
  // immutable memtable count going too high.
  const int kNumImmMemTableLimit = 8;
  // i == 0: verifies normal case where stall is avoided by delay
  // i == 1: verifies no delay in edge case where stall trigger is same as flush
  //         trigger, so stall can't be avoided
  for (int i = 0; i < 2; ++i) {
    Options options = CurrentOptions();
    options.disable_auto_compactions = true;
    // the delay limit is one less than the stop limit. This test focuses on
    // avoiding delay limit, but this option sets stop limit, so add one.
    options.max_write_buffer_number = kNumImmMemTableLimit + 1;
    if (i == 1) {
      options.min_write_buffer_number_to_merge = kNumImmMemTableLimit;
    }
    Reopen(options);

    if (i == 0) {
      // ensure the flush doesn't finish until manual compaction has had a
      // chance to be delayed.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
            "FlushJob::WriteLevel0Table"}});
    } else {
      // ensure the flush doesn't finish until manual compaction has continued
      // without delay.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"DBImpl::FlushMemTable:StallWaitDone",
            "FlushJob::WriteLevel0Table"}});
    }
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    // Pile up immutable memtables via non-waiting flushes, stopping one
    // short of the limit so the manual compaction's flush would otherwise
    // push the count into stall territory.
    Random rnd(301);
    for (int j = 0; j < kNumImmMemTableLimit - 1; ++j) {
      ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024)));
      FlushOptions flush_opts;
      flush_opts.wait = false;
      flush_opts.allow_write_stall = true;
      dbfull()->Flush(flush_opts);
    }

    auto manual_compaction_thread = port::Thread([this]() {
      CompactRangeOptions cro;
      cro.allow_write_stall = false;
      db_->CompactRange(cro, nullptr, nullptr);
    });

    manual_compaction_thread.join();
    dbfull()->TEST_WaitForFlushMemTable();
    // Manual compaction pushed everything out of L0 into L1.
    ASSERT_EQ(0, NumTableFilesAtLevel(0));
    ASSERT_GT(NumTableFilesAtLevel(1), 0);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
4227
TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
  // Verify that, when `CompactRangeOptions::allow_write_stall == false`, delay
  // does not hang if CF is dropped or DB is closed
  const int kNumL0FilesTrigger = 4;
  const int kNumL0FilesLimit = 8;
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
  options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
  // i == 0: DB::DropColumnFamily() on CompactRange's target CF unblocks it
  // i == 1: DB::CancelAllBackgroundWork() unblocks CompactRange. This is to
  //         simulate what happens during Close as we can't call Close (it
  //         blocks on the auto-compaction, making a cycle).
  for (int i = 0; i < 2; ++i) {
    CreateAndReopenWithCF({"one"}, options);
    // The calls to close CF/DB wait until the manual compaction stalls.
    // The auto-compaction waits until the manual compaction finishes to ensure
    // the signal comes from closing CF/DB, not from compaction making progress.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
        {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
          "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown"},
         {"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual",
          "CompactionJob::Run():End"}});
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    // Fill L0 to one file below the slowdown limit so CompactRange's flush
    // attempt stalls at WaitUntilFlushWouldNotStallWrites.
    Random rnd(301);
    for (int j = 0; j < kNumL0FilesLimit - 1; ++j) {
      for (int k = 0; k < 2; ++k) {
        ASSERT_OK(Put(1, Key(k), RandomString(&rnd, 1024)));
      }
      Flush(1);
    }
    auto manual_compaction_thread = port::Thread([this, i]() {
      CompactRangeOptions cro;
      cro.allow_write_stall = false;
      // Issue the manual compaction exactly once and check the status that
      // this call returns after being unblocked by DropColumnFamily /
      // CancelAllBackgroundWork. (Bug fix: previously the status of a first
      // call was discarded and CompactRange was invoked a second time, which
      // both left the first result unchecked and broke the sync-point
      // choreography that hinges on a single stalled call.)
      Status s = db_->CompactRange(cro, handles_[1], nullptr, nullptr);
      if (i == 0) {
        ASSERT_TRUE(s.IsColumnFamilyDropped());
      } else {
        ASSERT_TRUE(s.IsShutdownInProgress());
      }
    });

    TEST_SYNC_POINT(
        "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown");
    if (i == 0) {
      ASSERT_OK(db_->DropColumnFamily(handles_[1]));
    } else {
      dbfull()->CancelAllBackgroundWork(false /* wait */);
    }
    manual_compaction_thread.join();
    TEST_SYNC_POINT(
        "DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual");
    dbfull()->TEST_WaitForCompact();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
4286
TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
  // Verify that, when `CompactRangeOptions::allow_write_stall == false`,
  // CompactRange skips its flush if the delay is long enough that the memtables
  // existing at the beginning of the call have already been flushed.
  const int kNumL0FilesTrigger = 4;
  const int kNumL0FilesLimit = 8;
  Options options = CurrentOptions();
  options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
  options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
  Reopen(options);

  Random rnd(301);
  // The manual flush includes the memtable that was active when CompactRange
  // began. So it unblocks CompactRange and precludes its flush. Throughout the
  // test, stall conditions are upheld via high L0 file count.
  //
  // Ordering enforced below:
  //   1. CompactRange enters its stall wait,
  //   2. then the test's manual flush runs (PreFlush..PostFlush),
  //   3. and only after the flush's stall wait completes may the auto
  //      compaction finish (keeping the L0 count, i.e. the stall, in place
  //      until then).
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
        "DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"},
       {"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush",
        "DBImpl::FlushMemTable:StallWaitDone"},
       {"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Used for the delayable flushes; `allow_write_stall == true` lets them
  // proceed despite the stall condition.
  FlushOptions flush_opts;
  flush_opts.allow_write_stall = true;
  for (int i = 0; i < kNumL0FilesLimit - 1; ++i) {
    for (int j = 0; j < 2; ++j) {
      ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
    }
    dbfull()->Flush(flush_opts);
  }
  auto manual_compaction_thread = port::Thread([this]() {
    CompactRangeOptions cro;
    cro.allow_write_stall = false;  // makes CompactRange wait out the stall
    db_->CompactRange(cro, nullptr, nullptr);
  });

  TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush");
  // This flush covers the memtable that existed when CompactRange started, so
  // CompactRange's own flush becomes unnecessary.
  Put(ToString(0), RandomString(&rnd, 1024));
  dbfull()->Flush(flush_opts);
  // Written after the manual flush: must remain in the active memtable if
  // CompactRange correctly skips its flush.
  Put(ToString(0), RandomString(&rnd, 1024));
  TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
  manual_compaction_thread.join();

  // If CompactRange's flush was skipped, the final Put above will still be
  // in the active memtable.
  std::string num_keys_in_memtable;
  db_->GetProperty(DB::Properties::kNumEntriesActiveMemTable, &num_keys_in_memtable);
  ASSERT_EQ(ToString(1), num_keys_in_memtable);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
4340
TEST_F(DBCompactionTest, CompactRangeFlushOverlappingMemtable) {
  // Verify memtable only gets flushed if it contains data overlapping the range
  // provided to `CompactRange`. Tests all kinds of overlap/non-overlap.
  const int kNumEndpointKeys = 5;
  std::string keys[kNumEndpointKeys] = {"a", "b", "c", "d", "e"};
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  Reopen(options);

  // One extra iteration for nullptr, which means left side of interval is
  // unbounded.
  for (int i = 0; i <= kNumEndpointKeys; ++i) {
    Slice begin;
    Slice* begin_ptr;
    if (i == 0) {
      begin_ptr = nullptr;
    } else {
      begin = keys[i - 1];
      begin_ptr = &begin;
    }
    // Start at `i` so right endpoint comes after left endpoint. One extra
    // iteration for nullptr, which means right side of interval is unbounded.
    for (int j = std::max(0, i - 1); j <= kNumEndpointKeys; ++j) {
      Slice end;
      Slice* end_ptr;
      if (j == kNumEndpointKeys) {
        end_ptr = nullptr;
      } else {
        end = keys[j];
        end_ptr = &end;
      }
      // The memtable holds exactly "b" and "d" for each iteration.
      ASSERT_OK(Put("b", "val"));
      ASSERT_OK(Put("d", "val"));
      CompactRangeOptions compact_range_opts;
      ASSERT_OK(db_->CompactRange(compact_range_opts, begin_ptr, end_ptr));

      // Sum entries across immutable and active memtables to decide whether a
      // flush happened.
      uint64_t get_prop_tmp, num_memtable_entries = 0;
      ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesImmMemTables,
                                      &get_prop_tmp));
      num_memtable_entries += get_prop_tmp;
      ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
                                      &get_prop_tmp));
      num_memtable_entries += get_prop_tmp;
      // A bounded range misses both "b" and "d" only when it lies entirely
      // before "b" (end == "a"), entirely after "d" (begin == "e"), or is the
      // single point ["c", "c"]. Every other case — including any unbounded
      // endpoint — overlaps the memtable.
      if (begin_ptr == nullptr || end_ptr == nullptr ||
          (i <= 4 && j >= 1 && (begin != "c" || end != "c"))) {
        // In this case `CompactRange`'s range overlapped in some way with the
        // memtable's range, so flush should've happened. Then "b" and "d" won't
        // be in the memtable.
        ASSERT_EQ(0, num_memtable_entries);
      } else {
        ASSERT_EQ(2, num_memtable_entries);
        // flush anyways to prepare for next iteration
        db_->Flush(FlushOptions());
      }
    }
  }
}
4398
TEST_F(DBCompactionTest, CompactionStatsTest) {
  // Generate enough flushed files to drive several compactions, then verify
  // the listener's collected stats agree with the internal per-level
  // compaction stats.
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  CompactionStatsCollector* collector = new CompactionStatsCollector();
  options.listeners.emplace_back(collector);
  DestroyAndReopen(options);

  constexpr int kNumFlushes = 32;
  constexpr int kKeysPerFlush = 5000;
  for (int flush = 0; flush < kNumFlushes; ++flush) {
    for (int key = 0; key < kKeysPerFlush; ++key) {
      Put(std::to_string(key), std::string(1, 'A'));
    }
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  dbfull()->TEST_WaitForCompact();

  auto* cfh =
      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
  VerifyCompactionStats(*cfh->cfd(), *collector);
}
4420
TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) {
  // LSM setup:
  // L1: [ba bz]
  // L2: [a b] [c d]
  // L3: [a b] [c d]
  //
  // Thread 1: Thread 2:
  // Begin compacting all L2->L3
  // Compact [ba bz] L1->L3
  // End compacting all L2->L3
  //
  // The compaction operation in thread 2 should be disallowed because the range
  // overlaps with the compaction in thread 1, which also covers that range in
  // L3.
  Options options = CurrentOptions();
  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);
  Reopen(options);

  for (int level = 3; level >= 2; --level) {
    ASSERT_OK(Put("a", "val"));
    ASSERT_OK(Put("b", "val"));
    ASSERT_OK(Flush());
    ASSERT_OK(Put("c", "val"));
    ASSERT_OK(Put("d", "val"));
    ASSERT_OK(Flush());
    MoveFilesToLevel(level);
  }
  ASSERT_OK(Put("ba", "val"));
  ASSERT_OK(Put("bz", "val"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(1);

  // Keep thread 1's CompactFiles paused (between CompactFilesImpl:0 and :1)
  // while thread 2's CompactFiles runs, so the two conflict by construction.
  SyncPoint::GetInstance()->LoadDependency({
      {"CompactFilesImpl:0",
       "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin"},
      {"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End",
       "CompactFilesImpl:1"},
  });
  SyncPoint::GetInstance()->EnableProcessing();

  auto bg_thread = port::Thread([&]() {
    // Thread 1: compact every flushed file except the last one (the [ba bz]
    // L1 file) into L3.
    std::vector<std::string> filenames = collector->GetFlushedFiles();
    filenames.pop_back();
    ASSERT_OK(db_->CompactFiles(CompactionOptions(), filenames,
                                3 /* output_level */));
  });

  // Thread 2
  TEST_SYNC_POINT(
      "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin");
  std::string filename = collector->GetFlushedFiles().back();
  // Compacting the [ba bz] file into L3 must fail: its output range conflicts
  // with the L2->L3 compaction still running on thread 1.
  ASSERT_FALSE(
      db_->CompactFiles(CompactionOptions(), {filename}, 3 /* output_level */)
          .ok());
  TEST_SYNC_POINT(
      "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End");

  bg_thread.join();
}
4482
TEST_F(DBCompactionTest, CompactionHasEmptyOutput) {
  // A compaction whose inputs cancel out (puts followed by deletes) should
  // produce no output file; only the two flushes create SSTs.
  Options options = CurrentOptions();
  SstStatsCollector* collector = new SstStatsCollector();
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.emplace_back(collector);
  Reopen(options);

  // Two overlapping L0 files (overlap prevents trivial move); the second
  // file deletes everything written by the first.
  ASSERT_OK(Put("a", "val"));
  ASSERT_OK(Put("b", "val"));
  ASSERT_OK(Flush());
  ASSERT_OK(Delete("a"));
  ASSERT_OK(Delete("b"));
  ASSERT_OK(Flush());

  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ(0, NumTableFilesAtLevel(0));
  ASSERT_EQ(0, NumTableFilesAtLevel(1));

  // Expect one file creation to start for each flush, and zero for compaction
  // since no keys are written.
  ASSERT_EQ(2, collector->num_ssts_creation_started());
}
4506
TEST_F(DBCompactionTest, CompactionLimiter) {
  // End-to-end check of ConcurrentTaskLimiter: column families sharing a
  // limiter must never run more concurrent compactions than the limiter
  // allows, and a fully throttled limiter must still allow the manual
  // compaction at the end to complete.
  const int kNumKeysPerFile = 10;
  const int kMaxBackgroundThreads = 64;

  struct CompactionLimiter {
    std::string name;
    int limit_tasks;  // configured concurrency limit
    int max_tasks;    // recorded by the BeforeCompaction callback below
    int tasks;        // compactions currently running under this limiter
    std::shared_ptr<ConcurrentTaskLimiter> limiter;
  };

  std::vector<CompactionLimiter> limiter_settings;
  limiter_settings.push_back({"limiter_1", 1, 0, 0, nullptr});
  limiter_settings.push_back({"limiter_2", 2, 0, 0, nullptr});
  limiter_settings.push_back({"limiter_3", 3, 0, 0, nullptr});

  for (auto& ls : limiter_settings) {
    ls.limiter.reset(NewConcurrentTaskLimiter(ls.name, ls.limit_tasks));
  }

  // Starts unlimited (-1); later set to 0 to exercise full throttling.
  std::shared_ptr<ConcurrentTaskLimiter> unique_limiter(
      NewConcurrentTaskLimiter("unique_limiter", -1));

  const char* cf_names[] = {"default", "0", "1", "2", "3", "4", "5",
      "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };
  const unsigned int cf_count = sizeof cf_names / sizeof cf_names[0];

  std::unordered_map<std::string, CompactionLimiter*> cf_to_limiter;

  Options options = CurrentOptions();
  options.write_buffer_size = 110 * 1024;  // 110KB
  options.arena_block_size = 4096;
  options.num_levels = 3;
  options.level0_file_num_compaction_trigger = 4;
  options.level0_slowdown_writes_trigger = 64;
  options.level0_stop_writes_trigger = 64;
  options.max_background_jobs = kMaxBackgroundThreads;  // Enough threads
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  options.max_write_buffer_number = 10;  // Enough memtables
  DestroyAndReopen(options);

  std::vector<Options> option_vector;
  option_vector.reserve(cf_count);

  for (unsigned int cf = 0; cf < cf_count; cf++) {
    ColumnFamilyOptions cf_opt(options);
    if (cf == 0) {
      // "Default" CF doesn't use a compaction limiter
      cf_opt.compaction_thread_limiter = nullptr;
    } else if (cf == 1) {
      // "1" CF uses bypass compaction limiter
      unique_limiter->SetMaxOutstandingTask(-1);
      cf_opt.compaction_thread_limiter = unique_limiter;
    } else {
      // Assign limiter by mod
      auto& ls = limiter_settings[cf % 3];
      cf_opt.compaction_thread_limiter = ls.limiter;
      cf_to_limiter[cf_names[cf]] = &ls;
    }
    option_vector.emplace_back(DBOptions(options), cf_opt);
  }

  for (unsigned int cf = 1; cf < cf_count; cf++) {
    CreateColumnFamilies({cf_names[cf]}, option_vector[cf]);
  }

  ReopenWithColumnFamilies(std::vector<std::string>(cf_names,
                                                    cf_names + cf_count),
                           option_vector);

  // Protects the per-limiter counters mutated by the callbacks below, which
  // run concurrently on background compaction threads.
  port::Mutex mutex;

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:BeforeCompaction", [&](void* arg) {
        const auto& cf_name = static_cast<ColumnFamilyData*>(arg)->GetName();
        auto iter = cf_to_limiter.find(cf_name);
        if (iter != cf_to_limiter.end()) {
          MutexLock l(&mutex);
          // Concurrency must never exceed the limiter's configured limit.
          ASSERT_GE(iter->second->limit_tasks, ++iter->second->tasks);
          // NOTE(review): this takes the max against `limit_tasks` rather
          // than the current `tasks` count, which makes the final
          // ASSERT_EQ(limit_tasks, max_tasks) hold as soon as any compaction
          // runs under the limiter. Presumably `tasks` was intended so the
          // assertion verifies the limit was actually reached — confirm
          // before changing.
          iter->second->max_tasks =
              std::max(iter->second->max_tasks, iter->second->limit_tasks);
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:AfterCompaction", [&](void* arg) {
        const auto& cf_name = static_cast<ColumnFamilyData*>(arg)->GetName();
        auto iter = cf_to_limiter.find(cf_name);
        if (iter != cf_to_limiter.end()) {
          MutexLock l(&mutex);
          // Task count must stay balanced with BeforeCompaction increments.
          ASSERT_GE(--iter->second->tasks, 0);
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Block all compact threads in thread pool.
  const size_t kTotalFlushTasks = kMaxBackgroundThreads / 4;
  const size_t kTotalCompactTasks = kMaxBackgroundThreads - kTotalFlushTasks;
  env_->SetBackgroundThreads((int)kTotalFlushTasks, Env::HIGH);
  env_->SetBackgroundThreads((int)kTotalCompactTasks, Env::LOW);

  test::SleepingBackgroundTask sleeping_compact_tasks[kTotalCompactTasks];

  // Block all compaction threads in thread pool.
  for (size_t i = 0; i < kTotalCompactTasks; i++) {
    env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                   &sleeping_compact_tasks[i], Env::LOW);
    sleeping_compact_tasks[i].WaitUntilSleeping();
  }

  int keyIndex = 0;

  // Build level0_file_num_compaction_trigger L0 files in every CF.
  for (int n = 0; n < options.level0_file_num_compaction_trigger; n++) {
    for (unsigned int cf = 0; cf < cf_count; cf++) {
      for (int i = 0; i < kNumKeysPerFile; i++) {
        ASSERT_OK(Put(cf, Key(keyIndex++), ""));
      }
      // put extra key to trigger flush
      ASSERT_OK(Put(cf, "", ""));
    }

    for (unsigned int cf = 0; cf < cf_count; cf++) {
      dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
    }
  }

  // Enough L0 files to trigger compaction
  for (unsigned int cf = 0; cf < cf_count; cf++) {
    ASSERT_EQ(NumTableFilesAtLevel(0, cf),
              options.level0_file_num_compaction_trigger);
  }

  // Create more files for one column family, which triggers speed up
  // condition, all compactions will be scheduled.
  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
    for (int i = 0; i < kNumKeysPerFile; i++) {
      ASSERT_OK(Put(0, Key(i), ""));
    }
    // put extra key to trigger flush
    ASSERT_OK(Put(0, "", ""));
    dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
    ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
              NumTableFilesAtLevel(0, 0));
  }

  // All CFs are pending compaction
  ASSERT_EQ(cf_count, env_->GetThreadPoolQueueLen(Env::LOW));

  // Unblock all compaction threads
  for (size_t i = 0; i < kTotalCompactTasks; i++) {
    sleeping_compact_tasks[i].WakeUp();
    sleeping_compact_tasks[i].WaitUntilDone();
  }

  for (unsigned int cf = 0; cf < cf_count; cf++) {
    dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
  }

  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Max outstanding compact tasks reached limit
  for (auto& ls : limiter_settings) {
    ASSERT_EQ(ls.limit_tasks, ls.max_tasks);
    // All tasks must have been returned to the limiter.
    ASSERT_EQ(0, ls.limiter->GetOutstandingTask());
  }

  // test manual compaction under a fully throttled limiter
  int cf_test = 1;
  unique_limiter->SetMaxOutstandingTask(0);

  // flush one more file to cf 1
  for (int i = 0; i < kNumKeysPerFile; i++) {
    ASSERT_OK(Put(cf_test, Key(keyIndex++), ""));
  }
  // put extra key to trigger flush
  ASSERT_OK(Put(cf_test, "", ""));

  dbfull()->TEST_WaitForFlushMemTable(handles_[cf_test]);
  ASSERT_EQ(1, NumTableFilesAtLevel(0, cf_test));

  // Manual compaction is expected to complete even though this CF's limiter
  // now allows zero outstanding tasks (see the "bypass" note above —
  // presumably manual compactions are exempt from throttling).
  Compact(cf_test, Key(0), Key(keyIndex));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
4692
// Instantiate DBCompactionTestWithParam over subcompaction count (1 or 4)
// crossed with exclusive manual compaction (true/false).
INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
                        ::testing::Values(std::make_tuple(1, true),
                                          std::make_tuple(1, false),
                                          std::make_tuple(4, true),
                                          std::make_tuple(4, false)));
4698
TEST_P(DBCompactionDirectIOTest, DirectIO) {
  // Verify that compaction output files are opened with direct writes iff
  // `use_direct_io_for_flush_and_compaction` is set (the test parameter), and
  // that the direct-I/O branch of option sanitization runs only in that case.
  Options options = CurrentOptions();
  Destroy(options);
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.use_direct_io_for_flush_and_compaction = GetParam();
  // Owned by this test; deleted at the end, after Destroy().
  options.env = new MockEnv(Env::Default());
  Reopen(options);
  bool readahead = false;
  SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::OpenCompactionOutputFile", [&](void* arg) {
        // `arg` is the use_direct_writes flag chosen for the output file.
        bool* use_direct_writes = static_cast<bool*>(arg);
        ASSERT_EQ(*use_direct_writes,
                  options.use_direct_io_for_flush_and_compaction);
      });
  if (options.use_direct_io_for_flush_and_compaction) {
    // Fires when SanitizeOptions takes its direct-I/O branch.
    SyncPoint::GetInstance()->SetCallBack(
        "SanitizeOptions:direct_io", [&](void* /*arg*/) {
          readahead = true;
        });
  }
  SyncPoint::GetInstance()->EnableProcessing();
  CreateAndReopenWithCF({"pikachu"}, options);
  MakeTables(3, "p", "q", 1);
  ASSERT_EQ("1,1,1", FilesPerLevel(1));
  Compact(1, "p", "q");
  // The direct-I/O sanitization point must have fired exactly when
  // `use_direct_reads` ended up enabled.
  ASSERT_EQ(readahead, options.use_direct_reads);
  ASSERT_EQ("0,0,1", FilesPerLevel(1));
  // Destroy the DB before deleting the MockEnv it runs on.
  Destroy(options);
  delete options.env;
}
4730
// Run DirectIO with direct I/O both disabled (false) and enabled (true).
INSTANTIATE_TEST_CASE_P(DBCompactionDirectIOTest, DBCompactionDirectIOTest,
                        testing::Bool());
4733
// Parameterized over `CompactionPri` (passed as its underlying uint32_t) so
// the same workload can be exercised under each compaction priority policy.
class CompactionPriTest : public DBTestBase,
                          public testing::WithParamInterface<uint32_t> {
 public:
  CompactionPriTest() : DBTestBase("/compaction_pri_test") {
    compaction_pri_ = GetParam();  // cache the test parameter
  }

  // Required if inheriting from testing::WithParamInterface<>
  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

  // Raw value of the CompactionPri under test.
  uint32_t compaction_pri_;
};
4747
TEST_P(CompactionPriTest, Test) {
  // Write keys in random order under the parameterized compaction priority,
  // then verify every key is still readable after compactions settle.
  Options options = CurrentOptions();
  options.write_buffer_size = 16 * 1024;
  options.compaction_pri = static_cast<CompactionPri>(compaction_pri_);
  options.hard_pending_compaction_bytes_limit = 256 * 1024;
  options.max_bytes_for_level_base = 64 * 1024;
  options.max_bytes_for_level_multiplier = 4;
  options.compression = kNoCompression;

  DestroyAndReopen(options);

  Random rnd(301);
  const int kNKeys = 5000;
  int keys[kNKeys];
  for (int i = 0; i < kNKeys; i++) {
    keys[i] = i;
  }
  // `std::random_shuffle` is deprecated in C++14 and removed in C++17, and
  // its ordering varies across standard libraries. Use a seeded Fisher-Yates
  // shuffle driven by the test's own RNG so the key order is deterministic.
  for (int i = kNKeys - 1; i > 0; i--) {
    int j = static_cast<int>(rnd.Uniform(i + 1));
    int tmp = keys[i];
    keys[i] = keys[j];
    keys[j] = tmp;
  }

  for (int i = 0; i < kNKeys; i++) {
    ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 102)));
  }

  dbfull()->TEST_WaitForCompact();
  // All keys must be present regardless of the compaction priority used.
  for (int i = 0; i < kNKeys; i++) {
    ASSERT_NE("NOT_FOUND", Get(Key(i)));
  }
}
4776
// Exercise every supported compaction priority policy.
INSTANTIATE_TEST_CASE_P(
    CompactionPriTest, CompactionPriTest,
    ::testing::Values(CompactionPri::kByCompensatedSize,
                      CompactionPri::kOldestLargestSeqFirst,
                      CompactionPri::kOldestSmallestSeqFirst,
                      CompactionPri::kMinOverlappingRatio));
4783
4784 class NoopMergeOperator : public MergeOperator {
4785 public:
NoopMergeOperator()4786 NoopMergeOperator() {}
4787
FullMergeV2(const MergeOperationInput &,MergeOperationOutput * merge_out) const4788 bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
4789 MergeOperationOutput* merge_out) const override {
4790 std::string val("bar");
4791 merge_out->new_value = val;
4792 return true;
4793 }
4794
Name() const4795 const char* Name() const override { return "Noop"; }
4796 };
4797
TEST_F(DBCompactionTest, PartialManualCompaction) {
  // Accumulate many merge operands for one key so the bottommost level holds
  // more data than `max_compaction_bytes` permits in one compaction, then
  // verify a forced bottommost CompactRange still completes (via multiple
  // partial compactions).
  Options opts = CurrentOptions();
  opts.num_levels = 3;
  opts.level0_file_num_compaction_trigger = 10;
  opts.compression = kNoCompression;
  opts.merge_operator.reset(new NoopMergeOperator());
  opts.target_file_size_base = 10240;
  DestroyAndReopen(opts);

  Random rnd(301);
  for (auto i = 0; i < 8; ++i) {
    for (auto j = 0; j < 10; ++j) {
      Merge("foo", RandomString(&rnd, 1024));
    }
    Flush();
  }

  MoveFilesToLevel(2);

  std::string prop;
  EXPECT_TRUE(dbfull()->GetProperty(DB::Properties::kLiveSstFilesSize, &prop));
  // Cap each compaction at roughly half the live data so the manual
  // compaction must split into more than one partial compaction. Use
  // `strtoull` rather than `atoi`: the property is a uint64_t byte count and
  // `atoi` silently overflows past INT_MAX.
  uint64_t max_compaction_bytes = strtoull(prop.c_str(), nullptr, 10) / 2;
  ASSERT_OK(dbfull()->SetOptions(
      {{"max_compaction_bytes", std::to_string(max_compaction_bytes)}}));

  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
  dbfull()->CompactRange(cro, nullptr, nullptr);
}
4827
TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) {
  // Regression test for bug where manual compaction hangs forever when the DB
  // is in read-only mode. Verify it now at least returns, despite failing.
  const int kNumL0Files = 4;
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(Env::Default()));
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  opts.env = mock_env.get();
  DestroyAndReopen(opts);

  Random rnd(301);
  for (int i = 0; i < kNumL0Files; ++i) {
    // Make sure files are overlapping in key-range to prevent trivial move.
    Put("key1", RandomString(&rnd, 1024));
    Put("key2", RandomString(&rnd, 1024));
    Flush();
  }
  ASSERT_EQ(kNumL0Files, NumTableFilesAtLevel(0));

  // Enter read-only mode by failing a write.
  mock_env->SetFilesystemActive(false);
  // Make sure this is outside `CompactRange`'s range so that it doesn't fail
  // early trying to flush memtable.
  ASSERT_NOK(Put("key3", RandomString(&rnd, 1024)));

  // In the bug scenario, the first manual compaction would fail and forget to
  // unregister itself, causing the second one to hang forever due to conflict
  // with a non-running compaction.
  CompactRangeOptions cro;
  cro.exclusive_manual_compaction = false;
  Slice begin_key("key1");
  Slice end_key("key2");
  // Both calls must return an error rather than hang.
  ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
  ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));

  // Close before mock_env destruct.
  Close();
}
4867
// ManualCompactionBottomLevelOptimized tests the bottom-level manual
// compaction optimization, which skips recompacting files that were just
// created by the Ln-1 to Ln compaction.
TEST_F(DBCompactionTest, ManualCompactionBottomLevelOptimized) {
  Options opts = CurrentOptions();
  opts.num_levels = 3;
  opts.level0_file_num_compaction_trigger = 5;
  opts.compression = kNoCompression;
  opts.merge_operator.reset(new NoopMergeOperator());
  opts.target_file_size_base = 1024;
  opts.max_bytes_for_level_multiplier = 2;
  opts.disable_auto_compactions = true;
  DestroyAndReopen(opts);

  auto* handle =
      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
  InternalStats* internal_stats_ptr = handle->cfd()->internal_stats();
  ASSERT_NE(internal_stats_ptr, nullptr);

  Random rnd(301);
  // Flushes 8 files of 10 random-value keys each, all sharing `prefix`.
  auto write_files = [&](const std::string& prefix) {
    for (int file = 0; file < 8; ++file) {
      for (int key = 0; key < 10; ++key) {
        ASSERT_OK(Put(prefix + std::to_string(file * 10 + key),
                      RandomString(&rnd, 1024)));
      }
      Flush();
    }
  };

  // The "foo" files go to the bottom level; the "bar" files stay in L0.
  write_files("foo");
  MoveFilesToLevel(2);
  write_files("bar");

  // No compaction has read bottom-level files yet.
  const std::vector<InternalStats::CompactionStats>& stats_before =
      internal_stats_ptr->TEST_GetCompactionStats();
  ASSERT_EQ(stats_before[2].num_input_files_in_output_level, 0);

  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
  dbfull()->CompactRange(cro, nullptr, nullptr);

  // With the optimization, the bottommost compaction must not take files
  // already in the output level as inputs.
  const std::vector<InternalStats::CompactionStats>& stats_after =
      internal_stats_ptr->TEST_GetCompactionStats();
  ASSERT_EQ(stats_after[2].num_input_files_in_output_level, 0);
}
4919
TEST_F(DBCompactionTest, CompactionDuringShutdown) {
  // Verify that a compaction interrupted by shutdown is not recorded as a
  // background error.
  Options opts = CurrentOptions();
  opts.level0_file_num_compaction_trigger = 2;
  opts.disable_auto_compactions = true;
  DestroyAndReopen(opts);
  ColumnFamilyHandleImpl* cfh =
      static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
  ColumnFamilyData* cfd = cfh->cfd();
  InternalStats* internal_stats_ptr = cfd->internal_stats();
  ASSERT_NE(internal_stats_ptr, nullptr);

  Random rnd(301);
  // Two overlapping L0 files so the manual compaction below is non-trivial.
  for (auto i = 0; i < 2; ++i) {
    for (auto j = 0; j < 10; ++j) {
      ASSERT_OK(
          Put("foo" + std::to_string(i * 10 + j), RandomString(&rnd, 1024)));
    }
    Flush();
  }

  // Flip the shutdown flag just before the compaction job runs, simulating
  // Close() racing with a manual compaction.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun",
      [&](void* /*arg*/) { dbfull()->shutting_down_.store(true); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  // A shutdown-aborted compaction must not leave a background error behind.
  ASSERT_OK(dbfull()->error_handler_.GetBGError());
}
4947
4948 // FixFileIngestionCompactionDeadlock tests and verifies that compaction and
4949 // file ingestion do not cause deadlock in the event of write stall triggered
4950 // by number of L0 files reaching level0_stop_writes_trigger.
TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
  const int kNumKeysPerFile = 100;
  // Generate SST files.
  Options options = CurrentOptions();

  // Generate an external SST file containing a single key, i.e. 99
  std::string sst_files_dir = dbname_ + "/sst_files/";
  test::DestroyDir(env_, sst_files_dir);
  ASSERT_OK(env_->CreateDir(sst_files_dir));
  SstFileWriter sst_writer(EnvOptions(), options);
  const std::string sst_file_path = sst_files_dir + "test.sst";
  ASSERT_OK(sst_writer.Open(sst_file_path));
  ASSERT_OK(sst_writer.Put(Key(kNumKeysPerFile - 1), "value"));
  ASSERT_OK(sst_writer.Finish());

  // Hold back the background compaction until the file ingestion below has
  // started, guaranteeing the two are in flight together.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->LoadDependency({
      {"DBImpl::IngestExternalFile:AfterIncIngestFileCounter",
       "BackgroundCallCompaction:0"},
  });
  SyncPoint::GetInstance()->EnableProcessing();

  options.write_buffer_size = 110 << 10;  // 110KB
  // Compaction triggers exactly when the write-stop threshold is reached.
  options.level0_file_num_compaction_trigger =
      options.level0_stop_writes_trigger;
  options.max_subcompactions = max_subcompactions_;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);
  Random rnd(301);

  // Generate level0_stop_writes_trigger L0 files to trigger write stop
  for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
    for (int j = 0; j != kNumKeysPerFile; ++j) {
      ASSERT_OK(Put(Key(j), RandomString(&rnd, 990)));
    }
    if (0 == i) {
      // When we reach here, the memtables have kNumKeysPerFile keys. Note that
      // flush is not yet triggered. We need to write an extra key so that the
      // write path will call PreprocessWrite and cause the previous key-value
      // pairs to be flushed. After that, there will be the newest key in the
      // memtable, and a bunch of L0 files. Since there is already one key in
      // the memtable, then for i = 1, 2, ..., we do not have to write this
      // extra key to trigger flush.
      ASSERT_OK(Put("", ""));
    }
    dbfull()->TEST_WaitForFlushMemTable();
    ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1);
  }
  // When we reach this point, there will be level0_stop_writes_trigger L0
  // files and one extra key (99) in memory, which overlaps with the external
  // SST file. Write stall triggers, and can be cleared only after compaction
  // reduces the number of L0 files.

  // Compaction will also be triggered since we have reached the threshold for
  // auto compaction. Note that compaction may begin after the following file
  // ingestion thread and waits for ingestion to finish.

  // Thread to ingest file with overlapping key range with the current
  // memtable. Consequently ingestion will trigger a flush. The flush MUST
  // proceed without waiting for the write stall condition to clear, otherwise
  // deadlock can happen.
  port::Thread ingestion_thr([&]() {
    IngestExternalFileOptions ifo;
    Status s = db_->IngestExternalFile({sst_file_path}, ifo);
    ASSERT_OK(s);
  });

  // More write to trigger write stop
  ingestion_thr.join();
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  Close();
}
5024
TEST_F(DBCompactionTest, ConsistencyFailTest) {
  // Force VersionBuilder::CheckConsistency to fail (by swapping the two
  // FileMetaData entries it is comparing) and verify the DB surfaces the
  // failure as a write error instead of crashing.
  Options options = CurrentOptions();
  DestroyAndReopen(options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "VersionBuilder::CheckConsistency", [&](void* arg) {
        auto p =
            reinterpret_cast<std::pair<FileMetaData**, FileMetaData**>*>(arg);
        // just swap the two FileMetaData so that we hit error
        // in CheckConsistency function
        FileMetaData* temp = *(p->first);
        *(p->first) = *(p->second);
        *(p->second) = temp;
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Two flushes; installing the second file runs the (sabotaged) check.
  for (int k = 0; k < 2; ++k) {
    ASSERT_OK(Put("foo", "bar"));
    Flush();
  }

  // The DB is now in an error state, so subsequent writes must fail.
  ASSERT_NOK(Put("foo", "bar"));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
5050
IngestOneKeyValue(DBImpl * db,const std::string & key,const std::string & value,const Options & options)5051 void IngestOneKeyValue(DBImpl* db, const std::string& key,
5052 const std::string& value, const Options& options) {
5053 ExternalSstFileInfo info;
5054 std::string f = test::PerThreadDBPath("sst_file" + key);
5055 EnvOptions env;
5056 ROCKSDB_NAMESPACE::SstFileWriter writer(env, options);
5057 auto s = writer.Open(f);
5058 ASSERT_OK(s);
5059 // ASSERT_OK(writer.Put(Key(), ""));
5060 ASSERT_OK(writer.Put(key, value));
5061
5062 ASSERT_OK(writer.Finish(&info));
5063 IngestExternalFileOptions ingest_opt;
5064
5065 ASSERT_OK(db->IngestExternalFile({info.file_path}, ingest_opt));
5066 }
5067
TEST_P(DBCompactionTestWithParam,
       FlushAfterIntraL0CompactionCheckConsistencyFail) {
  // Regression test: a flush landing after an intra-L0 compaction must not
  // produce an LSM state that trips force_consistency_checks.
  Options options = CurrentOptions();
  options.force_consistency_checks = true;
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 5;
  options.max_background_compactions = 2;
  options.max_subcompactions = max_subcompactions_;
  DestroyAndReopen(options);

  const size_t kValueSize = 1 << 20;
  Random rnd(301);
  std::atomic<int> pick_intra_l0_count(0);
  std::string value(RandomString(&rnd, kValueSize));

  // Hold the compaction job until the test reaches the sync point below,
  // after all files have been ingested.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBCompactionTestWithParam::FlushAfterIntraL0:1",
        "CompactionJob::Run():Start"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "FindIntraL0Compaction",
      [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // prevents trivial move
  for (int i = 0; i < 10; ++i) {
    ASSERT_OK(Put(Key(i), ""));  // prevents trivial move
  }
  ASSERT_OK(Flush());
  Compact("", Key(99));
  ASSERT_EQ(0, NumTableFilesAtLevel(0));

  // Flush 5 L0 sst.
  for (int i = 0; i < 5; ++i) {
    ASSERT_OK(Put(Key(i + 1), value));
    ASSERT_OK(Flush());
  }
  ASSERT_EQ(5, NumTableFilesAtLevel(0));

  // Put one key, to make smallest log sequence number in this memtable is less
  // than sst which would be ingested in next step.
  ASSERT_OK(Put(Key(0), "a"));

  ASSERT_EQ(5, NumTableFilesAtLevel(0));

  // Ingest 5 L0 sst. And this files would trigger PickIntraL0Compaction.
  for (int i = 5; i < 10; i++) {
    IngestOneKeyValue(dbfull(), Key(i), value, options);
    ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
  }

  // Releases the compaction job held by the dependency above.
  TEST_SYNC_POINT("DBCompactionTestWithParam::FlushAfterIntraL0:1");
  // Put one key, to make biggest log sequence number in this memtable is bigger
  // than sst which would be ingested in next step.
  ASSERT_OK(Put(Key(2), "b"));
  ASSERT_EQ(10, NumTableFilesAtLevel(0));
  dbfull()->TEST_WaitForCompact();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  std::vector<std::vector<FileMetaData>> level_to_files;
  dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                  &level_to_files);
  // An intra-L0 compaction must have been picked, with L0 files remaining.
  ASSERT_GT(level_to_files[0].size(), 0);
  ASSERT_GT(pick_intra_l0_count.load(), 0);

  // The flush after the intra-L0 compaction must pass the consistency check.
  ASSERT_OK(Flush());
}
5134
// Exercises an intra-L0 compaction triggered while a flush is queued but
// blocked (the single HIGH-priority thread is occupied by a sleeping task).
// After the flush is released, the test verifies no background errors were
// recorded and that all key values read back correctly under
// force_consistency_checks.
TEST_P(DBCompactionTestWithParam,
       IntraL0CompactionAfterFlushCheckConsistencyFail) {
  Options options = CurrentOptions();
  // Fail loudly if the compaction/flush interleaving below produces an
  // inconsistent LSM shape.
  options.force_consistency_checks = true;
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 5;
  options.max_background_compactions = 2;
  options.max_subcompactions = max_subcompactions_;
  options.write_buffer_size = 2 << 20;  // 2 MB memtables.
  // Allow several immutable memtables to queue up while flushing is blocked.
  options.max_write_buffer_number = 6;
  DestroyAndReopen(options);

  const size_t kValueSize = 1 << 20;  // 1 MB values.
  Random rnd(301);
  std::string value(RandomString(&rnd, kValueSize));
  std::string value2(RandomString(&rnd, kValueSize));
  std::string bigvalue = value + value;

  // prevents trivial move
  for (int i = 0; i < 10; ++i) {
    ASSERT_OK(Put(Key(i), ""));  // prevents trivial move
  }
  ASSERT_OK(Flush());
  Compact("", Key(99));
  ASSERT_EQ(0, NumTableFilesAtLevel(0));

  // Incremented each time the intra-L0 picker runs; asserted > 0 at the end.
  std::atomic<int> pick_intra_l0_count(0);
  // Block the background compaction until the TEST_SYNC_POINT below fires.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBCompactionTestWithParam::IntraL0CompactionAfterFlush:1",
        "CompactionJob::Run():Start"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "FindIntraL0Compaction",
      [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  // Make 6 L0 sst: alternate between ingesting a file and flushing a put.
  for (int i = 0; i < 6; ++i) {
    if (i % 2 == 0) {
      IngestOneKeyValue(dbfull(), Key(i), value, options);
    } else {
      ASSERT_OK(Put(Key(i), value));
      ASSERT_OK(Flush());
    }
  }

  ASSERT_EQ(6, NumTableFilesAtLevel(0));

  // Stop run flush job: occupy the single HIGH-priority (flush) thread with
  // a sleeping task so pending flushes cannot proceed.
  env_->SetBackgroundThreads(1, Env::HIGH);
  test::SleepingBackgroundTask sleeping_tasks;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_tasks,
                 Env::Priority::HIGH);
  sleeping_tasks.WaitUntilSleeping();

  // Put many keys to make memtable request to flush
  for (int i = 0; i < 6; ++i) {
    ASSERT_OK(Put(Key(i), bigvalue));
  }

  // L0 file count is unchanged because the flush is still blocked.
  ASSERT_EQ(6, NumTableFilesAtLevel(0));
  // ingest file to trigger IntraL0Compaction
  for (int i = 6; i < 10; ++i) {
    ASSERT_EQ(i, NumTableFilesAtLevel(0));
    IngestOneKeyValue(dbfull(), Key(i), value2, options);
  }
  ASSERT_EQ(10, NumTableFilesAtLevel(0));

  // Wake up flush job
  sleeping_tasks.WakeUp();
  sleeping_tasks.WaitUntilDone();
  // Release the compaction job blocked at CompactionJob::Run():Start.
  TEST_SYNC_POINT("DBCompactionTestWithParam::IntraL0CompactionAfterFlush:1");
  dbfull()->TEST_WaitForCompact();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

  // No background error may have been recorded by the flush/compaction race.
  uint64_t error_count = 0;
  db_->GetIntProperty("rocksdb.background-errors", &error_count);
  ASSERT_EQ(error_count, 0);
  ASSERT_GT(pick_intra_l0_count.load(), 0);
  // The memtable writes (bigvalue) must win over the older ingested values.
  for (int i = 0; i < 6; ++i) {
    ASSERT_EQ(bigvalue, Get(Key(i)));
  }
  for (int i = 6; i < 10; ++i) {
    ASSERT_EQ(value2, Get(Key(i)));
  }
}
5219
// FIFO compaction with TTL while max_open_files (20) is smaller than the
// number of SST files created (24), so not every table reader stays open.
// A MockEnv fakes the passage of more than `ttl` before the final flush;
// the test name suggests this exercises retrieving file creation times for
// files whose readers were closed -- the test passes if no Put/Flush fails.
TEST_F(DBCompactionTest, FifoCompactionGetFileCreationTime) {
  MockEnv mock_env(env_);
  do {
    Options options = CurrentOptions();
    options.table_factory.reset(new BlockBasedTableFactory());
    // Route all clock/file operations through the mock environment so time
    // can be advanced artificially below.
    options.env = &mock_env;
    options.ttl = static_cast<uint64_t>(24) * 3600;  // 24 hours, in seconds.
    options.compaction_style = kCompactionStyleFIFO;
    constexpr size_t kNumFiles = 24;
    // Deliberately fewer than kNumFiles so some table files get closed.
    options.max_open_files = 20;
    constexpr size_t kNumKeysPerFile = 10;
    DestroyAndReopen(options);
    for (size_t i = 0; i < kNumFiles; ++i) {
      for (size_t j = 0; j < kNumKeysPerFile; ++j) {
        ASSERT_OK(Put(std::to_string(j), "value_" + std::to_string(i)));
      }
      ASSERT_OK(Flush());
    }
    // Advance the fake clock past the TTL, then flush once more.
    mock_env.FakeSleepForMicroseconds(
        static_cast<uint64_t>(1000 * 1000 * (1 + options.ttl)));
    ASSERT_OK(Put("foo", "value"));
    ASSERT_OK(Flush());
  } while (ChangeOptions());
}
5244
5245 #endif // !defined(ROCKSDB_LITE)
5246 } // namespace ROCKSDB_NAMESPACE
5247
main(int argc,char ** argv)5248 int main(int argc, char** argv) {
5249 #if !defined(ROCKSDB_LITE)
5250 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
5251 ::testing::InitGoogleTest(&argc, argv);
5252 return RUN_ALL_TESTS();
5253 #else
5254 (void) argc;
5255 (void) argv;
5256 return 0;
5257 #endif
5258 }
5259